Source code for tickit.core.state_interfaces.kafka

import asyncio
from typing import Awaitable, Callable, Generic, Iterable, TypeVar

import yaml
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from yaml.loader import Loader

from tickit.core.state_interfaces import state_interface

#: A consumable value
C = TypeVar("C")
#: A producible value
P = TypeVar("P")


[docs] @state_interface.add("kafka", True) class KafkaStateConsumer(Generic[C]): """A kafka implementation of the StateConsumer protocol. A kafka implementation of the StateConsumer protocol, this consumer can subscribe to kafka topics, upon receiving a message the consumer passes the value to the callback function passed during initialization, if a topic is subscribed to which does not yet exist it is created. """ def __init__(self, callback: Callable[[C], Awaitable[None]]) -> None: """Creates an AIOKafka Consumer and begins consuming subscribed topics. Args: callback (Callable[[C], Awaitable[None]]): An asynchronous handler function for consumed values. """ self.consumer = AIOKafkaConsumer( None, auto_offset_reset="earliest", value_deserializer=lambda m: yaml.load(m.decode("utf-8"), Loader=Loader), ) self.callback = callback asyncio.create_task(self._run_forever()) async def _run_forever(self) -> None: """Starts the consumer and waits for messages to arrive.""" await self.consumer.start() while True: async for message in self.consumer: if message.value is not None: await self.callback(message.value)
[docs] async def subscribe(self, topics: Iterable[str]): """Subscribes the consumer to the given topics. Subscribes the consumer to the given topics, new messages are passed to the callback. Args: topics (Iterable[str]): An iterable of topics to subscribe to. """ self.consumer.subscribe(topics)
[docs] @state_interface.add("kafka", True) class KafkaStateProducer(Generic[P]): """A kafka implementation of the StateProducer protocol. A kafka implementation of the StateProducer protocol, this producer can produce a value to a kafka topic, if the topic does not yet exist it is created. """ def __init__(self) -> None: """Creates and starts and AIOKafka Producer.""" self.producer = AIOKafkaProducer( value_serializer=lambda m: yaml.dump(m).encode("utf-8") ) self._start = asyncio.create_task(self.producer.start())
[docs] async def produce(self, topic: str, value: P) -> None: """Produces a value to the provided topic. Args: topic (str): The topic to which the value should be sent. value (P): The value to send to the provided topic. """ await self._start await self.producer.send(topic, value)