from collections.abc import Sequence
from math import floor, sqrt
from typing import Any, TypedDict
import bluesky.plan_stubs as bps
import bluesky.plans as bp
from blueapi.core import MsgGenerator
from bluesky.preprocessors import finalize_wrapper
from bluesky.protocols import Readable
from bluesky.utils import plan
from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator
from ophyd_async.epics.adcore import AreaDetector, SingleTriggerDetector
from ophyd_async.epics.motor import Motor
from sm_bluesky.common.math_functions import step_size_to_step_num
from sm_bluesky.common.plan_stubs import (
check_within_limit,
get_motor_positions,
get_velocity_and_step_size,
set_area_detector_acquire_time,
)
from sm_bluesky.common.plans.fast_scan import fast_scan_grid
from sm_bluesky.log import LOGGER
class CleanUpArgs(TypedDict, total=False):
Home: bool
Origin: list[Motor | float]
"""Dictionary to store clean up options."""
[docs]
@plan
@attach_data_session_metadata_decorator()
def grid_step_scan(
dets: Sequence[Readable],
count_time: float,
x_step_motor: Motor,
x_step_start: float,
x_step_end: float,
x_step_size: float,
y_step_motor: Motor,
y_step_start: float,
y_step_end: float,
y_step_size: float,
home: bool = False,
snake: bool = False,
md: dict | None = None,
) -> MsgGenerator:
"""
Standard Bluesky grid scan adapted to use step size.
Optionally moves back to the original position after scan.
Parameters
----------
dets : Sequence[Readable]
Area detectors or readable devices.
count_time : float
Detector count time.
x_step_motor : Motor
Motor for the X axis.
x_step_start : float
Starting position for x_step_motor.
x_step_end : float
Ending position for x_step_motor.
x_step_size : float
Step size for x motor.
y_step_motor : Motor
Motor for the Y axis.
y_step_start : float
Starting position for y_step_motor.
y_step_end : float
Ending position for y_step_motor.
y_step_size : float
Step size for y motor.
home : bool, optional
If True, move back to position before scan.
snake : bool, optional
If True, do grid scan without moving scan axis back to start position.
md : dict, optional
Metadata.
Returns
-------
MsgGenerator
A Bluesky generator for the scan.
"""
# Check limits before doing anything
yield from check_within_limit([x_step_start, x_step_end], x_step_motor)
yield from check_within_limit([y_step_start, y_step_end], y_step_motor)
clean_up_arg: CleanUpArgs = {"Home": home}
if home:
# Store original positions to return to after scan
clean_up_arg["Origin"] = yield from get_motor_positions(
x_step_motor, y_step_motor
) # type: ignore
main_det = dets[0]
if isinstance(main_det, AreaDetector | SingleTriggerDetector):
yield from set_area_detector_acquire_time(main_det, acquire_time=count_time)
# Add 1 to step number to include the end point
x_num = step_size_to_step_num(x_step_start, x_step_end, x_step_size) + 1
y_num = step_size_to_step_num(y_step_start, y_step_end, y_step_size) + 1
yield from finalize_wrapper(
plan=bp.grid_scan(
dets,
x_step_motor,
x_step_start,
x_step_end,
x_num,
y_step_motor,
y_step_start,
y_step_end,
y_num,
snake_axes=snake,
md=md,
),
final_plan=clean_up(clean_up_arg),
)
[docs]
@plan
@attach_data_session_metadata_decorator()
def grid_fast_scan(
dets: list[Readable],
count_time: float,
step_motor: Motor,
step_start: float,
step_end: float,
scan_motor: Motor,
scan_start: float,
scan_end: float,
plan_time: float,
point_correction: float = 1,
step_size: float | None = None,
home: bool = False,
snake_axes: bool = True,
md: dict[str, Any] | None = None,
) -> MsgGenerator:
"""
Initiates a 2-axis scan, targeting a maximum scan speed of around 10Hz.
Calculates the number of data points based on the detector's count time.
If no step size is provided, aims for a uniform distribution of points.
Adjusts scan speed and step size to fit the desired scan duration.
Parameters
----------
dets : list[Readable]
List of detectors to use for the scan.
count_time : float
Detector count time.
step_motor : Motor
Motor for the slow axis.
step_start : float
Starting position for the step motor.
step_end : float
Ending position for the step motor.
scan_motor : Motor
Motor for the continuously moving axis.
scan_start : float
Starting position for the scan motor.
scan_end : float
Ending position for the scan motor.
plan_time : float
Desired duration of the scan in seconds.
point_correction : float, optional
Scaling factor for the number of points, by default 1.
step_size : float, optional
Step size for the slow axis, by default None.
home : bool, optional
If True, move back to the original position after the scan, by default False.
snake_axes : bool, optional
If True, perform a snake scan, by default True.
md : dict, optional
Metadata for the scan, by default None.
Returns
-------
MsgGenerator
A Bluesky generator for the scan.
"""
clean_up_arg: CleanUpArgs = {"Home": home}
yield from check_within_limit([scan_start, scan_end], scan_motor)
yield from check_within_limit([step_start, step_end], step_motor)
if home:
clean_up_arg["Origin"] = yield from get_motor_positions(scan_motor, step_motor) # type: ignore
scan_acc = yield from bps.rd(scan_motor.acceleration_time)
scan_motor_speed = yield from bps.rd(scan_motor.velocity)
scan_motor_max_vel = yield from bps.rd(scan_motor.max_velocity)
step_motor_speed = yield from bps.rd(step_motor.velocity)
step_acc = yield from bps.rd(step_motor.acceleration_time)
main_det = dets[0]
if isinstance(main_det, AreaDetector):
yield from set_area_detector_acquire_time(det=main_det, acquire_time=count_time)
deadtime = main_det._controller.get_deadtime(count_time) # noqa: SLF001
elif isinstance(main_det, SingleTriggerDetector):
yield from set_area_detector_acquire_time(det=main_det, acquire_time=count_time)
deadtime = count_time
else:
deadtime = count_time
ideal_velocity, ideal_step_size = estimate_speed_steps(
plan_time=plan_time,
deadtime=deadtime,
step_start=step_start,
step_end=step_end,
step_size=step_size,
step_speed=step_motor_speed,
step_acceleration=step_acc,
scan_start=scan_start,
scan_end=scan_end,
scan_speed=scan_motor_speed,
scan_acceleration=scan_acc,
scan_max_vel=scan_motor_max_vel,
correction=point_correction,
snake_axes=snake_axes,
)
velocity, ideal_step_size = yield from get_velocity_and_step_size(
scan_motor,
ideal_velocity,
ideal_step_size,
)
num_of_step = step_size_to_step_num(step_start, step_end, ideal_step_size)
LOGGER.info(
f"Step size = {ideal_step_size}, {scan_motor.name}: velocity = {velocity}, "
f"number of steps = {num_of_step}."
)
yield from finalize_wrapper(
plan=fast_scan_grid(
dets,
step_motor,
step_start,
step_end,
num_of_step,
scan_motor,
scan_start,
scan_end,
velocity,
snake_axes=snake_axes,
md=md,
),
final_plan=clean_up(clean_up_arg),
)
def clean_up(clean_up_arg: CleanUpArgs) -> MsgGenerator:
LOGGER.info(f"Clean up: {list(clean_up_arg)}")
if clean_up_arg.get("Home") and "Origin" in clean_up_arg:
# Move motors back to stored position
yield from bps.mov(*clean_up_arg["Origin"])
def estimate_speed_steps(
plan_time: float,
deadtime: float,
step_start: float,
step_end: float,
step_size: float | None,
step_acceleration: float,
step_speed: float,
scan_start: float,
scan_end: float,
scan_acceleration: float,
scan_speed: float,
scan_max_vel: float,
snake_axes: bool,
correction: float,
) -> tuple[float, float]:
"""
Estimate the speed and step size for a scan.
"""
step_range = abs(step_start - step_end)
scan_range = abs(scan_start - scan_end)
point_per_axis = estimate_axis_points(
plan_time=plan_time,
deadtime=deadtime,
step_range=step_range,
step_acceleration=step_acceleration,
step_speed=step_speed,
scan_range=scan_range,
scan_acceleration=scan_acceleration,
scan_speed=scan_speed,
snake_axes=snake_axes,
)
point_per_step_axis = max(1, floor(point_per_axis * correction * step_range))
point_per_scan_axis = max(1, floor(point_per_axis * correction * scan_range))
# Ideal step size is evenly distributed points within the two axes.
if step_size is not None:
if step_size == 0:
raise ValueError("Step_size is 0")
ideal_step_size = abs(step_size)
else:
ideal_step_size = step_range / point_per_step_axis
# ideal_velocity: speed that allows the required step size.
if point_per_scan_axis <= 2:
ideal_velocity = scan_max_vel
else:
ideal_velocity = scan_range / (
(scan_range / ideal_step_size) * deadtime + scan_acceleration * 2
)
LOGGER.info(
f"Ideal step size = {ideal_step_size}, velocity = {ideal_velocity}, "
f"number of data points for step axis {point_per_step_axis}"
)
return ideal_velocity, ideal_step_size
def estimate_axis_points(
plan_time: float,
deadtime: float,
step_range: float,
step_acceleration: float,
step_speed: float,
scan_range: float,
scan_acceleration: float,
scan_speed: float,
snake_axes: bool = True,
num_points_per_axis: float | None = None,
) -> int:
"""
Estimate the number of points per axis for a scan.
"""
iteration_limit = 10 # Prevent infinite recursion
iteration_count = 0
if num_points_per_axis is None:
num_points_per_axis = sqrt((plan_time / deadtime) / (scan_range * step_range))
old_num_points_per_axis = num_points_per_axis
while (
iteration_count <= iteration_limit
and abs(num_points_per_axis - old_num_points_per_axis) <= 0.49
):
old_num_points_per_axis = num_points_per_axis
point_step_axis = max(1, floor(num_points_per_axis * step_range))
step_mv_time = point_step_axis * step_acceleration * 2 + step_range / step_speed
if snake_axes:
scan_mv_time = point_step_axis * (scan_acceleration * 2)
else:
point_scan_axis = max(1, floor(num_points_per_axis * scan_range))
scan_mv_time = point_scan_axis * (scan_acceleration * 2) + (
point_scan_axis - 1
) * (scan_range / scan_speed + scan_acceleration * 2)
corrected_num_points = (plan_time - step_mv_time - scan_mv_time) / deadtime
if corrected_num_points <= 0:
raise ValueError(
f"Plan time too short for the area and count time required. "
f"Plan time: {plan_time}, Step move time: {step_mv_time}, "
f"Scan move time: {scan_mv_time}, Deadtime: {deadtime}"
)
iteration_count += 1
num_points_per_axis = corrected_num_points
point_per_axis = sqrt(num_points_per_axis / (scan_range * step_range))
return max(1, floor(point_per_axis))