from typing import Optional, Tuple
from annotypes import Anno, add_call_types
from scanpointgenerator import (
CompoundGenerator,
Generator,
LineGenerator,
StaticPointGenerator,
)
from malcolm.modules import builtin, scanning
from malcolm.modules.scanning.util import ADetectorTable
from ..util import AlternatingDelayAfterMutator, get_min_turnaround
from .pmacchildpart import PmacChildPart, VelocityModes
# 80 char line lengths...
AIV = builtin.parts.AInitialVisibility
# Pull re-used annotypes into our namespace in case we are subclassed
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
with Anno("Name of the selector axis scannable"):
ASelectorAxis = str
with Anno("Angle of for the tomography detector position"):
ATomoAngle = float
with Anno("Angle of the diffraction detector position"):
ADiffAngle = float
with Anno("Name of the tomography detector (should match DetectorChildPart)"):
ATomoDetector = str
with Anno("Name of the diffraction detector (should match DetectorChildPart)"):
ADiffDetector = str
with Anno("Minimum move time between the two positions in seconds"):
AMoveTime = float
[docs]class BeamSelectorPart(PmacChildPart):
"""
This part is for the K11 beam selector scan.
It moves a motor between two positions, holding at each position for the exposure
time of a particular detector before moving.
"""
def __init__(
self,
name: APartName,
mri: AMri,
selector_axis: ASelectorAxis,
imaging_angle: ATomoAngle,
diffraction_angle: ADiffAngle,
imaging_detector: ATomoDetector,
diffraction_detector: ADiffDetector,
move_time: AMoveTime,
initial_visibility: AIV = False,
) -> None:
# Some basic checking
parsed_move_time = float(move_time)
if parsed_move_time <= 0.0:
raise ValueError("Move time must be larger than zero.")
elif not isinstance(selector_axis, str):
raise ValueError("Selector axis name must be a string")
elif not isinstance(imaging_detector, str):
raise ValueError("Tomography detector name must be a string")
elif not isinstance(diffraction_detector, str):
raise ValueError("Diffraction detector name must be a string")
super().__init__(name, mri, initial_visibility)
self.selector_axis = selector_axis
self.imaging_detector = imaging_detector
self.diffraction_detector = diffraction_detector
self.imaging_angle = float(imaging_angle)
self.diffraction_angle = float(diffraction_angle)
self.move_time = parsed_move_time
def _get_error_message(self, name: str, mri: str, message: str) -> str:
return f"{mri} (name {name}): {message}"
def _check_detector_parameters(
self, name: str, mri: str, frames: int, enable: bool, exposure: float
) -> None:
if frames != 1:
raise ValueError(
self._get_error_message(name, mri, "Can only do 1 frame per step")
)
elif not enable:
raise ValueError(
self._get_error_message(name, mri, "Detector needs to be enabled")
)
elif exposure <= 0.0:
raise ValueError(
self._get_error_message(
name, mri, "Exposure needs to be greater than zero"
)
)
def _get_detector_exposure_times(
self, detectors: ADetectorTable
) -> Tuple[float, float]:
assert detectors, "No detector table found"
diffraction_detector_found = False
imaging_detector_found = False
diffraction_detector_exposure = 0.0
imaging_detector_exposure = 0.0
for enable, name, mri, exposure, frames in detectors.rows():
if name == self.diffraction_detector:
self._check_detector_parameters(name, mri, frames, enable, exposure)
diffraction_detector_exposure = exposure
diffraction_detector_found = True
elif name == self.imaging_detector:
self._check_detector_parameters(name, mri, frames, enable, exposure)
imaging_detector_exposure = exposure
imaging_detector_found = True
if diffraction_detector_found and imaging_detector_found:
break
if not diffraction_detector_found:
raise ValueError("Diffraction detector not found in table")
elif not imaging_detector_found:
raise ValueError("Tomography detector not found in table")
return diffraction_detector_exposure, imaging_detector_exposure
def _get_time_at_positions(
self, part_info: scanning.hooks.APartInfo, detectors: ADetectorTable
) -> Tuple[float, float]:
# Find out the exposure times of our detectors
(
diffraction_detector_exposure,
imaging_detector_exposure,
) = self._get_detector_exposure_times(detectors)
# Increase the time at each position to the minimum turnaround if necessary
min_turnaround = get_min_turnaround(part_info)
time_at_diffraction_position = max(
min_turnaround.time, diffraction_detector_exposure
)
time_at_imaging_position = max(min_turnaround.time, imaging_detector_exposure)
return time_at_diffraction_position, time_at_imaging_position
def _calculate_cycle_duration(
self, time_at_diffraction_position: float, time_at_imaging_position: float
) -> float:
return (
time_at_diffraction_position + time_at_imaging_position + 2 * self.move_time
)
def _check_generator_is_static(self, primary_generator: Generator) -> None:
assert isinstance(
primary_generator, StaticPointGenerator
), f"Expected primary generator to be static, got {type(primary_generator)}"
[docs] @add_call_types
def on_validate(
self,
generator: scanning.hooks.AGenerator,
part_info: scanning.hooks.APartInfo,
detectors: ADetectorTable,
) -> Optional[scanning.hooks.UParameterTweakInfos]:
# Check the primary generator is static
self._check_generator_is_static(generator.generators[0])
# Calculate the time that should be spent at each position
(
time_at_diffraction_position,
time_at_imaging_position,
) = self._get_time_at_positions(part_info, detectors)
# Now calculate how long one cycle should take
cycle_duration = self._calculate_cycle_duration(
time_at_diffraction_position, time_at_imaging_position
)
# See if we need to tweak the generator
if generator.duration != cycle_duration:
# Return the generator with our cycle duration
self.log.debug(
f"{self.name}: tweaking generator duration from {generator.duration} "
f"to {cycle_duration}"
)
serialized = generator.to_dict()
new_generator = CompoundGenerator.from_dict(serialized)
new_generator.duration = cycle_duration
return scanning.infos.ParameterTweakInfo("generator", new_generator)
else:
return None
def add_tail_off(self):
# The current point
current_point = self.generator.get_point(self.steps_up_to - 1)
# the next point is same as the previous
next_point = self.generator.get_point(self.steps_up_to - 2)
# insert the turnaround points
self.insert_gap(current_point, next_point, self.steps_up_to + 1)
# Set the velocity of the last point to 0
self.profile["velocityMode"][-1] = VelocityModes.ZERO_VELOCITY
self.end_index = self.steps_up_to