API#
tickit
#
This is the internal API reference for tickit
- tickit.__version__: str#
Version number as calculated by pypa/setuptools_scm
tickit.core
#
tickit.core.adapter
#
- class tickit.core.adapter.T#
Message type
alias of TypeVar(‘T’)
- class tickit.core.adapter.RaiseInterrupt(*args, **kwargs)[source]#
A raise_interrupt function that should be passed the adapter.
- class tickit.core.adapter.AdapterContainer(adapter: A, io: AdapterIo)[source]#
A container for an object specific adapter and the required functional io.
- async run_forever(raise_interrupt: RaiseInterrupt) None [source]#
An asynchronous method allowing indefinite running of core adapter logic.
An asynchronous method allowing for indefinite running of core adapter logic (typically the hosting of a protocol server and the interpretation of commands which are supplied via it).
- class tickit.core.adapter.Interpreter[source]#
An interface for types which handle messages received by an adapter.
- abstract async handle(message: T) Tuple[AsyncIterable[T], bool] [source]#
An asynchronous method which handles messages received by an adapter.
An asynchronous method which handles messages received by an adapter, replies are sent as an asynchronous iterable to support setting of continuous readback, stand alone replies should be wrapped in an asynchronous iterable of length one.
tickit.core.device
#
- class tickit.core.device.InMap#
A bound typevar for mappings of device inputs
- class tickit.core.device.OutMap#
A bound typevar for mappings of device outputs
- class tickit.core.device.DeviceUpdate(outputs: OutMap, call_at: SimTime | None)[source]#
An immutable data container for Device outputs and callback request time.
- Parameters:
outputs – A mapping of device output keys and current values.
call_at – The simulation time at which the component requests to be awoken.
- class tickit.core.device.Device[source]#
An interface for types which implement simulated devices.
- abstract update(time: SimTime, inputs: InMap) DeviceUpdate[OutMap] [source]#
Implements device behaviour according to the time and its inputs.
A method which implements the (typically physics based) changes which occur within the device in response to either the progression of time or the alteration of inputs to the device. The method returns the new observable state of the device and may optionally include a time in which the method should be called again.
- Parameters:
time – The current simulation time (in nanoseconds).
inputs – A mapping of device inputs and their values.
tickit.core.typedefs
#
- class tickit.core.typedefs.PortID#
An identifier which specifies the input/output port of a component
alias of
str
- class tickit.core.typedefs.State#
A mapping of PortID to component input/output property
- class tickit.core.typedefs.Changes#
A mapping of the difference between State mappings
- class tickit.core.typedefs.ComponentPort(component: ComponentID, port: PortID)[source]#
An immutable dataclass for custom (de)serialization of component - port pairs.
- class tickit.core.typedefs.Input(target: ComponentID, time: SimTime, changes: Changes)[source]#
An immutable data container for Component inputs.
- Parameters:
target – The Component which is to handle the Input.
time – The simulation time at which the Input was produced and is to be handled.
changes – The changes to the component inputs.
- class tickit.core.typedefs.StopComponent[source]#
An immutable dataclass to register Component shutdown.
- class tickit.core.typedefs.Output(source: ComponentID, time: SimTime, changes: Changes, call_at: SimTime | None)[source]#
An immutable data container for Component outputs.
- Parameters:
source – The Component which produced the Output.
time – The simulation time at which the Output was produced and is to be handled.
changes – The changes to the component outputs.
call_in – The duration after which the component requests to be awoken.
- class tickit.core.typedefs.Skip(source: ComponentID, time: SimTime, changes: Changes)[source]#
An immutable data container for skipping Component Updates.
This mimics a Component output but is produced and consumed by the scheduler for situations where a components inputs has not changed, therefore does not need updating but this skipping needs to propgate through the graph.
- Parameters:
source – The Component whos update will be skipped
time – The simulation time at which the component skipping is to be handled.
changes – The changes to the component outputs, which will always be an empty map.
- class tickit.core.typedefs.Interrupt(source: ComponentID)[source]#
An immutable data container for scheduling Component interrupts.
- Parameters:
component – The Component which is requesting an interrupt.
- class tickit.core.typedefs.ComponentException(source: ComponentID, error: Exception, traceback: str)[source]#
An immutable data container for raising Component exceptions.
- Parameters:
component – The Component which raised an exception.
tickit.core.components
#
tickit.core.components.component
#
- class tickit.core.components.component.Component(name: ComponentID)[source]#
An interface for types which implement stand-alone simulation components.
An interface for types which implement stand-alone simulation components. Components define the top level building blocks of a tickit simulation (examples include the DeviceComponent which host a device and corresponding adapter or a SystemComponent which hosts a NestedScheduler and internal Components).
- abstract async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Asynchronous method allowing indefinite running of core logic.
- class tickit.core.components.component.ComponentConfig(name: ComponentID, inputs: Dict[PortID, ComponentPort])[source]#
A data container for component configuration.
A data container for component configuration which acts as a named union of subclasses to facilitate automatic deserialization.
- class tickit.core.components.component.BaseComponent(name: ComponentID)[source]#
A base class for components, implementing state interface related methods.
- async handle_input(message: Input | StopComponent | ComponentException)[source]#
Call on_tick when an input is received.
- Parameters:
message (ComponentInput) – An immutable data container for any message a component receives.
- async output(time: SimTime, changes: Changes, call_at: SimTime | None) None [source]#
Construct and send an Output message to the component output topic.
An asynchronous method which constructs an Output message tagged with the component name and sends it to the output topic of this component.
- async raise_interrupt() None [source]#
Sends an Interrupt message to the component output topic.
An asynchronous method which constructs an Interrupt message tagged with the component name and sends it to the output topic of this component.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Creates and configures a state consumer and state producer.
An asynchronous method which creates a state consumer which is subscribed to the input topic of the component and calls back to handle_input, and a state producer to produce Interrupt, Output or ComponentException messages.
tickit.core.components.device_component
#
- class tickit.core.components.device_component.DeviceComponent(name: ~tickit.core.typedefs.ComponentID, device: ~tickit.core.device.Device, adapters: ~typing.List[~tickit.core.adapter.AdapterContainer] = <factory>, _tasks: ~typing.List[~_asyncio.Task] = <factory>)[source]#
A component containing a device and the corresponding adapters.
A component which thinly wraps a device and the corresponding adapters, this component delegates core behaviour to the update method of the device, whilst allowing adapters to raise interrupts.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Set up state interfaces, run adapters and blocks until any complete.
- async on_tick(time: SimTime, changes: Changes) None [source]#
Delegates core behaviour to the device and calls adapter on_update.
An asynchronous method which updates device inputs according to external changes, delegates core behaviour to the device update method, informs Adapters of the update, computes changes to the state of the component and sends the resulting Output.
tickit.core.components.system_component
#
- class tickit.core.components.system_component.SystemComponent(name: ~tickit.core.typedefs.ComponentID, components: ~typing.List[~tickit.core.components.component.ComponentConfig], expose: ~typing.Dict[~tickit.core.typedefs.PortID, ~tickit.core.typedefs.ComponentPort], adapter: ~tickit.core.adapter.AdapterContainer[~tickit.adapters.system.BaseSystemSimulationAdapter] | None = None, _tasks: ~typing.List[~_asyncio.Task] = <factory>)[source]#
A component containing a nested scheduler and several components.
A component which acts as a nested tickit simulation by wrapping a NestedScheduler and a set of internal components, this component delegates core behaviour to the components within it, whilst outputting their requests for wakeups and interrupts.
- components: List[ComponentConfig]#
A list of immutable component configuration data containers, used to construct internal components.
- expose: Dict[PortID, ComponentPort]#
A mapping of outputs which the system simulation exposes and the corresponding output of an internal component.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Sets up state interfaces, the scheduler, and components to run continuously.
An asynchronous method starts the run_forever method of each component, runs the scheduler, and sets up externally facing state interfaces. The method blocks until and of the components or the scheduler complete.
- class tickit.core.components.system_component.SystemSimulation(name: ComponentID, inputs: Dict[PortID, ComponentPort], components: List[ComponentConfig], expose: Dict[PortID, ComponentPort], type: Literal['tickit.core.components.system_component.SystemSimulation'] = 'tickit.core.components.system_component.SystemSimulation')[source]#
Simulation of a nested set of components.
tickit.core.management
#
tickit.core.management.schedulers
#
tickit.core.management.schedulers.base
#
- class tickit.core.management.schedulers.base.BaseScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer])[source]#
A base scheduler class which implements logic common to all schedulers.
A constructor which stores inputs and creates an empty wakeups mapping.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
- abstract async schedule_interrupt(source: ComponentID) None [source]#
An abstract asynchronous method which should schedule an interrupt.
When implemented in a child this method should schedule an interrupt immediately.
- Parameters:
source (ComponentID) – The component which should be updated.
- async update_component(input: Input) None [source]#
Sends a message with an input to the input topic of a component.
- Parameters:
input (Input) – The input to be included in the message sent to the component.
- async skip_component(skip: Skip) None [source]#
Sends a message to itself that a given component update has been skipped.
- Parameters:
skip (Skip) – The Skip information to be included in the message sent to the scheduler.
- async handle_message(message: Interrupt | Output | ComponentException | Skip) None [source]#
Handle messages received by the state consumer.
An asynchronous callback which handles Interrupt, Output, ComponentException and Skip messages received by the state consumer. For Outputs, changes are propagated and wakeups scheduled if required. Skips are also propagated. For interrupts handling is deferred. For exceptions, a StopComponent message is produced to each component in the system to facilitate shut down.
- Parameters:
message (Union[ComponentOutput, Skip]) – An Interrupt, Output or ComponentException received by the state consumer.
- async setup() None [source]#
Instantiates and configures the ticker and state interfaces.
An asynchronous setup method which creates a ticker, a state consumer which is subscribed to the output topics of each component in the system, a state producer to produce component inputs.
- add_wakeup(component: ComponentID, when: SimTime) None [source]#
Adds a wakeup to the mapping.
- Parameters:
component (ComponentID) – The component which should be updated.
when (SimTime) – The simulation time at which the update should occur.
- get_first_wakeups() Tuple[Set[ComponentID], SimTime | None] [source]#
Gets the components which are due for update first and the wakeup time.
A method which returns a set of components which are due for update first and the simulation time at which the updates are scheduled. Or an empty set and None if no wakeups are scheduled.
- Returns:
- A tuple containing the set of
components which are scheduled for the first wakeup and the time at which the wakeup should occur. Or an empty set and None if no wakeups are scheduled.
- Return type:
Tuple[Set[ComponentID], Optional[SimTime]]
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component produces an exception, the scheduler will produce a message to all components in the simulation to cause them to cancel any running component tasks. After which the scheduler shuts itself down.
tickit.core.management.schedulers.master
#
- class tickit.core.management.schedulers.master.MasterScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer], initial_time: int = 0, simulation_speed: float = 1.0)[source]#
A master scheduler which orchestrates the running of a tickit simulation.
A master scheduler constructor which stores values for reference.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
initial_time (int) – The initial time of the simulation (in nanoseconds). Defaults to 0.
simulation_speed (float) – The rate at which simulation time progresses with respect to real world time. Defaults to 1.0.
- async setup() None [source]#
Performs base setup and creates an awaitable flag to indicate new wakeups.
- add_wakeup(component: ComponentID, when: SimTime) None [source]#
Adds a wakeup to the priority queue and sets an awaitable flag.
A method which adds a wakeup to the priority queue and sets a awaitable flag indicating that a new wakeup has been added.
- Parameters:
component (ComponentID) – The component which should be updated.
when (SimTime) – The simulation time at which the update should occur.
- async run_forever() None [source]#
Perform initial tick then continuously schedule ticks according to wakeups.
An asynchronous method which initially performs setup, sets the event to signify the simulation is running, and performs an initial tick in which all components are updated. Subsequent ticks are performed as requested by components of the simulation according to the simulation speed.
- async schedule_interrupt(source: ComponentID) None [source]#
Schedules the interrupt of a component immediately.
An asynchronous method which implements the superclass abstract, an interrupt is scheduled as an immediate wakeup. This is achieved by giving the wakeup a simulation time equal to the simulation time of the last tick plus the real world time which has passed, adjusted by the simulation speed.
- Parameters:
source (ComponentID) – The source component which should be updated.
- sleep_time(when: SimTime) float [source]#
Computes the real world time until a specified simulation time.
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component produces an exception, the scheduler will produce a message to all components in the simulation to cause them to cancel any running component tasks. After which the scheduler shuts itself down.
tickit.core.management.schedulers.nested
#
- class tickit.core.management.schedulers.nested.NestedScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer], expose: Dict[PortID, ComponentPort], raise_interrupt: Callable[[], Awaitable[None]])[source]#
A scheduler which orchestrates nested tickit simulations.
NestedScheduler constructor which adds wiring and saves values for reference.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
expose (Dict[PortID, ComponentPort]) – A mapping of nested scheduler outputs to internal component ports.
raise_interrupt (Callable[[], Awaitable[None]]) – A callback to request that the nested scheduler is updated immediately.
- static add_exposing_wiring(wiring: Wiring | InverseWiring, expose: Dict[PortID, ComponentPort]) InverseWiring [source]#
Adds wiring to expose nested scheduler outputs.
Adds wiring to expose nested scheduler outputs, this is performed creating a mock “expose” component with inverse wiring set by expose.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
expose (Dict[PortID, ComponentPort]) – A mapping of nested scheduler outputs to internal component ports.
- Returns:
An inverse wiring object representing the connections between components in the system and the “expose” component which acts as the nested scheduler output.
- Return type:
- async update_component(input: Input) None [source]#
Sends an input to the target component. Mocks I/O for “external” or “expose”.
For real components the input is sent in a message to their input topic, for the mock component named “external”, external inputs are injected, whilst for the mock component and named “expose” the input is stored for use as the scheduler output.
- Parameters:
input (Input) – The input message to be sent to the component.
- async on_tick(time: SimTime, changes: Changes) Tuple[Changes, SimTime | None] [source]#
Routes inputs, does a tick and returns output changes and a callback time.
An asynchronous method which determines which components within the simulation require being woken up, sets the input changes for use by the “external” mock component, performs a tick, determines the period in which the nested scheduler should next be updated, and returns the changes collated by the “expose” mock component.
- Parameters:
- Returns:
A tuple of a mapping of the changed exposed outputs and their new values and optionally a duration in simulation time after which the nested scheduler should be called again.
- Return type:
- async run_forever() None [source]#
Delegates to setup which instantiates the ticker and state interfaces.
- async schedule_interrupt(source: ComponentID) None [source]#
Schedules the interrupt of a component immediately.
An asynchronous method which schedules an interrupt immediately by adding it to a set of queued interrupts and raising the interrupt to the master scheduler.
- Parameters:
source (ComponentID) – The source component of the interrupt.
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component inside a system simulation produces an exception, the nested scheduler will produce a message to all components it contains to cause them to cancel any running component tasks (adapter tasks). Afterwards the scheduler stores the ComponentException message, allowing its associated system simulation component to propagate the exception to the master scheduler.
tickit.core.management.event_router
#
- tickit.core.management.event_router.Default_Wiring_Struct#
A mapping of component output ports to component input ports with defaults
alias of
DefaultDict
[ComponentID
,DefaultDict
[PortID
,Set
[ComponentPort
]]]
- tickit.core.management.event_router.Wiring_Struct#
A mapping of component output ports to component input ports
alias of
Dict
[ComponentID
,Dict
[PortID
,Set
[ComponentPort
]]]
- tickit.core.management.event_router.Default_InverseWiring_Struct#
A mapping of component input ports to component output ports with defaults
alias of
DefaultDict
[ComponentID
,DefaultDict
[PortID
,ComponentPort
]]
- tickit.core.management.event_router.Inverse_Wiring_Struct#
A mapping of component input ports to component output ports
alias of
Dict
[ComponentID
,Dict
[PortID
,ComponentPort
]]
- class tickit.core.management.event_router.Wiring(wiring: Dict[ComponentID, Dict[PortID, Set[ComponentPort]]] | None = None)[source]#
A mapping of component output ports to component input ports with defaults.
A mapping of component output ports to component input ports, used to represent the connections between components in a system. Defaults are generated in case of missing mapping keys; for an unknown component, an empty output port mapping is created, whilst for an unknown output port an empty input port set is created.
A constructor which adds defaults to a wiring struct if provided.
A constructor which adds defaults to a wiring struct if provided, otherwise an empty default dict is created.
- Parameters:
wiring (Optional[Wiring_Struct]) – An optional wiring struct. Defaults to None.
- classmethod from_inverse_wiring(inverse_wiring: InverseWiring) Wiring [source]#
Un-inverts an inverse wiring object to produce a Wiring instance.
- Parameters:
inverse_wiring (InverseWiring) – A mapping of component input ports to component output ports.
- Returns:
A mapping of component output ports to component input ports.
- Return type:
- class tickit.core.management.event_router.InverseWiring(wiring: Dict[ComponentID, Dict[PortID, ComponentPort]] | None = None)[source]#
A mapping of component input ports to component output ports.
A mapping of component input ports to component output ports, used to represent the connections between components in a system. When either an unknown input component or an unknown input component port is requested a KeyError is raised.
A constructor which adds defaults to an inverse wiring struct if provided.
A constructor which adds defaults to an inverse wiring struct if provided, otherwise an empty default dict is created.
- Parameters:
wiring (Optional[Inverse_Wiring_Struct]) – An optional inverse wiring struct. Defaults to None.
- classmethod from_wiring(wiring: Wiring) InverseWiring [source]#
Inverts a Wiring object to create an InverseWiring instance.
- Parameters:
wiring (Wiring) – A mapping of component output ports to component input ports.
- Returns:
A mapping of component input ports to component output ports.
- Return type:
- classmethod from_component_configs(configs: Iterable[ComponentConfig]) InverseWiring [source]#
Creates an InverseWiring instance from an iterable of ComponentConfigs.
- Parameters:
configs (Iterable[ComponentConfig]) – An iterable of component config data containers.
- Returns:
A mapping of component input ports to component output ports.
- Return type:
- class tickit.core.management.event_router.EventRouter(wiring: Wiring)[source]#
- class tickit.core.management.event_router.EventRouter(wiring: InverseWiring)
A utility class responsible for routing changes between components.
Construct an EventRouter.
An EventRouter stores possible inverse wiring for use in utilities.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
- property wiring: Wiring#
The cached wiring used by the event router.
- Returns:
The wiring used by the event router.
- Return type:
- property components: Set[ComponentID]#
A cached set of all components in the wiring.
- Returns:
A set of all components in the wiring.
- Return type:
Set[ComponentID]
- property output_components: Set[ComponentID]#
A cached set of components which provide outputs.
- Returns:
A set of components which provide outputs.
- Return type:
Set[ComponentID]
- property input_components: Set[ComponentID]#
A cached set of components which receive inputs.
- Returns:
A set of components which receive inputs.
- Return type:
Set[ComponentID]
- property isolated_components: Set[ComponentID]#
A cached set of components without inputs or outputs.
- Returns:
A set of components which are isolated
- Return type:
Set[ComponentID]
- property component_tree: Dict[ComponentID, Set[ComponentID]]#
A cached mapping of first order component dependants.
A cached property which returns a mapping of components to the set of components which are wired to any of its outputs.
- Returns:
A mapping of components to the set of components which are wired to any of its outputs.
- Return type:
Dict[ComponentID, Set[ComponentID]]
- property inverse_component_tree: Dict[ComponentID, Set[ComponentID]]#
A cached mapping of first order component dependencies.
A cached property which returns a mapping of components to the set of components which are wired to any of its inputs.
- Returns:
A mapping of components to the set of components which are wired to any of its inputs.
- Return type:
Dict[ComponentID, Set[ComponentID]]
- dependants(root: ComponentID) Set[ComponentID] [source]#
Finds the set of root component dependents .
Finds the set of all components which are recursively dependant on the root component.
- Parameters:
root (ComponentID) – The root component.
- Returns:
A set of all components which are recursively dependant on the root component.
- Return type:
Set[ComponentID]
- route(source: ComponentID, changes: Mapping[PortID, object]) DefaultDict[ComponentID, Dict[PortID, object]] [source]#
Generates a mapping of input changes resulting from output changes.
A method which generates a mapping of input changes which result from the propagation of output changes according to the wiring.
- Parameters:
source (ComponentID) – The source component of the output changes.
changes (Mapping[PortID, Hashable]) – A mapping of changes to the outputs of the source component.
- Returns:
A mapping of the input changes which result from the propagation of output changes according to the wiring.
- Return type:
DefaultDict[ComponentID, Dict[PortID, Hashable]]
tickit.core.management.ticker
#
- class tickit.core.management.ticker.Ticker(wiring: Wiring | InverseWiring, update_component: Callable[[Input], Coroutine[Any, Any, None]], skip_component: Callable[[Skip], Coroutine[Any, Any, None]])[source]#
Utility class responsible for sequencing the update of components during a tick.
A utility class responsible for sequencing the update of components during a tick by eagerly updating each component which has had all of its dependencies resolved.
Ticker constructor which creates an event router and performs initial setup.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
update_component (Callable[[Input], Awaitable[None]]) – A function or method which may be called to request a component performs and update, such updates should result in a subsequent call to the propagate method of the ticker.
See also
- async schedule_possible_updates() None [source]#
Update components with resolved dependencies.
An asynchronous method which schedules updates for components with resolved dependencies, as determined by the intersection of the components first order dependencies and the set of components which still require an update.
- async propagate(output: Output | Skip) None [source]#
Propagates the output of an updated component.
An asynchronous message which propagates the output of an updated component by removing the component from the set of components requiring update, adding the routed inputs to the accumulator, and scheduling any possible updates. If no components require update the finished flag will be set.
- Parameters:
output (Output) – The output produced by the update of a component.
- property components: Set[ComponentID]#
The set of all components in the wiring.
- Returns:
The set of all components in the wiring.
- Return type:
Set[ComponentID]
tickit.core.state_interfaces
#
tickit.core.state_interfaces.internal
#
- class tickit.core.state_interfaces.internal.C#
A consumable value
alias of TypeVar(‘C’)
- class tickit.core.state_interfaces.internal.P#
A producible value
alias of TypeVar(‘P’)
- class tickit.core.state_interfaces.internal.Message(value: Any)[source]#
An immutable data container for internal messages.
Create new instance of Message(value,)
- class tickit.core.state_interfaces.internal.InternalStateServer(*args: Any, **kwargs: Any)[source]#
A singleton, in memory, publish/subscribe message store and router.
A singleton, in memory, publish/subscribe message store and router. The single instance of this class holds a mapping of topics and a list of messages which have been pushed to them and a mapping of consumers which subscribe to the topics. Upon subscribing to a topic, a consumer is sent all of the messages which were previously registered against the topic. Upon pushing to a topic, the message is added to the store and immediately forwarded to each of the consumers which subscribe to the topic.
- async push(topic: str, message: Message) None [source]#
Asynchronous method which propagates a message to subscribers and stores it.
An asynchronous method which propagates the given message to consumers which subscribe to the topic and stores the message in the topic -> messages mapping.
- async subscribe(consumer: InternalStateConsumer, topics: Iterable[str])[source]#
Subscribes the consumer to the given topics.
An asynchronous method which adds a consumer to the subscriber list of each topic in the topics iterable. On subscription, previous messages on the topic are immediately passed to the consumer.
- Parameters:
consumer (InternalStateConsumer) – The consumer which is subscribing to the topic.
topics (Iterable[str]) – The topic which the consumer is to be subscribed to.
- create_topic(topic: str) None [source]#
Creates a new topic as an empty message list.
- Parameters:
topic (str) – The topic to be created.
- class tickit.core.state_interfaces.internal.InternalStateConsumer(callback: Callable[[C], Awaitable[None]])[source]#
An internal, singleton based, implementation of the StateConsumer protocol.
A internal, singleton based, implementation of the StateConsumer protocol, this consumer can subscribe to InternalStateServer topics, upon receiving a message the consumer passes the value to the callback function passed during initialization, if a topic is subscribed to which does not yet exist it is created.
Gets an instance of the InternalStateServer for use in subscribe.
- Parameters:
callback (Callable[[C], Awaitable[None]]) – An asynchronous handler function for consumed values.
- class tickit.core.state_interfaces.internal.InternalStateProducer[source]#
An internal, singleton based, implementation of the StateProducer protocol.
An internal, singleton based, implementation of the StateProducer protocol, this producer can produce a value to a InternalStateServer topic, if the topic does not yet exist it is created.
Gets an instance of the InternalStateServer for use in produce.
tickit.core.state_interfaces.kafka
#
- class tickit.core.state_interfaces.kafka.C#
A consumable value
alias of TypeVar(‘C’)
- class tickit.core.state_interfaces.kafka.P#
A producible value
alias of TypeVar(‘P’)
- class tickit.core.state_interfaces.kafka.KafkaStateConsumer(callback: Callable[[C], Awaitable[None]])[source]#
A kafka implementation of the StateConsumer protocol.
A kafka implementation of the StateConsumer protocol, this consumer can subscribe to kafka topics, upon receiving a message the consumer passes the value to the callback function passed during initialization, if a topic is subscribed to which does not yet exist it is created.
Creates an AIOKafka Consumer and begins consuming subscribed topics.
- Parameters:
callback (Callable[[C], Awaitable[None]]) – An asynchronous handler function for consumed values.
- class tickit.core.state_interfaces.kafka.KafkaStateProducer[source]#
A kafka implementation of the StateProducer protocol.
A kafka implementation of the StateProducer protocol, this producer can produce a value to a kafka topic, if the topic does not yet exist it is created.
Creates and starts and AIOKafka Producer.
tickit.core.state_interfaces.state_interface
#
- class tickit.core.state_interfaces.state_interface.C#
A consumable value
alias of TypeVar(‘C’, covariant=True)
- class tickit.core.state_interfaces.state_interface.P#
A producible value
alias of TypeVar(‘P’, contravariant=True)
- class tickit.core.state_interfaces.state_interface.StateConsumer(*args, **kwargs)[source]#
An interface for types which implement publish/subscribe message consumers.
An interface for types which implement publish/subscribe message consumers, the consumer must be able to subscribe to topics within the messaging framework, upon receiving a message the consumer should pass the value to the callback function, if a topic is subscribed to which does not yet exist it should be created.
- class tickit.core.state_interfaces.state_interface.StateProducer(*args, **kwargs)[source]#
An interface for types which implement publish/subscribe message producers.
An interface for types which implement publish/subscribe message producers, the producer must be able to produce a value to a topic within the messaging framework, if the topic does not yet exist it should be created.
- class tickit.core.state_interfaces.state_interface.StateInterface#
The union of StateConsumer and StateProducer
alias of TypeVar(‘StateInterface’, bound=
Union
[StateConsumer
,StateProducer
])
- tickit.core.state_interfaces.state_interface.add(name: str, external: bool) Callable[[Type[StateInterface]], Type[StateInterface]] [source]#
A decorator to add a StateInterface to the registry.
A decorator to add a StateInterface to the registry of StateConsumers or StateProducer according to its signature. StateConsumers and StateProducers which are intended to work together should be added with the same name.
- tickit.core.state_interfaces.state_interface.interfaces(external: bool = False) Set[str] [source]#
Returns interface names for which both a StateConsumer and StateProducer exist.
Gets a set of interface names for which both a StateConsumer and StateProducer exist. The external option may be used to restrict interfaces to those which may be used in simulations which are distributed across multiple processes.
- tickit.core.state_interfaces.state_interface.satisfy_externality(external: bool, interfaces: Dict[str, Tuple[Type[StateInterface], bool]]) Set[str] [source]#
Finds which interfaces satisfy the externality requirement provided.
- Parameters:
external (bool) – If true, only interfaces which can be used in simulations which are distributed across processes are returned. If false, all interfaces are returned.
interfaces (Dict[str, Tuple[Type[StateInterface], bool]]) – A mapping of interface names to a tuple of their class and their externality flag.
- Returns:
A set of interface names which satisfy the externality requirement.
- Return type:
Set[str]
- tickit.core.state_interfaces.state_interface.get_interface(name: str) Tuple[Type[StateConsumer], Type[StateProducer]] [source]#
Get the StateConsumer and StateProducer classes for a given interface name.
- Parameters:
name (str) – The name of the interface to be retrieved.
- Returns:
A tuple of the StateConsumer and StateProducer classes.
- Return type:
Tuple[Type[StateConsumer], Type[StateProducer]]
tickit.devices
#
tickit.devices.iobox
#
- class tickit.devices.iobox.IoBoxDevice[source]#
A simple device which can take and store key-value pairs from both network adapters and the ticket graph.
Adapters should write values to the device via device.write(addr, value) or device[addr] = value. The writes will be pending until the scheduler interrupts the device. For example:
`python device[foo] >> 5 device[foo] = 6 device[foo] >> 5 interrupt() device[foo] >> 6 `
Linked devices can send values to be written via inputs and receive changes via outputs. For example:
`python update = box.update(SimTime(0), {"updates": [(4, "foo")]}) assert update.outputs["updates"] == [(4, "foo")] >> 6 `
The two modes of I/O may used independently, optionally and interoperably.
This device is useful for simulating a basic block of memory. The envisioned use of this class is where you wish to simulate a network interface but the internal hardware logic is either not needed or very simple. A custom adapter can be made for the network interface and can simply read and write to an IoBox.
- write(addr: A, value: V) None [source]#
Write a value to an address.
The value will only make it into memory when update() is called e.g. by an interrupt.
- Parameters:
addr (A) – Address to store value
value (V) – Value to store
- read(addr: A) V [source]#
Read a value from an address.
- Parameters:
addr (A) – Address to find value
- Returns:
Value at address
- Return type:
V
- Raises:
ValueError – If no value stored at address
- update(time: SimTime, inputs: Inputs) DeviceUpdate[Outputs] [source]#
Write all pending values to their addresses.
- Parameters:
time (SimTime) – (Simulated) time at which this is called
inputs (Inputs) – Inputs to this device, may contain addresses and values to update
- Returns:
- Outputs and update, may contain addresses
and values that have been updated, either via inputs or an adapter
- Return type:
DeviceUpdate[Outputs]
- class tickit.devices.iobox.IoBox(name: ComponentID, inputs: Dict[PortID, ComponentPort], type: Literal['tickit.devices.iobox.IoBox'] = 'tickit.devices.iobox.IoBox')[source]#
Arbitrary box of key-value pairs.
tickit.devices.sink
#
- class tickit.devices.sink.SinkDevice[source]#
A simple device which can take any input and produces no output.
- update(time: SimTime, inputs: Inputs) DeviceUpdate[Outputs] [source]#
The update method which logs the inputs and produces no outputs.
- Parameters:
- Returns:
The produced update event which never contains any changes, and never requests a callback.
- Return type:
DeviceUpdate[Outputs]
- class tickit.devices.sink.Sink(name: ComponentID, inputs: Dict[PortID, ComponentPort], type: Literal['tickit.devices.sink.Sink'] = 'tickit.devices.sink.Sink')[source]#
Arbitrary value sink that logs the value.
tickit.devices.source
#
- class tickit.devices.source.SourceDevice(value: Any)[source]#
A simple device which produces a pre-configured value.
A constructor of the source, which takes the pre-configured output value.
- Parameters:
value (Any) – A pre-configured output value.
- update(time: SimTime, inputs: Inputs) DeviceUpdate[Outputs] [source]#
The update method which produces the pre-configured output value.
- Parameters:
- Returns:
The produced update event which contains the pre-configured value, and never requests a callback.
- Return type:
DeviceUpdate[Outputs]
- class tickit.devices.source.Source(name: ComponentID, inputs: Dict[PortID, ComponentPort], value: Any, type: Literal['tickit.devices.source.Source'] = 'tickit.devices.source.Source')[source]#
Source of a fixed value.
tickit.utils
#
tickit.utils.byte_format
#
tickit.utils.configuration
#
tickit.utils.configuration.tagged_union
#
tickit.utils.configuration.loading
#
- tickit.utils.configuration.loading.read_configs(config_path) List[ComponentConfig] [source]#
A utility function which reads and deserializes configs.
A utility function which reads config files, performs yaml deserialization, loads the required internal and external modules then subsequently performs apischema deserialization to produce a list of component configuration objects.
tickit.utils.singleton
#
tickit.utils.topic_naming
#
- tickit.utils.topic_naming.valid_component_id(component: ComponentID) None [source]#
Checks the validity of a ComponentID for generating a topic name.
- Parameters:
component (ComponentID) – The ComponentID to check.
- Raises:
ValueError – The component name is empty.
- tickit.utils.topic_naming.output_topic(component: ComponentID) str [source]#
Creates the output topic name for a given component.
- Parameters:
component (ComponentID) – The component for which an output topic name is required.
- Returns:
The output topic name of the component.
- Return type:
- tickit.utils.topic_naming.input_topic(component: ComponentID) str [source]#
Creates returns the input topic name for a given component.
- Parameters:
component (ComponentID) – The component for which an input topic name is required.
- Returns:
The input topic name of the component.
- Return type: