Source code for fastcs.transport.tango.adapter
import asyncio
from fastcs.controller_api import ControllerAPI
from fastcs.transport.adapter import TransportAdapter
from .dsr import TangoDSR
from .options import TangoOptions
[docs]
class TangoTransport(TransportAdapter):
"""Tango transport."""
def __init__(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
options: TangoOptions | None = None,
):
self._options = options or TangoOptions()
self._dsr = TangoDSR(controller_api, loop)
@property
def options(self) -> TangoOptions:
return self._options
def create_docs(self) -> None:
raise NotImplementedError
def create_gui(self) -> None:
raise NotImplementedError
async def serve(self) -> None:
coro = asyncio.to_thread(
self._dsr.run,
self.options.dsr,
)
await coro