Source code for dodal.devices.tetramm

import asyncio
from collections.abc import Sequence
from enum import Enum

from bluesky.protocols import Hints
from ophyd_async.core import (
    AsyncStatus,
    DetectorControl,
    DetectorTrigger,
    Device,
    DirectoryProvider,
    ShapeProvider,
    StandardDetector,
    set_and_wait_for_value,
    soft_signal_r_and_setter,
)
from ophyd_async.epics.areadetector.utils import stop_busy_record
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw
from ophyd_async.epics.signal.signal import epics_signal_rw_rbv


[docs] class TetrammRange(str, Enum): uA = "+- 120 uA" nA = "+- 120 nA"
[docs] class TetrammTrigger(str, Enum): FreeRun = "Free run" ExtTrigger = "Ext. trig." ExtBulb = "Ext. bulb" ExtGate = "Ext. gate"
[docs] class TetrammChannels(str, Enum): One = "1" Two = "2" Four = "4"
[docs] class TetrammResolution(str, Enum): SixteenBits = "16 bits" TwentyFourBits = "24 bits"
[docs] class TetrammGeometry(str, Enum): Diamond = "Diamond" Square = "Square"
[docs] class TetrammDriver(Device): def __init__( self, prefix: str, name: str = "", ): self._prefix = prefix self.range = epics_signal_rw_rbv(TetrammRange, prefix + "Range") self.sample_time = epics_signal_r(float, prefix + "SampleTime_RBV") self.values_per_reading = epics_signal_rw_rbv(int, prefix + "ValuesPerRead") self.averaging_time = epics_signal_rw_rbv(float, prefix + "AveragingTime") self.to_average = epics_signal_r(int, prefix + "NumAverage_RBV") self.averaged = epics_signal_r(int, prefix + "NumAveraged_RBV") self.acquire = epics_signal_rw_rbv(bool, prefix + "Acquire") # this PV is special, for some reason it doesn't have a _RBV suffix... self.overflows = epics_signal_r(int, prefix + "RingOverflows") self.num_channels = epics_signal_rw_rbv(TetrammChannels, prefix + "NumChannels") self.resolution = epics_signal_rw_rbv(TetrammResolution, prefix + "Resolution") self.trigger_mode = epics_signal_rw_rbv(TetrammTrigger, prefix + "TriggerMode") self.bias = epics_signal_rw_rbv(bool, prefix + "BiasState") self.bias_volts = epics_signal_rw_rbv(float, prefix + "BiasVoltage") self.geometry = epics_signal_rw_rbv(TetrammGeometry, prefix + "Geometry") self.nd_attributes_file = epics_signal_rw(str, prefix + "NDAttributesFile") super().__init__(name=name)
[docs] class TetrammController(DetectorControl): """Controller for a TetrAMM current monitor Attributes: base_sample_rate (int): Fixed in hardware Args: drv (TetrammDriver): A configured driver for the device maximum_readings_per_frame (int): Maximum number of readings per frame: actual readings may be lower if higher frame rate is required minimum_values_per_reading (int): Lower bound on the values that will be averaged to create a single reading readings_per_frame (int): Actual number of readings per frame. """ base_sample_rate: int = 100_000 def __init__( self, drv: TetrammDriver, minimum_values_per_reading: int = 5, maximum_readings_per_frame: int = 1_000, readings_per_frame: int = 1_000, ): # TODO: Are any of these also fixed by hardware constraints? self._drv = drv self.maximum_readings_per_frame = maximum_readings_per_frame self.minimum_values_per_reading = minimum_values_per_reading self.readings_per_frame = readings_per_frame def get_deadtime(self, exposure: float) -> float: # 2 internal clock cycles. Best effort approximation return 2 / self.base_sample_rate async def arm( self, num: int, trigger: DetectorTrigger = DetectorTrigger.edge_trigger, exposure: float | None = None, ) -> AsyncStatus: if exposure is None: raise ValueError( "Tetramm does not support arm without exposure time. " "Is this a software scan? Tetramm only supports hardware scans." ) self._validate_trigger(trigger) # trigger mode must be set first and on its own! await self._drv.trigger_mode.set(TetrammTrigger.ExtTrigger) await asyncio.gather( self._drv.averaging_time.set(exposure), self.set_exposure(exposure) ) status = await set_and_wait_for_value(self._drv.acquire, True) return status def _validate_trigger(self, trigger: DetectorTrigger) -> None: supported_trigger_types = { DetectorTrigger.edge_trigger, DetectorTrigger.constant_gate, } if trigger not in supported_trigger_types: raise ValueError( f"{self.__class__.__name__} only supports the following trigger " f"types: {supported_trigger_types} but was asked to " f"use {trigger}" ) async def disarm(self): await stop_busy_record(self._drv.acquire, False, timeout=1)
[docs] async def set_exposure(self, exposure: float): """Tries to set the exposure time of a single frame. As during the exposure time, the device must collect an integer number of readings, in the case where the exposure is not a multiple of the base sample rate, it will be lowered to the prior multiple ot ensure triggers are not missed. Args: exposure (float): The time for a single frame in seconds Raises: ValueError: If exposure is too low to collect the required number of readings per frame. """ # Set up the number of readings across the exposure period to scale with # the exposure time self._set_minimum_exposure(exposure) values_per_reading: int = int( exposure * self.base_sample_rate / self.readings_per_frame ) await self._drv.values_per_reading.set(values_per_reading)
@property def max_frame_rate(self) -> float: """Max frame rate in Hz for the current configuration""" return 1 / self.minimum_exposure @max_frame_rate.setter def max_frame_rate(self, mfr: float): self._set_minimum_exposure(1 / mfr) @property def minimum_exposure(self) -> float: """Smallest amount of time needed to take a frame""" time_per_reading = self.minimum_values_per_reading / self.base_sample_rate return self.readings_per_frame * time_per_reading def _set_minimum_exposure(self, exposure: float): time_per_reading = self.minimum_values_per_reading / self.base_sample_rate if exposure < time_per_reading: raise ValueError( "Tetramm exposure time must be at least " f"{time_per_reading}s, asked to set it to {exposure}s" ) self.readings_per_frame = int( min(self.maximum_readings_per_frame, exposure / time_per_reading) )
[docs] class TetrammShapeProvider(ShapeProvider): max_channels = 11 def __init__(self, controller: TetrammController) -> None: self.controller = controller async def __call__(self) -> Sequence[int]: return [self.max_channels, self.controller.readings_per_frame]
# TODO: Support MeanValue signals https://github.com/DiamondLightSource/dodal/issues/337
[docs] class TetrammDetector(StandardDetector): def __init__( self, prefix: str, directory_provider: DirectoryProvider, name: str, type: str | None = None, **scalar_sigs: str, ) -> None: self.drv = TetrammDriver(prefix + "DRV:") self.hdf = NDFileHDF(prefix + "HDF5:") controller = TetrammController(self.drv) config_signals = [ self.drv.values_per_reading, self.drv.averaging_time, self.drv.sample_time, ] if type: self.type, _ = soft_signal_r_and_setter(str, type) config_signals.append(self.type) else: self.type = None super().__init__( controller, HDFWriter( self.hdf, directory_provider, lambda: self.name, TetrammShapeProvider(controller), **scalar_sigs, ), config_signals, name, ) @property def hints(self) -> Hints: return {"fields": [self.name]}