efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / tasks.py
1 """Support for tasks, coroutines and the scheduler."""
2 from __future__ import print_function
3
4 __all__ = ['Task',
5            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
7            'gather', 'shield', 'ensure_future',
8            ]
9
10 import functools
11 import linecache
12 import traceback
13 import warnings
14 try:
15     from weakref import WeakSet
16 except ImportError:
17     # Python 2.6
18     from .py27_weakrefset import WeakSet
19
20 from . import compat
21 from . import coroutines
22 from . import events
23 from . import executor
24 from . import futures
25 from .locks import Lock, Condition, Semaphore, _ContextManager
26 from .coroutines import coroutine, From, Return, ReturnException
27
28
29
30 @coroutine
31 def _lock_coroutine(lock):
32     yield From(lock.acquire())
33     raise Return(_ContextManager(lock))
34
35
36 class Task(futures.Future):
37     """A coroutine wrapped in a Future."""
38
39     # An important invariant maintained while a Task not done:
40     #
41     # - Either _fut_waiter is None, and _step() is scheduled;
42     # - or _fut_waiter is some Future, and _step() is *not* scheduled.
43     #
44     # The only transition from the latter to the former is through
45     # _wakeup().  When _fut_waiter is not None, one of its callbacks
46     # must be _wakeup().
47
48     # Weak set containing all tasks alive.
49     _all_tasks = WeakSet()
50
51     # Dictionary containing tasks that are currently active in
52     # all running event loops.  {EventLoop: Task}
53     _current_tasks = {}
54
55     # If False, don't log a message if the task is destroyed whereas its
56     # status is still pending
57     _log_destroy_pending = True
58
59     @classmethod
60     def current_task(cls, loop=None):
61         """Return the currently running task in an event loop or None.
62
63         By default the current task for the current event loop is returned.
64
65         None is returned when called not in the context of a Task.
66         """
67         if loop is None:
68             loop = events.get_event_loop()
69         return cls._current_tasks.get(loop)
70
71     @classmethod
72     def all_tasks(cls, loop=None):
73         """Return a set of all tasks for an event loop.
74
75         By default all tasks for the current event loop are returned.
76         """
77         if loop is None:
78             loop = events.get_event_loop()
79         return set(t for t in cls._all_tasks if t._loop is loop)
80
81     def __init__(self, coro, loop=None):
82         assert coroutines.iscoroutine(coro), repr(coro)
83         super(Task, self).__init__(loop=loop)
84         if self._source_traceback:
85             del self._source_traceback[-1]
86         self._coro = coro
87         self._fut_waiter = None
88         self._must_cancel = False
89         self._loop.call_soon(self._step)
90         self.__class__._all_tasks.add(self)
91
92     # On Python 3.3 or older, objects with a destructor that are part of a
93     # reference cycle are never destroyed. That's not the case any more on
94     # Python 3.4 thanks to the PEP 442.
95     if compat.PY34:
96         def __del__(self):
97             if self._state == futures._PENDING and self._log_destroy_pending:
98                 context = {
99                     'task': self,
100                     'message': 'Task was destroyed but it is pending!',
101                 }
102                 if self._source_traceback:
103                     context['source_traceback'] = self._source_traceback
104                 self._loop.call_exception_handler(context)
105             futures.Future.__del__(self)
106
107     def _repr_info(self):
108         info = super(Task, self)._repr_info()
109
110         if self._must_cancel:
111             # replace status
112             info[0] = 'cancelling'
113
114         coro = coroutines._format_coroutine(self._coro)
115         info.insert(1, 'coro=<%s>' % coro)
116
117         if self._fut_waiter is not None:
118             info.insert(2, 'wait_for=%r' % self._fut_waiter)
119         return info
120
121     def get_stack(self, limit=None):
122         """Return the list of stack frames for this task's coroutine.
123
124         If the coroutine is not done, this returns the stack where it is
125         suspended.  If the coroutine has completed successfully or was
126         cancelled, this returns an empty list.  If the coroutine was
127         terminated by an exception, this returns the list of traceback
128         frames.
129
130         The frames are always ordered from oldest to newest.
131
132         The optional limit gives the maximum number of frames to
133         return; by default all available frames are returned.  Its
134         meaning differs depending on whether a stack or a traceback is
135         returned: the newest frames of a stack are returned, but the
136         oldest frames of a traceback are returned.  (This matches the
137         behavior of the traceback module.)
138
139         For reasons beyond our control, only one stack frame is
140         returned for a suspended coroutine.
141         """
142         frames = []
143         try:
144             # 'async def' coroutines
145             f = self._coro.cr_frame
146         except AttributeError:
147             f = self._coro.gi_frame
148         if f is not None:
149             while f is not None:
150                 if limit is not None:
151                     if limit <= 0:
152                         break
153                     limit -= 1
154                 frames.append(f)
155                 f = f.f_back
156             frames.reverse()
157         elif self._exception is not None:
158             tb = self._exception.__traceback__
159             while tb is not None:
160                 if limit is not None:
161                     if limit <= 0:
162                         break
163                     limit -= 1
164                 frames.append(tb.tb_frame)
165                 tb = tb.tb_next
166         return frames
167
168     def print_stack(self, limit=None, file=None):
169         """Print the stack or traceback for this task's coroutine.
170
171         This produces output similar to that of the traceback module,
172         for the frames retrieved by get_stack().  The limit argument
173         is passed to get_stack().  The file argument is an I/O stream
174         to which the output is written; by default output is written
175         to sys.stderr.
176         """
177         extracted_list = []
178         checked = set()
179         for f in self.get_stack(limit=limit):
180             lineno = f.f_lineno
181             co = f.f_code
182             filename = co.co_filename
183             name = co.co_name
184             if filename not in checked:
185                 checked.add(filename)
186                 linecache.checkcache(filename)
187             line = linecache.getline(filename, lineno, f.f_globals)
188             extracted_list.append((filename, lineno, name, line))
189         exc = self._exception
190         if not extracted_list:
191             print('No stack for %r' % self, file=file)
192         elif exc is not None:
193             print('Traceback for %r (most recent call last):' % self,
194                   file=file)
195         else:
196             print('Stack for %r (most recent call last):' % self,
197                   file=file)
198         traceback.print_list(extracted_list, file=file)
199         if exc is not None:
200             for line in traceback.format_exception_only(exc.__class__, exc):
201                 print(line, file=file, end='')
202
203     def cancel(self):
204         """Request that this task cancel itself.
205
206         This arranges for a CancelledError to be thrown into the
207         wrapped coroutine on the next cycle through the event loop.
208         The coroutine then has a chance to clean up or even deny
209         the request using try/except/finally.
210
211         Unlike Future.cancel, this does not guarantee that the
212         task will be cancelled: the exception might be caught and
213         acted upon, delaying cancellation of the task or preventing
214         cancellation completely.  The task may also return a value or
215         raise a different exception.
216
217         Immediately after this method is called, Task.cancelled() will
218         not return True (unless the task was already cancelled).  A
219         task will be marked as cancelled when the wrapped coroutine
220         terminates with a CancelledError exception (even if cancel()
221         was not called).
222         """
223         if self.done():
224             return False
225         if self._fut_waiter is not None:
226             if self._fut_waiter.cancel():
227                 # Leave self._fut_waiter; it may be a Task that
228                 # catches and ignores the cancellation so we may have
229                 # to cancel it again later.
230                 return True
231         # It must be the case that self._step is already scheduled.
232         self._must_cancel = True
233         return True
234
235     def _step(self, value=None, exc=None, exc_tb=None):
236         assert not self.done(), \
237             '_step(): already done: {0!r}, {1!r}, {2!r}'.format(self, value, exc)
238
239         if self._must_cancel:
240             if not isinstance(exc, futures.CancelledError):
241                 exc = futures.CancelledError()
242             self._must_cancel = False
243         coro = self._coro
244         self._fut_waiter = None
245
246         if exc_tb is not None:
247             init_exc = exc
248         else:
249             init_exc = None
250         self.__class__._current_tasks[self._loop] = self
251         # Call either coro.throw(exc) or coro.send(value).
252         try:
253             if exc is not None:
254                 if exc_tb is not None:
255                    result = coro.throw(exc, None, exc_tb)
256                 else:
257                    result = coro.throw(exc)
258             else:
259                 result = coro.send(value)
260         # On Python 3.3 and Python 3.4, ReturnException is not used in
261         # practice. But this except is kept to have a single code base
262         # for all Python versions.
263         except coroutines.ReturnException as exc:
264             if isinstance(exc, ReturnException):
265                 exc.raised = True
266                 result = exc.value
267             else:
268                 result = None
269             self.set_result(result)
270         except StopIteration as exc:
271             if compat.PY33:
272                 # asyncio Task object? get the result of the coroutine
273                 result = exc.value
274             else:
275                 if isinstance(exc, ReturnException):
276                     exc.raised = True
277                     result = exc.value
278                 else:
279                     result = None
280             self.set_result(result)
281         except futures.CancelledError as exc:
282             super(Task, self).cancel()  # I.e., Future.cancel(self).
283         except BaseException as exc:
284             if exc is init_exc:
285                 self._set_exception_with_tb(exc, exc_tb)
286                 exc_tb = None
287             else:
288                 self.set_exception(exc)
289
290             if not isinstance(exc, Exception):
291                 # reraise BaseException
292                 raise
293         else:
294             if coroutines._DEBUG:
295                 if not coroutines._coroutine_at_yield_from(self._coro):
296                     # trollius coroutine must "yield From(...)"
297                     if not isinstance(result, coroutines.FromWrapper):
298                         self._loop.call_soon(
299                             self._step, None,
300                             RuntimeError("yield used without From"))
301                         return
302                     result = result.obj
303                 else:
304                     # asyncio coroutine using "yield from ..."
305                     if isinstance(result, coroutines.FromWrapper):
306                         result = result.obj
307             elif isinstance(result, coroutines.FromWrapper):
308                 result = result.obj
309
310             if coroutines.iscoroutine(result):
311                 # "yield coroutine" creates a task, the current task
312                 # will wait until the new task is done
313                 result = self._loop.create_task(result)
314             # FIXME: faster check. common base class? hasattr?
315             elif isinstance(result, (Lock, Condition, Semaphore)):
316                 coro = _lock_coroutine(result)
317                 result = self._loop.create_task(coro)
318
319             if isinstance(result, futures._FUTURE_CLASSES):
320                 # Yielded Future must come from Future.__iter__().
321                 result.add_done_callback(self._wakeup)
322                 self._fut_waiter = result
323                 if self._must_cancel:
324                     if self._fut_waiter.cancel():
325                         self._must_cancel = False
326             elif result is None:
327                 # Bare yield relinquishes control for one event loop iteration.
328                 self._loop.call_soon(self._step)
329             else:
330                 # Yielding something else is an error.
331                 self._loop.call_soon(
332                     self._step, None,
333                     RuntimeError(
334                         'Task got bad yield: {0!r}'.format(result)))
335         finally:
336             self.__class__._current_tasks.pop(self._loop)
337             self = None  # Needed to break cycles when an exception occurs.
338
339     def _wakeup(self, future):
340         if (future._state == futures._FINISHED
341         and future._exception is not None):
342             # Get the traceback before calling exception(), because calling
343             # the exception() method clears the traceback
344             exc_tb = future._get_exception_tb()
345             exc = future.exception()
346             self._step(None, exc, exc_tb)
347             exc_tb = None
348         else:
349             try:
350                 value = future.result()
351             except Exception as exc:
352                 # This may also be a cancellation.
353                 self._step(None, exc)
354             else:
355                 self._step(value, None)
356         self = None  # Needed to break cycles when an exception occurs.
357
358
359 # wait() and as_completed() similar to those in PEP 3148.
360
361 # Export symbols in trollius.tasks for compatibility with asyncio
362 FIRST_COMPLETED = executor.FIRST_COMPLETED
363 FIRST_EXCEPTION = executor.FIRST_EXCEPTION
364 ALL_COMPLETED = executor.ALL_COMPLETED
365
366
367 @coroutine
368 def wait(fs, loop=None, timeout=None, return_when=ALL_COMPLETED):
369     """Wait for the Futures and coroutines given by fs to complete.
370
371     The sequence futures must not be empty.
372
373     Coroutines will be wrapped in Tasks.
374
375     Returns two sets of Future: (done, pending).
376
377     Usage:
378
379         done, pending = yield From(asyncio.wait(fs))
380
381     Note: This does not raise TimeoutError! Futures that aren't done
382     when the timeout occurs are returned in the second set.
383     """
384     if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
385         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
386     if not fs:
387         raise ValueError('Set of coroutines/Futures is empty.')
388     if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
389         raise ValueError('Invalid return_when value: {0}'.format(return_when))
390
391     if loop is None:
392         loop = events.get_event_loop()
393
394     fs = set(ensure_future(f, loop=loop) for f in set(fs))
395
396     result = yield From(_wait(fs, timeout, return_when, loop))
397     raise Return(result)
398
399
400 def _release_waiter(waiter, *args):
401     if not waiter.done():
402         waiter.set_result(None)
403
404
405 @coroutine
406 def wait_for(fut, timeout, loop=None):
407     """Wait for the single Future or coroutine to complete, with timeout.
408
409     Coroutine will be wrapped in Task.
410
411     Returns result of the Future or coroutine.  When a timeout occurs,
412     it cancels the task and raises TimeoutError.  To avoid the task
413     cancellation, wrap it in shield().
414
415     If the wait is cancelled, the task is also cancelled.
416
417     This function is a coroutine.
418     """
419     if loop is None:
420         loop = events.get_event_loop()
421
422     if timeout is None:
423         result = yield From(fut)
424         raise Return(result)
425
426     waiter = futures.Future(loop=loop)
427     timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
428     cb = functools.partial(_release_waiter, waiter)
429
430     fut = ensure_future(fut, loop=loop)
431     fut.add_done_callback(cb)
432
433     try:
434         # wait until the future completes or the timeout
435         try:
436             yield From(waiter)
437         except futures.CancelledError:
438             fut.remove_done_callback(cb)
439             fut.cancel()
440             raise
441
442         if fut.done():
443             raise Return(fut.result())
444         else:
445             fut.remove_done_callback(cb)
446             fut.cancel()
447             raise futures.TimeoutError()
448     finally:
449         timeout_handle.cancel()
450
451
452 @coroutine
453 def _wait(fs, timeout, return_when, loop):
454     """Internal helper for wait() and _wait_for().
455
456     The fs argument must be a collection of Futures.
457     """
458     assert fs, 'Set of Futures is empty.'
459     waiter = futures.Future(loop=loop)
460     timeout_handle = None
461     if timeout is not None:
462         timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
463     non_local = {'counter': len(fs)}
464
465     def _on_completion(f):
466         non_local['counter'] -= 1
467         if (non_local['counter'] <= 0 or
468             return_when == FIRST_COMPLETED or
469             return_when == FIRST_EXCEPTION and (not f.cancelled() and
470                                                 f.exception() is not None)):
471             if timeout_handle is not None:
472                 timeout_handle.cancel()
473             if not waiter.done():
474                 waiter.set_result(None)
475
476     for f in fs:
477         f.add_done_callback(_on_completion)
478
479     try:
480         yield From(waiter)
481     finally:
482         if timeout_handle is not None:
483             timeout_handle.cancel()
484
485     done, pending = set(), set()
486     for f in fs:
487         f.remove_done_callback(_on_completion)
488         if f.done():
489             done.add(f)
490         else:
491             pending.add(f)
492     raise Return(done, pending)
493
494
495 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
496 def as_completed(fs, loop=None, timeout=None):
497     """Return an iterator whose values are coroutines.
498
499     When waiting for the yielded coroutines you'll get the results (or
500     exceptions!) of the original Futures (or coroutines), in the order
501     in which and as soon as they complete.
502
503     This differs from PEP 3148; the proper way to use this is:
504
505         for f in as_completed(fs):
506             result = yield From(f)  # The 'yield' may raise.
507             # Use result.
508
509     If a timeout is specified, the 'yield' will raise
510     TimeoutError when the timeout occurs before all Futures are done.
511
512     Note: The futures 'f' are not necessarily members of fs.
513     """
514     if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
515         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
516     loop = loop if loop is not None else events.get_event_loop()
517     todo = set(ensure_future(f, loop=loop) for f in set(fs))
518     from .queues import Queue  # Import here to avoid circular import problem.
519     done = Queue(loop=loop)
520     timeout_handle = None
521
522     def _on_timeout():
523         for f in todo:
524             f.remove_done_callback(_on_completion)
525             done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
526         todo.clear()  # Can't do todo.remove(f) in the loop.
527
528     def _on_completion(f):
529         if not todo:
530             return  # _on_timeout() was here first.
531         todo.remove(f)
532         done.put_nowait(f)
533         if not todo and timeout_handle is not None:
534             timeout_handle.cancel()
535
536     @coroutine
537     def _wait_for_one():
538         f = yield From(done.get())
539         if f is None:
540             # Dummy value from _on_timeout().
541             raise futures.TimeoutError
542         raise Return(f.result())   # May raise f.exception().
543
544     for f in todo:
545         f.add_done_callback(_on_completion)
546     if todo and timeout is not None:
547         timeout_handle = loop.call_later(timeout, _on_timeout)
548     for _ in range(len(todo)):
549         yield _wait_for_one()
550
551
552 @coroutine
553 def sleep(delay, result=None, loop=None):
554     """Coroutine that completes after a given time (in seconds)."""
555     future = futures.Future(loop=loop)
556     h = future._loop.call_later(delay,
557                                 future._set_result_unless_cancelled, result)
558     try:
559         result = yield From(future)
560         raise Return(result)
561     finally:
562         h.cancel()
563
564
565 def async(coro_or_future, loop=None):
566     """Wrap a coroutine in a future.
567
568     If the argument is a Future, it is returned directly.
569
570     This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
571     """
572
573     warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
574                   DeprecationWarning)
575
576     return ensure_future(coro_or_future, loop=loop)
577
578
579 def ensure_future(coro_or_future, loop=None):
580     """Wrap a coroutine in a future.
581
582     If the argument is a Future, it is returned directly.
583     """
584     # FIXME: only check if coroutines._DEBUG is True?
585     if isinstance(coro_or_future, coroutines.FromWrapper):
586         coro_or_future = coro_or_future.obj
587     if isinstance(coro_or_future, futures._FUTURE_CLASSES):
588         if loop is not None and loop is not coro_or_future._loop:
589             raise ValueError('loop argument must agree with Future')
590         return coro_or_future
591     elif coroutines.iscoroutine(coro_or_future):
592         if loop is None:
593             loop = events.get_event_loop()
594         task = loop.create_task(coro_or_future)
595         if task._source_traceback:
596             del task._source_traceback[-1]
597         return task
598     else:
599         raise TypeError('A Future or coroutine is required')
600
601
602 class _GatheringFuture(futures.Future):
603     """Helper for gather().
604
605     This overrides cancel() to cancel all the children and act more
606     like Task.cancel(), which doesn't immediately mark itself as
607     cancelled.
608     """
609
610     def __init__(self, children, loop=None):
611         super(_GatheringFuture, self).__init__(loop=loop)
612         self._children = children
613
614     def cancel(self):
615         if self.done():
616             return False
617         for child in self._children:
618             child.cancel()
619         return True
620
621
622 def gather(*coros_or_futures, **kw):
623     """Return a future aggregating results from the given coroutines
624     or futures.
625
626     All futures must share the same event loop.  If all the tasks are
627     done successfully, the returned future's result is the list of
628     results (in the order of the original sequence, not necessarily
629     the order of results arrival).  If *return_exceptions* is True,
630     exceptions in the tasks are treated the same as successful
631     results, and gathered in the result list; otherwise, the first
632     raised exception will be immediately propagated to the returned
633     future.
634
635     Cancellation: if the outer Future is cancelled, all children (that
636     have not completed yet) are also cancelled.  If any child is
637     cancelled, this is treated as if it raised CancelledError --
638     the outer Future is *not* cancelled in this case.  (This is to
639     prevent the cancellation of one child to cause other children to
640     be cancelled.)
641     """
642     loop = kw.pop('loop', None)
643     return_exceptions = kw.pop('return_exceptions', False)
644     if kw:
645         raise TypeError("unexpected keyword")
646
647     if not coros_or_futures:
648         outer = futures.Future(loop=loop)
649         outer.set_result([])
650         return outer
651
652     arg_to_fut = {}
653     for arg in set(coros_or_futures):
654         if not isinstance(arg, futures._FUTURE_CLASSES):
655             fut = ensure_future(arg, loop=loop)
656             if loop is None:
657                 loop = fut._loop
658             # The caller cannot control this future, the "destroy pending task"
659             # warning should not be emitted.
660             fut._log_destroy_pending = False
661         else:
662             fut = arg
663             if loop is None:
664                 loop = fut._loop
665             elif fut._loop is not loop:
666                 raise ValueError("futures are tied to different event loops")
667         arg_to_fut[arg] = fut
668
669     children = [arg_to_fut[arg] for arg in coros_or_futures]
670     nchildren = len(children)
671     outer = _GatheringFuture(children, loop=loop)
672     non_local = {'nfinished': 0}
673     results = [None] * nchildren
674
675     def _done_callback(i, fut):
676         if outer.done():
677             if not fut.cancelled():
678                 # Mark exception retrieved.
679                 fut.exception()
680             return
681
682         if fut.cancelled():
683             res = futures.CancelledError()
684             if not return_exceptions:
685                 outer.set_exception(res)
686                 return
687         elif fut._exception is not None:
688             res = fut.exception()  # Mark exception retrieved.
689             if not return_exceptions:
690                 outer.set_exception(res)
691                 return
692         else:
693             res = fut._result
694         results[i] = res
695         non_local['nfinished'] += 1
696         if non_local['nfinished'] == nchildren:
697             outer.set_result(results)
698
699     for i, fut in enumerate(children):
700         fut.add_done_callback(functools.partial(_done_callback, i))
701     return outer
702
703
704 def shield(arg, loop=None):
705     """Wait for a future, shielding it from cancellation.
706
707     The statement
708
709         res = yield From(shield(something()))
710
711     is exactly equivalent to the statement
712
713         res = yield From(something())
714
715     *except* that if the coroutine containing it is cancelled, the
716     task running in something() is not cancelled.  From the POV of
717     something(), the cancellation did not happen.  But its caller is
718     still cancelled, so the yield-from expression still raises
719     CancelledError.  Note: If something() is cancelled by other means
720     this will still cancel shield().
721
722     If you want to completely ignore cancellation (not recommended)
723     you can combine shield() with a try/except clause, as follows:
724
725         try:
726             res = yield From(shield(something()))
727         except CancelledError:
728             res = None
729     """
730     inner = ensure_future(arg, loop=loop)
731     if inner.done():
732         # Shortcut.
733         return inner
734     loop = inner._loop
735     outer = futures.Future(loop=loop)
736
737     def _done_callback(inner):
738         if outer.cancelled():
739             if not inner.cancelled():
740                 # Mark inner's result as retrieved.
741                 inner.exception()
742             return
743
744         if inner.cancelled():
745             outer.cancel()
746         else:
747             exc = inner.exception()
748             if exc is not None:
749                 outer.set_exception(exc)
750             else:
751                 outer.set_result(inner.result())
752
753     inner.add_done_callback(_done_callback)
754     return outer