Source code for malcolm.modules.adUtil.parts.reframepluginpart

from typing import Any

from annotypes import Anno, add_call_types
from scanpointgenerator import CompoundGenerator

from malcolm.core import Context, PartRegistrar
from malcolm.modules import ADCore, builtin, scanning

with Anno("Sample frequency of ADC signal in Hz"):
    ASampleFreq = float

with Anno("Is the input trigger gated?"):
    AGatedTrigger = bool

# Pull re-used annotypes into our namespace in case we are subclassed
APartName = ADCore.parts.APartName
AMri = ADCore.parts.AMri


# We will set these attributes on the child block, so don't save them
[docs]@builtin.util.no_save("postCount", "averageSamples") class ReframePluginPart(ADCore.parts.DetectorDriverPart): def __init__( self, name: APartName, mri: AMri, sample_freq: ASampleFreq = 10000.0, gated_trigger: AGatedTrigger = False, ) -> None: super().__init__(name, mri, soft_trigger_modes="Always On") self.sample_freq = sample_freq self.gated_trigger = gated_trigger def setup(self, registrar: PartRegistrar) -> None: super().setup(registrar) # Hooks registrar.hook(scanning.hooks.ValidateHook, self.on_validate)
[docs] @add_call_types def on_validate( self, generator: scanning.hooks.AGenerator ) -> scanning.hooks.UParameterTweakInfos: duration = generator.duration if duration == 0.0: # Set the duration for 2 samples (1 live 1 dead) serialized = generator.to_dict() new_generator = CompoundGenerator.from_dict(serialized) new_generator.duration = 2 / self.sample_freq return scanning.infos.ParameterTweakInfo("generator", new_generator) else: assert ( duration > 0 ), f"Generator duration of {duration} must be > 0 to signify fixed exposure" assert ( self._number_of_adc_samples(duration) > 0 ), f"Generator duration of {duration} gives < 1 ADC sample" return None
def _number_of_adc_samples(self, generator_duration: float): return int(generator_duration * self.sample_freq) def setup_detector( self, context: Context, completed_steps: scanning.hooks.ACompletedSteps, steps_to_do: scanning.hooks.AStepsToDo, num_images: int, duration: float, part_info: scanning.hooks.APartInfo, initial_configure: bool = True, **kwargs: Any, ) -> None: if initial_configure: # This is an initial configure, so reset arrayCounter to 0 array_counter = 0 self.done_when_reaches = steps_to_do else: # This is rewinding or setting up for another batch, # skip to a uniqueID that has not been produced yet array_counter = self.done_when_reaches self.done_when_reaches += steps_to_do self.uniqueid_offset = completed_steps - array_counter child = context.block_view(self.mri) for k, v in dict( arrayCounter=array_counter, imageMode=self.multiple_image_mode, numImages=num_images, arrayCallbacks=True, ).items(): if k not in kwargs and k in child: kwargs[k] = v # Ignore exposure time attribute kwargs.pop("exposure", None) child.put_attribute_values(kwargs) # Calculate number of samples post_trigger_samples = self._number_of_adc_samples(duration) if self.is_hardware_triggered: if self.gated_trigger: # Gated signal is responsible for number of samples post_trigger_samples = 0 # We also need averaging to ensure we get consistent frame dimensions child.averageSamples.put_value("Yes") else: # For getting just start triggers, ensure we do not miss one post_trigger_samples -= 1 assert ( post_trigger_samples > 0 ), f"Generator duration {duration} too short for start triggers" else: # Need triggerOffCondition to be Always On for Software triggers assert ( child.triggerOffCondition.value == "Always On" ), "Software triggering requires off condition to be 'Always On'" child.postCount.put_value(post_trigger_samples)