Source code for dodal.common.signal_utils
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
from bluesky.protocols import Reading
from ophyd_async.core import SignalR, SoftSignalBackend
from ophyd_async.core._soft_signal_backend import SignalMetadata
T = TypeVar("T")
[docs]
class HarwareBackedSoftSignalBackend(SoftSignalBackend[T]):
def __init__(
self,
get_from_hardware_func: Callable[[], Coroutine[Any, Any, T]],
*args,
**kwargs,
) -> None:
self.get_from_hardware_func = get_from_hardware_func
super().__init__(*args, **kwargs)
async def _update_value(self):
new_value = await self.get_from_hardware_func()
await self.put(new_value)
async def get_reading(self) -> Reading:
await self._update_value()
return await super().get_reading()
async def get_value(self) -> T:
await self._update_value()
return await super().get_value()
[docs]
def create_hardware_backed_soft_signal(
datatype: type[T],
get_from_hardware_func: Callable[[], Coroutine[Any, Any, T]],
units: str | None = None,
precision: int | None = None,
):
"""Creates a soft signal that, when read will call the function passed into
`get_from_hardware_func` and return this.
This will allow you to make soft signals derived from arbitrary hardware signals.
However, calling subscribe on this signal does not give you a sensible value and
the signal is currently read only. See https://github.com/bluesky/ophyd-async/issues/525
for a more full solution.
"""
metadata = SignalMetadata(units=units, precision=precision)
return SignalR(
backend=HarwareBackedSoftSignalBackend(
get_from_hardware_func, datatype, metadata=metadata
)
)