1 """A Future class similar to the one in PEP 3148."""
3 __all__ = ['CancelledError', 'TimeoutError',
5 'Future', 'wrap_future',
13 import reprlib # Python 3
15 import repr as reprlib # Python 2
19 from . import executor
23 _CANCELLED = 'CANCELLED'
24 _FINISHED = 'FINISHED'
26 Error = executor.Error
27 CancelledError = executor.CancelledError
28 TimeoutError = executor.TimeoutError
30 STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
class InvalidStateError(Error):
    """Raised when a Future operation is not permitted in its current state.

    Examples in this module: reading the result or exception of a future
    that is not finished yet, or setting a result or exception on a future
    that is no longer pending.
    """
37 class _TracebackLogger(object):
38 """Helper to log a traceback upon destruction if not cleared.
40 This solves a nasty problem with Futures and Tasks that have an
41 exception set: if nobody asks for the exception, the exception is
42 never logged. This violates the Zen of Python: 'Errors should
43 never pass silently. Unless explicitly silenced.'
45 However, we don't want to log the exception as soon as
46 set_exception() is called: if the calling code is written
47 properly, it will get the exception and handle it properly. But
48 we *do* want to log it if result() or exception() was never called
49 -- otherwise developers waste a lot of time wondering why their
50 buggy code fails silently.
52 An earlier attempt added a __del__() method to the Future class
53 itself, but this backfired because the presence of __del__()
54 prevents garbage collection from breaking cycles. A way out of
55 this catch-22 is to avoid having a __del__() method on the Future
56 class itself, but instead to have a reference to a helper object
57 with a __del__() method that logs the traceback, where we ensure
58 that the helper object doesn't participate in cycles, and only the
59 Future has a reference to it.
61 The helper object is added when set_exception() is called. When
62 the Future is collected, and the helper is present, the helper
63 object is also collected, and its __del__() method will log the
64 traceback. When the Future's result() or exception() method is
65 called (and a helper object is present), it removes the helper
66 object, after calling its clear() method to prevent it from
69 One downside is that we do a fair amount of work to extract the
70 traceback from the exception, even when it is never logged. It
71 would seem cheaper to just store the exception object, but that
72 references the traceback, which references stack frames, which may
73 reference the Future, which references the _TracebackLogger, and
74 then the _TracebackLogger would be included in a cycle, which is
75 what we're trying to avoid! As an optimization, we don't
76 immediately format the exception; we only do the work when
77 activate() is called, which call is delayed until after all the
78 Future's callbacks have run. Since usually a Future has at least
79 one callback (typically set by 'yield from') and usually that
80 callback extracts the exception, thereby removing the need to
83 PS. I don't claim credit for this solution. I first heard of it
84 in a discussion about closing files when they are collected.
87 __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
89 def __init__(self, future, exc):
90 self.loop = future._loop
91 self.source_traceback = future._source_traceback
99 self.tb = traceback.format_exception(exc.__class__, exc,
108 msg = 'Future/Task exception was never retrieved\n'
109 if self.source_traceback:
110 src = ''.join(traceback.format_list(self.source_traceback))
111 msg += 'Future/Task created at (most recent call last):\n'
112 msg += '%s\n' % src.rstrip()
113 msg += ''.join(self.tb).rstrip()
114 self.loop.call_exception_handler({'message': msg})
117 class Future(object):
118 """This class is *almost* compatible with concurrent.futures.Future.
122 - result() and exception() do not take a timeout argument and
123 raise an exception when the future isn't done yet.
125 - Callbacks registered with add_done_callback() are always called
126 via the event loop's call_soon().
128 - This class is not compatible with the wait() and as_completed()
129 methods in the concurrent.futures package.
131 (In Python 3.4 or later we may be able to unify the implementations.)
134 # Class variables serving as defaults for instance variables.
139 _source_traceback = None
141 _blocking = False # proper use of future (yield vs yield from)
143 # Used by Python 2 to raise the exception with the original traceback
144 # in the exception() method in debug mode
147 _log_traceback = False # Used for Python 3.4 and later
148 _tb_logger = None # Used for Python 3.3 only
150 def __init__(self, loop=None):
151 """Initialize the future.
153 The optional loop argument allows explicitly setting the event
154 loop object used by the future. If it's not provided, the future uses
155 the default event loop.
158 self._loop = events.get_event_loop()
162 if self._loop.get_debug():
163 self._source_traceback = traceback.extract_stack(sys._getframe(1))
165 def _format_callbacks(self):
171 def format_cb(callback):
172 return events._format_callback_source(callback, ())
175 cb = format_cb(cb[0])
177 cb = '{0}, {1}'.format(format_cb(cb[0]), format_cb(cb[1]))
179 cb = '{0}, <{1} more>, {2}'.format(format_cb(cb[0]),
182 return 'cb=[%s]' % cb
184 def _repr_info(self):
185 info = [self._state.lower()]
186 if self._state == _FINISHED:
187 if self._exception is not None:
188 info.append('exception={0!r}'.format(self._exception))
190 # use reprlib to limit the length of the output, especially
191 # for very long strings
192 result = reprlib.repr(self._result)
193 info.append('result={0}'.format(result))
195 info.append(self._format_callbacks())
196 if self._source_traceback:
197 frame = self._source_traceback[-1]
198 info.append('created at %s:%s' % (frame[0], frame[1]))
202 info = self._repr_info()
203 return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
205 # On Python 3.3 and older, objects with a destructor part of a reference
206 # cycle are never destroyed. It's no longer the case on Python 3.4 thanks
210 if not self._log_traceback:
211 # set_exception() was not called, or result() or exception()
212 # has consumed the exception
214 exc = self._exception
216 'message': ('%s exception was never retrieved'
217 % self.__class__.__name__),
221 if self._source_traceback:
222 context['source_traceback'] = self._source_traceback
223 self._loop.call_exception_handler(context)
226 """Cancel the future and schedule callbacks.
228 If the future is already done or cancelled, return False. Otherwise,
229 change the future's state to cancelled, schedule the callbacks and
232 if self._state != _PENDING:
234 self._state = _CANCELLED
235 self._schedule_callbacks()
238 def _schedule_callbacks(self):
239 """Internal: Ask the event loop to call all callbacks.
241 The callbacks are scheduled to be called as soon as possible. Also
242 clears the callback list.
244 callbacks = self._callbacks[:]
248 self._callbacks[:] = []
249 for callback in callbacks:
250 self._loop.call_soon(callback, self)
253 """Return True if the future was cancelled."""
254 return self._state == _CANCELLED
256 # Don't implement running(); see http://bugs.python.org/issue18699
259 """Return True if the future is done.
261 Done means either that a result / exception are available, or that the
262 future was cancelled.
264 return self._state != _PENDING
267 """Return the result this future represents.
269 If the future has been cancelled, raises CancelledError. If the
270 future's result isn't yet available, raises InvalidStateError. If
271 the future is done and has an exception set, this exception is raised.
273 if self._state == _CANCELLED:
275 if self._state != _FINISHED:
276 raise InvalidStateError('Result is not ready.')
277 self._log_traceback = False
278 if self._tb_logger is not None:
279 self._tb_logger.clear()
280 self._tb_logger = None
281 exc_tb = self._exception_tb
282 self._exception_tb = None
283 if self._exception is not None:
284 if exc_tb is not None:
285 compat.reraise(type(self._exception), self._exception, exc_tb)
287 raise self._exception
291 """Return the exception that was set on this future.
293 The exception (or None if no exception was set) is returned only if
294 the future is done. If the future has been cancelled, raises
295 CancelledError. If the future isn't done yet, raises
298 if self._state == _CANCELLED:
300 if self._state != _FINISHED:
301 raise InvalidStateError('Exception is not set.')
302 self._log_traceback = False
303 if self._tb_logger is not None:
304 self._tb_logger.clear()
305 self._tb_logger = None
306 self._exception_tb = None
307 return self._exception
309 def add_done_callback(self, fn):
310 """Add a callback to be run when the future becomes done.
312 The callback is called with a single argument - the future object. If
313 the future is already done when this is called, the callback is
314 scheduled with call_soon.
316 if self._state != _PENDING:
317 self._loop.call_soon(fn, self)
319 self._callbacks.append(fn)
321 # New method not in PEP 3148.
323 def remove_done_callback(self, fn):
324 """Remove all instances of a callback from the "call when done" list.
326 Returns the number of callbacks removed.
328 filtered_callbacks = [f for f in self._callbacks if f != fn]
329 removed_count = len(self._callbacks) - len(filtered_callbacks)
331 self._callbacks[:] = filtered_callbacks
334 # So-called internal methods (note: no set_running_or_notify_cancel()).
336 def _set_result_unless_cancelled(self, result):
337 """Helper setting the result only if the future was not cancelled."""
340 self.set_result(result)
def set_result(self, result):
    """Store *result* on the future and mark it as finished.

    Once the state has been updated, the registered callbacks are
    scheduled through _schedule_callbacks().

    Raises InvalidStateError if the future is not pending any more.
    """
    if self._state != _PENDING:
        # Build the message first so the raise line stays short; the text
        # combines the current state name with the future's repr.
        message = '{0}: {1!r}'.format(self._state, self)
        raise InvalidStateError(message)

    # The result is stored before the state flips to finished, and the
    # callbacks are scheduled only after both are in place.
    self._result = result
    self._state = _FINISHED
    self._schedule_callbacks()
def _get_exception_tb(self):
    """Return the traceback object currently stored with the exception.

    The value is whatever is held in ``self._exception_tb``; it may be None.
    """
    return self._exception_tb
def set_exception(self, exception):
    """Set *exception* on the future without supplying a traceback.

    This is a thin wrapper: it forwards to _set_exception_with_tb() with
    None as the traceback argument.
    """
    no_traceback = None
    self._set_exception_with_tb(exception, no_traceback)
360 def _set_exception_with_tb(self, exception, exc_tb):
361 """Mark the future done and set an exception.
363 If the future is already done when this method is called, raises
366 if self._state != _PENDING:
367 raise InvalidStateError('{0}: {1!r}'.format(self._state, self))
368 if isinstance(exception, type):
369 exception = exception()
370 self._exception = exception
371 if exc_tb is not None:
372 self._exception_tb = exc_tb
375 self._exception_tb = sys.exc_info()[2]
376 self._state = _FINISHED
377 self._schedule_callbacks()
379 self._log_traceback = True
381 self._tb_logger = _TracebackLogger(self, exception)
382 if hasattr(exception, '__traceback__'):
383 # Python 3: exception contains a link to the traceback
385 # Arrange for the logger to be activated after all callbacks
386 # have had a chance to call result() or exception().
387 self._loop.call_soon(self._tb_logger.activate)
389 if self._loop.get_debug():
390 frame = sys._getframe(1)
391 tb = ['Traceback (most recent call last):\n']
392 if self._exception_tb is not None:
393 tb += traceback.format_tb(self._exception_tb)
395 tb += traceback.format_stack(frame)
396 tb += traceback.format_exception_only(type(exception), exception)
397 self._tb_logger.tb = tb
399 self._tb_logger.tb = traceback.format_exception_only(
403 self._tb_logger.exc = None
405 # Truly internal methods.
407 def _copy_state(self, other):
408 """Internal helper to copy state from another Future.
410 The other Future may be a concurrent.futures.Future.
415 assert not self.done()
416 if other.cancelled():
419 exception = other.exception()
420 if exception is not None:
421 self.set_exception(exception)
423 result = other.result()
424 self.set_result(result)
426 if events.asyncio is not None:
427 # Accept also asyncio Future objects for interoperability
428 _FUTURE_CLASSES = (Future, events.asyncio.Future)
430 _FUTURE_CLASSES = Future
432 def wrap_future(fut, loop=None):
433 """Wrap concurrent.futures.Future object."""
434 if isinstance(fut, _FUTURE_CLASSES):
436 assert isinstance(fut, executor.Future), \
437 'concurrent.futures.Future is expected, got {0!r}'.format(fut)
439 loop = events.get_event_loop()
440 new_future = Future(loop=loop)
442 def _check_cancel_other(f):
446 new_future.add_done_callback(_check_cancel_other)
447 fut.add_done_callback(
448 lambda future: loop.call_soon_threadsafe(
449 new_future._copy_state, future))