import re
from enum import Enum
from typing import Dict, List, Optional, Union
import cothread
import numpy as np
from annotypes import add_call_types
from scanpointgenerator import CompoundGenerator, Point
from malcolm.core import Block, Future, PartRegistrar, Put, Request
from malcolm.modules import builtin, scanning
from malcolm.modules.scanning.infos import MotionTrigger
from ..infos import MotorInfo
from ..util import (
MIN_TIME,
MinTurnaround,
all_points_joined,
all_points_same_velocities,
cs_axis_mapping,
cs_port_with_motors_in,
get_min_turnaround,
get_motion_axes,
get_motion_trigger,
point_velocities,
profile_between_points,
)
# Length of one trajectory tick in seconds; write_profile_points divides each
# time by this to convert it to an integer number of ticks
TICK_S = 0.000001
# Longest time allowed for a single profile point; add_profile_point splits
# any longer move into several equal sections
MAX_MOVE_TIME = 4.0
# velocity modes
class VelocityModes:
    """Integer codes appended to the ``velocityMode`` array of a profile."""

    (
        AVERAGE_PREV_TO_NEXT,
        REAL_PREV_TO_CURRENT,
        AVERAGE_PREV_TO_CURRENT,
        ZERO_VELOCITY,
    ) = range(4)
# user programs
class UserPrograms:
    """Integer codes appended to the ``userPrograms`` array of a profile.

    The trailing comments record the GPIO 1/2/3 state associated with each
    code.
    """

    NO_PROGRAM = 0b0000  # Do nothing
    LIVE_PROGRAM = 0b0001  # GPIO123 = 1, 0, 0
    DEAD_PROGRAM = 0b0010  # GPIO123 = 0, 1, 0
    MID_PROGRAM = 0b0100  # GPIO123 = 0, 0, 1
    ZERO_PROGRAM = 0b1000  # GPIO123 = 0, 0, 0
class PointType(Enum):
    """Kinds of trajectory point.

    Used as the inner key of the ``user_program`` lookup table to choose
    which user program accompanies a profile point.
    """

    START_OF_ROW = 0  # Lower bound of first point of row
    MID_POINT = 1  # Position of any point
    POINT_JOIN = 2  # Boundary of two joined points
    END_OF_ROW = 3  # Upper boundary of last point of row
    TURNAROUND = 4  # Between rows
# Maximum number of profile points sent to the child in one writeProfile call
PROFILE_POINTS = 2000
# How many points to extract from a scanpointgenerator in one batch
BATCH_POINTS = 20000
# Short alias so that signatures fit within 80 char line lengths
AIV = builtin.parts.AInitialVisibility
# Pull re-used annotypes into our namespace in case we are subclassed
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
class PmacChildPart(builtin.parts.ChildPart):
def __init__(
    self, name: APartName, mri: AMri, initial_visibility: AIV = False
) -> None:
    """Create the part and reset all per-scan bookkeeping to empty values."""
    super().__init__(name, mri, initial_visibility)

    # --- What we are scanning -------------------------------------------
    # Generator that positions are taken from (None until one is stored)
    self.generator: Optional[CompoundGenerator] = None
    # Scan axis name -> motor information, filled in by get_cs_port
    self.axis_mapping: Dict[str, MotorInfo] = {}
    # Minimum turnaround information
    self.min_turnaround: Optional[MinTurnaround] = None
    # Which kind of triggers the trajectory should output
    self.output_triggers: Optional[MotionTrigger] = None

    # --- Progress through the generator ---------------------------------
    # Generator index that the profile has been generated up to
    self.end_index = 0
    # Generator index at which point loading should stop
    self.steps_up_to = 0
    # completed_step value for each trajectory point produced so far
    self.completed_steps_lookup: List[int] = []

    # --- Profile under construction -------------------------------------
    # Points generated but not yet sent, keyed by array name:
    # {timeArray/velocityMode/userPrograms/a/b/c/u/v/w/x/y/z: [elements]}
    self.profile: Dict[str, List] = {}
    # Time accumulated since the last PVT point (sparse trajectory logic)
    self.time_since_last_pvt = 0
    # True once the tail-off point of the current profile has been added
    self.tail_off_added = False
    # True while points are being loaded, to block a concurrent load
    self.loading = False
def setup(self, registrar: PartRegistrar) -> None:
    """Register this part's callbacks against the scanning hooks."""
    super().setup(registrar)
    hooks = scanning.hooks
    registrar.hook(hooks.ValidateHook, self.on_validate)
    registrar.hook(hooks.PreConfigureHook, self.reload)
    # Configure, re-arming after a run and seeking all share one handler
    configure_hooks = (
        hooks.ConfigureHook,
        hooks.PostRunArmedHook,
        hooks.SeekHook,
    )
    registrar.hook(configure_hooks, self.on_configure)
    registrar.hook(hooks.RunHook, self.on_run)
    # Abort and pause are both handled by on_abort
    stop_hooks = (hooks.AbortHook, hooks.PauseHook)
    registrar.hook(stop_hooks, self.on_abort)
def notify_dispatch_request(self, request: Request) -> None:
    """Forward the request to the parent class unless it is a Put to "design".

    self.reload is hooked to PreConfigure and reload() will set the design
    attribute, so that Put is explicitly allowed without checking that it
    is in no_save (it won't be in there).
    """
    is_design_put = isinstance(request, Put) and request.path[1] == "design"
    if is_design_put:
        return
    super().notify_dispatch_request(request)
@add_call_types
def on_reset(self, context: builtin.hooks.AContext) -> None:
    """Run the parent reset handling, then on_abort with the same context."""
    super().on_reset(context)
    # on_abort only acts if a generator has been stored
    self.on_abort(context)
@add_call_types
def on_validate(
    self,
    context: scanning.hooks.AContext,
    generator: scanning.hooks.AGenerator,
    axesToMove: scanning.hooks.AAxesToMove,
    part_info: scanning.hooks.APartInfo,
) -> scanning.hooks.UParameterTweakInfos:
    """Check the requested axes and, if needed, propose a tweaked duration.

    Returns:
        A ParameterTweakInfo carrying a copy of the generator with a new
        duration when the duration had to change, otherwise None.

    Raises:
        AssertionError: If a motion axis is not in the child's layout, or
            the generator duration is negative.
    """
    child = context.block_view(self.mri)

    # Every motion axis must be one of the motors in the child layout
    available = set(child.layout.value.name)
    motion_axes = get_motion_axes(generator, axesToMove)
    assert available.issuperset(
        motion_axes
    ), "Some of the requested axes %s are not on the motor list %s" % (
        list(axesToMove),
        sorted(available),
    )

    duration = generator.duration
    assert duration >= 0.0, f"{self.name}: negative duration is not supported"

    # A duration of zero means we are being asked to guess one
    if duration == 0.0:
        if not self.taking_part_in_scan(part_info, motion_axes):
            return None
        duration = self.calculate_generator_duration(
            context, generator, part_info, motion_axes
        )
        # No guess is possible when axis mappings are missing
        if not duration:
            return None

    # GPIO output on every point requires alignment to the servo cycle
    if get_motion_trigger(part_info) == scanning.infos.MotionTrigger.EVERY_POINT:
        servo_freq = child.servoFrequency()
        duration = self.get_aligned_duration_with_servo_frequency(
            servo_freq, duration
        )

    # Nothing to report if the duration came through unchanged
    if duration == generator.duration:
        return None

    self.log.debug(
        f"{self.name}: tweaking duration from {generator.duration} to "
        f"{duration}"
    )
    # Round-trip through a dict so the caller's generator is not modified
    new_generator = CompoundGenerator.from_dict(generator.to_dict())
    new_generator.duration = duration
    return scanning.infos.ParameterTweakInfo("generator", new_generator)
@staticmethod
def get_aligned_duration_with_servo_frequency(
frequency: float, duration: float
) -> float:
# convert half an exposure to multiple of servo ticks, rounding up
ticks = np.ceil(frequency * 0.5 * duration)
if not np.isclose(frequency, 3200):
# -0.002 for some observed jitter in the servo frequency if I10
# isn't a whole number of 1/4 us move timer ticks
# (any frequency apart from 3.2 kHz)
ticks -= 0.002
# convert to integer number of microseconds, rounding down
micros = np.floor(ticks / frequency * 1e6)
# back to duration
duration = 2 * float(micros) / 1e6
return duration
def taking_part_in_scan(
    self,
    part_info: scanning.hooks.APartInfo,
    motion_axes: List[str],
) -> bool:
    """Return True if there are axes to move or a motion trigger is wanted."""
    no_trigger = scanning.infos.MotionTrigger.NONE
    need_gpio = get_motion_trigger(part_info) != no_trigger
    return bool(motion_axes or need_gpio)
def calculate_generator_duration(
    self,
    context: scanning.hooks.AContext,
    generator: scanning.hooks.AGenerator,
    part_info: scanning.hooks.APartInfo,
    motion_axes: List[str],
) -> Optional[float]:
    """Guess a generator duration from the generator and the axes moved.

    Returns:
        TICK_S when no axes are moved; None when the axis mapping cannot
        be obtained; an estimate from the first two points when the
        generator is continuous with more than one point; otherwise the
        minimum turnaround time.
    """
    child = context.block_view(self.mri)
    layout_table = child.layout.value

    # Not moving axes so just return time of one tick
    if not motion_axes:
        return TICK_S

    # TODO: what happens if axes are not mapped due to the current config on
    # the brick not matching the target config for the scan?
    try:
        axis_mapping = cs_axis_mapping(context, layout_table, motion_axes)
    except AssertionError:
        # The axes can't be checked as they are not mapped, so leave the
        # generator untouched
        self.log.debug(
            f"{self.name}: can't guess generator duration during "
            f"validate as axis mappings are not loaded or missing."
        )
        return None

    # Step scans and fly scans behave differently
    generator.prepare()
    is_fly_scan = generator.continuous and generator.size > 1
    if not is_fly_scan:
        # Step scans have turnarounds at each point so can use this value
        return get_min_turnaround(part_info).time

    # Fly scan: estimate from the distance between the first two points
    # and the max velocities of the participating axes
    first_point = generator.get_point(0)
    second_point = generator.get_point(1)
    return self.calculate_duration_from_first_two_points(
        axis_mapping, first_point, second_point
    )
def calculate_duration_from_first_two_points(
    self,
    axis_mapping: Dict[str, MotorInfo],
    first_point: Point,
    second_point: Point,
) -> float:
    """Guess a duration from the distance between two points and max velocities.

    For each mapped axis the time to cover the distance between the two
    point positions at that axis' max velocity is calculated; the longest
    of these is returned.

    Args:
        axis_mapping: Scan axis name -> motor information (max_velocity used).
        first_point: Point supplying the starting positions.
        second_point: Point supplying the finishing positions.

    Returns:
        The longest per-axis time, or TICK_S if no axis moves.
    """
    duration = 0.0
    for axis_name, motor_info in axis_mapping.items():
        # Use the magnitude of the move: an axis travelling in the negative
        # direction needs just as long as one travelling in the positive
        # direction, and a signed (negative) time would be ignored below
        distance = abs(
            second_point.positions[axis_name] - first_point.positions[axis_name]
        )
        # Time taken between the two points at max velocity
        min_duration = distance / motor_info.max_velocity
        # Keep the time of the slowest axis to move between the points
        if min_duration > duration:
            duration = min_duration
    # If duration is zero no axes move during these points, so fall back to
    # the time of one tick
    if duration == 0.0:
        duration = TICK_S
    return duration
def move_to_start(self, child: Block, cs_port: str, completed_steps: int) -> Future:
    """Start an asynchronous CS move to the run-up position of a point.

    Args:
        child: Child block providing the ``moveCS<n>_async`` methods.
        cs_port: CS port name; its trailing digits select the method.
        completed_steps: Generator index of the point to start from.

    Returns:
        The future returned by the asynchronous move call.

    Raises:
        AssertionError: If cs_port has no trailing number, or a start
            position fails the motor's soft limit check.
    """
    # The trailing digits of the port name pick the move method
    cs_number = re.search(r"\d+$", cs_port)
    assert cs_number, f"Cannot extract CS number from CS port '{cs_port}'"
    move_async = child[f"moveCS{cs_number.group()}_async"]

    first_point = self.generator.get_point(completed_steps)
    velocities = point_velocities(self.axis_mapping, first_point)
    demands = {}
    longest_move_time = 0.0
    for axis_name, velocity in velocities.items():
        motor_info: MotorInfo = self.axis_mapping[axis_name]
        # Back off from the lower bound by the ramp distance needed to
        # reach the point velocity from rest
        run_up = motor_info.ramp_distance(0, velocity, min_ramp_time=MIN_TIME)
        start_pos = first_point.lower[axis_name] - run_up
        assert motor_info.check_position_within_soft_limits(start_pos) is True, (
            f"{motor_info.scannable}: ramp start position {start_pos} outside "
            f"soft limits {motor_info.user_low_limit, motor_info.user_high_limit}"
        )
        demands[motor_info.cs_axis.lower()] = start_pos
        # Time profile that the move is likely to take
        # NOTE: this is only accurate if pmac max velocity in linear motion
        # prog is set to same speed as motor record VMAX
        travel = motor_info.current_position - start_pos
        times, _ = motor_info.make_velocity_profile(0, 0, travel, 0).make_arrays()
        longest_move_time = max(times[-1], longest_move_time)

    # All axes are demanded together using the longest estimated time
    return move_async(moveTime=longest_move_time, **demands)
def get_cs_port(
    self,
    context: scanning.hooks.AContext,
    motion_axes: List[str],
) -> str:
    """Work out the cs_port to use and store the matching axis mapping.

    Side effect: self.axis_mapping is replaced, with the mapping for
    motion_axes or with an empty dict when there are no axes to move.
    """
    child = context.block_view(self.mri)
    layout_table = child.layout.value

    if not motion_axes:
        # No axes to move, but if told to output triggers we still need to
        # do something, so pick the first cs we find that has an axis
        # assigned
        self.axis_mapping = {}
        return cs_port_with_motors_in(context, layout_table)

    self.axis_mapping = cs_axis_mapping(context, layout_table, motion_axes)
    # Check units for everything in the axis mapping
    # TODO: reinstate this when GDA does it properly
    # for axis_name, motor_info in sorted(self.axis_mapping.items()):
    #     assert motor_info.units == generator.units[axis_name], \
    #         "%s: Expected scan units of %r, got %r" % (
    #         axis_name, motor_info.units, generator.units[axis_name])
    # Guaranteed to have an entry in axis_mapping otherwise
    # cs_axis_mapping would fail, so pick its cs_port
    mapped_motors = list(self.axis_mapping.values())
    return mapped_motors[0].cs_port
# Allow CamelCase as arguments will be serialized
# noinspection PyPep8Naming
@add_call_types
def on_run(self, context: scanning.hooks.AContext) -> None:
    """Execute the profile on the child, topping it up as points are scanned.

    Does nothing unless a generator has been stored.
    """
    if not self.generator:
        return
    self.loading = False
    child = context.block_view(self.mri)
    # update_step is called with each pointsScanned value (and the child)
    # so that it can report progress and load more points
    child.pointsScanned.subscribe_value(self.update_step, child)
    # Wait for the trajectory to run and complete
    # TODO: we should return at the end of the last point for PostRun
    child.executeProfile()
@add_call_types
def on_abort(self, context: scanning.hooks.AContext) -> None:
    """Abort the profile on the child, if a generator has been stored."""
    if not self.generator:
        return
    # TODO: if we abort during move to start, what happens?
    context.block_view(self.mri).abortProfile()
def update_step(self, scanned, child):
    """Report progress and keep the trajectory supplied with points.

    Subscribed in on_run to the child's pointsScanned value.

    Args:
        scanned: Index into completed_steps_lookup, i.e. how far through
            the pmac trajectory we are rather than a generator scan step.
        child: Child block that further profile points are written to.
    """
    # scanned is an index into the completed_steps_lookup, so a
    # "how far through the pmac trajectory" rather than a generator
    # scan step
    if scanned > 0:
        # Only report progress if we are triggering for every point
        if self.output_triggers == scanning.infos.MotionTrigger.EVERY_POINT:
            completed_steps = self.completed_steps_lookup[scanned - 1]
            self.registrar.report(scanning.infos.RunProgressInfo(completed_steps))
        # Keep PROFILE_POINTS trajectory points in front
        if (
            not self.loading
            and self.end_index < self.steps_up_to
            and len(self.completed_steps_lookup) - scanned < PROFILE_POINTS
        ):
            # self.loading guards against starting a second load while this
            # one is still in progress
            self.loading = True
            self.calculate_generator_profile(self.end_index)
            self.write_profile_points(child)
            self.loading = False
        # If we got to the end, there might be some leftover points that
        # need to be appended to finish - but we should wait until we can
        # append them.
        if (
            not self.loading
            and self.end_index == self.steps_up_to
            and self.profile["timeArray"]
        ):
            self.loading = True
            self.calculate_generator_profile(self.end_index)
            self.write_profile_points(child)
            # If we are triggering at every point, then only one load should be
            # required. Otherwise for row triggering we may have more than
            # PROFILE_POINTS points left in the profile due to sparse
            # generation, so skip the check.
            if self.output_triggers == scanning.infos.MotionTrigger.EVERY_POINT:
                assert not self.profile[
                    "timeArray"
                ], f"Why do we still have points? {self.profile}"
            self.loading = False
def write_profile_points(self, child, cs_port=None):
    """Send up to PROFILE_POINTS pending points of every array to the child.

    The points that are sent are removed from self.profile; any remainder
    stays there for a later call.

    Args:
        child (Block): Child block for running
        cs_port (str): CS Port if this is a build rather than append
    """
    # These are the things we will send
    args = {} if cs_port is None else {"csPort": cs_port}
    integer_arrays = ("velocityMode", "userPrograms")

    for key, pending in self.profile.items():
        to_send = pending[:PROFILE_POINTS]
        # store the remnant back in the array
        self.profile[key] = pending[PROFILE_POINTS:]

        if key == "timeArray":
            # Convert seconds to whole ticks, carrying the fractional part
            # forward so rounding errors don't accumulate within this write
            carry = 0.0
            whole_ticks = []
            for seconds in to_send:
                exact_ticks = seconds / TICK_S
                carry += exact_ticks % 1
                tick_count = int(exact_ticks)
                if carry > 0.5:
                    carry -= 1
                    tick_count += 1
                whole_ticks.append(tick_count)
            # TODO: the carry is discarded at the end of every write, is it
            # a problem?
            args[key] = np.array(whole_ticks, np.int32)
        elif key in integer_arrays:
            args[key] = np.array(to_send, np.int32)
        else:
            # Everything else is an axis position array
            args[key] = np.array(to_send, np.float64)

    child.writeProfile(**args)
# Lookup of the user program to run at each type of point, keyed first by the
# MotionTrigger in use and then by PointType. Read by get_user_program.
user_program = {
    # No triggers wanted: never run a program
    scanning.infos.MotionTrigger.NONE: {
        PointType.POINT_JOIN: UserPrograms.NO_PROGRAM,
        PointType.START_OF_ROW: UserPrograms.NO_PROGRAM,
        PointType.MID_POINT: UserPrograms.NO_PROGRAM,
        PointType.END_OF_ROW: UserPrograms.NO_PROGRAM,
        PointType.TURNAROUND: UserPrograms.NO_PROGRAM,
    },
    # Row gate: programs only at the start and end of each row
    scanning.infos.MotionTrigger.ROW_GATE: {
        PointType.POINT_JOIN: UserPrograms.NO_PROGRAM,
        PointType.START_OF_ROW: UserPrograms.LIVE_PROGRAM,
        PointType.MID_POINT: UserPrograms.NO_PROGRAM,
        PointType.END_OF_ROW: UserPrograms.ZERO_PROGRAM,
        PointType.TURNAROUND: UserPrograms.NO_PROGRAM,
    },
    # Every point: a program at every type of point
    scanning.infos.MotionTrigger.EVERY_POINT: {
        PointType.POINT_JOIN: UserPrograms.LIVE_PROGRAM,
        PointType.START_OF_ROW: UserPrograms.LIVE_PROGRAM,
        PointType.MID_POINT: UserPrograms.MID_PROGRAM,
        PointType.END_OF_ROW: UserPrograms.DEAD_PROGRAM,
        PointType.TURNAROUND: UserPrograms.ZERO_PROGRAM,
    },
}
def get_user_program(self, point_type: PointType) -> int:
    """Return the user program for point_type under the current triggers.

    Raises:
        AssertionError: If self.output_triggers is not set.
    """
    assert self.output_triggers, "No output triggers"
    programs_for_trigger = self.user_program[self.output_triggers]
    return programs_for_trigger[point_type]
def calculate_profile_from_velocities(
    self, time_arrays, velocity_arrays, current_positions, completed_steps
):
    """Turn per-axis time/velocity arrays into profile points and add them.

    Args:
        time_arrays: Axis name -> sequence of absolute times.
        velocity_arrays: Axis name -> sequence of velocities at those times.
        current_positions: Axis name -> position at the start of the move.
        completed_steps: completed_step value recorded for every point added.
    """
    # at this point we have time/velocity arrays with 2-4 values for each
    # axis. Each time represents a (instantaneous) change in acceleration.
    # We want to translate this into a move profile (time/position).
    # Every axis profile must have a point for each of the times from
    # all axes combined
    # extract the time points from all axes
    t_list = []
    for time_array in time_arrays.values():
        t_list.extend(time_array)
    combined_times = np.array(t_list)
    combined_times = np.unique(combined_times)
    # remove the 0 time initial point
    # NOTE(review): this drops the smallest time, which assumes every axis
    # array starts at time 0 - confirm against profile_between_points
    combined_times = list(np.sort(combined_times))[1:]
    num_intervals = len(combined_times)
    # set up the time, mode and user arrays for the trajectory
    prev_time = 0
    time_intervals = []
    for t in combined_times:
        # times are absolute - convert to intervals
        time_intervals.append(t - prev_time)
        prev_time = t
    # generate the profile positions in a temporary list of dict:
    # one {axis_name: position} dict per combined time interval
    turnaround_profile = [{} for n in range(num_intervals)]
    # Do this for each axis' velocity and time arrays
    for axis_name, motor_info in self.axis_mapping.items():
        axis_times = time_arrays[axis_name]
        axis_velocities = velocity_arrays[axis_name]
        prev_velocity = axis_velocities[0]
        position = current_positions[axis_name]
        # tracks the accumulated interpolated interval time since the
        # last axis velocity profile point
        time_interval = 0
        # At this point we have time/velocity arrays with multiple values
        # some of which align with the axis_times and some interleave.
        # We want to create a matching move profile of 'num_intervals'
        # axis_pt indexes the next point of this axis not yet reached
        axis_pt = 1
        for i in range(num_intervals):
            axis_velocity = axis_velocities[axis_pt]
            axis_prev_velocity = axis_velocities[axis_pt - 1]
            axis_interval = axis_times[axis_pt] - axis_times[axis_pt - 1]
            if np.isclose(combined_times[i], axis_times[axis_pt]):
                # this combined point matches the axis point
                # use the axis velocity and move to the next axis point
                this_velocity = axis_velocities[axis_pt]
                axis_pt += 1
                time_interval = 0
            else:
                # this combined point is between two axis points,
                # interpolate the velocity between those axis points
                time_interval += time_intervals[i]
                fraction = time_interval / axis_interval
                dv = axis_velocity - axis_prev_velocity
                this_velocity = axis_prev_velocity + fraction * dv
            # distance covered while changing from the previous velocity to
            # this one over the interval
            part_position = motor_info.ramp_distance(
                prev_velocity, this_velocity, time_intervals[i]
            )
            prev_velocity = this_velocity
            position += part_position
            turnaround_profile[i][axis_name] = position
    # Every point of the move is added as a TURNAROUND point
    user_program = self.get_user_program(PointType.TURNAROUND)
    for i in range(num_intervals):
        self.add_profile_point(
            time_intervals[i],
            VelocityModes.REAL_PREV_TO_CURRENT,
            user_program,
            completed_steps,
            turnaround_profile[i],
        )
def add_profile_point(
    self, time_point, velocity_mode, user_program, completed_step, axis_points
):
    """Append one point to the pending profile, padding long moves.

    Args:
        time_point: Time for the move to this point.
        velocity_mode: VelocityModes value for the point.
        user_program: UserPrograms value for the point.
        completed_step: Value appended to completed_steps_lookup.
        axis_points: Scan axis name -> position for this point.

    Raises:
        AssertionError: If time_point exceeds MAX_MOVE_TIME and the pending
            profile is empty, as there is no previous point to pad from.
    """
    # Add padding if the move time exceeds the max pmac move time
    if time_point > MAX_MOVE_TIME:
        assert self.profile[
            "timeArray"
        ], "Can't stretch the first point of a profile"
        # Number of equal sections, each no longer than MAX_MOVE_TIME
        nsplit = int(time_point / MAX_MOVE_TIME + 1)
        # nsplit times are added here (padding points plus the requested
        # point) but only nsplit - 1 entries to the other arrays; the
        # requested point's own entries are appended at the end
        for _ in range(nsplit):
            self.profile["timeArray"].append(time_point / nsplit)
        for _ in range(nsplit - 1):
            self.profile["velocityMode"].append(VelocityModes.AVERAGE_PREV_TO_NEXT)
            self.profile["userPrograms"].append(UserPrograms.NO_PROGRAM)
        for k, v in axis_points.items():
            cs_axis = self.axis_mapping[k].cs_axis.lower()
            last_point = self.profile[cs_axis][-1]
            # Evenly spaced positions between the previous point and v
            per_section = float(v - last_point) / nsplit
            for i in range(1, nsplit):
                self.profile[cs_axis].append(last_point + i * per_section)
        # Padding points repeat the previous completed_step value
        last_completed_step = self.completed_steps_lookup[-1]
        for _ in range(nsplit - 1):
            self.completed_steps_lookup.append(last_completed_step)
    else:
        # Add point
        self.profile["timeArray"].append(time_point)
    # Set the requested point
    self.profile["velocityMode"].append(velocity_mode)
    self.profile["userPrograms"].append(user_program)
    self.completed_steps_lookup.append(completed_step)
    for k, v in axis_points.items():
        cs_axis = self.axis_mapping[k].cs_axis.lower()
        self.profile[cs_axis].append(v)
def add_generator_point_pair(self, point, point_num, points_are_joined):
    """Add the mid position and the upper bound of a generator point.

    Each of the two profile points takes half of the point duration.

    Args:
        point: Generator point to add.
        point_num: Generator index of the point.
        points_are_joined: Whether this point joins on to the next one.
    """
    axes = self.axis_mapping

    # Position of the point itself
    self.add_profile_point(
        point.duration / 2.0,
        VelocityModes.AVERAGE_PREV_TO_NEXT,
        self.get_user_program(PointType.MID_POINT),
        point_num,
        {name: point.positions[name] for name in axes},
    )

    # The upper bound, which is also the lower bound of the next frame
    # when the points are joined
    if points_are_joined:
        boundary_type = PointType.POINT_JOIN
        boundary_mode = VelocityModes.AVERAGE_PREV_TO_NEXT
    else:
        boundary_type = PointType.END_OF_ROW
        boundary_mode = VelocityModes.REAL_PREV_TO_CURRENT
    self.add_profile_point(
        point.duration / 2.0,
        boundary_mode,
        self.get_user_program(boundary_type),
        point_num + 1,
        {name: point.upper[name] for name in axes},
    )
def add_sparse_point(self, points, point_num, points_are_joined, same_velocities):
    """
    Add in points but skip those that are linear to create a sparse
    trajectory. Add the upper bound when the points are non-linear.
    Always add the upper bound for the last point in a row (not joined to
    the next point).

    ======  ========  =========  =========
    Joined  Same Vel  Add Point  Add Upper
    ======  ========  =========  =========
    0       0         Y          Y
    0       1         N          Y
    1       0         Y          Y
    1       1         N          N
    ======  ========  =========  =========

    NOTE(review): for an unjoined point the code decides whether to skip
    the mid point from ``time_since_last_pvt > 0`` rather than from
    ``same_velocities`` - confirm this matches the table above.

    NOTE:
    This function may be called millions of times during the configure
    phase and hence is highly optimized. In particular the use of variable
    'point' looks like it may be referenced before assignment. The paths
    controlled by the matrix above show that it will be always be assigned
    or not used at all. Indexing into points[] is expensive so this is
    intentional.

    Args:
        points: Batch of generator points.
        point_num: Index into points of the point to consider; also used
            as the completed_step value of any profile point added.
        points_are_joined: Whether this point joins on to the next one.
        same_velocities: Whether this point has the same velocities as
            the next one (as given by all_points_same_velocities).

    Returns:
        True if at least one profile point was added, otherwise False.
    """
    profile_point_added = False
    if self.time_since_last_pvt > 0 and not points_are_joined:
        # assume we can skip if we are at the end of a row and we
        # just skipped the most recent point (i.e. time_since_last_pvt > 0)
        do_skip = True
    else:
        # otherwise skip this point if it is linear to previous point
        do_skip = points_are_joined and same_velocities
    if do_skip:
        # accumulate the whole duration of the skipped point
        self.time_since_last_pvt += points.duration[point_num]
    else:
        # not skipping - add this mid or end point
        point = points[point_num]
        user_program = self.get_user_program(PointType.MID_POINT)
        self.add_profile_point(
            self.time_since_last_pvt + point.duration / 2.0,
            VelocityModes.AVERAGE_PREV_TO_NEXT,
            user_program,
            point_num,
            {name: point.positions[name] for name in self.axis_mapping},
        )
        # the second half of this point has not been covered yet
        self.time_since_last_pvt = point.duration / 2.0
        profile_point_added = True
    # only add the lower bound if we did not skip this point OR if we are
    # at the end of a row where we always require a final point
    if not do_skip or not points_are_joined:
        # insert the lower bound of the next frame (i.e. the upper bound for
        # this frame)
        if points_are_joined:
            user_program = self.get_user_program(PointType.POINT_JOIN)
            velocity_point = VelocityModes.AVERAGE_PREV_TO_NEXT
        else:
            point = points[point_num]
            user_program = self.get_user_program(PointType.END_OF_ROW)
            if self.time_since_last_pvt > 0:
                # if we have previously skipped points in this row then we
                # use AVERAGE_PREV_TO_CURRENT at the end of the row this
                # breaks the continuous line of REAL_PREV_TO_CURRENT which
                # would accumulate errors over the scan
                velocity_point = VelocityModes.AVERAGE_PREV_TO_CURRENT
            else:
                velocity_point = VelocityModes.REAL_PREV_TO_CURRENT
        self.add_profile_point(
            self.time_since_last_pvt,
            velocity_point,
            user_program,
            point_num + 1,
            {name: point.upper[name] for name in self.axis_mapping},
        )
        self.time_since_last_pvt = 0
        profile_point_added = True
    return profile_point_added
def get_some_points(self, start_index):
    """Fetch the next batch of generator points starting at start_index.

    Returns:
        A ``(points, joined, velocities)`` tuple, where joined and
        velocities come from all_points_joined and
        all_points_same_velocities, or ``(None, None, None)`` when
        start_index is already at self.steps_up_to.
    """
    if start_index == self.steps_up_to:
        return None, None, None
    # Cap the batch at BATCH_POINTS (+1 so we can always get next_point),
    # never reading beyond steps_up_to
    stop_index = min(self.steps_up_to, start_index + BATCH_POINTS + 1)
    points = self.generator.get_points(start_index, stop_index)
    velocities = all_points_same_velocities(points)
    joined = all_points_joined(points)
    return points, joined, velocities
def check_profile_length_exceeds_profile_points(self) -> bool:
    """Return True once the pending profile holds more than PROFILE_POINTS."""
    # Strictly greater than, so there is always at least one point more
    # than a single write takes; a subsequent add can then stretch points
    # using the values already in the profiles
    pending_points = len(self.profile["timeArray"])
    return PROFILE_POINTS < pending_points
def is_last_point_in_current_batch(
    self, start_batch_index: int, num_points: int
) -> bool:
    """Return True if the batch finishes exactly at self.steps_up_to."""
    batch_end_index = start_batch_index + num_points
    return batch_end_index == self.steps_up_to
def create_generator_profile_every_point(self, start_index: int) -> bool:
    """Add profile points for every generator point from start_index.

    Returns:
        True when steps_up_to has been reached (the caller may add a
        tail-off); False when the pending profile exceeded PROFILE_POINTS
        first, in which case self.end_index records where to resume.

    Raises:
        AssertionError: If the trigger type is not EVERY_POINT.
    """
    # Ensure we have the correct trigger type
    assert (
        self.output_triggers == scanning.infos.MotionTrigger.EVERY_POINT
    ), f"{self.name}: trigger should be every point, got {self.output_triggers}"
    # Keep going until we finish or exceed the maximum length of a profile
    start_batch_index = start_index
    point_index = start_index
    while True:
        # Grab some points
        points, joined, _ = self.get_some_points(start_batch_index)
        if not points:
            return True
        # Number of points and check if the last point is in this batch
        num_points = len(points)
        last_point_in_batch = self.is_last_point_in_current_batch(
            start_batch_index, num_points
        )
        # Don't do all points in the batch otherwise we can get an index error
        # when adding the turnaround.
        points_to_do = num_points - 1
        for i in range(points_to_do):
            self.add_generator_point_pair(points[i], point_index, joined[i])
            # add in the turnaround between non-contiguous points
            if not (joined[i]):
                self.insert_gap(points[i], points[i + 1], point_index + 1)
            # Check if we have exceeded the profile points limit
            if self.check_profile_length_exceeds_profile_points():
                self.end_index = point_index + 1
                return False
            # Increment point index
            point_index += 1
        # Check for the last point
        if last_point_in_batch:
            # Add the final generator point; it has no following point so it
            # is never treated as joined
            self.add_generator_point_pair(points[-1], point_index, False)
            # Check if we have exceeded the profile points limit
            if self.check_profile_length_exceeds_profile_points():
                self.end_index = point_index + 1
                return False
            # Increment point index
            point_index += 1
        # Increment the index for the next batch
        start_batch_index = point_index
        # Check if we are done
        if start_batch_index == self.steps_up_to:
            # Return True for a tail-off
            return True
        # Yield so that we don't continuously block other threads
        cothread.Yield()
def create_generator_profile_sparse(self, start_index: int) -> bool:
    """Add sparse profile points for generator points from start_index.

    Returns:
        True when steps_up_to has been reached (the caller may add a
        tail-off); False when the pending profile exceeded PROFILE_POINTS
        first, in which case self.end_index records where to resume.

    Raises:
        AssertionError: If the trigger type is EVERY_POINT.
    """
    # Ensure we have the correct trigger type
    assert (
        self.output_triggers != scanning.infos.MotionTrigger.EVERY_POINT
    ), f"{self.name}: trigger should not be every point"
    # Keep going until we finish or exceed the maximum length of a profile
    start_batch_index = start_index
    point_index = start_index
    while True:
        # Grab some points
        points, joined, velocities = self.get_some_points(start_batch_index)
        if not points:
            return True
        # Number of points and check if the last point is in this batch
        num_points = len(points)
        last_point_in_batch = self.is_last_point_in_current_batch(
            start_batch_index, num_points
        )
        # Don't do all points in the batch otherwise we can get an index error
        # when adding the turnaround.
        points_to_do = num_points - 1
        for i in range(points_to_do):
            # NOTE(review): the batch-relative index i (not point_index) is
            # passed as point_num, which add_sparse_point also records as
            # the completed step - confirm this is intended
            point_added = self.add_sparse_point(points, i, joined[i], velocities[i])
            # add in the turnaround between non-contiguous points
            if not (joined[i]):
                self.insert_gap(points[i], points[i + 1], point_index + 1)
            # Check if we have exceeded the profile points limit. Only check if we
            # have actually added a point, otherwise we waste time.
            if point_added and self.check_profile_length_exceeds_profile_points():
                self.end_index = point_index + 1
                return False
            # Increment point index
            point_index += 1
        # Check for the last point
        if last_point_in_batch:
            # The final point has no following point, so it is passed as
            # neither joined nor of the same velocity
            point_added = self.add_sparse_point(points, points_to_do, False, False)
            # Check if we have exceeded the profile points limit. Only check if we
            # have actually added a point, otherwise we waste time.
            if point_added and self.check_profile_length_exceeds_profile_points():
                self.end_index = point_index + 1
                return False
            # Increment point index
            point_index += 1
        # Increment the index for the next batch
        start_batch_index = point_index
        # Check if we are done
        if start_batch_index == self.steps_up_to:
            # Return True for a tail-off
            return True
        # Yield so that we don't continuously block other threads
        cothread.Yield()
def calculate_generator_profile(self, start_index, do_run_up=False):
    """Generate pending profile points from the generator.

    Args:
        start_index: Generator index to start generating from.
        do_run_up: If True, first add a run-up point to the lower bound of
            the point at start_index.
    """
    # If we are doing the first build, do_run_up will be passed to flag
    # that we need a run up, else just continue from the previous point
    if do_run_up:
        point = self.generator.get_point(start_index)
        # Calculate how long to leave for the run-up (at least the minimum
        # turnaround interval)
        run_up_time = self.min_turnaround.interval
        axis_points = {}
        for axis_name, velocity in point_velocities(
            self.axis_mapping, point
        ).items():
            axis_points[axis_name] = point.lower[axis_name]
            motor_info = self.axis_mapping[axis_name]
            # Allow enough time for the slowest axis to get up to speed
            run_up_time = max(
                run_up_time, motor_info.acceleration_time(0, velocity)
            )
        # Add lower bound
        user_program = self.get_user_program(PointType.START_OF_ROW)
        self.add_profile_point(
            run_up_time,
            VelocityModes.REAL_PREV_TO_CURRENT,
            user_program,
            start_index,
            axis_points,
        )
    # NOTE(review): reset on every call here, not only after a run-up -
    # confirm against the original indentation
    self.time_since_last_pvt = 0
    # Create the generator profile
    if self.output_triggers == scanning.infos.MotionTrigger.EVERY_POINT:
        needs_tail_off = self.create_generator_profile_every_point(start_index)
    else:
        needs_tail_off = self.create_generator_profile_sparse(start_index)
    # Only add the tail off point if we haven't done so already
    if needs_tail_off and not self.tail_off_added:
        self.add_tail_off()
def add_tail_off(self):
    """Add the final point that brings the axes to rest after the last point.

    Side effects: sets self.end_index to self.steps_up_to and marks the
    tail-off as added.

    Raises:
        AssertionError: If a tail-off position fails the motor's soft limit
            check.
    """
    last_point = self.generator.get_point(self.steps_up_to - 1)
    velocities = point_velocities(self.axis_mapping, last_point, entry=False)

    # Leave at least the minimum turnaround interval for the tail-off
    tail_off_time = self.min_turnaround.interval
    final_positions = {}
    for axis_name, velocity in velocities.items():
        motor_info = self.axis_mapping[axis_name]
        tail_off_time = max(
            tail_off_time, motor_info.acceleration_time(0, velocity)
        )
        # Come to rest one ramp distance beyond the upper bound
        tail_off = motor_info.ramp_distance(velocity, 0)
        tail_off_pos = last_point.upper[axis_name] + tail_off
        assert motor_info.check_position_within_soft_limits(tail_off_pos) is True, (
            f"{motor_info.scannable}: tail off position {tail_off_pos} outside "
            f"soft limits {motor_info.user_low_limit, motor_info.user_high_limit}"
        )
        final_positions[axis_name] = tail_off_pos

    # Do the last move
    self.add_profile_point(
        tail_off_time,
        VelocityModes.ZERO_VELOCITY,
        self.get_user_program(PointType.TURNAROUND),
        self.steps_up_to,
        final_positions,
    )
    self.end_index = self.steps_up_to
    self.tail_off_added = True
def insert_gap(self, point, next_point, completed_steps):
    """Add the move from the upper bound of point to the lower bound of next_point.

    Args:
        point: Generator point the move starts from.
        next_point: Generator point the move finishes at.
        completed_steps: completed_step value recorded for the added points.
    """
    # Work out the velocity profiles of how to move to the start
    turnaround_time = max(self.min_turnaround.time, point.delay_after)
    time_arrays, velocity_arrays = profile_between_points(
        self.axis_mapping,
        point,
        next_point,
        turnaround_time,
        self.min_turnaround.interval,
    )

    # Work out the Position trajectories from these profiles, starting at
    # the upper bound of the point just finished
    gap_start = {
        axis_name: point.upper[axis_name] for axis_name in self.axis_mapping
    }
    self.calculate_profile_from_velocities(
        time_arrays, velocity_arrays, gap_start, completed_steps
    )

    # make sure the last point is the same as next_point.lower since
    # calculate_profile_from_velocities fails when the turnaround is 2
    # points only
    for axis_name, motor_info in self.axis_mapping.items():
        cs_axis = motor_info.cs_axis.lower()
        self.profile[cs_axis][-1] = next_point.lower[axis_name]

    # Change the last point to be a live frame
    self.profile["velocityMode"][-1] = VelocityModes.REAL_PREV_TO_CURRENT
    self.profile["userPrograms"][-1] = self.get_user_program(
        PointType.START_OF_ROW
    )