1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
11 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
13 FIRST_COMPLETED = 'FIRST_COMPLETED'
14 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
15 ALL_COMPLETED = 'ALL_COMPLETED'
16 _AS_COMPLETED = '_AS_COMPLETED'
18 # Possible future states (for internal use by the futures package).
21 # The future was cancelled by the user...
22 CANCELLED = 'CANCELLED'
23 # ...and _Waiter.add_cancelled() was called by a worker.
24 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
31 CANCELLED_AND_NOTIFIED,
35 _STATE_TO_DESCRIPTION_MAP = {
38 CANCELLED: "cancelled",
39 CANCELLED_AND_NOTIFIED: "cancelled",
43 # Logger for internal use by the futures package.
44 LOGGER = logging.getLogger("concurrent.futures")
class Error(Exception):
    """Base class for all future-related exceptions.

    CancelledError and TimeoutError both derive from this class, so a single
    ``except Error`` clause catches every exception type defined here.
    """
class CancelledError(Error):
    """The Future was cancelled.

    Raised by Future.result() and Future.exception_info() when the future is
    in the CANCELLED or CANCELLED_AND_NOTIFIED state.
    """
class TimeoutError(Error):
    """The operation exceeded the given deadline.

    This is the module's own exception type: it derives from Error, the base
    class of the future-related exceptions defined here.
    """
58 class _Waiter(object):
59 """Provides the event that wait() and as_completed() block on."""
61 self.event = threading.Event()
62 self.finished_futures = []
def add_result(self, future):
    """Record *future* as finished after it completed with a result.

    Future.set_result() calls this on every waiter installed on the future.
    """
    self.finished_futures.append(future)
def add_exception(self, future):
    """Record *future* as finished after it completed with an exception.

    Future.set_exception_info() calls this on every waiter installed on the
    future.
    """
    self.finished_futures.append(future)
def add_cancelled(self, future):
    """Record *future* as finished after its cancellation was processed.

    Future.set_running_or_notify_cancel() calls this on every installed
    waiter when it finds the future in the CANCELLED state.
    """
    self.finished_futures.append(future)
73 class _AsCompletedWaiter(_Waiter):
74 """Used by as_completed()."""
77 super(_AsCompletedWaiter, self).__init__()
78 self.lock = threading.Lock()
80 def add_result(self, future):
82 super(_AsCompletedWaiter, self).add_result(future)
85 def add_exception(self, future):
87 super(_AsCompletedWaiter, self).add_exception(future)
90 def add_cancelled(self, future):
92 super(_AsCompletedWaiter, self).add_cancelled(future)
95 class _FirstCompletedWaiter(_Waiter):
96 """Used by wait(return_when=FIRST_COMPLETED)."""
98 def add_result(self, future):
99 super(_FirstCompletedWaiter, self).add_result(future)
102 def add_exception(self, future):
103 super(_FirstCompletedWaiter, self).add_exception(future)
106 def add_cancelled(self, future):
107 super(_FirstCompletedWaiter, self).add_cancelled(future)
110 class _AllCompletedWaiter(_Waiter):
111 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
    """Set up the countdown state for this waiter.

    Args:
        num_pending_calls: Number of futures still outstanding; it is
            decremented by _decrement_pending_calls().
        stop_on_exception: Flag consulted by add_exception().
            _create_and_install_waiters() passes True for FIRST_EXCEPTION
            and False for ALL_COMPLETED.
    """
    self.num_pending_calls = num_pending_calls
    self.stop_on_exception = stop_on_exception
    self.lock = threading.Lock()
    # The base initializer is called last, after this class's own attributes
    # are in place.
    super(_AllCompletedWaiter, self).__init__()
119 def _decrement_pending_calls(self):
121 self.num_pending_calls -= 1
122 if not self.num_pending_calls:
def add_result(self, future):
    """Record a future that finished with a result and count it as done."""
    super(_AllCompletedWaiter, self).add_result(future)
    self._decrement_pending_calls()
129 def add_exception(self, future):
130 super(_AllCompletedWaiter, self).add_exception(future)
131 if self.stop_on_exception:
134 self._decrement_pending_calls()
def add_cancelled(self, future):
    """Record a cancelled future and count it as done."""
    super(_AllCompletedWaiter, self).add_cancelled(future)
    self._decrement_pending_calls()
140 class _AcquireFutures(object):
141 """A context manager that does an ordered acquire of Future conditions."""
143 def __init__(self, futures):
144 self.futures = sorted(futures, key=id)
147 for future in self.futures:
148 future._condition.acquire()
150 def __exit__(self, *args):
151 for future in self.futures:
152 future._condition.release()
154 def _create_and_install_waiters(fs, return_when):
155 if return_when == _AS_COMPLETED:
156 waiter = _AsCompletedWaiter()
157 elif return_when == FIRST_COMPLETED:
158 waiter = _FirstCompletedWaiter()
161 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
163 if return_when == FIRST_EXCEPTION:
164 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
165 elif return_when == ALL_COMPLETED:
166 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
168 raise ValueError("Invalid return condition: %r" % return_when)
171 f._waiters.append(waiter)
176 def _yield_finished_futures(fs, waiter, ref_collect):
178 Iterate on the list *fs*, yielding finished futures one by one in
180 Before yielding a future, *waiter* is removed from its waiters
181 and the future is removed from each set in the collection of sets
184 The aim of this function is to avoid keeping stale references after
185 the future is yielded and before the iterator resumes.
189 for futures_set in ref_collect:
190 futures_set.remove(f)
192 f._waiters.remove(waiter)
194 # Careful not to keep a reference to the popped value
198 def as_completed(fs, timeout=None):
199 """An iterator over the given futures that yields each as it completes.
202 fs: The sequence of Futures (possibly created by different Executors) to
204 timeout: The maximum number of seconds to wait. If None, then there
205 is no limit on the wait time.
208 An iterator that yields the given Futures as they complete (finished or
209 cancelled). If any given Futures are duplicated, they will be returned
213 TimeoutError: If the entire result iterator could not be generated
214 before the given timeout.
216 if timeout is not None:
217 end_time = timeout + time.time()
220 total_futures = len(fs)
221 with _AcquireFutures(fs):
224 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
225 pending = fs - finished
226 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
227 finished = list(finished)
229 for f in _yield_finished_futures(finished, waiter,
238 wait_timeout = end_time - time.time()
241 '%d (of %d) futures unfinished' % (
242 len(pending), total_futures))
244 waiter.event.wait(wait_timeout)
247 finished = waiter.finished_futures
248 waiter.finished_futures = []
251 # reverse to keep finishing order
253 for f in _yield_finished_futures(finished, waiter,
254 ref_collect=(fs, pending)):
259 # Remove waiter from unfinished futures
262 f._waiters.remove(waiter)
# Return type of wait(): a 2-tuple whose fields are named 'done' and
# 'not_done'.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', 'done not_done')
266 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
267 """Wait for the futures in the given sequence to complete.
270 fs: The sequence of Futures (possibly created by different Executors) to
272 timeout: The maximum number of seconds to wait. If None, then there
273 is no limit on the wait time.
274 return_when: Indicates when this function should return. The options
277 FIRST_COMPLETED - Return when any future finishes or is
279 FIRST_EXCEPTION - Return when any future finishes by raising an
280 exception. If no future raises an exception
281 then it is equivalent to ALL_COMPLETED.
282 ALL_COMPLETED - Return when all futures finish or are cancelled.
285 A named 2-tuple of sets. The first set, named 'done', contains the
286 futures that completed (finished or were cancelled) before the wait
287 completed. The second set, named 'not_done', contains uncompleted
290 with _AcquireFutures(fs):
291 done = set(f for f in fs
292 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
293 not_done = set(fs) - done
295 if (return_when == FIRST_COMPLETED) and done:
296 return DoneAndNotDoneFutures(done, not_done)
297 elif (return_when == FIRST_EXCEPTION) and done:
298 if any(f for f in done
299 if not f.cancelled() and f.exception() is not None):
300 return DoneAndNotDoneFutures(done, not_done)
302 if len(done) == len(fs):
303 return DoneAndNotDoneFutures(done, not_done)
305 waiter = _create_and_install_waiters(fs, return_when)
307 waiter.event.wait(timeout)
310 f._waiters.remove(waiter)
312 done.update(waiter.finished_futures)
313 return DoneAndNotDoneFutures(done, set(fs) - done)
315 class Future(object):
316 """Represents the result of an asynchronous computation."""
319 """Initializes the future. Should not be called by clients."""
320 self._condition = threading.Condition()
321 self._state = PENDING
323 self._exception = None
324 self._traceback = None
326 self._done_callbacks = []
328 def _invoke_callbacks(self):
329 for callback in self._done_callbacks:
333 LOGGER.exception('exception calling callback for %r', self)
334 except BaseException:
335 # Explicitly let all other new-style exceptions through so
336 # that we can catch all old-style exceptions with a simple
337 # "except:" clause below.
339 # All old-style exception objects are instances of
340 # types.InstanceType, but "except types.InstanceType:" does
341 # not catch old-style exceptions for some reason. Thus, the
342 # only way to catch all old-style exceptions without catching
343 # any new-style exceptions is to filter out the new-style
344 # exceptions, which all derive from BaseException.
347 # Because of the BaseException clause above, this handler only
348 # executes for old-style exception objects.
349 LOGGER.exception('exception calling callback for %r', self)
352 with self._condition:
353 if self._state == FINISHED:
355 return '<%s at %#x state=%s raised %s>' % (
356 self.__class__.__name__,
358 _STATE_TO_DESCRIPTION_MAP[self._state],
359 self._exception.__class__.__name__)
361 return '<%s at %#x state=%s returned %s>' % (
362 self.__class__.__name__,
364 _STATE_TO_DESCRIPTION_MAP[self._state],
365 self._result.__class__.__name__)
366 return '<%s at %#x state=%s>' % (
367 self.__class__.__name__,
369 _STATE_TO_DESCRIPTION_MAP[self._state])
372 """Cancel the future if possible.
374 Returns True if the future was cancelled, False otherwise. A future
375 cannot be cancelled if it is running or has already completed.
377 with self._condition:
378 if self._state in [RUNNING, FINISHED]:
381 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
384 self._state = CANCELLED
385 self._condition.notify_all()
387 self._invoke_callbacks()
391 """Return True if the future was cancelled."""
392 with self._condition:
393 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
396 """Return True if the future is currently executing."""
397 with self._condition:
398 return self._state == RUNNING
401 """Return True if the future was cancelled or finished executing."""
402 with self._condition:
403 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
405 def __get_result(self):
407 if isinstance(self._exception, types.InstanceType):
408 # The exception is an instance of an old-style class, which
409 # means type(self._exception) returns types.ClassType instead
410 # of the exception's actual class type.
411 exception_type = self._exception.__class__
413 exception_type = type(self._exception)
414 raise exception_type, self._exception, self._traceback
418 def add_done_callback(self, fn):
419 """Attaches a callable that will be called when the future finishes.
422 fn: A callable that will be called with this future as its only
423 argument when the future completes or is cancelled. The callable
424 will always be called by a thread in the same process in which
425 it was added. If the future has already completed or been
426 cancelled then the callable will be called immediately. These
427 callables are called in the order that they were added.
429 with self._condition:
430 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
431 self._done_callbacks.append(fn)
435 def result(self, timeout=None):
436 """Return the result of the call that the future represents.
439 timeout: The number of seconds to wait for the result if the future
440 isn't done. If None, then there is no limit on the wait time.
443 The result of the call that the future represents.
446 CancelledError: If the future was cancelled.
447 TimeoutError: If the future didn't finish executing before the given
449 Exception: If the call raised then that exception will be raised.
451 with self._condition:
452 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
453 raise CancelledError()
454 elif self._state == FINISHED:
455 return self.__get_result()
457 self._condition.wait(timeout)
459 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
460 raise CancelledError()
461 elif self._state == FINISHED:
462 return self.__get_result()
466 def exception_info(self, timeout=None):
467 """Return a tuple of (exception, traceback) raised by the call that the
471 timeout: The number of seconds to wait for the exception if the
472 future isn't done. If None, then there is no limit on the wait
476 A tuple of (exception, traceback) raised by the call that the future
477 represents, or (None, None) if the call completed without raising.
480 CancelledError: If the future was cancelled.
481 TimeoutError: If the future didn't finish executing before the given
484 with self._condition:
485 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
486 raise CancelledError()
487 elif self._state == FINISHED:
488 return self._exception, self._traceback
490 self._condition.wait(timeout)
492 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
493 raise CancelledError()
494 elif self._state == FINISHED:
495 return self._exception, self._traceback
def exception(self, timeout=None):
    """Return the exception raised by the call that the future represents.

    This is exception_info() with the traceback discarded.

    Args:
        timeout: The number of seconds to wait for the exception if the
            future isn't done. If None, then there is no limit on the wait
            time.

    Returns:
        The exception raised by the call that the future represents or None
        if the call completed without raising.

    Raises:
        CancelledError: If the future was cancelled.
        TimeoutError: If the future didn't finish executing before the given
            timeout.
    """
    return self.exception_info(timeout)[0]
518 # The following methods should only be used by Executors and in tests.
519 def set_running_or_notify_cancel(self):
520 """Mark the future as running or process any cancel notifications.
522 Should only be used by Executor implementations and unit tests.
524 If the future has been cancelled (cancel() was called and returned
525 True) then any threads waiting on the future completing (through calls
526 to as_completed() or wait()) are notified and False is returned.
528 If the future was not cancelled then it is put in the running state
529 (future calls to running() will return True) and True is returned.
531 This method should be called by Executor implementations before
532 executing the work associated with this future. If this method returns
533 False then the work should not be executed.
536 False if the Future was cancelled, True otherwise.
539 RuntimeError: if this method was already called or if set_result()
540 or set_exception() was called.
542 with self._condition:
543 if self._state == CANCELLED:
544 self._state = CANCELLED_AND_NOTIFIED
545 for waiter in self._waiters:
546 waiter.add_cancelled(self)
547 # self._condition.notify_all() is not necessary because
548 # self.cancel() triggers a notification.
550 elif self._state == PENDING:
551 self._state = RUNNING
554 LOGGER.critical('Future %s in unexpected state: %s',
557 raise RuntimeError('Future in unexpected state')
def set_result(self, result):
    """Sets the return value of work associated with the future.

    Stores *result*, moves the future to the FINISHED state, reports the
    future to every installed waiter through add_result(), notifies threads
    waiting on the future's condition, and then runs the done callbacks.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        self._result = result
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_result(self)
        # Wake threads blocked in result() / exception_info().
        self._condition.notify_all()
    # NOTE(review): block nesting was rebuilt from statement order because the
    # listing lost its indentation; callbacks are run here, after the
    # condition is released -- confirm against the upstream module.
    self._invoke_callbacks()
def set_exception_info(self, exception, traceback):
    """Sets the result of the future as being the given exception
    and traceback.

    Stores both values, moves the future to the FINISHED state, reports the
    future to every installed waiter through add_exception(), notifies
    threads waiting on the future's condition, and then runs the done
    callbacks.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        self._exception = exception
        self._traceback = traceback
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_exception(self)
        # Wake threads blocked in result() / exception_info().
        self._condition.notify_all()
    # NOTE(review): block nesting was rebuilt from statement order because the
    # listing lost its indentation; callbacks are run here, after the
    # condition is released -- confirm against the upstream module.
    self._invoke_callbacks()
def set_exception(self, exception):
    """Sets the result of the future as being the given exception.

    Delegates to set_exception_info() with None as the traceback.

    Should only be used by Executor implementations and unit tests.
    """
    self.set_exception_info(exception, None)
594 class Executor(object):
595 """This is an abstract base class for concrete asynchronous executors."""
def submit(self, fn, *args, **kwargs):
    """Submits a callable to be executed with the given arguments.

    Schedules the callable to be executed as fn(*args, **kwargs) and returns
    a Future instance representing the execution of the callable.

    This base-class version does no scheduling and always raises.

    Returns:
        A Future representing the given call.

    Raises:
        NotImplementedError: Always, in this abstract base class.
    """
    raise NotImplementedError()
608 def map(self, fn, *iterables, **kwargs):
609 """Returns an iterator equivalent to map(fn, iter).
612 fn: A callable that will take as many arguments as there are
614 timeout: The maximum number of seconds to wait. If None, then there
615 is no limit on the wait time.
618 An iterator equivalent to: map(fn, *iterables) but the calls may
619 be evaluated out-of-order.
622 TimeoutError: If the entire result iterator could not be generated
623 before the given timeout.
624 Exception: If fn(*args) raises for any values.
626 timeout = kwargs.get('timeout')
627 if timeout is not None:
628 end_time = timeout + time.time()
630 fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
632 # Yield must be hidden in closure so that the futures are submitted
633 # before the first iterator value is required.
634 def result_iterator():
636 # reverse to keep finishing order
639 # Careful not to keep a reference to the popped future
641 yield fs.pop().result()
643 yield fs.pop().result(end_time - time.time())
647 return result_iterator()
649 def shutdown(self, wait=True):
650 """Clean-up the resources associated with the Executor.
652 It is safe to call this method several times. Otherwise, no other
653 methods can be called after this one.
656 wait: If True then shutdown will not return until all running
657 futures have finished executing and the resources used by the
658 executor have been reclaimed.
665 def __exit__(self, exc_type, exc_val, exc_tb):
666 self.shutdown(wait=True)