# This file is part of the Diamond cothread library.
#
# Copyright (C) 2007 James Rowland, 2007-2012 Michael Abbott,
# Diamond Light Source Ltd.
#
# The Diamond cothread library is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the License,
# or (at your option) any later version.
#
# The Diamond cothread library is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# Contact:
# Dr. Michael Abbott,
# Diamond Light Source Ltd,
# Diamond House,
# Chilton,
# Didcot,
# Oxfordshire,
# OX11 0DE
# michael.abbott@diamond.ac.uk
'''Simple cooperative threading using coroutines. The following functions
define the interface provided by this module.
Spawn(function, arguments...)
A new cooperative thread, or "task", is created as a call to
function(arguments). Control is not transferred to the task until
control is yielded.
Sleep(delay)
SleepUntil(time)
The calling task is suspended until the given time. Sleep(delay)
suspends the task for at least delay seconds, SleepUntil(time)
suspends until the specified time has passed (time is defined as the
value returned by time.time()).
Control is not returned to the calling task until all other
active tasks have been processed.
Yield()
Yield() suspends control so that all other potentially busy tasks can
run.
Instances of the Event object can be used for communication between tasks.
The following Event object methods are relevant.
Wait()
Wait(timeout)
Waits for the event object to be signalled or for the timeout to
expire (if specified). Returns True if a signal was received, False
if a timeout ocurred.
Signal()
Signals the event object, releasing at least one waiting task.
Similarly the EventQueue can be used for communication.
'''
# It might be worth taking a close look at:
# http://wiki.secondlife.com/wiki/Eventlet
from __future__ import print_function
import sys
import os
import time
import bisect
import traceback
import collections
import threading
from . import _coroutine
from . import py23
if os.environ.get('COTHREAD_CHECK_STACK'):
_coroutine.enable_check_stack(True)
from . import coselect
if sys.version_info >= (3,):
import _thread
else:
import thread as _thread
__all__ = [
'Spawn', # Spawn new task
'Sleep', # Suspend task for given delay
'SleepUntil', # Suspend task until specified time
'Yield', # Suspend task for immediate resumption
'Event', # Event for waiting and signalling
'RLock', # Recursive lock
'Pulse', # Event for dynamic condition variables
'EventQueue', # Queue of objects with event handling
'ThreadedEventQueue', # Event queue designed to work with threads
'WaitForAll', # Wait for all events to become ready
'AbsTimeout', # Converts timeout into absolute deadline format
'GetDeadline', # Returns deadline associated with timeout
'Deadline', # Converts deadline into timeout format
'Timedout', # Timeout exception raised by event waiting
'Quit', # Immediate process quit
'WaitForQuit', # Wait until Quit() is called
'Timer', # One-shot cancellable timer
'Callback', # Simple asynchronous synchronisation
'CallbackResult', # Asynchronous synchronisation with result
'scheduler_thread_id', # For checking we're in cothread's thread
]
class _TimerQueue(object):
'''A timer queue: objects are held on the queue in timeout sequence.'''
# The queue is implemented using the bisect function to insert objects
# into the queue without having to resort the list. This is cheap and
# cheerful to implement and runs fast enough.
def __init__(self):
# We maintain the list of timeouts and the associated tasks separately
# so that bisect searching can safely search the timeouts list without
# trying to compare wakeups.
self.__timeouts = []
self.__wakeups = []
self.__garbage = 0
def put(self, task, timeout):
'''Adds value to the queue with the specified timeout.'''
index = bisect.bisect(self.__timeouts, timeout)
self.__timeouts.insert(index, timeout)
self.__wakeups.insert(index, task)
def timeout(self):
'''Returns the timeout of the queue. Only valid if queue not empty.'''
return self.__timeouts[0]
def wake_expired(self):
index = bisect.bisect_right(self.__timeouts, time.time())
expired = self.__wakeups[:index]
del self.__timeouts[:index]
del self.__wakeups[:index]
for task in expired:
if not task.wakeup(_WAKEUP_TIMEOUT):
self.__garbage -= 1
assert 0 <= self.__garbage <= len(self)
def __len__(self):
'''Returns the number of entries on the queue.'''
return len(self.__timeouts)
def cancel(self):
'''This is called to cancel a timeout. We add this to our garbage
count, triggering a garbage collect if appropriate.'''
self.__garbage += 1
if 2 * self.__garbage > len(self):
timeouts = []
wakeups = []
for timeout, task in zip(self.__timeouts, self.__wakeups):
if not task.woken():
timeouts.append(timeout)
wakeups.append(task)
self.__timeouts = timeouts
self.__wakeups = wakeups
self.__garbage = 0
class _WakeupQueue(object):
__slots__ = [
'__waiters', # List of wakeup objects pending wakeup
'__garbage', # Count of expired wakeup objects
]
def __init__(self):
self.__waiters = []
# Every time a timeout occurs a waiter is left behind on the timer
# queue. We keep count of these as "garbage", and at the appropriate
# time we can garbage collect the queue.
self.__garbage = 0
def __len__(self):
return len(self.__waiters)
def append(self, waiter):
self.__waiters.append(waiter)
def wake(self, wake_all):
if self.__waiters:
if wake_all:
for task in self.__waiters:
task.wakeup(_WAKEUP_NORMAL)
self.__waiters = []
self.__garbage = 0
else:
# Wake the first task that actually wakes, mark the rest as
# junk.
for n, task in enumerate(self.__waiters):
if task.wakeup(_WAKEUP_NORMAL):
break
else:
self.__garbage -= 1
del self.__waiters[:n+1]
assert 0 <= self.__garbage <= len(self)
def cancel(self):
# A cancelled wait becomes garbage on the waiting queue. We keep
# count of how much garbage there is -- once the queue has more
# garbage than waiters it's probably time to rebuild the queue and
# keep only those waiters which haven't been woken yet.
self.__garbage += 1
if 2 * self.__garbage > len(self):
self.__waiters = [task
for task in self.__waiters
if not task.woken()]
self.__garbage = 0
class _Wakeup(object):
'''A _Wakeup object is used when a task is to be suspended on one or more
queues. On wakeup the original task is woken, but only once: this is
used to ensure that entries on other queues are effectively cancelled.'''
__slots__ = [
'__task', # Coroutine associated with task to wake
'__wakeup_task', # Action to take on wakeup
'__queue', # Queue where this wakeup object resides
'__timers', # Timeout queue for this wakeup
]
def __init__(self, wakeup_task, queue, timers):
self.__task = _coroutine.get_current()
self.__wakeup_task = wakeup_task
self.__queue = queue
self.__timers = timers
def wakeup(self, reason):
if self.__task:
# Let the scheduler know that this task has been woken, and forget
# about it, so we don't wake it again.
# Note that it's rather important to mark this wakeup as woken
# *before* calling the queue cancel() functions, as otherwise
# their garbage collection will be confused!
self.__wakeup_task(self.__task, reason)
self.__task = None
# Each queue needs to be cancelled if it's not the wakeup reason.
# This test also properly deals with interrupt wakeup, which
# requires both queues to be cancelled.
if reason != _WAKEUP_NORMAL and self.__queue:
self.__queue.cancel()
if reason != _WAKEUP_TIMEOUT and self.__timers:
self.__timers.cancel()
# Also drop our reference to the queue to avoid overextending
# object lifetime.
self.__queue = None
return True
else:
return False
def woken(self):
return self.__task is None
# Task wakeup reasons
_WAKEUP_NORMAL = 0 # Normal wakeup
_WAKEUP_TIMEOUT = 1 # Wakeup on timeout
# A third reason, transfering exception to another cothread, is encoded as a
# tuple.
# Important system invariants:
# - A running task is not on any waiting queue.
# This is enforced by:
# 1) when a task it suspended it is recorded on waiting queues by using
# a shared _Wakeup() object;
# 2) the .wakeup() method is always used before resuming the task.
class _Scheduler(object):
'''Coroutine activity scheduler.'''
# The scheduler runs in a dedicated stack. It doesn't need much stack.
SCHEDULER_STACK_SIZE = 65536
@classmethod
def create(cls):
'''Creates the scheduler in its own coroutine and starts it running.
We switch to the scheduler long enough for it to complete
initialisation.'''
# We run the scheduler in its own coroutine to allow the main task to
# participate in scheduling. This produces its own complications but
# makes for a more usable system.
current = _coroutine.get_current()
scheduler_task = _coroutine.create(
current, cls.__scheduler, cls.SCHEDULER_STACK_SIZE)
return _coroutine.switch(scheduler_task, current)
@classmethod
def __scheduler(cls, main_task):
'''The top level scheduler loop. Starts by creating the scheduler,
and then manages dispatching from the top level.'''
# First create the scheduler and pass it back to our caller, who we
# expect to be the main task. The next time we get control it's time
# to run the scheduling loop.
self = cls()
_coroutine.switch(main_task, self)
# If the schedule loop raises an exception then propagate the
# exception up to the main thread before restarting the scheduler.
# This has mostly the right effects: a standalone program will
# terminate, and an interactive program will receive back control, and
# the scheduler should carry on operating.
while True:
try:
self.__schedule_loop()
except:
# Switch to the main task asking it to re-raise the interrupt.
# First we have to make sure it's not on the run queue.
for index, (task, reason) in enumerate(self.__ready_queue):
if task is main_task:
del self.__ready_queue[index]
break
# All task wakeup entry points will interpret this as a request
# to re-raise the exception. Pass through the exception info.
_coroutine.switch(main_task, sys.exc_info())
def __init__(self):
# List of all tasks that are currently ready to be dispatched.
self.__ready_queue = []
# List of tasks waiting for ready_queue to become empty
self.__yield_queue = _WakeupQueue()
# List of tasks waiting for a timeout
self.__timer_queue = _TimerQueue()
# Scheduler coroutine: this will be switched to whenever any other
# task decides to sleep.
self.__coroutine = _coroutine.get_current()
# Initially the schedule loop will run freely with its own select.
self.__poll_callback = None
# Dictionary of waitable descriptors for which polling needs to be
# done. Each entry consists of an event mask together with a list of
# interested tasks.
self.__poll_queue = {}
# By default use blocking poll while waiting for the next event.
self._poll_block = coselect.poll_block
def __tick(self):
'''This must be called regularly to ensure that all waiting tasks are
processed. It processes all tasks that are ready to run and then runs
all timers that have expired.'''
# Wake up all the expired timers on entry. These go to the end of
# the ready queue.
self.__timer_queue.wake_expired()
# If the ready queue is still empty, now's the time to run the yield
# queue.
if not self.__ready_queue:
self.__yield_queue.wake(True)
# Pick up the ready queue and process every task in it. When each
# task is resumed it is passed a flag indicating whether it has been
# resumed because of an expired timer, or for some other reason
# (typically either a voluntary suspend, or a successful wait for an
# event).
ready_queue = self.__ready_queue
self.__ready_queue = []
for task, reason in ready_queue:
_coroutine.switch(task, reason)
def __schedule_loop(self):
'''This runs a scheduler loop without returning.'''
while True:
# Dispatch all waiting tasks
self.__tick()
# Now see how long we have to wait for the next tick
if self.__ready_queue or self.__yield_queue:
# There are ready tasks: don't wait
delay = 0
elif self.__timer_queue:
# There are timers waiting to fire: wait for the first one. We
# don't sleep for less than 1ms: there's not a lot of point in a
# shorter timeout, and this works around some timer calculation
# quirks.
delay = max(self.__timer_queue.timeout() - time.time(), 0.001)
else:
# Nothing to do: block until something external happens.
delay = None
# Finally suspend until something is ready.
self.__wakeup_poll(self.__poll_suspend(delay))
def __poll_suspend(self, delay):
'''Suspends the scheduler until the appropriate ready condition is
reached. Returns lists of ready file descriptors and events.'''
poll_list, self.__poll_queue = \
coselect._compute_poll_list(self.__poll_queue)
if self.__poll_callback is None:
# If we're not being polled from outside, run our own poll.
return self._poll_block(poll_list, delay)
else:
# If the scheduler loop was invoked from outside then return
# control back to the caller: it will provide the select
# operation we need.
return self.__poll_callback.switch(poll_list, delay)
def poll_scheduler(self, ready_list):
'''This is called when the scheduler needs to be controlled from
outside. It will perform a full round of scheduling before returing
control to the caller.
Two values are returned, a list of descriptors and events plus
a timeout, being precisely the values required for a call to
poll_block(). A sensible default outer scheduler loop would be
ready_list = []
while True:
ready_list = poll_block(*poll_scheduler(ready_list))
'''
assert self.__poll_callback is None, 'Nested pollers will not work'
# Switching to the scheduler will return control to us when the next
# round is complete.
# Note that the first time this is called we may get an incomplete
# schedule, as we may be resuming inside the dispatch loop: in effect
# the first call to this routine interrupts the original scheduler.
self.__poll_callback = _coroutine.get_current()
result = _coroutine.switch(self.__coroutine, ready_list)
self.__poll_callback = None
if isinstance(result, tuple):
# This case arises if we are main and the scheduler just died.
py23.raise_with_traceback(result)
else:
return result
def spawn(self, function, stack_size):
'''Spawns a new task: function is spawned as a new background task
as a child of the scheduler task.'''
task = _coroutine.create(self.__coroutine, function, stack_size)
self.__ready_queue.append((task, _WAKEUP_NORMAL))
def do_yield(self, until):
'''Hands control to the next task with work to do, will return as
soon as there is time.'''
self.wait_until(until, self.__yield_queue, None)
def wait_until(self, until, suspend_queue, wakeup):
'''The calling task is suspended. If a deadline is given then the
task will definitely be woken up when the deadline is reached if not
before. If a suspend_queue is given then the task is added to it
(and it is the caller's responsibility to ensure the task is woken
up, with a call to wakeup()).
Returns True iff the wakeup is from a timeout.'''
# If no wakeup has been specified, create one. This is a key
# component for ensuring consistent behaviour of the system: the
# wakeup object ensures each task is only woken up exactly once.
if wakeup is None:
wakeup = self.__Wakeup(suspend_queue, until)
# If a timeout or a suspension queue has been specified, add
# ourselves as appropriate. Failing either of these it's up to the
# caller to arrange a wakeup.
if suspend_queue is not None:
suspend_queue.append(wakeup)
if until is not None:
self.__timer_queue.put(wakeup, until)
# Suspend until we're woken.
# Normally this call will return control to __tick(), but there are
# two other cases to consider. On the very first suspend control is
# returned to the top of __scheduler(), and more interestingly, on
# suspending immediately after calling poll_scheduler() control is
# returned to __select(). This last case expects a list of ready
# descriptors to be returned, so we have to be compatible with this!
result = _coroutine.switch(self.__coroutine, [])
if isinstance(result, tuple):
# We get here if main is suspended and the scheduler decides
# to die. Make sure our wakeup is cancelled, and then
# re-raise the offending exception.
wakeup.wakeup(result)
py23.raise_with_traceback(result)
else:
return result == _WAKEUP_TIMEOUT
def poll_until(self, poller, until):
'''Cooperative poll: the calling task is suspended until one of
the specified waitable objects becomes ready or the timeout expires.
'''
# Add our poller to the appropriate poll event queues so that we'll
# get woken. Note that we don't need to worry about coming off the
# queue: this'll be managed in _compute_poll_list later on
poller.wakeup = self.__Wakeup(None, until)
for file in poller.events:
self.__poll_queue.setdefault(file, []).append(poller)
# It's vital to yield during this call, even if we have actually
# timed out -- otherwise the wakeup we've just added to the poll
# queue will get processed when it's no longer valid (oops).
self.wait_until(until, None, poller.wakeup)
def __Wakeup(self, queue, until):
if until is None:
return _Wakeup(self.__wakeup_task, queue, None)
else:
return _Wakeup(self.__wakeup_task, queue, self.__timer_queue)
def __wakeup_task(self, task, reason):
if not isinstance(reason, tuple):
self.__ready_queue.append((task, reason))
def __wakeup_poll(self, poll_result):
'''Called with the result of a system poll: a list of file descriptors
and wakeup reasons. Each waiting task is informed.'''
# Work through all the notified files: with each file is a received
# event mask which we'll pass through to the interested task.
#
# Some care is required here if we are to neither deliver spurious
# wakeups nor lose wakeups.
# We make two assumption about our wakeup call, translating into
# assumptions on either coselect.poll_block or poll_scheduler:
# 1/ if an event is ready on a file we will eventually be notified;
# 2/ if an event is not ready we will not be notified -- in other
# words, if a poll notify occurs we can safely access the file
# without risk of blocking.
#
# The goal of the loop below is to translate these assumptions into
# corresponding properties on poll_until. The problem arises when
# there is more than one listener on an event, as the first listener
# may consume the event before subsequent listeners receive it.
# The simplest solution is to be to communicate each event to just
# one interested listener, but ensure that the event remains
# monitored.
for file, events in poll_result:
for poller in self.__poll_queue.get(file, []):
# Consume any events taken by the woken process
events &= ~poller.notify_wakeup(file, events)
class Timedout(Exception):
'''Waiting for event timed out.'''
def AbsTimeout(timeout):
'''A timeout is represented in one of three forms:
None A timeout that never expires
interval A relative timeout interval
(deadline,) An absolute deadline
This routine checks that the given input is in one of these three forms
and returns a timeout in absolute deadline format.'''
if timeout is None:
return None
elif isinstance(timeout, tuple):
return timeout
else:
return (timeout + time.time(),)
def GetDeadline(timeout):
'''Returns the deadline associated with the given timeout, or None if there
is no deadline.'''
if timeout is None:
return None
elif isinstance(timeout, tuple):
return timeout[0]
else:
return timeout + time.time()
def Deadline(deadline):
'''Converts a deadline into a timeout.'''
return (deadline,)
class EventBase(object):
'''The base class for implementing events and signals.'''
__slots__ = [
'__wait_queue', # Queue of cothreads waiting to be woken
'__wait_abort', # Count of abortable waits.
]
def __init__(self):
# List of tasks currently waiting to be woken up.
self.__wait_queue = _WakeupQueue()
# Number of aborted waits that need to be emulated. This is
# incremented by subclasses for each _Wakeup that needs to be
# simulated.
self.__wait_abort = 0
def _WaitUntil(self, timeout):
'''Suspends the calling task until _Wakeup() is called. Raises an
exception if a timeout occurs first.'''
# If the event object is not ready we always yield control to ensure
# that other ready cothreads get the opportunity to run.
_validate_thread()
if _scheduler.wait_until(GetDeadline(timeout), self.__wait_queue, None):
raise Timedout('Timed out waiting for event')
def _Wakeup(self, wake_all):
'''Wakes one or all waiting tasks. Returns False if an aborted wait
needs to be emulated.'''
_validate_thread()
if self.__wait_abort and not wake_all:
# This is a special case: an aborted wait needs to be completed.
# This occurs when waiting needs to be simulated, in which case
# any resources consumed by the reader need to be consumed by the
# waker instead!
self.__wait_abort -= 1
return False
else:
self.__wait_abort = 0
self.__wait_queue.wake(wake_all)
return True
def _AbortWait(self):
self.__wait_abort += 1
class Spawn(EventBase):
'''This class is used to wrap cooperative threads: every task (except
for main) managed by the scheduler should be an instance of this class.'''
__slots__ = [
'__function', # Function implementing cothread action
'__args', # Positional arguments for action
'__kargs', # Keyword arguments for action
'__result', # Result when action has completed
'__raise_on_wait', # Action to take on exception
]
# Set of all active processes for debugging
Cothreads = set()
def __init__(self, function, *args, **kargs):
'''The given function and arguments will be called as a new task.
All of the arguments will be be passed through to function, except for
the special keyword raise_on_wait which defaults to False.
If raise_on_wait is set then any exception raised during the
execution of this task will be postponed until Wait() is called. This
allows such exceptions to be caught without disturbing the normal
operation of the system. Otherwise any exception is reported.'''
EventBase.__init__(self)
self.__function = function
self.__args = args
self.__kargs = kargs
self.__result = ()
self.__raise_on_wait = kargs.pop('raise_on_wait', False)
# Hand control over to the run method in the scheduler.
_validate_thread()
_scheduler.spawn(self.__run, kargs.pop('stack_size', 0))
self.Cothreads.add(self)
def __run(self, _):
try:
# Try for normal successful result.
self.__result = (True,
self.__function(*self.__args, **self.__kargs))
except:
# Oops: the task terminated with an exception.
if self.__raise_on_wait:
# The creator of the task is willing to catch this exception,
# so hang onto it now until Wait() is called.
self.__result = (False, sys.exc_info())
else:
# No good. We can't allow this exception to propagate, as
# doing so will kill the scheduler. Instead report the
# traceback right here.
print('Spawned task',
getattr(self.__function, '__name__', '(unknown)'),
'raised uncaught exception', file = sys.stderr)
traceback.print_exc()
self.__result = (True, None)
if not self._Wakeup(False):
# Aborted wakeup: consume the result now, will cause a subsequent
# Wait() to fail, which it should.
del self.__result
self.Cothreads.remove(self)
# See wait_until() for an explanation of this return value.
return []
def __bool__(self):
'''Tests whether the event is signalled.'''
return bool(self.__result)
__nonzero__ = __bool__
def Wait(self, timeout = None):
'''Waits until the task has completed. May raise an exception if the
task terminated with an exception and raise_on_wait was selected.
Can only be called once, as the result is deleted after call.'''
if not self.__result:
self._WaitUntil(timeout)
ok, result = self.__result
if ok:
return result
else:
try:
# Re-raise the exception that actually killed the task here
# where it can be received by whoever waits on the task.
py23.raise_with_traceback(result)
finally:
# In this case result and self.__result contain a traceback. To
# avoid circular references which will delay garbage collection,
# ensure these variables are deleted before the exception is
# caught.
del self.__result
del result
def AbortWait(self):
'''Called instead of performing a proper wait to release any resources
that might be consumed until the wait occurs.'''
if self.__result:
# Result has already arrived. Consume it silently now.
del self.__result
else:
# Still need to wait: need to abort the next wakeup.
self._AbortWait()
class Event(EventBase):
'''Any number of tasks can wait for an event to occur. A single value
can also be associated with the event.'''
__slots__ = [
'__value', # Value on this event
'__auto_reset', # Whether value is consumed when taken
]
def __init__(self, auto_reset = True):
'''An event object is either signalled or reset. Any task can wait
for the object to become signalled, and it will be suspended until
this occurs.
The intial value can be specified, as can the behaviour on succesfully
signalling a process: if auto_reset=True is specified then only one
task at a time sees any individual signal on this object.'''
EventBase.__init__(self)
self.__value = ()
self.__auto_reset = auto_reset
def __bool__(self):
'''Tests whether the event is signalled.'''
return bool(self.__value)
__nonzero__ = __bool__
def Wait(self, timeout = None):
'''The caller will block until the event becomes true, or until the
timeout occurs if a timeout is specified. A Timeout exception is
raised if a timeout occurs.'''
# If one task resets the event while another is waiting the wait may
# fail, so we have to loop here.
deadline = AbsTimeout(timeout)
while not self.__value:
self._WaitUntil(deadline)
ok, result = self.__value
if self.__auto_reset:
# If this is an auto reset event then we reset it on exit;
# this means that we're the only thread that sees it being
# signalled.
self.__value = ()
# Finally return the result as a value or raise an exception.
if ok:
return result
else:
raise result
def AbortWait(self):
'''Called instead of performing a proper wait to release any resources
that might be consumed until the wait occurs.'''
# If this isn't an auto_reset event then our aborted wait makes no
# difference. Otherwise we either consume the value now or on the
# next wakeup.
if self.__auto_reset:
if self.__value:
self.Reset()
else:
self._AbortWait()
def Signal(self, value = None):
'''Signals the event. Any waiting tasks are scheduled to be woken.'''
self.__value = (True, value)
if not self._Wakeup(not self.__auto_reset):
self.Reset()
def SignalException(self, exception):
'''Signals the event with an exception: the next call to wait will
receive an exception instead of a normal return value.'''
self.__value = (False, exception)
if not self._Wakeup(not self.__auto_reset):
self.Reset()
def Reset(self):
'''Resets the event (and erases the value).'''
self.__value = ()
class Pulse(EventBase):
'''Somewhat equivalent to pthread condition variable: any number of waiters
will be woken by calling the Signal() method, but there is no state and
nothing is returned from Wait().'''
def Wait(self, timeout = None):
self._WaitUntil(timeout)
def Signal(self, wake_all = True):
self._Wakeup(wake_all)
AbortWait = EventBase._AbortWait
class EventQueue(EventBase):
'''A queue of objects. A queue can also be treated as an iterator.'''
__slots__ = [
'__queue', # Queue of values
'__closed', # Used to halt iteration over this queue
'__max_length', # Maximum length of queue
]
def __init__(self, max_length = None):
EventBase.__init__(self)
self.__queue = []
self.__closed = False
self.__max_length = max_length
def __len__(self):
'''Returns the number of objects waiting on the queue.'''
return len(self.__queue)
def Wait(self, timeout = None):
'''Returns the next object from the queue, or raises a Timeout
exception if the timeout expires first.'''
deadline = AbsTimeout(timeout)
while not self.__queue and not self.__closed:
self._WaitUntil(deadline)
if self.__queue:
return self.__queue.pop(0)
else:
raise StopIteration
def AbortWait(self):
'''Called instead of performing a proper wait to release any resources
that might be consumed until the wait occurs.'''
if self.__queue:
self.__queue.pop(0)
elif not self.__closed:
self._AbortWait()
def Signal(self, value):
'''Adds the given value to the tail of the queue.'''
assert not self.__closed, 'Can\'t write to a closed queue'
if self.__max_length is None or len(self) < self.__max_length:
self.__queue.append(value)
if not self._Wakeup(False):
self.__queue.pop(0)
return True
else:
return False
def Reset(self):
'''Discards all values in queue.'''
self.__queue = []
def close(self):
'''An event queue can be closed. This will cause waiting to raise
the StopIteration exception (once existing entries have been read),
and will prevent any further signals to the queue.'''
self.__closed = True
self._Wakeup(True)
def __iter__(self):
'''An event queue can itself be treated as an iterator: this allows
event dispatching using a for loop, and provides some support for
combining queues.'''
return self
def next(self):
return self.Wait()
__next__ = next
class ThreadedEventQueue(object):
'''An event queue designed to work with threads.'''
__slots__ = [
'__values', # List of queued values
'__signal', # File handle used to notify new value
'wait_descriptor', # File handle waited on for new values
]
def __init__(self):
# According to the documentation this is thread safe, so we don't
# need to take any particular precautions when using this!
self.__values = collections.deque()
self.wait_descriptor, self.__signal = os.pipe()
def __len__(self):
'''Returns the number of objects waiting on the queue.'''
return len(self.__values)
def Wait(self, timeout = None):
'''Waits for a value to be written to the queue. This can safely be
called from either a cothread or another thread: the appropriate form
of cooperative or normal blocking will be selected automatically.'''
if _thread.get_ident() == scheduler_thread_id:
# Normal cothread case, use cooperative wait
poll = coselect.poll_list
else:
# Another thread, so block caller until ready
poll = coselect.poll_block
if not poll([(self.wait_descriptor, coselect.POLLIN)], timeout):
raise Timedout('Timed out waiting for signal')
os.read(self.wait_descriptor, 1)
return self.__values.popleft()
def Signal(self, value):
'''Posts a value to the event queue. This can safely be called from
a thread or a cothread.'''
self.__values.append(value)
os.write(self.__signal, b'-')
# Implements asynchronous (and "lock free") synchronisation from any Python
# thread to the main cothread thread. Technically the Python Global Interpreter
# Lock (GIL) plays an essential role in this code by serialising all the actions
# here and ensuring that collections.deque actions are atomic (which follows
# from its implementation as a C extension).
#
# Note that the signalling from Callback() to the callback_events() loop is
# rather delicate. Care is taken here to reduce the number of os.read/write
# actions as these involve costly system calls, but without the hazard of losing
# events which would result in deadlock.
class _Callback:
COTHREAD_CALLBACK_STACK = \
int(os.environ.get('COTHREAD_CALLBACK_STACK', 1024 * 1024))
def __init__(self):
self.values = collections.deque()
self.wait, self.signal = os.pipe()
self.waiting = False
Spawn(self.callback_events, stack_size = self.COTHREAD_CALLBACK_STACK)
def callback_events(self):
while True:
self.waiting = True
if not self.values:
coselect.poll_list([(self.wait, coselect.POLLIN)])
os.read(self.wait, 4096) # Consume all pending wakeups
while self.values:
action, args = self.values.popleft()
try:
action(*args)
except:
print('Asynchronous callback raised uncaught exception',
file = sys.stderr)
traceback.print_exc()
action = args = None
def __call__(self, action, *args):
'''This can be called from within any Python thread to arrange for
action(*args) to be called in the context of the cothread thread.'''
self.values.append((action, args))
if self.waiting:
self.waiting = False
os.write(self.signal, b'-')
def CallbackResult(action, *args, **kargs):
'''Perform action in the main cothread and return a result.'''
callback = kargs.pop('callback_queue', Callback)
timeout = kargs.pop('callback_timeout', None)
spawn = kargs.pop('callback_spawn', True)
if scheduler_thread_id == _thread.get_ident():
return action(*args, **kargs)
else:
event = threading.Event()
action_result = [False, None]
def do_action():
try:
action_result[0] = True
action_result[1] = action(*args, **kargs)
except:
action_result[0] = False
action_result[1] = sys.exc_info()
event.set()
# Hand the action over to the cothread carrying thread for action and
# wait for the result.
if spawn:
callback(Spawn, do_action)
else:
callback(do_action)
if not event.wait(timeout):
raise Timedout('Timed out waiting for callback result')
# Return result or raise caught exception as appropriate.
ok, result = action_result
if ok:
return result
else:
py23.raise_with_traceback(result)
# Note: raising entire stack backtrace context might be dangerous, need
# to think about this carefully, particularly if the corresponding stack
# has been swapped out...
class Timer(object):
'''A cancellable one-shot or auto-retriggering timer.'''
__slots__ = [
'__timeout', # Time to wait until triggering timer
'__callback', # Function to call when timer fires
'__retrigger', # Enables retriggering timers
'__reuse', # Set if timer can be reused
'__control', # Event object for controlling timer
'__fire', # Controls action when event timeout occurs
]
def __init__(self, timeout, callback,
retrigger = False, reuse = False, stack_size = 0):
'''The callback will be called (with no arguments) after the specified
timeout. If retrigger is set then the timer will automatically
retrigger until it is cancelled. Unless reuse or retrigger is set the
timer will be cancelled once it fires and cannot be reused.'''
assert callable(callback), 'Ensure the callback is callable'
self.__timeout = timeout
self.__callback = callback
self.__retrigger = retrigger # Auto retrigger on each timeout
self.__reuse = reuse or retrigger # Keep timer alive
self.__control = Event() # Used to control main loop
self.__fire = True # False if control event pending
Spawn(self.__timer, stack_size = stack_size)
def __timer(self):
running = True
while running:
try:
self.__control.Wait(self.__timeout)
except Timedout:
if self.__fire:
if not self.__retrigger:
# Unless we're automatically retriggering, any new
# timeout has to be specified anew.
self.__timeout = None
self.__callback()
else:
self.__fire = True # We've seen the control event
running = self.__reuse
del self.__callback # Try to avoid reference loops
def cancel(self):
'''Cancels the timer: the timer is guaranteed not to fire once this
call has been made. A cancelled timer cannot be reset.'''
self.__reuse = False
self.__fire = False
self.__control.Signal()
def reset(self, timeout, retrigger=None):
'''Resets the timer. The timeout is reset to the given timeout and the
timer is restarted. A timeout of None can be used to temporarily
suspend a timer.'''
assert self.__reuse, 'Cannot reuse this timer'
self.__timeout = timeout
if retrigger is not None:
self.__retrigger = retrigger
self.__fire = False
self.__control.Signal()
def WaitForAll(event_list, timeout = None):
'''Waits for all events in the event list to become ready or for the
timeout to expire.'''
# Make sure that the timeout is actually a deadline, then it's easy to do
# all the waits in sequence.
timeout = AbsTimeout(timeout)
# Unfortunately our waiting can be interrupted by an exception. To avoid
# leaking memory in this case we perform simulated waits on any remaining
# events. This is a good deal more complicated than
# return [event.Wait(timeout) for event in event_list]
# which is what it ought to be!
event_list = list(event_list)
result = []
try:
n = -1 # In case event_list is empty!
for n, event in enumerate(event_list):
result.append(event.Wait(timeout))
finally:
for event in event_list[n+1:]:
event.AbortWait()
return result
# Other possibly desirable entites:
#
# The ability to wait for an event to occur on one of a set of objects.
# This would probably require quite deep hooking into the queueing
# mechanism, and seems of limited value (the natural alternative is to
# create a task per event).
#
# The ability to kill a task
# This is probably doable with something equivalent to the .throw greenlet
# method (or even with a special wakeup value), but may require some care.
_QuitEvent = Event(auto_reset = False)
def Quit():
'''Signals the quit event. Once signalled it stays signalled.'''
_QuitEvent.Signal()
def WaitForQuit(catch_interrupt = True):
'''Waits for the quit event to be signalled. If catch_interrupt is True
then control-C will only signal the quit event and will not generate an
exception; this does mean that the only way to interrupt a misbehaving loop
is to use another signal such as SIGQUIT (C-\)'''
if catch_interrupt:
import signal
def quit(signum, frame):
Callback(_QuitEvent.Signal)
signal.signal(signal.SIGINT, quit)
_QuitEvent.Wait()
# There is only the one scheduler, which we create right away. A dedicated
# scheduler task is created: this allows the main task to suspend, but does
# mean that the scheduler is not the parent of all the tasks it's managing.
_scheduler = _Scheduler.create()
# We hang onto the thread ID for the cothread thread (at present there can
# only be one) so that we can recognise when we're in another thread.
scheduler_thread_id = _thread.get_ident()
# Thread validation: ensure cothreads aren't used across threads!
def _validate_thread():
assert scheduler_thread_id == _thread.get_ident(), \
'Cannot call into cothread from another thread. Consider using ' \
'Callback or CallbackResult.'
# This is the asynchronous callback method.
Callback = _Callback()
def SleepUntil(deadline):
'''Sleep until the specified deadline. Control will always be yielded,
even if the timeout has already passed.'''
_validate_thread()
_scheduler.wait_until(deadline, None, None)
def Sleep(timeout):
'''Sleep until the specified timeout has expired.'''
SleepUntil(GetDeadline(timeout))
def Yield(timeout = 0):
'''Hands control back to the scheduler. Control is returned either after
the specified timeout has passed, or as soon as there are no active jobs
waiting to be run.'''
_validate_thread()
_scheduler.do_yield(GetDeadline(timeout))
[docs]class RLock(object):
"""A reentrant lock."""
__slots__ = [
'__event', # Underlying event object
'__owner', # The coroutine that has locked
'__count', # The number of times the owner has locked
]
def __init__(self):
self.__event = Event()
# Start off with the event set so acquire will not block
self.__event.Signal()
self.__owner = None
self.__count = 0
[docs] def acquire(self, timeout=None):
"""Acquire the lock if necessary and increment the recursion level."""
# Inspired by threading.RLock
me = _coroutine.get_current()
if self.__owner and _coroutine.is_equal(self.__owner, me):
# if we are the owner then just increment the count
self.__count += 1
else:
# otherwise wait until it is unlocked
self.__event.Wait(timeout=timeout)
self.__owner = me
self.__count = 1
[docs] def release(self):
"""Release a lock, decrementing the recursion level."""
assert self.__owner and _coroutine.is_equal(
self.__owner, _coroutine.get_current()), \
"cannot release un-acquired lock"
self.__count -= 1
if self.__count == 0:
self.__owner = None
# Wakeup one cothread waiting on acquire()
self.__event.Signal()
# Needed to make it a context manager
__enter__ = acquire
def __exit__(self, t, v, tb):
self.release()