# Backend-to-Agent Communication System Design
This document describes the design of the backend-to-agent communication system implementing ADR #8's hybrid
SSE + HTTP architecture. The system enables real-time delivery of microscope control instructions from
Kubernetes-hosted backend services to the Windows workstations controlling cryo-electron microscopes.
## Architecture Overview
### System Context
The SmartEM Decisions platform operates in a distributed environment where backend services run in Kubernetes clusters
whilst agent services execute on Windows workstations directly connected to scientific equipment.
The communication system bridges this divide whilst meeting high-throughput requirements.
**Implementation Status**: **COMPLETED** - This POC implementation provides a production-ready backend-to-agent
communication system with full SSE streaming, RabbitMQ integration, database persistence, and comprehensive
connection management.
```mermaid
graph TB
subgraph k8s["Kubernetes Cluster - Diamond Light Source"]
subgraph packages["SmartEM Packages"]
subgraph backend["smartem_backend"]
api_server["api_server.py
FastAPI + SSE Endpoints"]
consumer["consumer.py
RabbitMQ Event Processing"]
conn_mgr["agent_connection_manager.py
Connection Health Monitoring"]
end
subgraph common["smartem_common"]
schemas["schemas.py
Shared Data Models"]
utils["utils.py
Common Utilities"]
end
subgraph athena["athena_api"]
athena_client["client.py
External API Integration"]
end
subgraph mcp["smartem_mcp"]
mcp_server["server.py
Model Context Protocol"]
end
end
subgraph infra["Infrastructure Services"]
db[("PostgreSQL
AgentSession, AgentInstruction
AgentConnection, AgentInstructionAcknowledgement")]
mq[("RabbitMQ
Event Communication
Instruction Lifecycle")]
end
end
subgraph boundary["Network Boundary - Windows Workstations"]
subgraph agents["Agent Software (Windows)"]
subgraph agent_pkg["smartem_agent"]
fs_watcher["fs_watcher.py
File System Monitoring"]
fs_parser["fs_parser.py
EPU Data Parsing"]
sse_client["SSEAgentClient
Stream Connection"]
end
end
subgraph equipment["Scientific Equipment"]
subgraph scope1["Microscope Workstation 1"]
epu1["EPU Software
(ThermoFisher)"]
em1["Cryo-EM Microscope 1"]
gpfs1["GPFS Storage
Image Data"]
end
subgraph scope2["Microscope Workstation 2"]
epu2["EPU Software
(ThermoFisher)"]
em2["Cryo-EM Microscope 2"]
gpfs2["GPFS Storage
Image Data"]
end
subgraph scopeN["Microscope Workstation N"]
epuN["EPU Software
(ThermoFisher)"]
emN["Cryo-EM Microscope N"]
gpfsN["GPFS Storage
Image Data"]
end
end
end
%% Database and message queue connections
api_server --> db
api_server --> mq
consumer --> db
consumer --> mq
conn_mgr --> db
conn_mgr --> mq
%% SSE streaming connections
api_server -.->|"SSE: /agent/{id}/session/{sid}/instructions/stream"| sse_client
%% HTTP acknowledgement connections
sse_client -.->|"HTTP: /agent/{id}/session/{sid}/instructions/{iid}/ack"| api_server
%% File system monitoring
fs_watcher --> fs_parser
fs_parser --> sse_client
%% Equipment integration
epu1 --> gpfs1
epu2 --> gpfs2
epuN --> gpfsN
fs_watcher -.->|"Monitor EPU Output"| gpfs1
fs_watcher -.->|"Monitor EPU Output"| gpfs2
fs_watcher -.->|"Monitor EPU Output"| gpfsN
%% External API integration
consumer --> athena_client
%% Package dependencies
backend -.-> common
agent_pkg -.-> common
fs_parser -.-> schemas
%% Styling
classDef k8s fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
classDef boundary fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
classDef database fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef equipment fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef communication fill:#fff8e1,stroke:#f57f17,stroke-width:2px
class k8s,packages,backend,common,athena,mcp,infra k8s
class boundary,agents,agent_pkg,equipment boundary
class db,mq database
class scope1,scope2,scopeN,epu1,epu2,epuN,em1,em2,emN,gpfs1,gpfs2,gpfsN equipment
class api_server,sse_client,fs_watcher communication
```
### Service Architecture
The communication system employs a **separate service approach** rather than integrating directly with the main API
service. This design provides:
- **Isolation of concerns**: Communication logic remains separate from core business logic
- **Scalability independence**: Communication service can scale independently based on connection load
- **Operational simplicity**: Monitoring and debugging of persistent connections without affecting main API
- **Resource management**: Dedicated resources for managing long-lived SSE connections
```mermaid
graph LR
subgraph services["Service Layer"]
main["Main API Service"]
comm["Communication Service"]
end
subgraph data["Data Layer"]
db[("PostgreSQL")]
mq[("RabbitMQ")]
end
subgraph agents["Agent Layer"]
agent["Agent Clients"]
end
main --> db
main --> mq
comm --> db
comm --> mq
mq --> comm
comm <--> agents
main -.->|Events| mq
mq -.->|Instructions| comm
```
## Technical Stack Integration
### FastAPI Integration
The communication service implements SSE on top of FastAPI's `StreamingResponse`, returning `text/event-stream`
responses from standard async endpoints:
```python
# Conceptual endpoint structure
from fastapi import FastAPI

app = FastAPI()

@app.get("/agent/{agent_id}/instructions/stream")
async def stream_instructions(agent_id: str):
    """SSE endpoint for streaming instructions to agents"""

@app.post("/agent/{agent_id}/instructions/{instruction_id}/ack")
async def acknowledge_instruction(agent_id: str, instruction_id: str):
    """HTTP endpoint for instruction acknowledgements"""
```
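The streaming side of this pattern returns a `StreamingResponse` with the `text/event-stream` media type. Below is
a minimal sketch; the in-memory `instruction_queues` registry is a hypothetical stand-in for the connection manager
described later in this document:

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# Hypothetical per-agent queues; the real service is backed by the
# connection manager and database persistence described later
instruction_queues: dict[str, asyncio.Queue] = {}

@app.get("/agent/{agent_id}/instructions/stream")
async def stream_instructions(agent_id: str) -> StreamingResponse:
    queue = instruction_queues.setdefault(agent_id, asyncio.Queue())

    async def event_stream():
        while True:
            instruction = await queue.get()
            # SSE wire format: "data: <payload>" terminated by a blank line
            yield f"data: {json.dumps(instruction)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```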
### RabbitMQ Message Flow
The system integrates with the existing RabbitMQ infrastructure as an event communication backbone between ML components and the communication service:
```mermaid
sequenceDiagram
participant ML as ML Pipeline
participant MQ as RabbitMQ
participant Comm as Communication Service
participant Agent as Agent Client
participant DB as PostgreSQL
ML->>MQ: Publish instruction event
MQ->>Comm: Deliver to agent queue
Comm->>DB: Store instruction state
Comm->>Agent: Stream via SSE
Agent->>Comm: HTTP acknowledgement
Comm->>DB: Update delivery status
Comm->>MQ: Publish ACK event
```
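As an illustration of the publishing half of this flow, the sketch below pushes an instruction event onto a
per-agent queue with `pika`; the queue naming convention and event shape are assumptions made for the example,
not the project's actual conventions:

```python
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

agent_id = "agent-001"
# Hypothetical per-agent queue naming convention
queue_name = f"agent.{agent_id}.instructions"
channel.queue_declare(queue=queue_name, durable=True)

event = {
    "event_type": "agent.instruction.created",
    "agent_id": agent_id,
    "payload": {"instruction_type": "athena.control.reorder_foilholes"},
}
channel.basic_publish(
    exchange="",
    routing_key=queue_name,
    body=json.dumps(event),
    properties=pika.BasicProperties(delivery_mode=2),  # persist across broker restarts
)
connection.close()
```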
### PostgreSQL Schema Design
The communication system extends the existing database schema with instruction tracking tables:
```sql
-- Conceptual schema structure
CREATE TABLE agent_instructions (
    id UUID PRIMARY KEY,
    agent_id VARCHAR NOT NULL,
    instruction_type VARCHAR NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    delivered_at TIMESTAMP WITH TIME ZONE,
    acknowledged_at TIMESTAMP WITH TIME ZONE,
    status VARCHAR NOT NULL DEFAULT 'pending',
    retry_count INTEGER DEFAULT 0
);

CREATE TABLE agent_connections (
    agent_id VARCHAR PRIMARY KEY,
    connection_id VARCHAR NOT NULL,
    connected_at TIMESTAMP WITH TIME ZONE NOT NULL,
    last_heartbeat TIMESTAMP WITH TIME ZONE,
    connection_type VARCHAR NOT NULL -- 'sse'
);
```
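The `delivered_at` and `acknowledged_at` columns support the lifecycle updates shown in the sequence diagram above.
A sketch of those updates against this conceptual schema, using plain `psycopg2` (the production code uses its own
data layer; connection parameters are illustrative):

```python
import psycopg2

conn = psycopg2.connect("dbname=smartem user=smartem")

def mark_delivered(instruction_id: str) -> None:
    """Record SSE delivery and advance status from 'pending'."""
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            UPDATE agent_instructions
            SET delivered_at = NOW(), status = 'delivered'
            WHERE id = %s AND status = 'pending'
            """,
            (instruction_id,),
        )

def mark_acknowledged(instruction_id: str) -> None:
    """Record the agent acknowledgement once delivery is confirmed."""
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            UPDATE agent_instructions
            SET acknowledged_at = NOW(), status = 'acknowledged'
            WHERE id = %s AND status = 'delivered'
            """,
            (instruction_id,),
        )
```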
## Component Interactions and Data Flows
### Primary Communication Flow (SSE)
The primary communication path uses Server-Sent Events for efficient real-time instruction delivery:
```mermaid
sequenceDiagram
participant Agent as Agent Client
participant Comm as Communication Service
participant MQ as RabbitMQ
participant DB as PostgreSQL
Agent->>Comm: Establish SSE connection
Comm->>DB: Register connection
loop Instruction Processing
MQ->>Comm: New instruction event
Comm->>DB: Store instruction
Comm->>Agent: Stream instruction (SSE)
Agent->>Comm: HTTP acknowledgement
Comm->>DB: Update delivery status
end
Agent->>Comm: Connection closed
Comm->>DB: Clean up connection state
```
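On the agent side, the consumption loop pairs a long-lived streaming GET with acknowledgement POSTs. A
stripped-down sketch using `httpx` follows; the real `SSEAgentClient` adds retry, statistics, and processing-time
tracking, and the base URL here is illustrative:

```python
import json

import httpx

BASE = "http://backend.example/agent/agent-001/session/sess-001"

with httpx.Client(timeout=None) as client:  # no timeout: long-lived stream
    with client.stream("GET", f"{BASE}/instructions/stream") as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip SSE comments and blank keep-alive lines
            instruction = json.loads(line[len("data: "):])
            # ... execute the instruction, then confirm receipt ...
            client.post(
                f"{BASE}/instructions/{instruction['instruction_id']}/ack",
                json={"status": "processed"},
            )
```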
### Error Handling and Recovery
The system implements comprehensive error handling across multiple failure scenarios:
```mermaid
graph TD
start([Instruction Generated]) --> sse{SSE Connected?}
sse -->|Yes| stream[Stream via SSE]
sse -->|No| queue[Queue for Next Connection]
stream --> ack{Acknowledgement Received?}
ack -->|Yes| complete[Mark Complete]
ack -->|No| retry{"Retry Count < Max?"}
retry -->|Yes| delay[Exponential Backoff]
retry -->|No| failed[Mark Failed]
delay --> reconnect[Reconnect SSE]
reconnect --> stream
queue --> reconnect
failed --> done([End])
complete --> done
```
## Scalability Design
### Connection Management
The system is designed to support **one session per agent machine** with a capacity of **20 concurrent SSE
connections**. This aligns with the facility's requirement of up to 20 microscope workstations, each controlling
a single microscope; a sketch of enforcing this cap follows the diagram below.
```mermaid
graph TB
subgraph comm["Communication Service Instance"]
pool["Connection Pool"]
mgr["Connection Manager"]
health["Health Monitor"]
end
subgraph agents["Agent Connections"]
agent1["Agent 1 (SSE)"]
agent2["Agent 2 (SSE)"]
agent3["Agent 3 (SSE)"]
agentN["Agent N (SSE)"]
end
pool --> agent1
pool --> agent2
pool --> agent3
pool --> agentN
mgr --> pool
health --> pool
pool -.->|Max 20 concurrent| limit[Connection Limit]
```
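A minimal way to enforce the cap is a counting semaphore guarding stream setup. Rejecting with HTTP 503 when the
cap is reached is an assumption for this sketch; the real service may shed or queue connections differently:

```python
import asyncio

from fastapi import HTTPException

MAX_CONNECTIONS = 20
connection_slots = asyncio.Semaphore(MAX_CONNECTIONS)

async def open_stream_slot() -> None:
    """Claim a slot before starting an SSE stream; callers release it on disconnect."""
    if connection_slots.locked():  # all 20 slots in use
        raise HTTPException(status_code=503, detail="connection limit reached")
    await connection_slots.acquire()
```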
### Theoretical Scaling Limits
**Current Architecture Bottlenecks:**
1. **Database Write Performance**: High-frequency instruction persistence and state updates may impact database performance
2. **Database Connection Pool**: Connection pool limits for concurrent instruction storage and retrieval operations
3. **Memory Usage**: Each SSE connection maintains in-memory state (~1-2MB per connection)
4. **RabbitMQ Throughput**: Event notification capacity for real-time updates
**Scaling Strategies:**
- **Horizontal Scaling**: Deploy multiple communication service instances behind load balancer
- **Connection Sharding**: Distribute agents across service instances by agent ID hash
- **Resource Optimization**: Implement connection pooling and memory-efficient streaming
- **Database Optimization**: Use connection pooling and read replicas for instruction queries
### Performance Characteristics
**Expected Throughput:**
- **Instruction Frequency**: 1 instruction per 30-120 seconds per agent during active data collection
- **Peak Load**: 20 agents × 2 instructions/minute = 40 instructions/minute system-wide (at maximum frequency)
- **Message Size**: 1-10KB JSON payloads for microscope control instructions
- **Latency Requirements**: Sub-second delivery for real-time workflow efficiency
## Implementation Specifications
### SSE Streaming Service Design
The SSE streaming service implements persistent connections with automatic reconnection handling:
```python
# Conceptual implementation structure
from collections.abc import AsyncIterator

class SSEInstructionStream:
    """Manages SSE connections and instruction streaming"""

    async def stream_instructions(self, agent_id: str) -> AsyncIterator[str]:
        """Async generator for SSE instruction stream"""

    async def handle_connection_lifecycle(self, agent_id: str):
        """Manages connection establishment, maintenance, and cleanup"""

    async def process_pending_instructions(self, agent_id: str):
        """Retrieves and processes pending instructions from database for SSE delivery"""

class ConnectionManager:
    """Manages active SSE connections and health monitoring"""

    def register_connection(self, agent_id: str, connection_id: str):
        """Register new SSE connection"""

    def cleanup_connection(self, agent_id: str):
        """Clean up disconnected SSE connection"""

    async def health_check_connections(self):
        """Monitor connection health and handle failures"""
```
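A minimal in-memory realisation of the `ConnectionManager` stubs might look like the sketch below. The 2-minute
staleness threshold mirrors the cleanup behaviour described later in this document, but the exact policy here is
an assumption:

```python
import time
from dataclasses import dataclass, field

@dataclass
class InMemoryConnectionManager:
    stale_after_s: float = 120.0  # assumed 2-minute staleness threshold
    _connections: dict[str, dict] = field(default_factory=dict)

    def register_connection(self, agent_id: str, connection_id: str) -> None:
        self._connections[agent_id] = {
            "connection_id": connection_id,
            "last_heartbeat": time.monotonic(),
        }

    def heartbeat(self, agent_id: str) -> None:
        if agent_id in self._connections:
            self._connections[agent_id]["last_heartbeat"] = time.monotonic()

    def cleanup_connection(self, agent_id: str) -> None:
        self._connections.pop(agent_id, None)

    def stale_agents(self) -> list[str]:
        """Agents whose last heartbeat exceeds the staleness threshold."""
        now = time.monotonic()
        return [
            agent_id
            for agent_id, conn in self._connections.items()
            if now - conn["last_heartbeat"] > self.stale_after_s
        ]
```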
### HTTP Acknowledgement Endpoints
HTTP acknowledgement endpoints provide reliable delivery confirmation:
```python
# Conceptual acknowledgement handling
from dataclasses import dataclass
from datetime import datetime
from typing import Literal

@dataclass
class InstructionAcknowledgement:
    instruction_id: str
    agent_id: str
    acknowledged_at: datetime
    status: Literal["received", "processed", "failed"]
    error_message: str | None = None

class AcknowledgementHandler:
    """Handles instruction acknowledgements and delivery tracking"""

    async def process_acknowledgement(self, ack: InstructionAcknowledgement) -> bool:
        """Process and store instruction acknowledgement"""

    async def handle_declined_instruction(self, agent_id: str, instruction_id: str, reason: str):
        """Handle agent declining instruction execution"""
```
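Wired into FastAPI, the acknowledgement endpoint can validate its body with a Pydantic model mirroring the
dataclass above. A sketch, with the route shape following the session-scoped endpoints listed later in this
document and the persistence step elided:

```python
from datetime import datetime
from typing import Literal

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AckBody(BaseModel):
    acknowledged_at: datetime
    status: Literal["received", "processed", "failed"]
    error_message: str | None = None

@app.post("/agent/{agent_id}/session/{session_id}/instructions/{instruction_id}/ack")
async def acknowledge_instruction(
    agent_id: str, session_id: str, instruction_id: str, body: AckBody
) -> dict:
    # Persisting the acknowledgement and publishing the ACK event is
    # delegated to the handler layer sketched above
    return {"instruction_id": instruction_id, "status": body.status}
```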
### SSE Retry Implementation
The system implements robust SSE reconnection with exponential backoff:
```python
class SSERetryManager:
    """Manages SSE connection retry logic with exponential backoff"""

    def should_retry(self, agent_id: str, attempt_count: int) -> bool:
        """Determine if SSE connection should be retried"""

    def calculate_backoff_delay(self, attempt_count: int) -> int:
        """Calculate exponential backoff delay for reconnection"""

    async def reconnect_with_backoff(self, agent_id: str) -> bool:
        """Attempt SSE reconnection with backoff delay"""
```
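A concrete sketch of the backoff calculation, using capped exponential growth with full jitter; the base delay,
cap, and attempt limit are illustrative values rather than the configured ones:

```python
import random

BASE_DELAY_S = 1.0   # assumed initial delay
MAX_DELAY_S = 60.0   # assumed cap on backoff
MAX_ATTEMPTS = 8     # assumed retry budget

def should_retry(attempt_count: int) -> bool:
    return attempt_count < MAX_ATTEMPTS

def calculate_backoff_delay(attempt_count: int) -> float:
    """Exponential backoff capped at MAX_DELAY_S, with full jitter."""
    ceiling = min(MAX_DELAY_S, BASE_DELAY_S * (2 ** attempt_count))
    return random.uniform(0, ceiling)
```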
## Implementation Status & Components
This POC implementation provides a complete working system with the following implemented components:
### Completed Features
#### 1. Database Schema & Migration (Alembic)
- **AgentSession**: Session management for agent connections
- **AgentInstruction**: Instruction storage with metadata and lifecycle tracking
- **AgentConnection**: Real-time connection tracking with heartbeat monitoring
- **AgentInstructionAcknowledgement**: Comprehensive acknowledgement tracking
#### 2. FastAPI SSE Endpoints
- **`/agent/{agent_id}/session/{session_id}/instructions/stream`**: Real-time SSE streaming
- **`/agent/{agent_id}/session/{session_id}/instructions/{instruction_id}/ack`**: HTTP acknowledgement
- **Debug endpoints**: Connection statistics and session management
#### 3. RabbitMQ Integration
- **Event Publishers**: Agent instruction lifecycle events
- **Consumer Handlers**: Process instruction events and database updates
- **Message Types**: `agent.instruction.created`, `agent.instruction.updated`, `agent.instruction.expired`
#### 4. Enhanced Agent Client (`SSEAgentClient`)
- **Exponential backoff retry logic** with jitter
- **Connection statistics and monitoring**
- **Comprehensive error handling and recovery**
- **Processing time tracking for performance metrics**
#### 5. Connection Management Service (`AgentConnectionManager`)
- **Automatic stale connection cleanup** (2-minute timeout)
- **Instruction expiration handling** with retry logic
- **Session activity monitoring** (1-hour inactivity threshold)
- **Real-time statistics and health monitoring**
#### 6. Production-Ready Example Client
- **Complete instruction processing workflow**
- **Multiple instruction type support** (stage movement, image acquisition)
- **Processing time measurement and acknowledgement**
- **Enhanced error handling and statistics display**
### Key Implementation Highlights
- **Database-backed persistence**: All instruction state persisted with full audit trail
- **Connection resilience**: Automatic reconnection with exponential backoff
- **Health monitoring**: Background tasks for cleanup and monitoring
- **Production logging**: Comprehensive logging at all system levels
- **Type safety**: Full Pydantic model validation throughout
- **Test-friendly design**: Debug endpoints for system verification
## Message Lifecycle Management
### Sequential Delivery Requirements
The system ensures **sequential message delivery** to maintain microscope control instruction ordering:
```mermaid
stateDiagram-v2
[*] --> Generated: Instruction created
Generated --> Queued: Added to agent queue
Queued --> Streaming: SSE connection available
Queued --> Retry: Connection unavailable
Streaming --> Delivered: Agent receives via SSE
Retry --> Queued: After backoff delay
Delivered --> Acknowledged: Agent confirms receipt
Delivered --> Declined: Agent declines execution
Delivered --> Timeout: No acknowledgement received
Acknowledged --> [*]: Complete
Declined --> [*]: Logged and complete
Timeout --> Retry: Attempt redelivery
Retry --> Queued: Requeue instruction
Retry --> Failed: Max retries exceeded
Failed --> [*]: Mark as failed
```
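The same lifecycle can be expressed as a transition table for validation. A sketch, with state names taken from
the diagram above rather than the persisted status vocabulary:

```python
from enum import Enum

class InstructionState(str, Enum):
    GENERATED = "generated"
    QUEUED = "queued"
    STREAMING = "streaming"
    DELIVERED = "delivered"
    ACKNOWLEDGED = "acknowledged"
    DECLINED = "declined"
    TIMEOUT = "timeout"
    RETRY = "retry"
    FAILED = "failed"

# Transitions mirror the state diagram; terminal states have no successors
ALLOWED_TRANSITIONS = {
    InstructionState.GENERATED: {InstructionState.QUEUED},
    InstructionState.QUEUED: {InstructionState.STREAMING, InstructionState.RETRY},
    InstructionState.STREAMING: {InstructionState.DELIVERED},
    InstructionState.DELIVERED: {
        InstructionState.ACKNOWLEDGED,
        InstructionState.DECLINED,
        InstructionState.TIMEOUT,
    },
    InstructionState.TIMEOUT: {InstructionState.RETRY},
    InstructionState.RETRY: {InstructionState.QUEUED, InstructionState.FAILED},
}

def transition(current: InstructionState, target: InstructionState) -> InstructionState:
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```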
### Database Persistence Strategy
The system uses **PostgreSQL as source of truth** with RabbitMQ as the event communication backbone:
```mermaid
graph LR
subgraph truth["Source of Truth"]
db[("PostgreSQL")]
end
subgraph events["Event Communication"]
mq[("RabbitMQ")]
end
subgraph ops["Operational Queries"]
queries["Instruction State
Connection Health
Performance Metrics
Audit Trails"]
end
db --> mq
db --> queries
db -.->|Primary| primary["Instruction Persistence
State Management
Audit Logging"]
mq -.->|Secondary| secondary["Event Notification
Component Communication
Real-time Updates"]
```
### Agent Restart Message Replay (TODO)
**Current Status**: Not implemented - marked as future requirement
**Design Considerations**:
- Determine replay window (e.g., last 24 hours of unacknowledged instructions)
- Handle duplicate instruction detection and prevention
- Manage instruction sequence numbering across agent restarts
- Implement replay request mechanism from agent on startup
## External Message Types and Transformation Pipeline
### External Message Types
The system processes external RabbitMQ messages from data processing pipelines and machine learning components
to trigger real-time decision-making and microscope control instructions. These external messages represent
completion events from various stages of the cryo-EM data processing workflow:
**Primary External Message Types:**
- **MOTION_CORRECTION_COMPLETE**: Indicates completion of motion correction processing for collected images
- **CTF_COMPLETE**: Signals completion of contrast transfer function (CTF) estimation for image quality assessment
- **PARTICLE_PICKING_COMPLETE**: Notifies completion of automated particle identification in micrographs
- **PARTICLE_SELECTION_COMPLETE**: Indicates completion of particle quality assessment and selection
- **GRIDSQUARE_MODEL_PREDICTION**: Provides machine learning predictions for gridsquare quality and suitability
- **FOILHOLE_MODEL_PREDICTION**: Delivers ML predictions for individual foilhole targeting recommendations
- **MODEL_PARAMETER_UPDATE**: Communicates updates to ML model parameters affecting decision thresholds
These external messages contain scientific data processing results, quality metrics, and ML predictions that drive
the backend's decision logic for microscope control instructions.
### Message Transformation Pipeline
The transformation pipeline converts external data processing events into actionable SSE instructions for agents.
This pipeline operates within the `consumer.py` component and implements the core business logic for scientific
decision-making:
```mermaid
graph TB
subgraph external["External Systems"]
ml_pipeline["ML Pipeline"]
image_proc["Image Processing"]
ctf_est["CTF Estimation"]
particle_pick["Particle Picking"]
end
subgraph rabbitmq["Message Queue"]
ext_queue["External Event Queue"]
end
subgraph backend["Backend Processing"]
consumer["consumer.py"]
decision_logic["Decision Logic Engine"]
threshold_eval["Threshold Evaluation"]
instruction_gen["Instruction Generator"]
end
subgraph communication["Communication System"]
sse_stream["SSE Instruction Stream"]
agent_client["Agent Clients"]
end
subgraph microscope["Microscope Control"]
athena_api["Athena API"]
epu_control["EPU Software"]
end
ml_pipeline --> ext_queue
image_proc --> ext_queue
ctf_est --> ext_queue
particle_pick --> ext_queue
ext_queue --> consumer
consumer --> decision_logic
decision_logic --> threshold_eval
threshold_eval --> instruction_gen
instruction_gen --> sse_stream
sse_stream --> agent_client
agent_client --> athena_api
athena_api --> epu_control
```
**Transformation Process:**
1. **Message Reception**: External messages arrive via dedicated RabbitMQ queues with processing results
2. **Data Extraction**: Consumer extracts quality metrics, coordinates, and prediction values from message payloads
3. **Decision Logic Application**: Business rules evaluate data against configurable quality thresholds
4. **Instruction Generation**: Decision outcomes generate specific microscope control instructions
5. **Traceability Injection**: Instructions include metadata linking back to originating external messages
6. **SSE Delivery**: Generated instructions are queued for real-time delivery to appropriate agents
**Quality Threshold Examples:**
```json
{
  "gridsquare_quality_threshold": 0.7,
  "foilhole_ice_thickness_max": 150.0,
  "ctf_resolution_minimum": 4.0,
  "motion_correction_drift_max": 2.0
}
```
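To make the decision-logic step concrete, the sketch below checks a hypothetical CTF_COMPLETE payload against the
configured minimum and emits a skip instruction on failure. The field names, and the convention that numerically
larger resolution estimates indicate worse images, are assumptions for the example:

```python
THRESHOLDS = {
    "ctf_resolution_minimum": 4.0,
}

def evaluate_ctf_result(message: dict) -> dict | None:
    """Return a skip instruction for a gridsquare failing the CTF check."""
    # Assumption: estimated_resolution in angstroms, larger values = worse images
    if message["estimated_resolution"] <= THRESHOLDS["ctf_resolution_minimum"]:
        return None  # quality acceptable, no instruction generated
    return {
        "instruction_type": "athena.control.skip_gridsquares",
        "payload": {
            "gridsquares_to_skip": [
                {
                    "gridsquare_id": message["gridsquare_id"],
                    "skip_reason": "quality_below_threshold",
                }
            ],
            "skip_strategy": "quality_based",
        },
        "metadata": {"originating_message_id": message["message_id"]},
    }
```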
### Athena API Control Instructions
The system generates specific instruction types that correspond to Athena API control capabilities for microscope
workflow management. These instructions represent the actionable outputs of the decision-making process:
**Primary Control Instruction Types:**
#### athena.control.reorder_foilholes
Reorders foilhole acquisition sequence based on ML predictions and quality assessments:
```json
{
  "instruction_id": "uuid-v4",
  "instruction_type": "athena.control.reorder_foilholes",
  "version": "1.0",
  "timestamp": "2025-09-23T10:30:00Z",
  "payload": {
    "gridsquare_id": "GS_001_002",
    "foilhole_order": [
      {"foilhole_id": "FH_001", "priority_score": 0.95},
      {"foilhole_id": "FH_003", "priority_score": 0.87},
      {"foilhole_id": "FH_002", "priority_score": 0.72}
    ],
    "reorder_reason": "ml_prediction_update"
  },
  "metadata": {
    "session_id": "session-uuid",
    "originating_message_id": "external-msg-uuid",
    "decision_timestamp": "2025-09-23T10:29:45Z",
    "quality_threshold": 0.7
  }
}
```
#### athena.control.reorder_gridsquares
Reorders gridsquare acquisition sequence based on overall quality assessments:
```json
{
  "instruction_id": "uuid-v4",
  "instruction_type": "athena.control.reorder_gridsquares",
  "version": "1.0",
  "timestamp": "2025-09-23T10:35:00Z",
  "payload": {
    "gridsquare_order": [
      {"gridsquare_id": "GS_001_003", "quality_score": 0.92},
      {"gridsquare_id": "GS_001_001", "quality_score": 0.88},
      {"gridsquare_id": "GS_001_002", "quality_score": 0.74}
    ],
    "reorder_strategy": "quality_optimised"
  },
  "metadata": {
    "session_id": "session-uuid",
    "originating_message_id": "gridsquare-prediction-uuid",
    "model_version": "v2.1.0",
    "prediction_confidence": 0.89
  }
}
```
#### athena.control.skip_gridsquares
Skips gridsquares that fail to meet quality thresholds:
```json
{
  "instruction_id": "uuid-v4",
  "instruction_type": "athena.control.skip_gridsquares",
  "version": "1.0",
  "timestamp": "2025-09-23T10:40:00Z",
  "payload": {
    "gridsquares_to_skip": [
      {
        "gridsquare_id": "GS_001_005",
        "skip_reason": "quality_below_threshold",
        "quality_score": 0.45
      },
      {
        "gridsquare_id": "GS_001_007",
        "skip_reason": "ice_contamination_detected",
        "contamination_level": 0.85
      }
    ],
    "skip_strategy": "quality_based"
  },
  "metadata": {
    "session_id": "session-uuid",
    "originating_message_id": "quality-assessment-uuid",
    "threshold_applied": 0.6,
    "assessment_method": "ml_classification"
  }
}
```
### Complete Message Flow Diagram
The following diagram illustrates the complete message flow from external systems through to microscope control:
```mermaid
sequenceDiagram
participant ExtSys as External Systems<br/>(ML Pipeline, Image Processing)
participant MQ as RabbitMQ<br/>(External Events)
participant Consumer as Backend Consumer<br/>(consumer.py)
participant DB as PostgreSQL<br/>(Instruction Storage)
participant SSE as SSE Stream<br/>(Real-time Delivery)
participant Agent as Agent Client<br/>(Windows Workstation)
participant Athena as Athena API<br/>(Microscope Control)
participant EPU as EPU Software<br/>(ThermoFisher)
Note over ExtSys: Data processing completes
ExtSys->>MQ: MOTION_CORRECTION_COMPLETE<br/>CTF_COMPLETE<br/>PARTICLE_PICKING_COMPLETE<br/>ML_PREDICTION
MQ->>Consumer: External event delivery
Note over Consumer: Message transformation pipeline
Consumer->>Consumer: Extract quality metrics<br/>Apply decision thresholds<br/>Generate control instructions
Consumer->>DB: Store instruction with traceability<br/>(originating_message_id)
Consumer->>MQ: Publish instruction event
MQ->>SSE: Instruction ready for delivery
SSE->>Agent: Stream instruction via SSE<br/>(athena.control.*)
Agent->>SSE: HTTP acknowledgement
SSE->>DB: Update delivery status
Note over Agent: Process microscope control instruction
Agent->>Athena: Execute control command<br/>(reorder_foilholes, skip_gridsquares)
Athena->>EPU: Apply workflow changes
EPU->>Agent: Execution confirmation
Agent->>SSE: Final execution status
SSE->>DB: Update instruction completion
Note over DB: Complete audit trail<br/>External message → Decision → Execution
```
**Key Message Flow Characteristics:**
1. **Scientific Traceability**: Every instruction maintains linkage to originating external messages for reproducibility
2. **Real-time Processing**: Sub-second transformation from external events to microscope control instructions
3. **Quality-driven Decisions**: Business logic applies configurable thresholds to scientific data
4. **Comprehensive Audit Trail**: Full tracking from data processing results through to microscope actions
5. **Failure Recovery**: Instruction replay capability maintains workflow continuity during system interruptions
**Message Transformation Examples:**
- **CTF_COMPLETE** with resolution worse than 4.0 Å → **athena.control.skip_gridsquares** (poor quality)
- **GRIDSQUARE_MODEL_PREDICTION** with score > 0.8 → **athena.control.reorder_gridsquares** (prioritise high-quality)
- **FOILHOLE_MODEL_PREDICTION** with updated rankings → **athena.control.reorder_foilholes** (optimise sequence)
This transformation pipeline ensures that scientific data processing results directly drive microscope control decisions
with full traceability and audit capabilities for research reproducibility.
## Extensibility Design
### JSON Message Vocabulary
The system implements a **comprehensive JSON message vocabulary** for microscope control instructions:
**Core Message Structure:**
```json
{
  "instruction_id": "uuid-v4",
  "instruction_type": "athena.control.*",
  "version": "1.0",
  "timestamp": "2025-09-23T10:30:00Z",
  "payload": {
    // Instruction-specific data
  },
  "metadata": {
    "session_id": "session-uuid",
    "originating_message_id": "external-msg-uuid",
    "decision_timestamp": "2025-09-23T10:29:45Z",
    "quality_threshold": 0.7
  }
}
```
**Implemented Instruction Types:**
- `athena.control.reorder_foilholes`: Reorder foilhole acquisition sequence based on ML predictions
- `athena.control.reorder_gridsquares`: Reorder gridsquare acquisition sequence based on quality assessment
- `athena.control.skip_gridsquares`: Skip gridsquares that fail quality thresholds
**Enhanced Metadata for Scientific Traceability:**
- `originating_message_id`: Links instruction back to external processing event
- `decision_timestamp`: Records when decision logic was applied
- `quality_threshold`: Documents threshold values used in decision-making
- `model_version`: Tracks ML model version for reproducibility
- `prediction_confidence`: Records confidence level of ML predictions
**Extensibility Features**:
- Version field for message schema evolution across instruction types
- Flexible payload structure accommodating diverse microscope control requirements
- Enhanced metadata section supporting scientific traceability and reproducibility
- Type-safe instruction validation using Pydantic models for data integrity (sketched after this list)
- Originating message linkage for complete audit trails from data processing to execution
- Configurable quality thresholds enabling adaptive decision-making
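A sketch of the core message structure as Pydantic (v2) models; class names and the optional-field choices are
illustrative, not the project's actual model definitions, and the `payload` is left loosely typed for per-type
validation downstream:

```python
from datetime import datetime

from pydantic import BaseModel, ConfigDict

class InstructionMetadata(BaseModel):
    # "model_version" clashes with Pydantic v2's protected "model_" namespace,
    # so the namespace guard is relaxed for this illustrative model
    model_config = ConfigDict(protected_namespaces=())

    session_id: str
    originating_message_id: str
    decision_timestamp: datetime | None = None
    quality_threshold: float | None = None
    model_version: str | None = None
    prediction_confidence: float | None = None

class AgentInstructionMessage(BaseModel):
    instruction_id: str
    instruction_type: str  # e.g. "athena.control.reorder_foilholes"
    version: str
    timestamp: datetime
    payload: dict
    metadata: InstructionMetadata

# Malformed messages raise pydantic.ValidationError:
# AgentInstructionMessage.model_validate_json(raw_event_data)
```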
### Implemented ML Integration
The architecture provides **production machine learning and data processing integration**:
```mermaid
graph TB
subgraph ml["ML Pipeline Integration (Implemented)"]
model["Prediction Models
(Gridsquare/Foilhole)"]
pipeline["Processing Pipeline
(Motion/CTF/Particles)"]
feedback["Execution Feedback
(Future Enhancement)"]
end
subgraph comm["Communication System"]
consumer["Message Consumer
(consumer.py)"]
decision["Decision Logic Engine"]
instructions["Instruction Generation"]
service["SSE Communication Service"]
end
subgraph agents["Agent Layer"]
agent["Agent Execution"]
athena["Athena API Integration"]
results["Execution Results"]
end
model --> pipeline
pipeline --> consumer
consumer --> decision
decision --> instructions
instructions --> service
service --> agent
agent --> athena
athena --> results
results -.->|Future| feedback
feedback -.->|Future| model
classDef implemented fill:#d4edda,stroke:#155724,stroke-width:2px
classDef future fill:#fff3cd,stroke:#856404,stroke-width:2px
class model,pipeline,consumer,decision,instructions,service,agent,athena,results implemented
class feedback future
```
**Current ML Integration Features:**
- **Real-time Processing Results**: Integration with motion correction, CTF estimation, and particle picking pipelines
- **ML Model Predictions**: Gridsquare and foilhole quality prediction integration
- **Quality-driven Decisions**: Automated decision-making based on configurable quality thresholds
- **Scientific Traceability**: Full audit trail from ML predictions to microscope actions
**Future Enhancement Areas:**
- **Execution Feedback Loop**: Collection of execution results to improve ML model predictions
- **Adaptive Thresholds**: Dynamic threshold adjustment based on session performance
- **Predictive Analytics**: Advanced analytics for experiment outcome prediction
## Traceability and Monitoring
### Message Tracking System
The system tracks every instruction end to end, combining unique identifiers with a complete delivery audit trail:
```mermaid
graph LR
subgraph tracking["Message Tracking"]
id["Instruction ID (UUID)"]
origin["Origin Timestamp"]
causation["Causation Chain"]
status["Delivery Status"]
end
subgraph audit["Audit Trail"]
created["Creation Event"]
queued["Queue Event"]
delivered["Delivery Event"]
acked["Acknowledgement Event"]
end
tracking --> audit
subgraph queries["Operational Queries"]
pending["Pending Instructions"]
failed["Failed Deliveries"]
performance["Performance Metrics"]
end
audit --> queries
```
### Monitoring Architecture
```mermaid
graph TB
subgraph metrics["Metrics Collection"]
conn["Connection Metrics"]
delivery["Delivery Metrics"]
performance["Performance Metrics"]
errors["Error Rates"]
end
subgraph monitoring["Monitoring Stack"]
prometheus["Prometheus"]
grafana["Grafana Dashboards"]
alerts["Alert Manager"]
end
subgraph logs["Logging"]
structured["Structured Logs"]
correlation["Correlation IDs"]
levels["Log Levels"]
end
metrics --> monitoring
logs --> monitoring
```
## Operational Considerations
### SSE Connection Health Monitoring
**Health Check Mechanisms**:
- Periodic heartbeat messages via SSE stream (sketched after this list)
- Connection timeout detection and cleanup
- Automatic reconnection attempts with exponential backoff
- Connection state synchronisation with database
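A sketch of the heartbeat mechanism from the first item above: SSE comment lines (prefixed with `:`) are ignored
by clients but keep intermediaries from timing out idle streams. The 15-second interval is an assumption:

```python
import asyncio
import json

async def event_stream(queue: asyncio.Queue, heartbeat_interval_s: float = 15.0):
    while True:
        try:
            instruction = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval_s)
        except asyncio.TimeoutError:
            # SSE comment line: ignored by clients, keeps the connection warm
            yield ": keep-alive\n\n"
            continue
        yield f"data: {json.dumps(instruction)}\n\n"
```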
**Monitoring Metrics**:
- Active connection count per agent
- Connection duration and stability
- Reconnection frequency and success rates
- Memory usage per connection
### Debugging Message Delivery Issues
**Debugging Capabilities**:
```python
# Conceptual debugging interface; DeliveryTrace, ConnectionDiagnosis, and
# Analysis are conceptual result types, referenced as forward declarations
from datetime import timedelta

class DeliveryDebugger:
    """Tools for debugging message delivery issues"""

    def trace_instruction_lifecycle(self, instruction_id: str) -> "DeliveryTrace":
        """Trace complete instruction delivery lifecycle"""

    def diagnose_connection_issues(self, agent_id: str) -> "ConnectionDiagnosis":
        """Diagnose SSE connection problems"""

    def analyse_delivery_patterns(self, agent_id: str, timeframe: timedelta) -> "Analysis":
        """Analyse delivery success patterns for optimisation"""
```
**Troubleshooting Features**:
- Real-time delivery status dashboard
- Instruction replay capability for testing
- Connection diagnostics with detailed error reporting
- Performance profiling for bottleneck identification
### Resource Management
**Connection Resource Management**:
- Concurrent connection capacity (20 connections for facility requirements)
- Connection memory usage monitoring and alerting
- Automatic connection cleanup on agent disconnect
- Resource pool management for database connections
**Performance Optimization**:
- Connection keep-alive optimization for long-lived SSE streams
- Message batching for high-frequency instruction bursts
- Database query optimization for instruction retrieval
- Memory-efficient JSON streaming for large instruction payloads
## Critical Analysis and Risk Assessment
### Potential Bottlenecks
**Identified Bottlenecks**:
1. **Database Write Performance**: High-frequency instruction persistence and state updates may impact database performance under load
2. **Database Connection Contention**: Concurrent access from multiple service instances may strain database connection pools
3. **Memory Usage Growth**: Long-lived SSE connections accumulate memory usage over time
4. **Single Point of Failure**: Communication service represents single point of failure for all agents
**Mitigation Strategies**:
- Implement horizontal scaling with load balancing for connection distribution
- Use database connection pooling and async operations for performance
- Implement memory leak detection and connection lifecycle management
- Deploy redundant service instances with automatic failover capabilities
### Integration Complexities
**Architectural Challenges**:
1. **RabbitMQ-SSE Bridge**: Complex event bridging between message queues and SSE streams
2. **State Synchronisation**: Maintaining consistency between RabbitMQ, PostgreSQL, and SSE connection state
3. **Error Propagation**: Ensuring error conditions propagate correctly across system boundaries
4. **Testing Complexity**: Integration testing across multiple protocols and failure scenarios
**Recommended Approaches**:
- Implement comprehensive integration testing with realistic failure simulation
- Use event sourcing patterns for consistent state management
- Deploy canary deployments for safe production rollouts
- Establish clear error handling contracts between components
### Monitoring and Operational Gaps
**Identified Gaps**:
1. **End-to-End Tracing**: Limited visibility into complete instruction delivery lifecycle
2. **Capacity Planning**: Insufficient metrics for predicting scaling requirements
3. **Agent Health Correlation**: Limited correlation between agent health and delivery success
**Required Improvements**:
- Implement distributed tracing with correlation IDs across all components
- Develop capacity planning dashboards with predictive analytics
- Create agent health correlation dashboards for operational insights
This system design provides a robust foundation for real-time backend-to-agent communication whilst maintaining the
flexibility for future enhancements and scaling requirements in the SmartEM Decisions scientific computing platform.