1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
4 """Implements ThreadPoolExecutor."""
7 from concurrent.futures import _base
15 from multiprocessing import cpu_count
17 # some platforms don't have multiprocessing
21 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
23 # Workers are created as daemon threads. This is done to allow the interpreter
24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
25 # pool (i.e. shutdown() was not called). However, allowing workers to die with
26 # the interpreter has two undesirable properties:
27 # - The workers would still be running during interpreter shutdown,
28 # meaning that they would fail in unpredictable ways.
29 # - The workers could be killed while evaluating a work item, which could
30 # be bad if the callable being evaluated has external side-effects e.g.
33 # To work around this problem, an exit handler is installed which tells the
34 # workers to exit when their work queues are empty and then waits until the
37 _threads_queues = weakref.WeakKeyDictionary()
43 items = list(_threads_queues.items()) if _threads_queues else ()
49 atexit.register(_python_exit)
51 class _WorkItem(object):
52 def __init__(self, future, fn, args, kwargs):
59 if not self.future.set_running_or_notify_cancel():
63 result = self.fn(*self.args, **self.kwargs)
65 e, tb = sys.exc_info()[1:]
66 self.future.set_exception_info(e, tb)
68 self.future.set_result(result)
70 def _worker(executor_reference, work_queue):
73 work_item = work_queue.get(block=True)
74 if work_item is not None:
76 # Delete references to object. See issue16284
79 # attempt to increment idle count
80 executor = executor_reference()
81 if executor is not None:
82 executor._idle_semaphore.release()
85 executor = executor_reference()
87 # - The interpreter is shutting down OR
88 # - The executor that owns the worker has been collected OR
89 # - The executor that owns the worker has been shutdown.
90 if _shutdown or executor is None or executor._shutdown:
91 # Notify other workers
96 _base.LOGGER.critical('Exception in worker', exc_info=True)
99 class ThreadPoolExecutor(_base.Executor):
101 # Used to assign unique thread names when thread_name_prefix is not supplied.
102 _counter = itertools.count().next
104 def __init__(self, max_workers=None, thread_name_prefix=''):
105 """Initializes a new ThreadPoolExecutor instance.
108 max_workers: The maximum number of threads that can be used to
109 execute the given calls.
110 thread_name_prefix: An optional name prefix to give our threads.
112 if max_workers is None:
113 # Use this number because ThreadPoolExecutor is often
114 # used to overlap I/O instead of CPU work.
115 max_workers = (cpu_count() or 1) * 5
117 raise ValueError("max_workers must be greater than 0")
119 self._max_workers = max_workers
120 self._work_queue = queue.Queue()
121 self._idle_semaphore = threading.Semaphore(0)
122 self._threads = set()
123 self._shutdown = False
124 self._shutdown_lock = threading.Lock()
125 self._thread_name_prefix = (thread_name_prefix or
126 ("ThreadPoolExecutor-%d" % self._counter()))
128 def submit(self, fn, *args, **kwargs):
129 with self._shutdown_lock:
131 raise RuntimeError('cannot schedule new futures after shutdown')
134 w = _WorkItem(f, fn, args, kwargs)
136 self._work_queue.put(w)
137 self._adjust_thread_count()
139 submit.__doc__ = _base.Executor.submit.__doc__
141 def _adjust_thread_count(self):
142 # if idle threads are available, don't spin new threads
143 if self._idle_semaphore.acquire(False):
146 # When the executor gets lost, the weakref callback will wake up
147 # the worker threads.
148 def weakref_cb(_, q=self._work_queue):
151 num_threads = len(self._threads)
152 if num_threads < self._max_workers:
153 thread_name = '%s_%d' % (self._thread_name_prefix or self,
155 t = threading.Thread(name=thread_name, target=_worker,
156 args=(weakref.ref(self, weakref_cb),
161 _threads_queues[t] = self._work_queue
163 def shutdown(self, wait=True):
164 with self._shutdown_lock:
165 self._shutdown = True
166 self._work_queue.put(None)
168 for t in self._threads:
170 shutdown.__doc__ = _base.Executor.shutdown.__doc__