from typing import Union
import numpy as np
from annotypes import Anno, Array, add_call_types
from malcolm.core import (
DEFAULT_TIMEOUT,
Display,
IncompatibleError,
NumberMeta,
PartRegistrar,
Widget,
)
from malcolm.modules import builtin
from ..util import CS_AXIS_NAMES
# recommended trajectory program number and lowest allowed program number
TRAJECTORY_PROGRAM_NUM = 3
FALLBACK_TRAJ_PROGRAM_NUM = 2
# The maximum number of points in a single trajectory scan
MAX_NUM_POINTS = 4000000
with Anno("The Asyn Port name of the Co-ordinate system port we want to scan"):
ACSPort = str
with Anno("The relative time points to scan in microseconds"):
ATimeArray = Union[Array[np.int32]]
with Anno("The velocity mode of each point"):
AVelocityMode = Union[Array[np.int32]]
with Anno("Which user program to run for each point"):
AUserPrograms = Union[Array[np.int32]]
with Anno("The position the axis should be at for each point in the scan"):
ADemandTrajectory = Union[Array[np.float64]]
# Pull re-used annotypes into our namespace in case we are subclassed
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
def _zeros_or_right_length(array, num_points):
if array is None:
array = np.zeros(num_points, np.int32)
else:
assert (
len(array) == num_points
), f"Array {array} should be {num_points} points long"
return array
# We will set these attributes on the child block, so don't save them
[docs]@builtin.util.no_save(
"numPoints",
"enableCallbacks",
"computeStatistics",
"timeArray",
"cs",
"velocityMode",
"userPrograms",
"pointsToBuild",
)
@builtin.util.no_save(f"use{x}" for x in CS_AXIS_NAMES)
@builtin.util.no_save(f"positions{x}" for x in CS_AXIS_NAMES)
class PmacTrajectoryPart(builtin.parts.ChildPart):
def __init__(
self,
name: APartName,
mri: AMri,
) -> None:
super().__init__(name, mri, initial_visibility=True)
# The total number of points we have written
self.total_points = 0
self.points_scanned = NumberMeta(
"int32", "The number of points scanned", tags=[Widget.METER.tag()]
).create_attribute_model(0)
def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Add methods
registrar.add_method_model(
self.write_profile, "writeProfile", needs_context=True
)
registrar.add_method_model(
self.execute_profile, "executeProfile", needs_context=True
)
registrar.add_method_model(
self.abort_profile, "abortProfile", needs_context=True
)
# Add Attributes
registrar.add_attribute_model("pointsScanned", self.points_scanned)
# Serialized, so use camelCase
# noinspection PyPep8Naming
[docs] @add_call_types
def write_profile(
self,
context: builtin.hooks.AContext,
timeArray: ATimeArray,
csPort: ACSPort = None,
velocityMode: AVelocityMode = None,
userPrograms: AUserPrograms = None,
a: ADemandTrajectory = None,
b: ADemandTrajectory = None,
c: ADemandTrajectory = None,
u: ADemandTrajectory = None,
v: ADemandTrajectory = None,
w: ADemandTrajectory = None,
x: ADemandTrajectory = None,
y: ADemandTrajectory = None,
z: ADemandTrajectory = None,
) -> None:
child = context.block_view(self.mri)
# make sure a matching trajectory program is installed on the pmac
if child.trajectoryProgVersion.value != TRAJECTORY_PROGRAM_NUM:
if child.trajectoryProgVersion.value >= FALLBACK_TRAJ_PROGRAM_NUM:
self.log.warning(
f"pmac trajectory program is version {FALLBACK_TRAJ_PROGRAM_NUM}"
f" version {TRAJECTORY_PROGRAM_NUM} is recommended"
)
else:
raise (
IncompatibleError(
f"pmac trajectory program {child.trajectoryProgVersion.value} "
f"detected. Malcolm requires {TRAJECTORY_PROGRAM_NUM}"
)
)
# The axes taking part in the scan
use_axes = []
for axis in CS_AXIS_NAMES:
if locals()[axis.lower()] is not None:
use_axes.append(axis)
if csPort is not None:
# This is a build
action = child.buildProfile
self.total_points = 0
child.numPoints.put_value(MAX_NUM_POINTS)
try:
child.cs.put_value(csPort)
except ValueError as e:
raise ValueError(
f"Cannot set CS to {csPort}, did you use a compound_motor_block "
f"for a raw motor?\n{e}"
)
# Tell the trajectory scans which of the arrays to use
arrays = {f"use{axis}": axis in use_axes for axis in CS_AXIS_NAMES}
child.put_attribute_values(arrays)
else:
# This is an append
action = child.appendProfile
# Fill in the arrays
num_points = len(timeArray)
attribute_values = dict(
timeArray=timeArray,
pointsToBuild=num_points,
velocityMode=_zeros_or_right_length(velocityMode, num_points),
userPrograms=_zeros_or_right_length(userPrograms, num_points),
)
for axis in use_axes:
demand = locals()[axis.lower()]
attribute_values[f"positions{axis}"] = demand
child.put_attribute_values(attribute_values)
# Write the profile
action()
# Record how many points we have now written in total
self.total_points += num_points
def set_scan_length(self, value):
self.points_scanned.meta.set_display(Display(limitHigh=value))
[docs] @add_call_types
def execute_profile(self, context: builtin.hooks.AContext) -> None:
child = context.block_view(self.mri)
fs1 = context.subscribe(
[self.mri, "pointsScanned", "value"], self.points_scanned.set_value
)
fs2 = context.subscribe(
[self.mri, "pointsBuilt", "value"], self.set_scan_length
)
try:
child.executeProfile()
# Now wait for up to 2*min_delta time to make sure any
# update_completed_steps come in
child.when_value_matches(
"pointsScanned", self.total_points, timeout=DEFAULT_TIMEOUT
)
finally:
context.unsubscribe(fs1)
context.unsubscribe(fs2)
[docs] @add_call_types
def abort_profile(self, context: builtin.hooks.AContext) -> None:
child = context.block_view(self.mri)
child.abortProfile()