1 """Support for tasks, coroutines and the scheduler."""
2 from __future__ import print_function
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
7 'gather', 'shield', 'ensure_future',
15 from weakref import WeakSet
18 from .py27_weakrefset import WeakSet
21 from . import coroutines
23 from . import executor
25 from .locks import Lock, Condition, Semaphore, _ContextManager
26 from .coroutines import coroutine, From, Return, ReturnException
31 def _lock_coroutine(lock):
32 yield From(lock.acquire())
33 raise Return(_ContextManager(lock))
36 class Task(futures.Future):
37 """A coroutine wrapped in a Future."""
39 # An important invariant maintained while a Task not done:
41 # - Either _fut_waiter is None, and _step() is scheduled;
42 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
44 # The only transition from the latter to the former is through
45 # _wakeup(). When _fut_waiter is not None, one of its callbacks
48 # Weak set containing all tasks alive.
49 _all_tasks = WeakSet()
51 # Dictionary containing tasks that are currently active in
52 # all running event loops. {EventLoop: Task}
55 # If False, don't log a message if the task is destroyed whereas its
56 # status is still pending
57 _log_destroy_pending = True
60 def current_task(cls, loop=None):
61 """Return the currently running task in an event loop or None.
63 By default the current task for the current event loop is returned.
65 None is returned when called not in the context of a Task.
68 loop = events.get_event_loop()
69 return cls._current_tasks.get(loop)
72 def all_tasks(cls, loop=None):
73 """Return a set of all tasks for an event loop.
75 By default all tasks for the current event loop are returned.
78 loop = events.get_event_loop()
79 return set(t for t in cls._all_tasks if t._loop is loop)
81 def __init__(self, coro, loop=None):
82 assert coroutines.iscoroutine(coro), repr(coro)
83 super(Task, self).__init__(loop=loop)
84 if self._source_traceback:
85 del self._source_traceback[-1]
87 self._fut_waiter = None
88 self._must_cancel = False
89 self._loop.call_soon(self._step)
90 self.__class__._all_tasks.add(self)
92 # On Python 3.3 or older, objects with a destructor that are part of a
93 # reference cycle are never destroyed. That's not the case any more on
94 # Python 3.4 thanks to the PEP 442.
97 if self._state == futures._PENDING and self._log_destroy_pending:
100 'message': 'Task was destroyed but it is pending!',
102 if self._source_traceback:
103 context['source_traceback'] = self._source_traceback
104 self._loop.call_exception_handler(context)
105 futures.Future.__del__(self)
107 def _repr_info(self):
108 info = super(Task, self)._repr_info()
110 if self._must_cancel:
112 info[0] = 'cancelling'
114 coro = coroutines._format_coroutine(self._coro)
115 info.insert(1, 'coro=<%s>' % coro)
117 if self._fut_waiter is not None:
118 info.insert(2, 'wait_for=%r' % self._fut_waiter)
121 def get_stack(self, limit=None):
122 """Return the list of stack frames for this task's coroutine.
124 If the coroutine is not done, this returns the stack where it is
125 suspended. If the coroutine has completed successfully or was
126 cancelled, this returns an empty list. If the coroutine was
127 terminated by an exception, this returns the list of traceback
130 The frames are always ordered from oldest to newest.
132 The optional limit gives the maximum number of frames to
133 return; by default all available frames are returned. Its
134 meaning differs depending on whether a stack or a traceback is
135 returned: the newest frames of a stack are returned, but the
136 oldest frames of a traceback are returned. (This matches the
137 behavior of the traceback module.)
139 For reasons beyond our control, only one stack frame is
140 returned for a suspended coroutine.
144 # 'async def' coroutines
145 f = self._coro.cr_frame
146 except AttributeError:
147 f = self._coro.gi_frame
150 if limit is not None:
157 elif self._exception is not None:
158 tb = self._exception.__traceback__
159 while tb is not None:
160 if limit is not None:
164 frames.append(tb.tb_frame)
168 def print_stack(self, limit=None, file=None):
169 """Print the stack or traceback for this task's coroutine.
171 This produces output similar to that of the traceback module,
172 for the frames retrieved by get_stack(). The limit argument
173 is passed to get_stack(). The file argument is an I/O stream
174 to which the output is written; by default output is written
179 for f in self.get_stack(limit=limit):
182 filename = co.co_filename
184 if filename not in checked:
185 checked.add(filename)
186 linecache.checkcache(filename)
187 line = linecache.getline(filename, lineno, f.f_globals)
188 extracted_list.append((filename, lineno, name, line))
189 exc = self._exception
190 if not extracted_list:
191 print('No stack for %r' % self, file=file)
192 elif exc is not None:
193 print('Traceback for %r (most recent call last):' % self,
196 print('Stack for %r (most recent call last):' % self,
198 traceback.print_list(extracted_list, file=file)
200 for line in traceback.format_exception_only(exc.__class__, exc):
201 print(line, file=file, end='')
204 """Request that this task cancel itself.
206 This arranges for a CancelledError to be thrown into the
207 wrapped coroutine on the next cycle through the event loop.
208 The coroutine then has a chance to clean up or even deny
209 the request using try/except/finally.
211 Unlike Future.cancel, this does not guarantee that the
212 task will be cancelled: the exception might be caught and
213 acted upon, delaying cancellation of the task or preventing
214 cancellation completely. The task may also return a value or
215 raise a different exception.
217 Immediately after this method is called, Task.cancelled() will
218 not return True (unless the task was already cancelled). A
219 task will be marked as cancelled when the wrapped coroutine
220 terminates with a CancelledError exception (even if cancel()
225 if self._fut_waiter is not None:
226 if self._fut_waiter.cancel():
227 # Leave self._fut_waiter; it may be a Task that
228 # catches and ignores the cancellation so we may have
229 # to cancel it again later.
231 # It must be the case that self._step is already scheduled.
232 self._must_cancel = True
235 def _step(self, value=None, exc=None, exc_tb=None):
236 assert not self.done(), \
237 '_step(): already done: {0!r}, {1!r}, {2!r}'.format(self, value, exc)
239 if self._must_cancel:
240 if not isinstance(exc, futures.CancelledError):
241 exc = futures.CancelledError()
242 self._must_cancel = False
244 self._fut_waiter = None
246 if exc_tb is not None:
250 self.__class__._current_tasks[self._loop] = self
251 # Call either coro.throw(exc) or coro.send(value).
254 if exc_tb is not None:
255 result = coro.throw(exc, None, exc_tb)
257 result = coro.throw(exc)
259 result = coro.send(value)
260 # On Python 3.3 and Python 3.4, ReturnException is not used in
261 # practice. But this except is kept to have a single code base
262 # for all Python versions.
263 except coroutines.ReturnException as exc:
264 if isinstance(exc, ReturnException):
269 self.set_result(result)
270 except StopIteration as exc:
272 # asyncio Task object? get the result of the coroutine
275 if isinstance(exc, ReturnException):
280 self.set_result(result)
281 except futures.CancelledError as exc:
282 super(Task, self).cancel() # I.e., Future.cancel(self).
283 except BaseException as exc:
285 self._set_exception_with_tb(exc, exc_tb)
288 self.set_exception(exc)
290 if not isinstance(exc, Exception):
291 # reraise BaseException
294 if coroutines._DEBUG:
295 if not coroutines._coroutine_at_yield_from(self._coro):
296 # trollius coroutine must "yield From(...)"
297 if not isinstance(result, coroutines.FromWrapper):
298 self._loop.call_soon(
300 RuntimeError("yield used without From"))
304 # asyncio coroutine using "yield from ..."
305 if isinstance(result, coroutines.FromWrapper):
307 elif isinstance(result, coroutines.FromWrapper):
310 if coroutines.iscoroutine(result):
311 # "yield coroutine" creates a task, the current task
312 # will wait until the new task is done
313 result = self._loop.create_task(result)
314 # FIXME: faster check. common base class? hasattr?
315 elif isinstance(result, (Lock, Condition, Semaphore)):
316 coro = _lock_coroutine(result)
317 result = self._loop.create_task(coro)
319 if isinstance(result, futures._FUTURE_CLASSES):
320 # Yielded Future must come from Future.__iter__().
321 result.add_done_callback(self._wakeup)
322 self._fut_waiter = result
323 if self._must_cancel:
324 if self._fut_waiter.cancel():
325 self._must_cancel = False
327 # Bare yield relinquishes control for one event loop iteration.
328 self._loop.call_soon(self._step)
330 # Yielding something else is an error.
331 self._loop.call_soon(
334 'Task got bad yield: {0!r}'.format(result)))
336 self.__class__._current_tasks.pop(self._loop)
337 self = None # Needed to break cycles when an exception occurs.
339 def _wakeup(self, future):
340 if (future._state == futures._FINISHED
341 and future._exception is not None):
342 # Get the traceback before calling exception(), because calling
343 # the exception() method clears the traceback
344 exc_tb = future._get_exception_tb()
345 exc = future.exception()
346 self._step(None, exc, exc_tb)
350 value = future.result()
351 except Exception as exc:
352 # This may also be a cancellation.
353 self._step(None, exc)
355 self._step(value, None)
356 self = None # Needed to break cycles when an exception occurs.
359 # wait() and as_completed() similar to those in PEP 3148.
361 # Export symbols in trollius.tasks for compatibility with asyncio
362 FIRST_COMPLETED = executor.FIRST_COMPLETED
363 FIRST_EXCEPTION = executor.FIRST_EXCEPTION
364 ALL_COMPLETED = executor.ALL_COMPLETED
368 def wait(fs, loop=None, timeout=None, return_when=ALL_COMPLETED):
369 """Wait for the Futures and coroutines given by fs to complete.
371 The sequence futures must not be empty.
373 Coroutines will be wrapped in Tasks.
375 Returns two sets of Future: (done, pending).
379 done, pending = yield From(asyncio.wait(fs))
381 Note: This does not raise TimeoutError! Futures that aren't done
382 when the timeout occurs are returned in the second set.
384 if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
385 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
387 raise ValueError('Set of coroutines/Futures is empty.')
388 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
389 raise ValueError('Invalid return_when value: {0}'.format(return_when))
392 loop = events.get_event_loop()
394 fs = set(ensure_future(f, loop=loop) for f in set(fs))
396 result = yield From(_wait(fs, timeout, return_when, loop))
400 def _release_waiter(waiter, *args):
401 if not waiter.done():
402 waiter.set_result(None)
406 def wait_for(fut, timeout, loop=None):
407 """Wait for the single Future or coroutine to complete, with timeout.
409 Coroutine will be wrapped in Task.
411 Returns result of the Future or coroutine. When a timeout occurs,
412 it cancels the task and raises TimeoutError. To avoid the task
413 cancellation, wrap it in shield().
415 If the wait is cancelled, the task is also cancelled.
417 This function is a coroutine.
420 loop = events.get_event_loop()
423 result = yield From(fut)
426 waiter = futures.Future(loop=loop)
427 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
428 cb = functools.partial(_release_waiter, waiter)
430 fut = ensure_future(fut, loop=loop)
431 fut.add_done_callback(cb)
434 # wait until the future completes or the timeout
437 except futures.CancelledError:
438 fut.remove_done_callback(cb)
443 raise Return(fut.result())
445 fut.remove_done_callback(cb)
447 raise futures.TimeoutError()
449 timeout_handle.cancel()
453 def _wait(fs, timeout, return_when, loop):
454 """Internal helper for wait() and _wait_for().
456 The fs argument must be a collection of Futures.
458 assert fs, 'Set of Futures is empty.'
459 waiter = futures.Future(loop=loop)
460 timeout_handle = None
461 if timeout is not None:
462 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
463 non_local = {'counter': len(fs)}
465 def _on_completion(f):
466 non_local['counter'] -= 1
467 if (non_local['counter'] <= 0 or
468 return_when == FIRST_COMPLETED or
469 return_when == FIRST_EXCEPTION and (not f.cancelled() and
470 f.exception() is not None)):
471 if timeout_handle is not None:
472 timeout_handle.cancel()
473 if not waiter.done():
474 waiter.set_result(None)
477 f.add_done_callback(_on_completion)
482 if timeout_handle is not None:
483 timeout_handle.cancel()
485 done, pending = set(), set()
487 f.remove_done_callback(_on_completion)
492 raise Return(done, pending)
495 # This is *not* a @coroutine! It is just an iterator (yielding Futures).
496 def as_completed(fs, loop=None, timeout=None):
497 """Return an iterator whose values are coroutines.
499 When waiting for the yielded coroutines you'll get the results (or
500 exceptions!) of the original Futures (or coroutines), in the order
501 in which and as soon as they complete.
503 This differs from PEP 3148; the proper way to use this is:
505 for f in as_completed(fs):
506 result = yield From(f) # The 'yield' may raise.
509 If a timeout is specified, the 'yield' will raise
510 TimeoutError when the timeout occurs before all Futures are done.
512 Note: The futures 'f' are not necessarily members of fs.
514 if isinstance(fs, futures._FUTURE_CLASSES) or coroutines.iscoroutine(fs):
515 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
516 loop = loop if loop is not None else events.get_event_loop()
517 todo = set(ensure_future(f, loop=loop) for f in set(fs))
518 from .queues import Queue # Import here to avoid circular import problem.
519 done = Queue(loop=loop)
520 timeout_handle = None
524 f.remove_done_callback(_on_completion)
525 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
526 todo.clear() # Can't do todo.remove(f) in the loop.
528 def _on_completion(f):
530 return # _on_timeout() was here first.
533 if not todo and timeout_handle is not None:
534 timeout_handle.cancel()
538 f = yield From(done.get())
540 # Dummy value from _on_timeout().
541 raise futures.TimeoutError
542 raise Return(f.result()) # May raise f.exception().
545 f.add_done_callback(_on_completion)
546 if todo and timeout is not None:
547 timeout_handle = loop.call_later(timeout, _on_timeout)
548 for _ in range(len(todo)):
549 yield _wait_for_one()
553 def sleep(delay, result=None, loop=None):
554 """Coroutine that completes after a given time (in seconds)."""
555 future = futures.Future(loop=loop)
556 h = future._loop.call_later(delay,
557 future._set_result_unless_cancelled, result)
559 result = yield From(future)
565 def async(coro_or_future, loop=None):
566 """Wrap a coroutine in a future.
568 If the argument is a Future, it is returned directly.
570 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
573 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
576 return ensure_future(coro_or_future, loop=loop)
579 def ensure_future(coro_or_future, loop=None):
580 """Wrap a coroutine in a future.
582 If the argument is a Future, it is returned directly.
584 # FIXME: only check if coroutines._DEBUG is True?
585 if isinstance(coro_or_future, coroutines.FromWrapper):
586 coro_or_future = coro_or_future.obj
587 if isinstance(coro_or_future, futures._FUTURE_CLASSES):
588 if loop is not None and loop is not coro_or_future._loop:
589 raise ValueError('loop argument must agree with Future')
590 return coro_or_future
591 elif coroutines.iscoroutine(coro_or_future):
593 loop = events.get_event_loop()
594 task = loop.create_task(coro_or_future)
595 if task._source_traceback:
596 del task._source_traceback[-1]
599 raise TypeError('A Future or coroutine is required')
602 class _GatheringFuture(futures.Future):
603 """Helper for gather().
605 This overrides cancel() to cancel all the children and act more
606 like Task.cancel(), which doesn't immediately mark itself as
610 def __init__(self, children, loop=None):
611 super(_GatheringFuture, self).__init__(loop=loop)
612 self._children = children
617 for child in self._children:
622 def gather(*coros_or_futures, **kw):
623 """Return a future aggregating results from the given coroutines
626 All futures must share the same event loop. If all the tasks are
627 done successfully, the returned future's result is the list of
628 results (in the order of the original sequence, not necessarily
629 the order of results arrival). If *return_exceptions* is True,
630 exceptions in the tasks are treated the same as successful
631 results, and gathered in the result list; otherwise, the first
632 raised exception will be immediately propagated to the returned
635 Cancellation: if the outer Future is cancelled, all children (that
636 have not completed yet) are also cancelled. If any child is
637 cancelled, this is treated as if it raised CancelledError --
638 the outer Future is *not* cancelled in this case. (This is to
639 prevent the cancellation of one child to cause other children to
642 loop = kw.pop('loop', None)
643 return_exceptions = kw.pop('return_exceptions', False)
645 raise TypeError("unexpected keyword")
647 if not coros_or_futures:
648 outer = futures.Future(loop=loop)
653 for arg in set(coros_or_futures):
654 if not isinstance(arg, futures._FUTURE_CLASSES):
655 fut = ensure_future(arg, loop=loop)
658 # The caller cannot control this future, the "destroy pending task"
659 # warning should not be emitted.
660 fut._log_destroy_pending = False
665 elif fut._loop is not loop:
666 raise ValueError("futures are tied to different event loops")
667 arg_to_fut[arg] = fut
669 children = [arg_to_fut[arg] for arg in coros_or_futures]
670 nchildren = len(children)
671 outer = _GatheringFuture(children, loop=loop)
672 non_local = {'nfinished': 0}
673 results = [None] * nchildren
675 def _done_callback(i, fut):
677 if not fut.cancelled():
678 # Mark exception retrieved.
683 res = futures.CancelledError()
684 if not return_exceptions:
685 outer.set_exception(res)
687 elif fut._exception is not None:
688 res = fut.exception() # Mark exception retrieved.
689 if not return_exceptions:
690 outer.set_exception(res)
695 non_local['nfinished'] += 1
696 if non_local['nfinished'] == nchildren:
697 outer.set_result(results)
699 for i, fut in enumerate(children):
700 fut.add_done_callback(functools.partial(_done_callback, i))
704 def shield(arg, loop=None):
705 """Wait for a future, shielding it from cancellation.
709 res = yield From(shield(something()))
711 is exactly equivalent to the statement
713 res = yield From(something())
715 *except* that if the coroutine containing it is cancelled, the
716 task running in something() is not cancelled. From the POV of
717 something(), the cancellation did not happen. But its caller is
718 still cancelled, so the yield-from expression still raises
719 CancelledError. Note: If something() is cancelled by other means
720 this will still cancel shield().
722 If you want to completely ignore cancellation (not recommended)
723 you can combine shield() with a try/except clause, as follows:
726 res = yield From(shield(something()))
727 except CancelledError:
730 inner = ensure_future(arg, loop=loop)
735 outer = futures.Future(loop=loop)
737 def _done_callback(inner):
738 if outer.cancelled():
739 if not inner.cancelled():
740 # Mark inner's result as retrieved.
744 if inner.cancelled():
747 exc = inner.exception()
749 outer.set_exception(exc)
751 outer.set_result(inner.result())
753 inner.add_done_callback(_done_callback)