Source code for smartem_backend.mq_publisher

from smartem_backend.model.mq_event import (
    AcquisitionCreatedEvent,
    AcquisitionDeletedEvent,
    AcquisitionUpdatedEvent,
    AgentInstructionCreatedEvent,
    AgentInstructionExpiredEvent,
    AgentInstructionUpdatedEvent,
    AtlasCreatedEvent,
    AtlasDeletedEvent,
    AtlasTileCreatedEvent,
    AtlasTileDeletedEvent,
    AtlasTileUpdatedEvent,
    AtlasUpdatedEvent,
    CtfCompleteBody,
    CtfRegisteredBody,
    FoilHoleCreatedEvent,
    FoilHoleDeletedEvent,
    FoilHoleModelPredictionEvent,
    FoilHoleUpdatedEvent,
    GridCreatedEvent,
    GridDeletedEvent,
    GridRegisteredEvent,
    GridSquareCreatedEvent,
    GridSquareDeletedEvent,
    GridSquareModelPredictionEvent,
    GridSquareRegisteredEvent,
    GridSquareUpdatedEvent,
    GridUpdatedEvent,
    MessageQueueEventType,
    MicrographCreatedEvent,
    MicrographDeletedEvent,
    MicrographUpdatedEvent,
    ModelParameterUpdateEvent,
    MotionCorrectionCompleteBody,
    MotionCorrectionRegisteredBody,
    MultiFoilHoleModelPredictionEvent,
    RefreshPredictionsEvent,
)
from smartem_backend.utils import rmq_publisher


# ========== Acquisition DB Entity Mutations ==========
def publish_acquisition_created(uuid, id=None, **kwargs):
    """Emit an ``ACQUISITION_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).
        **kwargs: Accepted for call-site compatibility; not read by this function.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ACQUISITION_CREATED
    message = AcquisitionCreatedEvent(event_type=event_kind, uuid=uuid, id=id)
    return rmq_publisher.publish_event(event_kind, message)
def publish_acquisition_updated(uuid, id=None):
    """Emit an ``ACQUISITION_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ACQUISITION_UPDATED
    message = AcquisitionUpdatedEvent(event_type=event_kind, uuid=uuid, id=id)
    return rmq_publisher.publish_event(event_kind, message)
def publish_acquisition_deleted(uuid):
    """Emit an ``ACQUISITION_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ACQUISITION_DELETED
    message = AcquisitionDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Atlas DB Entity Mutations ==========
def publish_atlas_created(uuid, id=None, grid_uuid=None):
    """Emit an ``ATLAS_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_CREATED
    message = AtlasCreatedEvent(
        event_type=event_kind,
        uuid=uuid,
        id=id,
        grid_uuid=grid_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_atlas_updated(uuid, id=None, grid_uuid=None):
    """Emit an ``ATLAS_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_UPDATED
    message = AtlasUpdatedEvent(
        event_type=event_kind,
        uuid=uuid,
        id=id,
        grid_uuid=grid_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_atlas_deleted(uuid):
    """Emit an ``ATLAS_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_DELETED
    message = AtlasDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Atlas Tile DB Entity Mutations ==========
def publish_atlas_tile_created(uuid, id=None, atlas_uuid=None):
    """Emit an ``ATLAS_TILE_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).
        atlas_uuid: Forwarded to the event's ``atlas_uuid`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_TILE_CREATED
    message = AtlasTileCreatedEvent(
        event_type=event_kind,
        uuid=uuid,
        id=id,
        atlas_uuid=atlas_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_atlas_tile_updated(uuid, id=None, atlas_uuid=None):
    """Emit an ``ATLAS_TILE_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        id: Forwarded to the event's ``id`` field (``None`` by default).
        atlas_uuid: Forwarded to the event's ``atlas_uuid`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_TILE_UPDATED
    message = AtlasTileUpdatedEvent(
        event_type=event_kind,
        uuid=uuid,
        id=id,
        atlas_uuid=atlas_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_atlas_tile_deleted(uuid):
    """Emit an ``ATLAS_TILE_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.ATLAS_TILE_DELETED
    message = AtlasTileDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Grid DB Entity Mutations ==========
def publish_grid_created(uuid, acquisition_uuid=None):
    """Emit a ``GRID_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        acquisition_uuid: Forwarded to the event's ``acquisition_uuid`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRID_CREATED
    message = GridCreatedEvent(
        event_type=event_kind,
        uuid=uuid,
        acquisition_uuid=acquisition_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_grid_updated(uuid, acquisition_uuid=None):
    """Emit a ``GRID_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        acquisition_uuid: Forwarded to the event's ``acquisition_uuid`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRID_UPDATED
    message = GridUpdatedEvent(
        event_type=event_kind,
        uuid=uuid,
        acquisition_uuid=acquisition_uuid,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_grid_deleted(uuid):
    """Emit a ``GRID_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRID_DELETED
    message = GridDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
def publish_grid_registered(uuid: str):
    """Publish grid registered event to RabbitMQ.

    Args:
        uuid: Grid identifier; note it is forwarded to the event's
            ``grid_uuid`` field (not ``uuid`` as in the created/updated/deleted
            publishers).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRID_REGISTERED
    event = GridRegisteredEvent(event_type=event_kind, grid_uuid=uuid)
    return rmq_publisher.publish_event(event_kind, event)
# ========== Grid Square DB Entity Mutations ==========
def publish_gridsquare_created(uuid, grid_uuid=None, gridsquare_id=None):
    """Emit a ``GRIDSQUARE_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_CREATED
    message = GridSquareCreatedEvent(
        event_type=event_kind,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_updated(uuid, grid_uuid=None, gridsquare_id=None):
    """Emit a ``GRIDSQUARE_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_UPDATED
    message = GridSquareUpdatedEvent(
        event_type=event_kind,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_deleted(uuid):
    """Emit a ``GRIDSQUARE_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_DELETED
    message = GridSquareDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_registered(uuid: str, count: int | None = None):
    """Publish grid square registered event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        count: Forwarded to the event's ``count`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_REGISTERED
    event = GridSquareRegisteredEvent(
        event_type=event_kind,
        uuid=uuid,
        count=count,
    )
    return rmq_publisher.publish_event(event_kind, event)
def publish_gridsquare_lowmag_created(uuid, grid_uuid=None, gridsquare_id=None):
    """Emit a ``GRIDSQUARE_LOWMAG_CREATED`` event to RabbitMQ.

    The payload reuses ``GridSquareCreatedEvent``; only the event type differs
    from the regular grid-square "created" publisher.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_LOWMAG_CREATED
    message = GridSquareCreatedEvent(
        event_type=event_kind,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_lowmag_updated(uuid, grid_uuid=None, gridsquare_id=None):
    """Emit a ``GRIDSQUARE_LOWMAG_UPDATED`` event to RabbitMQ.

    The payload reuses ``GridSquareUpdatedEvent``; only the event type differs
    from the regular grid-square "updated" publisher.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        grid_uuid: Forwarded to the event's ``grid_uuid`` field (``None`` by default).
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_LOWMAG_UPDATED
    message = GridSquareUpdatedEvent(
        event_type=event_kind,
        uuid=uuid,
        grid_uuid=grid_uuid,
        gridsquare_id=gridsquare_id,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_lowmag_deleted(uuid):
    """Emit a ``GRIDSQUARE_LOWMAG_DELETED`` event to RabbitMQ.

    The payload reuses ``GridSquareDeletedEvent``; only the event type differs
    from the regular grid-square "deleted" publisher.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_LOWMAG_DELETED
    message = GridSquareDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Foil Hole DB Entity Mutations ==========
def publish_foilhole_created(uuid, foilhole_id=None, gridsquare_uuid=None, gridsquare_id=None):
    """Emit a ``FOILHOLE_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        foilhole_id: Forwarded to the event's ``foilhole_id`` field.
        gridsquare_uuid: Forwarded to the event's ``gridsquare_uuid`` field.
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field.
            All optional arguments default to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.FOILHOLE_CREATED
    fields = {
        "event_type": event_kind,
        "uuid": uuid,
        "foilhole_id": foilhole_id,
        "gridsquare_uuid": gridsquare_uuid,
        "gridsquare_id": gridsquare_id,
    }
    return rmq_publisher.publish_event(event_kind, FoilHoleCreatedEvent(**fields))
def publish_foilhole_updated(uuid, foilhole_id=None, gridsquare_uuid=None, gridsquare_id=None):
    """Emit a ``FOILHOLE_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        foilhole_id: Forwarded to the event's ``foilhole_id`` field.
        gridsquare_uuid: Forwarded to the event's ``gridsquare_uuid`` field.
        gridsquare_id: Forwarded to the event's ``gridsquare_id`` field.
            All optional arguments default to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.FOILHOLE_UPDATED
    fields = {
        "event_type": event_kind,
        "uuid": uuid,
        "foilhole_id": foilhole_id,
        "gridsquare_uuid": gridsquare_uuid,
        "gridsquare_id": gridsquare_id,
    }
    return rmq_publisher.publish_event(event_kind, FoilHoleUpdatedEvent(**fields))
def publish_foilhole_deleted(uuid):
    """Emit a ``FOILHOLE_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.FOILHOLE_DELETED
    message = FoilHoleDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Micrograph DB Entity Mutations ==========
def publish_micrograph_created(uuid, foilhole_uuid=None, foilhole_id=None, micrograph_id=None):
    """Emit a ``MICROGRAPH_CREATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        foilhole_uuid: Forwarded to the event's ``foilhole_uuid`` field.
        foilhole_id: Forwarded to the event's ``foilhole_id`` field.
        micrograph_id: Forwarded to the event's ``micrograph_id`` field.
            All optional arguments default to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MICROGRAPH_CREATED
    fields = {
        "event_type": event_kind,
        "uuid": uuid,
        "foilhole_uuid": foilhole_uuid,
        "foilhole_id": foilhole_id,
        "micrograph_id": micrograph_id,
    }
    return rmq_publisher.publish_event(event_kind, MicrographCreatedEvent(**fields))
def publish_micrograph_updated(uuid, foilhole_uuid=None, foilhole_id=None, micrograph_id=None):
    """Emit a ``MICROGRAPH_UPDATED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.
        foilhole_uuid: Forwarded to the event's ``foilhole_uuid`` field.
        foilhole_id: Forwarded to the event's ``foilhole_id`` field.
        micrograph_id: Forwarded to the event's ``micrograph_id`` field.
            All optional arguments default to ``None``.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MICROGRAPH_UPDATED
    fields = {
        "event_type": event_kind,
        "uuid": uuid,
        "foilhole_uuid": foilhole_uuid,
        "foilhole_id": foilhole_id,
        "micrograph_id": micrograph_id,
    }
    return rmq_publisher.publish_event(event_kind, MicrographUpdatedEvent(**fields))
def publish_micrograph_deleted(uuid):
    """Emit a ``MICROGRAPH_DELETED`` event to RabbitMQ.

    Args:
        uuid: Forwarded to the event's ``uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MICROGRAPH_DELETED
    message = MicrographDeletedEvent(event_type=event_kind, uuid=uuid)
    return rmq_publisher.publish_event(event_kind, message)
def publish_gridsquare_model_prediction(
    gridsquare_uuid: str, model_name: str, prediction_value: float, metric: str | None = None
):
    """Emit a ``GRIDSQUARE_MODEL_PREDICTION`` event for one grid square to RabbitMQ.

    Args:
        gridsquare_uuid: Forwarded to the event's ``gridsquare_uuid`` field.
        model_name: Forwarded to the event's ``prediction_model_name`` field.
        prediction_value: Forwarded to the event's ``prediction_value`` field.
        metric: Forwarded to the event's ``metric`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.GRIDSQUARE_MODEL_PREDICTION
    message = GridSquareModelPredictionEvent(
        event_type=event_kind,
        gridsquare_uuid=gridsquare_uuid,
        prediction_model_name=model_name,  # public arg name differs from the event field
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_foilhole_model_prediction(
    foilhole_uuid: str, model_name: str, prediction_value: float, metric: str | None = None
):
    """Emit a ``FOILHOLE_MODEL_PREDICTION`` event for one foil hole to RabbitMQ.

    Args:
        foilhole_uuid: Forwarded to the event's ``foilhole_uuid`` field.
        model_name: Forwarded to the event's ``prediction_model_name`` field.
        prediction_value: Forwarded to the event's ``prediction_value`` field.
        metric: Forwarded to the event's ``metric`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.FOILHOLE_MODEL_PREDICTION
    message = FoilHoleModelPredictionEvent(
        event_type=event_kind,
        foilhole_uuid=foilhole_uuid,
        prediction_model_name=model_name,  # public arg name differs from the event field
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_multi_foilhole_model_prediction(
    foilhole_uuids: list[str], model_name: str, prediction_value: float, metric: str | None = None
):
    """Emit a ``MULTI_FOILHOLE_MODEL_PREDICTION`` event for a list of foil holes to RabbitMQ.

    A single event carrying the whole list is published (not one per foil hole).

    Args:
        foilhole_uuids: Forwarded to the event's ``foilhole_uuids`` field.
        model_name: Forwarded to the event's ``prediction_model_name`` field.
        prediction_value: Forwarded to the event's ``prediction_value`` field.
        metric: Forwarded to the event's ``metric`` field (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MULTI_FOILHOLE_MODEL_PREDICTION
    message = MultiFoilHoleModelPredictionEvent(
        event_type=event_kind,
        foilhole_uuids=foilhole_uuids,
        prediction_model_name=model_name,  # public arg name differs from the event field
        prediction_value=prediction_value,
        metric=metric,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_model_parameter_update(
    grid_uuid: str, model_name: str, key: str, value: float, metric: str | None = None, group: str = ""
):
    """Emit a ``MODEL_PARAMETER_UPDATE`` event to RabbitMQ.

    Args:
        grid_uuid: Forwarded to the event's ``grid_uuid`` field.
        model_name: Forwarded to the event's ``prediction_model_name`` field.
        key: Forwarded to the event's ``key`` field.
        value: Forwarded to the event's ``value`` field.
        metric: Forwarded to the event's ``metric`` field (``None`` by default).
        group: Forwarded to the event's ``group`` field (empty string by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MODEL_PARAMETER_UPDATE
    fields = {
        "event_type": event_kind,
        "grid_uuid": grid_uuid,
        "prediction_model_name": model_name,  # public arg name differs from the event field
        "key": key,
        "value": value,
        "metric": metric,
        "group": group,
    }
    return rmq_publisher.publish_event(event_kind, ModelParameterUpdateEvent(**fields))
def publish_motion_correction_completed(micrograph_uuid: str, total_motion: float, average_motion: float):
    """Emit a ``MOTION_CORRECTION_COMPLETE`` event to RabbitMQ.

    Args:
        micrograph_uuid: Forwarded to the event's ``micrograph_uuid`` field.
        total_motion: Forwarded to the event's ``total_motion`` field.
        average_motion: Forwarded to the event's ``average_motion`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MOTION_CORRECTION_COMPLETE
    message = MotionCorrectionCompleteBody(
        event_type=event_kind,
        micrograph_uuid=micrograph_uuid,
        total_motion=total_motion,
        average_motion=average_motion,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_motion_correction_registered(micrograph_uuid: str, quality: bool, metric_name: str | None = None):
    """Emit a ``MOTION_CORRECTION_REGISTERED`` event to RabbitMQ.

    This is the "registered" notification that follows a completed motion
    correction.

    Args:
        micrograph_uuid: Forwarded to the event's ``micrograph_uuid`` field.
        quality: Forwarded to the event's ``quality`` field.
        metric_name: Forwarded to the event's ``metric_name`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.MOTION_CORRECTION_REGISTERED
    message = MotionCorrectionRegisteredBody(
        event_type=event_kind,
        micrograph_uuid=micrograph_uuid,
        quality=quality,
        metric_name=metric_name,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_ctf_estimation_completed(micrograph_uuid: str, ctf_max_res: float):
    """Emit a ``CTF_COMPLETE`` event to RabbitMQ.

    Args:
        micrograph_uuid: Forwarded to the event's ``micrograph_uuid`` field.
        ctf_max_res: Forwarded to the event's ``ctf_max_res`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.CTF_COMPLETE
    message = CtfCompleteBody(
        event_type=event_kind,
        micrograph_uuid=micrograph_uuid,
        ctf_max_res=ctf_max_res,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_ctf_estimation_registered(micrograph_uuid: str, quality: bool, metric_name: str | None = None):
    """Emit a ``CTF_REGISTERED`` event to RabbitMQ.

    NOTE(review): presumably this is the "registered" notification that follows
    a completed CTF estimation -- confirm against the event consumers.

    Args:
        micrograph_uuid: Forwarded to the event's ``micrograph_uuid`` field.
        quality: Forwarded to the event's ``quality`` field.
        metric_name: Forwarded to the event's ``metric_name`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.CTF_REGISTERED
    message = CtfRegisteredBody(
        event_type=event_kind,
        micrograph_uuid=micrograph_uuid,
        quality=quality,
        metric_name=metric_name,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_refresh_predictions(grid_uuid: str):
    """Emit a ``REFRESH_PREDICTIONS`` event to trigger overall prediction recalculation.

    Args:
        grid_uuid: Forwarded to the event's ``grid_uuid`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.REFRESH_PREDICTIONS
    message = RefreshPredictionsEvent(event_type=event_kind, grid_uuid=grid_uuid)
    return rmq_publisher.publish_event(event_kind, message)
# ========== Agent Communication Events ==========
def publish_agent_instruction_created(instruction_id, session_id, agent_id, instruction_type, payload, **kwargs):
    """Emit an ``AGENT_INSTRUCTION_CREATED`` event to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event's ``instruction_id`` field.
        session_id: Forwarded to the event's ``session_id`` field.
        agent_id: Forwarded to the event's ``agent_id`` field.
        instruction_type: Forwarded to the event's ``instruction_type`` field.
        payload: Forwarded to the event's ``payload`` field.
        **kwargs: Optional extras read by key: ``sequence_number``,
            ``priority`` (falls back to ``"normal"`` when the key is absent),
            ``expires_at`` and ``instruction_metadata``. Missing keys other
            than ``priority`` become ``None``; any other keys are ignored.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.AGENT_INSTRUCTION_CREATED

    # Pull the optional fields out of kwargs up front so the constructor call
    # below reads as a flat field list.
    sequence_number = kwargs.get("sequence_number")
    priority = kwargs.get("priority", "normal")
    expires_at = kwargs.get("expires_at")
    instruction_metadata = kwargs.get("instruction_metadata")

    message = AgentInstructionCreatedEvent(
        event_type=event_kind,
        instruction_id=instruction_id,
        session_id=session_id,
        agent_id=agent_id,
        instruction_type=instruction_type,
        payload=payload,
        sequence_number=sequence_number,
        priority=priority,
        expires_at=expires_at,
        instruction_metadata=instruction_metadata,
    )
    return rmq_publisher.publish_event(event_kind, message)
def publish_agent_instruction_updated(instruction_id, session_id, agent_id, status, acknowledged_at=None):
    """Emit an ``AGENT_INSTRUCTION_UPDATED`` event to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event's ``instruction_id`` field.
        session_id: Forwarded to the event's ``session_id`` field.
        agent_id: Forwarded to the event's ``agent_id`` field.
        status: Forwarded to the event's ``status`` field.
        acknowledged_at: Forwarded to the event's ``acknowledged_at`` field
            (``None`` by default).

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.AGENT_INSTRUCTION_UPDATED
    fields = {
        "event_type": event_kind,
        "instruction_id": instruction_id,
        "session_id": session_id,
        "agent_id": agent_id,
        "status": status,
        "acknowledged_at": acknowledged_at,
    }
    return rmq_publisher.publish_event(event_kind, AgentInstructionUpdatedEvent(**fields))
def publish_agent_instruction_expired(instruction_id, session_id, agent_id, expires_at, retry_count):
    """Emit an ``AGENT_INSTRUCTION_EXPIRED`` event to RabbitMQ.

    Args:
        instruction_id: Forwarded to the event's ``instruction_id`` field.
        session_id: Forwarded to the event's ``session_id`` field.
        agent_id: Forwarded to the event's ``agent_id`` field.
        expires_at: Forwarded to the event's ``expires_at`` field.
        retry_count: Forwarded to the event's ``retry_count`` field.

    Returns:
        Whatever ``rmq_publisher.publish_event`` returns.
    """
    event_kind = MessageQueueEventType.AGENT_INSTRUCTION_EXPIRED
    fields = {
        "event_type": event_kind,
        "instruction_id": instruction_id,
        "session_id": session_id,
        "agent_id": agent_id,
        "expires_at": expires_at,
        "retry_count": retry_count,
    }
    return rmq_publisher.publish_event(event_kind, AgentInstructionExpiredEvent(**fields))