import contextlib
import heapq
import collections
from functools import partial
from Queue import Full, Empty

import tornado
from tornado import ioloop
from tornado import gen
from tornado.concurrent import Future

version_tuple = (1, 0, 1, 'dev0')

version = '.'.join(map(str, version_tuple))
"""Current version of Toro."""

__all__ = [
    # Exceptions
    'NotReady', 'AlreadySet', 'Full', 'Empty', 'Timeout',

    # Primitives
    'AsyncResult', 'Event', 'Condition',  'Semaphore', 'BoundedSemaphore',

    # Queues
    'Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue'

[docs]class NotReady(Exception): """Raised when accessing an :class:`AsyncResult` that has no value yet.""" pass
[docs]class AlreadySet(Exception): """Raised when setting a value on an :class:`AsyncResult` that already has one.""" pass
[docs]class Timeout(Exception): """Raised when a deadline passes before a Future is ready.""" def __str__(self): return "Timeout"
class _TimeoutFuture(Future): def __init__(self, deadline, io_loop): """Create a Future with optional deadline. If deadline is not None, it may be a number denoting a unix timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` object for a deadline relative to the current time. set_exception(toro.Timeout()) is executed after a timeout. """ super(_TimeoutFuture, self).__init__() self.io_loop = io_loop if deadline is not None: callback = partial(self.set_exception, Timeout()) self._timeout_handle = io_loop.add_timeout(deadline, callback) else: self._timeout_handle = None def set_result(self, result): self._cancel_timeout() super(_TimeoutFuture, self).set_result(result) def set_exception(self, exception): self._cancel_timeout() super(_TimeoutFuture, self).set_exception(exception) def _cancel_timeout(self): if self._timeout_handle: self.io_loop.remove_timeout(self._timeout_handle) self._timeout_handle = None class _ContextManagerList(list): def __enter__(self, *args, **kwargs): for obj in self: obj.__enter__(*args, **kwargs) def __exit__(self, *args, **kwargs): for obj in self: obj.__exit__(*args, **kwargs) class _ContextManagerFuture(Future): """A Future that can be used with the "with" statement. When a coroutine yields this Future, the return value is a context manager that can be used like: with (yield future): pass At the end of the block, the Future's exit callback is run. Used for Lock.acquire, Semaphore.acquire, RWLock.acquire_read / acquire_write. """ def __init__(self, wrapped, exit_callback): super(_ContextManagerFuture, self).__init__() wrapped.add_done_callback(self._done_callback) self.exit_callback = exit_callback def _done_callback(self, wrapped): if wrapped.exception(): self.set_exception(wrapped.exception()) else: self.set_result(wrapped.result()) def result(self): if self.exception(): raise self.exception() # Otherwise return a context manager that cleans up after the block. @contextlib.contextmanager def f(): try: yield finally: self.exit_callback() return f() def _consume_expired_waiters(waiters): # Delete waiters at the head of the queue who've timed out while waiters and waiters[0].done(): waiters.popleft() _null_result = object()
[docs]class AsyncResult(object): """A one-time event that stores a value or an exception. The only distinction between AsyncResult and a simple Future is that AsyncResult lets coroutines wait with a deadline. The deadline can be configured separately for each waiter. An :class:`AsyncResult` instance cannot be reset. :Parameters: - `io_loop`: Optional custom IOLoop. """ def __init__(self, io_loop=None): self.io_loop = io_loop or ioloop.IOLoop.current() self.value = _null_result self.waiters = [] def __str__(self): result = '<%s ' % (self.__class__.__name__, ) if self.ready(): result += 'value=%r' % self.value else: result += 'unset' if self.waiters: result += ' waiters[%s]' % len(self.waiters) return result + '>'
[docs] def set(self, value): """Set a value and wake up all the waiters.""" if self.ready(): raise AlreadySet self.value = value waiters, self.waiters = self.waiters, [] for waiter in waiters: if not waiter.done(): # Might have timed out waiter.set_result(value)
def ready(self): return self.value is not _null_result
[docs] def get(self, deadline=None): """Get a value once :meth:`set` is called. Returns a Future. The Future's result will be the value. The Future raises :exc:`toro.Timeout` if no value is set before the deadline. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ future = _TimeoutFuture(deadline, self.io_loop) if self.ready(): future.set_result(self.value) else: self.waiters.append(future) return future
[docs] def get_nowait(self): """Get the value if ready, or raise :class:`NotReady`.""" if self.ready(): return self.value else: raise NotReady
[docs]class Condition(object): """A condition allows one or more coroutines to wait until notified. Like a standard Condition_, but does not need an underlying lock that is acquired and released. .. _Condition: :Parameters: - `io_loop`: Optional custom IOLoop. """ def __init__(self, io_loop=None): self.io_loop = io_loop or ioloop.IOLoop.current() self.waiters = collections.deque() # Queue of _Waiter objects def __str__(self): result = '<%s' % (self.__class__.__name__, ) if self.waiters: result += ' waiters[%s]' % len(self.waiters) return result + '>'
[docs] def wait(self, deadline=None): """Wait for :meth:`notify`. Returns a Future. :exc:`~toro.Timeout` is executed after a timeout. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ future = _TimeoutFuture(deadline, self.io_loop) self.waiters.append(future) return future
[docs] def notify(self, n=1): """Wake up `n` waiters. :Parameters: - `n`: The number of waiters to awaken (default: 1) """ waiters = [] # Waiters we plan to run right now while n and self.waiters: waiter = self.waiters.popleft() if not waiter.done(): # Might have timed out n -= 1 waiters.append(waiter) for waiter in waiters: waiter.set_result(None)
[docs] def notify_all(self): """Wake up all waiters.""" self.notify(len(self.waiters))
# TODO: show correct examples that avoid thread / process issues w/ concurrent.futures.Future
[docs]class Event(object): """An event blocks coroutines until its internal flag is set to True. Similar to threading.Event_. .. _threading.Event: .. seealso:: :doc:`examples/event_example` :Parameters: - `io_loop`: Optional custom IOLoop. """ def __init__(self, io_loop=None): self.io_loop = io_loop or ioloop.IOLoop.current() self.condition = Condition(io_loop=io_loop) self._flag = False def __str__(self): return '<%s %s>' % ( self.__class__.__name__, 'set' if self._flag else 'clear')
[docs] def is_set(self): """Return ``True`` if and only if the internal flag is true.""" return self._flag
[docs] def set(self): """Set the internal flag to ``True``. All waiters are awakened. Calling :meth:`wait` once the flag is true will not block. """ self._flag = True self.condition.notify_all()
[docs] def clear(self): """Reset the internal flag to ``False``. Calls to :meth:`wait` will block until :meth:`set` is called. """ self._flag = False
[docs] def wait(self, deadline=None): """Block until the internal flag is true. Returns a Future. The Future raises :exc:`~toro.Timeout` after a timeout. :Parameters: - `callback`: Function taking no arguments. - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ if self._flag: future = _TimeoutFuture(None, self.io_loop) future.set_result(None) return future else: return self.condition.wait(deadline)
[docs]class Queue(object): """Create a queue object with a given maximum size. If `maxsize` is 0 (the default) the queue size is unbounded. Unlike the `standard Queue`_, you can reliably know this Queue's size with :meth:`qsize`, since your single-threaded Tornado application won't be interrupted between calling :meth:`qsize` and doing an operation on the Queue. **Examples:** :doc:`examples/producer_consumer_example` :doc:`examples/web_spider_example` :Parameters: - `maxsize`: Optional size limit (no limit by default). - `io_loop`: Optional custom IOLoop. .. _`Gevent's Queue`: .. _`standard Queue`: """ def __init__(self, maxsize=0, io_loop=None): self.io_loop = io_loop or ioloop.IOLoop.current() if maxsize is None: raise TypeError("maxsize can't be None") if maxsize < 0: raise ValueError("maxsize can't be negative") self._maxsize = maxsize # _TimeoutFutures self.getters = collections.deque([]) # Pairs of (item, _TimeoutFuture) self.putters = collections.deque([]) self._init(maxsize) # These three are overridable in subclasses. def _init(self, maxsize): self.queue = collections.deque() def _get(self): return self.queue.popleft() def _put(self, item): self.queue.append(item) def __repr__(self): return '<%s at %s %s>' % ( type(self).__name__, hex(id(self)), self._format()) def __str__(self): return '<%s %s>' % (type(self).__name__, self._format()) def _format(self): result = 'maxsize=%r' % (self.maxsize, ) if getattr(self, 'queue', None): result += ' queue=%r' % self.queue if self.getters: result += ' getters[%s]' % len(self.getters) if self.putters: result += ' putters[%s]' % len(self.putters) return result def _consume_expired_putters(self): # Delete waiters at the head of the queue who've timed out while self.putters and self.putters[0][1].done(): self.putters.popleft()
[docs] def qsize(self): """Number of items in the queue""" return len(self.queue)
@property def maxsize(self): """Number of items allowed in the queue.""" return self._maxsize
[docs] def empty(self): """Return ``True`` if the queue is empty, ``False`` otherwise.""" return not self.queue
[docs] def full(self): """Return ``True`` if there are `maxsize` items in the queue. .. note:: if the Queue was initialized with `maxsize=0` (the default), then :meth:`full` is never ``True``. """ if self.maxsize == 0: return False else: return self.maxsize <= self.qsize()
[docs] def put(self, item, deadline=None): """Put an item into the queue. Returns a Future. The Future blocks until a free slot is available for `item`, or raises :exc:`toro.Timeout`. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ _consume_expired_waiters(self.getters) future = _TimeoutFuture(deadline, self.io_loop) if self.getters: assert not self.queue, "queue non-empty, why are getters waiting?" getter = self.getters.popleft() # Use _put and _get instead of passing item straight to getter, in # case a subclass has logic that must run (e.g. JoinableQueue). self._put(item) getter.set_result(self._get()) future.set_result(None) else: if self.maxsize and self.maxsize <= self.qsize(): self.putters.append((item, future)) else: self._put(item) future.set_result(None) return future
[docs] def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise queue.Full. """ _consume_expired_waiters(self.getters) if self.getters: assert not self.queue, "queue non-empty, why are getters waiting?" getter = self.getters.popleft() self._put(item) getter.set_result(self._get()) elif self.maxsize and self.maxsize <= self.qsize(): raise Full else: self._put(item)
[docs] def get(self, deadline=None): """Remove and return an item from the queue. Returns a Future. The Future blocks until an item is available, or raises :exc:`toro.Timeout`. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ self._consume_expired_putters() future = _TimeoutFuture(deadline, self.io_loop) if self.putters: assert self.full(), "queue not full, why are putters waiting?" item, putter = self.putters.popleft() self._put(item) putter.set_result(None) future.set_result(self._get()) elif self.qsize(): future.set_result(self._get()) else: self.getters.append(future) return future
[docs] def get_nowait(self): """Remove and return an item from the queue without blocking. Return an item if one is immediately available, else raise :exc:`queue.Empty`. """ self._consume_expired_putters() if self.putters: assert self.full(), "queue not full, why are putters waiting?" item, putter = self.putters.popleft() self._put(item) putter.set_result(None) return self._get() elif self.qsize(): return self._get() else: raise Empty
[docs]class PriorityQueue(Queue): """A subclass of :class:`Queue` that retrieves entries in priority order (lowest first). Entries are typically tuples of the form: ``(priority number, data)``. :Parameters: - `maxsize`: Optional size limit (no limit by default). - `initial`: Optional sequence of initial items. - `io_loop`: Optional custom IOLoop. """ def _init(self, maxsize): self.queue = [] def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
[docs]class LifoQueue(Queue): """A subclass of :class:`Queue` that retrieves most recently added entries first. :Parameters: - `maxsize`: Optional size limit (no limit by default). - `initial`: Optional sequence of initial items. - `io_loop`: Optional custom IOLoop. """ def _init(self, maxsize): self.queue = [] def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
[docs]class JoinableQueue(Queue): """A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods. .. seealso:: :doc:`examples/web_spider_example` :Parameters: - `maxsize`: Optional size limit (no limit by default). - `initial`: Optional sequence of initial items. - `io_loop`: Optional custom IOLoop. """ def __init__(self, maxsize=0, io_loop=None): Queue.__init__(self, maxsize=maxsize, io_loop=io_loop) self.unfinished_tasks = 0 self._finished = Event(io_loop) self._finished.set() def _format(self): result = Queue._format(self) if self.unfinished_tasks: result += ' tasks=%s' % self.unfinished_tasks return result def _put(self, item): self.unfinished_tasks += 1 self._finished.clear() Queue._put(self, item)
[docs] def task_done(self): """Indicate that a formerly enqueued task is complete. Used by queue consumers. For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue that the processing on the task is complete. If a :meth:`join` is currently blocking, it will resume when all items have been processed (meaning that a :meth:`task_done` call was received for every item that had been :meth:`put <Queue.put>` into the queue). Raises ``ValueError`` if called more times than there were items placed in the queue. """ if self.unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self.unfinished_tasks -= 1 if self.unfinished_tasks == 0: self._finished.set()
[docs] def join(self, deadline=None): """Block until all items in the queue are processed. Returns a Future. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls :meth:`task_done` to indicate that all work on the item is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. The Future raises :exc:`toro.Timeout` if the count is not zero before the deadline. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ return self._finished.wait(deadline)
[docs]class Semaphore(object): """A lock that can be acquired a fixed number of times before blocking. A Semaphore manages a counter representing the number of release() calls minus the number of acquire() calls, plus an initial value. The acquire() method blocks if necessary until it can return without making the counter negative. If not given, value defaults to 1. :meth:`acquire` supports the context manager protocol: >>> from tornado import gen >>> import toro >>> semaphore = toro.Semaphore() >>> >>> @gen.coroutine ... def f(): ... with (yield semaphore.acquire()): ... assert semaphore.locked() ... ... assert not semaphore.locked() .. note:: Unlike the standard threading.Semaphore_, a :class:`Semaphore` can tell you the current value of its :attr:`counter`, because code in a single-threaded Tornado app can check these values and act upon them without fear of interruption from another thread. .. _threading.Semaphore: .. seealso:: :doc:`examples/web_spider_example` :Parameters: - `value`: An int, the initial value (default 1). - `io_loop`: Optional custom IOLoop. """ def __init__(self, value=1, io_loop=None): if value < 0: raise ValueError('semaphore initial value must be >= 0') # The semaphore is implemented as a Queue with 'value' objects self.q = Queue(io_loop=io_loop) for _ in range(value): self.q.put_nowait(None) self._unlocked = Event(io_loop=io_loop) if value: self._unlocked.set() def __repr__(self): return '<%s at %s%s>' % ( type(self).__name__, hex(id(self)), self._format()) def __str__(self): return '<%s%s>' % ( self.__class__.__name__, self._format()) def _format(self): return ' counter=%s' % self.counter @property def counter(self): """An integer, the current semaphore value""" return self.q.qsize()
[docs] def locked(self): """True if :attr:`counter` is zero""" return self.q.empty()
[docs] def release(self): """Increment :attr:`counter` and wake one waiter. """ self.q.put(None) if not self.locked(): # No one was waiting on acquire(), so self.q.qsize() is positive self._unlocked.set()
[docs] def wait(self, deadline=None): """Wait for :attr:`locked` to be False. Returns a Future. The Future raises :exc:`toro.Timeout` after the deadline. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ return self._unlocked.wait(deadline)
[docs] def acquire(self, deadline=None): """Decrement :attr:`counter`. Returns a Future. Block if the counter is zero and wait for a :meth:`release`. The Future raises :exc:`toro.Timeout` after the deadline. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ queue_future = self.q.get(deadline) if self.q.empty(): self._unlocked.clear() future = _ContextManagerFuture(queue_future, self.release) return future
def __enter__(self): raise RuntimeError( "Use Semaphore like 'with (yield semaphore)', not like" " 'with semaphore'") __exit__ = __enter__
[docs]class BoundedSemaphore(Semaphore): """A semaphore that prevents release() being called too often. A bounded semaphore checks to make sure its current value doesn't exceed its initial value. If it does, ``ValueError`` is raised. In most situations semaphores are used to guard resources with limited capacity. If the semaphore is released too many times it's a sign of a bug. If not given, *value* defaults to 1. .. seealso:: :doc:`examples/web_spider_example` """ def __init__(self, value=1, io_loop=None): super(BoundedSemaphore, self).__init__(value=value, io_loop=io_loop) self._initial_value = value def release(self): if self.counter >= self._initial_value: raise ValueError("Semaphore released too many times") return super(BoundedSemaphore, self).release()
[docs]class Lock(object): """A lock for coroutines. It is created unlocked. When unlocked, :meth:`acquire` changes the state to locked. When the state is locked, yielding :meth:`acquire` waits until a call to :meth:`release`. The :meth:`release` method should only be called in the locked state; an attempt to release an unlocked lock raises RuntimeError. When more than one coroutine is waiting for the lock, the first one registered is awakened by :meth:`release`. :meth:`acquire` supports the context manager protocol: >>> from tornado import gen >>> import toro >>> lock = toro.Lock() >>> >>> @gen.coroutine ... def f(): ... with (yield lock.acquire()): ... assert lock.locked() ... ... assert not lock.locked() .. note:: Unlike with the standard threading.Lock_, code in a single-threaded Tornado application can check if a :class:`Lock` is :meth:`locked`, and act on that information without fear that another thread has grabbed the lock, provided you do not yield to the IOLoop between checking :meth:`locked` and using a protected resource. .. _threading.Lock: .. seealso:: :doc:`examples/lock_example` :Parameters: - `io_loop`: Optional custom IOLoop. """ def __init__(self, io_loop=None): self._block = BoundedSemaphore(value=1, io_loop=io_loop) def __str__(self): return "<%s _block=%s>" % ( self.__class__.__name__, self._block)
[docs] def acquire(self, deadline=None): """Attempt to lock. Returns a Future. The Future raises :exc:`toro.Timeout` if the deadline passes. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ return self._block.acquire(deadline)
[docs] def release(self): """Unlock. If any coroutines are waiting for :meth:`acquire`, the first in line is awakened. If not locked, raise a RuntimeError. """ if not self.locked(): raise RuntimeError('release unlocked lock') self._block.release()
[docs] def locked(self): """``True`` if the lock has been acquired""" return self._block.locked()
def __enter__(self): raise RuntimeError( "Use Lock like 'with (yield lock)', not like" " 'with lock'") __exit__ = __enter__
if tornado.version_info[:2] >= (4, 2): tornado_multi_future = gen.multi_future else: tornado_multi_future = lambda futures, quiet_exceptions: futures
[docs]class RWLock(object): """A reader-writer lock for coroutines. It is created unlocked. When unlocked, :meth:`acquire_write` always changes the state to locked. When unlocked, :meth:`acquire_read` can changed the state to locked, if :meth:`acquire_read` was called max_readers times. When the state is locked, yielding :meth:`acquire_read`/meth:`acquire_write` waits until a call to :meth:`release_write` in case of locking on write, or :meth:`release_read` in case of locking on read. The :meth:`release_read` method should only be called in the locked-on-read state; an attempt to release an unlocked lock raises RuntimeError. The :meth:`release_write` method should only be called in the locked on write state; an attempt to release an unlocked lock raises RuntimeError. When more than one coroutine is waiting for the lock, the first one registered is awakened by :meth:`release_read`/:meth:`release_write`. :meth:`acquire_read`/:meth:`acquire_write` support the context manager protocol: >>> from tornado import gen >>> import toro >>> lock = toro.RWLock(max_readers=10) >>> >>> @gen.coroutine ... def f(): ... with (yield lock.acquire_read()): ... assert not lock.locked() ... ... with (yield lock.acquire_write()): ... assert lock.locked() ... ... assert not lock.locked() :Parameters: - `max_readers`: Optional max readers value, default 1. - `io_loop`: Optional custom IOLoop. """ def __init__(self, max_readers=1, io_loop=None): self._max_readers = max_readers self._block = BoundedSemaphore(value=max_readers, io_loop=io_loop) def __str__(self): return "<%s _block=%s>" % ( self.__class__.__name__, self._block)
[docs] def acquire_read(self, deadline=None): """Attempt to lock for read. Returns a Future. The Future raises :exc:`toro.Timeout` if the deadline passes. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ return self._block.acquire(deadline)
[docs] def acquire_write(self, deadline=None): """Attempt to lock for write. Returns a Future. The Future raises :exc:`toro.Timeout` if the deadline passes. :Parameters: - `deadline`: Optional timeout, either an absolute timestamp (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline relative to the current time. """ futures = [self._block.acquire(deadline) for _ in xrange(self._max_readers)] try: managers = yield tornado_multi_future(futures, quiet_exceptions=Timeout) except Timeout: for f in futures: # Avoid traceback logging. f.exception() raise raise gen.Return(_ContextManagerList(managers))
[docs] def release_read(self): """Releases one reader. If any coroutines are waiting for :meth:`acquire_read` (in case of full readers queue), the first in line is awakened. If not locked, raise a RuntimeError. """ if self._block.counter == self._max_readers: raise RuntimeError('release unlocked lock') self._block.release()
[docs] def release_write(self): """Releases after write. The first in queue will be awakened after release. If not locked, raise a RuntimeError. """ if not self.locked(): raise RuntimeError('release unlocked lock') for i in xrange(self._max_readers): self._block.release()
[docs] def locked(self): """``True`` if the lock has been acquired""" return self._block.locked()
def __enter__(self): raise RuntimeError( "Use RWLock like 'with (yield lock)', not like" " 'with lock'") __exit__ = __enter__