Source code for sm_bluesky.electron_analyser.plan_stubs.analyser_per_step

from collections.abc import Mapping, Sequence
from typing import Any

from bluesky.plan_stubs import (
    move_per_step,
    trigger_and_read,
)
from bluesky.protocols import Movable, Readable
from bluesky.utils import (
    MsgGenerator,
    plan,
)
from dodal.devices.electron_analyser import (
    ElectronAnalyserRegionDetector,
    GenericElectronAnalyserRegionDetector,
)
from dodal.log import LOGGER


@plan
def analyser_shot(detectors: Sequence[Readable], *args) -> MsgGenerator:
    yield from analyser_nd_step(detectors, {}, {}, *args)


[docs] @plan def analyser_nd_step( detectors: Sequence[Readable], step: Mapping[Movable, Any], pos_cache: dict[Movable, Any], *args, ) -> MsgGenerator: """ Inner loop of an N-dimensional step scan Modified default function for ``per_step`` param in ND plans. Performs an extra for loop for each ElectronAnalyserRegionDetector present so they can be collected one by one. Parameters ---------- detectors : iterable devices to read step : dict mapping motors to positions in this step pos_cache : dict mapping motors to their last-set positions """ analyser_detectors: list[GenericElectronAnalyserRegionDetector] = [] other_detectors = [] for det in detectors: if isinstance(det, ElectronAnalyserRegionDetector): analyser_detectors.append(det) else: other_detectors.append(det) # Step provides the map of motors to single position to move to. Move motors to # required positions. yield from move_per_step(step, pos_cache) # This is to satisfy type checking. Motors are Moveable and Readable, so make # them Readable so positions can be measured. motors: list[Readable] = [s for s in step.keys() if isinstance(s, Readable)] # To get energy sources and open paired shutters, they need to be given in this # plan. They could possibly come from step but we then have to extract them out. # It would also mean forcefully adding in the devices at the wrapper level. # It would easier if they are part of the detector and the plan just calls the # common methods so it is more dynamic and configuration only for device. for analyser_det in analyser_detectors: dets = [analyser_det] + list(other_detectors) + list(motors) LOGGER.info(f"Scanning region {analyser_det.region.name}.") yield from trigger_and_read( dets, name=analyser_det.region.name, )