Source code for fastcs.transport.tango.transport
import asyncio
from dataclasses import dataclass, field
from fastcs.controller_api import ControllerAPI
from fastcs.transport.transport import Transport
from .dsr import TangoDSR, TangoDSROptions
[docs]
@dataclass
class TangoTransport(Transport):
"""Tango transport."""
dsr: TangoDSROptions = field(default_factory=TangoDSROptions)
def initialise(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
):
self._dsr = TangoDSR(controller_api, loop)
async def serve(self) -> None:
coro = asyncio.to_thread(self._dsr.run, self.dsr)
await coro