smartem_backend.utils#
Members
Base class for RabbitMQ connection management  | 
|
Consumer class for receiving messages from RabbitMQ  | 
|
Publisher class for sending messages to RabbitMQ  | 
|
Get the singleton database engine.  | 
|
Get the log file path with validation and fallback handling.  | 
|
  | 
|
Set up logger with consolidated configuration logic.  | 
|
Get or create a singleton database engine with connection pooling.  | 
|
Create RabbitMQ publisher and consumer instances using configuration settings  | 
- smartem_backend.utils.get_log_file_path(conf: dict | None = None) str | None[source]#
 Get the log file path with validation and fallback handling.
- Parameters:
 conf – Configuration dictionary (if None, will load from config file)
- Returns:
 Valid log file path or None for test environments
- Return type:
 str | None
- smartem_backend.utils.setup_logger(level: int = 20, conf: dict | None = None)[source]#
 Set up logger with consolidated configuration logic.
- Parameters:
 level – Logging level (default: INFO)
conf – Configuration dictionary (if None, will load from config file)
- Returns:
 Configured logger instance
- smartem_backend.utils.setup_postgres_connection(echo=False, force_new=False) Engine[source]#
 Get or create a singleton database engine with connection pooling.
- Parameters:
 echo – Enable SQL logging
force_new – Force creation of new engine (for testing)
- Returns:
 SQLAlchemy Engine instance
- smartem_backend.utils.get_db_engine() Engine[source]#
 Get the singleton database engine. Creates it if it doesn’t exist.
- Returns:
 SQLAlchemy Engine instance
- class smartem_backend.utils.RabbitMQConnection(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#
 Base class for RabbitMQ connection management
Initialize RabbitMQ connection
- Parameters:
 connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables
exchange – Exchange name to use (default is direct exchange “”)
queue – Queue name to use
- class smartem_backend.utils.RabbitMQPublisher(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#
 Publisher class for sending messages to RabbitMQ
Initialize RabbitMQ connection
- Parameters:
 connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables
exchange – Exchange name to use (default is direct exchange “”)
queue – Queue name to use
- publish_event(event_type: MessageQueueEventType, payload: BaseModel | dict[str, Any]) bool[source]#
 Publish an event to RabbitMQ
- Parameters:
 event_type – Type of event from EventType enum
payload – Event payload, either as Pydantic model or dictionary
- Returns:
 True if message was published successfully
- Return type:
 
- class smartem_backend.utils.RabbitMQConsumer(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#
 Consumer class for receiving messages from RabbitMQ
Initialize RabbitMQ connection
- Parameters:
 connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables
exchange – Exchange name to use (default is direct exchange “”)
queue – Queue name to use
- smartem_backend.utils.setup_rabbitmq(queue_name=None, exchange=None)[source]#
 Create RabbitMQ publisher and consumer instances using configuration settings
- Parameters:
 queue_name – Optional queue name override (if None, load from config)
exchange – Optional exchange name override (if None, use default “”)
- Returns:
 (RabbitMQPublisher instance, RabbitMQConsumer instance)
- Return type: