Cleanup for stow ---STOW
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / coroutines.py
diff --git a/.local/lib/python2.7/site-packages/trollius/coroutines.py b/.local/lib/python2.7/site-packages/trollius/coroutines.py
new file mode 100644 (file)
index 0000000..6a06989
--- /dev/null
@@ -0,0 +1,496 @@
+__all__ = ['coroutine',
+           'iscoroutinefunction', 'iscoroutine',
+           'From', 'Return']
+
+import functools
+import inspect
+import opcode
+import os
+import sys
+import textwrap
+import traceback
+import types
+
+from . import compat
+from . import events
+from . import futures
+from .log import logger
+
+
+# Opcode of "yield from" instruction
+_YIELD_FROM = opcode.opmap.get('YIELD_FROM', None)
+
+# If you set _DEBUG to true, @coroutine will wrap the resulting
+# generator objects in a CoroWrapper instance (defined below).  That
+# instance will log a message when the generator is never iterated
+# over, which may happen when you forget to use "yield from" with a
+# coroutine call.  Note that the value of the _DEBUG flag is taken
+# when the decorator is used, so to be of any use it must be set
+# before you define your coroutines.  A downside of using this feature
+# is that tracebacks show entries for the CoroWrapper.__next__ method
+# when _DEBUG is true.
+_DEBUG = bool(os.environ.get('TROLLIUSDEBUG'))
+
+
+try:
+    _types_coroutine = types.coroutine
+except AttributeError:
+    _types_coroutine = None
+
+try:
+    _inspect_iscoroutinefunction = inspect.iscoroutinefunction
+except AttributeError:
+    _inspect_iscoroutinefunction = lambda func: False
+
+try:
+    from collections.abc import Coroutine as _CoroutineABC, \
+                                Awaitable as _AwaitableABC
+except ImportError:
+    _CoroutineABC = _AwaitableABC = None
+
+
+if _YIELD_FROM is not None:
+    # Check for CPython issue #21209
+    exec('''if 1:
+        def has_yield_from_bug():
+            class MyGen:
+                def __init__(self):
+                    self.send_args = None
+                def __iter__(self):
+                    return self
+                def __next__(self):
+                    return 42
+                def send(self, *what):
+                    self.send_args = what
+                    return None
+            def yield_from_gen(gen):
+                yield from gen
+            value = (1, 2, 3)
+            gen = MyGen()
+            coro = yield_from_gen(gen)
+            next(coro)
+            coro.send(value)
+            return gen.send_args != (value,)
+''')
+    _YIELD_FROM_BUG = has_yield_from_bug()
+    del has_yield_from_bug
+else:
+    _YIELD_FROM_BUG = False
+
+
+if compat.PY35:
+    return_base_class = Exception
+else:
+    return_base_class = StopIteration
+
+class ReturnException(return_base_class):
+    def __init__(self, *args):
+        return_base_class.__init__(self)
+        if not args:
+            self.value = None
+        elif len(args) == 1:
+            self.value = args[0]
+        else:
+            self.value = args
+        self.raised = False
+        if _DEBUG:
+            frame = sys._getframe(1)
+            self._source_traceback = traceback.extract_stack(frame)
+            # explicitly clear the reference to avoid reference cycles
+            frame = None
+        else:
+            self._source_traceback = None
+
+    def __del__(self):
+        if self.raised:
+            return
+
+        fmt = 'Return(%r) used without raise'
+        if self._source_traceback:
+            fmt += '\nReturn created at (most recent call last):\n'
+            tb = ''.join(traceback.format_list(self._source_traceback))
+            fmt += tb.rstrip()
+        logger.error(fmt, self.value)
+
+
+if compat.PY33 and not compat.PY35:
+    # Don't use the Return class on Python 3.3 and 3.4 to support asyncio
+    # coroutines (to avoid the warning emited in Return destructor).
+    #
+    # The problem is that ReturnException inherits from StopIteration.
+    # "yield from trollius_coroutine". Task._step() does not receive the Return
+    # exception, because "yield from" handles it internally. So it's not
+    # possible to set the raised attribute to True to avoid the warning in
+    # Return destructor.
+    def Return(*args):
+        if not args:
+            value = None
+        elif len(args) == 1:
+            value = args[0]
+        else:
+            value = args
+        return StopIteration(value)
+else:
+    Return = ReturnException
+
+
+def debug_wrapper(gen):
+    # This function is called from 'sys.set_coroutine_wrapper'.
+    # We only wrap here coroutines defined via 'async def' syntax.
+    # Generator-based coroutines are wrapped in @coroutine
+    # decorator.
+    return CoroWrapper(gen, None)
+
+
+def _coroutine_at_yield_from(coro):
+    """Test if the last instruction of a coroutine is "yield from".
+
+    Return False if the coroutine completed.
+    """
+    frame = coro.gi_frame
+    if frame is None:
+        return False
+    code = coro.gi_code
+    assert frame.f_lasti >= 0
+    offset = frame.f_lasti + 1
+    instr = code.co_code[offset]
+    return (instr == _YIELD_FROM)
+
+
+class CoroWrapper:
+    # Wrapper for coroutine object in _DEBUG mode.
+
+    def __init__(self, gen, func=None):
+        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
+        self.gen = gen
+        self.func = func # Used to unwrap @coroutine decorator
+        self._source_traceback = traceback.extract_stack(sys._getframe(1))
+        self.__name__ = getattr(gen, '__name__', None)
+        self.__qualname__ = getattr(gen, '__qualname__', None)
+
+    def __repr__(self):
+        coro_repr = _format_coroutine(self)
+        if self._source_traceback:
+            frame = self._source_traceback[-1]
+            coro_repr += ', created at %s:%s' % (frame[0], frame[1])
+        return '<%s %s>' % (self.__class__.__name__, coro_repr)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.gen)
+    next = __next__
+
+    if _YIELD_FROM_BUG:
+        # For for CPython issue #21209: using "yield from" and a custom
+        # generator, generator.send(tuple) unpacks the tuple instead of passing
+        # the tuple unchanged. Check if the caller is a generator using "yield
+        # from" to decide if the parameter should be unpacked or not.
+        def send(self, *value):
+            frame = sys._getframe()
+            caller = frame.f_back
+            assert caller.f_lasti >= 0
+            if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
+                value = value[0]
+            return self.gen.send(value)
+    else:
+        def send(self, value):
+            return self.gen.send(value)
+
+    def throw(self, exc_type, exc_value=None, exc_tb=None):
+        return self.gen.throw(exc_type, exc_value, exc_tb)
+
+    def close(self):
+        return self.gen.close()
+
+    @property
+    def gi_frame(self):
+        return self.gen.gi_frame
+
+    @property
+    def gi_running(self):
+        return self.gen.gi_running
+
+    @property
+    def gi_code(self):
+        return self.gen.gi_code
+
+    if compat.PY35:
+
+        __await__ = __iter__ # make compatible with 'await' expression
+
+        @property
+        def gi_yieldfrom(self):
+            return self.gen.gi_yieldfrom
+
+        @property
+        def cr_await(self):
+            return self.gen.cr_await
+
+        @property
+        def cr_running(self):
+            return self.gen.cr_running
+
+        @property
+        def cr_code(self):
+            return self.gen.cr_code
+
+        @property
+        def cr_frame(self):
+            return self.gen.cr_frame
+
+    def __del__(self):
+        # Be careful accessing self.gen.frame -- self.gen might not exist.
+        gen = getattr(self, 'gen', None)
+        frame = getattr(gen, 'gi_frame', None)
+        if frame is None:
+            frame = getattr(gen, 'cr_frame', None)
+        if frame is not None and frame.f_lasti == -1:
+            msg = '%r was never yielded from' % self
+            tb = getattr(self, '_source_traceback', ())
+            if tb:
+                tb = ''.join(traceback.format_list(tb))
+                msg += ('\nCoroutine object created at '
+                        '(most recent call last):\n')
+                msg += tb.rstrip()
+            logger.error(msg)
+
+if not compat.PY34:
+    # Backport functools.update_wrapper() from Python 3.4:
+    # - Python 2.7 fails if assigned attributes don't exist
+    # - Python 2.7 and 3.1 don't set the __wrapped__ attribute
+    # - Python 3.2 and 3.3 set __wrapped__ before updating __dict__
+    def _update_wrapper(wrapper,
+                       wrapped,
+                       assigned = functools.WRAPPER_ASSIGNMENTS,
+                       updated = functools.WRAPPER_UPDATES):
+        """Update a wrapper function to look like the wrapped function
+
+           wrapper is the function to be updated
+           wrapped is the original function
+           assigned is a tuple naming the attributes assigned directly
+           from the wrapped function to the wrapper function (defaults to
+           functools.WRAPPER_ASSIGNMENTS)
+           updated is a tuple naming the attributes of the wrapper that
+           are updated with the corresponding attribute from the wrapped
+           function (defaults to functools.WRAPPER_UPDATES)
+        """
+        for attr in assigned:
+            try:
+                value = getattr(wrapped, attr)
+            except AttributeError:
+                pass
+            else:
+                setattr(wrapper, attr, value)
+        for attr in updated:
+            getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
+        # Issue #17482: set __wrapped__ last so we don't inadvertently copy it
+        # from the wrapped function when updating __dict__
+        wrapper.__wrapped__ = wrapped
+        # Return the wrapper so this can be used as a decorator via partial()
+        return wrapper
+
+    def _wraps(wrapped,
+              assigned = functools.WRAPPER_ASSIGNMENTS,
+              updated = functools.WRAPPER_UPDATES):
+        """Decorator factory to apply update_wrapper() to a wrapper function
+
+           Returns a decorator that invokes update_wrapper() with the decorated
+           function as the wrapper argument and the arguments to wraps() as the
+           remaining arguments. Default arguments are as for update_wrapper().
+           This is a convenience function to simplify applying partial() to
+           update_wrapper().
+        """
+        return functools.partial(_update_wrapper, wrapped=wrapped,
+                                 assigned=assigned, updated=updated)
+else:
+    _wraps = functools.wraps
+
+_PEP479 = (sys.version_info >= (3, 5))
+if _PEP479:
+    # Need exec() because yield+return raises a SyntaxError on Python 2
+    exec(textwrap.dedent('''
+        def pep479_wrapper(func, coro_func):
+            @_wraps(func)
+            def pep479_wrapped(*args, **kw):
+                coro = coro_func(*args, **kw)
+                value = None
+                error = None
+                while True:
+                    try:
+                        if error is not None:
+                            value = coro.throw(error)
+                        elif value is not None:
+                            value = coro.send(value)
+                        else:
+                            value = next(coro)
+                    except RuntimeError:
+                        # FIXME: special case for
+                        # FIXME: "isinstance(exc.__context__, StopIteration)"?
+                        raise
+                    except StopIteration as exc:
+                        return exc.value
+                    except Return as exc:
+                        exc.raised = True
+                        return exc.value
+                    except BaseException as exc:
+                        raise
+
+                    try:
+                        value = yield value
+                        error = None
+                    except BaseException as exc:
+                        value = None
+                        error = exc
+
+            return pep479_wrapped
+        '''))
+
+
+def coroutine(func):
+    """Decorator to mark coroutines.
+
+    If the coroutine is not yielded from before it is destroyed,
+    an error message is logged.
+    """
+    if _inspect_iscoroutinefunction(func):
+        # In Python 3.5 that's all we need to do for coroutines
+        # defiend with "async def".
+        # Wrapping in CoroWrapper will happen via
+        # 'sys.set_coroutine_wrapper' function.
+        return func
+
+    if inspect.isgeneratorfunction(func):
+        coro = func
+    else:
+        @_wraps(func)
+        def coro(*args, **kw):
+            res = func(*args, **kw)
+            if (isinstance(res, futures._FUTURE_CLASSES)
+                or inspect.isgenerator(res)):
+                res = yield From(res)
+            elif _AwaitableABC is not None:
+                # If 'func' returns an Awaitable (new in 3.5) we
+                # want to run it.
+                try:
+                    await_meth = res.__await__
+                except AttributeError:
+                    pass
+                else:
+                    if isinstance(res, _AwaitableABC):
+                        res = yield From(await_meth())
+            raise Return(res)
+
+    if _PEP479:
+        # FIXME: use @_wraps
+        coro = pep479_wrapper(func, coro)
+        coro = _wraps(func)(coro)
+
+    if not _DEBUG:
+        if _types_coroutine is None:
+            wrapper = coro
+        else:
+            wrapper = _types_coroutine(coro)
+    else:
+        @_wraps(func)
+        def wrapper(*args, **kwds):
+            w = CoroWrapper(coro(*args, **kwds), func=func)
+            if w._source_traceback:
+                del w._source_traceback[-1]
+            # Python < 3.5 does not implement __qualname__
+            # on generator objects, so we set it manually.
+            # We use getattr as some callables (such as
+            # functools.partial may lack __qualname__).
+            w.__name__ = getattr(func, '__name__', None)
+            w.__qualname__ = getattr(func, '__qualname__', None)
+            return w
+
+    wrapper._is_coroutine = True  # For iscoroutinefunction().
+    return wrapper
+
+
+def iscoroutinefunction(func):
+    """Return True if func is a decorated coroutine function."""
+    return (getattr(func, '_is_coroutine', False) or
+            _inspect_iscoroutinefunction(func))
+
+
+_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
+if _CoroutineABC is not None:
+    _COROUTINE_TYPES += (_CoroutineABC,)
+if events.asyncio is not None:
+    # Accept also asyncio CoroWrapper for interoperability
+    if hasattr(events.asyncio, 'coroutines'):
+        _COROUTINE_TYPES += (events.asyncio.coroutines.CoroWrapper,)
+    else:
+        # old asyncio/Python versions
+        _COROUTINE_TYPES += (events.asyncio.tasks.CoroWrapper,)
+
+def iscoroutine(obj):
+    """Return True if obj is a coroutine object."""
+    return isinstance(obj, _COROUTINE_TYPES)
+
+
+def _format_coroutine(coro):
+    assert iscoroutine(coro)
+
+    coro_name = None
+    if isinstance(coro, CoroWrapper):
+        func = coro.func
+        coro_name = coro.__qualname__
+        if coro_name is not None:
+            coro_name = '{0}()'.format(coro_name)
+    else:
+        func = coro
+
+    if coro_name is None:
+        coro_name = events._format_callback(func, ())
+
+    try:
+        coro_code = coro.gi_code
+    except AttributeError:
+        coro_code = coro.cr_code
+
+    try:
+        coro_frame = coro.gi_frame
+    except AttributeError:
+        coro_frame = coro.cr_frame
+
+    filename = coro_code.co_filename
+    if (isinstance(coro, CoroWrapper)
+    and not inspect.isgeneratorfunction(coro.func)
+    and coro.func is not None):
+        filename, lineno = events._get_function_source(coro.func)
+        if coro_frame is None:
+            coro_repr = ('%s done, defined at %s:%s'
+                         % (coro_name, filename, lineno))
+        else:
+            coro_repr = ('%s running, defined at %s:%s'
+                         % (coro_name, filename, lineno))
+    elif coro_frame is not None:
+        lineno = coro_frame.f_lineno
+        coro_repr = ('%s running at %s:%s'
+                     % (coro_name, filename, lineno))
+    else:
+        lineno = coro_code.co_firstlineno
+        coro_repr = ('%s done, defined at %s:%s'
+                     % (coro_name, filename, lineno))
+
+    return coro_repr
+
+
+class FromWrapper(object):
+    __slots__ = ('obj',)
+
+    def __init__(self, obj):
+        if isinstance(obj, FromWrapper):
+            obj = obj.obj
+            assert not isinstance(obj, FromWrapper)
+        self.obj = obj
+
+def From(obj):
+    if not _DEBUG:
+        return obj
+    else:
+        return FromWrapper(obj)