# Source code for malcolm.modules.ADPandABlocks.parts.pandapulsetriggerpart
from typing import Optional
from annotypes import add_call_types
from malcolm.core import CAMEL_RE, APartName, BadValueError, PartRegistrar
from malcolm.modules import builtin, scanning
# Pull re-used annotypes into our namespace in case we are subclassed.
# Each name is (re)bound at module level so that it is an explicit attribute
# of this module; presumably this lets a subclass module reference e.g.
# ``pandapulsetriggerpart.AMri`` without importing from builtin.parts itself.
APartName = APartName
AMri = builtin.parts.AMri
AInitialVisibility = builtin.parts.AInitialVisibility
class PandAPulseTriggerPart(builtin.parts.ChildPart):
    """Part for operating a single PULSE block in a PandA to stretch a trigger
    pulse into a gate centred on the middle of the exposure. For the PandA it
    needs the following exports:

    - $(name)Width: width Attribute of the PULSE block with units set to "s"
    - $(name)Delay: delay Attribute of the PULSE block with units set to "s"
    - $(name)Step: step Attribute of the PULSE block with units set to "s"
    - $(name)Pulses: pulses Attribute of the PULSE block

    The Detector is required to have:

    - exposure: an Attribute that reports after configure() the exposure that
      is expected by the detector
    """

    # Suffixes appended to the part name to form the expected PandA exports.
    # Every one except "Pulses" is a time and must have units of seconds.
    _SUFFIXES = ("Pulses", "Width", "Step", "Delay")

    def __init__(
        self, name: APartName, mri: AMri, initial_visibility: AInitialVisibility = True
    ) -> None:
        """Create the part and initialise the state that configure() fills in.

        Raises:
            AssertionError: if ``name`` does not match ``CAMEL_RE``; the name
                is used as the prefix of the PandA export names.
        """
        super().__init__(
            name, mri, initial_visibility=initial_visibility, stateful=False
        )
        assert CAMEL_RE.match(name), (
            "PandAPulseTriggerPart name %r should be camelCase" % name
        )
        # The stored generator duration and detector framesPerStep from
        # configure
        self.generator_duration = None
        self.frames_per_step = 1
        # The panda Block we will be prodding
        self.panda = None
        # The detector Block we will be reading from
        self.detector = None

    def setup(self, registrar: PartRegistrar) -> None:
        """Register the report-status, configure and post-configure hooks."""
        super().setup(registrar)
        # Hooks
        registrar.hook(scanning.hooks.ReportStatusHook, self.on_report_status)
        registrar.hook(scanning.hooks.ConfigureHook, self.on_configure)
        registrar.hook(scanning.hooks.PostConfigureHook, self.on_post_configure)

    @add_call_types
    def on_report_status(
        self, context: scanning.hooks.AContext
    ) -> scanning.hooks.UInfos:
        """Report that the detector named by our child block can do multi frame.

        Returns:
            A ``DetectorMutiframeInfo`` for the mri held in the child block's
            ``detector`` Attribute.
        """
        child = context.block_view(self.mri)
        detector_mri = child.detector.value
        # Say that we can do multi frame for this detector
        return scanning.infos.DetectorMutiframeInfo(detector_mri)

    # Allow CamelCase as these parameters will be serialized
    # noinspection PyPep8Naming
    @add_call_types
    def on_configure(
        self,
        context: scanning.hooks.AContext,
        generator: scanning.hooks.AGenerator,
        detectors: scanning.util.ADetectorTable = None,
    ) -> None:
        """Store the scan parameters and validate the PandA PULSE block setup.

        Records the generator duration, the PandA and detector block views and
        the detector's frames per step, then checks that the PandA exports the
        four expected Attributes from a single PULSE block whose time
        Attributes all have units of "s".

        Raises:
            AssertionError: if the generator duration is not positive, the
                detectors table is empty, an enabled detector has zero frames
                per step, exports are missing or spread over several blocks,
                the pulse block cannot be found in the PandA layout, or a time
                Attribute does not have units "s".
            BadValueError: if the detectors table has no row for our detector.
        """
        assert generator.duration > 0, (
            "Can only create pulse triggers for a generator with the same "
            "duration for every point, not %s" % generator
        )
        self.generator_duration = generator.duration
        # Get the panda and the detector we will be using
        child = context.block_view(self.mri)
        panda_mri = child.panda.value
        self.panda = context.block_view(panda_mri)
        detector_mri = child.detector.value
        self.detector = context.block_view(detector_mri)
        # Get the framesPerStep for this detector from the detectors table
        self.frames_per_step = self._lookup_frames_per_step(detectors, detector_mri)
        # Check that the Attributes we expect are exported
        pulse_name = self._find_pulse_block_name(panda_mri)
        # Find the PULSE Block for further checks
        pulse_mri = self._find_pulse_mri(pulse_name)
        # Check that the Attributes have the right units for all except Pulses
        self._check_pulse_units(context.block_view(pulse_mri), pulse_mri)

    def _lookup_frames_per_step(self, detectors, detector_mri):
        """Return the frames per step for detector_mri, or 0 if it is disabled."""
        assert detectors, "No detectors passed in table"
        for enable, _, mri, _, frames_per_step in detectors.rows():
            if mri != detector_mri:
                continue
            # Found a row telling us how many frames per step to generate
            if not enable:
                return 0
            assert (
                frames_per_step > 0
            ), "Zero frames per step for %s, how did this happen?" % (mri)
            return frames_per_step
        raise BadValueError(
            "Detector table %s doesn't contain row for %s"
            % (detectors, detector_mri)
        )

    def _find_pulse_block_name(self, panda_mri):
        """Return the name of the one PandA block providing all our exports."""
        pulse_name = None
        expected_exports = {self.name + suffix for suffix in self._SUFFIXES}
        assert self.panda, "No assigned PandA"
        for source, export in self.panda.exports.value.rows():
            if export not in expected_exports:
                continue
            # The source is "<block>.<field>", we only want the block name
            part_name = source.split(".")[0]
            if pulse_name:
                assert part_name == pulse_name, (
                    "Export %s defined for a different pulse block" % export
                )
            else:
                pulse_name = part_name
            expected_exports.remove(export)
        assert not expected_exports, "PandA %r did not define exports %s" % (
            panda_mri,
            sorted(expected_exports),
        )
        return pulse_name

    def _find_pulse_mri(self, pulse_name):
        """Return the mri that the PandA layout table lists for pulse_name."""
        pulse_mri: Optional[str] = None
        assert self.panda, "No assigned PandA"
        # No early exit: if several rows share the name, the last one wins
        for name, mri, _, _, _ in self.panda.layout.value.rows():
            if name == pulse_name:
                pulse_mri = mri
        assert pulse_mri, "Can't find mri for pulse block %r" % pulse_name
        return pulse_mri

    def _check_pulse_units(self, pulse_block, pulse_mri):
        """Assert every time Attribute of the pulse block has units of "s"."""
        for suffix in self._SUFFIXES:
            if suffix == "Pulses":
                # Pulses is a count, so it has no units to check
                continue
            units = pulse_block[suffix.lower() + "Units"].value
            # Report the pulse block's own mri so the message matches its text
            assert (
                units == "s"
            ), "Pulse block %r attribute %r needs units 's', not %r" % (
                pulse_mri,
                suffix,
                units,
            )

    def on_post_configure(self) -> None:
        """Write the pulse timing to the PandA if our detector is taking part.

        Does nothing when frames per step is zero. Otherwise the step is the
        generator duration divided by frames per step, the width is the
        detector exposure and the delay centres the gate within the step.

        Raises:
            AssertionError: if the detector is not "Armed", or the width is
                not less than the step.
        """
        if self.frames_per_step <= 0:
            # Detector is disabled for this scan, leave the PULSE block alone
            return
        # Sanity check that the detector is armed
        detector_state = self.detector.state.value
        assert (
            detector_state == "Armed"
        ), "Expected %s to be Armed, but it is %s" % (
            self.detector.mri,
            detector_state,
        )
        # We are taking part, so calculate pulse values
        step = float(self.generator_duration) / self.frames_per_step
        try:
            width = self.detector.exposure.value
        except (KeyError, AttributeError):
            # No exposure, so assume a very tiny readout time.
            # NOTE(review): a missing attribute normally raises AttributeError,
            # so it is caught alongside the original KeyError - confirm which
            # one the detector block view actually raises.
            width = step - 1e-6
        assert width < step, "Width %s is not less than Step %s" % (width, step)
        values = {
            self.name + "Step": step,
            self.name + "Width": width,
            self.name + "Delay": (step - width) / 2,
            self.name + "Pulses": self.frames_per_step,
        }
        self.panda.put_attribute_values(values)