Source code for dodal.devices.fast_grid_scan

from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

import numpy as np
from bluesky.plan_stubs import mv
from numpy import ndarray
from ophyd_async.core import (
    AsyncStatus,
    Device,
    Signal,
    SignalR,
    SignalRW,
    SoftSignalBackend,
    StandardReadable,
    wait_for_value,
)
from ophyd_async.epics.signal import (
    epics_signal_r,
    epics_signal_rw,
    epics_signal_rw_rbv,
    epics_signal_x,
)
from pydantic import validator
from pydantic.dataclasses import dataclass

from dodal.log import LOGGER
from dodal.parameters.experiment_parameter_base import AbstractExperimentWithBeamParams


[docs] @dataclass class GridAxis: start: float step_size: float full_steps: int
[docs] def steps_to_motor_position(self, steps): """Gives the motor position based on steps, where steps are 0 indexed""" return self.start + self.step_size * steps
@property def end(self): """Gives the point where the final frame is taken""" # Note that full_steps is one indexed e.g. if there is one step then the end is # refering to the first position return self.steps_to_motor_position(self.full_steps - 1) def is_within(self, steps): return 0 <= steps <= self.full_steps
[docs] class GridScanParamsCommon(AbstractExperimentWithBeamParams): """ Common holder class for the parameters of a grid scan in a similar layout to EPICS. The parameters and functions of this class are common to both the zebra and panda triggered fast grid scans. The grid specified is where data is taken e.g. it can be assumed the first frame is at x_start, y1_start, z1_start and subsequent frames are N*step_size away. """ x_steps: int = 1 y_steps: int = 1 z_steps: int = 0 x_step_size: float = 0.1 y_step_size: float = 0.1 z_step_size: float = 0.1 x_start: float = 0.1 y1_start: float = 0.1 y2_start: float = 0.1 z1_start: float = 0.1 z2_start: float = 0.1 x_axis: GridAxis = GridAxis(0, 0, 0) y_axis: GridAxis = GridAxis(0, 0, 0) z_axis: GridAxis = GridAxis(0, 0, 0) # Whether to set the stub offsets after centering set_stub_offsets: bool = False def get_param_positions(self) -> dict: return { "x_steps": self.x_steps, "y_steps": self.y_steps, "z_steps": self.z_steps, "x_step_size": self.x_step_size, "y_step_size": self.y_step_size, "z_step_size": self.z_step_size, "x_start": self.x_start, "y1_start": self.y1_start, "y2_start": self.y2_start, "z1_start": self.z1_start, "z2_start": self.z2_start, } class Config: arbitrary_types_allowed = True fields = { "x_axis": {"exclude": True}, "y_axis": {"exclude": True}, "z_axis": {"exclude": True}, } @validator("x_axis", always=True) def _get_x_axis(cls, x_axis: GridAxis, values: dict[str, Any]) -> GridAxis: return GridAxis(values["x_start"], values["x_step_size"], values["x_steps"]) @validator("y_axis", always=True) def _get_y_axis(cls, y_axis: GridAxis, values: dict[str, Any]) -> GridAxis: return GridAxis(values["y1_start"], values["y_step_size"], values["y_steps"]) @validator("z_axis", always=True) def _get_z_axis(cls, z_axis: GridAxis, values: dict[str, Any]) -> GridAxis: return GridAxis(values["z2_start"], values["z_step_size"], values["z_steps"]) def get_num_images(self): return self.x_steps * self.y_steps + self.x_steps * self.z_steps @property def is_3d_grid_scan(self): return self.z_steps > 0
[docs] def grid_position_to_motor_position(self, grid_position: ndarray) -> ndarray: """Converts a grid position, given as steps in the x, y, z grid, to a real motor position. :param grid_position: The x, y, z position in grid steps :return: The motor position this corresponds to. :raises: IndexError if the desired position is outside the grid.""" for position, axis in zip( grid_position, [self.x_axis, self.y_axis, self.z_axis] ): if not axis.is_within(position): raise IndexError(f"{grid_position} is outside the bounds of the grid") return np.array( [ self.x_axis.steps_to_motor_position(grid_position[0]), self.y_axis.steps_to_motor_position(grid_position[1]), self.z_axis.steps_to_motor_position(grid_position[2]), ] )
ParamType = TypeVar("ParamType", bound=GridScanParamsCommon)
[docs] class ZebraGridScanParams(GridScanParamsCommon): """ Params for standard Zebra FGS. Adds on the dwell time """ dwell_time_ms: float = 10 def get_param_positions(self): param_positions = super().get_param_positions() param_positions["dwell_time_ms"] = self.dwell_time_ms return param_positions @validator("dwell_time_ms", always=True, check_fields=True) def non_integer_dwell_time(cls, dwell_time_ms: float) -> float: dwell_time_floor_rounded = np.floor(dwell_time_ms) dwell_time_is_close = np.isclose( dwell_time_ms, dwell_time_floor_rounded, rtol=1e-3 ) if not dwell_time_is_close: raise ValueError( f"Dwell time of {dwell_time_ms}ms is not an integer value. Fast Grid Scan only accepts integer values" ) return dwell_time_ms
[docs] class PandAGridScanParams(GridScanParamsCommon): """ Params for panda constant-motion scan. Adds on the goniometer run-up distance """ run_up_distance_mm: float = 0.17 def get_param_positions(self): param_positions = super().get_param_positions() param_positions["run_up_distance_mm"] = self.run_up_distance_mm return param_positions
[docs] class MotionProgram(Device): def __init__(self, prefix: str, name: str = "") -> None: super().__init__(name) self.running = epics_signal_r(int, prefix + "PROGBITS") self.program_number = epics_signal_r(float, prefix + "CS1:PROG_NUM")
[docs] class ExpectedImages(SignalR[int]): def __init__(self, parent: "FastGridScanCommon") -> None: super().__init__(SoftSignalBackend(int)) self.parent: "FastGridScanCommon" = parent async def get_value(self): x = await self.parent.x_steps.get_value() y = await self.parent.y_steps.get_value() z = await self.parent.z_steps.get_value() first_grid = x * y second_grid = x * z return first_grid + second_grid
[docs] class FastGridScanCommon(StandardReadable, ABC, Generic[ParamType]): """Device for a general fast grid scan When the motion program is started, the goniometer will move in a snake-like grid trajectory, with X as the fast axis and Y as the slow axis. If Z steps isn't 0, the goniometer will then rotate in the omega direction such that it moves from the X-Y, to the X-Z plane then do a second grid scan. The detector is triggered after every x step. See https://github.com/DiamondLightSource/hyperion/wiki/Coordinate-Systems for more """ def __init__(self, prefix: str, smargon_prefix: str, name: str = "") -> None: self.x_steps = epics_signal_rw_rbv(int, f"{prefix}X_NUM_STEPS") self.y_steps = epics_signal_rw_rbv( int, f"{prefix}Y_NUM_STEPS" ) # Number of vertical steps during the first grid scan self.z_steps = epics_signal_rw_rbv( int, f"{prefix}Z_NUM_STEPS" ) # Number of vertical steps during the second grid scan, after the rotation in omega self.x_step_size = epics_signal_rw_rbv(float, f"{prefix}X_STEP_SIZE") self.y_step_size = epics_signal_rw_rbv(float, f"{prefix}Y_STEP_SIZE") self.z_step_size = epics_signal_rw_rbv(float, f"{prefix}Z_STEP_SIZE") self.x_start = epics_signal_rw_rbv(float, f"{prefix}X_START") self.y1_start = epics_signal_rw_rbv(float, f"{prefix}Y_START") self.y2_start = epics_signal_rw_rbv(float, f"{prefix}Y2_START") self.z1_start = epics_signal_rw_rbv(float, f"{prefix}Z_START") self.z2_start = epics_signal_rw_rbv(float, f"{prefix}Z2_START") self.scan_invalid = epics_signal_r(float, f"{prefix}SCAN_INVALID") self.run_cmd = epics_signal_x(f"{prefix}RUN.PROC") self.status = epics_signal_r(int, f"{prefix}SCAN_STATUS") self.expected_images = ExpectedImages(parent=self) self.motion_program = MotionProgram(smargon_prefix) self.position_counter = self._create_position_counter(prefix) # Kickoff timeout in seconds self.KICKOFF_TIMEOUT: float = 5.0 self.COMPLETE_STATUS: float = 60.0 self.movable_params: dict[str, Signal] = { "x_steps": self.x_steps, "y_steps": self.y_steps, "z_steps": self.z_steps, "x_step_size": self.x_step_size, "y_step_size": self.y_step_size, "z_step_size": self.z_step_size, "x_start": self.x_start, "y1_start": self.y1_start, "y2_start": self.y2_start, "z1_start": self.z1_start, "z2_start": self.z2_start, } super().__init__(name) @AsyncStatus.wrap async def kickoff(self): curr_prog = await self.motion_program.program_number.get_value() running = await self.motion_program.running.get_value() if running: LOGGER.info(f"Motion program {curr_prog} still running, waiting...") await wait_for_value(self.motion_program.running, 0, self.KICKOFF_TIMEOUT) LOGGER.debug("Running scan") await self.run_cmd.trigger() LOGGER.info("Waiting for FGS to start") await wait_for_value(self.status, 1, self.KICKOFF_TIMEOUT) LOGGER.debug("FGS kicked off") @AsyncStatus.wrap async def complete(self): await wait_for_value(self.status, 0, self.COMPLETE_STATUS) @abstractmethod def _create_position_counter(self, prefix: str) -> SignalRW[int]: pass
[docs] class ZebraFastGridScan(FastGridScanCommon[ZebraGridScanParams]): """Device for standard Zebra FGS. In this scan, the goniometer's velocity profile follows a parabolic shape between X steps, with the slowest points occuring at each X step. """ def __init__(self, prefix: str, name: str = "") -> None: full_prefix = prefix + "FGS:" # Time taken to travel between X steps self.dwell_time_ms = epics_signal_rw_rbv(float, f"{full_prefix}DWELL_TIME") self.x_counter = epics_signal_r(int, f"{full_prefix}X_COUNTER") self.y_counter = epics_signal_r(int, f"{full_prefix}Y_COUNTER") super().__init__(full_prefix, prefix, name) self.movable_params["dwell_time_ms"] = self.dwell_time_ms def _create_position_counter(self, prefix: str): return epics_signal_rw( int, f"{prefix}POS_COUNTER", write_pv=f"{prefix}POS_COUNTER_WRITE" )
[docs] class PandAFastGridScan(FastGridScanCommon[PandAGridScanParams]): """Device for panda constant-motion scan""" def __init__(self, prefix: str, name: str = "") -> None: full_prefix = prefix + "PGS:" self.time_between_x_steps_ms = ( epics_signal_rw_rbv( # Used by motion controller to set goniometer velocity float, f"{full_prefix}TIME_BETWEEN_X_STEPS" ) ) # Distance before and after the grid given to allow goniometer to reach desired speed while it is within the # grid self.run_up_distance_mm = epics_signal_rw_rbv( float, f"{full_prefix}RUNUP_DISTANCE" ) super().__init__(full_prefix, prefix, name) self.movable_params["run_up_distance_mm"] = self.run_up_distance_mm def _create_position_counter(self, prefix: str): return epics_signal_rw(int, f"{prefix}Y_COUNTER")
[docs] def set_fast_grid_scan_params(scan: FastGridScanCommon[ParamType], params: ParamType): to_move = [] # Create arguments for bps.mv for key in scan.movable_params.keys(): to_move.extend([scan.movable_params[key], params.__dict__[key]]) # Counter should always start at 0 to_move.extend([scan.position_counter, 0]) yield from mv(*to_move)