Source code for fastcs.transport.graphql.transport
import asyncio
from dataclasses import dataclass, field
from fastcs.controller_api import ControllerAPI
from fastcs.transport.transport import Transport
from .graphql import GraphQLServer
from .options import GraphQLServerOptions
[docs]
@dataclass
class GraphQLTransport(Transport):
    """Transport backed by a ``GraphQLServer`` built from the controller API.

    The server object is created in :meth:`initialise` and run by
    :meth:`serve`; within this class ``_server`` is only assigned by
    :meth:`initialise`, so that method has to run before :meth:`serve`.
    """

    # Options passed to the server's ``serve`` call. Each instance gets its
    # own ``GraphQLServerOptions()`` unless one is supplied explicitly.
    graphql: GraphQLServerOptions = field(default_factory=GraphQLServerOptions)

    def initialise(
        self, controller_api: ControllerAPI, loop: asyncio.AbstractEventLoop
    ):
        """Create the GraphQL server for ``controller_api``.

        Args:
            controller_api: API object handed to the ``GraphQLServer``
                constructor.
            loop: Accepted as part of the signature but not used by this
                method.
        """
        server = GraphQLServer(controller_api)
        self._server = server

    async def serve(self) -> None:
        """Await the server's ``serve`` coroutine with the ``graphql`` options."""
        server = self._server
        await server.serve(self.graphql)

    def __repr__(self) -> str:
        """Return ``GraphQLTransport(<host>:<port>)`` from the ``graphql`` options."""
        return f"GraphQLTransport({self.graphql.host}:{self.graphql.port})"