smartem_backend.utils#

Members

RabbitMQConnection

Base class for RabbitMQ connection management

RabbitMQConsumer

Consumer class for receiving messages from RabbitMQ

RabbitMQPublisher

Publisher class for sending messages to RabbitMQ

get_db_engine

Get the singleton database engine.

get_log_file_path

Get the log file path with validation and fallback handling.

load_conf

setup_logger

Set up logger with consolidated configuration logic.

setup_postgres_connection

Get or create a singleton database engine with connection pooling.

setup_rabbitmq

Create RabbitMQ publisher and consumer instances using configuration settings

smartem_backend.utils.get_log_file_path(conf: dict | None = None) str | None[source]#

Get the log file path with validation and fallback handling.

Parameters:

conf – Configuration dictionary (if None, will load from config file)

Returns:

Valid log file path or None for test environments

Return type:

str | None

smartem_backend.utils.setup_logger(level: int = 20, conf: dict | None = None)[source]#

Set up logger with consolidated configuration logic.

Parameters:
  • level – Logging level (default: INFO)

  • conf – Configuration dictionary (if None, will load from config file)

Returns:

Configured logger instance

smartem_backend.utils.setup_postgres_connection(echo=False, force_new=False) Engine[source]#

Get or create a singleton database engine with connection pooling.

Parameters:
  • echo – Enable SQL logging

  • force_new – Force creation of new engine (for testing)

Returns:

SQLAlchemy Engine instance

smartem_backend.utils.get_db_engine() Engine[source]#

Get the singleton database engine. Creates it if it doesn’t exist.

Returns:

SQLAlchemy Engine instance

class smartem_backend.utils.RabbitMQConnection(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#

Base class for RabbitMQ connection management

Initialize RabbitMQ connection

Parameters:
  • connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables

  • exchange – Exchange name to use (default is direct exchange “”)

  • queue – Queue name to use

connect() None[source]#

Establish connection to RabbitMQ server

close() None[source]#

Close the connection to RabbitMQ

channel()[source]#

Get the channel object

Returns:

The current channel object

class smartem_backend.utils.RabbitMQPublisher(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#

Publisher class for sending messages to RabbitMQ

Initialize RabbitMQ connection

Parameters:
  • connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables

  • exchange – Exchange name to use (default is direct exchange “”)

  • queue – Queue name to use

publish_event(event_type: MessageQueueEventType, payload: BaseModel | dict[str, Any]) bool[source]#

Publish an event to RabbitMQ

Parameters:
  • event_type – Type of event from EventType enum

  • payload – Event payload, either as Pydantic model or dictionary

Returns:

True if message was published successfully

Return type:

bool

class smartem_backend.utils.RabbitMQConsumer(connection_params: dict[str, Any] | None = None, exchange: str = '', queue: str = 'smartem_backend')[source]#

Consumer class for receiving messages from RabbitMQ

Initialize RabbitMQ connection

Parameters:
  • connection_params – Dictionary with RabbitMQ connection parameters. If None, load from environment variables

  • exchange – Exchange name to use (default is direct exchange “”)

  • queue – Queue name to use

consume(callback: Callable, prefetch_count: int = 1) None[source]#

Start consuming messages from the queue

Parameters:
  • callback – Callback function to process messages

  • prefetch_count – Maximum number of unacknowledged messages (default: 1)

stop_consuming() None[source]#

Stop consuming messages

smartem_backend.utils.setup_rabbitmq(queue_name=None, exchange=None)[source]#

Create RabbitMQ publisher and consumer instances using configuration settings

Parameters:
  • queue_name – Optional queue name override (if None, load from config)

  • exchange – Optional exchange name override (if None, use default “”)

Returns:

(RabbitMQPublisher instance, RabbitMQConsumer instance)

Return type:

tuple