Source code for smartem_backend.cli.random_prior_updates

import random
import threading
import time

import typer
from sqlalchemy.engine import Engine
from sqlmodel import Session, select

from smartem_backend.model.database import FoilHole, Grid, GridSquare, Micrograph
from smartem_backend.predictions.update import prior_update
from smartem_backend.utils import get_db_engine, logger

# Default time ranges for processing steps (in seconds)
DEFAULT_MOTION_CORRECTION_DELAY = (1.0, 3.0)
DEFAULT_CTF_DELAY = (2.0, 5.0)
DEFAULT_PARTICLE_PICKING_DELAY = (3.0, 8.0)
DEFAULT_PARTICLE_SELECTION_DELAY = (2.0, 6.0)


def perform_random_updates(
    grid_uuid: str | None = None,
    random_range: tuple[float, float] = (0, 1),
    origin: str = "motion_correction",
    engine: Engine = None,
) -> None:
    if engine is None:
        engine = get_db_engine()

    with Session(engine) as sess:
        if grid_uuid is None:
            grid = sess.exec(select(Grid)).first()
            grid_uuid = grid.uuid
        mics = sess.exec(
            select(Micrograph, FoilHole, GridSquare)
            .where(Micrograph.foilhole_uuid == FoilHole.uuid)
            .where(FoilHole.gridsquare_uuid == GridSquare.uuid)
            .where(GridSquare.grid_uuid == grid_uuid)
        ).all()
        for m in mics:
            prior_update(random.uniform(random_range[0], random_range[1]) < 0.5, m[0].uuid, sess, origin=origin)
    return None


[docs] def simulate_processing_pipeline(micrograph_uuid: str, engine: Engine = None) -> None: """ Simulate the data processing pipeline for a micrograph with random delays. Pipeline: motion correction → ctf → particle picking → particle selection Args: micrograph_uuid: UUID of the micrograph to process engine: Optional database engine (uses singleton if not provided) """ if engine is None: engine = get_db_engine() processing_steps = [ ("motion_correction", DEFAULT_MOTION_CORRECTION_DELAY), ("ctf", DEFAULT_CTF_DELAY), ("particle_picking", DEFAULT_PARTICLE_PICKING_DELAY), ("particle_selection", DEFAULT_PARTICLE_SELECTION_DELAY), ] logger.info(f"Starting processing pipeline simulation for micrograph {micrograph_uuid}") for step_name, delay_range in processing_steps: # Random delay for this processing step delay = random.uniform(delay_range[0], delay_range[1]) logger.debug(f"Simulating {step_name} for micrograph {micrograph_uuid}, delay: {delay:.2f}s") time.sleep(delay) # Perform random weight update for this step - reuse the same engine try: with Session(engine) as sess: # Generate random quality result (True/False) quality_result = random.choice([True, False]) prior_update(quality=quality_result, micrograph_uuid=micrograph_uuid, session=sess, origin=step_name) logger.info(f"Completed {step_name} for micrograph {micrograph_uuid}, quality: {quality_result}") except Exception as e: logger.error(f"Error in {step_name} for micrograph {micrograph_uuid}: {e}") # Continue with next step even if one fails logger.info(f"Completed processing pipeline simulation for micrograph {micrograph_uuid}")
[docs] def simulate_processing_pipeline_async(micrograph_uuid: str, engine: Engine = None) -> None: """ Start the processing pipeline simulation in a background thread. Args: micrograph_uuid: UUID of the micrograph to process engine: Optional database engine (uses singleton if not provided) """ if engine is None: engine = get_db_engine() def run_simulation(): simulate_processing_pipeline(micrograph_uuid, engine) # Start simulation in background thread so it doesn't block the consumer thread = threading.Thread(target=run_simulation, daemon=True) thread.start() logger.debug(f"Started background processing simulation for micrograph {micrograph_uuid}")
def run() -> None: typer.run(perform_random_updates) return None