Backend-to-Agent Communication System Design#
This document describes the design of the backend-to-agent communication system that implements ADR #8’s hybrid SSE + HTTP architecture. The system delivers microscope control instructions in real time from Kubernetes-hosted backend services to the Windows workstations that control cryo-electron microscopes.
Architecture Overview#
System Context#
The SmartEM Decisions platform operates in a distributed environment where backend services run in Kubernetes clusters whilst agent services execute on Windows workstations directly connected to scientific equipment. The communication system bridges this divide whilst meeting high-throughput requirements.
Implementation Status: COMPLETED - This POC implementation provides a production-ready backend-to-agent communication system with full SSE streaming, RabbitMQ integration, database persistence, and comprehensive connection management.
graph TB
subgraph k8s["Kubernetes Cluster - Diamond Light Source"]
subgraph packages["SmartEM Packages"]
subgraph backend["smartem_backend"]
api_server["api_server.py<br/>FastAPI + SSE Endpoints"]
consumer["consumer.py<br/>RabbitMQ Event Processing"]
conn_mgr["agent_connection_manager.py<br/>Connection Health Monitoring"]
end
subgraph common["smartem_common"]
schemas["schemas.py<br/>Shared Data Models"]
utils["utils.py<br/>Common Utilities"]
end
subgraph athena["athena_api"]
athena_client["client.py<br/>External API Integration"]
end
subgraph mcp["smartem_mcp"]
mcp_server["server.py<br/>Model Context Protocol"]
end
end
subgraph infra["Infrastructure Services"]
db[("PostgreSQL<br/>AgentSession, AgentInstruction<br/>AgentConnection, AgentInstructionAcknowledgement")]
mq[("RabbitMQ<br/>Event Communication<br/>Instruction Lifecycle")]
end
end
subgraph boundary["Network Boundary - Windows Workstations"]
subgraph agents["Agent Software (Windows)"]
subgraph agent_pkg["smartem_agent"]
fs_watcher["fs_watcher.py<br/>File System Monitoring"]
fs_parser["fs_parser.py<br/>EPU Data Parsing"]
sse_client["SSEAgentClient<br/>Stream Connection"]
end
end
subgraph equipment["Scientific Equipment"]
subgraph scope1["Microscope Workstation 1"]
epu1["EPU Software<br/>(ThermoFisher)"]
em1["Cryo-EM Microscope 1"]
gpfs1["GPFS Storage<br/>Image Data"]
end
subgraph scope2["Microscope Workstation 2"]
epu2["EPU Software<br/>(ThermoFisher)"]
em2["Cryo-EM Microscope 2"]
gpfs2["GPFS Storage<br/>Image Data"]
end
subgraph scopeN["Microscope Workstation N"]
epuN["EPU Software<br/>(ThermoFisher)"]
emN["Cryo-EM Microscope N"]
gpfsN["GPFS Storage<br/>Image Data"]
end
end
end
%% Database and message queue connections
api_server --> db
api_server --> mq
consumer --> db
consumer --> mq
conn_mgr --> db
conn_mgr --> mq
%% SSE streaming connections
api_server -.->|"SSE: /agent/{id}/session/{sid}/instructions/stream"| sse_client
%% HTTP acknowledgement connections
sse_client -.->|"HTTP: /agent/{id}/session/{sid}/instructions/{iid}/ack"| api_server
%% File system monitoring
fs_watcher --> fs_parser
fs_parser --> sse_client
%% Equipment integration
epu1 --> gpfs1
epu2 --> gpfs2
epuN --> gpfsN
fs_watcher -.->|"Monitor EPU Output"| gpfs1
fs_watcher -.->|"Monitor EPU Output"| gpfs2
fs_watcher -.->|"Monitor EPU Output"| gpfsN
%% External API integration
consumer --> athena_client
%% Package dependencies
backend -.-> common
agent_pkg -.-> common
fs_parser -.-> schemas
%% Styling
classDef k8s fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
classDef boundary fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
classDef database fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef equipment fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef communication fill:#fff8e1,stroke:#f57f17,stroke-width:2px
class k8s,packages,backend,common,athena,mcp,infra k8s
class boundary,agents,agent_pkg,equipment boundary
class db,mq database
class scope1,scope2,scopeN,epu1,epu2,epuN,em1,em2,emN,gpfs1,gpfs2,gpfsN equipment
class api_server,sse_client,fs_watcher communication
Service Architecture#
The communication system employs a separate service approach rather than integrating directly with the main API service. This design provides:
Isolation of concerns: Communication logic remains separate from core business logic
Scalability independence: Communication service can scale independently based on connection load
Operational simplicity: Monitoring and debugging of persistent connections without affecting main API
Resource management: Dedicated resources for managing long-lived SSE connections
graph LR
subgraph services["Service Layer"]
main["Main API Service"]
comm["Communication Service"]
end
subgraph data["Data Layer"]
db[("PostgreSQL")]
mq[("RabbitMQ")]
end
subgraph agents["Agent Layer"]
agent["Agent Clients"]
end
main --> db
main --> mq
comm --> db
comm --> mq
mq --> comm
comm <--> agents
main -.->|Events| mq
mq -.->|Instructions| comm
Technical Stack Integration#
FastAPI Integration#
The communication service leverages FastAPI’s native SSE support through StreamingResponse and event-streaming patterns:
# Conceptual endpoint structure
@app.get("/agent/{agent_id}/instructions/stream")
async def stream_instructions(agent_id: str):
    """SSE endpoint for streaming instructions to agents"""
    ...

@app.post("/agent/{agent_id}/instructions/{instruction_id}/ack")
async def acknowledge_instruction(agent_id: str, instruction_id: str):
    """HTTP endpoint for instruction acknowledgements"""
    ...
RabbitMQ Message Flow#
The system integrates with the existing RabbitMQ infrastructure as an event communication backbone between ML components and the communication service:
sequenceDiagram
participant ML as ML Pipeline
participant MQ as RabbitMQ
participant Comm as Communication Service
participant Agent as Agent Client
participant DB as PostgreSQL
ML->>MQ: Publish instruction event
MQ->>Comm: Deliver to agent queue
Comm->>DB: Store instruction state
Comm->>Agent: Stream via SSE
Agent->>Comm: HTTP acknowledgement
Comm->>DB: Update delivery status
Comm->>MQ: Publish ACK event
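For illustration, publishing one of these lifecycle events with pika might look as follows. The exchange name and connection details are assumptions; the routing key matches the agent.instruction.* message types listed later in this document.

import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
# Exchange name is illustrative, not confirmed configuration
channel.exchange_declare(exchange="smartem.agent", exchange_type="topic", durable=True)
channel.basic_publish(
    exchange="smartem.agent",
    routing_key="agent.instruction.created",
    body=json.dumps({"instruction_id": "instruction-uuid", "agent_id": "agent-001"}),
    properties=pika.BasicProperties(delivery_mode=2),  # persist message to disk
)
connection.close()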
PostgreSQL Schema Design#
The communication system extends the existing database schema with instruction tracking tables:
-- Conceptual schema structure
CREATE TABLE agent_instructions (
    id UUID PRIMARY KEY,
    agent_id VARCHAR NOT NULL,
    instruction_type VARCHAR NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    delivered_at TIMESTAMP WITH TIME ZONE,
    acknowledged_at TIMESTAMP WITH TIME ZONE,
    status VARCHAR NOT NULL DEFAULT 'pending',
    retry_count INTEGER DEFAULT 0
);

CREATE TABLE agent_connections (
    agent_id VARCHAR PRIMARY KEY,
    connection_id VARCHAR NOT NULL,
    connected_at TIMESTAMP WITH TIME ZONE NOT NULL,
    last_heartbeat TIMESTAMP WITH TIME ZONE,
    connection_type VARCHAR NOT NULL -- 'sse'
);
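Pending instructions are replayed from this table when an agent (re)connects (see process_pending_instructions later in this document). A hedged sketch of that retrieval against the conceptual schema above, using psycopg; connection details are assumptions:

import psycopg


def fetch_pending_instructions(conninfo: str, agent_id: str) -> list[dict]:
    """Retrieve undelivered instructions for an agent, oldest first."""
    with psycopg.connect(conninfo) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, instruction_type, payload
                FROM agent_instructions
                WHERE agent_id = %s AND status = 'pending'
                ORDER BY created_at
                """,
                (agent_id,),
            )
            return [
                {"id": str(row[0]), "instruction_type": row[1], "payload": row[2]}
                for row in cur.fetchall()
            ]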
Component Interactions and Data Flows#
Primary Communication Flow (SSE)#
The primary communication path uses Server-Sent Events for efficient real-time instruction delivery:
sequenceDiagram
participant Agent as Agent Client
participant Comm as Communication Service
participant MQ as RabbitMQ
participant DB as PostgreSQL
Agent->>Comm: Establish SSE connection
Comm->>DB: Register connection
loop Instruction Processing
MQ->>Comm: New instruction event
Comm->>DB: Store instruction
Comm->>Agent: Stream instruction (SSE)
Agent->>Comm: HTTP acknowledgement
Comm->>DB: Update delivery status
end
Agent->>Comm: Connection closed
Comm->>DB: Clean up connection state
Error Handling and Recovery#
The system implements comprehensive error handling across multiple failure scenarios:
graph TD
start([Instruction Generated]) --> sse{SSE Connected?}
sse -->|Yes| stream[Stream via SSE]
sse -->|No| queue[Queue for Next Connection]
stream --> ack{Acknowledgement Received?}
ack -->|Yes| complete[Mark Complete]
ack -->|No| retry{Retry Count < Max?}
retry -->|Yes| delay[Exponential Backoff]
retry -->|No| failed[Mark Failed]
delay --> reconnect[Reconnect SSE]
reconnect --> stream
queue --> reconnect
failed --> done([End])
complete --> done([End])
Scalability Design#
Connection Management#
The system is designed to support one session per agent machine with a capacity of 20 concurrent SSE connections. This design aligns with the facility’s requirements of up to 20 microscope workstations, where each workstation controls a single microscope.
graph TB
subgraph comm["Communication Service Instance"]
pool["Connection Pool"]
mgr["Connection Manager"]
health["Health Monitor"]
end
subgraph agents["Agent Connections"]
agent1["Agent 1 (SSE)"]
agent2["Agent 2 (SSE)"]
agent3["Agent 3 (SSE)"]
agentN["Agent N (SSE)"]
end
pool --> agent1
pool --> agent2
pool --> agent3
pool --> agentN
mgr --> pool
health --> pool
pool -.->|Max 20 concurrent| limit[Connection Limit]
Theoretical Scaling Limits#
Current Architecture Bottlenecks:
Database Write Performance: High-frequency instruction persistence and state updates may impact database performance
Database Connection Pool: Connection pool limits for concurrent instruction storage and retrieval operations
Memory Usage: Each SSE connection maintains in-memory state (~1-2MB per connection)
RabbitMQ Throughput: Event notification capacity for real-time updates
Scaling Strategies:
Horizontal Scaling: Deploy multiple communication service instances behind load balancer
Connection Sharding: Distribute agents across service instances by agent ID hash
Resource Optimisation: Implement connection pooling and memory-efficient streaming
Database Optimisation: Use connection pooling and read replicas for instruction queries
Performance Characteristics#
Expected Throughput:
Instruction Frequency: 1 instruction per 30-120 seconds per agent during active data collection
Peak Load: 20 agents × 2 instructions/minute = 40 instructions/minute system-wide (at maximum frequency)
Message Size: 1-10KB JSON payloads for microscope control instructions
Latency Requirements: Sub-second delivery for real-time workflow efficiency
Implementation Specifications#
SSE Streaming Service Design#
The SSE streaming service implements persistent connections with automatic reconnection handling:
# Conceptual implementation structure
from collections.abc import AsyncIterator


class SSEInstructionStream:
    """Manages SSE connections and instruction streaming"""

    async def stream_instructions(self, agent_id: str) -> AsyncIterator[str]:
        """Async generator for SSE instruction stream"""

    async def handle_connection_lifecycle(self, agent_id: str):
        """Manages connection establishment, maintenance, and cleanup"""

    async def process_pending_instructions(self, agent_id: str):
        """Retrieves and processes pending instructions from the database for SSE delivery"""


class ConnectionManager:
    """Manages active SSE connections and health monitoring"""

    def register_connection(self, agent_id: str, connection_id: str):
        """Register new SSE connection"""

    def cleanup_connection(self, agent_id: str):
        """Clean up disconnected SSE connection"""

    async def health_check_connections(self):
        """Monitor connection health and handle failures"""
HTTP Acknowledgement Endpoints#
HTTP acknowledgement endpoints provide reliable delivery confirmation:
# Conceptual acknowledgement handling
from dataclasses import dataclass
from datetime import datetime
from typing import Literal


@dataclass
class InstructionAcknowledgement:
    instruction_id: str
    agent_id: str
    acknowledged_at: datetime
    status: Literal["received", "processed", "failed"]
    error_message: str | None = None


class AcknowledgementHandler:
    """Handles instruction acknowledgements and delivery tracking"""

    async def process_acknowledgement(self, ack: InstructionAcknowledgement) -> bool:
        """Process and store instruction acknowledgement"""

    async def handle_declined_instruction(self, agent_id: str, instruction_id: str, reason: str):
        """Handle agent declining instruction execution"""
SSE Retry Implementation#
The system implements robust SSE reconnection with exponential backoff:
class SSERetryManager:
    """Manages SSE connection retry logic with exponential backoff"""

    def should_retry(self, agent_id: str, attempt_count: int) -> bool:
        """Determine if SSE connection should be retried"""

    def calculate_backoff_delay(self, attempt_count: int) -> int:
        """Calculate exponential backoff delay for reconnection"""

    async def reconnect_with_backoff(self, agent_id: str) -> bool:
        """Attempt SSE reconnection with backoff delay"""
Implementation Status & Components#
This POC implementation provides a complete working system with the following implemented components:
Completed Features#
1. Database Schema & Migration (Alembic)#
AgentSession: Session management for agent connections
AgentInstruction: Instruction storage with metadata and lifecycle tracking
AgentConnection: Real-time connection tracking with heartbeat monitoring
AgentInstructionAcknowledgement: Comprehensive acknowledgement tracking
2. FastAPI SSE Endpoints#
/agent/{agent_id}/session/{session_id}/instructions/stream: Real-time SSE streaming
/agent/{agent_id}/session/{session_id}/instructions/{instruction_id}/ack: HTTP acknowledgement
Debug endpoints: Connection statistics and session management
3. RabbitMQ Integration#
Event Publishers: Agent instruction lifecycle events
Consumer Handlers: Process instruction events and database updates
Message Types: agent.instruction.created, agent.instruction.updated, agent.instruction.expired
4. Enhanced Agent Client (SSEAgentClient)#
Exponential backoff retry logic with jitter
Connection statistics and monitoring
Comprehensive error handling and recovery
Processing time tracking for performance metrics
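A hedged sketch of the consume loop such a client might run, using httpx; the real SSEAgentClient adds acknowledgement posting, jittered reconnection, and statistics on top of this:

import json

import httpx


def handle(instruction: dict) -> None:
    """Placeholder dispatch: execute the instruction, then POST its acknowledgement."""
    print("received", instruction.get("instruction_id"))


def consume_instructions(base_url: str, agent_id: str, session_id: str) -> None:
    """Blocking SSE consume loop (illustrative; production code reconnects with backoff)."""
    url = f"{base_url}/agent/{agent_id}/session/{session_id}/instructions/stream"
    with httpx.Client(timeout=None) as client:
        with client.stream("GET", url) as response:
            for line in response.iter_lines():
                if line.startswith("data: "):
                    handle(json.loads(line[len("data: "):]))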
5. Connection Management Service (AgentConnectionManager)#
Automatic stale connection cleanup (2-minute timeout)
Instruction expiration handling with retry logic
Session activity monitoring (1-hour inactivity threshold)
Real-time statistics and health monitoring
6. Production-Ready Example Client#
Complete instruction processing workflow
Multiple instruction type support (stage movement, image acquisition)
Processing time measurement and acknowledgement
Enhanced error handling and statistics display
🚀 Key Implementation Highlights#
Database-backed persistence: All instruction state persisted with full audit trail
Connection resilience: Automatic reconnection with exponential backoff
Health monitoring: Background tasks for cleanup and monitoring
Production logging: Comprehensive logging at all system levels
Type safety: Full Pydantic model validation throughout
Test-friendly design: Debug endpoints for system verification
Message Lifecycle Management#
Sequential Delivery Requirements#
The system ensures sequential message delivery to maintain microscope control instruction ordering:
stateDiagram-v2
[*] --> Generated: Instruction created
Generated --> Queued: Added to agent queue
Queued --> Streaming: SSE connection available
Queued --> Retry: Connection unavailable
Streaming --> Delivered: Agent receives via SSE
Retry --> Queued: After backoff delay
Delivered --> Acknowledged: Agent confirms receipt
Delivered --> Declined: Agent declines execution
Delivered --> Timeout: No acknowledgement received
Acknowledged --> [*]: Complete
Declined --> [*]: Logged and complete
Timeout --> Retry: Attempt redelivery
Retry --> Queued: Requeue instruction
Retry --> Failed: Max retries exceeded
Failed --> [*]: Mark as failed
Database Persistence Strategy#
The system uses PostgreSQL as source of truth with RabbitMQ as the event communication backbone:
graph LR
subgraph truth["Source of Truth"]
db[("PostgreSQL")]
end
subgraph events["Event Communication"]
mq[("RabbitMQ")]
end
subgraph ops["Operational Queries"]
queries["Instruction State<br/>Connection Health<br/>Performance Metrics<br/>Audit Trails"]
end
db --> mq
db --> queries
db -.->|Primary| primary[Instruction Persistence<br/>State Management<br/>Audit Logging]
mq -.->|Secondary| secondary[Event Notification<br/>Component Communication<br/>Real-time Updates]
Agent Restart Message Replay (TODO)#
Current Status: Not implemented - marked as future requirement
Design Considerations (one possible request shape is sketched after this list):
Determine replay window (e.g., last 24 hours of unacknowledged instructions)
Handle duplicate instruction detection and prevention
Manage instruction sequence numbering across agent restarts
Implement replay request mechanism from agent on startup
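Since replay remains unimplemented, the sketch below shows one possible shape for an agent-initiated replay request on startup. The endpoint, parameters, and 24-hour window are purely illustrative, not a committed design:

import httpx


def request_replay(base_url: str, agent_id: str, last_seen_sequence: int) -> list[dict]:
    """Hypothetical replay request: fetch unacknowledged instructions newer than
    the agent's last processed sequence number (deduplication would be server-side)."""
    response = httpx.get(
        f"{base_url}/agent/{agent_id}/instructions/replay",
        params={"since_sequence": last_seen_sequence, "window_hours": 24},
        timeout=30.0,
    )
    response.raise_for_status()
    return response.json()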
External Message Types and Transformation Pipeline#
External Message Types#
The system processes external RabbitMQ messages from data processing pipelines and machine learning components to trigger real-time decision-making and microscope control instructions. These external messages represent completion events from various stages of the cryo-EM data processing workflow:
Primary External Message Types:
MOTION_CORRECTION_COMPLETE: Indicates completion of motion correction processing for collected images
CTF_COMPLETE: Signals completion of contrast transfer function (CTF) estimation for image quality assessment
PARTICLE_PICKING_COMPLETE: Notifies completion of automated particle identification in micrographs
PARTICLE_SELECTION_COMPLETE: Indicates completion of particle quality assessment and selection
GRIDSQUARE_MODEL_PREDICTION: Provides machine learning predictions for gridsquare quality and suitability
FOILHOLE_MODEL_PREDICTION: Delivers ML predictions for individual foilhole targeting recommendations
MODEL_PARAMETER_UPDATE: Communicates updates to ML model parameters affecting decision thresholds
These external messages contain scientific data processing results, quality metrics, and ML predictions that drive the backend’s decision logic for microscope control instructions.
Message Transformation Pipeline#
The transformation pipeline converts external data processing events into actionable SSE instructions for agents. This pipeline operates within the consumer.py component and implements the core business logic for scientific decision-making:
graph TB
subgraph external["External Systems"]
ml_pipeline["ML Pipeline"]
image_proc["Image Processing"]
ctf_est["CTF Estimation"]
particle_pick["Particle Picking"]
end
subgraph rabbitmq["Message Queue"]
ext_queue["External Event Queue"]
end
subgraph backend["Backend Processing"]
consumer["consumer.py"]
decision_logic["Decision Logic Engine"]
threshold_eval["Threshold Evaluation"]
instruction_gen["Instruction Generator"]
end
subgraph communication["Communication System"]
sse_stream["SSE Instruction Stream"]
agent_client["Agent Clients"]
end
subgraph microscope["Microscope Control"]
athena_api["Athena API"]
epu_control["EPU Software"]
end
ml_pipeline --> ext_queue
image_proc --> ext_queue
ctf_est --> ext_queue
particle_pick --> ext_queue
ext_queue --> consumer
consumer --> decision_logic
decision_logic --> threshold_eval
threshold_eval --> instruction_gen
instruction_gen --> sse_stream
sse_stream --> agent_client
agent_client --> athena_api
athena_api --> epu_control
Transformation Process:
Message Reception: External messages arrive via dedicated RabbitMQ queues with processing results
Data Extraction: Consumer extracts quality metrics, coordinates, and prediction values from message payloads
Decision Logic Application: Business rules evaluate data against configurable quality thresholds
Instruction Generation: Decision outcomes generate specific microscope control instructions
Traceability Injection: Instructions include metadata linking back to originating external messages
SSE Delivery: Generated instructions are queued for real-time delivery to appropriate agents
Quality Threshold Examples:
{
    "gridsquare_quality_threshold": 0.7,
    "foilhole_ice_thickness_max": 150.0,
    "ctf_resolution_minimum": 4.0,
    "motion_correction_drift_max": 2.0
}
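Putting the transformation steps and thresholds together, here is a hedged sketch of the threshold-evaluation step for a CTF_COMPLETE event. Field names in the event payload are illustrative; actual payload schemas live in smartem_common:

import uuid
from datetime import datetime, timezone

CTF_RESOLUTION_MINIMUM = 4.0  # Å; worst acceptable CTF resolution estimate


def handle_ctf_complete(event: dict) -> dict | None:
    """Map a CTF_COMPLETE event to a skip instruction when quality is too poor."""
    if event["resolution_angstroms"] <= CTF_RESOLUTION_MINIMUM:
        return None  # resolution meets the threshold; no instruction generated
    return {
        "instruction_id": str(uuid.uuid4()),
        "instruction_type": "athena.control.skip_gridsquares",
        "version": "1.0",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": {
            "gridsquares_to_skip": [
                {
                    "gridsquare_id": event["gridsquare_id"],
                    "skip_reason": "quality_below_threshold",
                    "quality_score": event.get("quality_score"),
                }
            ],
            "skip_strategy": "quality_based",
        },
        "metadata": {
            "session_id": event["session_id"],
            "originating_message_id": event["message_id"],  # traceability linkage
            "threshold_applied": CTF_RESOLUTION_MINIMUM,
        },
    }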
Athena API Control Instructions#
The system generates specific instruction types that correspond to Athena API control capabilities for microscope workflow management. These instructions represent the actionable outputs of the decision-making process:
Primary Control Instruction Types:
athena.control.reorder_foilholes#
Reorders foilhole acquisition sequence based on ML predictions and quality assessments:
{
    "instruction_id": "uuid-v4",
    "instruction_type": "athena.control.reorder_foilholes",
    "version": "1.0",
    "timestamp": "2025-09-23T10:30:00Z",
    "payload": {
        "gridsquare_id": "GS_001_002",
        "foilhole_order": [
            {"foilhole_id": "FH_001", "priority_score": 0.95},
            {"foilhole_id": "FH_003", "priority_score": 0.87},
            {"foilhole_id": "FH_002", "priority_score": 0.72}
        ],
        "reorder_reason": "ml_prediction_update"
    },
    "metadata": {
        "session_id": "session-uuid",
        "originating_message_id": "external-msg-uuid",
        "decision_timestamp": "2025-09-23T10:29:45Z",
        "quality_threshold": 0.7
    }
}
athena.control.reorder_gridsquares#
Reorders gridsquare acquisition sequence based on overall quality assessments:
{
    "instruction_id": "uuid-v4",
    "instruction_type": "athena.control.reorder_gridsquares",
    "version": "1.0",
    "timestamp": "2025-09-23T10:35:00Z",
    "payload": {
        "gridsquare_order": [
            {"gridsquare_id": "GS_001_003", "quality_score": 0.92},
            {"gridsquare_id": "GS_001_001", "quality_score": 0.88},
            {"gridsquare_id": "GS_001_002", "quality_score": 0.74}
        ],
        "reorder_strategy": "quality_optimised"
    },
    "metadata": {
        "session_id": "session-uuid",
        "originating_message_id": "gridsquare-prediction-uuid",
        "model_version": "v2.1.0",
        "prediction_confidence": 0.89
    }
}
athena.control.skip_gridsquares#
Skips gridsquares that fail to meet quality thresholds:
{
    "instruction_id": "uuid-v4",
    "instruction_type": "athena.control.skip_gridsquares",
    "version": "1.0",
    "timestamp": "2025-09-23T10:40:00Z",
    "payload": {
        "gridsquares_to_skip": [
            {
                "gridsquare_id": "GS_001_005",
                "skip_reason": "quality_below_threshold",
                "quality_score": 0.45
            },
            {
                "gridsquare_id": "GS_001_007",
                "skip_reason": "ice_contamination_detected",
                "contamination_level": 0.85
            }
        ],
        "skip_strategy": "quality_based"
    },
    "metadata": {
        "session_id": "session-uuid",
        "originating_message_id": "quality-assessment-uuid",
        "threshold_applied": 0.6,
        "assessment_method": "ml_classification"
    }
}
Complete Message Flow Diagram#
The following diagram illustrates the complete message flow from external systems through to microscope control:
sequenceDiagram
participant ExtSys as External Systems<br/>(ML Pipeline, Image Processing)
participant MQ as RabbitMQ<br/>(External Events)
participant Consumer as Backend Consumer<br/>(consumer.py)
participant DB as PostgreSQL<br/>(Instruction Storage)
participant SSE as SSE Stream<br/>(Real-time Delivery)
participant Agent as Agent Client<br/>(Windows Workstation)
participant Athena as Athena API<br/>(Microscope Control)
participant EPU as EPU Software<br/>(ThermoFisher)
Note over ExtSys: Data processing completes
ExtSys->>MQ: MOTION_CORRECTION_COMPLETE<br/>CTF_COMPLETE<br/>PARTICLE_PICKING_COMPLETE<br/>ML_PREDICTION
MQ->>Consumer: External event delivery
Note over Consumer: Message transformation pipeline
Consumer->>Consumer: Extract quality metrics<br/>Apply decision thresholds<br/>Generate control instructions
Consumer->>DB: Store instruction with traceability<br/>(originating_message_id)
Consumer->>MQ: Publish instruction event
MQ->>SSE: Instruction ready for delivery
SSE->>Agent: Stream instruction via SSE<br/>(athena.control.*)
Agent->>SSE: HTTP acknowledgement
SSE->>DB: Update delivery status
Note over Agent: Process microscope control instruction
Agent->>Athena: Execute control command<br/>(reorder_foilholes, skip_gridsquares)
Athena->>EPU: Apply workflow changes
EPU->>Agent: Execution confirmation
Agent->>SSE: Final execution status
SSE->>DB: Update instruction completion
Note over DB: Complete audit trail<br/>External message → Decision → Execution
Key Message Flow Characteristics:
Scientific Traceability: Every instruction maintains linkage to originating external messages for reproducibility
Real-time Processing: Sub-second transformation from external events to microscope control instructions
Quality-driven Decisions: Business logic applies configurable thresholds to scientific data
Comprehensive Audit Trail: Full tracking from data processing results through to microscope actions
Failure Recovery: Instruction replay capability maintains workflow continuity during system interruptions
Message Transformation Examples:
CTF_COMPLETE with resolution worse than 4.0Å (a larger Å value) → athena.control.skip_gridsquares (poor quality)
GRIDSQUARE_MODEL_PREDICTION with score > 0.8 → athena.control.reorder_gridsquares (prioritise high-quality)
FOILHOLE_MODEL_PREDICTION with updated rankings → athena.control.reorder_foilholes (optimise sequence)
This transformation pipeline ensures that scientific data processing results directly drive microscope control decisions with full traceability and audit capabilities for research reproducibility.
Extensibility Design#
JSON Message Vocabulary#
The system implements comprehensive JSON message vocabulary for microscope control instructions:
Core Message Structure:
{
    "instruction_id": "uuid-v4",
    "instruction_type": "athena.control.*",
    "version": "1.0",
    "timestamp": "2025-09-23T10:30:00Z",
    "payload": {
        // Instruction-specific data
    },
    "metadata": {
        "session_id": "session-uuid",
        "originating_message_id": "external-msg-uuid",
        "decision_timestamp": "2025-09-23T10:29:45Z",
        "quality_threshold": 0.7
    }
}
Implemented Instruction Types:
athena.control.reorder_foilholes: Reorder foilhole acquisition sequence based on ML predictions
athena.control.reorder_gridsquares: Reorder gridsquare acquisition sequence based on quality assessment
athena.control.skip_gridsquares: Skip gridsquares that fail quality thresholds
Enhanced Metadata for Scientific Traceability:
originating_message_id: Links instruction back to external processing event
decision_timestamp: Records when decision logic was applied
quality_threshold: Documents threshold values used in decision-making
model_version: Tracks ML model version for reproducibility
prediction_confidence: Records confidence level of ML predictions
Extensibility Features:
Version field for message schema evolution across instruction types
Flexible payload structure accommodating diverse microscope control requirements
Enhanced metadata section supporting scientific traceability and reproducibility
Type-safe instruction validation using Pydantic models for data integrity (see the sketch after this list)
Originating message linkage for complete audit trails from data processing to execution
Configurable quality thresholds enabling adaptive decision-making
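A hedged Pydantic (v2) sketch of such a validation model, with field names taken from the message structure above; which fields are optional, and the confidence bounds, are assumptions:

from datetime import datetime
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field


class InstructionMetadata(BaseModel):
    # "model_"-prefixed fields require relaxing Pydantic v2's protected namespace
    model_config = ConfigDict(protected_namespaces=())

    session_id: str
    originating_message_id: str | None = None
    decision_timestamp: datetime | None = None
    quality_threshold: float | None = None
    model_version: str | None = None
    prediction_confidence: float | None = Field(default=None, ge=0.0, le=1.0)


class AgentInstructionModel(BaseModel):
    instruction_id: str
    instruction_type: Literal[
        "athena.control.reorder_foilholes",
        "athena.control.reorder_gridsquares",
        "athena.control.skip_gridsquares",
    ]
    version: str = "1.0"
    timestamp: datetime
    payload: dict[str, Any]
    metadata: InstructionMetadata


# model_validate raises pydantic.ValidationError on malformed instructions:
# instruction = AgentInstructionModel.model_validate(raw_message_dict)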
Implemented ML Integration#
The architecture provides production machine learning and data processing integration:
graph TB
subgraph ml["ML Pipeline Integration (Implemented)"]
model["Prediction Models<br/>(Gridsquare/Foilhole)"]
pipeline["Processing Pipeline<br/>(Motion/CTF/Particles)"]
feedback["Execution Feedback<br/>(Future Enhancement)"]
end
subgraph comm["Communication System"]
consumer["Message Consumer<br/>(consumer.py)"]
decision["Decision Logic Engine"]
instructions["Instruction Generation"]
service["SSE Communication Service"]
end
subgraph agents["Agent Layer"]
agent["Agent Execution"]
athena["Athena API Integration"]
results["Execution Results"]
end
model --> pipeline
pipeline --> consumer
consumer --> decision
decision --> instructions
instructions --> service
service --> agent
agent --> athena
athena --> results
results -.->|Future| feedback
feedback -.->|Future| model
classDef implemented fill:#d4edda,stroke:#155724,stroke-width:2px
classDef future fill:#fff3cd,stroke:#856404,stroke-width:2px
class model,pipeline,consumer,decision,instructions,service,agent,athena,results implemented
class feedback future
Current ML Integration Features:
Real-time Processing Results: Integration with motion correction, CTF estimation, and particle picking pipelines
ML Model Predictions: Gridsquare and foilhole quality prediction integration
Quality-driven Decisions: Automated decision-making based on configurable quality thresholds
Scientific Traceability: Full audit trail from ML predictions to microscope actions
Future Enhancement Areas:
Execution Feedback Loop: Collection of execution results to improve ML model predictions
Adaptive Thresholds: Dynamic threshold adjustment based on session performance
Predictive Analytics: Advanced analytics for experiment outcome prediction
Traceability and Monitoring#
Message Tracking System#
The system provides full message tracking with comprehensive identification:
graph LR
subgraph tracking["Message Tracking"]
id["Instruction ID (UUID)"]
origin["Origin Timestamp"]
causation["Causation Chain"]
status["Delivery Status"]
end
subgraph audit["Audit Trail"]
created["Creation Event"]
queued["Queue Event"]
delivered["Delivery Event"]
acked["Acknowledgement Event"]
end
tracking --> audit
subgraph queries["Operational Queries"]
pending["Pending Instructions"]
failed["Failed Deliveries"]
performance["Performance Metrics"]
end
audit --> queries
Monitoring Architecture#
graph TB
subgraph metrics["Metrics Collection"]
conn["Connection Metrics"]
delivery["Delivery Metrics"]
performance["Performance Metrics"]
errors["Error Rates"]
end
subgraph monitoring["Monitoring Stack"]
prometheus["Prometheus"]
grafana["Grafana Dashboards"]
alerts["Alert Manager"]
end
subgraph logs["Logging"]
structured["Structured Logs"]
correlation["Correlation IDs"]
levels["Log Levels"]
end
metrics --> monitoring
logs --> monitoring
Operational Considerations#
SSE Connection Health Monitoring#
Health Check Mechanisms:
Periodic heartbeat messages via SSE stream
Connection timeout detection and cleanup
Automatic reconnection attempts with exponential backoff
Connection state synchronisation with database
Monitoring Metrics:
Active connection count per agent
Connection duration and stability
Reconnection frequency and success rates
Memory usage per connection
Debugging Message Delivery Issues#
Debugging Capabilities:
# Conceptual debugging interface
from datetime import timedelta


class DeliveryDebugger:
    """Tools for debugging message delivery issues"""

    def trace_instruction_lifecycle(self, instruction_id: str) -> "DeliveryTrace":
        """Trace complete instruction delivery lifecycle"""

    def diagnose_connection_issues(self, agent_id: str) -> "ConnectionDiagnosis":
        """Diagnose SSE connection problems"""

    def analyse_delivery_patterns(self, agent_id: str, timeframe: timedelta) -> "Analysis":
        """Analyse delivery success patterns for optimisation"""
Troubleshooting Features:
Real-time delivery status dashboard
Instruction replay capability for testing
Connection diagnostics with detailed error reporting
Performance profiling for bottleneck identification
Resource Management#
Connection Resource Management:
Concurrent connection capacity (20 connections for facility requirements)
Connection memory usage monitoring and alerting
Automatic connection cleanup on agent disconnect
Resource pool management for database connections
Performance Optimisation:
Connection keep-alive optimisation for long-lived SSE streams
Message batching for high-frequency instruction bursts
Database query optimisation for instruction retrieval
Memory-efficient JSON streaming for large instruction payloads
Critical Analysis and Risk Assessment#
Potential Bottlenecks#
Identified Bottlenecks:
Database Write Performance: High-frequency instruction persistence and state updates may impact database performance under load
Database Connection Contention: Concurrent access from multiple service instances may strain database connection pools
Memory Usage Growth: Long-lived SSE connections accumulate memory usage over time
Single Point of Failure: Communication service represents single point of failure for all agents
Mitigation Strategies:
Implement horizontal scaling with load balancing for connection distribution
Use database connection pooling and async operations for performance
Implement memory leak detection and connection lifecycle management
Deploy redundant service instances with automatic failover capabilities
Integration Complexities#
Architectural Challenges:
RabbitMQ-SSE Bridge: Complex event bridging between message queues and SSE streams
State Synchronisation: Maintaining consistency between RabbitMQ, PostgreSQL, and SSE connection state
Error Propagation: Ensuring error conditions propagate correctly across system boundaries
Testing Complexity: Integration testing across multiple protocols and failure scenarios
Recommended Approaches:
Implement comprehensive integration testing with realistic failure simulation
Use event sourcing patterns for consistent state management
Deploy canary deployments for safe production rollouts
Establish clear error handling contracts between components
Monitoring and Operational Gaps#
Identified Gaps:
End-to-End Tracing: Limited visibility into complete instruction delivery lifecycle
Capacity Planning: Insufficient metrics for predicting scaling requirements
Agent Health Correlation: Limited correlation between agent health and delivery success
Required Improvements:
Implement distributed tracing with correlation IDs across all components
Develop capacity planning dashboards with predictive analytics
Create agent health correlation dashboards for operational insights
This system design provides a robust foundation for real-time backend-to-agent communication whilst maintaining the flexibility for future enhancements and scaling requirements in the SmartEM Decisions scientific computing platform.