Source code for sm_bluesky.common.plans.alignments

from collections.abc import Callable
from enum import Enum
from functools import wraps
from numbers import Real
from typing import ParamSpec, TypeVar

from bluesky import preprocessors as bpp
from bluesky.callbacks.fitting import PeakStats
from bluesky.plan_stubs import abs_set, read
from bluesky.plans import scan
from bluesky.utils import MsgGenerator, plan
from ophyd_async.core import StandardReadable
from ophyd_async.epics.motor import Motor

from sm_bluesky.common.math_functions import cal_range_num
from sm_bluesky.common.plan_stubs import MotorTable
from sm_bluesky.log import LOGGER

from .fast_scan import fast_scan_1d


class StatPosition(tuple, Enum):
    """
    Keys used to pull one statistic out of a peak-fit result.

    Every member is a ``(group, field)`` pair: ``group`` is the key looked
    up on the fit object and ``field`` is the entry read from that group.

    COM: Centre of mass
    CEN: Peak position
    MIN: Minimum value
    MAX: Maximum value
    D_*: The same quantity read from the derivative statistics
    """

    # Statistics of the measured signal.
    COM = ("stats", "com")
    CEN = ("stats", "cen")
    MIN = ("stats", "min")
    MAX = ("stats", "max")

    # Statistics of the signal's derivative.
    D_COM = ("derivative_stats", "com")
    D_CEN = ("derivative_stats", "cen")
    D_MIN = ("derivative_stats", "min")
    D_MAX = ("derivative_stats", "max")
P = ParamSpec("P")
T = TypeVar("T")
# Alias for any callable taking parameters P and returning T.
TCallable = Callable[P, T]


def scan_and_move_to_fit_pos(func: TCallable) -> TCallable:
    """
    Decorate a scan plan so it is followed by a move to the fitted position.

    The wrapped plan runs with a PeakStats callback subscribed; once the
    scan has finished, the requested statistic is read from that callback
    and the motor is driven to it.

    Parameters
    ----------
    func : TCallable
        The scan function to wrap. It is called with
        ``(det, motor, fitted_loc, detname_suffix, *args, **kwargs)`` and
        must return a plan.

    Returns
    -------
    TCallable
        The wrapped scan function.
    """

    @wraps(func)
    def inner(
        det: StandardReadable,
        motor: Motor,
        fitted_loc: StatPosition,
        detname_suffix: str,
        *args,
        **kwargs,
    ) -> MsgGenerator:
        # Field names the callback reads: motor position against detector signal.
        x_field = f"{motor.name}"
        y_field = f"{det.name}-{detname_suffix}"
        peak_stats = PeakStats(x_field, y_field, calc_derivative_and_stats=True)

        wrapped_scan = func(det, motor, fitted_loc, detname_suffix, *args, **kwargs)
        yield from bpp.subs_wrapper(wrapped_scan, peak_stats)

        # get_stat_loc raises ValueError when the fit produced nothing usable.
        target = get_stat_loc(peak_stats, fitted_loc)
        LOGGER.info(f"Fit info {peak_stats}")
        yield from abs_set(motor, target, wait=True)

    return inner
@plan
@scan_and_move_to_fit_pos
def step_scan_and_move_fit(
    det: StandardReadable,
    motor: Motor,
    fitted_loc: StatPosition,
    detname_suffix: str,
    start: float,
    end: float,
    num: int,
) -> MsgGenerator:
    """
    Perform a step scan and move to the fitted position.

    This function only builds the scan; the ``scan_and_move_to_fit_pos``
    decorator subscribes the peak-statistics callback and performs the
    final move.

    Parameters
    ----------
    det : StandardReadable
        The detector to use for alignment.
    motor : Motor
        The motor to center.
    fitted_loc : StatPosition
        The fitted position to move to (see StatPosition).
    detname_suffix : str
        The suffix for the detector name.
    start : float
        The starting position for the scan.
    end : float
        The ending position for the scan.
    num : int
        The number of steps in the scan.

    Returns
    -------
    MsgGenerator
        A Bluesky generator for the scan.
    """
    # Adjacent f-strings instead of a backslash continuation, so the
    # source indentation does not end up inside the log message.
    LOGGER.info(
        f"Step scanning {motor.name} with {det.name}-{detname_suffix}"
        f" post-scan move to {fitted_loc}"
    )
    return scan([det], motor, start, end, num=num)
@plan
@scan_and_move_to_fit_pos
def fast_scan_and_move_fit(
    det: StandardReadable,
    motor: Motor,
    fitted_loc: StatPosition,
    detname_suffix: str,
    start: float,
    end: float,
    motor_speed: float | None = None,
) -> MsgGenerator:
    """
    Perform a fast, non-stopping scan and move to the fitted position.

    This function only builds the scan; the ``scan_and_move_to_fit_pos``
    decorator subscribes the peak-statistics callback and performs the
    final move.

    Parameters
    ----------
    det : StandardReadable
        The detector to use for alignment.
    motor : Motor
        The motor to center.
    fitted_loc : StatPosition
        The fitted position to move to (see StatPosition).
    detname_suffix : str
        Name of the fitted axis within the detector.
    start : float
        The starting position for the scan.
    end : float
        The ending position for the scan.
    motor_speed : float | None
        Speed of the motor, forwarded unchanged to ``fast_scan_1d``.

    Returns
    -------
    MsgGenerator
        A Bluesky generator for the scan.
    """
    # Log the motor name (as the step-scan variant does) rather than its
    # hints, and join the message with adjacent f-strings so no source
    # indentation leaks into the log line.
    LOGGER.info(
        f"Fast scanning {motor.name} with {det.name}-{detname_suffix}"
        f" post-scan move to {fitted_loc}"
    )
    return fast_scan_1d(
        dets=[det], motor=motor, start=start, end=end, motor_speed=motor_speed
    )
def get_stat_loc(ps: PeakStats, loc: StatPosition) -> float:
    """
    Check that the fit succeeded and return the requested position.

    Parameters
    ----------
    ps : PeakStats
        The peak-statistics callback that observed the scan.
    loc : StatPosition
        Which statistic to return; its value is a ``(group, field)`` pair.

    Returns
    -------
    float
        The statistic itself when it is a real scalar, otherwise its first
        element.

    Raises
    ------
    ValueError
        If the requested statistics group is empty, or if it carries no
        ``fwhm`` value.
    """
    group, field = loc.value

    peak_stat = ps[group]
    if not peak_stat:
        raise ValueError("Fitting failed, check devices name are correct.")

    stats = peak_stat._asdict()
    if not stats["fwhm"]:
        raise ValueError("Fitting failed, no peak within scan range.")

    stat_pos = stats[field]
    # Accept any real scalar (int, float, numpy scalar types), not only the
    # builtin float; anything else is indexed for its first element.
    return stat_pos if isinstance(stat_pos, Real) else stat_pos[0]
@plan
def align_slit_with_look_up(
    motor: Motor,
    size: float,
    slit_table: dict[str, float],
    det: StandardReadable,
    centre_type: StatPosition,
) -> MsgGenerator:
    """
    Step scan a slit motor around its look-up-table position and recentre it.

    The scan range and starting position are calculated from the look-up
    table entry for ``size``. After the scan the motor is moved to the
    fitted position and the table entry is overwritten with the motor's
    readback.

    Parameters
    ----------
    motor : Motor
        The motor to center.
    size : float
        The slit size; ``str(int(size))`` is the key used in ``slit_table``.
    slit_table : dict[str, float]
        Look-up table of motor positions, keyed by slit size as a string.
        Updated in place.
    det : StandardReadable
        The detector to use for alignment.
    centre_type : StatPosition
        The fitted position to move to (see StatPosition).

    Raises
    ------
    ValueError
        If ``size`` has no entry in ``slit_table``.
    """
    MotorTable.model_validate(slit_table)

    # One key for the membership test, the look-up and the write-back, so a
    # float size such as 20.0 hits the "20" entry rather than "20.0".
    size_key = str(int(size))
    if size_key not in slit_table:
        raise ValueError(f"Size of {size} is not in {list(slit_table)}")

    # Scan window is derived from the slit size itself.
    # NOTE(review): the /1000 and /5000 factors look like a unit conversion
    # and a step size respectively - confirm against cal_range_num.
    start_pos, end_pos, num = cal_range_num(
        cen=slit_table[size_key], range=size / 1000 * 3, size=size / 5000.0
    )

    yield from step_scan_and_move_fit(
        det=det,
        motor=motor,
        start=start_pos,
        detname_suffix="value",
        end=end_pos,
        fitted_loc=centre_type,
        num=num,
    )

    # Record where the motor actually ended up as the new table entry.
    temp = yield from read(motor.user_readback)
    slit_table[size_key] = temp[f"{motor.name}"]["value"]