Source code for dodal.devices.util.epics_util

from functools import partial
from typing import Callable

from bluesky.protocols import Movable
from ophyd import Component, EpicsSignal
from ophyd import Device as OphydDevice
from ophyd.status import Status, StatusBase
from ophyd_async.core import AsyncStatus, wait_for_value
from ophyd_async.core import Device as OphydAsyncDevice
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw

from dodal.log import LOGGER


[docs] def epics_signal_put_wait(pv_name: str, wait: float = 3.0) -> Component[EpicsSignal]: """Creates a `Component` around an `EpicsSignal` that waits for a callback on a put. Args: pv_name (str): The name of the PV for the `EpicsSignal` wait (str, optional): The timeout to wait for a callback. Defaults to 3.0. Returns: EpicsSignal: An EpicsSignal that will wait for a callback. """ return Component(EpicsSignal, pv_name, put_complete=True, write_timeout=wait)
[docs] def run_functions_without_blocking( functions_to_chain: list[Callable[[], StatusBase]], timeout: float = 60.0, associated_obj: OphydDevice | None = None, ) -> Status: """Creates and initiates an asynchronous chaining of functions which return a status Usage: This function can be used to take a series of status-returning functions and run them all sequentially and in the background by making use of callbacks. It also ensures exceptions on each returned status are propagated Args: functions_to_chain( list(function - > StatusBase) ): A list of functions which each return a status object associated_obj (Device | None): The device that should be associated with the returned status Returns: Status: A status object which is marked as complete once all of the Status objects returned by the unwrapped functions have completed. """ # The returned status - marked as finished at the end of the callback chain. If any # intermediate statuses have an exception, the full_status will timeout. full_status = Status(obj=associated_obj, timeout=timeout) def closing_func(old_status: Status): if old_status.exception() is not None: set_global_exception_and_log(old_status) else: full_status.set_finished() # Wrap each function by first checking the previous status and attaching a callback # to the next function in the chain def wrap_func( old_status: Status, current_func: Callable[[], StatusBase], next_func ): if old_status.exception() is not None: set_global_exception_and_log(old_status) return status = call_func(current_func) if not isinstance(status, StatusBase): LOGGER.error( f"wrap_func attempted to wrap {current_func} when it does not return a Status" ) raise ValueError(f"{current_func} does not return a Status") status.add_callback(next_func) def set_global_exception_and_log(status: Status): error = status.exception() full_status.set_exception(error) # So full_status can also be checked for any errors LOGGER.error(f"Status {status} has failed with error {error}") # Each wrapped function needs to attach its callback to the subsequent wrapped # function, therefore wrapped_funcs list needs to be created in reverse order wrapped_funcs = [] wrapped_funcs.append( partial( wrap_func, current_func=functions_to_chain[-1], next_func=closing_func, ) ) # Wrap each function in reverse for num, func in enumerate(list(reversed(functions_to_chain))[1:-1]): wrapped_funcs.append( partial( wrap_func, current_func=func, next_func=wrapped_funcs[-1], ) ) starting_status = Status(done=True, success=True) # Initiate the chain of functions wrap_func(starting_status, functions_to_chain[0], wrapped_funcs[-1]) return full_status
[docs] def call_func(func: Callable[[], StatusBase]) -> StatusBase: return func()
[docs] class SetWhenEnabled(OphydAsyncDevice, Movable): """A device that sets the proc field of a PV when it becomes enabled.""" def __init__(self, name: str = "", prefix: str = ""): self.proc = epics_signal_rw(int, prefix + ".PROC") self.disp = epics_signal_r(int, prefix + ".DISP") super().__init__(name) @AsyncStatus.wrap async def set(self, value: int): await wait_for_value(self.disp, 0, None) await self.proc.set(value)