API#
tickit
#
This is the internal API reference for tickit
- tickit.__version__: str#
Version number as calculated by pypa/setuptools_scm
tickit.core
#
tickit.core.adapter
#
- class tickit.core.adapter.T#
Message type
alias of TypeVar(‘T’)
- class tickit.core.adapter.RaiseInterrupt(*args, **kwargs)[source]#
A raise_interrupt function that should be passed to
Adapter
.
- class tickit.core.adapter.Adapter[source]#
An interface for types which implement device adapters.
- async run_forever(device: Device, raise_interrupt: RaiseInterrupt) None [source]#
An asynchronous method allowing indefinite running of core adapter logic.
An asynchronous method allowing for indefinite running of core adapter logic (typically the hosting of a protocol server and the interpretation of commands which are supplied via it).
- class tickit.core.adapter.Interpreter[source]#
An interface for types which handle messages received by an adapter.
- abstract async handle(adapter: Adapter, message: T) Tuple[AsyncIterable[T], bool] [source]#
An asynchronous method which handles messages received by an adapter.
An asynchronous method which handles messages received by an adapter, replies are sent as an asynchronous iterable to support setting of continuous readback, stand alone replies should be wrapped in an asynchronous iterable of length one.
- class tickit.core.adapter.Server[source]#
An interface for types which implement an external messaging protocol.
tickit.core.device
#
- class tickit.core.device.InMap#
A bound typevar for mappings of device inputs
- class tickit.core.device.OutMap#
A bound typevar for mappings of device outputs
- class tickit.core.device.DeviceUpdate(outputs: OutMap, call_at: SimTime | None)[source]#
An immutable data container for Device outputs and callback request time.
- Parameters:
outputs – A mapping of device output keys and current values.
call_at – The simulation time at which the component requests to be awoken.
- class tickit.core.device.Device[source]#
An interface for types which implement simulated devices.
- abstract update(time: SimTime, inputs: InMap) DeviceUpdate[OutMap] [source]#
Implements device behaviour according to the time and its inputs.
A method which implements the (typically physics based) changes which occur within the device in response to either the progression of time or the alteration of inputs to the device. The method returns the new observable state of the device and may optionally include a time in which the method should be called again.
- Parameters:
time – The current simulation time (in nanoseconds).
inputs – A mapping of device inputs and their values.
tickit.core.runner
#
- tickit.core.runner.run_all(awaitables: Iterable[Awaitable[None]]) List[Task] [source]#
Asynchronously runs the run_forever method of each lifetime runnable.
Creates and runs an asyncio task for the run_forever method of each lifetime runnable. Calls to the run_forever method are wrapped with an error handler.
- Parameters:
awaitables – An iterable of awaitable objects.
- Returns:
A list of asyncio tasks for the awaitables.
- Return type:
List[asyncio.Task]
- async tickit.core.runner.run_all_forever(awaitables: Iterable[Awaitable[None]]) None [source]#
Asynchronously runs the run_forever method of each lifetime runnable.
Creates and runs an asyncio task for the run_forever method of each lifetime runnable. Calls to the run_forever method are wrapped with an error handler. This function blocks until all run_forever methods have completed.
- Parameters:
awaitables – An iterable of awaitable objects.
tickit.core.typedefs
#
- class tickit.core.typedefs.PortID#
An identifier which specifies the input/output port of a component
alias of
str
- class tickit.core.typedefs.State#
A mapping of PortID to component input/output property
- class tickit.core.typedefs.Changes#
A mapping of the difference between State mappings
alias of
Map
- class tickit.core.typedefs.ComponentPort(component: ComponentID, port: PortID)[source]#
An immutable dataclass for custom (de)serialization of component - port pairs.
- class tickit.core.typedefs.Input(target: ComponentID, time: SimTime, changes: Changes)[source]#
An immutable data container for Component inputs.
- Parameters:
target – The Component which is to handle the Input.
time – The simulation time at which the Input was produced and is to be handled.
changes – The changes to the component inputs.
- class tickit.core.typedefs.StopComponent[source]#
An immutable dataclass to register Component shutdown.
- class tickit.core.typedefs.Output(source: ComponentID, time: SimTime, changes: Changes, call_at: SimTime | None)[source]#
An immutable data container for Component outputs.
- Parameters:
source – The Component which produced the Output.
time – The simulation time at which the Output was produced and is to be handled.
changes – The changes to the component outputs.
call_in – The duration after which the component requests to be awoken.
- class tickit.core.typedefs.Interrupt(source: ComponentID)[source]#
An immutable data container for scheduling Component interrupts.
- Parameters:
component – The Component which is requesting an interrupt.
- class tickit.core.typedefs.ComponentException(source: ComponentID, error: Exception, traceback: str)[source]#
An immutable data container for raising Component exceptions.
- Parameters:
component – The Component which raised an exception.
tickit.core.components
#
tickit.core.components.component
#
- class tickit.core.components.component.Component(name: ComponentID)[source]#
An interface for types which implement stand-alone simulation components.
An interface for types which implement stand-alone simulation components. Components define the top level building blocks of a tickit simulation (examples include the DeviceSimulation which host a device and corresponding adapter or a SystemSimulation which hosts a SlaveScheduler and internal Components).
- abstract async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Asynchronous method allowing indefinite running of core logic.
- class tickit.core.components.component.ComponentConfig(name: ComponentID, inputs: Dict[PortID, ComponentPort])[source]#
A data container for component configuration.
A data container for component configuration which acts as a named union of subclasses to facilitate automatic deserialization.
- class tickit.core.components.component.BaseComponent(name: ComponentID)[source]#
A base class for components, implementing state interface related methods.
- async handle_input(message: Input | StopComponent | ComponentException)[source]#
Call on_tick when an input is received.
- Parameters:
message (ComponentInput) – An immutable data container for any message a component receives.
- async output(time: SimTime, changes: Changes, call_at: SimTime | None) None [source]#
Construct and send an Output message to the component output topic.
An asynchronous method which constructs an Output message tagged with the component name and sends it to the output topic of this component.
- async raise_interrupt() None [source]#
Sends an Interrupt message to the component output topic.
An asynchronous method which constructs an Interrupt message tagged with the component name and sends it to the output topic of this component.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Creates and configures a state consumer and state producer.
An asynchronous method which creates a state consumer which is subscribed to the input topic of the component and calls back to handle_input, and a state producer to produce Interrupt, Output or ComponentException messages.
tickit.core.components.device_simulation
#
- class tickit.core.components.device_simulation.DeviceSimulation(name: ~tickit.core.typedefs.ComponentID, device: ~tickit.core.device.Device, adapters: ~typing.List[~tickit.core.adapter.Adapter] = <factory>, _tasks: ~typing.List[~_asyncio.Task] = <factory>)[source]#
A component containing a device and the corresponding adapters.
A component which thinly wraps a device and the corresponding adapters, this component delegates core behaviour to the update method of the device, whilst allowing adapters to raise interrupts.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Set up state interfaces, run adapters and blocks until any complete.
- async on_tick(time: SimTime, changes: Changes) None [source]#
Delegates core behaviour to the device and calls adapter on_update.
An asynchronous method which updates device inputs according to external changes, delegates core behaviour to the device update method, informs Adapters of the update, computes changes to the state of the component and sends the resulting Output.
tickit.core.components.system_simulation
#
- class tickit.core.components.system_simulation.SystemSimulationComponent(name: ~tickit.core.typedefs.ComponentID, components: ~typing.List[~tickit.core.components.component.ComponentConfig], expose: ~typing.Dict[~tickit.core.typedefs.PortID, ~tickit.core.typedefs.ComponentPort], _tasks: ~typing.List[~_asyncio.Task] = <factory>)[source]#
A component containing a slave scheduler and several components.
A component which acts as a nested tickit simulation by wrapping a slave scheduler and a set of internal components, this component delegates core behaviour to the components within it, whilst outputting their requests for wakeups and interrupts.
- components: List[ComponentConfig]#
A list of immutable component configuration data containers, used to construct internal components.
- expose: Dict[PortID, ComponentPort]#
A mapping of outputs which the system simulation exposes and the corresponding output of an internal component.
- async run_forever(state_consumer: Type[StateConsumer], state_producer: Type[StateProducer]) None [source]#
Sets up state interfaces, the scheduler, and components to run continuously.
An asynchronous method starts the run_forever method of each component, runs the scheduler, and sets up externally facing state interfaces. The method blocks until and of the components or the scheduler complete.
- class tickit.core.components.system_simulation.SystemSimulation(name: ComponentID, inputs: Dict[PortID, ComponentPort], components: List[ComponentConfig], expose: Dict[PortID, ComponentPort], type: Literal['tickit.core.components.system_simulation.SystemSimulation'] = 'tickit.core.components.system_simulation.SystemSimulation')[source]#
Simulation of a nested set of components.
tickit.core.management
#
tickit.core.management.schedulers
#
tickit.core.management.schedulers.base
#
- class tickit.core.management.schedulers.base.BaseScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer])[source]#
A base scheduler class which implements logic common to all schedulers.
A constructor which stores inputs and creates an empty wakeups mapping.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
- abstract async schedule_interrupt(source: ComponentID) None [source]#
An abstract asynchronous method which should schedule an interrupt.
When implemented in a child this method should schedule an interrupt immediately.
- Parameters:
source (ComponentID) – The component which should be updated.
- async update_component(input: Input) None [source]#
Sends a message with an input to the input topic of a component.
- Parameters:
input (Input) – The input to be included in the message sent to the component.
- async handle_message(message: Interrupt | Output | ComponentException) None [source]#
Handle messages received by the state consumer.
An asynchronous callback which handles Interrupt, Output and ComponentException messages received by the state consumer. For Outputs, changes are propagated and wakeups scheduled if required. For interrupts handling is deferred. For exceptions, a StopComponent message is produced to each component in the system to facilitate shut down.
- Parameters:
message (Union[Interrupt, Output, ComponentException]) – An Interrupt, Output or ComponentException received by the state consumer.
- async setup() None [source]#
Instantiates and configures the ticker and state interfaces.
An asynchronous setup method which creates a ticker, a state consumer which is subscribed to the output topics of each component in the system, a state producer to produce component inputs.
- add_wakeup(component: ComponentID, when: SimTime) None [source]#
Adds a wakeup to the mapping.
- Parameters:
component (ComponentID) – The component which should be updated.
when (SimTime) – The simulation time at which the update should occur.
- get_first_wakeups() Tuple[Set[ComponentID], SimTime | None] [source]#
Gets the components which are due for update first and the wakeup time.
A method which returns a set of components which are due for update first and the simulation time at which the updates are scheduled. Or an empty set and None if no wakeups are scheduled.
- Returns:
- A tuple containing the set of
components which are scheduled for the first wakeup and the time at which the wakeup should occur. Or an empty set and None if no wakeups are scheduled.
- Return type:
Tuple[Set[ComponentID], Optional[SimTime]]
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component produces an exception, the scheduler will produce a message to all components in the simulation to cause them to cancel any running component tasks. After which the scheduler shuts itself down.
tickit.core.management.schedulers.master
#
- class tickit.core.management.schedulers.master.MasterScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer], initial_time: int = 0, simulation_speed: float = 1.0)[source]#
A master scheduler which orchestrates the running of a tickit simulation.
A master scheduler constructor which stores values for reference.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
initial_time (int) – The initial time of the simulation (in nanoseconds). Defaults to 0.
simulation_speed (float) – The rate at which simulation time progresses with respect to real world time. Defaults to 1.0.
- async setup() None [source]#
Performs base setup and creates an awaitable flag to indicate new wakeups.
- add_wakeup(component: ComponentID, when: SimTime) None [source]#
Adds a wakeup to the priority queue and sets an awaitable flag.
A method which adds a wakeup to the priority queue and sets a awaitable flag indicating that a new wakeup has been added.
- Parameters:
component (ComponentID) – The component which should be updated.
when (SimTime) – The simulation time at which the update should occur.
- async run_forever() None [source]#
Perform initial tick then continuously schedule ticks according to wakeups.
An asynchronous method which initially performs setup, sets the event to signify the simulation is running, and performs an initial tick in which all components are updated. Subsequent ticks are performed as requested by components of the simulation according to the simulation speed.
- async schedule_interrupt(source: ComponentID) None [source]#
Schedules the interrupt of a component immediately.
An asynchronous method which implements the superclass abstract, an interrupt is scheduled as an immediate wakeup. This is achieved by giving the wakeup a simulation time equal to the simulation time of the last tick plus the real world time which has passed, adjusted by the simulation speed.
- Parameters:
source (ComponentID) – The source component which should be updated.
- sleep_time(when: SimTime) float [source]#
Computes the real world time until a specified simulation time.
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component produces an exception, the scheduler will produce a message to all components in the simulation to cause them to cancel any running component tasks. After which the scheduler shuts itself down.
tickit.core.management.schedulers.slave
#
- class tickit.core.management.schedulers.slave.SlaveScheduler(wiring: Wiring | InverseWiring, state_consumer: Type[StateConsumer], state_producer: Type[StateProducer], expose: Dict[PortID, ComponentPort], raise_interrupt: Callable[[], Awaitable[None]])[source]#
A slave scheduler which orchestrates nested tickit simulations.
Slave scheduler constructor which adds wiring and saves values for reference.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
state_consumer (Type[StateConsumer]) – The state consumer class to be used by the component.
state_producer (Type[StateProducer]) – The state producer class to be used by the component.
expose (Dict[PortID, ComponentPort]) – A mapping of slave scheduler outputs to internal component ports.
raise_interrupt (Callable[[], Awaitable[None]]) – A callback to request that the slave scheduler is updated immediately.
- static add_exposing_wiring(wiring: Wiring | InverseWiring, expose: Dict[PortID, ComponentPort]) InverseWiring [source]#
Adds wiring to expose slave scheduler outputs.
Adds wiring to expose slave scheduler outputs, this is performed creating a mock “expose” component with inverse wiring set by expose.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
expose (Dict[PortID, ComponentPort]) – A mapping of slave scheduler outputs to internal component ports.
- Returns:
An inverse wiring object representing the connections between components in the system and the “expose” component which acts as the slave scheduler output.
- Return type:
- async update_component(input: Input) None [source]#
Sends an input to the target component. Mocks I/O for “external” or “expose”.
For real components the input is sent in a message to their input topic, for the mock component named “external”, external inputs are injected, whilst for the mock component and named “expose” the input is stored for use as the scheduler output.
- Parameters:
input (Input) – The input message to be sent to the component.
- async on_tick(time: SimTime, changes: Changes) Tuple[Changes, SimTime | None] [source]#
Routes inputs, does a tick and returns output changes and a callback time.
An asynchronous method which determines which components within the simulation require being woken up, sets the input changes for use by the “external” mock component, performs a tick, determines the period in which the slave scheduler should next be updated, and returns the changes collated by the “expose” mock component.
- Parameters:
- Returns:
A tuple of a mapping of the changed exposed outputs and their new values and optionally a duration in simulation time after which the slave scheduler should be called again.
- Return type:
- async run_forever() None [source]#
Delegates to setup which instantiates the ticker and state interfaces.
- async schedule_interrupt(source: ComponentID) None [source]#
Schedules the interrupt of a component immediately.
An asynchronous method which schedules an interrupt immediately by adding it to a set of queued interrupts and raising the interrupt to the master scheduler.
- Parameters:
source (ComponentID) – The source component of the interrupt.
- async handle_component_exception(message: ComponentException) None [source]#
Handle exceptions raised from components by shutting down the simulation.
If a component inside a system simulation produces an exception, the slave scheduler will produce a message to all components it contains to cause them to cancel any running component tasks (adapter tasks). Afterwards the scheduler stores the ComponentException message, allowing its associated system simulation component to propagate the exception to the master scheduler.
tickit.core.management.event_router
#
- tickit.core.management.event_router.Default_Wiring_Struct#
A mapping of component output ports to component input ports with defaults
alias of
DefaultDict
[ComponentID
,DefaultDict
[PortID
,Set
[ComponentPort
]]]
- tickit.core.management.event_router.Wiring_Struct#
A mapping of component output ports to component input ports
alias of
Dict
[ComponentID
,Dict
[PortID
,Set
[ComponentPort
]]]
- tickit.core.management.event_router.Default_InverseWiring_Struct#
A mapping of component input ports to component output ports with defaults
alias of
DefaultDict
[ComponentID
,DefaultDict
[PortID
,ComponentPort
]]
- tickit.core.management.event_router.Inverse_Wiring_Struct#
A mapping of component input ports to component output ports
alias of
Dict
[ComponentID
,Dict
[PortID
,ComponentPort
]]
- class tickit.core.management.event_router.Wiring(wiring: Dict[ComponentID, Dict[PortID, Set[ComponentPort]]] | None = None)[source]#
A mapping of component output ports to component input ports with defaults.
A mapping of component output ports to component input ports, used to represent the connections between components in a system. Defaults are generated in case of missing mapping keys; for an unknown component, an empty output port mapping is created, whilst for an unknown output port an empty input port set is created.
A constructor which adds defaults to a wiring struct if provided.
A constructor which adds defaults to a wiring struct if provided, otherwise an empty default dict is created.
- Parameters:
wiring (Optional[Wiring_Struct]) – An optional wiring struct. Defaults to None.
- classmethod from_inverse_wiring(inverse_wiring: InverseWiring) Wiring [source]#
Un-inverts an inverse wiring object to produce a Wiring instance.
- Parameters:
inverse_wiring (InverseWiring) – A mapping of component input ports to component output ports.
- Returns:
A mapping of component output ports to component input ports.
- Return type:
- class tickit.core.management.event_router.InverseWiring(wiring: Dict[ComponentID, Dict[PortID, ComponentPort]] | None = None)[source]#
A mapping of component input ports to component output ports.
A mapping of component input ports to component output ports, used to represent the connections between components in a system. When either an unknown input component or an unknown input component port is requested a KeyError is raised.
A constructor which adds defaults to an inverse wiring struct if provided.
A constructor which adds defaults to an inverse wiring struct if provided, otherwise an empty default dict is created.
- Parameters:
wiring (Optional[Inverse_Wiring_Struct]) – An optional inverse wiring struct. Defaults to None.
- classmethod from_wiring(wiring: Wiring) InverseWiring [source]#
Inverts a Wiring object to create an InverseWiring instance.
- Parameters:
wiring (Wiring) – A mapping of component output ports to component input ports.
- Returns:
A mapping of component input ports to component output ports.
- Return type:
- classmethod from_component_configs(configs: Iterable[ComponentConfig]) InverseWiring [source]#
Creates an InverseWiring instance from an iterable of ComponentConfigs.
- Parameters:
configs (Iterable[ComponentConfig]) – An iterable of component config data containers.
- Returns:
A mapping of component input ports to component output ports.
- Return type:
- class tickit.core.management.event_router.EventRouter(wiring: Wiring)[source]#
- class tickit.core.management.event_router.EventRouter(wiring: InverseWiring)
A utility class responsible for routing changes between components.
Construct an EventRouter.
An EventRouter stores possible inverse wiring for use in utilities.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
- property wiring: Wiring#
The cached wiring used by the event router.
- Returns:
The wiring used by the event router.
- Return type:
- property components: Set[ComponentID]#
A cached set of all components in the wiring.
- Returns:
A set of all components in the wiring.
- Return type:
Set[ComponentID]
- property output_components: Set[ComponentID]#
A cached set of components which provide outputs.
- Returns:
A set of components which provide outputs.
- Return type:
Set[ComponentID]
- property input_components: Set[ComponentID]#
A cached set of components which receive inputs.
- Returns:
A set of components which receive inputs.
- Return type:
Set[ComponentID]
- property isolated_components: Set[ComponentID]#
A cached set of components without inputs or outputs.
- Returns:
A set of components which are isolated
- Return type:
Set[ComponentID]
- property component_tree: Dict[ComponentID, Set[ComponentID]]#
A cached mapping of first order component dependants.
A cached property which returns a mapping of components to the set of components which are wired to any of its outputs.
- Returns:
A mapping of components to the set of components which are wired to any of its outputs.
- Return type:
Dict[ComponentID, Set[ComponentID]]
- property inverse_component_tree: Dict[ComponentID, Set[ComponentID]]#
A cached mapping of first order component dependencies.
A cached property which returns a mapping of components to the set of components which are wired to any of its inputs.
- Returns:
A mapping of components to the set of components which are wired to any of its inputs.
- Return type:
Dict[ComponentID, Set[ComponentID]]
- dependants(root: ComponentID) Set[ComponentID] [source]#
Finds the set of root component dependents .
Finds the set of all components which are recursively dependant on the root component.
- Parameters:
root (ComponentID) – The root component.
- Returns:
A set of all components which are recursively dependant on the root component.
- Return type:
Set[ComponentID]
- route(source: ComponentID, changes: Mapping[PortID, object]) DefaultDict[ComponentID, Dict[PortID, object]] [source]#
Generates a mapping of input changes resulting from output changes.
A method which generates a mapping of input changes which result from the propagation of output changes according to the wiring.
- Parameters:
source (ComponentID) – The source component of the output changes.
changes (Mapping[PortID, Hashable]) – A mapping of changes to the outputs of the source component.
- Returns:
A mapping of the input changes which result from the propagation of output changes according to the wiring.
- Return type:
DefaultDict[ComponentID, Dict[PortID, Hashable]]
tickit.core.management.ticker
#
- class tickit.core.management.ticker.Ticker(wiring: Wiring | InverseWiring, update_component: Callable[[Input], Coroutine[Any, Any, None]])[source]#
Utility class responsible for sequencing the update of components during a tick.
A utility class responsible for sequencing the update of components during a tick by eagerly updating each component which has had all of its dependencies resolved.
Ticker constructor which creates an event router and performs initial setup.
- Parameters:
wiring (Union[Wiring, InverseWiring]) – A wiring or inverse wiring object representing the connections between components in the system.
update_component (Callable[[Input], Awaitable[None]]) – A function or method which may be called to request a component performs and update, such updates should result in a subsequent call to the propagate method of the ticker.
See also
- async schedule_possible_updates() None [source]#
Update components with resolved dependencies.
An asynchronous method which schedules updates for components with resolved dependencies, as determined by the intersection of the components first order dependencies and the set of components which still require an update.
- async propagate(output: Output) None [source]#
Propagates the output of an updated component.
An asynchronous message which propagates the output of an updated component by removing the component from the set of components requiring update, adding the routed inputs to the accumulator, and scheduling any possible updates. If no components require update the finished flag will be set.
- Parameters:
output (Output) – The output produced by the update of a component.
- property components: Set[ComponentID]#
The set of all components in the wiring.
- Returns:
The set of all components in the wiring.
- Return type:
Set[ComponentID]
tickit.core.state_interfaces
#
tickit.core.state_interfaces.internal
#
- class tickit.core.state_interfaces.internal.C#
A consumable value
alias of TypeVar(‘C’)
- class tickit.core.state_interfaces.internal.P#
A producible value
alias of TypeVar(‘P’)
- class tickit.core.state_interfaces.internal.Message(value: Any)[source]#
An immutable data container for internal messages.
Create new instance of Message(value,)
- class tickit.core.state_interfaces.internal.InternalStateServer(*args: Any, **kwargs: Any)[source]#
A singleton, in memory, publish/subscribe message store and router.
A singleton, in memory, publish/subscribe message store and router. The single instance of this class holds a mapping of topics and a list of messages which have been pushed to them and a mapping of consumers which subscribe to the topics. Upon subscribing to a topic, a consumer is sent all of the messages which were previously registered against the topic. Upon pushing to a topic, the message is added to the store and immediately forwarded to each of the consumers which subscribe to the topic.
- async push(topic: str, message: Message) None [source]#
Asynchronous method which propagates a message to subscribers and stores it.
An asynchronous method which propagates the given message to consumers which subscribe to the topic and stores the message in the topic -> messages mapping.
- async subscribe(consumer: InternalStateConsumer, topics: Iterable[str])[source]#
Subscribes the consumer to the given topics.
An asynchronous method which adds a consumer to the subscriber list of each topic in the topics iterable. On subscription, previous messages on the topic are immediately passed to the consumer.
- Parameters:
consumer (InternalStateConsumer) – The consumer which is subscribing to the topic.
topics (Iterable[str]) – The topic which the consumer is to be subscribed to.
- create_topic(topic: str) None [source]#
Creates a new topic as an empty message list.
- Parameters:
topic (str) – The topic to be created.
- class tickit.core.state_interfaces.internal.InternalStateConsumer(callback: Callable[[C], Awaitable[None]])[source]#
An internal, singleton based, implementation of the StateConsumer protocol.
A internal, singleton based, implementation of the StateConsumer protocol, this consumer can subscribe to InternalStateServer topics, upon receiving a message the consumer passes the value to the callback function passed during initialization, if a topic is subscribed to which does not yet exist it is created.
Gets an instance of the InternalStateServer for use in subscribe.
- Parameters:
callback (Callable[[C], Awaitable[None]]) – An asynchronous handler function for consumed values.
- class tickit.core.state_interfaces.internal.InternalStateProducer[source]#
An internal, singleton based, implementation of the StateProducer protocol.
An internal, singleton based, implementation of the StateProducer protocol, this producer can produce a value to a InternalStateServer topic, if the topic does not yet exist it is created.
Gets an instance of the InternalStateServer for use in produce.
tickit.core.state_interfaces.kafka
#
- class tickit.core.state_interfaces.kafka.C#
A consumable value
alias of TypeVar(‘C’)
- class tickit.core.state_interfaces.kafka.P#
A producible value
alias of TypeVar(‘P’)
- class tickit.core.state_interfaces.kafka.KafkaStateConsumer(callback: Callable[[C], Awaitable[None]])[source]#
A kafka implementation of the StateConsumer protocol.
A kafka implementation of the StateConsumer protocol, this consumer can subscribe to kafka topics, upon receiving a message the consumer passes the value to the callback function passed during initialization, if a topic is subscribed to which does not yet exist it is created.
Creates an AIOKafka Consumer and begins consuming subscribed topics.
- Parameters:
callback (Callable[[C], Awaitable[None]]) – An asynchronous handler function for consumed values.
- class tickit.core.state_interfaces.kafka.KafkaStateProducer[source]#
A kafka implementation of the StateProducer protocol.
A kafka implementation of the StateProducer protocol, this producer can produce a value to a kafka topic, if the topic does not yet exist it is created.
Creates and starts and AIOKafka Producer.
tickit.core.state_interfaces.state_interface
#
- class tickit.core.state_interfaces.state_interface.C#
A consumable value
alias of TypeVar(‘C’, covariant=True)
- class tickit.core.state_interfaces.state_interface.P#
A producible value
alias of TypeVar(‘P’, contravariant=True)
- class tickit.core.state_interfaces.state_interface.StateConsumer(*args, **kwargs)[source]#
An interface for types which implement publish/subscribe message consumers.
An interface for types which implement publish/subscribe message consumers, the consumer must be able to subscribe to topics within the messaging framework, upon receiving a message the consumer should pass the value to the callback function, if a topic is subscribed to which does not yet exist it should be created.
- class tickit.core.state_interfaces.state_interface.StateProducer(*args, **kwargs)[source]#
An interface for types which implement publish/subscribe message producers.
An interface for types which implement publish/subscribe message producers, the producer must be able to produce a value to a topic within the messaging framework, if the topic does not yet exist it should be created.
- class tickit.core.state_interfaces.state_interface.StateInterface#
The union of StateConsumer and StateProducer
alias of TypeVar(‘StateInterface’, ~tickit.core.state_interfaces.state_interface.StateConsumer, ~tickit.core.state_interfaces.state_interface.StateProducer)
- tickit.core.state_interfaces.state_interface.add(name: str, external: bool) Callable[[Type[StateInterface]], Type[StateInterface]] [source]#
A decorator to add a StateInterface to the registry.
A decorator to add a StateInterface to the registry of StateConsumers or StateProducer according to its signature. StateConsumers and StateProducers which are intended to work together should be added with the same name.
- tickit.core.state_interfaces.state_interface.interfaces(external: bool = False) Set[str] [source]#
Returns interface names for which both a StateConsumer and StateProducer exist.
Gets a set of interface names for which both a StateConsumer and StateProducer exist. The external option may be used to restrict interfaces to those which may be used in simulations which are distributed across multiple processes.
- tickit.core.state_interfaces.state_interface.satisfy_externality(external: bool, interfaces: Dict[str, Tuple[Type[StateInterface], bool]]) Set[str] [source]#
Finds which interfaces satisfy the externality requirement provided.
- Parameters:
external (bool) – If true, only interfaces which can be used in simulations which are distributed across processes are returned. If false, all interfaces are returned.
interfaces (Dict[str, Tuple[Type[StateInterface], bool]]) – A mapping of interface names to a tuple of their class and their externality flag.
- Returns:
A set of interface names which satisfy the externality requirement.
- Return type:
Set[str]
- tickit.core.state_interfaces.state_interface.get_interface(name: str) Tuple[Type[StateConsumer], Type[StateProducer]] [source]#
Get the StateConsumer and StateProducer classes for a given interface name.
- Parameters:
name (str) – The name of the interface to be retrieved.
- Returns:
A tuple of the StateConsumer and StateProducer classes.
- Return type:
Tuple[Type[StateConsumer], Type[StateProducer]]
tickit.adapters
#
tickit.adapters.interpreters
#
tickit.adapters.interpreters.command
#
tickit.adapters.interpreters.command.command_interpreter
#
- class tickit.adapters.interpreters.command.command_interpreter.Command(*args, **kwargs)[source]#
An interface for interpretable commands.
- abstract parse(data: bytes) Sequence | None [source]#
An abstract method which parses a message and extracts arguments.
An abstract method which parses a message and produces arguments if a match is found, otherwise None is returned.
- Parameters:
data (bytes) – The message to be parsed.
- Returns:
A sequence of arguments extracted from the message if matched, otherwise None.
- Return type:
Optional[Sequence[AnyStr]]
- class tickit.adapters.interpreters.command.command_interpreter.CommandInterpreter[source]#
An interpreter which routes to commands registered to adapter methods.
An interpreter which attempts to parse messages according to the parse method of commands registered against adapter methods, if a match is found the method is called with the parsed arguments.
- static unknown_command() AsyncIterable[bytes] [source]#
An asynchronous iterable of containing a single unknown command reply.
- Returns:
An asynchronous iterable of containing a single unknown command reply: “Request does not match any known command”.
- Return type:
AsyncIterable[bytes]
- async handle(adapter: Adapter, message: AnyStr) Tuple[AsyncIterable, bool] [source]#
Matches the message to an adapter command and calls the corresponding method.
An asynchronous method which handles a message by attempting to match the message against each of the registered commands, if a match is found the corresponding command is called and its reply is returned with an asynchronous iterable wrapper if required. If no match is found the unknown command message is returned with no request for interrupt.
- Parameters:
- Returns:
A tuple of the asynchronous iterable of reply messages and a flag indicating whether an interrupt should be raised by the adapter.
- Return type:
tickit.adapters.interpreters.command.regex_command
#
- class tickit.adapters.interpreters.command.regex_command.RegexCommand(regex: AnyStr, interrupt: bool = False, format: str | None = None)[source]#
A decorator to register an adapter method as a regex parsed command.
- Parameters:
regex (Union[bytes, str]) – The regular expression pattern which must be matched in full, with groups used to extract command arguments.
interrupt (bool) – A flag indicating whether calling of the method should raise an adapter interrupt. Defaults to False.
format (Optional[str]) – The message decoding format to be used for string based interpretation. Defaults to None.
- Returns:
A decorator which registers the adapter method as a message handler.
- Return type:
Callable
- parse(data: bytes) Sequence | None [source]#
Performs message decoding and regex matching to match and extract arguments.
A method which performs message decoding according to the command formatting string, checks for a full regular expression match and returns a sequence of function arguments if a match is found, otherwise the method returns None.
- Parameters:
data (bytes) – The message data to be parsed.
- Returns:
If a full match is found a sequence of function arguments is returned, otherwise the method returns None.
- Return type:
Optional[Sequence[AnyStr]]
tickit.adapters.interpreters.endpoints
#
tickit.adapters.interpreters.endpoints.http_endpoint
#
- class tickit.adapters.interpreters.endpoints.http_endpoint.HttpEndpoint(path: str, method: str, interrupt: bool = False)[source]#
A decorator intended for use with HttpAdapter.
Routes an HTTP endpoint to the decorated method.
- Parameters:
path – The URL that will point to a specific endpoint.
method – The method to use when using this endpoint.
interrupt – If True, every time this endpoint is called the adapter’s device will be interrupted. Defaults to False.
- Returns:
A decorator which registers the adapter method as an endpoint.
- Return type:
Callable
- define(func: Callable[[Request], Awaitable[StreamResponse]]) RouteDef [source]#
Performs the construction of the endpoint RouteDef for the HTTP Server.
- A method which performs the construction of the route definition of the method,
URL and handler function for the endpoint, to then return to the HTTP Server.
- Parameters:
func (Callable) – The handler function to be attached to the route
endpoint. (definition for the) –
- Returns:
The route definition for the endpoint.
- Return type:
RouteDef
- classmethod get(url: str, interrupt: bool = False) HttpEndpoint [source]#
Shortcut to making a GET endpoint.
- Returns:
The class of HttpEndpoint with the “GET” request method.
- Return type:
cls
- classmethod put(url: str, interrupt: bool = False) HttpEndpoint [source]#
Shortcut to making a PUT endpoint.
- Returns:
The class of HttpEndpoint with the “PUT” request method.
- Return type:
cls
- classmethod post(url: str, interrupt: bool = False) HttpEndpoint [source]#
Shortcut to making a POST endpoint.
- Returns:
The class of HttpEndpoint with the “POST” request method.
- Return type:
cls
tickit.adapters.interpreters.wrappers
#
tickit.adapters.interpreters.wrappers.beheading_interpreter
#
- class tickit.adapters.interpreters.wrappers.beheading_interpreter.BeheadingInterpreter(interpreter: Interpreter, header_size: int)[source]#
A wrapper for an interpreter which strips a header from a message.
An interpreter wrapper that takes a message, strips off a header of a fixed size and passes the stripped message on to the wrapped interpreter
A wrapper for an interpreter which strips a header from a message.
- Parameters:
interpreter (Interpreter) – The interpreter the message is passed on to after the header is stripped.
header_size (int) – The number of characters in the header.
- async handle(adapter: Adapter, message: AnyStr) Tuple[AsyncIterable, bool] [source]#
Removes a header from a message and passes the message on to an interpreter.
- Parameters:
adapter (Adapter) – The adapter in which the function should be executed
message – (AnyStr): The handled message, of which the header is removed.
- Returns:
A tuple of the asynchronous iterable of reply messages and a flag indicating whether an interrupt should be raised by the adapter.
- Return type:
tickit.adapters.interpreters.wrappers.joining_interpreter
#
- class tickit.adapters.interpreters.wrappers.joining_interpreter.JoiningInterpreter(interpreter: Interpreter, response_delimiter: AnyStr)[source]#
A wrapper for an interpreter that combines responses.
An interpreter wrapper class that takes the wrapped interpreter’s response(s) to a message and combines them into a single response.
A decorator for an interpreter that combines multiple responses into one.
- Parameters:
interpreter (Interpreter) – The interpreter responding to a message.
response_delimiter (AnyStr) – The delimiter separating the responses to the individual responses when they are combined into a single response message.
- async handle(adapter: Adapter, message: AnyStr) Tuple[AsyncIterable, bool] [source]#
Merges the responses from an interpreter into a single message.
- Individual responses to the message are combined into a single response and
returned.
- Parameters:
adapter (Adapter) – The adapter in which the function should be executed.
message – (AnyStr): The message to be handled.
- Returns:
A tuple of the asynchronous iterable of a single reply message and a flag indicating whether an interrupt should be raised by the adapter.
- Return type:
tickit.adapters.interpreters.wrappers.splitting_interpreter
#
- class tickit.adapters.interpreters.wrappers.splitting_interpreter.SplittingInterpreter(interpreter: Interpreter, message_delimiter: AnyStr)[source]#
A wrapper for an interpreter that splits a single message into multiple.
An interpreter wrapper class that takes a message, splits it according to a given delimiter, and passes on the resulting sub-messages individually on to the wrapped interpreter.
An interpreter decorator that splits a message into multiple sub-messages.
- Parameters:
interpreter (Interpreter) – The interpreter messages are passed on to.
message_delimiter (AnyStr) – The delimiter by which the message is split up. Can be a regex pattern. Must be of the same type as the message.
- async handle(adapter: Adapter, message: AnyStr) Tuple[AsyncIterable, bool] [source]#
Splits a message and passes the resulting sub-messages on to an interpreter.
Splits a given message and passes the resulting sub-messages on to an interpreter. The responses to the individual sub-messages are then returned.
- Parameters:
adapter (Adapter) – The adapter in which the function should be executed
message – (AnyStr): The message to be split up and handled.
- Returns:
A tuple of the asynchronous iterable of reply messages and a flag indicating whether an interrupt should be raised by the adapter.
- Return type:
tickit.adapters.servers
#
tickit.adapters.servers.tcp
#
- class tickit.adapters.servers.tcp.TcpServer(host: str = 'localhost', port: int = 25565, format: ByteFormat = ByteFormat(format=b'%b'))[source]#
A configurable tcp server with delegated message handling for use in adapters.
The TcpServer constructor which takes a host, port and format byte string.
- Parameters:
host (str) – The host name which the server should be run under.
port (int) – The port number which the server should listen to.
format (ByteFormat) – A formatting string for messages sent by the server, allowing for the prepending and appending of data. Defaults to b”%b”.
- async run_forever(on_connect: Callable[[], AsyncIterable[bytes | None]], handler: Callable[[bytes], Awaitable[AsyncIterable[bytes | None]]]) None [source]#
Runs the TCP server indefinitely on the configured host and port.
An asynchronous method used to run the server indefinitely on the configured host and port. Upon client connection, messages from the on_connect iterable will be sent. Upon receiving a message the server will delegate handling of it to the handler. Replies will be formatted according to the configured format string.
tickit.adapters.composed
#
- class tickit.adapters.composed.T#
Message type
alias of TypeVar(‘T’)
- class tickit.adapters.composed.ComposedAdapter(server: Server, interpreter: Interpreter)[source]#
An adapter implementation which delegates to a server and interpreter.
An adapter implementation which delegates the hosting of an external messaging protocol to a server and message handling to an interpreter.
- async on_connect() AsyncIterable[T | None] [source]#
Overridable asynchronous iterable which yields messages on client connection.
- Returns:
An asynchronous iterable of messages.
- Return type:
AsyncIterable[Optional[T]]
- async handle_message(message: T) AsyncIterable[T | None] [source]#
Delegates message handling to the interpreter, raises interrupt if requested.
- async run_forever(device: Device, raise_interrupt: RaiseInterrupt) None [source]#
Runs the server continuously.
tickit.adapters.httpadapter
#
- class tickit.adapters.httpadapter.HttpAdapter(host: str = 'localhost', port: int = 8080, _stopped: Event | None = None, _ready: Event | None = None)[source]#
An adapter implementation which delegates to a server and sets up endpoints.
An adapter implementation which delegates the hosting of an http requests to a server and sets up the endpoints for said server.
- async run_forever(device: Device, raise_interrupt: RaiseInterrupt) None [source]#
Runs the server continuously.
- endpoints() Iterable[RouteDef] [source]#
Returns list of endpoints.
Fetches the defined HTTP endpoints in the device adapter, parses them and then yields them.
- Returns:
The list of defined endpoints
- Return type:
Iterable[HttpEndpoint]
- Yields:
Iterator[Iterable[HttpEndpoint]] – The iterator of the defined endpoints
tickit.adapters.zeromq.push_adapter
#
- class tickit.adapters.zeromq.push_adapter.ZeroMqPushAdapter(host: str = '127.0.0.1', port: int = 5555, socket_factory: ~tickit.adapters.zeromq.push_adapter.SocketFactory = <function create_zmq_push_socket>)[source]#
An adapter for a ZeroMQ data stream.
Initialize with default values.
- async run_forever(device: Device, raise_interrupt: RaiseInterrupt) None [source]#
Runs the ZeroMQ adapter continuously.
tickit.adapters.epicsadapter
#
tickit.adapters.epicsadapter.adapter
#
- class tickit.adapters.epicsadapter.adapter.InputRecord(name: str, set: Callable, get: Callable)[source]#
A data container representing an EPICS input record.
- class tickit.adapters.epicsadapter.adapter.OutputRecord(name: str)[source]#
A data container representing an EPICS output record.
- class tickit.adapters.epicsadapter.adapter.EpicsAdapter(ioc_name: str, db_file: str | None = None)[source]#
An adapter implementation which acts as an EPICS IOC.
This is optionally initialised from an EPICS database (db) file but can be customised in code by implementing on_db_load.
An EpicsAdapter constructor which stores the db_file path and the IOC name.
- Parameters:
- link_input_on_interrupt(record: InputRecord, getter: Callable[[], Any]) None [source]#
Adds a record and a getter to the mapping of interrupting records.
- Parameters:
record (InputRecord) – The record to be added.
getter (Callable[[], Any]) – The getter handle.
- abstract on_db_load() None [source]#
Customises records that have been loaded in to suit the simulation.
- async run_forever(device: Device, raise_interrupt: RaiseInterrupt) None [source]#
Runs the server continuously.
tickit.devices
#
tickit.devices.sink
#
- class tickit.devices.sink.SinkDevice[source]#
A simple device which can take any input and produces no output.
- class Inputs#
A typed mapping containing the ‘input’ input value
- class Outputs#
An empty typed mapping of device outputs
- class tickit.devices.sink.Sink(name: ComponentID, inputs: Dict[PortID, ComponentPort], type: Literal['tickit.devices.sink.Sink'] = 'tickit.devices.sink.Sink')[source]#
Arbitrary value sink that logs the value.
tickit.devices.source
#
- class tickit.devices.source.SourceDevice(value: Any)[source]#
A simple device which produces a pre-configured value.
A constructor of the source, which takes the pre-configured output value.
- Parameters:
value (Any) – A pre-configured output value.
- class Inputs#
An empty typed mapping of device inputs
- class Outputs#
A typed mapping containing the ‘value’ output value
- class tickit.devices.source.Source(name: ComponentID, inputs: Dict[PortID, ComponentPort], value: Any, type: Literal['tickit.devices.source.Source'] = 'tickit.devices.source.Source')[source]#
Source of a fixed value.
tickit.utils
#
tickit.utils.byte_format
#
tickit.utils.configuration
#
tickit.utils.configuration.tagged_union
#
tickit.utils.configuration.loading
#
- tickit.utils.configuration.loading.read_configs(config_path) List[ComponentConfig] [source]#
A utility function which reads and deserializes configs.
A utility function which reads config files, performs yaml deserialization, loads the required internal and external modules then subsequently performs apischema deserialization to produce a list of component configuration objects.
tickit.utils.singleton
#
tickit.utils.topic_naming
#
- tickit.utils.topic_naming.valid_component_id(component: ComponentID) None [source]#
Checks the validity of a ComponentID for generating a topic name.
- Parameters:
component (ComponentID) – The ComponentID to check.
- Raises:
ValueError – The component name is empty.
- tickit.utils.topic_naming.output_topic(component: ComponentID) str [source]#
Creates the output topic name for a given component.
- Parameters:
component (ComponentID) – The component for which an output topic name is required.
- Returns:
The output topic name of the component.
- Return type:
- tickit.utils.topic_naming.input_topic(component: ComponentID) str [source]#
Creates returns the input topic name for a given component.
- Parameters:
component (ComponentID) – The component for which an input topic name is required.
- Returns:
The input topic name of the component.
- Return type: