Source code for dodal.devices.tetramm

import asyncio

from bluesky.protocols import Hints
from ophyd_async.core import (
    DatasetDescriber,
    DetectorController,
    DetectorTrigger,
    Device,
    PathProvider,
    StandardDetector,
    StrictEnum,
    TriggerInfo,
    set_and_wait_for_value,
    soft_signal_r_and_setter,
)
from ophyd_async.epics.adcore import (
    ADHDFWriter,
    NDFileHDFIO,
    NDPluginBaseIO,
    stop_busy_record,
)
from ophyd_async.epics.core import (
    epics_signal_r,
    epics_signal_rw,
    epics_signal_rw_rbv,
)


[docs] class TetrammRange(StrictEnum): UA = "+- 120 uA" NA = "+- 120 nA"
[docs] class TetrammTrigger(StrictEnum): FREE_RUN = "Free run" EXT_TRIGGER = "Ext. trig." EXT_BULB = "Ext. bulb" EXT_GATE = "Ext. gate"
[docs] class TetrammChannels(StrictEnum): ONE = "1" TWO = "2" FOUR = "4"
[docs] class TetrammResolution(StrictEnum): SIXTEEN_BITS = "16 bits" TWENTY_FOUR_BITS = "24 bits"
[docs] class TetrammGeometry(StrictEnum): DIAMOND = "Diamond" SQUARE = "Square"
[docs] class TetrammDriver(Device): def __init__( self, prefix: str, name: str = "", ): self._prefix = prefix self.range = epics_signal_rw_rbv(TetrammRange, prefix + "Range") self.sample_time = epics_signal_r(float, prefix + "SampleTime_RBV") self.values_per_reading = epics_signal_rw_rbv(int, prefix + "ValuesPerRead") self.averaging_time = epics_signal_rw_rbv(float, prefix + "AveragingTime") self.to_average = epics_signal_r(int, prefix + "NumAverage_RBV") self.averaged = epics_signal_r(int, prefix + "NumAveraged_RBV") self.acquire = epics_signal_rw_rbv(bool, prefix + "Acquire") # this PV is special, for some reason it doesn't have a _RBV suffix... self.overflows = epics_signal_r(int, prefix + "RingOverflows") self.num_channels = epics_signal_rw_rbv(TetrammChannels, prefix + "NumChannels") self.resolution = epics_signal_rw_rbv(TetrammResolution, prefix + "Resolution") self.trigger_mode = epics_signal_rw_rbv(TetrammTrigger, prefix + "TriggerMode") self.bias = epics_signal_rw_rbv(bool, prefix + "BiasState") self.bias_volts = epics_signal_rw_rbv(float, prefix + "BiasVoltage") self.geometry = epics_signal_rw_rbv(TetrammGeometry, prefix + "Geometry") self.nd_attributes_file = epics_signal_rw(str, prefix + "NDAttributesFile") super().__init__(name=name)
[docs] class TetrammController(DetectorController): """Controller for a TetrAMM current monitor Attributes: base_sample_rate (int): Fixed in hardware Args: drv (TetrammDriver): A configured driver for the device maximum_readings_per_frame (int): Maximum number of readings per frame: actual readings may be lower if higher frame rate is required minimum_values_per_reading (int): Lower bound on the values that will be averaged to create a single reading readings_per_frame (int): Actual number of readings per frame. """ base_sample_rate: int = 100_000 def __init__( self, drv: TetrammDriver, minimum_values_per_reading: int = 5, maximum_readings_per_frame: int = 1_000, readings_per_frame: int = 1_000, ): # TODO: Are any of these also fixed by hardware constraints? self._drv = drv self.maximum_readings_per_frame = maximum_readings_per_frame self.minimum_values_per_reading = minimum_values_per_reading self.readings_per_frame = readings_per_frame def get_deadtime(self, exposure: float | None) -> float: # 2 internal clock cycles. Best effort approximation return 2 / self.base_sample_rate async def prepare(self, trigger_info: TriggerInfo): self._validate_trigger(trigger_info.trigger) assert trigger_info.livetime is not None # trigger mode must be set first and on its own! await self._drv.trigger_mode.set(TetrammTrigger.EXT_TRIGGER) await asyncio.gather( self._drv.averaging_time.set(trigger_info.livetime), self.set_exposure(trigger_info.livetime), ) async def arm(self): self._arm_status = await set_and_wait_for_value(self._drv.acquire, True) async def wait_for_idle(self): if self._arm_status: await self._arm_status def _validate_trigger(self, trigger: DetectorTrigger) -> None: supported_trigger_types = { DetectorTrigger.EDGE_TRIGGER, DetectorTrigger.CONSTANT_GATE, } if trigger not in supported_trigger_types: raise ValueError( f"{self.__class__.__name__} only supports the following trigger " f"types: {supported_trigger_types} but was asked to " f"use {trigger}" ) async def disarm(self): await stop_busy_record(self._drv.acquire, False, timeout=1)
[docs] async def set_exposure(self, exposure: float): """Tries to set the exposure time of a single frame. As during the exposure time, the device must collect an integer number of readings, in the case where the exposure is not a multiple of the base sample rate, it will be lowered to the prior multiple ot ensure triggers are not missed. Args: exposure (float): The time for a single frame in seconds Raises: ValueError: If exposure is too low to collect the required number of readings per frame. """ # Set up the number of readings across the exposure period to scale with # the exposure time self._set_minimum_exposure(exposure) values_per_reading: int = int( exposure * self.base_sample_rate / self.readings_per_frame ) await self._drv.values_per_reading.set(values_per_reading)
@property def max_frame_rate(self) -> float: """Max frame rate in Hz for the current configuration""" return 1 / self.minimum_exposure @max_frame_rate.setter def max_frame_rate(self, mfr: float): self._set_minimum_exposure(1 / mfr) @property def minimum_exposure(self) -> float: """Smallest amount of time needed to take a frame""" time_per_reading = self.minimum_values_per_reading / self.base_sample_rate return self.readings_per_frame * time_per_reading def _set_minimum_exposure(self, exposure: float): time_per_reading = self.minimum_values_per_reading / self.base_sample_rate if exposure < time_per_reading: raise ValueError( "Tetramm exposure time must be at least " f"{time_per_reading}s, asked to set it to {exposure}s" ) self.readings_per_frame = int( min(self.maximum_readings_per_frame, exposure / time_per_reading) )
[docs] class TetrammDatasetDescriber(DatasetDescriber): max_channels = 11 def __init__(self, controller: TetrammController) -> None: self.controller = controller async def np_datatype(self) -> str: return "<f8" # IEEE 754 double precision floating point async def shape(self) -> tuple[int, int]: return (self.max_channels, self.controller.readings_per_frame)
# TODO: Support MeanValue signals https://github.com/DiamondLightSource/dodal/issues/337
[docs] class TetrammDetector(StandardDetector): def __init__( self, prefix: str, path_provider: PathProvider, name: str = "", type: str | None = None, plugins: dict[str, NDPluginBaseIO] | None = None, ) -> None: self.drv = TetrammDriver(prefix + "DRV:") self.hdf = NDFileHDFIO(prefix + "HDF5:") controller = TetrammController(self.drv) config_signals = [ self.drv.values_per_reading, self.drv.averaging_time, self.drv.sample_time, ] if type: self.type, _ = soft_signal_r_and_setter(str, type) config_signals.append(self.type) else: self.type = None super().__init__( controller, ADHDFWriter( self.hdf, path_provider, lambda: self.name, TetrammDatasetDescriber(controller), plugins=plugins, ), config_signals, name, ) @property def hints(self) -> Hints: return {"fields": [self.name]}