Source code for dodal.devices.zocalo.zocalo_interaction
import dataclasses
import getpass
import os
import socket
from dataclasses import dataclass
import zocalo.configuration
from workflows.transport import lookup
from dodal.devices.zocalo.zocalo_constants import ZOCALO_ENV
from dodal.log import LOGGER
def _get_zocalo_connection(environment):
    """Create and connect a transport for talking to zocalo.

    The zocalo configuration is read from file and ``environment`` is
    activated on it before the transport is created.

    Args:
        environment: The zocalo environment to activate.

    Returns:
        A "PikaTransport" instance on which ``connect()`` has already been
        called. The caller is responsible for disconnecting it.
    """
    # Activation must happen before the transport is built and connected.
    zocalo.configuration.from_file().activate_environment(environment)

    pika_transport_cls = lookup("PikaTransport")
    connection = pika_transport_cls()
    connection.connect()
    return connection
[docs]
@dataclass
class ZocaloStartInfo:
"""
ispyb_dcid (int): The ID of the data collection in ISPyB
filename (str): The name of the file that the detector will store into dev/shm
start_frame_index (int): The index of the first image of this collection within the file
written by the detector
number_of_frames (int): The number of frames in this collection
message_index (int): Which trigger this is in the detector collection e.g. 0 for the
first collection after a single arm, 1 for the next...
"""
ispyb_dcid: int
filename: str | None
start_frame_index: int
number_of_frames: int
message_index: int
def _get_zocalo_headers() -> tuple[str, str]:
    """Return the ``(user, hostname)`` pair used as zocalo message headers.

    Each value is read from its environment variable (``ZOCALO_GO_USER`` and
    ``ZOCALO_GO_HOSTNAME`` respectively) when that variable is set to a
    non-empty string. Otherwise it falls back to ``getpass.getuser()`` and
    ``socket.gethostname()`` respectively.

    Returns:
        tuple[str, str]: The user name and the host name, in that order.
    """
    # The fallbacks are evaluated lazily instead of being passed as ``.get()``
    # defaults: getuser() will throw when called from inside a container, so it
    # must not run when the variable is provided. The hostname follows the same
    # rule so an empty variable is never forwarded as an empty header.
    user = os.environ.get("ZOCALO_GO_USER") or getpass.getuser()
    hostname = os.environ.get("ZOCALO_GO_HOSTNAME") or socket.gethostname()
    return user, hostname
[docs]
class ZocaloTrigger:
    """Sends 'run_start' and 'run_end' messages to zocalo and nothing more.

    It is intended to be used in bluesky callback classes. To get results from
    zocalo back into a plan, use the ZocaloResults ophyd device.

    See https://diamondlightsource.github.io/dodal/main/how-to/zocalo.html for
    more information about zocalo.
    """

    def __init__(self, environment: str = ZOCALO_ENV):
        self.zocalo_environment: str = environment

    def _send_to_zocalo(self, parameters: dict):
        """Send ``parameters`` to zocalo over a freshly opened connection.

        A new connection is made for every call and is always disconnected
        afterwards, whether or not sending succeeded.

        Args:
            parameters (dict): The parameters to put in the message.
        """
        connection = _get_zocalo_connection(self.zocalo_environment)
        try:
            go_user, go_host = _get_zocalo_headers()
            connection.send(
                "processing_recipe",
                {"recipes": ["mimas"], "parameters": parameters},
                headers={"zocalo.go.user": go_user, "zocalo.go.host": go_host},
            )
        finally:
            connection.disconnect()

    def run_start(
        self,
        start_data: ZocaloStartInfo,
    ):
        """Tells the data analysis pipeline we have started a run.

        Assumes that appropriate data has already been put into ISPyB.

        Args:
            start_data (ZocaloStartInfo): Data about the collection to send to
                zocalo
        """
        LOGGER.info(f"Starting Zocalo job {start_data}")
        # Every field of the start info is sent, plus the event marker.
        payload = {**dataclasses.asdict(start_data), "event": "start"}
        self._send_to_zocalo(payload)

    def run_end(self, data_collection_id: int):
        """Tells the data analysis pipeline we have finished a run.

        Assumes that appropriate data has already been put into ISPyB.

        Args:
            data_collection_id (int): The ID of the data collection
                representing the gridscan in ISPyB
        """
        LOGGER.info(f"Ending Zocalo job with ispyb id {data_collection_id}")
        payload = {"event": "end", "ispyb_dcid": data_collection_id}
        self._send_to_zocalo(payload)