Source code for tickit.core.state_interfaces.state_interface

from typing import (
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Protocol,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    runtime_checkable,
)
from warnings import warn

#: A consumable value
C = TypeVar("C", covariant=True)
#: A producible value
P = TypeVar("P", contravariant=True)


[docs] @runtime_checkable class StateConsumer(Protocol[C]): """An interface for types which implement publish/subscribe message consumers. An interface for types which implement publish/subscribe message consumers, the consumer must be able to subscribe to topics within the messaging framework, upon receiving a message the consumer should pass the value to the callback function, if a topic is subscribed to which does not yet exist it should be created. """ def __init__(self, callback: Callable[[C], Awaitable[None]]) -> None: """Constructor of the consumer, given a callback handle. Args: callback (Callable[[C], Awaitable[None]]): An asynchronous handler function for consumed values. """ pass
[docs] async def subscribe(self, topics: Iterable[str]) -> None: """Subscribe the consumer to the given topics. Subscribe the consumer to the given topics, new messages are passed to the callback. Args: topics (Iterable[str]): An iterable of topics to subscribe to. """ pass
[docs] @runtime_checkable class StateProducer(Protocol[P]): """An interface for types which implement publish/subscribe message producers. An interface for types which implement publish/subscribe message producers, the producer must be able to produce a value to a topic within the messaging framework, if the topic does not yet exist it should be created. """ def __init__(self) -> None: """Constructor of the producer, given no arguments.""" pass
[docs] async def produce(self, topic: str, value: P) -> None: """Produce a value to the provided topic. Args: topic (str): The topic to which the value should be sent. value (P): The value to send to the provided topic. """ pass
#: The union of StateConsumer and StateProducer StateInterface = TypeVar("StateInterface", bound=Union[StateConsumer, StateProducer]) consumers: Dict[str, Tuple[Type[StateConsumer], bool]] = dict() producers: Dict[str, Tuple[Type[StateProducer], bool]] = dict()
[docs] def add( name: str, external: bool ) -> Callable[[Type[StateInterface]], Type[StateInterface]]: """A decorator to add a StateInterface to the registry. A decorator to add a StateInterface to the registry of StateConsumers or StateProducer according to its signature. StateConsumers and StateProducers which are intended to work together should be added with the same name. Args: name (str): The name under which the class should be registered (typically the name of the messaging framework). external (bool): A flag which indicates whether the interface can be used simulations which are distributed across processes. """ def wrap(interface: Type[StateInterface]) -> Type[StateInterface]: if isinstance(interface, StateProducer): producers[name] = (interface, external) elif isinstance(interface, StateConsumer): consumers[name] = (interface, external) else: warn( RuntimeWarning(f"{interface} is not {StateConsumer} or {StateProducer}") ) return interface return wrap
[docs] def interfaces(external: bool = False) -> Set[str]: """Returns interface names for which both a StateConsumer and StateProducer exist. Gets a set of interface names for which both a StateConsumer and StateProducer exist. The external option may be used to restrict interfaces to those which may be used in simulations which are distributed across multiple processes. Args: external (bool): If true, only interfaces which can be used in simulations which are distributed across processes are returned. If false, all interfaces are returned. Defaults to False. Returns: Set[str]: A set of names of StateConsumer / StateProducer pairs. """ return satisfy_externality(external, consumers) & satisfy_externality( external, producers )
[docs] def satisfy_externality( external: bool, interfaces: Dict[str, Tuple[Type[StateInterface], bool]] ) -> Set[str]: """Finds which interfaces satisfy the externality requirement provided. Args: external (bool): If true, only interfaces which can be used in simulations which are distributed across processes are returned. If false, all interfaces are returned. interfaces (Dict[str, Tuple[Type[StateInterface], bool]]): A mapping of interface names to a tuple of their class and their externality flag. Returns: Set[str]: A set of interface names which satisfy the externality requirement. """ return set( name for name, interface in interfaces.items() if not external or interface[1] )
[docs] def get_interface(name: str) -> Tuple[Type[StateConsumer], Type[StateProducer]]: """Get the StateConsumer and StateProducer classes for a given interface name. Args: name (str): The name of the interface to be retrieved. Returns: Tuple[Type[StateConsumer], Type[StateProducer]]: A tuple of the StateConsumer and StateProducer classes. """ return consumers[name][0], producers[name][0]