from smartem_backend.model.mq_event import (
AcquisitionCreatedEvent,
AcquisitionDeletedEvent,
AcquisitionUpdatedEvent,
AgentInstructionCreatedEvent,
AgentInstructionExpiredEvent,
AgentInstructionUpdatedEvent,
AtlasCreatedEvent,
AtlasDeletedEvent,
AtlasTileCreatedEvent,
AtlasTileDeletedEvent,
AtlasTileUpdatedEvent,
AtlasUpdatedEvent,
CtfCompleteBody,
CtfRegisteredBody,
FoilHoleCreatedEvent,
FoilHoleDeletedEvent,
FoilHoleModelPredictionEvent,
FoilHoleUpdatedEvent,
GridCreatedEvent,
GridDeletedEvent,
GridRegisteredEvent,
GridSquareCreatedEvent,
GridSquareDeletedEvent,
GridSquareModelPredictionEvent,
GridSquareRegisteredEvent,
GridSquareUpdatedEvent,
GridUpdatedEvent,
MessageQueueEventType,
MicrographCreatedEvent,
MicrographDeletedEvent,
MicrographUpdatedEvent,
ModelParameterUpdateEvent,
MotionCorrectionCompleteBody,
MotionCorrectionRegisteredBody,
MultiFoilHoleModelPredictionEvent,
RefreshPredictionsEvent,
)
from smartem_backend.utils import rmq_publisher
# ========== Acquisition DB Entity Mutations ==========
[docs]
def publish_acquisition_created(uuid, id=None, **kwargs):
    """Build an ``AcquisitionCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.
        **kwargs: Accepted for call-site compatibility but not used here.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ACQUISITION_CREATED
    message = AcquisitionCreatedEvent(event_type=event_type, uuid=uuid, id=id)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_acquisition_updated(uuid, id=None):
    """Build an ``AcquisitionUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ACQUISITION_UPDATED
    message = AcquisitionUpdatedEvent(event_type=event_type, uuid=uuid, id=id)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_acquisition_deleted(uuid):
    """Build an ``AcquisitionDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ACQUISITION_DELETED
    message = AcquisitionDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Atlas DB Entity Mutations ==========
[docs]
def publish_atlas_created(uuid, id=None, grid_uuid=None):
    """Build an ``AtlasCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_CREATED
    message = AtlasCreatedEvent(event_type=event_type, uuid=uuid, id=id, grid_uuid=grid_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_atlas_updated(uuid, id=None, grid_uuid=None):
    """Build an ``AtlasUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_UPDATED
    message = AtlasUpdatedEvent(event_type=event_type, uuid=uuid, id=id, grid_uuid=grid_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_atlas_deleted(uuid):
    """Build an ``AtlasDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_DELETED
    message = AtlasDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Atlas Tile DB Entity Mutations ==========
[docs]
def publish_atlas_tile_created(uuid, id=None, atlas_uuid=None):
    """Build an ``AtlasTileCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.
        atlas_uuid: Forwarded to the event as ``atlas_uuid``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_TILE_CREATED
    message = AtlasTileCreatedEvent(event_type=event_type, uuid=uuid, id=id, atlas_uuid=atlas_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_atlas_tile_updated(uuid, id=None, atlas_uuid=None):
    """Build an ``AtlasTileUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        id: Forwarded to the event as ``id``; defaults to ``None``.
        atlas_uuid: Forwarded to the event as ``atlas_uuid``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_TILE_UPDATED
    message = AtlasTileUpdatedEvent(event_type=event_type, uuid=uuid, id=id, atlas_uuid=atlas_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_atlas_tile_deleted(uuid):
    """Build an ``AtlasTileDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.ATLAS_TILE_DELETED
    message = AtlasTileDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Grid DB Entity Mutations ==========
[docs]
def publish_grid_created(uuid, acquisition_uuid=None):
    """Build a ``GridCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        acquisition_uuid: Forwarded to the event as ``acquisition_uuid``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRID_CREATED
    message = GridCreatedEvent(event_type=event_type, uuid=uuid, acquisition_uuid=acquisition_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_grid_updated(uuid, acquisition_uuid=None):
    """Build a ``GridUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        acquisition_uuid: Forwarded to the event as ``acquisition_uuid``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRID_UPDATED
    message = GridUpdatedEvent(event_type=event_type, uuid=uuid, acquisition_uuid=acquisition_uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_grid_deleted(uuid):
    """Build a ``GridDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRID_DELETED
    message = GridDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_grid_registered(uuid: str):
    """Build a ``GridRegisteredEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event under the field name ``grid_uuid``
            (not ``uuid`` as in the other grid publishers).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRID_REGISTERED
    message = GridRegisteredEvent(event_type=event_type, grid_uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Grid Square DB Entity Mutations ==========
[docs]
def publish_gridsquare_created(uuid, grid_uuid=None, gridsquare_id=None):
    """Build a ``GridSquareCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_CREATED
    message = GridSquareCreatedEvent(
        event_type=event_type,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_updated(uuid, grid_uuid=None, gridsquare_id=None):
    """Build a ``GridSquareUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_UPDATED
    message = GridSquareUpdatedEvent(
        event_type=event_type,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_deleted(uuid):
    """Build a ``GridSquareDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_DELETED
    message = GridSquareDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_registered(uuid: str, count: int | None = None):
    """Build a ``GridSquareRegisteredEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        count: Forwarded to the event as ``count``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_REGISTERED
    message = GridSquareRegisteredEvent(event_type=event_type, uuid=uuid, count=count)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_lowmag_created(uuid, grid_uuid=None, gridsquare_id=None):
    """Publish a low mag grid square created event to RabbitMQ.

    The payload reuses ``GridSquareCreatedEvent``; only the event type
    (``GRIDSQUARE_LOWMAG_CREATED``) differs from the regular created event.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_LOWMAG_CREATED
    message = GridSquareCreatedEvent(
        event_type=event_type, uuid=uuid, grid_uuid=grid_uuid, gridsquare_id=gridsquare_id
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_lowmag_updated(uuid, grid_uuid=None, gridsquare_id=None):
    """Publish a low mag grid square updated event to RabbitMQ.

    The payload reuses ``GridSquareUpdatedEvent``; only the event type
    (``GRIDSQUARE_LOWMAG_UPDATED``) differs from the regular updated event.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        grid_uuid: Forwarded to the event as ``grid_uuid``; defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_LOWMAG_UPDATED
    message = GridSquareUpdatedEvent(
        event_type=event_type, uuid=uuid, grid_uuid=grid_uuid, gridsquare_id=gridsquare_id
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_lowmag_deleted(uuid):
    """Publish a low mag grid square deleted event to RabbitMQ.

    The payload reuses ``GridSquareDeletedEvent``; only the event type
    (``GRIDSQUARE_LOWMAG_DELETED``) differs from the regular deleted event.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_LOWMAG_DELETED
    message = GridSquareDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Foil Hole DB Entity Mutations ==========
[docs]
def publish_foilhole_created(uuid, foilhole_id=None, gridsquare_uuid=None, gridsquare_id=None):
    """Build a ``FoilHoleCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        foilhole_id: Forwarded to the event as ``foilhole_id``; defaults to ``None``.
        gridsquare_uuid: Forwarded to the event as ``gridsquare_uuid``;
            defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.FOILHOLE_CREATED
    message = FoilHoleCreatedEvent(
        event_type=event_type,
        uuid=uuid,
        foilhole_id=foilhole_id,
        gridsquare_uuid=gridsquare_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_foilhole_updated(uuid, foilhole_id=None, gridsquare_uuid=None, gridsquare_id=None):
    """Build a ``FoilHoleUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        foilhole_id: Forwarded to the event as ``foilhole_id``; defaults to ``None``.
        gridsquare_uuid: Forwarded to the event as ``gridsquare_uuid``;
            defaults to ``None``.
        gridsquare_id: Forwarded to the event as ``gridsquare_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.FOILHOLE_UPDATED
    message = FoilHoleUpdatedEvent(
        event_type=event_type,
        uuid=uuid,
        foilhole_id=foilhole_id,
        gridsquare_uuid=gridsquare_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_foilhole_deleted(uuid):
    """Build a ``FoilHoleDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.FOILHOLE_DELETED
    message = FoilHoleDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Micrograph DB Entity Mutations ==========
[docs]
def publish_micrograph_created(uuid, foilhole_uuid=None, foilhole_id=None, micrograph_id=None):
    """Build a ``MicrographCreatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        foilhole_uuid: Forwarded to the event as ``foilhole_uuid``;
            defaults to ``None``.
        foilhole_id: Forwarded to the event as ``foilhole_id``; defaults to ``None``.
        micrograph_id: Forwarded to the event as ``micrograph_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MICROGRAPH_CREATED
    message = MicrographCreatedEvent(
        event_type=event_type,
        uuid=uuid,
        foilhole_uuid=foilhole_uuid,
        foilhole_id=foilhole_id,
        micrograph_id=micrograph_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_micrograph_updated(uuid, foilhole_uuid=None, foilhole_id=None, micrograph_id=None):
    """Build a ``MicrographUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.
        foilhole_uuid: Forwarded to the event as ``foilhole_uuid``;
            defaults to ``None``.
        foilhole_id: Forwarded to the event as ``foilhole_id``; defaults to ``None``.
        micrograph_id: Forwarded to the event as ``micrograph_id``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MICROGRAPH_UPDATED
    message = MicrographUpdatedEvent(
        event_type=event_type,
        uuid=uuid,
        foilhole_uuid=foilhole_uuid,
        foilhole_id=foilhole_id,
        micrograph_id=micrograph_id,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_micrograph_deleted(uuid):
    """Build a ``MicrographDeletedEvent`` and publish it to RabbitMQ.

    Args:
        uuid: Forwarded to the event as ``uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MICROGRAPH_DELETED
    message = MicrographDeletedEvent(event_type=event_type, uuid=uuid)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_gridsquare_model_prediction(
    gridsquare_uuid: str, model_name: str, prediction_value: float, metric: str | None = None
):
    """Publish a model prediction event for one grid square to RabbitMQ.

    Args:
        gridsquare_uuid: Forwarded to the event as ``gridsquare_uuid``.
        model_name: Forwarded to the event under the field name
            ``prediction_model_name``.
        prediction_value: Forwarded to the event as ``prediction_value``.
        metric: Forwarded to the event as ``metric``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.GRIDSQUARE_MODEL_PREDICTION
    message = GridSquareModelPredictionEvent(
        event_type=event_type,
        gridsquare_uuid=gridsquare_uuid,
        prediction_model_name=model_name,
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_foilhole_model_prediction(
    foilhole_uuid: str, model_name: str, prediction_value: float, metric: str | None = None
):
    """Publish a model prediction event for one foil hole to RabbitMQ.

    Args:
        foilhole_uuid: Forwarded to the event as ``foilhole_uuid``.
        model_name: Forwarded to the event under the field name
            ``prediction_model_name``.
        prediction_value: Forwarded to the event as ``prediction_value``.
        metric: Forwarded to the event as ``metric``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.FOILHOLE_MODEL_PREDICTION
    message = FoilHoleModelPredictionEvent(
        event_type=event_type,
        foilhole_uuid=foilhole_uuid,
        prediction_model_name=model_name,
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_multi_foilhole_model_prediction(
    foilhole_uuids: list[str], model_name: str, prediction_value: float, metric: str | None = None
):
    """Publish a single model prediction event covering several foil holes.

    Args:
        foilhole_uuids: Forwarded to the event as ``foilhole_uuids``.
        model_name: Forwarded to the event under the field name
            ``prediction_model_name``.
        prediction_value: Forwarded to the event as ``prediction_value``;
            one value is sent for the whole list.
        metric: Forwarded to the event as ``metric``; defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MULTI_FOILHOLE_MODEL_PREDICTION
    message = MultiFoilHoleModelPredictionEvent(
        event_type=event_type,
        foilhole_uuids=foilhole_uuids,
        prediction_model_name=model_name,
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_model_parameter_update(
    grid_uuid: str, model_name: str, key: str, value: float, metric: str | None = None, group: str = ""
):
    """Build a ``ModelParameterUpdateEvent`` and publish it to RabbitMQ.

    Args:
        grid_uuid: Forwarded to the event as ``grid_uuid``.
        model_name: Forwarded to the event under the field name
            ``prediction_model_name``.
        key: Forwarded to the event as ``key``.
        value: Forwarded to the event as ``value``.
        metric: Forwarded to the event as ``metric``; defaults to ``None``.
        group: Forwarded to the event as ``group``; defaults to an empty string.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MODEL_PARAMETER_UPDATE
    message = ModelParameterUpdateEvent(
        event_type=event_type,
        grid_uuid=grid_uuid,
        prediction_model_name=model_name,
        key=key,
        value=value,
        metric=metric,
        group=group,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_motion_correction_completed(micrograph_uuid: str, total_motion: float, average_motion: float):
    """Build a ``MotionCorrectionCompleteBody`` and publish it to RabbitMQ.

    Args:
        micrograph_uuid: Forwarded to the event as ``micrograph_uuid``.
        total_motion: Forwarded to the event as ``total_motion``.
        average_motion: Forwarded to the event as ``average_motion``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MOTION_CORRECTION_COMPLETE
    message = MotionCorrectionCompleteBody(
        event_type=event_type,
        micrograph_uuid=micrograph_uuid,
        total_motion=total_motion,
        average_motion=average_motion,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_motion_correction_registered(micrograph_uuid: str, quality: bool, metric_name: str | None = None):
    """Publish a motion correction registered event to RabbitMQ.

    This is the event intended to follow a motion correction completed event.

    Args:
        micrograph_uuid: Forwarded to the event as ``micrograph_uuid``.
        quality: Forwarded to the event as ``quality``.
        metric_name: Forwarded to the event as ``metric_name``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.MOTION_CORRECTION_REGISTERED
    message = MotionCorrectionRegisteredBody(
        event_type=event_type,
        micrograph_uuid=micrograph_uuid,
        quality=quality,
        metric_name=metric_name,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_ctf_estimation_completed(micrograph_uuid: str, ctf_max_res: float):
    """Build a ``CtfCompleteBody`` and publish it to RabbitMQ.

    Args:
        micrograph_uuid: Forwarded to the event as ``micrograph_uuid``.
        ctf_max_res: Forwarded to the event as ``ctf_max_res``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.CTF_COMPLETE
    message = CtfCompleteBody(event_type=event_type, micrograph_uuid=micrograph_uuid, ctf_max_res=ctf_max_res)
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_ctf_estimation_registered(micrograph_uuid: str, quality: bool, metric_name: str | None = None):
    """Build a ``CtfRegisteredBody`` and publish it to RabbitMQ.

    Args:
        micrograph_uuid: Forwarded to the event as ``micrograph_uuid``.
        quality: Forwarded to the event as ``quality``.
        metric_name: Forwarded to the event as ``metric_name``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.CTF_REGISTERED
    message = CtfRegisteredBody(
        event_type=event_type,
        micrograph_uuid=micrograph_uuid,
        quality=quality,
        metric_name=metric_name,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_refresh_predictions(grid_uuid: str):
    """Publish the event that triggers an overall prediction recalculation.

    Args:
        grid_uuid: Forwarded to the ``RefreshPredictionsEvent`` as ``grid_uuid``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.REFRESH_PREDICTIONS
    message = RefreshPredictionsEvent(event_type=event_type, grid_uuid=grid_uuid)
    return rmq_publisher.publish_event(event_type, message)
# ========== Agent Communication Events ==========
[docs]
def publish_agent_instruction_created(instruction_id, session_id, agent_id, instruction_type, payload, **kwargs):
    """Build an ``AgentInstructionCreatedEvent`` and publish it to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event as ``instruction_id``.
        session_id: Forwarded to the event as ``session_id``.
        agent_id: Forwarded to the event as ``agent_id``.
        instruction_type: Forwarded to the event as ``instruction_type``.
        payload: Forwarded to the event as ``payload``.
        **kwargs: Optional extras. Only ``sequence_number``, ``priority``,
            ``expires_at`` and ``instruction_metadata`` are read; any other
            key is ignored. ``priority`` falls back to ``"normal"`` when
            absent, the others fall back to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.AGENT_INSTRUCTION_CREATED

    # Pull the optional fields out of kwargs up front so the defaults are easy to see.
    sequence_number = kwargs.get("sequence_number")
    priority = kwargs.get("priority", "normal")
    expires_at = kwargs.get("expires_at")
    instruction_metadata = kwargs.get("instruction_metadata")

    message = AgentInstructionCreatedEvent(
        event_type=event_type,
        instruction_id=instruction_id,
        session_id=session_id,
        agent_id=agent_id,
        instruction_type=instruction_type,
        payload=payload,
        sequence_number=sequence_number,
        priority=priority,
        expires_at=expires_at,
        instruction_metadata=instruction_metadata,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_agent_instruction_updated(instruction_id, session_id, agent_id, status, acknowledged_at=None):
    """Build an ``AgentInstructionUpdatedEvent`` and publish it to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event as ``instruction_id``.
        session_id: Forwarded to the event as ``session_id``.
        agent_id: Forwarded to the event as ``agent_id``.
        status: Forwarded to the event as ``status``.
        acknowledged_at: Forwarded to the event as ``acknowledged_at``;
            defaults to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.AGENT_INSTRUCTION_UPDATED
    message = AgentInstructionUpdatedEvent(
        event_type=event_type,
        instruction_id=instruction_id,
        session_id=session_id,
        agent_id=agent_id,
        status=status,
        acknowledged_at=acknowledged_at,
    )
    return rmq_publisher.publish_event(event_type, message)
[docs]
def publish_agent_instruction_expired(instruction_id, session_id, agent_id, expires_at, retry_count):
    """Build an ``AgentInstructionExpiredEvent`` and publish it to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event as ``instruction_id``.
        session_id: Forwarded to the event as ``session_id``.
        agent_id: Forwarded to the event as ``agent_id``.
        expires_at: Forwarded to the event as ``expires_at``.
        retry_count: Forwarded to the event as ``retry_count``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_type = MessageQueueEventType.AGENT_INSTRUCTION_EXPIRED
    message = AgentInstructionExpiredEvent(
        event_type=event_type,
        instruction_id=instruction_id,
        session_id=session_id,
        agent_id=agent_id,
        expires_at=expires_at,
        retry_count=retry_count,
    )
    return rmq_publisher.publish_event(event_type, message)