Cleanup for stow ---STOW
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / tasks.py
diff --git a/.local/lib/python2.7/site-packages/trollius/tasks.py b/.local/lib/python2.7/site-packages/trollius/tasks.py
new file mode 100644 (file)
index 0000000..440a6d8
--- /dev/null
@@ -0,0 +1,754 @@
+"""Support for tasks, coroutines and the scheduler."""
+from __future__ import print_function
+
+__all__ = ['Task',
+           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
+           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
+           'gather', 'shield', 'ensure_future',
+           ]
+
+import functools
+import linecache
+import traceback
+import warnings
+try:
+    from weakref import WeakSet
+except ImportError:
+    # Python 2.6
+    from .py27_weakrefset import WeakSet
+
+from . import compat
+from . import coroutines
+from . import events
+from . import executor
+from . import futures
+from .locks import Lock, Condition, Semaphore, _ContextManager
+from .coroutines import coroutine, From, Return, ReturnException
+
+
+
+@coroutine
+def _lock_coroutine(lock):
+    yield From(lock.acquire())
+    raise Return(_ContextManager(lock))
+
+
+class Task(futures.Future):
+    """A coroutine wrapped in a Future."""
+
+    # An important invariant maintained while a Task not done:
+    #
+    # - Either _fut_waiter is None, and _step() is scheduled;
+    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
+    #
+    # The only transition from the latter to the former is through
+    # _wakeup().  When _fut_waiter is not None, one of its callbacks
+    # must be _wakeup().
+
+    # Weak set containing all tasks alive.
+    _all_tasks = WeakSet()
+
+    # Dictionary containing tasks that are currently active in
+    # all running event loops.  {EventLoop: Task}
+    _current_tasks = {}
+
+    # If False, don't log a message if the task is destroyed whereas its
+    # status is still pending
+    _log_destroy_pending = True
+
+    @classmethod
+    def current_task(cls, loop=None):
+        """Return the currently running task in an event loop or None.
+
+        By default the current task for the current event loop is returned.
+
+        None is returned when called not in the context of a Task.
+        """
+        if loop is None:
+            loop = events.get_event_loop()
+        return cls._current_tasks.get(loop)
+
+    @classmethod
+    def all_tasks(cls, loop=None):
+        """Return a set of all tasks for an event loop.
+
+        By default all tasks for the current event loop are returned.
+        """
+        if loop is None:
+            loop = events.get_event_loop()
+        return set(t for t in cls._all_tasks if t._loop is loop)
+
+    def __init__(self, coro, loop=None):
+        assert coroutines.iscoroutine(coro), repr(coro)
+        super(Task, self).__init__(loop=loop)
+        if self._source_traceback:
+            del self._source_traceback[-1]
+        self._coro = coro
+        self._fut_waiter = None
+        self._must_cancel = False
+        self._loop.call_soon(self._step)
+        self.__class__._all_tasks.add(self)
+
+    # On Python 3.3 or older, objects with a destructor that are part of a
+    # reference cycle are never destroyed. That's not the case any more on
+    # Python 3.4 thanks to the PEP 442.
+    if compat.PY34:
+        def __del__(self):
+            if self._state == futures._PENDING and self._log_destroy_pending:
+                context = {
+                    'task': self,
+                    'message': 'Task was destroyed but it is pending!',
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
+            futures.Future.__del__(self)
+
+    def _repr_info(self):
+        info = super(Task, self)._repr_info()
+
+        if self._must_cancel:
+            # replace status
+            info[0] = 'cancelling'
+
+        coro = coroutines._format_coroutine(self._coro)
+        info.insert(1, 'coro=<%s>' % coro)
+
+        if self._fut_waiter is not None:
+            info.insert(2, 'wait_for=%r' % self._fut_waiter)
+        return info
+
+    def get_stack(self, limit=None):
+        """Return the list of stack frames for this task's coroutine.
+
+        If the coroutine is not done, this returns the stack where it is
+        suspended.  If the coroutine has completed successfully or was
+        cancelled, this returns an empty list.  If the coroutine was
+        terminated by an exception, this returns the list of traceback
+        frames.
+
+        The frames are always ordered from oldest to newest.
+
+        The optional limit gives the maximum number of frames to
+        return; by default all available frames are returned.  Its
+        meaning differs depending on whether a stack or a traceback is
+        returned: the newest frames of a stack are returned, but the
+        oldest frames of a traceback are returned.  (This matches the
+        behavior of the traceback module.)
+
+        For reasons beyond our control, only one stack frame is
+        returned for a suspended coroutine.
+        """
+        frames = []
+        try:
+            # 'async def' coroutines
+            f = self._coro.cr_frame
+        except AttributeError:
+            f = self._coro.gi_frame
+        if f is not None:
+            while f is not None:
+                if limit is not None:
+                    if limit <= 0:
+                        break
+                    limit -= 1
+                frames.append(f)
+                f = f.f_back
+            frames.reverse()
+        elif self._exception is not None:
+            tb = self._exception.__traceback__
+            while tb is not None:
+                if limit is not None:
+                    if limit <= 0:
+                        break
+                    limit -= 1
+                frames.append(tb.tb_frame)
+                tb = tb.tb_next
+        return frames
+
+    def print_stack(self, limit=None, file=None):
+        """Print the stack or traceback for this task's coroutine.
+
+        This produces output similar to that of the traceback module,
+        for the frames retrieved by get_stack().  The limit argument
+        is passed to get_stack().  The file argument is an I/O stream
+        to which the output is written; by default output is written
+        to sys.stderr.
+        """
+        extracted_list = []
+        checked = set()
+        for f in self.get_stack(limit=limit):
+            lineno = f.f_lineno
+            co = f.f_code
+            filename = co.co_filename
+            name = co.co_name
+            if filename not in checked:
+                checked.add(filename)
+                linecache.checkcache(filename)
+            line = linecache.getline(filename, lineno, f.f_globals)
+            extracted_list.append((filename, lineno, name, line))
+        exc = self._exception
+        if not extracted_list:
+            print('No stack for %r' % self, file=file)
+        elif exc is not None:
+            print('Traceback for %r (most recent call last):' % self,
+                  file=file)
+        else:
+            print('Stack for %r (most recent call last):' % self,
+                  file=file)
+        traceback.print_list(extracted_list, file=file)
+        if exc is not None:
+            for line in traceback.format_exception_only(exc.__class__, exc):
+                print(line, file=file, end='')
+
+    def cancel(self):
+        """Request that this task cancel itself.
+
+        This arranges for a CancelledError to be thrown into the
+        wrapped coroutine on the next cycle through the event loop.
+        The coroutine then has a chance to clean up or even deny
+        the request using try/except/finally.
+
+        Unlike Future.cancel, this does not guarantee that the
+        task will be cancelled: the exception might be caught and
+        acted upon, delaying cancellation of the task or preventing
+        cancellation completely.  The task may also return a value or
+        raise a different exception.
+
+        Immediately after this method is called, Task.cancelled() will
+        not return True (unless the task was already cancelled).  A
+        task will be marked as cancelled when the wrapped coroutine
+        terminates with a CancelledError exception (even if cancel()
+        was not called).
+        """
+        if self.done():
+            return False
+        if self._fut_waiter is not None:
+            if self._fut_waiter.cancel():
+                # Leave self._fut_waiter; it may be a Task that
+                # catches and ignores the cancellation so we may have
+                # to cancel it again later.
+                return True
+        # It must be the case that self._step is already scheduled.
+        self._must_cancel = True
+        return True
+
+    def _step(self, value=None, exc=None, exc_tb=None):
+        assert not self.done(), \
+            '_step(): already done: {0!r}, {1!r}, {2!r}'.format(self, value, exc)
+
+        if self._must_cancel:
+            if not isinstance(exc, futures.CancelledError):
+                exc = futures.CancelledError()
+            self._must_cancel = False
+        coro = self._coro
+        self._fut_waiter = None
+
+        if exc_tb is not None:
+            init_exc = exc
+        else:
+            init_exc = None
+        self.__class__._current_tasks[self._loop] = self
+        # Call either coro.throw(exc) or coro.send(value).
+        try:
+            if exc is not None:
+                if exc_tb is not None:
+                   result = coro.throw(exc, None, exc_tb)
+                else:
+                   result = coro.throw(exc)
+            else:
+                result = coro.send(value)
+        # On Python 3.3 and Python 3.4, ReturnException is not used in
+        # practice. But this except is kept to have a single code base
+        # for all Python versions.
+        except coroutines.ReturnException as exc:
+            if isinstance(exc, ReturnException):
+                exc.raised = True
+                result = exc.value
+            else:
+                result = None
+            self.set_result(result)
+        except StopIteration as exc:
+            if compat.PY33:
+                # asyncio Task object? get the result of the coroutine
+                result = exc.value
+            else:
+                if isinstance(exc, ReturnException):
+                    exc.raised = True
+                    result = exc.value
+                else:
+                    result = None
+            self.set_result(result)
+        except futures.CancelledError as exc:
+            super(Task, self).cancel()  # I.e., Future.cancel(self).
+        except BaseException as exc:
+            if exc is init_exc:
+                self._set_exception_with_tb(exc, exc_tb)
+                exc_tb = None
+            else:
+                self.set_exception(exc)
+
+            if not isinstance(exc, Exception):
+                # reraise BaseException
+                raise
+        else:
+            if coroutines._DEBUG:
+                if not coroutines._coroutine_at_yield_from(self._coro):
+                    # trollius coroutine must "yield From(...)"
+                    if not isinstance(result, coroutines.FromWrapper):
+                        self._loop.call_soon(
+                            self._step, None,
+                            RuntimeError("yield used without From"))
+                        return
+                    result = result.obj
+                else:
+                    # asyncio coroutine using "yield from ..."
+                    if isinstance(result, coroutines.FromWrapper):
+                        result = result.obj
+            elif isinstance(result, coroutines.FromWrapper):
+                result = result.obj
+
+            if coroutines.iscoroutine(result):
+                # "yield coroutine" creates a task, the current task
+                # will wait until the new task is done
+                result = self._loop.create_task(result)
+            # FIXME: faster check. common base class? hasattr?
+            elif isinstance(result, (Lock, Condition, Semaphore)):
+                coro = _lock_coroutine(result)
+                result = self._loop.create_task(coro)
+
+            if isinstance(result, futures._FUTURE_CLASSES):
+                # Yielded Future must come from Future.__iter__().
+                result.add_done_callback(self._wakeup)
+                self._fut_waiter = result
+                if self._must_cancel:
+                    if self._fut_waiter.cancel():
+                        self._must_cancel = False
+            elif result is None:
+                # Bare yield relinquishes control for one event loop iteration.
+                self._loop.call_soon(self._step)
+            else:
+                # Yielding something else is an error.
+                self._loop.call_soon(
+                    self._step, None,
+                    RuntimeError(
+                        'Task got bad yield: {0!r}'.format(result)))
+        finally:
+            self.__class__._current_tasks.pop(self._loop)
+            self = None  # Needed to break cycles when an exception occurs.
+
+    def _wakeup(self, future):
+        if (future._state == futures._FINISHED
+        and future._exception is not None):
+            # Get the traceback before calling exception(), because calling
+            # the exception() method clears the traceback
+            exc_tb = future._get_exception_tb()
+            exc = future.exception()
+            self._step(None, exc, exc_tb)
+            exc_tb = None
+        else:
+            try:
+                value = future.result()
+            except Exception as exc:
+                # This may also be a cancellation.
+                self._step(None, exc)
+            else:
+                self._step(value, None)
+        self = None  # Needed to break cycles when an exception occurs.
+
+
+# wait() and as_completed() similar to those in PEP 3148.
+
+# Export symbols in trollius.tasks for compatibility with asyncio
+FIRST_COMPLETED = executor.FIRST_COMPLETED
+FIRST_EXCEPTION = executor.FIRST_EXCEPTION
+ALL_COMPLETED = executor.ALL_COMPLETED
+
+
+@coroutine
+def wait(fs, loop=None, timeout=None, return_when=ALL_COMPLETED):
+    """Wait for the Futures and coroutines given by fs to complete.
+
+    The sequence futures must not be empty.
+
+    Coroutines will be wrapped in Tasks.
+
+    Returns two sets of Future: (done, pending).
+
+    Usage:
+
+        done, pending = yield From(asyncio.wait(fs))
+
+    Note: This does not raise TimeoutError! Futures that aren't done
+    when the timeout occurs are returned in the second set.
+    """
+    if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
+        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
+    if not fs:
+        raise ValueError('Set of coroutines/Futures is empty.')
+    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
+        raise ValueError('Invalid return_when value: {0}'.format(return_when))
+
+    if loop is None:
+        loop = events.get_event_loop()
+
+    fs = set(ensure_future(f, loop=loop) for f in set(fs))
+
+    result = yield From(_wait(fs, timeout, return_when, loop))
+    raise Return(result)
+
+
+def _release_waiter(waiter, *args):
+    if not waiter.done():
+        waiter.set_result(None)
+
+
+@coroutine
+def wait_for(fut, timeout, loop=None):
+    """Wait for the single Future or coroutine to complete, with timeout.
+
+    Coroutine will be wrapped in Task.
+
+    Returns result of the Future or coroutine.  When a timeout occurs,
+    it cancels the task and raises TimeoutError.  To avoid the task
+    cancellation, wrap it in shield().
+
+    If the wait is cancelled, the task is also cancelled.
+
+    This function is a coroutine.
+    """
+    if loop is None:
+        loop = events.get_event_loop()
+
+    if timeout is None:
+        result = yield From(fut)
+        raise Return(result)
+
+    waiter = futures.Future(loop=loop)
+    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
+    cb = functools.partial(_release_waiter, waiter)
+
+    fut = ensure_future(fut, loop=loop)
+    fut.add_done_callback(cb)
+
+    try:
+        # wait until the future completes or the timeout
+        try:
+            yield From(waiter)
+        except futures.CancelledError:
+            fut.remove_done_callback(cb)
+            fut.cancel()
+            raise
+
+        if fut.done():
+            raise Return(fut.result())
+        else:
+            fut.remove_done_callback(cb)
+            fut.cancel()
+            raise futures.TimeoutError()
+    finally:
+        timeout_handle.cancel()
+
+
+@coroutine
+def _wait(fs, timeout, return_when, loop):
+    """Internal helper for wait() and _wait_for().
+
+    The fs argument must be a collection of Futures.
+    """
+    assert fs, 'Set of Futures is empty.'
+    waiter = futures.Future(loop=loop)
+    timeout_handle = None
+    if timeout is not None:
+        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
+    non_local = {'counter': len(fs)}
+
+    def _on_completion(f):
+        non_local['counter'] -= 1
+        if (non_local['counter'] <= 0 or
+            return_when == FIRST_COMPLETED or
+            return_when == FIRST_EXCEPTION and (not f.cancelled() and
+                                                f.exception() is not None)):
+            if timeout_handle is not None:
+                timeout_handle.cancel()
+            if not waiter.done():
+                waiter.set_result(None)
+
+    for f in fs:
+        f.add_done_callback(_on_completion)
+
+    try:
+        yield From(waiter)
+    finally:
+        if timeout_handle is not None:
+            timeout_handle.cancel()
+
+    done, pending = set(), set()
+    for f in fs:
+        f.remove_done_callback(_on_completion)
+        if f.done():
+            done.add(f)
+        else:
+            pending.add(f)
+    raise Return(done, pending)
+
+
+# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
+def as_completed(fs, loop=None, timeout=None):
+    """Return an iterator whose values are coroutines.
+
+    When waiting for the yielded coroutines you'll get the results (or
+    exceptions!) of the original Futures (or coroutines), in the order
+    in which and as soon as they complete.
+
+    This differs from PEP 3148; the proper way to use this is:
+
+        for f in as_completed(fs):
+            result = yield From(f)  # The 'yield' may raise.
+            # Use result.
+
+    If a timeout is specified, the 'yield' will raise
+    TimeoutError when the timeout occurs before all Futures are done.
+
+    Note: The futures 'f' are not necessarily members of fs.
+    """
+    if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
+        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
+    loop = loop if loop is not None else events.get_event_loop()
+    todo = set(ensure_future(f, loop=loop) for f in set(fs))
+    from .queues import Queue  # Import here to avoid circular import problem.
+    done = Queue(loop=loop)
+    timeout_handle = None
+
+    def _on_timeout():
+        for f in todo:
+            f.remove_done_callback(_on_completion)
+            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
+        todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _on_completion(f):
+        if not todo:
+            return  # _on_timeout() was here first.
+        todo.remove(f)
+        done.put_nowait(f)
+        if not todo and timeout_handle is not None:
+            timeout_handle.cancel()
+
+    @coroutine
+    def _wait_for_one():
+        f = yield From(done.get())
+        if f is None:
+            # Dummy value from _on_timeout().
+            raise futures.TimeoutError
+        raise Return(f.result())   # May raise f.exception().
+
+    for f in todo:
+        f.add_done_callback(_on_completion)
+    if todo and timeout is not None:
+        timeout_handle = loop.call_later(timeout, _on_timeout)
+    for _ in range(len(todo)):
+        yield _wait_for_one()
+
+
+@coroutine
+def sleep(delay, result=None, loop=None):
+    """Coroutine that completes after a given time (in seconds)."""
+    future = futures.Future(loop=loop)
+    h = future._loop.call_later(delay,
+                                future._set_result_unless_cancelled, result)
+    try:
+        result = yield From(future)
+        raise Return(result)
+    finally:
+        h.cancel()
+
+
+def async(coro_or_future, loop=None):
+    """Wrap a coroutine in a future.
+
+    If the argument is a Future, it is returned directly.
+
+    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
+    """
+
+    warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
+                  DeprecationWarning)
+
+    return ensure_future(coro_or_future, loop=loop)
+
+
+def ensure_future(coro_or_future, loop=None):
+    """Wrap a coroutine in a future.
+
+    If the argument is a Future, it is returned directly.
+    """
+    # FIXME: only check if coroutines._DEBUG is True?
+    if isinstance(coro_or_future, coroutines.FromWrapper):
+        coro_or_future = coro_or_future.obj
+    if isinstance(coro_or_future, futures._FUTURE_CLASSES):
+        if loop is not None and loop is not coro_or_future._loop:
+            raise ValueError('loop argument must agree with Future')
+        return coro_or_future
+    elif coroutines.iscoroutine(coro_or_future):
+        if loop is None:
+            loop = events.get_event_loop()
+        task = loop.create_task(coro_or_future)
+        if task._source_traceback:
+            del task._source_traceback[-1]
+        return task
+    else:
+        raise TypeError('A Future or coroutine is required')
+
+
+class _GatheringFuture(futures.Future):
+    """Helper for gather().
+
+    This overrides cancel() to cancel all the children and act more
+    like Task.cancel(), which doesn't immediately mark itself as
+    cancelled.
+    """
+
+    def __init__(self, children, loop=None):
+        super(_GatheringFuture, self).__init__(loop=loop)
+        self._children = children
+
+    def cancel(self):
+        if self.done():
+            return False
+        for child in self._children:
+            child.cancel()
+        return True
+
+
+def gather(*coros_or_futures, **kw):
+    """Return a future aggregating results from the given coroutines
+    or futures.
+
+    All futures must share the same event loop.  If all the tasks are
+    done successfully, the returned future's result is the list of
+    results (in the order of the original sequence, not necessarily
+    the order of results arrival).  If *return_exceptions* is True,
+    exceptions in the tasks are treated the same as successful
+    results, and gathered in the result list; otherwise, the first
+    raised exception will be immediately propagated to the returned
+    future.
+
+    Cancellation: if the outer Future is cancelled, all children (that
+    have not completed yet) are also cancelled.  If any child is
+    cancelled, this is treated as if it raised CancelledError --
+    the outer Future is *not* cancelled in this case.  (This is to
+    prevent the cancellation of one child to cause other children to
+    be cancelled.)
+    """
+    loop = kw.pop('loop', None)
+    return_exceptions = kw.pop('return_exceptions', False)
+    if kw:
+        raise TypeError("unexpected keyword")
+
+    if not coros_or_futures:
+        outer = futures.Future(loop=loop)
+        outer.set_result([])
+        return outer
+
+    arg_to_fut = {}
+    for arg in set(coros_or_futures):
+        if not isinstance(arg, futures._FUTURE_CLASSES):
+            fut = ensure_future(arg, loop=loop)
+            if loop is None:
+                loop = fut._loop
+            # The caller cannot control this future, the "destroy pending task"
+            # warning should not be emitted.
+            fut._log_destroy_pending = False
+        else:
+            fut = arg
+            if loop is None:
+                loop = fut._loop
+            elif fut._loop is not loop:
+                raise ValueError("futures are tied to different event loops")
+        arg_to_fut[arg] = fut
+
+    children = [arg_to_fut[arg] for arg in coros_or_futures]
+    nchildren = len(children)
+    outer = _GatheringFuture(children, loop=loop)
+    non_local = {'nfinished': 0}
+    results = [None] * nchildren
+
+    def _done_callback(i, fut):
+        if outer.done():
+            if not fut.cancelled():
+                # Mark exception retrieved.
+                fut.exception()
+            return
+
+        if fut.cancelled():
+            res = futures.CancelledError()
+            if not return_exceptions:
+                outer.set_exception(res)
+                return
+        elif fut._exception is not None:
+            res = fut.exception()  # Mark exception retrieved.
+            if not return_exceptions:
+                outer.set_exception(res)
+                return
+        else:
+            res = fut._result
+        results[i] = res
+        non_local['nfinished'] += 1
+        if non_local['nfinished'] == nchildren:
+            outer.set_result(results)
+
+    for i, fut in enumerate(children):
+        fut.add_done_callback(functools.partial(_done_callback, i))
+    return outer
+
+
+def shield(arg, loop=None):
+    """Wait for a future, shielding it from cancellation.
+
+    The statement
+
+        res = yield From(shield(something()))
+
+    is exactly equivalent to the statement
+
+        res = yield From(something())
+
+    *except* that if the coroutine containing it is cancelled, the
+    task running in something() is not cancelled.  From the POV of
+    something(), the cancellation did not happen.  But its caller is
+    still cancelled, so the yield-from expression still raises
+    CancelledError.  Note: If something() is cancelled by other means
+    this will still cancel shield().
+
+    If you want to completely ignore cancellation (not recommended)
+    you can combine shield() with a try/except clause, as follows:
+
+        try:
+            res = yield From(shield(something()))
+        except CancelledError:
+            res = None
+    """
+    inner = ensure_future(arg, loop=loop)
+    if inner.done():
+        # Shortcut.
+        return inner
+    loop = inner._loop
+    outer = futures.Future(loop=loop)
+
+    def _done_callback(inner):
+        if outer.cancelled():
+            if not inner.cancelled():
+                # Mark inner's result as retrieved.
+                inner.exception()
+            return
+
+        if inner.cancelled():
+            outer.cancel()
+        else:
+            exc = inner.exception()
+            if exc is not None:
+                outer.set_exception(exc)
+            else:
+                outer.set_result(inner.result())
+
+    inner.add_done_callback(_done_callback)
+    return outer