Source code for tickit.core.lifetime_runnable

import asyncio
import traceback
from typing import Iterable, List

from tickit.utils.compat.typing_compat import Protocol, runtime_checkable


[docs]@runtime_checkable class LifetimeRunnable(Protocol): """An interface for types which implement an awaitable run_forever method."""
[docs] async def run_forever(self) -> None: """An asynchronous method which may run indefinitely.""" pass
[docs]def run_all(runnables: Iterable[LifetimeRunnable]) -> List[asyncio.Task]: """Asynchronously runs the run_forever method of each lifetime runnable. Creates and runs an asyncio task for the run_forever method of each lifetime runnable. Calls to the run_forever method are wrapped with an error handler. Args: runnables: An iterable of objects which implement run_forever. Returns: List[asyncio.Task]: A list of asyncio tasks for the runnables. """ async def run_with_error_handling(runnable: LifetimeRunnable) -> None: try: await runnable.run_forever() except Exception as e: # These are prints rather than logging because we just want the # result going directly to stdout. print("Task exception: {}".format(e)) print(traceback.format_exc()) return [ asyncio.create_task(run_with_error_handling(runnable)) for runnable in runnables ]
[docs]async def run_all_forever(runnables: Iterable[LifetimeRunnable]) -> None: """Asynchronously runs the run_forever method of each lifetime runnable. Creates and runs an asyncio task for the run_forever method of each lifetime runnable. Calls to the run_forever method are wrapped with an error handler. This function blocks until all run_forever methods have completed. Args: runnables: An iterable of objects which implement run_forever. """ await asyncio.wait(run_all(runnables))