Source code for fastcs.transport.epics.pva.transport
import asyncio
from dataclasses import dataclass, field
from fastcs.controller_api import ControllerAPI
from fastcs.logging import logger as _fastcs_logger
from fastcs.transport.epics.docs import EpicsDocs
from fastcs.transport.epics.gui import PvaEpicsGUI
from fastcs.transport.epics.options import (
EpicsDocsOptions,
EpicsGUIOptions,
EpicsIOCOptions,
)
from fastcs.transport.transport import Transport
from .ioc import P4PIOC
logger = _fastcs_logger.bind(logger_name=__name__)
[docs]
@dataclass
class EpicsPVATransport(Transport):
"""PV access transport."""
docs: EpicsDocsOptions = field(default_factory=EpicsDocsOptions)
gui: EpicsGUIOptions = field(default_factory=EpicsGUIOptions)
pva_ioc: EpicsIOCOptions = field(default_factory=EpicsIOCOptions)
def initialise(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
) -> None:
self._controller_api = controller_api
self._pv_prefix = self.pva_ioc.pv_prefix
self._ioc = P4PIOC(self.pva_ioc.pv_prefix, controller_api)
async def serve(self) -> None:
logger.info("Running IOC", pv_prefix=self._pv_prefix)
await self._ioc.run()
def create_docs(self) -> None:
EpicsDocs(self._controller_api).create_docs(self.docs)
def create_gui(self) -> None:
PvaEpicsGUI(self._controller_api, self._pv_prefix).create_gui(self.gui)
def __repr__(self):
return f"EpicsPVATransport({self._pv_prefix})"