from __future__ import annotations
import asyncio
from enum import Enum
from functools import partialmethod
from typing import List
from ophyd_async.core import (
AsyncStatus,
DeviceVector,
SignalRW,
StandardReadable,
observe_value,
)
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw
# Sources
DISCONNECT = 0
IN1_TTL = 1
IN2_TTL = 4
IN3_TTL = 7
IN4_TTL = 10
PC_ARM = 29
PC_GATE = 30
PC_PULSE = 31
AND3 = 34
AND4 = 35
OR1 = 36
PULSE1 = 52
PULSE2 = 53
SOFT_IN1 = 60
SOFT_IN2 = 61
SOFT_IN3 = 62
# Instrument specific
TTL_DETECTOR = 1
TTL_SHUTTER = 2
TTL_XSPRESS3 = 3
TTL_PANDA = 4
[docs]
class ArmSource(str, Enum):
SOFT = "Soft"
EXTERNAL = "External"
[docs]
class TrigSource(str, Enum):
POSITION = "Position"
TIME = "Time"
EXTERNAL = "External"
[docs]
class EncEnum(str, Enum):
Enc1 = "Enc1"
Enc2 = "Enc2"
Enc3 = "Enc3"
Enc4 = "Enc4"
Enc1_4Av = "Enc1-4Av"
[docs]
class I03Axes:
SMARGON_X1 = EncEnum.Enc1
SMARGON_Y = EncEnum.Enc2
SMARGON_Z = EncEnum.Enc3
OMEGA = EncEnum.Enc4
[docs]
class I24Axes:
VGON_Z = EncEnum.Enc1
OMEGA = EncEnum.Enc2
VGON_X = EncEnum.Enc3
VGON_YH = EncEnum.Enc4
[docs]
class RotationDirection(str, Enum):
POSITIVE = "Positive"
NEGATIVE = "Negative"
@property
def multiplier(self):
return 1 if self == RotationDirection.POSITIVE else -1
[docs]
class ArmDemand(Enum):
ARM = 1
DISARM = 0
[docs]
class SoftInState(str, Enum):
YES = "Yes"
NO = "No"
[docs]
class ArmingDevice(StandardReadable):
"""A useful device that can abstract some of the logic of arming.
Allows a user to just call arm.set(ArmDemand.ARM)"""
TIMEOUT = 3
def __init__(self, prefix: str, name: str = "") -> None:
self.arm_set = epics_signal_rw(float, prefix + "PC_ARM")
self.disarm_set = epics_signal_rw(float, prefix + "PC_DISARM")
self.armed = epics_signal_r(float, prefix + "PC_ARM_OUT")
super().__init__(name)
async def _set_armed(self, demand: ArmDemand):
signal_to_set = self.arm_set if demand == ArmDemand.ARM else self.disarm_set
await signal_to_set.set(1)
async for reading in observe_value(self.armed):
if reading == demand.value:
return
def set(self, demand: ArmDemand) -> AsyncStatus:
return AsyncStatus(
asyncio.wait_for(self._set_armed(demand), timeout=self.TIMEOUT)
)
[docs]
class PositionCompare(StandardReadable):
def __init__(self, prefix: str, name: str = "") -> None:
self.num_gates = epics_signal_rw(float, prefix + "PC_GATE_NGATE")
self.gate_trigger = epics_signal_rw(EncEnum, prefix + "PC_ENC")
self.gate_source = epics_signal_rw(TrigSource, prefix + "PC_GATE_SEL")
self.gate_input = epics_signal_rw(float, prefix + "PC_GATE_INP")
self.gate_width = epics_signal_rw(float, prefix + "PC_GATE_WID")
self.gate_start = epics_signal_rw(float, prefix + "PC_GATE_START")
self.gate_step = epics_signal_rw(float, prefix + "PC_GATE_STEP")
self.pulse_source = epics_signal_rw(TrigSource, prefix + "PC_PULSE_SEL")
self.pulse_input = epics_signal_rw(float, prefix + "PC_PULSE_INP")
self.pulse_start = epics_signal_rw(float, prefix + "PC_PULSE_START")
self.pulse_width = epics_signal_rw(float, prefix + "PC_PULSE_WID")
self.pulse_step = epics_signal_rw(float, prefix + "PC_PULSE_STEP")
self.pulse_max = epics_signal_rw(float, prefix + "PC_PULSE_MAX")
self.dir = epics_signal_rw(RotationDirection, prefix + "PC_DIR")
self.arm_source = epics_signal_rw(ArmSource, prefix + "PC_ARM_SEL")
self.reset = epics_signal_rw(int, prefix + "SYS_RESET.PROC")
self.arm = ArmingDevice(prefix)
super().__init__(name)
async def is_armed(self) -> bool:
arm_state = await self.arm.armed.get_value()
return arm_state == 1
[docs]
class PulseOutput(StandardReadable):
"""Zebra pulse output panel."""
def __init__(self, prefix: str, name: str = "") -> None:
self.input = epics_signal_rw(float, prefix + "_INP")
self.delay = epics_signal_rw(float, prefix + "_DLY")
self.width = epics_signal_rw(float, prefix + "_WID")
super().__init__(name)
[docs]
class ZebraOutputPanel(StandardReadable):
def __init__(self, prefix: str, name: str = "") -> None:
self.pulse_1 = PulseOutput(prefix + "PULSE1")
self.pulse_2 = PulseOutput(prefix + "PULSE2")
self.out_pvs: DeviceVector[SignalRW] = DeviceVector(
{i: epics_signal_rw(float, prefix + f"OUT{i}_TTL") for i in range(1, 5)}
)
super().__init__(name)
[docs]
def boolean_array_to_integer(values: List[bool]) -> int:
"""Converts a boolean array to integer by interpretting it in binary with LSB 0 bit
numbering.
Args:
values (List[bool]): The list of booleans to convert.
Returns:
int: The interpretted integer.
"""
return sum(v << i for i, v in enumerate(values))
[docs]
class GateControl(StandardReadable):
def __init__(self, prefix: str, name: str = "") -> None:
self.enable = epics_signal_rw(int, prefix + "_ENA")
self.sources = DeviceVector(
{i: epics_signal_rw(float, prefix + f"_INP{i}") for i in range(1, 5)}
)
self.invert = epics_signal_rw(int, prefix + "_INV")
super().__init__(name)
[docs]
class GateType(Enum):
AND = "AND"
OR = "OR"
[docs]
class LogicGateConfiguration:
NUMBER_OF_INPUTS = 4
def __init__(self, input_source: int, invert: bool = False) -> None:
self.sources: List[int] = []
self.invert: List[bool] = []
self.add_input(input_source, invert)
def __str__(self) -> str:
input_strings = []
for input, (source, invert) in enumerate(zip(self.sources, self.invert)):
input_strings.append(f"INP{input+1}={'!' if invert else ''}{source}")
return ", ".join(input_strings)
[docs]
class Zebra(StandardReadable):
"""The Zebra device."""
def __init__(self, name: str, prefix: str) -> None:
self.pc = PositionCompare(prefix, name)
self.output = ZebraOutputPanel(prefix, name)
self.inputs = SoftInputs(prefix, name)
self.logic_gates = LogicGateConfigurer(prefix, name)
super().__init__(name=name)