Source code for fastcs.connections.serial_connection
import asyncio
from dataclasses import dataclass
import aioserial
[docs]
class NotOpenedError(Exception):
    """Raised when the serial stream is used before it has been opened."""
[docs]
@dataclass
class SerialConnectionSettings:
    """Parameters used to open a serial connection.

    Attributes:
        port: Serial port identifier, handed to ``aioserial.AioSerial`` as
            ``port``.
        baud: Baud rate, handed to ``aioserial.AioSerial`` as ``baudrate``.
    """

    port: str
    baud: int = 115_200
[docs]
class SerialConnection:
    """An asynchronous serial connection backed by ``aioserial.AioSerial``.

    ``connect()`` must be called before any I/O. ``send_command()``,
    ``send_query()`` and ``close()`` each hold an internal ``asyncio.Lock``
    for their whole body, so the write and read of one query are not
    interleaved with another coroutine's traffic on the same connection.
    """

    def __init__(self):
        # Kept so existing readers of the public ``stream`` attribute do not
        # break; nothing in this class uses it.
        self.stream = None
        # The handle the ``_stream`` property actually checks. Because of
        # private-name mangling this is a different attribute from ``stream``;
        # it has to exist before connect() so that premature use raises
        # NotOpenedError instead of AttributeError.
        self.__stream = None
        self._lock = asyncio.Lock()

    async def connect(self, settings: SerialConnectionSettings) -> None:
        """Open the serial port described by ``settings``.

        Args:
            settings: Port name and baud rate to open the port with.
        """
        self.__stream = aioserial.AioSerial(
            port=settings.port, baudrate=settings.baud
        )

    @property
    def _stream(self) -> aioserial.AioSerial:
        """Return the open serial handle.

        Raises:
            NotOpenedError: If ``connect()`` has not been called, or the
                connection has since been closed.
        """
        if self.__stream is None:
            raise NotOpenedError(
                "Need to call connect() before using SerialConnection."
            )
        return self.__stream

    async def send_command(self, message: bytes) -> None:
        """Write ``message`` to the port without reading a reply.

        Raises:
            NotOpenedError: If the connection is not open.
        """
        async with self._lock:
            await self._send_message(message)

    async def send_query(self, message: bytes, response_size: int) -> bytes:
        """Write ``message`` and then read the reply under a single lock hold.

        Args:
            message: Bytes to write to the port.
            response_size: Size passed to the underlying ``read_async`` call.

        Returns:
            The bytes returned by the read.

        Raises:
            NotOpenedError: If the connection is not open.
        """
        async with self._lock:
            await self._send_message(message)
            return await self._receive_response(response_size)

    async def _send_message(self, message: bytes) -> None:
        """Write ``message`` to the stream; the caller holds the lock."""
        await self._stream.write_async(message)

    async def _receive_response(self, size: int) -> bytes:
        """Read from the stream with ``size``; the caller holds the lock."""
        return await self._stream.read_async(size)

    async def close(self) -> None:
        """Close the port and forget the handle.

        After this returns, further I/O raises ``NotOpenedError`` until
        ``connect()`` is called again.

        Raises:
            NotOpenedError: If the connection is not open.
        """
        async with self._lock:
            self._stream.close()
            self.__stream = None