efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / concurrent / futures / process.py
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 """Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8 |======================= In-process =====================|== Out-of-process ==|
9
10 +----------+     +----------+       +--------+     +-----------+    +---------+
11 |          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
12 |          |     +----------+       |        |     +-----------+    |         |
13 |          |     | ...      |       |        |     | ...       |    |         |
14 |          |     | 6        |       |        |     | 5, call() |    |         |
15 |          |     | 7        |       |        |     | ...       |    |         |
16 | Process  |     | ...      |       | Local  |     +-----------+    | Process |
17 |  Pool    |     +----------+       | Worker |                      |  #1..n  |
18 | Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
20 |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
21 |          |     +------------+     |        |     +-----------+    |         |
22 |          |     | 6: call()  |     |        |     | ...       |    |         |
23 |          |     |    future  |     |        |     | 4, result |    |         |
24 |          |     | ...        |     |        |     | 3, except |    |         |
25 +----------+     +------------+     +--------+     +-----------+    +---------+
26
27 Executor.submit() called:
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29 - adds the id of the _WorkItem to the "Work Ids" queue
30
31 Local worker thread:
32 - reads work ids from the "Work Ids" queue and looks up the corresponding
33   WorkItem from the "Work Items" dict: if the work item has been cancelled then
34   it is simply removed from the dict, otherwise it is repackaged as a
35   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38 - reads _ResultItems from "Result Q", updates the future stored in the
39   "Work Items" dict and deletes the dict entry
40
41 Process #1..n:
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
44 """
45
46 import atexit
47 from concurrent.futures import _base
48 import Queue as queue
49 import multiprocessing
50 import threading
51 import weakref
52 import sys
53
54 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
55
56 # Workers are created as daemon threads and processes. This is done to allow the
57 # interpreter to exit when there are still idle processes in a
58 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
59 # allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
61 #     meaning that they would fail in unpredictable ways.
62 #   - The workers could be killed while evaluating a work item, which could
63 #     be bad if the callable being evaluated has external side-effects e.g.
64 #     writing to a file.
65 #
66 # To work around this problem, an exit handler is installed which tells the
67 # workers to exit when their work queues are empty and then waits until the
68 # threads/processes finish.
69
# Maps each queue management thread to the result queue it waits on. Weak keys
# let an entry vanish once its thread object is no longer referenced.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit() once the interpreter starts shutting down.
_shutdown = False

def _python_exit():
    """Exit handler: wake every queue management thread, then wait for it.

    Sets the module-level ``_shutdown`` flag, puts a ``None`` wake-up marker
    on each registered queue and finally joins each registered thread.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the registry so that it cannot change size while we iterate.
    if _threads_queues:
        registered = list(_threads_queues.items())
    else:
        registered = ()
    # First wake everybody up...
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    # ...and only then wait for each thread to finish.
    for thread, _ in registered:
        thread.join(sys.maxint)
81
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor.__init__ sizes its call queue as
# max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
87
class _WorkItem(object):
    """A submitted call together with the future that will receive its outcome.

    Attributes:
        future: The future associated with the call.
        fn: The callable to invoke.
        args: Positional arguments for ``fn``.
        kwargs: Keyword arguments for ``fn``.
    """

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
94
class _ResultItem(object):
    """The outcome of one call, identified by the work id it belongs to.

    Attributes:
        work_id: Id of the work item this outcome belongs to.
        exception: The exception raised by the call, or None.
        result: The value returned by the call, or None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
100
class _CallItem(object):
    """A call, tagged with its work id, ready to be placed on the call queue.

    Attributes:
        work_id: Id of the work item this call was derived from.
        fn: The callable to invoke.
        args: Positional arguments for ``fn``.
        kwargs: Keyword arguments for ``fn``.
    """

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
107
def _process_worker(call_queue, result_queue):
    """Run calls taken from call_queue and report each outcome on result_queue.

    This worker is run in a separate process.

    Items are read until a ``None`` marker is received. For every other item
    the call is executed and a _ResultItem holding either the returned value
    or the raised exception is put on result_queue. When the ``None`` marker
    arrives, ``None`` is put on result_queue and the function returns.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            break
        try:
            value = call_item.fn(*call_item.args, **call_item.kwargs)
        except:
            # Deliberately catches everything: whatever the call raised is
            # shipped back to the parent instead of killing the worker.
            outcome = _ResultItem(call_item.work_id,
                                  exception=sys.exc_info()[1])
        else:
            outcome = _ResultItem(call_item.work_id, result=value)
        result_queue.put(outcome)
    # Wake up queue management thread
    result_queue.put(None)
136
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move pending work into call_queue until it is full or no ids remain.

    Work ids are taken from work_ids without blocking. A work item whose
    future reports that it was cancelled is dropped from pending_work_items;
    every other item is converted to a _CallItem and put on call_queue. The
    put uses block=True but is only attempted right after call_queue.full()
    returned False.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            # Nothing left to schedule.
            return

        work_item = pending_work_items[work_id]
        if not work_item.future.set_running_or_notify_cancel():
            # Cancelled before it could start: forget about it.
            del pending_work_items[work_id]
            continue

        call_item = _CallItem(work_id,
                              work_item.fn,
                              work_item.args,
                              work_item.kwargs)
        call_queue.put(call_item, block=True)
173
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Each iteration tops up call_queue, blocks on result_queue, resolves the
    future of the received result (a ``None`` item is only a wake-up signal)
    and then checks whether it is time to shut down. On shutdown every worker
    process is sent a ``None`` marker and joined, call_queue is closed and the
    function returns.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A collection of the multiprocessing.Process instances used
            as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    # A one-element list serves as a counter the nested function can mutate
    # (Python 2 has no ``nonlocal``).
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id)

            # Compare against None rather than testing truthiness: an
            # exception instance may evaluate as false (e.g. it defines
            # __len__ returning 0) and must still be reported as a failure.
            if result_item.exception is not None:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        # Drop the strong reference so this thread does not keep the executor
        # alive while it blocks on the result queue.
        del executor
239
# True once _check_system_limits() has run its check.
_system_limits_checked = False
# Error message describing the limitation found, or None if none was found.
_system_limited = None
def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The check queries os.sysconf("SC_SEM_NSEMS_MAX") once; the verdict is
    cached in the module-level ``_system_limits_checked`` and
    ``_system_limited`` so later calls return (or raise) immediately.

    Raises:
        NotImplementedError: If fewer than 256 semaphores are available.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and no limitation was found: nothing left to do.
        return
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)
264
265
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    Worker processes and the queue management thread are created on the
    first call to submit(), not when the executor is constructed.
    """

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is given and is not greater than 0.
        """
        _check_system_limits()

        if max_workers is not None and max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        call_queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(call_queue_size)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Create and start the queue management thread if there is none yet."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread. The queue is bound as a default
        # argument so that the callback does not reference the executor.
        def wake_manager(_, q=self._result_queue):
            q.put(None)

        manager = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, wake_manager),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
        self._queue_management_thread = manager
        manager.daemon = True
        manager.start()
        # Register the thread so the exit handler can wake and join it.
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are max_workers of them."""
        missing = self._max_workers - len(self._processes)
        for _ in range(missing):
            worker = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            worker.start()
            self._processes.add(worker)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_item = _WorkItem(future, fn, args, kwargs)

            work_id = self._queue_count
            self._pending_work_items[work_id] = work_item
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
362
# Install _python_exit as an exit handler so that, at interpreter exit, the
# registered queue management threads are woken up and joined.
atexit.register(_python_exit)