import re
from enum import Enum
from typing import Dict, List, Optional, Union
import numpy as np
from annotypes import add_call_types
from scanpointgenerator import CompoundGenerator
from malcolm.core import Block, Future, PartRegistrar, Put, Request
from malcolm.modules import builtin, scanning
from malcolm.modules.pmac.util import get_motion_trigger
from malcolm.modules.scanning.infos import MinTurnaroundInfo, MotionTrigger
from ..infos import MotorInfo
from ..util import (
MIN_INTERVAL,
MIN_TIME,
all_points_joined,
all_points_same_velocities,
cs_axis_mapping,
cs_port_with_motors_in,
get_motion_axes,
point_velocities,
profile_between_points,
)
# Number of seconds that a trajectory tick is
TICK_S = 0.000001
# Longest move time we can request
MAX_MOVE_TIME = 4.0
# velocity modes
class VelocityModes:
AVERAGE_PREV_TO_NEXT = 0
REAL_PREV_TO_CURRENT = 1
AVERAGE_PREV_TO_CURRENT = 2
ZERO_VELOCITY = 3
# user programs
class UserPrograms:
NO_PROGRAM = 0 # Do nothing
LIVE_PROGRAM = 1 # GPIO123 = 1, 0, 0
DEAD_PROGRAM = 2 # GPIO123 = 0, 1, 0
MID_PROGRAM = 4 # GPIO123 = 0, 0, 1
ZERO_PROGRAM = 8 # GPIO123 = 0, 0, 0
class PointType(Enum):
START_OF_ROW = 0 # Lower bound of first point of row
MID_POINT = 1 # Position of any point
POINT_JOIN = 2 # Boundary of two joined points
END_OF_ROW = 3 # Upper boundary of last point of row
TURNAROUND = 4 # Between rows
# How many profile points to write each time
PROFILE_POINTS = 2000
# How many points to extract from a scanpointgenerator each time
BATCH_POINTS = 20000
# 80 char line lengths...
AIV = builtin.parts.AInitialVisibility
# Pull re-used annotypes into our namespace in case we are subclassed
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
[docs]class PmacChildPart(builtin.parts.ChildPart):
def __init__(
self, name: APartName, mri: AMri, initial_visibility: AIV = False
) -> None:
super().__init__(name, mri, initial_visibility)
# Axis information stored from validate
self.axis_mapping: Dict[str, MotorInfo] = {}
# Lookup of the completed_step value for each point
self.completed_steps_lookup: List[int] = []
# The minimum turnaround time for non-joined points
self.min_turnaround = 0.0
# The minimum turnaround time for non-joined points
self.min_interval = 0.0
# If we are currently loading then block loading more points
self.loading = False
# Where we have generated into profile
self.end_index = 0
# Where we should stop loading points
self.steps_up_to = 0
# What sort of triggers to output
self.output_triggers: Optional[MotionTrigger] = None
# Profile points that haven't been sent yet
# {timeArray/velocityMode/userPrograms/a/b/c/u/v/w/x/y/z: [elements]}
self.profile: Dict[str, List] = {}
# accumulated intervals since the last PVT point used by sparse
# trajectory logic
self.time_since_last_pvt = 0
# Stored generator for positions
self.generator: CompoundGenerator = None
[docs] def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Hooks
registrar.hook(scanning.hooks.ValidateHook, self.on_validate)
registrar.hook(scanning.hooks.PreConfigureHook, self.reload)
registrar.hook(
(
scanning.hooks.ConfigureHook,
scanning.hooks.PostRunArmedHook,
scanning.hooks.SeekHook,
),
self.on_configure,
)
registrar.hook(scanning.hooks.RunHook, self.on_run)
registrar.hook(
(scanning.hooks.AbortHook, scanning.hooks.PauseHook), self.on_abort
)
[docs] def notify_dispatch_request(self, request: Request) -> None:
if isinstance(request, Put) and request.path[1] == "design":
# We have hooked self.reload to PreConfigure, and reload() will
# set design attribute, so explicitly allow this without checking
# it is in no_save (as it won't be in there)
pass
else:
super().notify_dispatch_request(request)
[docs] @add_call_types
def on_reset(self, context: builtin.hooks.AContext) -> None:
super().on_reset(context)
self.on_abort(context)
# Allow CamelCase as arguments will be serialized
# noinspection PyPep8Naming
[docs] @add_call_types
def on_validate(
self,
context: scanning.hooks.AContext,
generator: scanning.hooks.AGenerator,
axesToMove: scanning.hooks.AAxesToMove,
part_info: scanning.hooks.APartInfo,
) -> scanning.hooks.UParameterTweakInfos:
child = context.block_view(self.mri)
# Check that we can move all the requested axes
available = set(child.layout.value.name)
motion_axes = get_motion_axes(generator, axesToMove)
assert available.issuperset(
motion_axes
), "Some of the requested axes %s are not on the motor list %s" % (
list(axesToMove),
sorted(available),
)
# If GPIO not demanded for every point we don't need to align to the
# servo cycle
trigger = get_motion_trigger(part_info)
if trigger != scanning.infos.MotionTrigger.EVERY_POINT:
return None
# Find the duration
assert generator.duration > 0, "Can only do fixed duration at the moment"
servo_freq = child.servoFrequency()
# convert half an exposure to multiple of servo ticks, rounding down
ticks = np.floor(servo_freq * 0.5 * generator.duration)
if not np.isclose(servo_freq, 3200):
# + 0.002 for some observed jitter in the servo frequency if I10
# isn't a whole number of 1/4 us move timer ticks
# (any frequency apart from 3.2 kHz)
ticks += 0.002
# convert to integer number of microseconds, rounding up
micros = np.ceil(ticks / servo_freq * 1e6)
# back to duration
duration = 2 * float(micros) / 1e6
if duration != generator.duration:
self.log.debug(
f"Calculated duration {duration} does not match {generator}"
)
serialized = generator.to_dict()
new_generator = CompoundGenerator.from_dict(serialized)
new_generator.duration = duration
return scanning.infos.ParameterTweakInfo("generator", new_generator)
else:
return None
def move_to_start(self, child: Block, cs_port: str, completed_steps: int) -> Future:
# Work out what method to call
match = re.search(r"\d+$", cs_port)
assert match, "Cannot extract CS number from CS port '%s'" % cs_port
move_async = child["moveCS%s_async" % match.group()]
# Set all the axes to move to the start positions
first_point = self.generator.get_point(completed_steps)
args = {}
move_to_start_time = 0.0
for axis_name, velocity in point_velocities(
self.axis_mapping, first_point
).items():
motor_info: MotorInfo = self.axis_mapping[axis_name]
acceleration_distance = motor_info.ramp_distance(
0, velocity, min_ramp_time=MIN_TIME
)
start_pos = first_point.lower[axis_name] - acceleration_distance
args[motor_info.cs_axis.lower()] = start_pos
# Time profile that the move is likely to take
# NOTE: this is only accurate if pmac max velocity in linear motion
# prog is set to same speed as motor record VMAX
profile = motor_info.make_velocity_profile(
0, 0, motor_info.current_position - start_pos, 0
)
times, _ = profile.make_arrays()
move_to_start_time = max(times[-1], move_to_start_time)
# Call the method with the values
fs = move_async(moveTime=move_to_start_time, **args)
return fs
# Allow CamelCase as arguments will be serialized
# noinspection PyPep8Naming
[docs] @add_call_types
def on_run(self, context: scanning.hooks.AContext) -> None:
if self.generator:
self.loading = False
child = context.block_view(self.mri)
# Wait for the trajectory to run and complete
child.pointsScanned.subscribe_value(self.update_step, child)
# TODO: we should return at the end of the last point for PostRun
child.executeProfile()
[docs] @add_call_types
def on_abort(self, context: scanning.hooks.AContext) -> None:
if self.generator:
child = context.block_view(self.mri)
# TODO: if we abort during move to start, what happens?
child.abortProfile()
def update_step(self, scanned, child):
# scanned is an index into the completed_steps_lookup, so a
# "how far through the pmac trajectory" rather than a generator
# scan step
if scanned > 0:
completed_steps = self.completed_steps_lookup[scanned - 1]
self.registrar.report(scanning.infos.RunProgressInfo(completed_steps))
# Keep PROFILE_POINTS trajectory points in front
if (
not self.loading
and self.end_index < self.steps_up_to
and len(self.completed_steps_lookup) - scanned < PROFILE_POINTS
):
self.loading = True
self.calculate_generator_profile(self.end_index)
self.write_profile_points(child)
self.loading = False
# If we got to the end, there might be some leftover points that
# need to be appended to finish
if (
not self.loading
and self.end_index == self.steps_up_to
and self.profile["timeArray"]
):
self.loading = True
self.calculate_generator_profile(self.end_index)
self.write_profile_points(child)
assert not self.profile["timeArray"], (
"Why do we still have points? %s" % self.profile
)
self.loading = False
[docs] def write_profile_points(self, child, cs_port=None):
"""Build profile using given data
Args:
child (Block): Child block for running
cs_port (str): CS Port if this is a build rather than append
"""
# These are the things we will send
args = {}
if cs_port is not None:
args["csPort"] = cs_port
for k, v in self.profile.items():
# store the remnant back in the array
self.profile[k] = v[PROFILE_POINTS:]
v = v[:PROFILE_POINTS]
if k == "timeArray":
overflow = 0.0
time_array_ticks = []
for t in v:
ticks = t / TICK_S
overflow += ticks % 1
ticks = int(ticks)
if overflow > 0.5:
overflow -= 1
ticks += 1
time_array_ticks.append(ticks)
# TODO: overflow discarded every 10000 points, is it a problem?
v = np.array(time_array_ticks, np.int32)
elif k in ("velocityMode", "userPrograms"):
v = np.array(v, np.int32)
else:
v = np.array(v, np.float64)
args[k] = v
child.writeProfile(**args)
user_program = {
scanning.infos.MotionTrigger.NONE: {
PointType.POINT_JOIN: UserPrograms.NO_PROGRAM,
PointType.START_OF_ROW: UserPrograms.NO_PROGRAM,
PointType.MID_POINT: UserPrograms.NO_PROGRAM,
PointType.END_OF_ROW: UserPrograms.NO_PROGRAM,
PointType.TURNAROUND: UserPrograms.NO_PROGRAM,
},
scanning.infos.MotionTrigger.ROW_GATE: {
PointType.POINT_JOIN: UserPrograms.NO_PROGRAM,
PointType.START_OF_ROW: UserPrograms.LIVE_PROGRAM,
PointType.MID_POINT: UserPrograms.NO_PROGRAM,
PointType.END_OF_ROW: UserPrograms.ZERO_PROGRAM,
PointType.TURNAROUND: UserPrograms.NO_PROGRAM,
},
scanning.infos.MotionTrigger.EVERY_POINT: {
PointType.POINT_JOIN: UserPrograms.LIVE_PROGRAM,
PointType.START_OF_ROW: UserPrograms.LIVE_PROGRAM,
PointType.MID_POINT: UserPrograms.MID_PROGRAM,
PointType.END_OF_ROW: UserPrograms.DEAD_PROGRAM,
PointType.TURNAROUND: UserPrograms.ZERO_PROGRAM,
},
}
def get_user_program(self, point_type: PointType) -> int:
assert self.output_triggers, "No output triggers"
return self.user_program[self.output_triggers][point_type]
def calculate_profile_from_velocities(
self, time_arrays, velocity_arrays, current_positions, completed_steps
):
# at this point we have time/velocity arrays with 2-4 values for each
# axis. Each time represents a (instantaneous) change in acceleration.
# We want to translate this into a move profile (time/position).
# Every axis profile must have a point for each of the times from
# all axes combined
# extract the time points from all axes
t_list = []
for time_array in time_arrays.values():
t_list.extend(time_array)
combined_times = np.array(t_list)
combined_times = np.unique(combined_times)
# remove the 0 time initial point
combined_times = list(np.sort(combined_times))[1:]
num_intervals = len(combined_times)
# set up the time, mode and user arrays for the trajectory
prev_time = 0
time_intervals = []
for t in combined_times:
# times are absolute - convert to intervals
time_intervals.append(t - prev_time)
prev_time = t
# generate the profile positions in a temporary list of dict:
turnaround_profile = [{} for n in range(num_intervals)]
# Do this for each axis' velocity and time arrays
for axis_name, motor_info in self.axis_mapping.items():
axis_times = time_arrays[axis_name]
axis_velocities = velocity_arrays[axis_name]
prev_velocity = axis_velocities[0]
position = current_positions[axis_name]
# tracks the accumulated interpolated interval time since the
# last axis velocity profile point
time_interval = 0
# At this point we have time/velocity arrays with multiple values
# some of which align with the axis_times and some interleave.
# We want to create a matching move profile of 'num_intervals'
axis_pt = 1
for i in range(num_intervals):
axis_velocity = axis_velocities[axis_pt]
axis_prev_velocity = axis_velocities[axis_pt - 1]
axis_interval = axis_times[axis_pt] - axis_times[axis_pt - 1]
if np.isclose(combined_times[i], axis_times[axis_pt]):
# this combined point matches the axis point
# use the axis velocity and move to the next axis point
this_velocity = axis_velocities[axis_pt]
axis_pt += 1
time_interval = 0
else:
# this combined point is between two axis points,
# interpolate the velocity between those axis points
time_interval += time_intervals[i]
fraction = time_interval / axis_interval
dv = axis_velocity - axis_prev_velocity
this_velocity = axis_prev_velocity + fraction * dv
part_position = motor_info.ramp_distance(
prev_velocity, this_velocity, time_intervals[i]
)
prev_velocity = this_velocity
position += part_position
turnaround_profile[i][axis_name] = position
user_program = self.get_user_program(PointType.TURNAROUND)
for i in range(num_intervals):
self.add_profile_point(
time_intervals[i],
VelocityModes.REAL_PREV_TO_CURRENT,
user_program,
completed_steps,
turnaround_profile[i],
)
def add_profile_point(
self, time_point, velocity_mode, user_program, completed_step, axis_points
):
# Add padding if the move time exceeds the max pmac move time
if time_point > MAX_MOVE_TIME:
assert self.profile[
"timeArray"
], "Can't stretch the first point of a profile"
nsplit = int(time_point / MAX_MOVE_TIME + 1)
for _ in range(nsplit):
self.profile["timeArray"].append(time_point / nsplit)
for _ in range(nsplit - 1):
self.profile["velocityMode"].append(VelocityModes.AVERAGE_PREV_TO_NEXT)
self.profile["userPrograms"].append(UserPrograms.NO_PROGRAM)
for k, v in axis_points.items():
cs_axis = self.axis_mapping[k].cs_axis.lower()
last_point = self.profile[cs_axis][-1]
per_section = float(v - last_point) / nsplit
for i in range(1, nsplit):
self.profile[cs_axis].append(last_point + i * per_section)
last_completed_step = self.completed_steps_lookup[-1]
for _ in range(nsplit - 1):
self.completed_steps_lookup.append(last_completed_step)
else:
# Add point
self.profile["timeArray"].append(time_point)
# Set the requested point
self.profile["velocityMode"].append(velocity_mode)
self.profile["userPrograms"].append(user_program)
self.completed_steps_lookup.append(completed_step)
for k, v in axis_points.items():
cs_axis = self.axis_mapping[k].cs_axis.lower()
self.profile[cs_axis].append(v)
def add_generator_point_pair(self, point, point_num, points_are_joined):
# Add position
user_program = self.get_user_program(PointType.MID_POINT)
self.add_profile_point(
point.duration / 2.0,
VelocityModes.AVERAGE_PREV_TO_NEXT,
user_program,
point_num,
{name: point.positions[name] for name in self.axis_mapping},
)
# insert the lower bound of the next frame
if points_are_joined:
user_program = self.get_user_program(PointType.POINT_JOIN)
velocity_point = VelocityModes.AVERAGE_PREV_TO_NEXT
else:
user_program = self.get_user_program(PointType.END_OF_ROW)
velocity_point = VelocityModes.REAL_PREV_TO_CURRENT
self.add_profile_point(
point.duration / 2.0,
velocity_point,
user_program,
point_num + 1,
{name: point.upper[name] for name in self.axis_mapping},
)
[docs] def add_sparse_point(self, points, point_num, points_are_joined, same_velocities):
"""
Add in points but skip those that are linear to create a sparse
trajectory. Add the upper bound when the points are non-linear.
Always add the upper bound for the last point in a row (not joined to
the next point).
Joined| Same Vel|| Add Point | Add Upper
0 | 0 || Y | Y
0 | 1 || N | Y
1 | 0 || Y | Y
1 | 1 || N | N
NOTE:
This function may be called millions of times during the configure
phase and hence is highly optimized. In particular the use of variable
'point' looks like it may be referenced before assignment. The paths
controlled by the matrix above show that it will be always be assigned
or not used at all. Indexing into points[] is expensive so this is
intentional.
"""
if self.time_since_last_pvt > 0 and not points_are_joined:
# assume we can skip if we are at the end of a row and we
# just skipped the most recent point (i.e. time_since_last_pvt > 0)
do_skip = True
else:
# otherwise skip this point if it is linear to previous point
do_skip = points_are_joined and same_velocities
if do_skip:
self.time_since_last_pvt += points.duration[point_num]
else:
# not skipping - add this mid or end point
point = points[point_num]
user_program = self.get_user_program(PointType.MID_POINT)
self.add_profile_point(
self.time_since_last_pvt + point.duration / 2.0,
VelocityModes.AVERAGE_PREV_TO_NEXT,
user_program,
point_num,
{name: point.positions[name] for name in self.axis_mapping},
)
self.time_since_last_pvt = point.duration / 2.0
# only add the lower bound if we did not skip this point OR if we are
# at the end of a row where we always require a final point
if not do_skip or not points_are_joined:
# insert the lower bound of the next frame (i.e. the upper bound for
# this frame)
if points_are_joined:
user_program = self.get_user_program(PointType.POINT_JOIN)
velocity_point = VelocityModes.AVERAGE_PREV_TO_NEXT
else:
point = points[point_num]
user_program = self.get_user_program(PointType.END_OF_ROW)
if self.time_since_last_pvt > 0:
# if we have previously skipped points in this row then we
# use AVERAGE_PREV_TO_CURRENT at the end of the row this
# breaks the continuous line of REAL_PREV_TO_CURRENT which
# would accumulate errors over the scan
velocity_point = VelocityModes.AVERAGE_PREV_TO_CURRENT
else:
velocity_point = VelocityModes.REAL_PREV_TO_CURRENT
self.add_profile_point(
self.time_since_last_pvt,
velocity_point,
user_program,
point_num + 1,
{name: point.upper[name] for name in self.axis_mapping},
)
self.time_since_last_pvt = 0
def get_some_points(self, start_index):
# calculate the indices of the next batch of points to get for
# the calculate_generator_profile loop
# cap at PROFILE_POINTS (+1 so we can always get next_point)
if start_index == self.steps_up_to:
return None, None, None
if self.steps_up_to - start_index > BATCH_POINTS:
up_to = BATCH_POINTS + start_index + 1
points = self.generator.get_points(start_index, up_to)
else:
points = self.generator.get_points(start_index, self.steps_up_to)
velocities = all_points_same_velocities(points)
joined = all_points_joined(points)
return points, joined, velocities
def calculate_generator_profile(self, start_index, do_run_up=False):
# If we are doing the first build, do_run_up will be passed to flag
# that we need a run up, else just continue from the previous point
if do_run_up:
point = self.generator.get_point(start_index)
# Calculate how long to leave for the run-up (at least MIN_TIME)
run_up_time = self.min_interval
axis_points = {}
for axis_name, velocity in point_velocities(
self.axis_mapping, point
).items():
axis_points[axis_name] = point.lower[axis_name]
motor_info = self.axis_mapping[axis_name]
run_up_time = max(
run_up_time, motor_info.acceleration_time(0, velocity)
)
# Add lower bound
user_program = self.get_user_program(PointType.START_OF_ROW)
self.add_profile_point(
run_up_time,
VelocityModes.REAL_PREV_TO_CURRENT,
user_program,
start_index,
axis_points,
)
self.time_since_last_pvt = 0
points, joined, velocities = self.get_some_points(start_index)
points_idx = start_index
for i in range(start_index, self.steps_up_to):
if i - points_idx >= BATCH_POINTS:
points, joined, velocities = self.get_some_points(i)
points_idx = i
last_point = i + 1 == self.steps_up_to
if not last_point:
# cope with the zero axes case (where joined == None)
points_are_joined = joined is None or joined[i - points_idx]
same_velocities = velocities is None or velocities[i - points_idx]
else:
same_velocities = points_are_joined = False
if self.output_triggers == scanning.infos.MotionTrigger.EVERY_POINT:
self.add_generator_point_pair(
points[i - points_idx], i, points_are_joined
)
else:
self.add_sparse_point(
points, i - points_idx, points_are_joined, same_velocities
)
# add in the turnaround between non-contiguous points
if not (points_are_joined or last_point):
self.insert_gap(
points[i - points_idx], points[i - points_idx + 1], i + 1
)
# Check if we have exceeded the points number and need to write
# Strictly less than so we always add one more point to the time
# array so we can always stretch points in a subsequent add with
# the values already in the profiles
if len(self.profile["timeArray"]) > PROFILE_POINTS:
self.end_index = i + 1
return
self.add_tail_off()
def add_tail_off(self):
# Add the last tail off point
point = self.generator.get_point(self.steps_up_to - 1)
# Calculate how long to leave for the tail-off
# #(at least MIN_TIME)
axis_points = {}
tail_off_time = self.min_interval
for axis_name, velocity in point_velocities(
self.axis_mapping, point, entry=False
).items():
motor_info = self.axis_mapping[axis_name]
tail_off_time = max(
tail_off_time, motor_info.acceleration_time(0, velocity)
)
tail_off = motor_info.ramp_distance(velocity, 0)
axis_points[axis_name] = point.upper[axis_name] + tail_off
# Do the last move
user_program = self.get_user_program(PointType.TURNAROUND)
self.add_profile_point(
tail_off_time,
VelocityModes.ZERO_VELOCITY,
user_program,
self.steps_up_to,
axis_points,
)
self.end_index = self.steps_up_to
def insert_gap(self, point, next_point, completed_steps):
# Work out the velocity profiles of how to move to the start
min_turnaround = max(self.min_turnaround, point.delay_after)
time_arrays, velocity_arrays = profile_between_points(
self.axis_mapping, point, next_point, min_turnaround, self.min_interval
)
start_positions = {}
for axis_name in self.axis_mapping:
start_positions[axis_name] = point.upper[axis_name]
# Work out the Position trajectories from these profiles
self.calculate_profile_from_velocities(
time_arrays, velocity_arrays, start_positions, completed_steps
)
# make sure the last point is the same as next_point.lower since
# calculate_profile_from_velocities fails when the turnaround is 2
# points only
for axis_name, motor_info in self.axis_mapping.items():
self.profile[motor_info.cs_axis.lower()][-1] = next_point.lower[axis_name]
# Change the last point to be a live frame
self.profile["velocityMode"][-1] = VelocityModes.REAL_PREV_TO_CURRENT
user_program = self.get_user_program(PointType.START_OF_ROW)
self.profile["userPrograms"][-1] = user_program