Source code for dodal.common.visit

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Literal

from aiohttp import ClientSession
from ophyd_async.core import FilenameProvider, PathInfo
from pydantic import BaseModel

from dodal.common.types import UpdatingPathProvider
from dodal.log import LOGGER

"""
Functionality required for/from the API of a DirectoryService which exposes the specifics of the Diamond filesystem.
"""


[docs] class DataCollectionIdentifier(BaseModel): """ Equivalent to a `Scan Number` or `scan_id`, non-globally unique scan identifier. Should be always incrementing, unique per-visit, co-ordinated with any other scan engines. """ collectionNumber: int
[docs] class DirectoryServiceClient(ABC): """ Object responsible for I/O in determining collection number """
[docs] @abstractmethod async def create_new_collection(self) -> DataCollectionIdentifier: """Create new collection"""
[docs] @abstractmethod async def get_current_collection(self) -> DataCollectionIdentifier: """Get current collection"""
[docs] class DiamondFilenameProvider(FilenameProvider): def __init__(self, beamline: str, client: DirectoryServiceClient): self._beamline = beamline self._client = client self.collectionId: DataCollectionIdentifier | None = None def __call__(self, device_name: str | None = None): assert device_name, "Diamond filename requires device_name to be passed" assert self.collectionId is not None return f"{self._beamline}-{self.collectionId.collectionNumber}-{device_name}"
[docs] class RemoteDirectoryServiceClient(DirectoryServiceClient): """Client for the VisitService REST API Currently exposed by the GDA Server to co-ordinate unique filenames. While VisitService is embedded in GDA, url is likely to be `ixx-control:8088/api` """ def __init__(self, url: str) -> None: self._url = url async def create_new_collection(self) -> DataCollectionIdentifier: new_collection = await self._identifier_from_response("POST") LOGGER.debug("New DataCollection: %s", new_collection) return new_collection async def get_current_collection(self) -> DataCollectionIdentifier: current_collection = await self._identifier_from_response("GET") LOGGER.debug("Current DataCollection: %s", current_collection) return current_collection async def _identifier_from_response( self, method: Literal["GET", "POST"], ) -> DataCollectionIdentifier: async with ( ClientSession() as session, session.request(method, f"{self._url}/numtracker") as response, ): response.raise_for_status() json = await response.json() return DataCollectionIdentifier.model_validate_json(json)
[docs] class LocalDirectoryServiceClient(DirectoryServiceClient): """Local or dummy impl of VisitService client to co-ordinate unique filenames.""" def __init__(self) -> None: self._count = 0 async def create_new_collection(self) -> DataCollectionIdentifier: self._count += 1 LOGGER.debug("New DataCollection: %s", self._count) return DataCollectionIdentifier(collectionNumber=self._count) async def get_current_collection(self) -> DataCollectionIdentifier: LOGGER.debug("Current DataCollection: %s", self._count) return DataCollectionIdentifier(collectionNumber=self._count)
[docs] class StaticVisitPathProvider(UpdatingPathProvider): """ Static (single visit) implementation of PathProvider whilst awaiting auth infrastructure to generate necessary information per-scan. Allows setting a singular visit into which all run files will be saved. update() queries a visit service to get the next DataCollectionIdentifier to increment the suffix of all file writers' next files. Requires that all detectors are running with a mutual view on the filesystem. Supports a single Visit which should be passed as a Path relative to the root of the Detector IOC mounting. i.e. to write to visit /dls/ixx/data/YYYY/cm12345-1 """ def __init__( self, beamline: str, root: Path, client: DirectoryServiceClient | None = None, ): self._beamline = beamline self._client = client or RemoteDirectoryServiceClient( f"{beamline}-control:8088/api" ) self._filename_provider = DiamondFilenameProvider(self._beamline, self._client) self._root = root self.current_collection: PathInfo | None self._session: ClientSession | None
[docs] async def update(self, **kwargs) -> None: """ Creates a new data collection in the current visit. """ # https://github.com/DiamondLightSource/dodal/issues/452 # TODO: Allow selecting visit as part of a request # TODO: DAQ-4827: Pass AuthN information as part of request try: self._filename_provider.collectionId = ( await self._client.create_new_collection() ) except Exception: LOGGER.error( "Exception while updating data collection, preventing overwriting data by setting current_collection to None" ) self._collection_id_info = None raise
async def data_session(self) -> str: collection = await self._client.get_current_collection() return f"{self._beamline}-{collection.collectionNumber}" def __call__(self, device_name: str | None = None) -> PathInfo: assert device_name, "Must call PathProvider with device_name" return PathInfo( directory_path=self._root, filename=self._filename_provider(device_name) )