# Source code for smartem_backend.model.mq_event

from datetime import datetime
from enum import Enum

from pydantic import BaseModel, computed_field, field_serializer, model_validator


def non_negative_float(v: float) -> bool:
    """Return ``True`` when *v* is zero or positive, ``False`` otherwise."""
    return 0 <= v


class MessageQueueEventType(str, Enum):
    """System events that are mapped to RabbitMQ messages.

    Each member's value is the dotted string carried by the message. Members
    are also ``str`` instances, so they compare equal to their raw value.
    Commented-out entries are not defined on the enum.
    """

    # --- Entity lifecycle events ---
    ACQUISITION_CREATED = "acquisition.created"
    ACQUISITION_UPDATED = "acquisition.updated"
    ACQUISITION_DELETED = "acquisition.deleted"

    ATLAS_CREATED = "atlas.created"
    ATLAS_UPDATED = "atlas.updated"
    ATLAS_DELETED = "atlas.deleted"

    ATLAS_TILE_CREATED = "atlas_tile.created"
    ATLAS_TILE_UPDATED = "atlas_tile.updated"
    ATLAS_TILE_DELETED = "atlas_tile.deleted"

    GRID_CREATED = "grid.created"
    GRID_UPDATED = "grid.updated"
    GRID_DELETED = "grid.deleted"
    GRID_REGISTERED = "grid.registered"  # all grid squares on grid have been registered

    GRIDSQUARE_CREATED = "gridsquare.created"
    GRIDSQUARE_UPDATED = "gridsquare.updated"
    GRIDSQUARE_DELETED = "gridsquare.deleted"
    GRIDSQUARE_REGISTERED = "gridsquare.registered"  # all foil holes on gridsquare have been registered

    GRIDSQUARE_LOWMAG_CREATED = "gridsquare_lowmag.created"
    GRIDSQUARE_LOWMAG_UPDATED = "gridsquare_lowmag.updated"
    GRIDSQUARE_LOWMAG_DELETED = "gridsquare_lowmag.deleted"

    FOILHOLE_CREATED = "foilhole.created"
    FOILHOLE_UPDATED = "foilhole.updated"
    FOILHOLE_DELETED = "foilhole.deleted"

    MICROGRAPH_CREATED = "micrograph.created"
    MICROGRAPH_UPDATED = "micrograph.updated"
    MICROGRAPH_DELETED = "micrograph.deleted"

    # --- Session, decision and processing events ---
    # ACQUISITION_SESSION_START = "acquisition.session_started"
    # ACQUISITION_SESSION_PAUSE = "acquisition.session_paused"
    # ACQUISITION_SESSION_RESUME = "acquisition.session_resumed"
    # ACQUISITION_SESSION_END = "acquisition.session_ended"
    # GRID_SCAN_SESSION_START = "grid.scan_started"
    # GRID_SCAN_SESSION_COMPLETE = "grid.scan_completed"
    # GRIDSQUARES_DECISION_START = "gridsquares.decision_started"
    GRIDSQUARES_DECISION_COMPLETE = "gridsquares.decision_completed"
    FOILHOLES_DETECTED = "foilholes.detected"  # TODO is this redundant?
    # FOILHOLES_DECISION_START = "foilholes.decision_started"
    FOILHOLES_DECISION_COMPLETE = "foilholes.decision_completed"
    MICROGRAPHS_DETECTED = "micrographs.detected"  # TODO is this redundant?
    # MOTION_CORRECTION_START = "motion_correction.started"
    MOTION_CORRECTION_COMPLETE = "motion_correction.completed"
    # CTF_START = "ctf.started"
    CTF_COMPLETE = "ctf.completed"
    # PARTICLE_PICKING_START = "particle_picking.started"
    PARTICLE_PICKING_COMPLETE = "particle_picking.completed"
    # PARTICLE_SELECTION_START = "particle_selection.started"
    PARTICLE_SELECTION_COMPLETE = "particle_selection.completed"

    # --- Model prediction events ---
    GRIDSQUARE_MODEL_PREDICTION = "gridsquare.model_prediction"
    FOILHOLE_MODEL_PREDICTION = "foilhole.model_prediction"
    MODEL_PARAMETER_UPDATE = "gridsquare.model_parameter_update"
class GenericEventMessageBody(BaseModel):
    """Base message body that carries the ``event_type`` of an event.

    Two field serializers are registered: one that renders ``event_type`` as
    its plain string value, and a catch-all that is only used for JSON output
    and converts ``datetime`` values to ISO 8601 strings.
    """

    event_type: MessageQueueEventType

    @field_serializer("event_type")
    def serialize_event_type(self, event_type: MessageQueueEventType, _info):
        """Return the enum member's underlying value as a ``str``."""
        return str(event_type.value)

    # NOTE(review): the "*" pattern presumably also matches ``event_type``;
    # confirm which of the two serializers pydantic applies to that field.
    @field_serializer("*", when_used="json")
    def serialize_datetime_fields(self, v, _info):
        """Render ``datetime`` values via ``isoformat()``; pass others through."""
        return v.isoformat() if isinstance(v, datetime) else v


# ============ Acquisition Events ============
class AcquisitionEventBase(GenericEventMessageBody):
    """Common parent of the acquisition event bodies; adds no fields of its own."""
class AcquisitionCreatedEvent(AcquisitionEventBase):
    """Body of the event emitted when an acquisition is created.

    ``uuid`` is required; ``id`` is optional and defaults to ``None``.
    """

    uuid: str
    id: str | None = None
class AcquisitionUpdatedEvent(AcquisitionEventBase):
    """Body of the event emitted when an acquisition is updated.

    ``uuid`` is required; ``id`` is optional and defaults to ``None``.
    """

    uuid: str
    id: str | None = None
class AcquisitionDeletedEvent(AcquisitionEventBase):
    """Body of the event emitted when an acquisition is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
# ============ Atlas Events ============
class AtlasEventBase(GenericEventMessageBody):
    """Common parent of the atlas event bodies; adds no fields of its own."""
class AtlasCreatedEvent(AtlasEventBase):
    """Body of the event emitted when an atlas is created.

    ``uuid`` is required; ``id`` and ``grid_uuid`` are optional and default
    to ``None``.
    """

    uuid: str
    id: str | None = None
    grid_uuid: str | None = None
class AtlasUpdatedEvent(AtlasEventBase):
    """Body of the event emitted when an atlas is updated.

    ``uuid`` is required; ``id`` and ``grid_uuid`` are optional and default
    to ``None``.
    """

    uuid: str
    id: str | None = None
    grid_uuid: str | None = None
class AtlasDeletedEvent(AtlasEventBase):
    """Body of the event emitted when an atlas is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
# ============ Atlas Tile Events ============
class AtlasTileEventBase(GenericEventMessageBody):
    """Common parent of the atlas tile event bodies; adds no fields of its own."""
class AtlasTileCreatedEvent(AtlasTileEventBase):
    """Body of the event emitted when an atlas tile is created.

    ``uuid`` is required; ``id`` and ``atlas_uuid`` are optional and default
    to ``None``.
    """

    uuid: str
    id: str | None = None
    atlas_uuid: str | None = None
class AtlasTileUpdatedEvent(AtlasTileEventBase):
    """Body of the event emitted when an atlas tile is updated.

    ``uuid`` is required; ``id`` and ``atlas_uuid`` are optional and default
    to ``None``.
    """

    uuid: str
    id: str | None = None
    atlas_uuid: str | None = None
class AtlasTileDeletedEvent(AtlasTileEventBase):
    """Body of the event emitted when an atlas tile is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
# ============ Grid Events ============
# TODO: this could definitely be refactored to reduce duplication, since
# GridCreatedEvent and GridUpdatedEvent share the same fields, with the only
# difference being that the fields are optional in the updated event.
class GridEventBase(GenericEventMessageBody):
    """Common parent of the grid event bodies; adds no fields of its own."""
class GridCreatedEvent(GridEventBase):
    """Body of the event emitted when a grid is created.

    ``uuid`` is required; ``acquisition_uuid`` is optional and defaults to
    ``None``.
    """

    uuid: str
    acquisition_uuid: str | None = None
class GridUpdatedEvent(GridEventBase):
    """Body of the event emitted when a grid is updated.

    ``uuid`` is required; ``acquisition_uuid`` is optional and defaults to
    ``None``.
    """

    uuid: str
    acquisition_uuid: str | None = None
class GridDeletedEvent(GridEventBase):
    """Body of the event emitted when a grid is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
class GridRegisteredEvent(GridEventBase):
    """Body of the event emitted once all squares at atlas mag are registered for a grid.

    Unlike the other grid events, the identifier field is named ``grid_uuid``
    (required) rather than ``uuid``.
    """

    grid_uuid: str
# ============ Grid Square Events ============
class GridSquareEventBase(GenericEventMessageBody):
    """Common parent of the grid square event bodies; adds no fields of its own."""
class GridSquareCreatedEvent(GridSquareEventBase):
    """Body of the event emitted when a grid square is created.

    ``uuid`` is required; ``grid_uuid`` and ``gridsquare_id`` are optional and
    default to ``None``.
    """

    uuid: str
    grid_uuid: str | None = None
    gridsquare_id: str | None = None
class GridSquareUpdatedEvent(GridSquareEventBase):
    """Body of the event emitted when a grid square is updated.

    ``uuid`` is required; ``grid_uuid`` and ``gridsquare_id`` are optional and
    default to ``None``.
    """

    uuid: str
    grid_uuid: str | None = None
    gridsquare_id: str | None = None
class GridSquareDeletedEvent(GridSquareEventBase):
    """Body of the event emitted when a grid square is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
class GridSquareRegisteredEvent(GridSquareEventBase):
    """Body of the event emitted once all holes at square mag are registered for a grid square.

    ``uuid`` is required; ``grid_uuid`` is optional and defaults to ``None``.
    """

    uuid: str
    grid_uuid: str | None = None
# ============ Foil Hole Events ============
class FoilHoleEventBase(GenericEventMessageBody):
    """Common parent of the foil hole event bodies; adds no fields of its own."""
class FoilHoleCreatedEvent(FoilHoleEventBase):
    """Body of the event emitted when a foil hole is created.

    ``uuid`` is required; ``foilhole_id``, ``gridsquare_uuid`` and
    ``gridsquare_id`` are optional and default to ``None``.
    """

    uuid: str
    foilhole_id: str | None = None
    gridsquare_uuid: str | None = None
    gridsquare_id: str | None = None
class FoilHoleUpdatedEvent(FoilHoleEventBase):
    """Body of the event emitted when a foil hole is updated.

    ``uuid`` is required; ``foilhole_id``, ``gridsquare_uuid`` and
    ``gridsquare_id`` are optional and default to ``None``.
    """

    uuid: str
    foilhole_id: str | None = None
    gridsquare_uuid: str | None = None
    gridsquare_id: str | None = None
class FoilHoleDeletedEvent(FoilHoleEventBase):
    """Body of the event emitted when a foil hole is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
# ============ Micrograph Events ============
class MicrographEventBase(GenericEventMessageBody):
    """Common parent of the micrograph event bodies; adds no fields of its own."""
class MicrographCreatedEvent(MicrographEventBase):
    """Body of the event emitted when a micrograph is created.

    ``uuid`` is required; ``foilhole_uuid``, ``foilhole_id`` and
    ``micrograph_id`` are optional and default to ``None``.
    """

    uuid: str
    foilhole_uuid: str | None = None
    foilhole_id: str | None = None
    micrograph_id: str | None = None
class MicrographUpdatedEvent(MicrographEventBase):
    """Body of the event emitted when a micrograph is updated.

    ``uuid`` is required; ``foilhole_uuid``, ``foilhole_id`` and
    ``micrograph_id`` are optional and default to ``None``.
    """

    uuid: str
    foilhole_uuid: str | None = None
    foilhole_id: str | None = None
    micrograph_id: str | None = None
class MicrographDeletedEvent(MicrographEventBase):
    """Body of the event emitted when a micrograph is deleted.

    Only the required ``uuid`` is carried.
    """

    uuid: str
# ============ Data Processing and ML Events ============


class MotionCorrectionStartBody(GenericEventMessageBody):
    """Body of a motion-correction start message; carries only ``micrograph_uuid``."""

    micrograph_uuid: str


class MotionCorrectionCompleteBody(GenericEventMessageBody):
    """Body of a motion-correction completion message.

    All three float metrics are validated to be non-negative after the model
    has been built.
    """

    micrograph_uuid: str
    total_motion: float
    average_motion: float
    ctf_max_resolution_estimate: float

    @model_validator(mode="after")
    def check_model(self):
        """Reject negative metrics.

        Raises:
            ValueError: for the first metric (in declaration order) below zero.
        """
        # Checked in this order so the first failing metric is the one reported.
        constraints = (
            (self.total_motion, "Total Motion should be a non-negative float"),
            (self.average_motion, "Average Motion should be a non-negative float"),
            (self.ctf_max_resolution_estimate, "CTF Max Resolution should be a non-negative float"),
        )
        for metric, complaint in constraints:
            if metric < 0:
                raise ValueError(complaint)
        return self


class CtfStartBody(GenericEventMessageBody):
    """Body of a CTF start message; carries only ``micrograph_uuid``."""

    micrograph_uuid: str


class CtfCompleteBody(BaseModel):
    """Body of a CTF completion message.

    This class derives from ``BaseModel`` directly, so it has no
    ``event_type`` field. All three float metrics are validated to be
    non-negative after the model has been built.

    NOTE(review): the sibling bodies extend ``GenericEventMessageBody`` and
    the fields here mirror ``MotionCorrectionCompleteBody`` - confirm both
    are intentional.
    """

    micrograph_uuid: str
    total_motion: float
    average_motion: float
    ctf_max_resolution_estimate: float

    @model_validator(mode="after")
    def check_model(self):
        """Reject negative metrics.

        Raises:
            ValueError: for the first metric (in declaration order) below zero.
        """
        # Checked in this order so the first failing metric is the one reported.
        constraints = (
            (self.total_motion, "Total Motion should be a non-negative float"),
            (self.average_motion, "Average Motion should be a non-negative float"),
            (self.ctf_max_resolution_estimate, "CTF Max Resolution should be a non-negative float"),
        )
        for metric, complaint in constraints:
            if metric < 0:
                raise ValueError(complaint)
        return self


class ParticlePickingStartBody(GenericEventMessageBody):
    """Body of a particle-picking start message; carries only ``micrograph_uuid``."""

    micrograph_uuid: str


class ParticlePickingCompleteBody(GenericEventMessageBody):
    """Body of a particle-picking completion message.

    ``number_of_particles_picked`` is validated to be non-negative;
    ``pick_distribution`` is an unconstrained ``dict``.
    """

    micrograph_uuid: str
    number_of_particles_picked: int
    pick_distribution: dict

    @model_validator(mode="after")
    def check_model(self):
        """Reject a negative particle count.

        Raises:
            ValueError: if ``number_of_particles_picked`` is below zero.
        """
        picked = self.number_of_particles_picked
        if picked < 0:
            raise ValueError("Number of Particles Picked should be a non-negative int")
        # TODO validate that number of particles picked equals to the size of pick distribution
        return self
class ParticleSelectionStartBody(GenericEventMessageBody):
    """Body of a particle-selection start message; carries only ``micrograph_uuid``.

    TODO: for particle selection start see
    https://github.com/DiamondLightSource/cryoem-services/blob/main/src/cryoemservices/services/select_particles.py#L16C1-L21C41

    The model referenced by the TODO::

        class ParticleSelectionStart(BaseModel):
            input_file: str = Field(..., min_length=1)
            batch_size: int
            image_size: int
            incomplete_batch_size: int = 10000
            relion_options: RelionServiceOptions
    """

    micrograph_uuid: str
class ParticleSelectionCompleteBody(GenericEventMessageBody):
    """Body of a particle-selection completion message.

    Both particle counts are validated to be non-negative, and their sum is
    exposed as the computed field ``total_number_of_particles``.
    ``selection_distribution`` is an unconstrained ``dict``.
    """

    micrograph_uuid: str
    number_of_particles_selected: int
    number_of_particles_rejected: int
    selection_distribution: dict

    @computed_field
    @property
    def total_number_of_particles(self) -> int:
        """Sum of the selected and rejected particle counts."""
        selected = self.number_of_particles_selected
        rejected = self.number_of_particles_rejected
        return selected + rejected

    @model_validator(mode="after")
    def check_model(self):
        """Reject negative particle counts.

        Raises:
            ValueError: for the first count (selected, then rejected) below zero.
        """
        # Checked in this order so the first failing count is the one reported.
        constraints = (
            (self.number_of_particles_selected, "Number of Particles Selected should be a non-negative int"),
            (self.number_of_particles_rejected, "Number of Particles Rejected should be a non-negative int"),
        )
        for count, complaint in constraints:
            if count < 0:
                raise ValueError(complaint)
        return self


class GridSquareModelPredictionEvent(GenericEventMessageBody):
    """Body of a grid square model prediction message.

    All of ``gridsquare_uuid``, ``prediction_model_name`` and
    ``prediction_value`` are required.
    """

    gridsquare_uuid: str
    prediction_model_name: str
    prediction_value: float


class FoilHoleModelPredictionEvent(GenericEventMessageBody):
    """Body of a foil hole model prediction message.

    All of ``foilhole_uuid``, ``prediction_model_name`` and
    ``prediction_value`` are required.
    """

    foilhole_uuid: str
    prediction_model_name: str
    prediction_value: float


class ModelParameterUpdateEvent(GenericEventMessageBody):
    """Body of a model parameter update message.

    ``grid_uuid``, ``prediction_model_name``, ``key`` and ``value`` are
    required; ``group`` is optional and defaults to the empty string.
    """

    grid_uuid: str
    prediction_model_name: str
    key: str
    value: float
    group: str = ""