Source code for tickit.adapters.servers.tcp

import asyncio
import logging
from asyncio.streams import StreamReader, StreamWriter
from typing import AsyncIterable, Awaitable, Callable, List, Optional

from tickit.core.adapter import Server
from tickit.utils.byte_format import ByteFormat

LOGGER = logging.getLogger(__name__)


class TcpServer(Server[bytes]):
    """A configurable tcp server with delegated message handling for use in adapters."""

    def __init__(
        self,
        host: str = "localhost",
        port: int = 25565,
        format: ByteFormat = ByteFormat(b"%b"),
    ) -> None:
        """The TcpServer constructor which takes a host, port and format byte string.

        Args:
            host (str): The host name which the server should be run under.
            port (int): The port number which the server should listen to.
            format (ByteFormat): A formatting string for messages sent by the
                server, allowing for the prepending and appending of data.
                Defaults to b"%b".
        """
        self.host = host
        self.port = port
        # Only the `format` attribute of the ByteFormat is kept; it is used as
        # a %-template (`self.format % reply`) when replies are written.
        self.format = format.format

    async def run_forever(
        self,
        on_connect: Callable[[], AsyncIterable[Optional[bytes]]],
        handler: Callable[[bytes], Awaitable[AsyncIterable[Optional[bytes]]]],
    ) -> None:
        """Runs the TCP server indefinitely on the configured host and port.

        An asynchronous method used to run the server indefinitely on the
        configured host and port. Upon client connection, messages from the
        on_connect iterable will be sent. Upon receiving a message the server
        will delegate handling of it to the handler. Replies will be formatted
        according to the configured format string.

        Args:
            on_connect (Callable[[], AsyncIterable[Optional[bytes]]]): A callable
                which returns an asynchronous iterable of messages to be sent
                upon client connection.
            handler (Callable[[bytes], Awaitable[AsyncIterable[Optional[bytes]]]]):
                An asynchronous message handler which returns an asynchronous
                iterable of replies.
        """
        handle = self._generate_handle_function(on_connect, handler)

        server = await asyncio.start_server(handle, self.host, self.port)

        async with server:
            await server.serve_forever()

    def _generate_handle_function(
        self,
        on_connect: Callable[[], AsyncIterable[Optional[bytes]]],
        handler: Callable[[bytes], Awaitable[AsyncIterable[Optional[bytes]]]],
    ) -> Callable[[StreamReader, StreamWriter], Awaitable[None]]:
        """Generates the handle function to be passed to the server.

        The handle function is generated from the specified functions. Its
        purpose is to define how the server will respond to incoming messages.

        Args:
            on_connect (Callable[[], AsyncIterable[Optional[bytes]]]): A callable
                which returns an asynchronous iterable of messages to be sent
                upon client connection.
            handler (Callable[[bytes], Awaitable[AsyncIterable[Optional[bytes]]]]):
                An asynchronous message handler which returns an asynchronous
                iterable of replies.

        Returns:
            Callable[[StreamReader, StreamWriter], Awaitable[None]]: The
                per-connection callback to be given to asyncio.start_server.
        """

        async def handle(reader: StreamReader, writer: StreamWriter) -> None:
            # Reply tasks are tracked per connection. A list shared between
            # connections would grow for the lifetime of the server and would
            # make one connection's shutdown wait on every other connection's
            # outstanding replies.
            tasks: List[asyncio.Task] = []

            async def reply(replies: AsyncIterable[Optional[bytes]]) -> None:
                async for message in replies:
                    # A None entry means "nothing to send" and is skipped.
                    if message is None:
                        continue
                    LOGGER.debug("Replying with %r", message)
                    writer.write(self.format % message)
                    # Checked after the write and before draining, so that
                    # drain() is never awaited on a connection that is closing.
                    if writer.is_closing():
                        break
                    await writer.drain()

            try:
                tasks.append(asyncio.create_task(reply(on_connect())))

                while True:
                    data: bytes = await reader.read(1024)
                    # An empty read signals end-of-stream from the client.
                    if data == b"":
                        break
                    addr = writer.get_extra_info("peername")
                    LOGGER.debug("Received %r from %s", data, addr)
                    tasks.append(asyncio.create_task(reply(await handler(data))))

                # Let every queued reply for this connection finish sending.
                await asyncio.wait(tasks)
            finally:
                # Release the transport once this connection has been served.
                writer.close()

        return handle