Source code for dodal.devices.focusing_mirror

from enum import Enum

from ophyd_async.core import (
    AsyncStatus,
    Device,
    DeviceVector,
    StandardReadable,
    observe_value,
)
from ophyd_async.core.signal import soft_signal_r_and_setter
from ophyd_async.epics.motion import Motor
from ophyd_async.epics.signal import (
    epics_signal_r,
    epics_signal_rw,
    epics_signal_x,
)

from dodal.log import LOGGER

VOLTAGE_POLLING_DELAY_S = 0.5

# The default timeout is 60 seconds as voltage slew rate is typically ~2V/s
DEFAULT_SETTLE_TIME_S = 60


[docs] class MirrorType(str, Enum): """See https://manual.nexusformat.org/classes/base_classes/NXmirror.html""" SINGLE = "single" MULTI = "multi"
[docs] class MirrorStripe(str, Enum): RHODIUM = "Rhodium" BARE = "Bare" PLATINUM = "Platinum"
[docs] class MirrorVoltageDemand(str, Enum): N_A = "N/A" OK = "OK" FAIL = "FAIL" SLEW = "SLEW"
[docs] class MirrorVoltageDevice(Device): """Abstract the bimorph mirror voltage PVs into a single device that can be set asynchronously and returns when the demanded voltage setpoint is accepted, without blocking the caller as this process can take significant time. """ def __init__(self, name: str = "", prefix: str = ""): self._actual_v = epics_signal_r(int, prefix + "R") self._setpoint_v = epics_signal_rw(int, prefix + "D") self._demand_accepted = epics_signal_r(MirrorVoltageDemand, prefix + "DSEV") super().__init__(name=name)
[docs] @AsyncStatus.wrap async def set(self, value, *args, **kwargs): """Combine the following operations into a single set: 1. apply the value to the setpoint PV 3. Wait until demand is accepted 4. When either demand is accepted or DEFAULT_SETTLE_TIME expires, signal the result on the Status """ setpoint_v = self._setpoint_v demand_accepted = self._demand_accepted if await demand_accepted.get_value() != MirrorVoltageDemand.OK: raise AssertionError( f"Attempted to set {setpoint_v.name} when demand is not accepted." ) if await setpoint_v.get_value() == value: LOGGER.debug(f"{setpoint_v.name} already at {value} - skipping set") return LOGGER.debug(f"setting {setpoint_v.name} to {value}") # Register an observer up front to ensure we don't miss events after we # perform the set demand_accepted_iterator = observe_value( demand_accepted, timeout=DEFAULT_SETTLE_TIME_S ) # discard the current value (OK) so we can await a subsequent change await anext(demand_accepted_iterator) await setpoint_v.set(value) # The set should always change to SLEW regardless of whether we are # already at the set point, then change back to OK/FAIL depending on # success accepted_value = await anext(demand_accepted_iterator) assert accepted_value == MirrorVoltageDemand.SLEW LOGGER.debug( f"Demand not accepted for {setpoint_v.name}, waiting for acceptance..." ) while MirrorVoltageDemand.SLEW == ( accepted_value := await anext(demand_accepted_iterator) ): pass if accepted_value != MirrorVoltageDemand.OK: raise AssertionError( f"Voltage slew failed for {setpoint_v.name}, new state={accepted_value}" )
[docs] class VFMMirrorVoltages(StandardReadable): def __init__( self, name: str, prefix: str, *args, daq_configuration_path: str, **kwargs ): self.voltage_lookup_table_path = ( daq_configuration_path + "/json/mirrorFocus.json" ) with self.add_children_as_readables(): self.voltage_channels = DeviceVector( { i - 14: MirrorVoltageDevice(prefix=f"{prefix}BM:V{i}") for i in range(14, 22) } ) super().__init__(*args, name=name, **kwargs)
[docs] class FocusingMirror(StandardReadable): """Focusing Mirror""" def __init__( self, name, prefix, bragg_to_lat_lut_path=None, x_suffix="X", y_suffix="Y" ): self.bragg_to_lat_lookup_table_path = bragg_to_lat_lut_path self.yaw_mrad = Motor(prefix + "YAW") self.pitch_mrad = Motor(prefix + "PITCH") self.roll_mrad = Motor(prefix + "ROLL") self.x_mm = Motor(prefix + x_suffix) self.y_mm = Motor(prefix + y_suffix) self.jack1_mm = Motor(prefix + "Y1") self.jack2_mm = Motor(prefix + "Y2") self.jack3_mm = Motor(prefix + "Y3") self.translation1_mm = Motor(prefix + "X1") self.translation2_mm = Motor(prefix + "X2") self.type, _ = soft_signal_r_and_setter(MirrorType, MirrorType.SINGLE) # The device is in the beamline co-ordinate system so pitch is the incident angle # regardless of orientation of the mirror self.incident_angle = Motor(prefix + "PITCH") self.set_readable_signals( read=[self.incident_angle.user_readback], config=[self.type], ) super().__init__(name)
[docs] class FocusingMirrorWithStripes(FocusingMirror): """A focusing mirror where the stripe material can be changed. This is usually done based on the energy of the beamline.""" def __init__(self, name, prefix, *args, **kwargs): self.stripe = epics_signal_rw(MirrorStripe, prefix + "STRP:DVAL") # apply the current set stripe setting self.apply_stripe = epics_signal_x(prefix + "CHANGE.PROC") super().__init__(name, prefix, *args, **kwargs) def energy_to_stripe(self, energy_kev) -> MirrorStripe: # In future, this should be configurable per-mirror if energy_kev < 7: return MirrorStripe.BARE else: return MirrorStripe.RHODIUM