# Source code for dodal.devices.focusing_mirror

from enum import Enum, IntEnum
from typing import Any

from ophyd import Component, Device, EpicsSignal
from ophyd.status import Status, StatusBase
from ophyd_async.core import StandardReadable
from ophyd_async.core.signal import soft_signal_r_and_setter
from ophyd_async.epics.motion import Motor
from ophyd_async.epics.signal import (
    epics_signal_rw,
    epics_signal_x,
)

from dodal.log import LOGGER

# NOTE(review): not referenced in the visible part of this module; judging by the
# name this is presumably a polling interval in seconds - confirm before relying on it.
VOLTAGE_POLLING_DELAY_S = 0.5

# Timeout (seconds) given to the Status that MirrorVoltageDevice.set() uses to wait
# for the voltage demand to be accepted.
# The default timeout is 60 seconds as voltage slew rate is typically ~2V/s
DEFAULT_SETTLE_TIME_S = 60


class MirrorType(str, Enum):
    """Kind of mirror, expressed as a string-valued enum.

    Members are also ``str`` instances, so they compare equal to their raw
    string values (``"single"`` / ``"multi"``).

    See https://manual.nexusformat.org/classes/base_classes/NXmirror.html
    """

    SINGLE = "single"
    MULTI = "multi"
class MirrorStripe(str, Enum):
    """Selectable mirror stripe, expressed as a string-valued enum.

    Members are also ``str`` instances, so they compare equal to their raw
    string values (note the capitalised spelling, e.g. ``"Rhodium"``).
    """

    RHODIUM = "Rhodium"
    BARE = "Bare"
    PLATINUM = "Platinum"
class MirrorVoltageDemand(IntEnum):
    """Integer states of a mirror voltage demand.

    ``OK`` is the state that ``MirrorVoltageDevice.set`` requires before a new
    setpoint is written and waits for afterwards.
    NOTE(review): the meaning of the other members is inferred from their names
    only - confirm against the voltage controller documentation.
    """

    N_A = 0
    OK = 1
    FAIL = 2
    SLEW = 3
class MirrorVoltageDevice(Device):
    """Abstract the bimorph mirror voltage PVs into a single device that can be set
    asynchronously and returns when the demanded voltage setpoint is accepted,
    without blocking the caller as this process can take significant time.
    """

    # PV suffixes are appended to the prefix this device is constructed with.
    _actual_v: EpicsSignal = Component(EpicsSignal, "R")
    _setpoint_v: EpicsSignal = Component(EpicsSignal, "D")
    _demand_accepted: EpicsSignal = Component(EpicsSignal, "DSEV")

    def set(self, value, *args, **kwargs) -> StatusBase:
        """Combine the following operations into a single set:
        1. apply the value to the setpoint PV
        2. Return to the caller with a Status future
        3. Wait until demand is accepted
        4. When either demand is accepted or DEFAULT_SETTLE_TIME expires, signal the
        result on the Status

        Args:
            value: The voltage to write to the setpoint PV.
            *args: Accepted for signature compatibility; not used.
            **kwargs: Accepted for signature compatibility; not used.

        Returns:
            An already-finished Status if the setpoint already equals ``value``;
            otherwise the combination (``&``) of the setpoint write status and a
            Status that finishes when the demand PV transitions back to OK, or
            fails after DEFAULT_SETTLE_TIME_S seconds.

        Raises:
            AssertionError: If the demand PV is not OK when the set is attempted.
        """
        setpoint_v = self._setpoint_v
        demand_accepted = self._demand_accepted

        if demand_accepted.get() != MirrorVoltageDemand.OK:
            raise AssertionError(
                f"Attempted to set {setpoint_v.name} when demand is not accepted."
            )

        if setpoint_v.get() == value:
            LOGGER.debug(f"{setpoint_v.name} already at {value} - skipping set")
            # The done=/success= constructor arguments are deprecated in ophyd;
            # set_finished() is the supported way to mark immediate success.
            already_done = Status()
            already_done.set_finished()
            return already_done

        LOGGER.debug(f"setting {setpoint_v.name} to {value}")
        demand_accepted_status = Status(self, DEFAULT_SETTLE_TIME_S)

        # Holds the subscription id so the callbacks below can remove it exactly
        # once; whoever pops the handle first owns the unsubscribe.
        subscription: dict[str, Any] = {"handle": None}

        def demand_check_callback(old_value, value, **kwargs):
            LOGGER.debug(f"Got event old={old_value} new={value} for {setpoint_v.name}")
            if old_value != MirrorVoltageDemand.OK and value == MirrorVoltageDemand.OK:
                LOGGER.debug(f"Demand accepted for {setpoint_v.name}")
                subs_handle = subscription.pop("handle", None)
                if subs_handle is None:
                    if demand_accepted_status.done:
                        # The status already completed (e.g. timed out) and the
                        # subscription was released; this is a late event.
                        return
                    raise AssertionError("Demand accepted before set attempted")
                demand_accepted.unsubscribe(subs_handle)

                demand_accepted_status.set_finished()
            # else timeout handled by parent demand_accepted_status

        def release_subscription(_status=None):
            # Runs when demand_accepted_status completes for any reason. On the
            # success path the handle is already gone; on timeout this removes
            # the otherwise-leaked subscription so a later OK transition cannot
            # call set_finished() on an already-failed status.
            subs_handle = subscription.pop("handle", None)
            if subs_handle is not None:
                demand_accepted.unsubscribe(subs_handle)

        # Subscribe before writing the setpoint so the transition is not missed.
        subscription["handle"] = demand_accepted.subscribe(demand_check_callback)
        demand_accepted_status.add_callback(release_subscription)
        setpoint_status = setpoint_v.set(value)
        status = setpoint_status & demand_accepted_status
        return status
class VFMMirrorVoltages(Device):
    """Group of eight ``MirrorVoltageDevice`` channels (PV suffixes BM:V14-BM:V21).

    Also records the path of the voltage lookup table, which is derived from the
    DAQ configuration directory passed to the constructor.
    """

    def __init__(self, *args, daq_configuration_path: str, **kwargs):
        """Create the device and derive the voltage lookup table path.

        Args:
            *args: Forwarded unchanged to ``Device.__init__``.
            daq_configuration_path: Directory to which
                ``/json/mirrorFocus.json`` is appended to form
                ``voltage_lookup_table_path``.
            **kwargs: Forwarded unchanged to ``Device.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.voltage_lookup_table_path = (
            daq_configuration_path + "/json/mirrorFocus.json"
        )

    _channel14_voltage_device = Component(MirrorVoltageDevice, "BM:V14")
    _channel15_voltage_device = Component(MirrorVoltageDevice, "BM:V15")
    _channel16_voltage_device = Component(MirrorVoltageDevice, "BM:V16")
    _channel17_voltage_device = Component(MirrorVoltageDevice, "BM:V17")
    _channel18_voltage_device = Component(MirrorVoltageDevice, "BM:V18")
    _channel19_voltage_device = Component(MirrorVoltageDevice, "BM:V19")
    _channel20_voltage_device = Component(MirrorVoltageDevice, "BM:V20")
    _channel21_voltage_device = Component(MirrorVoltageDevice, "BM:V21")

    @property
    def voltage_channels(self) -> list[MirrorVoltageDevice]:
        """Return the eight channel devices in ascending channel order (14 to 21)."""
        return [
            self._channel14_voltage_device,
            self._channel15_voltage_device,
            self._channel16_voltage_device,
            self._channel17_voltage_device,
            self._channel18_voltage_device,
            self._channel19_voltage_device,
            self._channel20_voltage_device,
            self._channel21_voltage_device,
        ]
class FocusingMirror(StandardReadable):
    """Focusing Mirror

    Exposes one ``Motor`` per mirror axis (yaw, pitch, roll, x, y, three jacks
    and two translations), a soft ``type`` signal, and an ``incident_angle``
    motor whose readback is the only signal registered for ``read``.
    """

    def __init__(
        self, name, prefix, bragg_to_lat_lut_path=None, x_suffix="X", y_suffix="Y"
    ):
        """Create the motors and signals, then initialise the readable.

        Args:
            name: Device name, passed to ``StandardReadable.__init__``.
            prefix: PV prefix to which each axis suffix is appended.
            bragg_to_lat_lut_path: Stored as-is on
                ``bragg_to_lat_lookup_table_path``; not otherwise used here.
            x_suffix: PV suffix for the ``x_mm`` motor.
            y_suffix: PV suffix for the ``y_mm`` motor.
        """
        self.bragg_to_lat_lookup_table_path = bragg_to_lat_lut_path
        self.yaw_mrad = Motor(prefix + "YAW")
        self.pitch_mrad = Motor(prefix + "PITCH")
        self.roll_mrad = Motor(prefix + "ROLL")
        self.x_mm = Motor(prefix + x_suffix)
        self.y_mm = Motor(prefix + y_suffix)
        self.jack1_mm = Motor(prefix + "Y1")
        self.jack2_mm = Motor(prefix + "Y2")
        self.jack3_mm = Motor(prefix + "Y3")
        self.translation1_mm = Motor(prefix + "X1")
        self.translation2_mm = Motor(prefix + "X2")

        # Soft signal initialised to SINGLE; the setter half is discarded.
        self.type, _ = soft_signal_r_and_setter(MirrorType, MirrorType.SINGLE)
        # The device is in the beamline co-ordinate system so pitch is the incident angle
        # regardless of orientation of the mirror
        # (this is a second Motor on the same PITCH PV as pitch_mrad)
        self.incident_angle = Motor(prefix + "PITCH")

        self.set_readable_signals(
            read=[self.incident_angle.user_readback],
            config=[self.type],
        )
        # Called last, after all child devices have been attached.
        super().__init__(name)
class FocusingMirrorWithStripes(FocusingMirror):
    """A focusing mirror where the stripe material can be changed. This is usually done
    based on the energy of the beamline."""

    def __init__(self, name, prefix, *args, **kwargs):
        """Add the stripe signals, then initialise the base mirror.

        Args:
            name: Device name, forwarded to ``FocusingMirror.__init__``.
            prefix: PV prefix used for the stripe signals and forwarded to
                ``FocusingMirror.__init__``.
            *args: Forwarded unchanged to ``FocusingMirror.__init__``.
            **kwargs: Forwarded unchanged to ``FocusingMirror.__init__``.
        """
        self.stripe = epics_signal_rw(MirrorStripe, prefix + "STRP:DVAL")
        # apply the current set stripe setting
        self.apply_stripe = epics_signal_x(prefix + "CHANGE.PROC")

        super().__init__(name, prefix, *args, **kwargs)

    def energy_to_stripe(self, energy_kev) -> MirrorStripe:
        """Choose the stripe for a given energy.

        Args:
            energy_kev: Energy to select a stripe for (keV, per the name).

        Returns:
            ``MirrorStripe.BARE`` when ``energy_kev`` is below 7, otherwise
            ``MirrorStripe.RHODIUM``. ``MirrorStripe.PLATINUM`` is never
            returned by this implementation.
        """
        # In future, this should be configurable per-mirror
        if energy_kev < 7:
            return MirrorStripe.BARE
        else:
            return MirrorStripe.RHODIUM