import asyncio
import inspect
import logging
import threading
import atexit
import signal
class AsyncioDispatcher:
    def __init__(self, loop=None, debug=False):
        """A dispatcher for `asyncio` based IOCs, suitable to be passed to
        `softioc.iocInit`. Means that `on_update` callback functions can be
        async.

        If a ``loop`` is provided it must already be running, otherwise
        `ValueError` is raised. If no ``loop`` is provided a new Event Loop
        will be created and run in a dedicated thread.

        ``debug`` is passed through to ``asyncio.run()``.

        For a clean exit, call ``softioc.interactive_ioc(..., call_exit=False)``
        """
        # Define every lifecycle attribute up front so that close() and
        # __shutdown() work on all construction paths, including when the
        # caller supplies their own loop (nothing to tear down in that case).
        self.loop = None
        self.__worker = None
        self.__atexit = None
        self.__interrupt = None

        if loop is None:
            # will wait until worker is executing the new loop
            started = threading.Event()
            # Make one and run it in a background thread
            self.__worker = threading.Thread(
                target=asyncio.run,
                args=(self.__inloop(started),),
                kwargs={'debug': debug})
            # Explicitly manage worker thread as part of interpreter shutdown.
            # Otherwise threading module will deadlock trying to join()
            # before our atexit hook runs, while the loop is still running.
            self.__worker.daemon = True
            self.__worker.start()
            started.wait()
            # atexit.register() returns the callable it was given; keep it
            # so close() can unregister the very same hook.
            self.__atexit = atexit.register(self.__shutdown)
            assert self.loop is not None and self.loop.is_running()
        elif not loop.is_running():
            raise ValueError("Provided asyncio event loop is not running")
        else:
            # Externally owned loop: used for dispatch only, never stopped
            # by this object.
            self.loop = loop

    def close(self):
        """Stop the worker thread and event loop created by this dispatcher.

        Safe to call more than once. When the dispatcher was constructed
        with a caller-provided ``loop`` there is nothing to stop and this
        is a no-op.
        """
        if self.__atexit is not None:
            atexit.unregister(self.__atexit)
            self.__atexit = None
        self.__shutdown()

    def wait_for_quit(self):
        """Block the calling thread until SIGINT or SIGTERM is received.

        Installs handlers for both signals; the handlers are left installed
        when this method returns. `signal.signal` may only be called from
        the main thread of the main interpreter.
        """
        stop_event = threading.Event()

        def signal_handler(signum, frame):
            stop_event.set()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        stop_event.wait()

    async def __inloop(self, started):
        """Main coroutine of the worker thread's event loop.

        Publishes the running loop on ``self.loop``, signals ``started`` and
        then stays alive until ``__shutdown`` sets the interrupt event.
        """
        self.loop = asyncio.get_running_loop()
        # Created inside the loop so the Event belongs to this loop.
        self.__interrupt = asyncio.Event()
        started.set()
        del started
        await self.__interrupt.wait()

    def __shutdown(self):
        """Ask the worker loop to finish and join its thread, if we own one."""
        if self.__worker is not None:
            # asyncio.Event is not thread-safe, so set it from within the loop.
            self.loop.call_soon_threadsafe(self.__interrupt.set)
            self.__worker.join()
            self.__worker = None

    def __call__(
            self,
            func,
            func_args=(),
            completion=None,
            completion_args=()):
        """Schedule ``func(*func_args)`` to run on the event loop.

        If the call returns an awaitable it is awaited. Afterwards, if
        ``completion`` is truthy, ``completion(*completion_args)`` is called.
        Any `Exception` raised by either step is logged rather than
        propagated. This method returns immediately without waiting for
        the work to run.
        """
        async def async_wrapper():
            try:
                ret = func(*func_args)
                if inspect.isawaitable(ret):
                    await ret
                if completion:
                    completion(*completion_args)
            except Exception:
                logging.exception("Exception when running dispatched callback")

        asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)

    def __enter__(self):
        """Return the dispatcher itself for use in a ``with`` block."""
        return self

    def __exit__(self, A, B, C):
        """Close the dispatcher on leaving the ``with`` block.

        The exception details (``A``, ``B``, ``C``) are ignored and any
        exception is allowed to propagate.
        """
        self.close()