Source code for tickit.core.components.system_simulation
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Type
import pydantic.v1.dataclasses
from tickit.core.components.component import BaseComponent, Component, ComponentConfig
from tickit.core.management.event_router import InverseWiring
from tickit.core.management.schedulers.slave import SlaveScheduler
from tickit.core.runner import run_all
from tickit.core.state_interfaces.state_interface import StateConsumer, StateProducer
from tickit.core.typedefs import Changes, ComponentID, ComponentPort, PortID, SimTime
from tickit.utils.topic_naming import output_topic
LOGGER = logging.getLogger(__name__)
[docs]@dataclass
class SystemSimulationComponent(BaseComponent):
"""A component containing a slave scheduler and several components.
A component which acts as a nested tickit simulation by wrapping a slave scheduler
and a set of internal components, this component delegates core behaviour to the
components within it, whilst outputting their requests for wakeups and interrupts.
"""
name: ComponentID
#: A list of immutable component configuration data containers, used to
#: construct internal components.
components: List[ComponentConfig]
#: A mapping of outputs which the system simulation exposes and the
#: corresponding output of an internal component.
expose: Dict[PortID, ComponentPort]
_tasks: List[asyncio.Task] = field(default_factory=list)
[docs] async def run_forever(
self, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]
) -> None:
"""Sets up state interfaces, the scheduler, and components to run continuously.
An asynchronous method starts the run_forever method of each component, runs
the scheduler, and sets up externally facing state interfaces. The method
blocks until and of the components or the scheduler complete.
"""
inverse_wiring = InverseWiring.from_component_configs(self.components)
self.scheduler = SlaveScheduler(
inverse_wiring,
state_consumer,
state_producer,
self.expose,
self.raise_interrupt,
)
self._tasks = run_all(
component().run_forever(state_consumer, state_producer)
for component in self.components
) + run_all([self.scheduler.run_forever()])
await super().run_forever(state_consumer, state_producer)
if self._tasks:
await asyncio.wait(self._tasks)
[docs] async def on_tick(self, time: SimTime, changes: Changes) -> None:
"""Delegates core behaviour to the slave scheduler.
An asynchronous method which delegates core behaviour of computing changes and
determining a callback period to the slave scheduler and sends the resulting
Output.
Args:
time (SimTime): The current simulation time (in nanoseconds).
changes (Changes): A mapping of changed component inputs and their new
values.
"""
on_tick = asyncio.create_task(self.scheduler.on_tick(time, changes))
error_state = asyncio.create_task(self.scheduler.error.wait())
done, _ = await asyncio.wait(
[on_tick, error_state],
return_when=asyncio.tasks.FIRST_COMPLETED,
)
if error_state in done:
await self.state_producer.produce(
output_topic(self.name),
self.scheduler.component_error,
)
else:
output_changes, call_in = on_tick.result()
await self.output(time, output_changes, call_in)
[docs] async def stop_component(self) -> None:
"""Cancel all pending tasks associated with the System Simulation component.
Cancels long running adapter tasks associated with the component.
"""
LOGGER.debug(f"Stopping {self.name}")
for task in self._tasks:
task.cancel()
[docs]@pydantic.v1.dataclasses.dataclass
class SystemSimulation(ComponentConfig):
"""Simulation of a nested set of components."""
name: ComponentID
components: List[ComponentConfig]
expose: Dict[PortID, ComponentPort]
def __call__(self) -> Component: # noqa: D102
return SystemSimulationComponent(
name=self.name,
components=self.components,
expose=self.expose,
)