Source code for fastcs.connections.ip_connection

import asyncio
from dataclasses import dataclass


[docs] class DisconnectedError(Exception): """Raised if the ip connection is disconnected.""" pass
[docs] @dataclass class IPConnectionSettings: ip: str = "127.0.0.1" port: int = 25565
[docs] @dataclass class StreamConnection: """For reading and writing to a stream.""" reader: asyncio.StreamReader writer: asyncio.StreamWriter def __post_init__(self): self._lock = asyncio.Lock() async def __aenter__(self): await self._lock.acquire() return self async def __aexit__(self, exc_type, exc_val, exc_tb): self._lock.release() async def send_message(self, message) -> None: self.writer.write(message.encode("utf-8")) await self.writer.drain() async def receive_response(self) -> str: data = await self.reader.readline() return data.decode("utf-8") async def close(self): self.writer.close() await self.writer.wait_closed()
[docs] class IPConnection: """For connecting to an ip using a `StreamConnection`.""" def __init__(self): self.__connection = None @property def _connection(self) -> StreamConnection: if self.__connection is None: raise DisconnectedError("Need to call connect() before using IPConnection.") return self.__connection async def connect(self, settings: IPConnectionSettings): reader, writer = await asyncio.open_connection(settings.ip, settings.port) self.__connection = StreamConnection(reader, writer) async def send_command(self, message) -> None: async with self._connection as connection: await connection.send_message(message) async def send_query(self, message) -> str: async with self._connection as connection: await connection.send_message(message) return await connection.receive_response() async def close(self): async with self._connection as connection: await connection.close() self.__connection = None