Source code for dodal.devices.oav.oav_to_redis_forwarder
import asyncio
import io
import pickle
import uuid
from collections.abc import Awaitable, Callable
from datetime import timedelta
import numpy as np
from aiohttp import ClientResponse, ClientSession
from bluesky.protocols import Flyable, Stoppable
from ophyd_async.core import (
AsyncStatus,
StandardReadable,
soft_signal_r_and_setter,
soft_signal_rw,
)
from ophyd_async.epics.signal import epics_signal_r
from PIL import Image
from redis.asyncio import StrictRedis
from dodal.log import LOGGER
[docs]
async def get_next_jpeg(response: ClientResponse) -> bytes:
JPEG_START_BYTE = b"\xff\xd8"
JPEG_STOP_BYTE = b"\xff\xd9"
while True:
line = await response.content.readline()
if line.startswith(JPEG_START_BYTE):
return line + await response.content.readuntil(JPEG_STOP_BYTE)
[docs]
class OAVToRedisForwarder(StandardReadable, Flyable, Stoppable):
"""Forwards OAV image data to redis. To use call:
> bps.kickoff(oav_forwarder)
> bps.monitor(oav_forwarder.uuid)
> bps.complete(oav_forwarder)
"""
DATA_EXPIRY_DAYS = 7
def __init__(
self,
prefix: str,
redis_host: str,
redis_password: str,
redis_db: int = 0,
name: str = "",
) -> None:
"""Reads image data from the MJPEG stream on an OAV and forwards it into a
redis database. This is currently only used for murko integration.
Arguments:
prefix: str the PV prefix of the OAV
redis_host: str the host where the redis database is running
redis_password: str the password for the redis database
redis_db: int which redis database to connect to, defaults to 0
name: str the name of this device
"""
self.stream_url = epics_signal_r(str, f"{prefix}MJPG:MJPG_URL_RBV")
with self.add_children_as_readables():
self.uuid, self.uuid_setter = soft_signal_r_and_setter(str)
self.forwarding_task = None
self.redis_client = StrictRedis(
host=redis_host, password=redis_password, db=redis_db
)
self._stop_flag = False
self.sample_id = soft_signal_rw(int, initial_value=0)
# The uuid that images are being saved under, this should be monitored for
# callbacks to correlate the data
self.uuid, self.uuid_setter = soft_signal_r_and_setter(str)
super().__init__(name=name)
async def _get_frame_and_put_to_redis(self, response: ClientResponse):
"""Converts the data that comes in as a jpeg byte stream into a numpy array of
RGB values, pickles this array then writes it to redis.
"""
jpeg_bytes = await get_next_jpeg(response)
self.uuid_setter(image_uuid := str(uuid.uuid4()))
img = Image.open(io.BytesIO(jpeg_bytes))
image_data = pickle.dumps(np.asarray(img))
sample_id = str(await self.sample_id.get_value())
await self.redis_client.hset(sample_id, image_uuid, image_data) # type: ignore
await self.redis_client.expire(sample_id, timedelta(days=self.DATA_EXPIRY_DAYS))
LOGGER.debug(f"Sent frame to redis key {sample_id} with uuid {image_uuid}")
async def _open_connection_and_do_function(
self, function_to_do: Callable[[ClientResponse, str | None], Awaitable]
):
stream_url = await self.stream_url.get_value()
async with ClientSession() as session:
async with session.get(stream_url) as response:
await function_to_do(response, stream_url)
async def _stream_to_redis(self, response, _):
while not self._stop_flag:
await self._get_frame_and_put_to_redis(response)
await asyncio.sleep(0.01)
async def _confirm_mjpg_stream(self, response, stream_url):
if response.content_type != "multipart/x-mixed-replace":
raise ValueError(f"{stream_url} is not an MJPG stream")
@AsyncStatus.wrap
async def kickoff(self):
self._stop_flag = False
await self._open_connection_and_do_function(self._confirm_mjpg_stream)
self.forwarding_task = asyncio.create_task(
self._open_connection_and_do_function(self._stream_to_redis)
)
@AsyncStatus.wrap
async def complete(self):
assert self.forwarding_task, "Device not kicked off"
await self.stop()
@AsyncStatus.wrap
async def stop(self, success=True):
if self.forwarding_task:
self._stop_flag = True
await self.forwarding_task