Source code for fastcs.transports.graphql.transport

import asyncio
from dataclasses import dataclass, field

from fastcs.transports.controller_api import ControllerAPI
from fastcs.transports.transport import Transport

from .graphql import GraphQLServer
from .options import GraphQLServerOptions


@dataclass
class GraphQLTransport(Transport):
    """GraphQL transport.

    Holds the GraphQL server options and, once connected, a ``GraphQLServer``
    built from the controller API.

    Attributes:
        graphql: Options handed to the server when serving. Defaults to a
            freshly constructed ``GraphQLServerOptions``.
    """

    graphql: GraphQLServerOptions = field(default_factory=GraphQLServerOptions)

    def connect(
        self,
        controller_api: ControllerAPI,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the GraphQL server for ``controller_api``.

        Args:
            controller_api: Controller API the server is constructed from.
            loop: Event loop supplied by the caller; not used by this method.
        """
        # ``_server`` is only assigned here within this class, so ``serve``
        # relies on ``connect`` having been called first.
        self._server = GraphQLServer(controller_api)

    async def serve(self) -> None:
        """Await the server's ``serve`` coroutine with the configured options."""
        await self._server.serve(self.graphql)

    def __repr__(self) -> str:
        """Return ``GraphQLTransport(<host>:<port>)`` using the configured options."""
        return f"GraphQLTransport({self.graphql.host}:{self.graphql.port})"