# Vendored from a dotfiles repository:
# .local/lib/python2.7/site-packages/concurrent/futures/thread.py
# (the "futures" backport of concurrent.futures for Python 2)
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 """Implements ThreadPoolExecutor."""
5
6 import atexit
7 from concurrent.futures import _base
8 import itertools
9 import Queue as queue
10 import threading
11 import weakref
12 import sys
13
14 try:
15     from multiprocessing import cpu_count
16 except ImportError:
17     # some platforms don't have multiprocessing
18     def cpu_count():
19         return None
20
21 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
22
23 # Workers are created as daemon threads. This is done to allow the interpreter
24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
25 # pool (i.e. shutdown() was not called). However, allowing workers to die with
26 # the interpreter has two undesirable properties:
27 #   - The workers would still be running during interpreter shutdown,
28 #     meaning that they would fail in unpredictable ways.
29 #   - The workers could be killed while evaluating a work item, which could
30 #     be bad if the callable being evaluated has external side-effects e.g.
31 #     writing to a file.
32 #
33 # To work around this problem, an exit handler is installed which tells the
34 # workers to exit when their work queues are empty and then waits until the
35 # threads finish.
36
37 _threads_queues = weakref.WeakKeyDictionary()
38 _shutdown = False
39
40 def _python_exit():
41     global _shutdown
42     _shutdown = True
43     items = list(_threads_queues.items()) if _threads_queues else ()
44     for t, q in items:
45         q.put(None)
46     for t, q in items:
47         t.join(sys.maxint)
48
49 atexit.register(_python_exit)
50
51 class _WorkItem(object):
52     def __init__(self, future, fn, args, kwargs):
53         self.future = future
54         self.fn = fn
55         self.args = args
56         self.kwargs = kwargs
57
58     def run(self):
59         if not self.future.set_running_or_notify_cancel():
60             return
61
62         try:
63             result = self.fn(*self.args, **self.kwargs)
64         except:
65             e, tb = sys.exc_info()[1:]
66             self.future.set_exception_info(e, tb)
67         else:
68             self.future.set_result(result)
69
70 def _worker(executor_reference, work_queue):
71     try:
72         while True:
73             work_item = work_queue.get(block=True)
74             if work_item is not None:
75                 work_item.run()
76                 # Delete references to object. See issue16284
77                 del work_item
78
79                 # attempt to increment idle count
80                 executor = executor_reference()
81                 if executor is not None:
82                     executor._idle_semaphore.release()
83                 del executor
84                 continue
85             executor = executor_reference()
86             # Exit if:
87             #   - The interpreter is shutting down OR
88             #   - The executor that owns the worker has been collected OR
89             #   - The executor that owns the worker has been shutdown.
90             if _shutdown or executor is None or executor._shutdown:
91                 # Notice other workers
92                 work_queue.put(None)
93                 return
94             del executor
95     except:
96         _base.LOGGER.critical('Exception in worker', exc_info=True)
97
98
99 class ThreadPoolExecutor(_base.Executor):
100
101     # Used to assign unique thread names when thread_name_prefix is not supplied.
102     _counter = itertools.count().next
103
104     def __init__(self, max_workers=None, thread_name_prefix=''):
105         """Initializes a new ThreadPoolExecutor instance.
106
107         Args:
108             max_workers: The maximum number of threads that can be used to
109                 execute the given calls.
110             thread_name_prefix: An optional name prefix to give our threads.
111         """
112         if max_workers is None:
113             # Use this number because ThreadPoolExecutor is often
114             # used to overlap I/O instead of CPU work.
115             max_workers = (cpu_count() or 1) * 5
116         if max_workers <= 0:
117             raise ValueError("max_workers must be greater than 0")
118
119         self._max_workers = max_workers
120         self._work_queue = queue.Queue()
121         self._idle_semaphore = threading.Semaphore(0)
122         self._threads = set()
123         self._shutdown = False
124         self._shutdown_lock = threading.Lock()
125         self._thread_name_prefix = (thread_name_prefix or
126                                     ("ThreadPoolExecutor-%d" % self._counter()))
127
128     def submit(self, fn, *args, **kwargs):
129         with self._shutdown_lock:
130             if self._shutdown:
131                 raise RuntimeError('cannot schedule new futures after shutdown')
132
133             f = _base.Future()
134             w = _WorkItem(f, fn, args, kwargs)
135
136             self._work_queue.put(w)
137             self._adjust_thread_count()
138             return f
139     submit.__doc__ = _base.Executor.submit.__doc__
140
141     def _adjust_thread_count(self):
142         # if idle threads are available, don't spin new threads
143         if self._idle_semaphore.acquire(False):
144             return
145
146         # When the executor gets lost, the weakref callback will wake up
147         # the worker threads.
148         def weakref_cb(_, q=self._work_queue):
149             q.put(None)
150
151         num_threads = len(self._threads)
152         if num_threads < self._max_workers:
153             thread_name = '%s_%d' % (self._thread_name_prefix or self,
154                                      num_threads)
155             t = threading.Thread(name=thread_name, target=_worker,
156                                  args=(weakref.ref(self, weakref_cb),
157                                        self._work_queue))
158             t.daemon = True
159             t.start()
160             self._threads.add(t)
161             _threads_queues[t] = self._work_queue
162
163     def shutdown(self, wait=True):
164         with self._shutdown_lock:
165             self._shutdown = True
166             self._work_queue.put(None)
167         if wait:
168             for t in self._threads:
169                 t.join(sys.maxint)
170     shutdown.__doc__ = _base.Executor.shutdown.__doc__