Source code for sm_bluesky.common.plan_stubs.motions

from collections.abc import Hashable, Iterator
from typing import Any

import bluesky.plan_stubs as bps
from bluesky.plan_stubs import abs_set
from bluesky.utils import MsgGenerator, plan
from dodal.devices.slits import Slits
from ophyd_async.epics.motor import Motor
from pydantic import RootModel

from sm_bluesky.log import LOGGER


[docs] class MotorTable(RootModel): """RootModel for motor tables""" root: dict[str, float]
[docs] @plan def move_motor_with_look_up( slit: Motor, size: float, motor_table: dict[str, float], use_motor_position: bool = False, wait: bool = True, group: Hashable | None = None, ) -> MsgGenerator: """Perform a step scan with the range and starting motor position given/calculated by using a look up table(dictionary). Move to the peak position after the scan and update the lookup table. Parameters ---------- motor: Motor Motor devices that is being centre. size: float The motor position or name in the motor_table. motor_table: dict[str, float], Look up table for motor position, use_motor_position: bool = False, If Ture it will take motor position as size. wait: bool = True, If Ture, it will wait until position is reached. group: Hashable | None = None, Bluesky group identifier used by ‘wait’. """ MotorTable.model_validate(motor_table) if use_motor_position: yield from abs_set(slit, size, wait=wait, group=group) elif str(int(size)) in motor_table: yield from abs_set(slit, motor_table[str(int(size))], wait=wait, group=group) else: raise ValueError( f"No slit with size={size}. Available slit size: {motor_table}" )
[docs] @plan def set_slit_size( xy_slit: Slits, x_size: float, y_size: float | None = None, wait: bool = True, group: Hashable | None = None, ) -> MsgGenerator: """Set opening of x-y slit. Parameters ---------- xy_slit: Slits A slits device. x_size: float The x opening size. y_size: float The y opening size. wait: bool If this is True it will wait for all motions to finish. group (optional): Hashable Bluesky group identifier used by ‘wait’. """ if wait and group is None: group = f"{xy_slit.name}_wait" if y_size is None: y_size = x_size LOGGER.info(f"Setting {xy_slit.name} to x = {x_size}, y = {y_size}.") yield from bps.abs_set(xy_slit.x_gap, x_size, group=group) yield from bps.abs_set(xy_slit.y_gap, y_size, group=group) if wait: LOGGER.info(f"Waiting for {xy_slit.name} to finish move.") yield from bps.wait(group=group)
[docs] @plan def check_within_limit(values: list[float], motor: Motor): """Check if the given values are within the limits of the motor. Parameters ---------- values : List[float] The values to check. motor : Motor The motor to check the limits of. Raises ------ ValueError If any value is outside the motor's limits. """ LOGGER.info(f"Check {motor.name} limits.") lower_limit = yield from bps.rd(motor.low_limit_travel) high_limit = yield from bps.rd(motor.high_limit_travel) for value in values: if not lower_limit < value < high_limit: raise ValueError( f"{motor.name} move request of {value} is beyond limits:" f"{lower_limit} < {high_limit}" )
[docs] def get_motor_positions(*arg: Motor) -> Iterator[tuple[str, float]]: """ Get the motor positions of the given motors and store them in a list. Parameters ---------- arg : Motor The motors to get the positions of. Returns ------- Iterator[Tuple[str, float]] An iterator of tuples containing the motor name and its position. """ motor_position = [] for motor in arg: motor_position.append(motor) position = yield from bps.rd(motor) # type: ignore motor_position.append(position) LOGGER.info(f"Stored motor, position = {motor_position}.") return motor_position
[docs] def get_velocity_and_step_size( scan_motor: Motor, ideal_velocity: float, ideal_step_size: float ) -> Iterator[Any]: """ Adjust the step size if the required velocity is higher than the max value. Parameters ---------- scan_motor : Motor The motor which will move continuously. ideal_velocity : float The desired velocity. ideal_step_size : float The non-scanning motor step size. Returns ------- Iterator[Tuple[float, float]] An iterator containing the adjusted velocity and step size. """ if ideal_velocity <= 0.0: raise ValueError(f"{scan_motor.name} speed: {ideal_velocity} <= 0") max_velocity = yield from bps.rd(scan_motor.max_velocity) # type: ignore # if motor does not move fast enough increase step_motor step size if ideal_velocity > max_velocity: ideal_step_size = ideal_step_size * (ideal_velocity / max_velocity) ideal_velocity = round(max_velocity, 3) return ideal_velocity, ideal_step_size