from typing import Any
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from blueapi.core import MsgGenerator
from bluesky.preprocessors import (
finalize_wrapper,
)
from bluesky.protocols import Readable
from bluesky.utils import plan, short_uid
from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator
from numpy import linspace
from ophyd_async.core import FlyMotorInfo
from ophyd_async.epics.motor import Motor
from sm_bluesky.common.helper import add_extra_names_to_meta
from sm_bluesky.common.plan_stubs import check_within_limit
from sm_bluesky.log import LOGGER
[docs]
@plan
@attach_data_session_metadata_decorator()
def fast_scan_1d(
dets: list[Readable],
motor: Motor,
start: float,
end: float,
motor_speed: float | None = None,
md: dict[str, Any] | None = None,
) -> MsgGenerator:
"""
Perform a fast scan along one axis.
Parameters
----------
dets : list[Readable]
List of readable objects (e.g., detectors).
motor : Motor
The motor to move during the scan.
start : float
The starting position of the motor.
end : float
The ending position of the motor.
motor_speed : Optional[float], optional
The speed of the motor during the scan. If None,
the motor's current speed is used.
Returns
-------
MsgGenerator
A Bluesky generator for the scan.
"""
if md is None:
md = {}
@bpp.stage_decorator(dets)
@bpp.run_decorator(md=md)
def inner_fast_scan_1d(
dets: list[Any],
motor: Motor,
start: float,
end: float,
motor_speed: float | None = None,
):
yield from check_within_limit([start, end], motor)
yield from _fast_scan_1d(dets, motor, start, end, motor_speed)
yield from finalize_wrapper(
plan=inner_fast_scan_1d(dets, motor, start, end, motor_speed),
final_plan=clean_up(),
)
[docs]
@plan
@attach_data_session_metadata_decorator()
def fast_scan_grid(
dets: list[Readable],
step_motor: Motor,
step_start: float,
step_end: float,
num_step: int,
scan_motor: Motor,
scan_start: float,
scan_end: float,
motor_speed: float | None = None,
snake_axes: bool = False,
md: dict[str, Any] | None = None,
) -> MsgGenerator:
"""
Same as fast_scan_1d with an extra axis to step through forming a grid.
Parameters
----------
detectors : list
list of 'readable' objects
step_motor :
Motor (moveable, readable)
step_start :
Starting position for slow/stepping motor.
step_end :
Ending position for step motro.
num_step:
Number of steps to take going from start to end.
scan_motor: Motor (moveable, readable)
The motor that will not stop during measurements.
scan_start:
Scan motor starting position.
scan_end:
Scan motor ending position.
motor_speed: float optional.
Speed of the scanning motor during measurements.
If None, it will use current speed.
snake_axes:
If Ture. Scan motor will start an other line where it ended.
md:
place holder for meta data for future.
"""
if md is None:
md = {}
md = add_extra_names_to_meta(md, "detectors", [det.name for det in dets])
md = add_extra_names_to_meta(md, "motors", [scan_motor.name, step_motor.name])
@bpp.stage_decorator(dets)
@bpp.run_decorator(md=md)
def inner_fast_scan_grid(
dets: list[Any],
step_motor: Motor,
step_start: float,
step_end: float,
num_step: int,
scan_motor: Motor,
scan_start: float,
scan_end: float,
motor_speed: float | None = None,
snake_axes: bool = False,
):
yield from check_within_limit([step_start, step_end], step_motor)
yield from check_within_limit([scan_start, scan_end], scan_motor)
steps = linspace(step_start, step_end, num_step, endpoint=True)
if snake_axes:
for cnt, step in enumerate(steps):
yield from bps.abs_set(step_motor, step)
if cnt % 2 == 0:
yield from _fast_scan_1d(
dets + [step_motor],
scan_motor,
scan_start,
scan_end,
motor_speed,
)
else:
yield from _fast_scan_1d(
dets + [step_motor],
scan_motor,
scan_end,
scan_start,
motor_speed,
)
else:
for step in steps:
yield from bps.abs_set(step_motor, step)
yield from _fast_scan_1d(
dets + [step_motor], scan_motor, scan_start, scan_end, motor_speed
)
yield from finalize_wrapper(
plan=inner_fast_scan_grid(
dets,
step_motor,
step_start,
step_end,
num_step,
scan_motor,
scan_start,
scan_end,
motor_speed,
snake_axes,
),
final_plan=clean_up(),
)
@plan
def reset_speed(old_speed, motor: Motor) -> MsgGenerator:
LOGGER.info(f"Clean up: setting motor speed to {old_speed}.")
if old_speed:
yield from bps.abs_set(motor.velocity, old_speed)
def clean_up():
LOGGER.info("Clean up")
# possibly use to move back to starting position.
yield from bps.null()
@plan
def _fast_scan_1d(
dets: list[Any],
motor: Motor,
start: float,
end: float,
motor_speed: float | None = None,
) -> MsgGenerator:
"""
The logic for one axis fast scan, used in fast_scan_1d and fast_scan_grid
In this scan:
1) The motor moves to the starting point.
2) The motor speed is changed
3) The motor is set in motion toward the end point
4) During this movement detectors are triggered and read out until
the endpoint is reached or stopped.
5) Clean up, reset motor speed.
Note: This is purely software triggering which result in variable accuracy.
However, fast scan does not require encoder and hardware setup and should
work for all motor. It is most frequently use for alignment and
slow motion measurements.
Parameters
----------
detectors : list
list of 'readable' objects
motor : Motor (moveable, readable)
start: float
starting position.
end: float,
ending position
motor_speed: Optional[float] = None,
The speed of the motor during scan
"""
# read the current speed and store it
old_speed: float = yield from bps.rd(motor.velocity)
def inner_fast_scan_1d(
dets: list[Any],
motor: Motor,
start: float,
end: float,
motor_speed: float | None = None,
):
if not motor_speed:
motor_speed = old_speed
LOGGER.info(
f"Starting 1d fly scan with {motor.name}:"
+ f" start position = {start}, end position = {end}."
)
grp = short_uid("prepare")
fly_info = FlyMotorInfo(
start_position=start,
end_position=end,
time_for_move=abs(start - end) / motor_speed,
)
yield from bps.prepare(motor, fly_info, group=grp, wait=True)
yield from bps.wait(group=grp)
yield from bps.kickoff(motor, group=grp, wait=True)
LOGGER.info(f"flying motor = {motor.name} at speed = {motor_speed}")
done = yield from bps.complete(motor)
yield from bps.trigger_and_read(dets + [motor])
while not done.done:
yield from bps.trigger_and_read(dets + [motor])
yield from bps.checkpoint()
yield from finalize_wrapper(
plan=inner_fast_scan_1d(dets, motor, start, end, motor_speed),
final_plan=reset_speed(old_speed, motor),
)