Source code for dodal.devices.pressure_jump_cell

import asyncio
from dataclasses import dataclass

from bluesky.protocols import HasName, Movable
from ophyd_async.core import (
    AsyncStatus,
    DeviceVector,
    SignalR,
    StandardReadable,
    StandardReadableFormat,
    StrictEnum,
)
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw

OPENSEQ_PULSE_LENGTH = 0.2


[docs] class PumpState(StrictEnum): MANUAL = "Manual" AUTO_PRESSURE = "Auto Pressure" AUTO_POSITION = "Auto Position"
[docs] class StopState(StrictEnum): CONTINUE = "CONTINUE" STOP = "STOP"
[docs] class FastValveControlRequest(StrictEnum): OPEN = "Open" CLOSE = "Close" RESET = "Reset" ARM = "Arm" DISARM = "Disarm"
[docs] class ValveControlRequest(StrictEnum): OPEN = "Open" CLOSE = "Close" RESET = "Reset"
[docs] class ValveOpenSeqRequest(StrictEnum): INACTIVE = "0" OPEN_SEQ = "1"
[docs] class PumpMotorDirectionState(StrictEnum): EMPTY = "" FORWARD = "Forward" REVERSE = "Reverse"
[docs] class ValveState(StrictEnum): FAULT = "Fault" OPEN = "Open" OPENING = "Opening" CLOSED = "Closed" CLOSING = "Closing"
[docs] class FastValveState(StrictEnum): FAULT = "Fault" OPEN = "Open" OPEN_ARMED = "Open Armed" CLOSED = "Closed" CLOSED_ARMED = "Closed Armed" NONE = "Unused"
[docs] @dataclass class AllValvesControlState: valve_1: ValveControlRequest | None = None valve_3: ValveControlRequest | None = None valve_5: FastValveControlRequest | None = None valve_6: FastValveControlRequest | None = None
[docs] class AllValvesControl(StandardReadable, Movable[AllValvesControlState]): """ valves 2, 4, 7, 8 are not controlled by the IOC, as they are under manual control. fast_valves: tuple[int, ...] = (5, 6) slow_valves: tuple[int, ...] = (1, 3) """ def __init__( self, prefix: str, name: str = "", fast_valves: tuple[int, ...] = (5, 6), slow_valves: tuple[int, ...] = (1, 3), ) -> None: self.fast_valves = fast_valves self.slow_valves = slow_valves with self.add_children_as_readables(): self.valve_states: DeviceVector[SignalR[ValveState]] = DeviceVector( { i: epics_signal_r(ValveState, f"{prefix}V{i}:STA") for i in self.slow_valves } ) self.fast_valve_states: DeviceVector[SignalR[FastValveState]] = ( DeviceVector( { i: epics_signal_r(FastValveState, f"{prefix}V{i}:STA") for i in self.fast_valves } ) ) self.fast_valve_control: DeviceVector[FastValveControl] = DeviceVector( {i: FastValveControl(f"{prefix}V{i}") for i in self.fast_valves} ) self.valve_control: DeviceVector[ValveControl] = DeviceVector( {i: ValveControl(f"{prefix}V{i}") for i in self.slow_valves} ) super().__init__(name) async def set_valve( self, valve: int, value: ValveControlRequest | FastValveControlRequest, ): if valve in self.slow_valves and (isinstance(value, ValveControlRequest)): if value == ValveControlRequest.OPEN: await self.valve_control[valve].set(ValveOpenSeqRequest.OPEN_SEQ) await asyncio.sleep(OPENSEQ_PULSE_LENGTH) await self.valve_control[valve].set(ValveOpenSeqRequest.INACTIVE) else: await self.valve_control[valve].set(value) elif valve in self.fast_valves and (isinstance(value, FastValveControlRequest)): if value == FastValveControlRequest.OPEN: await self.fast_valve_control[valve].set(ValveOpenSeqRequest.OPEN_SEQ) await asyncio.sleep(OPENSEQ_PULSE_LENGTH) await self.fast_valve_control[valve].set(ValveOpenSeqRequest.INACTIVE) else: await self.fast_valve_control[valve].set(value) @AsyncStatus.wrap async def set(self, value: AllValvesControlState): await asyncio.gather( *( self.set_valve(int(i[-1]), value) for i, value in value.__dict__.items() if value is not None ) )
[docs] class ValveControl( StandardReadable, Movable[ValveControlRequest | ValveOpenSeqRequest] ): def __init__(self, prefix: str, name: str = "") -> None: with self.add_children_as_readables(): self.close = epics_signal_rw(ValveControlRequest, prefix + ":CON") self.open = epics_signal_rw(int, prefix + ":OPENSEQ") super().__init__(name) def set(self, value: ValveControlRequest | ValveOpenSeqRequest) -> AsyncStatus: set_status = None if isinstance(value, ValveControlRequest): set_status = self.close.set(value) elif isinstance(value, ValveOpenSeqRequest): set_status = self.open.set(value.value) return set_status
[docs] class FastValveControl( StandardReadable, Movable[FastValveControlRequest | ValveOpenSeqRequest] ): def __init__(self, prefix: str, name: str = "") -> None: with self.add_children_as_readables(): self.close = epics_signal_rw(FastValveControlRequest, prefix + ":CON") self.open = epics_signal_rw(int, prefix + ":OPENSEQ") super().__init__(name) def set(self, value: FastValveControlRequest | ValveOpenSeqRequest) -> AsyncStatus: set_status = None if isinstance(value, FastValveControlRequest): set_status = self.close.set(value) elif isinstance(value, ValveOpenSeqRequest): set_status = self.open.set(value.value) return set_status
[docs] class Pump(StandardReadable): def __init__(self, prefix: str, name: str = "") -> None: with self.add_children_as_readables(): self.pump_position = epics_signal_r(float, prefix + "POS") self.pump_motor_direction = epics_signal_r( PumpMotorDirectionState, prefix + "MTRDIR" ) self.pump_speed = epics_signal_rw( float, write_pv=prefix + "MSPEED", read_pv=prefix + "MSPEED_RBV" ) with self.add_children_as_readables(StandardReadableFormat.CONFIG_SIGNAL): self.pump_mode = epics_signal_rw(PumpState, prefix + "SP:AUTO") super().__init__(name)
[docs] class PressureTransducer(StandardReadable): """ Pressure transducer for a high pressure X-ray cell. This is the chamber and there are three of them. 1 is the start, 3 is where the sample is. NOTE: the distinction between the adc prefix and the cell prefix is kept here. """ def __init__( self, prefix: str, cell_prefix: str, transducer_number: int, ethercat_channel_number: int, name: str = "", full_different_prefix_adc: str = "", ) -> None: final_prefix = f"{prefix}{cell_prefix}" with self.add_children_as_readables(): self.omron_pressure = epics_signal_r( float, f"{final_prefix}PP{transducer_number}:PRES" ) self.omron_voltage = epics_signal_r( float, f"{final_prefix}PP{transducer_number}:RAW" ) self.beckhoff_pressure = epics_signal_r( float, f"{final_prefix}STATP{transducer_number}:MeanValue_RBV" ) # P1 beckhoff voltage = BL38P-EA-ADC-02:CH1 # P2 beckhoff voltage = BL38P-EA-ADC-01:CH2 # P3 beckhoff voltage = BL38P-EA-ADC-01:CH1 self.slow_beckhoff_voltage_readout = epics_signal_r( float, f"{full_different_prefix_adc}CH{ethercat_channel_number}" ) super().__init__(name)
[docs] class PressureJumpCellController(HasName): def __init__(self, prefix: str, name: str = "") -> None: self.stop = epics_signal_rw(StopState, f"{prefix}STOP") self.target_pressure = epics_signal_rw(float, f"{prefix}TARGET") self.timeout = epics_signal_rw(float, f"{prefix}TIMER.HIGH") self.go = epics_signal_rw(bool, f"{prefix}GO") ## Jump logic ## self.start_pressure = epics_signal_rw(float, f"{prefix}JUMPF") self.target_pressure = epics_signal_rw(float, f"{prefix}JUMPT") self.jump_ready = epics_signal_rw(bool, f"{prefix}SETJUMP") self._name = name super().__init__() @property def name(self): return self._name
[docs] class PressureJumpCell(StandardReadable): """ High pressure X-ray cell, used to apply pressure or pressure jumps to a sample. prefix: str The prefix of beamline - SPECIAL - unusual that the cell prefix is computed separately """ def __init__( self, prefix: str, cell_prefix: str = "-HPXC-01:", adc_prefix: str = "-ADC", name: str = "", ): self.all_valves_control = AllValvesControl(f"{prefix}{cell_prefix}", name) self.pump = Pump(f"{prefix}{cell_prefix}", name) self.controller = PressureJumpCellController( f"{prefix}{cell_prefix}CTRL:", name ) with self.add_children_as_readables(): self.pressure_transducers: DeviceVector[PressureTransducer] = DeviceVector( { 1: PressureTransducer( prefix=prefix, cell_prefix=cell_prefix, transducer_number=1, full_different_prefix_adc=f"{prefix}{adc_prefix}-02:", ethercat_channel_number=1, ), 2: PressureTransducer( prefix=prefix, cell_prefix=cell_prefix, transducer_number=2, full_different_prefix_adc=f"{prefix}{adc_prefix}-01:", ethercat_channel_number=2, ), 3: PressureTransducer( prefix=prefix, cell_prefix=cell_prefix, transducer_number=3, full_different_prefix_adc=f"{prefix}{adc_prefix}-01:", ethercat_channel_number=1, ), } ) self.cell_temperature = epics_signal_r(float, f"{prefix}{cell_prefix}TEMP") super().__init__(name)