from collections import defaultdict
from typing import (
Any,
Awaitable,
Callable,
DefaultDict,
Generic,
Iterable,
List,
NamedTuple,
NewType,
Set,
TypeVar,
)
from tickit.core.state_interfaces import state_interface
from tickit.utils.singleton import Singleton
#: Type variable for the value type accepted by a consumer's callback
C = TypeVar("C")
#: Type variable for the value type a producer sends to a topic
P = TypeVar("P")
class Message(NamedTuple):
    """An immutable data container for internal messages.

    Attributes:
        value (Any): The payload carried by the message.
    """

    value: Any
#: A list of Message objects; the server keeps one such list per topic
Messages = NewType("Messages", List[Message])
class InternalStateServer(metaclass=Singleton):
    """A singleton, in memory, publish/subscribe message store and router.

    The single instance of this class holds a mapping of topics to the list of
    messages which have been pushed to them, and a mapping of topics to the
    consumers which subscribe to them. Upon subscribing to a topic, a consumer is
    sent all of the messages which were previously registered against the topic.
    Upon pushing to a topic, the message is added to the store and immediately
    forwarded to each of the consumers which subscribe to the topic.
    """

    # Both mappings are class attributes backed by defaultdicts, so merely looking
    # up an unknown topic (as push and subscribe do) creates an empty entry for it.
    _topics: DefaultDict[str, Messages] = defaultdict(lambda: Messages(list()))
    _subscribers: DefaultDict[str, Set["InternalStateConsumer"]] = defaultdict(set)

    async def push(self, topic: str, message: Message) -> None:
        """Store a message on a topic and propagate it to the topic's subscribers.

        Args:
            topic (str): The topic on which the message should be propagated and
                stored.
            message (Message): The message which should be propagated and stored on
                the topic.
        """
        self._topics[topic].append(message)
        # Iterate over a snapshot: add_message awaits a user callback, which may
        # subscribe further consumers to this topic. Mutating the live set while
        # iterating it would raise RuntimeError. A consumer added meanwhile still
        # receives this message through the replay performed by subscribe.
        for subscriber in tuple(self._subscribers[topic]):
            await subscriber.add_message(message)

    async def subscribe(
        self, consumer: "InternalStateConsumer", topics: Iterable[str]
    ) -> None:
        """Subscribe the consumer to the given topics.

        Adds the consumer to the subscriber set of each topic in the topics
        iterable. On subscription, the messages already stored on the topic are
        immediately passed to the consumer.

        Args:
            consumer (InternalStateConsumer): The consumer which is subscribing to
                the topics.
            topics (Iterable[str]): The topics which the consumer is to be
                subscribed to.
        """
        for topic in topics:
            self._subscribers[topic].add(consumer)
            # Replay a snapshot of the backlog: the consumer is already registered,
            # so any message pushed while we await below reaches it via push.
            # Walking the live list would deliver such a message a second time.
            for message in tuple(self._topics[topic]):
                await consumer.add_message(message)

    def create_topic(self, topic: str) -> None:
        """Create a new topic as an empty message list.

        Args:
            topic (str): The topic to be created.

        Raises:
            AssertionError: If the topic already exists.
        """
        # NOTE: kept as an assert so the raised exception type is unchanged for
        # existing callers; be aware the check is skipped when running under -O.
        assert topic not in self._topics
        self._topics[topic] = Messages(list())

    def remove_topic(self, topic: str) -> None:
        """Remove an existing topic and the corresponding message list.

        Args:
            topic (str): The topic to be removed.

        Raises:
            AssertionError: If the topic does not exist.
        """
        # NOTE: kept as an assert so the raised exception type is unchanged for
        # existing callers; be aware the check is skipped when running under -O.
        assert topic in self._topics
        del self._topics[topic]

    @property
    def topics(self) -> List[str]:
        """A list of topics which currently exist.

        Returns:
            List[str]: A list of topics which currently exist.
        """
        return list(self._topics)
# Presumably registers this class with state_interface under the "internal" name;
# the meaning of the second argument is defined by state_interface.add.
@state_interface.add("internal", False)
class InternalStateConsumer(Generic[C]):
    """An internal, singleton based, implementation of the StateConsumer protocol.

    This consumer can subscribe to InternalStateServer topics. Upon receiving a
    message, the consumer passes the message's value to the callback function
    supplied during initialization. If a topic is subscribed to which does not yet
    exist, it is created.
    """

    def __init__(self, callback: Callable[[C], Awaitable[None]]) -> None:
        """Get the InternalStateServer instance and store the callback.

        Args:
            callback (Callable[[C], Awaitable[None]]): An asynchronous handler
                function for consumed values.
        """
        self.server = InternalStateServer()
        self.callback = callback

    async def subscribe(self, topics: Iterable[str]) -> None:
        """Subscribe the consumer to the given topics.

        Messages on the subscribed topics are passed to the callback.

        Args:
            topics (Iterable[str]): An iterable of topics to subscribe to.
        """
        await self.server.subscribe(self, topics)

    async def add_message(self, message: Message) -> None:
        """Add a message to the consumer, triggering the callback.

        Args:
            message (Message): The message to be added to the consumer; only its
                value is forwarded to the callback.
        """
        await self.callback(message.value)
# Presumably registers this class with state_interface under the "internal" name;
# the meaning of the second argument is defined by state_interface.add.
@state_interface.add("internal", False)
class InternalStateProducer(Generic[P]):
    """An internal, singleton based, implementation of the StateProducer protocol.

    This producer can produce a value to an InternalStateServer topic. If the topic
    does not yet exist, it is created.
    """

    def __init__(self) -> None:
        """Get the InternalStateServer instance for use in produce."""
        self.server = InternalStateServer()

    async def produce(self, topic: str, value: P) -> None:
        """Produce a value to the provided topic.

        The value is wrapped in a Message before being pushed to the server.

        Args:
            topic (str): The topic to which the value should be sent.
            value (P): The value to send to the provided topic.
        """
        await self.server.push(topic, Message(value=value))