Source code for dodal.devices.areadetector.adutils

import time as ttime

from ophyd import Component as Cpt
from ophyd import DetectorBase, Device, EpicsSignal, EpicsSignalRO, Staged
from ophyd.areadetector import ADTriggerStatus, TriggerBase
from ophyd.areadetector.cam import AreaDetectorCam
from ophyd.areadetector.filestore_mixins import FileStoreHDF5, FileStoreIterativeWrite
from ophyd.areadetector.plugins import HDF5Plugin


class SingleTriggerV33(TriggerBase):
    """Trigger mixin that starts one acquisition per ``trigger()`` call.

    Each call writes ``1`` to ``self._acquisition_signal`` (not defined in
    this class; expected to be provided by the classes this is mixed with)
    using put-completion, and the returned status is marked finished from
    the put-completion callback.
    """

    _status_type = ADTriggerStatus

    def __init__(self, *args, image_name=None, **kwargs):
        """Initialise the mixin and choose the name used for image datums.

        Args:
            *args: Forwarded unchanged to the parent initialiser.
            image_name: Key passed to ``generate_datum`` on every trigger.
                When ``None`` it defaults to ``"<device name>_image"``.
            **kwargs: Forwarded unchanged to the parent initialiser.
        """
        super().__init__(*args, **kwargs)
        if image_name is None:
            # Ensure that this mixin is part of valid device with name
            assert isinstance(self, Device)
            image_name = "_".join([self.name, "image"])
        self._image_name = image_name

    def trigger(self):
        """Trigger one acquisition.

        Returns:
            The status object created for this acquisition; it is finished
            by the put-completion callback of the acquisition signal.

        Raises:
            RuntimeError: If the device has not been staged.
        """
        if self._staged != Staged.yes:
            # The two sentences are separate literals; the trailing space on
            # the first keeps them from running together in the message.
            raise RuntimeError(
                "This detector is not ready to trigger. "
                "Call the stage() method before triggering."
            )

        self._status = self._status_type(self)

        def _acq_done(*args, **kwargs):
            # TODO sort out if anything useful in here
            # NOTE(review): this reads self._status when the callback fires,
            # so a second trigger() issued before completion would finish
            # the newer status object - confirm that is acceptable.
            self._status._finished()  # noqa: SLF001

        self._acquisition_signal.put(1, use_complete=True, callback=_acq_done)
        # Ensure that this mixin is part of valid Detector with generate_datum
        assert isinstance(self, DetectorBase)
        self.generate_datum(self._image_name, ttime.time())
        return self._status
class SynchronisedAdDriverBase(AreaDetectorCam):
    """
    Base Ophyd device to control an AreaDetector driver and synchronise it
    with other AreaDetector plugins, even non-blocking ones.
    """

    adcore_version = Cpt(
        EpicsSignalRO,
        "ADCoreVersion_RBV",
        string=True,
        kind="config",
    )
    driver_version = Cpt(
        EpicsSignalRO,
        "DriverVersion_RBV",
        string=True,
        kind="config",
    )
    wait_for_plugins = Cpt(
        EpicsSignal,
        "WaitForPlugins",
        string=True,
        kind="config",
    )

    def stage(self, *args, **kwargs):
        """Configure non-blocking operation, then stage via the parent class.

        All arguments are forwarded unchanged to the parent ``stage`` and its
        return value is returned as-is.
        """
        # Lets the detector work with non-blocking AD plugins while Ophyd
        # relies on the AcquireBusy PV to decide when an acquisition is done.
        self.ensure_nonblocking()
        staged = super().stage(*args, **kwargs)
        return staged

    def ensure_nonblocking(self):
        """Request ``wait_for_plugins = "Yes"`` at stage time and propagate.

        After updating this device's ``stage_sigs``, every other component of
        the parent device (if there is a parent) that exposes an
        ``ensure_nonblocking`` method has that method called as well.
        """
        self.stage_sigs["wait_for_plugins"] = "Yes"

        owner = self.parent
        if owner is None:
            return

        for component_name in owner.component_names:
            sibling = getattr(owner, component_name)
            # Skip ourselves so the call does not recurse back into this method.
            if sibling is not self and hasattr(sibling, "ensure_nonblocking"):
                sibling.ensure_nonblocking()
# Legacy ophyd code, scheduled for removal; it is only used for adim.
# See https://github.com/DiamondLightSource/dodal/issues/404
class Hdf5Writer(HDF5Plugin, FileStoreHDF5, FileStoreIterativeWrite):  # type: ignore
    """HDF5 plugin combined with the HDF5 file-store and iterative-write mixins.

    The number of frames per point is read from the parent detector's
    ``cam.num_images`` signal.
    """

    # Overridden with None; presumably this removes the corresponding
    # inherited components from the plugin - TODO confirm against ophyd.
    pool_max_buffers = None
    file_number_sync = None
    file_number_write = None

    def get_frames_per_point(self):
        """Return the current value of the parent detector's ``cam.num_images``."""
        detector = self.parent
        # Narrow the parent's type; this writer must be a child of a detector.
        assert isinstance(detector, DetectorBase)
        return detector.cam.num_images.get()