import os
from typing import Any, List, Optional, Sequence, Union
from xml.etree import cElementTree as ET
from annotypes import Anno, Array, add_call_types
from scanpointgenerator import CompoundGenerator
from malcolm.compat import et_to_string
from malcolm.core import (
DEFAULT_TIMEOUT,
APartName,
BadValueError,
Context,
Future,
Info,
NumberMeta,
PartRegistrar,
TableMeta,
config_tag,
)
from malcolm.modules import builtin, scanning
from malcolm.modules.scanning.infos import ExposureDeadtimeInfo
from malcolm.modules.scanning.parts import ADetectorFramesPerStep
from ..infos import FilePathTranslatorInfo, NDArrayDatasetInfo, NDAttributeDatasetInfo
from ..util import (
FRAME_TIMEOUT,
APartRunsOnWindows,
AVersionRequirement,
DataType,
ExtraAttributesTable,
SourceType,
check_driver_version,
make_xml_filename,
)
with Anno("Minimum acquire period the detector is capable of"):
AMinAcquirePeriod = float
with Anno("Is main detector dataset useful to publish in DatasetTable?"):
AMainDatasetUseful = bool
with Anno("List of trigger modes that do not use hardware triggers"):
ASoftTriggerModes = Union[Array[str]]
USoftTriggerModes = Union[ASoftTriggerModes, Sequence[str]]
with Anno("Name of image mode for taking multiple images"):
AMultipleImageMode = str
# Pull re-used annotypes into our namespace in case we are subclassed
AMri = builtin.parts.AMri
# We will set these attributes on the child block, so don't save them
[docs]@builtin.util.no_save(
"acquirePeriod",
"arrayCallbacks",
"arrayCounter",
"attributesFile",
"exposure",
"imageMode",
"numImages",
)
class DetectorDriverPart(builtin.parts.ChildPart):
def __init__(
self,
name: APartName,
mri: AMri,
soft_trigger_modes: USoftTriggerModes = None,
multiple_image_mode: AMultipleImageMode = "Multiple",
main_dataset_useful: AMainDatasetUseful = True,
runs_on_windows: APartRunsOnWindows = False,
required_version: AVersionRequirement = None,
min_acquire_period: AMinAcquirePeriod = 0.0,
) -> None:
super().__init__(name, mri)
self.required_version = required_version
self.min_acquire_period = min_acquire_period
self.soft_trigger_modes = soft_trigger_modes
self.multiple_image_mode = multiple_image_mode
self.is_hardware_triggered = True
self.main_dataset_useful = main_dataset_useful
self.attributes_filename = ""
self.extra_attributes = TableMeta.from_table(
ExtraAttributesTable,
"Extra attributes to be added to the dataset",
writeable=[
"name",
"pv",
"description",
"sourceId",
"sourceType",
"dataType",
"datasetType",
],
extra_tags=[config_tag()],
).create_attribute_model()
self.runs_on_windows = runs_on_windows
# How long to wait between frame updates before error
self.frame_timeout = 0.0
# When arrayCounter gets to here we are done
self.done_when_reaches = 0
# CompletedSteps = arrayCounter + self.uniqueid_offset
self.uniqueid_offset = 0
# A future that completes when detector start calls back
self.start_future: Optional[Future] = None
def setup_detector(
self,
context: Context,
completed_steps: scanning.hooks.ACompletedSteps,
steps_to_do: scanning.hooks.AStepsToDo,
num_images: int,
duration: float,
part_info: scanning.hooks.APartInfo,
initial_configure: bool = True,
**kwargs: Any,
) -> None:
child = context.block_view(self.mri)
if initial_configure:
# This is an initial configure, so reset arrayCounter to 0
array_counter = 0
self.done_when_reaches = steps_to_do
else:
# This is rewinding or setting up for another batch,
# skip to a uniqueID that has not been produced yet
array_counter = self.done_when_reaches
self.done_when_reaches += steps_to_do
self.uniqueid_offset = completed_steps - array_counter
for k, v in dict(
arrayCounter=array_counter,
imageMode=self.multiple_image_mode,
numImages=num_images,
arrayCallbacks=True,
).items():
if k not in kwargs and k in child:
kwargs[k] = v
child.put_attribute_values(kwargs)
# Might need to reset acquirePeriod as it's sometimes wrong
# in some detectors
try:
info: ExposureDeadtimeInfo = ExposureDeadtimeInfo.filter_single_value(
part_info
)
except BadValueError:
# This is ok, no exposure info
pass
else:
exposure = kwargs.get("exposure", info.calculate_exposure(duration))
child.acquirePeriod.put_value(exposure + info.readout_time)
def arm_detector(self, context: Context) -> None:
child = context.block_view(self.mri)
self.start_future = child.start_async()
child.when_value_matches("acquiring", True, timeout=DEFAULT_TIMEOUT)
def wait_for_detector(
self,
context: Context,
registrar: PartRegistrar,
event_timeout: Optional[float] = None,
) -> None:
child = context.block_view(self.mri)
child.arrayCounterReadback.subscribe_value(
self.update_completed_steps, registrar
)
# Wait for the array counter to reach the desired value. If any one frame takes
# more than event_timeout to appear, consider scan dead
child.when_value_matches(
"arrayCounterReadback",
self.done_when_reaches,
event_timeout=event_timeout,
)
def abort_detector(self, context: Context) -> None:
child = context.block_view(self.mri)
child.stop()
# Stop is a put to a busy record which returns immediately
# The detector might take a while to actually stop so use the
# acquiring pv (which is the same asyn parameter as the busy record
# that stop() pokes) to check that it has finished
child.when_value_matches("acquiring", False, timeout=DEFAULT_TIMEOUT)
def update_completed_steps(self, value: int, registrar: PartRegistrar) -> None:
completed_steps = value + self.uniqueid_offset
registrar.report(scanning.infos.RunProgressInfo(completed_steps))
def build_attribute_xml(self):
root_el = ET.Element("Attributes")
for index, s_type in enumerate(self.extra_attributes.value.sourceType):
if s_type == SourceType.PV:
dbr_type = self.extra_attributes.value.dataType[index]
if dbr_type == DataType.INT:
dbr_type = "DBR_LONG"
elif dbr_type == DataType.DOUBLE:
dbr_type = "DBR_DOUBLE"
elif dbr_type == DataType.STRING:
dbr_type = "DBR_STRING"
else:
dbr_type = dbr_type.value
ET.SubElement(
root_el,
"Attribute",
name=self.extra_attributes.value.name[index],
type="EPICS_PV",
dbrtype=dbr_type,
description=self.extra_attributes.value.description[index],
source=self.extra_attributes.value.sourceId[index],
)
elif s_type == SourceType.PARAM:
ET.SubElement(
root_el,
"Attribute",
name=self.extra_attributes.value.name[index],
type="PARAM",
datatype=self.extra_attributes.value.dataType[index].value,
description=self.extra_attributes.value.description[index],
source=self.extra_attributes.value.sourceId[index],
)
return et_to_string(root_el)
def set_extra_attributes(self, value):
for row, _ in enumerate(value.name):
if value.sourceType[row] == SourceType.PARAM:
if value.dataType[row] == DataType.DBRNATIVE:
raise ValueError(
"data type DBR_NATIVE invalid for asyn param attribute"
)
self.extra_attributes.set_value(value)
def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Hooks
registrar.hook(scanning.hooks.ReportStatusHook, self.on_report_status)
registrar.hook(scanning.hooks.ValidateHook, self.on_validate)
registrar.hook(
scanning.hooks.ConfigureHook,
self.on_configure,
self.configure_args_with_exposure,
)
registrar.hook(
scanning.hooks.SeekHook,
self.on_seek,
self.configure_args_with_exposure,
)
registrar.hook(scanning.hooks.RunHook, self.on_run)
registrar.hook(scanning.hooks.PostRunArmedHook, self.on_post_run_armed)
registrar.hook(
(scanning.hooks.PauseHook, scanning.hooks.AbortHook), self.on_abort
)
# Attributes
registrar.add_attribute_model(
"attributesToCapture", self.extra_attributes, self.set_extra_attributes
)
# Tell the controller to pass "exposure" and "frames_per_step" to configure
info = scanning.infos.ConfigureParamsInfo(
metas=dict(
frames_per_step=NumberMeta.from_annotype(
ADetectorFramesPerStep, writeable=False
),
),
required=[],
defaults=dict(frames_per_step=1),
)
registrar.report(info)
[docs] @add_call_types
def on_validate(
self,
context: scanning.hooks.AContext,
generator: scanning.hooks.AGenerator,
frames_per_step: ADetectorFramesPerStep = 1,
) -> scanning.hooks.UParameterTweakInfos:
# Check if we have a minimum acquire period
if self.required_version is not None:
child = context.block_view(self.mri)
check_driver_version(child.driverVersion.value, self.required_version)
if self.min_acquire_period > 0.0:
duration = generator.duration
# Check if we need to guess the generator duration
if duration == 0.0:
# Use the minimum acquire period as an estimate of readout time. We
# also need to multiple by frames_per_step as the DetectorChildPart
# divides the generator down to the duration for a single detector
# frame.
duration = self.min_acquire_period * frames_per_step
serialized = generator.to_dict()
new_generator = CompoundGenerator.from_dict(serialized)
new_generator.duration = duration
self.log.debug(
f"{self.name}: tweaking generator duration from "
f"{generator.duration} to {duration}"
)
return scanning.hooks.ParameterTweakInfo("generator", new_generator)
# Otherwise check the provided duration is long enough
else:
assert generator.duration >= self.min_acquire_period, (
f"Duration {generator.duration} per frame is less than minimum "
f"acquire period {self.min_acquire_period}s"
)
return None
return None
[docs] @add_call_types
def on_reset(self, context: scanning.hooks.AContext) -> None:
super().on_reset(context)
self.abort_detector(context)
# Delete the layout XML file
if self.attributes_filename and os.path.isfile(self.attributes_filename):
os.remove(self.attributes_filename)
child = context.block_view(self.mri)
child.attributesFile.put_value("")
[docs] @add_call_types
def on_report_status(self) -> scanning.hooks.UInfos:
ret: List[Info] = []
if self.main_dataset_useful:
ret.append(NDArrayDatasetInfo(rank=2))
for name, dataset_type in zip(
self.extra_attributes.value.name, self.extra_attributes.value.datasetType
):
ret.append(
NDAttributeDatasetInfo.from_attribute_type(
name=name, attr=name, type=dataset_type
)
)
return ret
def configure_args_with_exposure(self, keys):
need_keys = list(self.on_configure.call_types)
if "exposure" in keys:
need_keys.append("exposure")
return need_keys
# Allow CamelCase as fileDir parameter will be serialized
# noinspection PyPep8Naming
[docs] @add_call_types
def on_seek(
self,
context: scanning.hooks.AContext,
completed_steps: scanning.hooks.ACompletedSteps,
steps_to_do: scanning.hooks.AStepsToDo,
part_info: scanning.hooks.APartInfo,
generator: scanning.hooks.AGenerator,
fileDir: scanning.hooks.AFileDir,
breakpoints: scanning.controllers.ABreakpoints = None,
**kwargs: Any,
) -> None:
context.unsubscribe_all()
# If detector is hardware triggered, and we aren't using breakpoints, we can
# configure the detector for all frames now. This is instead of configuring and
# arming the detector for each inner scan, so we save some time
if self.is_hardware_triggered and not breakpoints:
num_images = generator.size - completed_steps
else:
num_images = steps_to_do
# Set up the detector
self.setup_detector(
context,
completed_steps,
steps_to_do,
num_images,
generator.duration,
part_info,
initial_configure=False,
**kwargs,
)
# Start now if we are hardware triggered
if self.is_hardware_triggered:
self.arm_detector(context)
# Allow CamelCase as fileDir parameter will be serialized
# noinspection PyPep8Naming
[docs] @add_call_types
def on_run(self, context: scanning.hooks.AContext) -> None:
if not self.is_hardware_triggered:
# Start now if we are software triggered
self.arm_detector(context)
assert self.registrar, "No assigned registrar"
self.log.debug(f"{self.mri}: Done when reaches: {self.done_when_reaches}")
self.wait_for_detector(
context, self.registrar, event_timeout=self.frame_timeout
)
[docs] @add_call_types
def on_post_run_armed(
self,
context: scanning.hooks.AContext,
completed_steps: scanning.hooks.ACompletedSteps,
steps_to_do: scanning.hooks.AStepsToDo,
part_info: scanning.hooks.APartInfo,
generator: scanning.hooks.AGenerator,
breakpoints: scanning.controllers.ABreakpoints = None,
) -> None:
# We may need to set up the detector again based on two conditions:
# - breakpoints can be an uneven number of steps
# - if pause has been called for a software-triggered detector which
# will set a different number of steps to do
if breakpoints or not self.is_hardware_triggered:
self.setup_detector(
context,
completed_steps,
steps_to_do,
steps_to_do,
generator.duration,
part_info,
initial_configure=False,
)
if self.is_hardware_triggered:
# We can now re-arm hardware-triggered detectors
self.arm_detector(context)
# Otherwise if we have hardware triggers and no breakpoints then
# seek should have set up the correct number of images for the
# remainder of the scan and we do not need to re-arm
else:
self.done_when_reaches += steps_to_do
[docs] @add_call_types
def on_abort(self, context: scanning.hooks.AContext) -> None:
self.abort_detector(context)