Source code for smartem_backend.agent_data_cleanup

"""
Data retention and cleanup utilities for agent communication system.

Provides functions to manage data lifecycle for scientific data compliance:
- Clean up old agent connections and stale data
- Archive completed sessions with configurable retention policies
- Maintain audit trails while managing storage growth
- Support regulatory compliance for scientific research data
"""

import logging
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy import and_, text
from sqlmodel import Session, select

from smartem_backend.model.database import (
    AgentConnection,
    AgentInstruction,
    AgentInstructionAcknowledgement,
    AgentSession,
)
from smartem_backend.utils import setup_postgres_connection

logger = logging.getLogger(__name__)


class AgentDataRetentionPolicy:
    """Configuration for agent data retention policies."""

    def __init__(
        self,
        connection_cleanup_hours: int = 24,
        instruction_retention_days: int = 30,
        completed_session_retention_days: int = 90,
        acknowledgement_retention_days: int = 365,
        batch_size: int = 1000,
    ):
        """
        Initialize retention policy configuration.

        Args:
            connection_cleanup_hours: Hours to keep stale connections before cleanup
            instruction_retention_days: Days to retain instruction history
            completed_session_retention_days: Days to retain completed session data
            acknowledgement_retention_days: Days to retain acknowledgement audit trail
            batch_size: Number of records to process in each cleanup batch
        """
        self.connection_cleanup_hours = connection_cleanup_hours
        self.instruction_retention_days = instruction_retention_days
        self.completed_session_retention_days = completed_session_retention_days
        self.acknowledgement_retention_days = acknowledgement_retention_days
        self.batch_size = batch_size

    @classmethod
    def scientific_compliance(cls) -> "AgentDataRetentionPolicy":
        """
        Return a conservative retention policy suitable for scientific research compliance.

        Scientific research often requires longer retention periods for
        reproducibility and audit requirements.
        """
        return cls(
            connection_cleanup_hours=48,
            instruction_retention_days=365,
            completed_session_retention_days=730,  # 2 years
            acknowledgement_retention_days=2555,  # 7 years (common compliance requirement)
            batch_size=500,
        )

    @classmethod
    def development(cls) -> "AgentDataRetentionPolicy":
        """Return a shorter retention policy suitable for development environments."""
        return cls(
            connection_cleanup_hours=4,
            instruction_retention_days=7,
            completed_session_retention_days=14,
            acknowledgement_retention_days=30,
            batch_size=1000,
        )
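
# Illustrative usage (a sketch, not part of this module's API surface): the
# presets above can be used as-is, or a bespoke policy can be built by passing
# explicit values. The numbers below are assumptions for a hypothetical
# staging environment, not recommendations.
#
#     policy = AgentDataRetentionPolicy.development()
#     custom = AgentDataRetentionPolicy(
#         connection_cleanup_hours=12,
#         instruction_retention_days=60,
#         batch_size=250,
#     )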

class AgentDataCleanupService:
    """Service for managing agent communication data lifecycle and cleanup."""

    def __init__(self, session: Session, policy: AgentDataRetentionPolicy | None = None):
        """
        Initialize cleanup service.

        Args:
            session: Database session for cleanup operations
            policy: Retention policy configuration (defaults to scientific compliance)
        """
        self.session = session
        self.policy = policy or AgentDataRetentionPolicy.scientific_compliance()

    def cleanup_stale_connections(self) -> dict[str, Any]:
        """
        Clean up stale agent connections that have not sent a heartbeat recently.

        Returns:
            Dictionary with cleanup statistics
        """
        cutoff_time = datetime.now() - timedelta(hours=self.policy.connection_cleanup_hours)

        # Find stale connections
        stale_query = select(AgentConnection).where(
            and_(
                AgentConnection.status == "active",
                AgentConnection.last_heartbeat_at < cutoff_time,
            )
        )
        stale_connections = self.session.exec(stale_query).all()

        cleaned_count = 0
        for conn in stale_connections:
            # Mark as closed rather than deleting, preserving the audit trail
            conn.status = "timeout"
            conn.closed_at = datetime.now()
            conn.close_reason = f"Stale connection cleanup after {self.policy.connection_cleanup_hours}h"
            cleaned_count += 1

            if cleaned_count % self.policy.batch_size == 0:
                self.session.commit()
                logger.info(f"Marked {cleaned_count} stale connections as closed")

        self.session.commit()
        logger.info(f"Cleanup completed: {cleaned_count} stale connections marked as closed")

        return {
            "stale_connections_closed": cleaned_count,
            "cutoff_time": cutoff_time.isoformat(),
        }
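
    # Illustrative usage (a sketch; `session` is assumed to be an open SQLModel
    # Session): a single cleanup pass returns counters plus the cutoff timestamp.
    #
    #     service = AgentDataCleanupService(session)
    #     result = service.cleanup_stale_connections()
    #     # e.g. {"stale_connections_closed": 3, "cutoff_time": "2024-01-01T00:00:00"}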

    def cleanup_old_instructions(self) -> dict[str, Any]:
        """
        Clean up old instruction records beyond the retention period.

        Maintains the acknowledgement audit trail while removing instruction payload data.
        """
        cutoff_time = datetime.now() - timedelta(days=self.policy.instruction_retention_days)

        # Find old instructions that can be archived
        old_instructions_query = select(AgentInstruction).where(
            and_(
                AgentInstruction.created_at < cutoff_time,
                AgentInstruction.status.in_(["completed", "failed", "expired"]),
            )
        )
        old_instructions = self.session.exec(old_instructions_query).all()

        archived_count = 0
        deleted_count = 0

        for instruction in old_instructions:
            # Check if this instruction has acknowledgements (preserve audit trail)
            ack_query = select(AgentInstructionAcknowledgement).where(
                AgentInstructionAcknowledgement.instruction_id == instruction.instruction_id
            )
            has_acknowledgements = self.session.exec(ack_query).first() is not None

            if has_acknowledgements:
                # Archive: clear the sensitive payload but keep metadata for the
                # audit trail. Record the original payload size before overwriting.
                original_payload_size = len(str(instruction.payload))
                instruction.payload = {"archived": True, "archived_at": datetime.now().isoformat()}
                instruction.metadata = {
                    **(instruction.metadata or {}),
                    "archived": True,
                    "original_payload_size": original_payload_size,
                }
                archived_count += 1
            else:
                # Safe to delete instructions without acknowledgements
                self.session.delete(instruction)
                deleted_count += 1

            if (archived_count + deleted_count) % self.policy.batch_size == 0:
                self.session.commit()
                logger.info(f"Processed {archived_count + deleted_count} old instructions")

        self.session.commit()
        logger.info(f"Instruction cleanup completed: {archived_count} archived, {deleted_count} deleted")

        return {
            "instructions_archived": archived_count,
            "instructions_deleted": deleted_count,
            "cutoff_time": cutoff_time.isoformat(),
        }

    def cleanup_completed_sessions(self) -> dict[str, Any]:
        """
        Clean up old completed sessions beyond the retention period.

        Maintains session metadata but removes detailed experimental parameters.
        """
        cutoff_time = datetime.now() - timedelta(days=self.policy.completed_session_retention_days)

        # Find old completed sessions
        old_sessions_query = select(AgentSession).where(
            and_(
                AgentSession.ended_at.isnot(None),
                AgentSession.ended_at < cutoff_time,
                AgentSession.status.in_(["completed", "terminated", "error"]),
            )
        )
        old_sessions = self.session.exec(old_sessions_query).all()

        archived_count = 0
        for session in old_sessions:
            # Archive sensitive experimental parameters
            if session.experimental_parameters:
                archived_params = {
                    "archived": True,
                    "archived_at": datetime.now().isoformat(),
                    "parameter_count": len(session.experimental_parameters),
                }
                session.experimental_parameters = archived_params
                archived_count += 1

                if archived_count % self.policy.batch_size == 0:
                    self.session.commit()
                    logger.info(f"Archived {archived_count} completed sessions")

        self.session.commit()
        logger.info(f"Session cleanup completed: {archived_count} sessions archived")

        return {
            "sessions_archived": archived_count,
            "cutoff_time": cutoff_time.isoformat(),
        }

    def cleanup_old_acknowledgements(self) -> dict[str, Any]:
        """
        Clean up very old acknowledgement records beyond the regulatory retention period.

        Use with care: deleting acknowledgements affects audit trail completeness.
        """
        cutoff_time = datetime.now() - timedelta(days=self.policy.acknowledgement_retention_days)

        # Select acknowledgements to be deleted
        old_acks_query = select(AgentInstructionAcknowledgement).where(
            AgentInstructionAcknowledgement.created_at < cutoff_time
        )
        old_acknowledgements = self.session.exec(old_acks_query).all()

        deleted_count = 0
        for ack in old_acknowledgements:
            self.session.delete(ack)
            deleted_count += 1

            if deleted_count % self.policy.batch_size == 0:
                self.session.commit()
                logger.info(f"Deleted {deleted_count} old acknowledgements")

        self.session.commit()
        logger.warning(
            f"Acknowledgement cleanup completed: {deleted_count} records deleted "
            "(affects audit trail completeness)"
        )

        return {
            "acknowledgements_deleted": deleted_count,
            "cutoff_time": cutoff_time.isoformat(),
        }
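
    # A cautionary sketch (caller-side code, not part of this module): because
    # cleanup_old_acknowledgements() permanently deletes audit records, a caller
    # may want an explicit opt-in flag before invoking it. The helper name and
    # flag below are hypothetical.
    #
    #     def cleanup_with_confirmation(service, *, confirm_audit_deletion=False):
    #         if not confirm_audit_deletion:
    #             raise RuntimeError("Refusing to delete acknowledgements without explicit confirmation")
    #         return service.cleanup_old_acknowledgements()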

    def run_full_cleanup(self) -> dict[str, Any]:
        """
        Run the complete cleanup process across all agent communication data.

        Returns:
            Dictionary with comprehensive cleanup statistics
        """
        logger.info("Starting full agent data cleanup process")
        start_time = datetime.now()

        results = {}

        try:
            # Run cleanup in order of increasing importance/sensitivity
            results["connections"] = self.cleanup_stale_connections()
            results["instructions"] = self.cleanup_old_instructions()
            results["sessions"] = self.cleanup_completed_sessions()

            # Only clean up acknowledgements if explicitly configured (affects audit trail)
            if self.policy.acknowledgement_retention_days < 2555:  # Less than 7 years
                logger.warning("Cleaning up acknowledgements - this affects the scientific audit trail")
                results["acknowledgements"] = self.cleanup_old_acknowledgements()

        except Exception as e:
            logger.error(f"Error during cleanup process: {e}")
            self.session.rollback()
            raise

        end_time = datetime.now()
        duration = end_time - start_time

        results["summary"] = {
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_seconds": duration.total_seconds(),
            "policy": {
                "connection_cleanup_hours": self.policy.connection_cleanup_hours,
                "instruction_retention_days": self.policy.instruction_retention_days,
                "session_retention_days": self.policy.completed_session_retention_days,
                "acknowledgement_retention_days": self.policy.acknowledgement_retention_days,
            },
        }

        logger.info(f"Full cleanup completed in {duration.total_seconds():.2f} seconds")
        return results
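
    # Illustrative usage (a sketch; assumes an engine obtained from
    # setup_postgres_connection(), as in main() below):
    #
    #     engine = setup_postgres_connection()
    #     with Session(engine) as session:
    #         service = AgentDataCleanupService(session, AgentDataRetentionPolicy.development())
    #         results = service.run_full_cleanup()
    #         print(results["summary"]["duration_seconds"])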

    def get_data_usage_statistics(self) -> dict[str, Any]:
        """
        Get current data usage statistics for agent communication tables.

        Returns:
            Dictionary with storage and record count statistics
        """
        stats = {}

        # Get record counts
        stats["record_counts"] = {
            "sessions": len(self.session.exec(select(AgentSession)).all()),
            "instructions": len(self.session.exec(select(AgentInstruction)).all()),
            "connections": len(self.session.exec(select(AgentConnection)).all()),
            "acknowledgements": len(self.session.exec(select(AgentInstructionAcknowledgement)).all()),
        }

        # Get table sizes using PostgreSQL system catalogs
        try:
            size_query = text("""
                SELECT
                    schemaname,
                    tablename,
                    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
                    pg_total_relation_size(schemaname||'.'||tablename) as size_bytes
                FROM pg_tables
                WHERE tablename IN (
                    'agentsession', 'agentinstruction', 'agentconnection',
                    'agentinstructionacknowledgement'
                )
                ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
            """)

            result = self.session.execute(size_query)
            table_sizes = []
            total_bytes = 0

            for row in result:
                table_info = {
                    "table": row[1],
                    "size_human": row[2],
                    "size_bytes": row[3],
                }
                table_sizes.append(table_info)
                total_bytes += row[3]

            stats["table_sizes"] = table_sizes
            stats["total_size_bytes"] = total_bytes

        except Exception as e:
            logger.error(f"Could not retrieve table size statistics: {e}")
            stats["table_sizes"] = "unavailable"

        # Get oldest records
        try:
            oldest_session = self.session.exec(
                select(AgentSession).order_by(AgentSession.created_at).limit(1)
            ).first()
            oldest_instruction = self.session.exec(
                select(AgentInstruction).order_by(AgentInstruction.created_at).limit(1)
            ).first()

            stats["oldest_records"] = {
                "session": oldest_session.created_at.isoformat() if oldest_session else None,
                "instruction": oldest_instruction.created_at.isoformat() if oldest_instruction else None,
            }
        except Exception as e:
            logger.error(f"Could not retrieve oldest record statistics: {e}")
            stats["oldest_records"] = "unavailable"

        return stats
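
# Illustrative shape of the returned statistics (all values hypothetical):
#
#     {
#         "record_counts": {"sessions": 12, "instructions": 340, "connections": 5, "acknowledgements": 300},
#         "table_sizes": [{"table": "agentinstruction", "size_human": "48 MB", "size_bytes": 50331648}, ...],
#         "total_size_bytes": 52428800,
#         "oldest_records": {"session": "2023-05-01T09:30:00", "instruction": "2023-05-01T09:31:12"},
#     }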

def main():
    """CLI entry point for agent data cleanup operations."""
    import argparse

    parser = argparse.ArgumentParser(description="Agent communication data cleanup utility")
    parser.add_argument(
        "--policy",
        choices=["scientific", "development"],
        default="scientific",
        help="Retention policy to use",
    )
    parser.add_argument(
        "--operation",
        choices=["cleanup", "stats", "connections", "instructions", "sessions"],
        default="stats",
        help="Operation to perform",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be cleaned up without making changes",
    )

    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    # Create database session
    engine = setup_postgres_connection()

    with Session(engine) as session:
        # Select policy
        if args.policy == "development":
            policy = AgentDataRetentionPolicy.development()
        else:
            policy = AgentDataRetentionPolicy.scientific_compliance()

        cleanup_service = AgentDataCleanupService(session, policy)

        if args.dry_run:
            logger.info("DRY RUN MODE - No changes will be made")

        # Execute requested operation
        if args.operation == "stats":
            stats = cleanup_service.get_data_usage_statistics()
            print("Agent Communication Data Statistics:")
            print(f"Record counts: {stats['record_counts']}")
            if "table_sizes" in stats and stats["table_sizes"] != "unavailable":
                print("Table sizes:")
                for table in stats["table_sizes"]:
                    print(f"  {table['table']}: {table['size_human']}")
            if "oldest_records" in stats and stats["oldest_records"] != "unavailable":
                print(f"Oldest records: {stats['oldest_records']}")

        elif args.operation == "cleanup":
            if args.dry_run:
                logger.info("Would run full cleanup with policy: %s", args.policy)
            else:
                results = cleanup_service.run_full_cleanup()
                print("Cleanup Results:")
                for category, result in results.items():
                    print(f"{category}: {result}")

        elif args.operation == "connections":
            if args.dry_run:
                logger.info("Would cleanup stale connections")
            else:
                result = cleanup_service.cleanup_stale_connections()
                print(f"Connection cleanup: {result}")

        elif args.operation == "instructions":
            if args.dry_run:
                logger.info("Would cleanup old instructions")
            else:
                result = cleanup_service.cleanup_old_instructions()
                print(f"Instruction cleanup: {result}")

        elif args.operation == "sessions":
            if args.dry_run:
                logger.info("Would cleanup completed sessions")
            else:
                result = cleanup_service.cleanup_completed_sessions()
                print(f"Session cleanup: {result}")

if __name__ == "__main__":
    main()
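
# Illustrative invocations (assuming the module is runnable as
# smartem_backend.agent_data_cleanup; the installed entry point may differ):
#
#     python -m smartem_backend.agent_data_cleanup --operation stats
#     python -m smartem_backend.agent_data_cleanup --policy development --operation cleanup --dry-run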