Source code for tickit.adapters.tcp
from abc import abstractmethod
from inspect import getmembers
from typing import (
AnyStr,
AsyncIterable,
AsyncIterator,
Optional,
Protocol,
Sequence,
Tuple,
get_type_hints,
runtime_checkable,
)
from tickit.adapters.utils import wrap_as_async_iterator
from tickit.core.adapter import Interpreter, RaiseInterrupt
from tickit.utils.byte_format import ByteFormat
[docs]@runtime_checkable
class Command(Protocol):
"""An interface for interpretable commands."""
#: A flag which indicates whether calling of the method should trigger an interrupt
interrupt: bool
[docs] @abstractmethod
def parse(self, data: bytes) -> Optional[Sequence[AnyStr]]:
"""An abstract method which parses a message and extracts arguments.
An abstract method which parses a message and produces arguments if a match is
found, otherwise None is returned.
Args:
data (bytes): The message to be parsed.
Returns:
Optional[Sequence[AnyStr]]:
A sequence of arguments extracted from the message if matched,
otherwise None.
"""
pass
[docs]class CommandAdapter(Interpreter[AnyStr]):
"""An adapter which routes to commands registered to adapter methods.
An adapter which attempts to parse messages according to the parse method of
commands registered against adapter methods, if a match is found the method is
called with the parsed arguments.
"""
_byte_format: ByteFormat = ByteFormat(b"%b")
@property
def byte_format(self) -> ByteFormat:
return self._byte_format
def after_update(self) -> None:
...
[docs] async def handle_message(
self,
message: AnyStr,
raise_interrupt: RaiseInterrupt,
) -> AsyncIterable[Optional[AnyStr]]:
"""Delegates message handling to the adapter, raises interrupt if requested.
Args:
message (T): The message from the server to be handled.
Returns:
AsyncIterable[Optional[T]]: An asynchronous iterable of reply messages.
"""
reply, interrupt = await self.handle(message)
if interrupt:
await raise_interrupt()
return reply
[docs] async def handle(self, message: AnyStr) -> Tuple[AsyncIterator[AnyStr], bool]:
"""Matches the message to an adapter command and calls the corresponding method.
An asynchronous method which handles a message by attempting to match the
message against each of the registered commands, if a match is found the
corresponding command is called and its reply is returned with an asynchronous
iterator wrapper if required. If no match is found the unknown command message
is returned with no request for interrupt.
Args:
adapter (Adapter): The adapter in which the function should be executed
message (bytes): The message to be handled.
Returns:
Tuple[AsyncIterator[Union[str, bytes]], bool]:
A tuple of the asynchronous iterator of reply messages and a flag
indicating whether an interrupt should be raised by the adapter.
"""
for _, method in getmembers(self):
command = getattr(method, "__command__", None)
if command is None:
continue
args = command.parse(message)
if args is None:
continue
args = (
argtype(arg)
for arg, argtype in zip(args, get_type_hints(method).values())
)
resp = await method(*args)
if not isinstance(resp, AsyncIterator):
resp = wrap_as_async_iterator(resp)
return resp, command.interrupt
msg = "Request does not match any known command"
# This is a pain but the message needs to match the input message
# TODO: Fix command interpreters' handling of bytes vs str
if isinstance(message, bytes):
resp = wrap_as_async_iterator(msg.encode("utf-8"))
return resp, False
else:
resp = wrap_as_async_iterator(msg)
return resp, False
[docs] async def on_connect(self) -> AsyncIterable[Optional[AnyStr]]:
"""Overridable asynchronous iterable which yields messages on client connection.
Returns:
AsyncIterable[Optional[T]]: An asynchronous iterable of messages.
"""
if False:
yield None