# Source code for malcolm.modules.ADAndor.parts.andordriverpart

from typing import Any, Tuple

from annotypes import Anno, add_call_types
from scanpointgenerator import CompoundGenerator

from malcolm.core import NumberMeta, PartRegistrar
from malcolm.modules import ADCore, builtin, scanning
from malcolm.modules.ADCore.parts.detectordriverpart import AMinAcquirePeriod
from malcolm.modules.scanning.util import AFramesPerStep, exposure_attribute

# Pull re-used annotypes into our namespace in case we are subclassed
# (presumably so a subclass can pick them up from this module rather than
# reaching into builtin.parts itself)
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri

# Annotype for a directory path. The description string is given to Anno and
# the type is assigned inside the with block (annotypes convention for pairing
# the two -- confirm against the annotypes documentation).
# NOTE(review): AFileDir is not referenced anywhere else in the visible part
# of this module.
with Anno("Directory to write data to"):
    AFileDir = str


class AndorDriverPart(ADCore.parts.DetectorDriverPart):
    """Detector driver part for an Andor camera.

    Extends ``ADCore.parts.DetectorDriverPart`` with:

    - an ``exposure`` attribute and ``exposure`` / ``frames_per_step``
      configure parameters,
    - a validate hook that takes the driver's readout time into account when
      an exposure is requested,
    - exposure / acquire period handling for both frame transfer mode and
      normal acquisition.
    """

    def __init__(
        self, name: APartName, mri: AMri, min_acquire_period: AMinAcquirePeriod = 0.0
    ) -> None:
        """Create the part, allowing "Internal" and "Software" soft triggering.

        Args:
            name: Name of the part.
            mri: Malcolm resource id of the child driver block.
            min_acquire_period: Passed straight through to the parent class.
        """
        super().__init__(
            name,
            mri,
            soft_trigger_modes=["Internal", "Software"],
            min_acquire_period=min_acquire_period,
        )
        self.exposure = exposure_attribute(min_exposure=0.0)

    def setup(self, registrar: PartRegistrar) -> None:
        """Register the validate hook, the exposure attribute and the extra
        configure parameters with the controller.

        Args:
            registrar: Registrar used to hook into the controller.
        """
        super().setup(registrar)
        # Validate hook for exposure time
        registrar.hook(scanning.hooks.ValidateHook, self.on_validate)
        # Attributes
        registrar.add_attribute_model("exposure", self.exposure)
        # Tell the controller to pass "exposure" and "frames_per_step" to configure
        info = scanning.infos.ConfigureParamsInfo(
            metas=dict(
                exposure=NumberMeta.from_annotype(
                    scanning.hooks.AExposure, writeable=True
                ),
                frames_per_step=NumberMeta.from_annotype(
                    AFramesPerStep, writeable=False
                ),
            ),
            required=[],
            defaults=dict(exposure=0.0, frames_per_step=1),
        )
        registrar.report(info)

    @add_call_types
    def on_validate(
        self,
        context: scanning.hooks.AContext,
        generator: scanning.hooks.AGenerator,
        exposure: scanning.hooks.AExposure = 0.0,
        frames_per_step: AFramesPerStep = 1,
    ) -> scanning.hooks.UParameterTweakInfos:
        """Validate the generator duration against the requested exposure.

        Args:
            context: Context used to get a view of the child driver block.
            generator: Generator whose ``duration`` is checked (0 means the
                duration should be worked out here from the exposure).
            exposure: Requested exposure per frame; 0 means "not specified".
            frames_per_step: Number of detector frames per generator point.

        Returns:
            A ``ParameterTweakInfo`` carrying a copy of the generator with its
            duration filled in when an exposure was given and the generator
            duration was 0; ``None`` when the given duration is achievable;
            otherwise whatever the parent class validation returns.

        Raises:
            AssertionError: If the generator duration is negative, or if it is
                too short to fit ``frames_per_step`` frames of exposure plus
                readout time.
        """
        duration = generator.duration
        assert (
            duration >= 0
        ), f"Generator duration of {duration} must be >= 0 to signify fixed exposure"

        # Without an exposure just let the DetectorDriverPart validate for us
        if not exposure > 0:
            return super().on_validate(
                context, generator, frames_per_step=frames_per_step
            )

        # As the Andor runnable block does not have an ExposureDeadTimePart, we
        # handle the case where we have been given an exposure time here.
        # Grab the current value of the readout time and hope that the
        # parameters will not change before we actually run configure...
        child = context.block_view(self.mri)
        driver_readout_time = child.andorReadoutTime.value
        # Each frame needs its exposure plus the readout; a generator point
        # has to fit frames_per_step of those
        duration_per_frame = exposure + driver_readout_time
        duration_per_point = duration_per_frame * frames_per_step

        # Check if we need to guess the duration
        if duration == 0.0:
            # Work on a copy so the caller's generator is left untouched
            new_generator = CompoundGenerator.from_dict(generator.to_dict())
            new_generator.duration = duration_per_point
            self.log.debug(
                f"{self.name}: tweaking generator duration from "
                f"{generator.duration} to {duration_per_point}"
            )
            return scanning.hooks.ParameterTweakInfo("generator", new_generator)

        # Otherwise check the supplied duration can hold every frame of the
        # step. This uses the same product as the tweak above, so a duration
        # produced by that tweak always passes.
        assert duration_per_point <= duration, (
            f"{self.name}: cannot achieve exposure of {exposure} with "
            f"{frames_per_step} frames per step and generator duration of "
            f"{duration}"
        )
        return None

    def setup_detector(
        self,
        context: scanning.hooks.AContext,
        completed_steps: scanning.hooks.ACompletedSteps,
        steps_to_do: scanning.hooks.AStepsToDo,
        num_images: int,
        duration: float,
        part_info: scanning.hooks.APartInfo,
        initial_configure: bool = True,
        **kwargs: Any,
    ) -> None:
        """Work out exposure and acquire period, then configure the detector.

        The computed exposure is published on this part's ``exposure``
        attribute and forwarded to the parent ``setup_detector`` via
        ``kwargs["exposure"]``; the acquire period is written to the child
        block after the parent call.

        Raises:
            AssertionError: In frame transfer mode, if ``duration`` is not
                longer than the accumulate period read from the driver.
        """
        # Calculate the readout time
        child = context.block_view(self.mri)
        if child.andorFrameTransferMode.value:
            # Set exposure to zero and use accumulation period for readout time
            exposure = 0.0
            child.exposure.put_value(exposure)
            child.acquirePeriod.put_value(exposure)
            readout_time = child.andorAccumulatePeriod.value
            # With frame transfer mode enabled, the exposure is the time between
            # triggers. The epics 'acquireTime' (exposure) actually becomes the
            # User Defined delay before acquisition start after the trigger. The
            # duration floor becomes the readout time
            assert duration > readout_time, (
                "The duration: %s has to be longer than the Andor 2 readout "
                "time: %s." % (duration, readout_time)
            )
            period = readout_time
        else:
            # Behaves like a "normal" detector
            child.exposure.put_value(duration)
            child.acquirePeriod.put_value(duration)
            # Readout time can be approximated from difference in exposure time
            # and acquire period
            readout_time = child.acquirePeriod.value - child.exposure.value
            # Calculate the adjusted exposure time
            exposure, period = self.get_adjusted_exposure_time_and_acquire_period(
                duration, readout_time, kwargs.get("exposure", 0)
            )

        # The real exposure
        self.exposure.set_value(exposure)
        kwargs["exposure"] = exposure

        super().setup_detector(
            context,
            completed_steps,
            steps_to_do,
            num_images,
            duration,
            part_info,
            initial_configure=initial_configure,
            **kwargs,
        )

        # Written after the parent call, so this is the value left on the child
        child.acquirePeriod.put_value(period)

    def get_adjusted_exposure_time_and_acquire_period(
        self, duration: float, readout_time: float, exposure_time: float
    ) -> Tuple[float, float]:
        """Return ``(exposure_time, acquire_period)`` for the given duration.

        Args:
            duration: Duration passed to ``calculate_exposure``.
            readout_time: Readout time estimate; padded here before use.
            exposure_time: Requested exposure, passed to
                ``calculate_exposure`` together with ``duration``.

        Returns:
            The exposure calculated by ``ExposureDeadtimeInfo`` and that
            exposure plus the padded readout time.
        """
        # It seems that the difference between acquirePeriod and exposure
        # doesn't tell the whole story, we seem to need an additional bit
        # of readout (or something) time on top
        readout_time += self.get_additional_readout_factor(duration)
        # Otherwise we can behave like a "normal" detector
        info = scanning.infos.ExposureDeadtimeInfo(
            readout_time, frequency_accuracy=50, min_exposure=0.0
        )
        exposure_time = info.calculate_exposure(duration, exposure_time)
        acquire_period = exposure_time + readout_time
        return exposure_time, acquire_period

    @staticmethod
    def get_additional_readout_factor(duration: float) -> float:
        """Return the extra readout padding: 0.4% of ``duration`` plus 0.001."""
        return duration * 0.004 + 0.001