Source code for dodal.common.visit

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

from aiohttp import ClientSession
from ophyd_async.core import DirectoryInfo
from pydantic import BaseModel

from dodal.common.types import UpdatingDirectoryProvider
from dodal.log import LOGGER

"""
Functionality required for/from the API of a DirectoryService which exposes the specifics of the Diamond filesystem.
"""


class DataCollectionIdentifier(BaseModel):
    """
    Identifier of a single data collection within a visit.

    Plays the role of a `Scan Number` / `scan_id`: it is not globally unique,
    but is expected to only ever increase, to be unique within one visit, and
    to be co-ordinated with any other scan engines writing to that visit.
    """

    # Validated by pydantic as an integer.
    # NOTE(review): the camelCase name presumably mirrors the key used in the
    # visit service's JSON payload - confirm before renaming.
    collectionNumber: int
class DirectoryServiceClientBase(ABC):
    """
    Interface for the object that performs the I/O needed to determine the
    collection number.

    Concrete clients must implement both coroutines below; each resolves to a
    DataCollectionIdentifier.
    """

    @abstractmethod
    async def create_new_collection(self) -> DataCollectionIdentifier:
        """Start a new collection and return its identifier."""

    @abstractmethod
    async def get_current_collection(self) -> DataCollectionIdentifier:
        """Return the identifier of the current collection."""
class DirectoryServiceClient(DirectoryServiceClientBase):
    """
    Client for the VisitService REST API.

    The service is currently exposed by the GDA Server to co-ordinate unique
    filenames. While VisitService is embedded in GDA, url is likely to be
    `ixx-control:8088/api`.

    Both operations talk to the `<url>/numtracker` endpoint and open a fresh
    aiohttp ClientSession for each call.

    NOTE(review): `url` is handed to aiohttp unchanged, so it presumably has to
    include its scheme (e.g. `http://...`) - confirm against callers.
    """

    _url: str

    def __init__(self, url: str) -> None:
        """
        Args:
            url: Base address of the service API; `/numtracker` is appended to
                it for every request.
        """
        self._url = url

    async def create_new_collection(self) -> DataCollectionIdentifier:
        """
        POST to the numtracker endpoint and return the collection it reports.

        Raises:
            Whatever `raise_for_status()` raises when the response carries an
            error status, plus any error from decoding or validating the body.
        """
        endpoint = f"{self._url}/numtracker"
        async with ClientSession() as http, http.post(endpoint) as reply:
            reply.raise_for_status()
            payload = await reply.json()
            created = DataCollectionIdentifier.parse_obj(payload)
            LOGGER.debug("New DataCollection: %s", created)
            return created

    async def get_current_collection(self) -> DataCollectionIdentifier:
        """
        GET the numtracker endpoint and return the collection it reports.

        Raises:
            Whatever `raise_for_status()` raises when the response carries an
            error status, plus any error from decoding or validating the body.
        """
        endpoint = f"{self._url}/numtracker"
        async with ClientSession() as http, http.get(endpoint) as reply:
            reply.raise_for_status()
            payload = await reply.json()
            existing = DataCollectionIdentifier.parse_obj(payload)
            LOGGER.debug("Current DataCollection: %s", existing)
            return existing
class LocalDirectoryServiceClient(DirectoryServiceClientBase):
    """
    Local or dummy implementation of the VisitService client, used to
    co-ordinate unique filenames.

    The collection number is an in-memory counter that starts at 0.
    """

    _count: int

    def __init__(self) -> None:
        self._count = 0

    async def create_new_collection(self) -> DataCollectionIdentifier:
        """Advance the counter by one and return the resulting identifier."""
        self._count += 1
        number = self._count
        LOGGER.debug("New DataCollection: %s", number)
        return DataCollectionIdentifier(collectionNumber=number)

    async def get_current_collection(self) -> DataCollectionIdentifier:
        """Return the identifier for the counter's current value, unchanged."""
        number = self._count
        LOGGER.debug("Current DataCollection: %s", number)
        return DataCollectionIdentifier(collectionNumber=number)
class StaticVisitDirectoryProvider(UpdatingDirectoryProvider):
    """
    Static (single visit) implementation of DirectoryProvider whilst awaiting
    auth infrastructure to generate necessary information per-scan.

    Allows setting a singular visit into which all run files will be saved.
    update() queries a visit service to get the next DataCollectionIdentifier
    to increment the suffix of all file writers' next files.
    Requires that all detectors are running with a mutual view on the
    filesystem.
    Supports a single Visit which should be passed as a Path relative to the
    root of the Detector IOC mounting.
    i.e. to write to visit /dls/ixx/data/YYYY/cm12345-1, assuming all detectors
    are mounted with /data -> /dls/ixx/data, root=/data/YYYY/cm12345-1/

    Calling the instance returns the DirectoryInfo produced by the most recent
    successful update(); it raises ValueError until one has succeeded.
    """

    _beamline: str
    _root: Path
    _client: DirectoryServiceClientBase
    _current_collection: DirectoryInfo | None
    # Initialised to None and not otherwise used by this class.
    _session: ClientSession | None

    def __init__(
        self,
        beamline: str,
        root: Path,
        client: Optional[DirectoryServiceClientBase] = None,
    ):
        """
        Args:
            beamline: Beamline name; used as the file name prefix and to build
                the default service address.
            root: Visit directory that files are written into.
            client: Source of collection numbers. When omitted, a
                DirectoryServiceClient for
                `http://{beamline}-control:8088/api` is created.
        """
        self._beamline = beamline
        # The address is passed to aiohttp verbatim, which needs an absolute
        # URL: without the scheme the default client could not make a request.
        self._client = client or DirectoryServiceClient(
            f"http://{beamline}-control:8088/api"
        )
        self._root = root
        self._current_collection = None
        self._session = None

    async def update(self) -> None:
        """
        Creates a new data collection in the current visit.

        Raises:
            Any exception raised while creating the collection or building its
            DirectoryInfo. The current collection is reset to None first, so a
            later call to the provider fails rather than reusing stale info.
        """
        # https://github.com/DiamondLightSource/dodal/issues/452
        # TODO: Allow selecting visit as part of a request
        # TODO: DAQ-4827: Pass AuthN information as part of request
        try:
            collection_id_info = await self._client.create_new_collection()
            self._current_collection = self._generate_directory_info(collection_id_info)
        except Exception:
            LOGGER.error(
                "Exception while updating data collection, preventing overwriting data by setting current_collection to None"
            )
            self._current_collection = None
            raise

    def _generate_directory_info(
        self,
        collection_id_info: DataCollectionIdentifier,
    ) -> DirectoryInfo:
        """Build the DirectoryInfo for the given collection identifier."""
        return DirectoryInfo(
            # See DocString of DirectoryInfo. At DLS, root = visit directory,
            # resource_dir is relative to it.
            root=self._root,
            # https://github.com/DiamondLightSource/dodal/issues/452
            # Currently all h5 files written to visit/ directory, as no
            # guarantee that visit/dataCollection/ directory will have been
            # produced. If it is as part of #452, append the resource_dir
            resource_dir=Path("."),
            # Diamond standard file naming
            prefix=f"{self._beamline}-{collection_id_info.collectionNumber}-",
        )

    def __call__(self) -> DirectoryInfo:
        """
        Return the DirectoryInfo of the current collection.

        Raises:
            ValueError: If there is no current collection, i.e. update() has
                not yet succeeded or its latest call failed.
        """
        if self._current_collection is None:
            raise ValueError(
                "No current collection, update() needs to be called at least once"
            )
        return self._current_collection