Source code for tickit.adapters.interpreters.wrappers.splitting_interpreter
import re
from typing import AnyStr, AsyncIterable, List, Tuple
from tickit.adapters.interpreters.utils import wrap_messages_as_async_iterable
from tickit.core.adapter import Adapter, Interpreter
class SplittingInterpreter(Interpreter[AnyStr]):
    """A wrapper for an interpreter that splits a single message into multiple.

    An interpreter wrapper class that takes a message, splits it according to a
    given delimiter, and passes the resulting sub-messages individually on to the
    wrapped interpreter. The replies to the sub-messages are gathered into a single
    composite reply.
    """

    def __init__(
        self,
        interpreter: Interpreter[AnyStr],
        message_delimiter: AnyStr,
    ) -> None:
        """An interpreter decorator that splits a message into multiple sub-messages.

        Args:
            interpreter (Interpreter): The interpreter messages are passed on to.
            message_delimiter (AnyStr): The delimiter by which the message is split
                up. It is handed to ``re.split`` as the pattern, so it can be a
                regex. Must be of the same type as the message.
        """
        super().__init__()
        self.interpreter: Interpreter[AnyStr] = interpreter
        self.message_delimiter: AnyStr = message_delimiter

    async def _handle_individual_messages(
        self, adapter: Adapter, individual_messages: List[AnyStr]
    ) -> List[Tuple[AsyncIterable[AnyStr], bool]]:
        """Passes each sub-message to the wrapped interpreter, one after another.

        Args:
            adapter (Adapter): The adapter in which the function should be executed.
            individual_messages (List[AnyStr]): The sub-messages to be handled.

        Returns:
            List[Tuple[AsyncIterable[AnyStr], bool]]:
                The values returned by the wrapped interpreter's handle() method,
                in the same order as the sub-messages.
        """
        # Each call is awaited before the next one starts, so sub-messages are
        # handled strictly in the order in which they appear in the message.
        return [
            await self.interpreter.handle(adapter, message)
            for message in individual_messages
        ]

    async def _collect_responses(
        self, results: List[Tuple[AsyncIterable[AnyStr], bool]]
    ) -> Tuple[AsyncIterable[AnyStr], bool]:
        """Combines results from handling multiple messages.

        Takes the responses from when the wrapped interpreter handles multiple
        messages and returns an appropriate composite response and interrupt. The
        response is an asynchronous iterable of each of the individual responses,
        the composite interrupt is a logical inclusive 'or' of all of the
        individual interrupts.

        Args:
            results (List[Tuple[AsyncIterable[AnyStr], bool]]): a list of returned
                values from the wrapped class' handle() method. May be empty.

        Returns:
            Tuple[AsyncIterable[AnyStr], bool]:
                A tuple of the asynchronous iterable of reply messages and a flag
                indicating whether an interrupt should be raised by the adapter.
                For an empty ``results`` list the reply is built from an empty
                list of messages and the flag is False.
        """
        # Walk the (responses, interrupt) pairs directly rather than unpacking
        # ``zip(*results)``: that unpacking raises ValueError when ``results`` is
        # empty, which happens when a message contains nothing but delimiters.
        responses = [
            response
            for response_gen, _ in results
            async for response in response_gen
        ]
        interrupt = any(individual_interrupt for _, individual_interrupt in results)
        # NOTE(review): assumes wrap_messages_as_async_iterable accepts an empty
        # list and then produces an empty async iterable - confirm in utils.
        return wrap_messages_as_async_iterable(responses), interrupt

    async def handle(
        self, adapter: Adapter, message: AnyStr
    ) -> Tuple[AsyncIterable[AnyStr], bool]:
        """Splits a message and passes the resulting sub-messages on to an interpreter.

        Splits a given message and passes the resulting sub-messages on to an
        interpreter. The responses to the individual sub-messages are then
        returned. Empty sub-messages are dropped before being passed on; if none
        remain, the wrapped interpreter is not called at all.

        Args:
            adapter (Adapter): The adapter in which the function should be executed.
            message (AnyStr): The message to be split up and handled.

        Returns:
            Tuple[AsyncIterable[AnyStr], bool]:
                A tuple of the asynchronous iterable of reply messages and a flag
                indicating whether an interrupt should be raised by the adapter.
        """
        individual_messages = [
            msg
            for msg in re.split(self.message_delimiter, message)
            if msg  # Discard empty strings and None
        ]
        results = await self._handle_individual_messages(adapter, individual_messages)
        return await self._collect_responses(results)