from typing import Any, Tuple
from annotypes import Anno
from malcolm.core import Context, NumberMeta, PartRegistrar
from malcolm.modules import ADCore, builtin, scanning
# Pull re-used annotypes into our namespace in case we are subclassed
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
# Annotype for a directory path, declared as a plain ``str`` described by the
# Anno text below.
# NOTE(review): AFileDir is not referenced in the visible part of this module;
# presumably it is kept for subclasses/importers - confirm before removing.
with Anno("Directory to write data to"):
    AFileDir = str
class AndorDriverPart(ADCore.parts.DetectorDriverPart):
    """Detector driver part for an Andor camera block.

    Extends ``ADCore.parts.DetectorDriverPart`` with an ``exposure``
    attribute and with exposure / acquire-period handling that depends on
    whether the child block reports frame transfer mode as enabled.
    """

    def __init__(self, name: APartName, mri: AMri) -> None:
        """Create the part and its ``exposure`` attribute model.

        Args:
            name: Name of this part, passed through to the base class.
            mri: Malcolm resource id of the child driver block.
        """
        super().__init__(name, mri, soft_trigger_modes=["Internal", "Software"])
        self.exposure = scanning.util.exposure_attribute(min_exposure=0.0)

    def setup(self, registrar: PartRegistrar) -> None:
        """Register the ``exposure`` attribute and configure parameter.

        Args:
            registrar: Registrar used to publish the attribute model and to
                report the extra configure parameter info.
        """
        super().setup(registrar)
        # Attributes
        registrar.add_attribute_model("exposure", self.exposure)
        # Tell the controller to pass "exposure" to configure
        info = scanning.infos.ConfigureParamsInfo(
            metas=dict(
                exposure=NumberMeta.from_annotype(
                    scanning.hooks.AExposure, writeable=True
                )
            ),
            required=[],
            defaults=dict(exposure=0.0),
        )
        registrar.report(info)

    def setup_detector(
        self,
        context: Context,
        completed_steps: scanning.hooks.ACompletedSteps,
        steps_to_do: scanning.hooks.AStepsToDo,
        num_images: int,
        duration: float,
        part_info: scanning.hooks.APartInfo,
        initial_configure: bool = True,
        **kwargs: Any,
    ) -> None:
        """Work out exposure and acquire period, then configure the detector.

        The computed exposure is published on ``self.exposure`` and forwarded
        to the base class as ``kwargs["exposure"]``; the computed acquire
        period is written to the child block after the base class has run.

        Args:
            context: Context used to obtain a view of the child block.
            completed_steps: Forwarded unchanged to the base class.
            steps_to_do: Forwarded unchanged to the base class.
            num_images: Forwarded unchanged to the base class.
            duration: Time per frame requested by the scan.
            part_info: Forwarded unchanged to the base class.
            initial_configure: Forwarded unchanged to the base class.
            **kwargs: Extra configure parameters; ``exposure`` is read (in
                the non frame-transfer case) and then overwritten with the
                value actually used.

        Raises:
            AssertionError: In frame transfer mode, if ``duration`` is not
                strictly greater than the reported readout time.
        """
        # Calculate the readout time
        child = context.block_view(self.mri)
        if child.andorFrameTransferMode.value:
            # Set exposure to zero and use accumulation period for readout time
            exposure = 0.0
            child.exposure.put_value(exposure)
            child.acquirePeriod.put_value(exposure)
            readout_time = child.andorAccumulatePeriod.value
            # With frame transfer mode enabled, the exposure is the time between
            # triggers. The epics 'acquireTime' (exposure) actually becomes the
            # User Defined delay before acquisition start after the trigger. The
            # duration floor becomes the readout time
            #
            # Raised explicitly rather than via an ``assert`` statement so the
            # check still runs when Python is started with -O; the exception
            # type and message are unchanged for existing callers.
            if not (duration > readout_time):
                raise AssertionError(
                    "The duration: %s has to be longer than the Andor 2 readout "
                    "time: %s." % (duration, readout_time)
                )
            period = readout_time
        else:
            # Behaves like a "normal" detector
            child.exposure.put_value(duration)
            child.acquirePeriod.put_value(duration)
            # Readout time can be approximated from difference in exposure time
            # and acquire period
            readout_time = child.acquirePeriod.value - child.exposure.value
            # Calculate the adjusted exposure time
            (exposure, period) = self.get_adjusted_exposure_time_and_acquire_period(
                duration, readout_time, kwargs.get("exposure", 0)
            )
        # The real exposure
        self.exposure.set_value(exposure)
        kwargs["exposure"] = exposure
        super().setup_detector(
            context,
            completed_steps,
            steps_to_do,
            num_images,
            duration,
            part_info,
            initial_configure=initial_configure,
            **kwargs,
        )
        # Written after the base class call so this value is the last one put
        # to acquirePeriod by this method.
        child.acquirePeriod.put_value(period)

    def get_adjusted_exposure_time_and_acquire_period(
        self, duration: float, readout_time: float, exposure_time: float
    ) -> Tuple[float, float]:
        """Return the exposure time and acquire period to use for a frame.

        Args:
            duration: Time per frame requested by the scan.
            readout_time: Readout time estimated from the driver, before the
                additional allowance is added.
            exposure_time: Requested exposure, passed to
                ``ExposureDeadtimeInfo.calculate_exposure`` together with
                ``duration``.

        Returns:
            ``(exposure_time, acquire_period)`` where the acquire period is
            the calculated exposure plus the padded readout time.
        """
        # It seems that the difference between acquirePeriod and exposure
        # doesn't tell the whole story, we seem to need an additional bit
        # of readout (or something) time on top
        readout_time += self.get_additional_readout_factor(duration)
        # Otherwise we can behave like a "normal" detector
        info = scanning.infos.ExposureDeadtimeInfo(
            readout_time, frequency_accuracy=50, min_exposure=0.0
        )
        exposure_time = info.calculate_exposure(duration, exposure_time)
        acquire_period = exposure_time + readout_time
        return exposure_time, acquire_period

    @staticmethod
    def get_additional_readout_factor(duration: float) -> float:
        """Return the extra readout allowance for a frame of ``duration``.

        The allowance is 0.4% of ``duration`` plus a fixed 0.001 (same units
        as ``duration``, presumably seconds - confirm against callers).
        """
        return duration * 0.004 + 0.001