efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / concurrent / futures / _base.py
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 import collections
5 import logging
6 import threading
7 import itertools
8 import time
9 import types
10
11 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
12
13 FIRST_COMPLETED = 'FIRST_COMPLETED'
14 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
15 ALL_COMPLETED = 'ALL_COMPLETED'
16 _AS_COMPLETED = '_AS_COMPLETED'
17
18 # Possible future states (for internal use by the futures package).
19 PENDING = 'PENDING'
20 RUNNING = 'RUNNING'
21 # The future was cancelled by the user...
22 CANCELLED = 'CANCELLED'
23 # ...and _Waiter.add_cancelled() was called by a worker.
24 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
25 FINISHED = 'FINISHED'
26
27 _FUTURE_STATES = [
28     PENDING,
29     RUNNING,
30     CANCELLED,
31     CANCELLED_AND_NOTIFIED,
32     FINISHED
33 ]
34
35 _STATE_TO_DESCRIPTION_MAP = {
36     PENDING: "pending",
37     RUNNING: "running",
38     CANCELLED: "cancelled",
39     CANCELLED_AND_NOTIFIED: "cancelled",
40     FINISHED: "finished"
41 }
42
43 # Logger for internal use by the futures package.
44 LOGGER = logging.getLogger("concurrent.futures")
45
46 class Error(Exception):
47     """Base class for all future-related exceptions."""
48     pass
49
50 class CancelledError(Error):
51     """The Future was cancelled."""
52     pass
53
54 class TimeoutError(Error):
55     """The operation exceeded the given deadline."""
56     pass
57
58 class _Waiter(object):
59     """Provides the event that wait() and as_completed() block on."""
60     def __init__(self):
61         self.event = threading.Event()
62         self.finished_futures = []
63
64     def add_result(self, future):
65         self.finished_futures.append(future)
66
67     def add_exception(self, future):
68         self.finished_futures.append(future)
69
70     def add_cancelled(self, future):
71         self.finished_futures.append(future)
72
73 class _AsCompletedWaiter(_Waiter):
74     """Used by as_completed()."""
75
76     def __init__(self):
77         super(_AsCompletedWaiter, self).__init__()
78         self.lock = threading.Lock()
79
80     def add_result(self, future):
81         with self.lock:
82             super(_AsCompletedWaiter, self).add_result(future)
83             self.event.set()
84
85     def add_exception(self, future):
86         with self.lock:
87             super(_AsCompletedWaiter, self).add_exception(future)
88             self.event.set()
89
90     def add_cancelled(self, future):
91         with self.lock:
92             super(_AsCompletedWaiter, self).add_cancelled(future)
93             self.event.set()
94
95 class _FirstCompletedWaiter(_Waiter):
96     """Used by wait(return_when=FIRST_COMPLETED)."""
97
98     def add_result(self, future):
99         super(_FirstCompletedWaiter, self).add_result(future)
100         self.event.set()
101
102     def add_exception(self, future):
103         super(_FirstCompletedWaiter, self).add_exception(future)
104         self.event.set()
105
106     def add_cancelled(self, future):
107         super(_FirstCompletedWaiter, self).add_cancelled(future)
108         self.event.set()
109
110 class _AllCompletedWaiter(_Waiter):
111     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
112
113     def __init__(self, num_pending_calls, stop_on_exception):
114         self.num_pending_calls = num_pending_calls
115         self.stop_on_exception = stop_on_exception
116         self.lock = threading.Lock()
117         super(_AllCompletedWaiter, self).__init__()
118
119     def _decrement_pending_calls(self):
120         with self.lock:
121             self.num_pending_calls -= 1
122             if not self.num_pending_calls:
123                 self.event.set()
124
125     def add_result(self, future):
126         super(_AllCompletedWaiter, self).add_result(future)
127         self._decrement_pending_calls()
128
129     def add_exception(self, future):
130         super(_AllCompletedWaiter, self).add_exception(future)
131         if self.stop_on_exception:
132             self.event.set()
133         else:
134             self._decrement_pending_calls()
135
136     def add_cancelled(self, future):
137         super(_AllCompletedWaiter, self).add_cancelled(future)
138         self._decrement_pending_calls()
139
140 class _AcquireFutures(object):
141     """A context manager that does an ordered acquire of Future conditions."""
142
143     def __init__(self, futures):
144         self.futures = sorted(futures, key=id)
145
146     def __enter__(self):
147         for future in self.futures:
148             future._condition.acquire()
149
150     def __exit__(self, *args):
151         for future in self.futures:
152             future._condition.release()
153
154 def _create_and_install_waiters(fs, return_when):
155     if return_when == _AS_COMPLETED:
156         waiter = _AsCompletedWaiter()
157     elif return_when == FIRST_COMPLETED:
158         waiter = _FirstCompletedWaiter()
159     else:
160         pending_count = sum(
161                 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
162
163         if return_when == FIRST_EXCEPTION:
164             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
165         elif return_when == ALL_COMPLETED:
166             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
167         else:
168             raise ValueError("Invalid return condition: %r" % return_when)
169
170     for f in fs:
171         f._waiters.append(waiter)
172
173     return waiter
174
175
176 def _yield_finished_futures(fs, waiter, ref_collect):
177     """
178     Iterate on the list *fs*, yielding finished futures one by one in
179     reverse order.
180     Before yielding a future, *waiter* is removed from its waiters
181     and the future is removed from each set in the collection of sets
182     *ref_collect*.
183
184     The aim of this function is to avoid keeping stale references after
185     the future is yielded and before the iterator resumes.
186     """
187     while fs:
188         f = fs[-1]
189         for futures_set in ref_collect:
190             futures_set.remove(f)
191         with f._condition:
192             f._waiters.remove(waiter)
193         del f
194         # Careful not to keep a reference to the popped value
195         yield fs.pop()
196
197
198 def as_completed(fs, timeout=None):
199     """An iterator over the given futures that yields each as it completes.
200
201     Args:
202         fs: The sequence of Futures (possibly created by different Executors) to
203             iterate over.
204         timeout: The maximum number of seconds to wait. If None, then there
205             is no limit on the wait time.
206
207     Returns:
208         An iterator that yields the given Futures as they complete (finished or
209         cancelled). If any given Futures are duplicated, they will be returned
210         once.
211
212     Raises:
213         TimeoutError: If the entire result iterator could not be generated
214             before the given timeout.
215     """
216     if timeout is not None:
217         end_time = timeout + time.time()
218
219     fs = set(fs)
220     total_futures = len(fs)
221     with _AcquireFutures(fs):
222         finished = set(
223                 f for f in fs
224                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
225         pending = fs - finished
226         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
227     finished = list(finished)
228     try:
229         for f in _yield_finished_futures(finished, waiter,
230                                          ref_collect=(fs,)):
231             f = [f]
232             yield f.pop()
233
234         while pending:
235             if timeout is None:
236                 wait_timeout = None
237             else:
238                 wait_timeout = end_time - time.time()
239                 if wait_timeout < 0:
240                     raise TimeoutError(
241                             '%d (of %d) futures unfinished' % (
242                             len(pending), total_futures))
243
244             waiter.event.wait(wait_timeout)
245
246             with waiter.lock:
247                 finished = waiter.finished_futures
248                 waiter.finished_futures = []
249                 waiter.event.clear()
250
251             # reverse to keep finishing order
252             finished.reverse()
253             for f in _yield_finished_futures(finished, waiter,
254                                              ref_collect=(fs, pending)):
255                 f = [f]
256                 yield f.pop()
257
258     finally:
259         # Remove waiter from unfinished futures
260         for f in fs:
261             with f._condition:
262                 f._waiters.remove(waiter)
263
264 DoneAndNotDoneFutures = collections.namedtuple(
265         'DoneAndNotDoneFutures', 'done not_done')
266 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
267     """Wait for the futures in the given sequence to complete.
268
269     Args:
270         fs: The sequence of Futures (possibly created by different Executors) to
271             wait upon.
272         timeout: The maximum number of seconds to wait. If None, then there
273             is no limit on the wait time.
274         return_when: Indicates when this function should return. The options
275             are:
276
277             FIRST_COMPLETED - Return when any future finishes or is
278                               cancelled.
279             FIRST_EXCEPTION - Return when any future finishes by raising an
280                               exception. If no future raises an exception
281                               then it is equivalent to ALL_COMPLETED.
282             ALL_COMPLETED -   Return when all futures finish or are cancelled.
283
284     Returns:
285         A named 2-tuple of sets. The first set, named 'done', contains the
286         futures that completed (is finished or cancelled) before the wait
287         completed. The second set, named 'not_done', contains uncompleted
288         futures.
289     """
290     with _AcquireFutures(fs):
291         done = set(f for f in fs
292                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
293         not_done = set(fs) - done
294
295         if (return_when == FIRST_COMPLETED) and done:
296             return DoneAndNotDoneFutures(done, not_done)
297         elif (return_when == FIRST_EXCEPTION) and done:
298             if any(f for f in done
299                    if not f.cancelled() and f.exception() is not None):
300                 return DoneAndNotDoneFutures(done, not_done)
301
302         if len(done) == len(fs):
303             return DoneAndNotDoneFutures(done, not_done)
304
305         waiter = _create_and_install_waiters(fs, return_when)
306
307     waiter.event.wait(timeout)
308     for f in fs:
309         with f._condition:
310             f._waiters.remove(waiter)
311
312     done.update(waiter.finished_futures)
313     return DoneAndNotDoneFutures(done, set(fs) - done)
314
315 class Future(object):
316     """Represents the result of an asynchronous computation."""
317
318     def __init__(self):
319         """Initializes the future. Should not be called by clients."""
320         self._condition = threading.Condition()
321         self._state = PENDING
322         self._result = None
323         self._exception = None
324         self._traceback = None
325         self._waiters = []
326         self._done_callbacks = []
327
328     def _invoke_callbacks(self):
329         for callback in self._done_callbacks:
330             try:
331                 callback(self)
332             except Exception:
333                 LOGGER.exception('exception calling callback for %r', self)
334             except BaseException:
335                 # Explicitly let all other new-style exceptions through so
336                 # that we can catch all old-style exceptions with a simple
337                 # "except:" clause below.
338                 #
339                 # All old-style exception objects are instances of
340                 # types.InstanceType, but "except types.InstanceType:" does
341                 # not catch old-style exceptions for some reason.  Thus, the
342                 # only way to catch all old-style exceptions without catching
343                 # any new-style exceptions is to filter out the new-style
344                 # exceptions, which all derive from BaseException.
345                 raise
346             except:
347                 # Because of the BaseException clause above, this handler only
348                 # executes for old-style exception objects.
349                 LOGGER.exception('exception calling callback for %r', self)
350
351     def __repr__(self):
352         with self._condition:
353             if self._state == FINISHED:
354                 if self._exception:
355                     return '<%s at %#x state=%s raised %s>' % (
356                         self.__class__.__name__,
357                         id(self),
358                         _STATE_TO_DESCRIPTION_MAP[self._state],
359                         self._exception.__class__.__name__)
360                 else:
361                     return '<%s at %#x state=%s returned %s>' % (
362                         self.__class__.__name__,
363                         id(self),
364                         _STATE_TO_DESCRIPTION_MAP[self._state],
365                         self._result.__class__.__name__)
366             return '<%s at %#x state=%s>' % (
367                     self.__class__.__name__,
368                     id(self),
369                    _STATE_TO_DESCRIPTION_MAP[self._state])
370
371     def cancel(self):
372         """Cancel the future if possible.
373
374         Returns True if the future was cancelled, False otherwise. A future
375         cannot be cancelled if it is running or has already completed.
376         """
377         with self._condition:
378             if self._state in [RUNNING, FINISHED]:
379                 return False
380
381             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
382                 return True
383
384             self._state = CANCELLED
385             self._condition.notify_all()
386
387         self._invoke_callbacks()
388         return True
389
390     def cancelled(self):
391         """Return True if the future was cancelled."""
392         with self._condition:
393             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
394
395     def running(self):
396         """Return True if the future is currently executing."""
397         with self._condition:
398             return self._state == RUNNING
399
400     def done(self):
401         """Return True of the future was cancelled or finished executing."""
402         with self._condition:
403             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
404
405     def __get_result(self):
406         if self._exception:
407             if isinstance(self._exception, types.InstanceType):
408                 # The exception is an instance of an old-style class, which
409                 # means type(self._exception) returns types.ClassType instead
410                 # of the exception's actual class type.
411                 exception_type = self._exception.__class__
412             else:
413                 exception_type = type(self._exception)
414             raise exception_type, self._exception, self._traceback
415         else:
416             return self._result
417
418     def add_done_callback(self, fn):
419         """Attaches a callable that will be called when the future finishes.
420
421         Args:
422             fn: A callable that will be called with this future as its only
423                 argument when the future completes or is cancelled. The callable
424                 will always be called by a thread in the same process in which
425                 it was added. If the future has already completed or been
426                 cancelled then the callable will be called immediately. These
427                 callables are called in the order that they were added.
428         """
429         with self._condition:
430             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
431                 self._done_callbacks.append(fn)
432                 return
433         fn(self)
434
435     def result(self, timeout=None):
436         """Return the result of the call that the future represents.
437
438         Args:
439             timeout: The number of seconds to wait for the result if the future
440                 isn't done. If None, then there is no limit on the wait time.
441
442         Returns:
443             The result of the call that the future represents.
444
445         Raises:
446             CancelledError: If the future was cancelled.
447             TimeoutError: If the future didn't finish executing before the given
448                 timeout.
449             Exception: If the call raised then that exception will be raised.
450         """
451         with self._condition:
452             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
453                 raise CancelledError()
454             elif self._state == FINISHED:
455                 return self.__get_result()
456
457             self._condition.wait(timeout)
458
459             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
460                 raise CancelledError()
461             elif self._state == FINISHED:
462                 return self.__get_result()
463             else:
464                 raise TimeoutError()
465
466     def exception_info(self, timeout=None):
467         """Return a tuple of (exception, traceback) raised by the call that the
468         future represents.
469
470         Args:
471             timeout: The number of seconds to wait for the exception if the
472                 future isn't done. If None, then there is no limit on the wait
473                 time.
474
475         Returns:
476             The exception raised by the call that the future represents or None
477             if the call completed without raising.
478
479         Raises:
480             CancelledError: If the future was cancelled.
481             TimeoutError: If the future didn't finish executing before the given
482                 timeout.
483         """
484         with self._condition:
485             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
486                 raise CancelledError()
487             elif self._state == FINISHED:
488                 return self._exception, self._traceback
489
490             self._condition.wait(timeout)
491
492             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
493                 raise CancelledError()
494             elif self._state == FINISHED:
495                 return self._exception, self._traceback
496             else:
497                 raise TimeoutError()
498
499     def exception(self, timeout=None):
500         """Return the exception raised by the call that the future represents.
501
502         Args:
503             timeout: The number of seconds to wait for the exception if the
504                 future isn't done. If None, then there is no limit on the wait
505                 time.
506
507         Returns:
508             The exception raised by the call that the future represents or None
509             if the call completed without raising.
510
511         Raises:
512             CancelledError: If the future was cancelled.
513             TimeoutError: If the future didn't finish executing before the given
514                 timeout.
515         """
516         return self.exception_info(timeout)[0]
517
518     # The following methods should only be used by Executors and in tests.
519     def set_running_or_notify_cancel(self):
520         """Mark the future as running or process any cancel notifications.
521
522         Should only be used by Executor implementations and unit tests.
523
524         If the future has been cancelled (cancel() was called and returned
525         True) then any threads waiting on the future completing (though calls
526         to as_completed() or wait()) are notified and False is returned.
527
528         If the future was not cancelled then it is put in the running state
529         (future calls to running() will return True) and True is returned.
530
531         This method should be called by Executor implementations before
532         executing the work associated with this future. If this method returns
533         False then the work should not be executed.
534
535         Returns:
536             False if the Future was cancelled, True otherwise.
537
538         Raises:
539             RuntimeError: if this method was already called or if set_result()
540                 or set_exception() was called.
541         """
542         with self._condition:
543             if self._state == CANCELLED:
544                 self._state = CANCELLED_AND_NOTIFIED
545                 for waiter in self._waiters:
546                     waiter.add_cancelled(self)
547                 # self._condition.notify_all() is not necessary because
548                 # self.cancel() triggers a notification.
549                 return False
550             elif self._state == PENDING:
551                 self._state = RUNNING
552                 return True
553             else:
554                 LOGGER.critical('Future %s in unexpected state: %s',
555                                 id(self),
556                                 self._state)
557                 raise RuntimeError('Future in unexpected state')
558
559     def set_result(self, result):
560         """Sets the return value of work associated with the future.
561
562         Should only be used by Executor implementations and unit tests.
563         """
564         with self._condition:
565             self._result = result
566             self._state = FINISHED
567             for waiter in self._waiters:
568                 waiter.add_result(self)
569             self._condition.notify_all()
570         self._invoke_callbacks()
571
572     def set_exception_info(self, exception, traceback):
573         """Sets the result of the future as being the given exception
574         and traceback.
575
576         Should only be used by Executor implementations and unit tests.
577         """
578         with self._condition:
579             self._exception = exception
580             self._traceback = traceback
581             self._state = FINISHED
582             for waiter in self._waiters:
583                 waiter.add_exception(self)
584             self._condition.notify_all()
585         self._invoke_callbacks()
586
587     def set_exception(self, exception):
588         """Sets the result of the future as being the given exception.
589
590         Should only be used by Executor implementations and unit tests.
591         """
592         self.set_exception_info(exception, None)
593
594 class Executor(object):
595     """This is an abstract base class for concrete asynchronous executors."""
596
597     def submit(self, fn, *args, **kwargs):
598         """Submits a callable to be executed with the given arguments.
599
600         Schedules the callable to be executed as fn(*args, **kwargs) and returns
601         a Future instance representing the execution of the callable.
602
603         Returns:
604             A Future representing the given call.
605         """
606         raise NotImplementedError()
607
608     def map(self, fn, *iterables, **kwargs):
609         """Returns an iterator equivalent to map(fn, iter).
610
611         Args:
612             fn: A callable that will take as many arguments as there are
613                 passed iterables.
614             timeout: The maximum number of seconds to wait. If None, then there
615                 is no limit on the wait time.
616
617         Returns:
618             An iterator equivalent to: map(func, *iterables) but the calls may
619             be evaluated out-of-order.
620
621         Raises:
622             TimeoutError: If the entire result iterator could not be generated
623                 before the given timeout.
624             Exception: If fn(*args) raises for any values.
625         """
626         timeout = kwargs.get('timeout')
627         if timeout is not None:
628             end_time = timeout + time.time()
629
630         fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
631
632         # Yield must be hidden in closure so that the futures are submitted
633         # before the first iterator value is required.
634         def result_iterator():
635             try:
636                 # reverse to keep finishing order
637                 fs.reverse()
638                 while fs:
639                     # Careful not to keep a reference to the popped future
640                     if timeout is None:
641                         yield fs.pop().result()
642                     else:
643                         yield fs.pop().result(end_time - time.time())
644             finally:
645                 for future in fs:
646                     future.cancel()
647         return result_iterator()
648
649     def shutdown(self, wait=True):
650         """Clean-up the resources associated with the Executor.
651
652         It is safe to call this method several times. Otherwise, no other
653         methods can be called after this one.
654
655         Args:
656             wait: If True then shutdown will not return until all running
657                 futures have finished executing and the resources used by the
658                 executor have been reclaimed.
659         """
660         pass
661
662     def __enter__(self):
663         return self
664
665     def __exit__(self, exc_type, exc_val, exc_tb):
666         self.shutdown(wait=True)
667         return False