Cleanup for stow ---STOW
[dotfiles/.git] / .local / lib / python2.7 / site-packages / concurrent / futures / thread.py
diff --git a/.local/lib/python2.7/site-packages/concurrent/futures/thread.py b/.local/lib/python2.7/site-packages/concurrent/futures/thread.py
new file mode 100644 (file)
index 0000000..b5f832f
--- /dev/null
@@ -0,0 +1,170 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""Implements ThreadPoolExecutor."""
+
+import atexit
+from concurrent.futures import _base
+import itertools
+import Queue as queue
+import threading
+import weakref
+import sys
+
+try:
+    from multiprocessing import cpu_count
+except ImportError:
+    # some platforms don't have multiprocessing
+    def cpu_count():
+        return None
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+#   - The workers would still be running during interpretor shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the callable being evaluated has external side-effects e.g.
+#     writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
+_threads_queues = weakref.WeakKeyDictionary()
+_shutdown = False
+
+def _python_exit():
+    global _shutdown
+    _shutdown = True
+    items = list(_threads_queues.items()) if _threads_queues else ()
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join(sys.maxint)
+
+atexit.register(_python_exit)
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        if not self.future.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except:
+            e, tb = sys.exc_info()[1:]
+            self.future.set_exception_info(e, tb)
+        else:
+            self.future.set_result(result)
+
+def _worker(executor_reference, work_queue):
+    try:
+        while True:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                # Delete references to object. See issue16284
+                del work_item
+
+                # attempt to increment idle count
+                executor = executor_reference()
+                if executor is not None:
+                    executor._idle_semaphore.release()
+                del executor
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
+    except:
+        _base.LOGGER.critical('Exception in worker', exc_info=True)
+
+
+class ThreadPoolExecutor(_base.Executor):
+
+    # Used to assign unique thread names when thread_name_prefix is not supplied.
+    _counter = itertools.count().next
+
+    def __init__(self, max_workers=None, thread_name_prefix=''):
+        """Initializes a new ThreadPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of threads that can be used to
+                execute the given calls.
+            thread_name_prefix: An optional name prefix to give our threads.
+        """
+        if max_workers is None:
+            # Use this number because ThreadPoolExecutor is often
+            # used to overlap I/O instead of CPU work.
+            max_workers = (cpu_count() or 1) * 5
+        if max_workers <= 0:
+            raise ValueError("max_workers must be greater than 0")
+
+        self._max_workers = max_workers
+        self._work_queue = queue.Queue()
+        self._idle_semaphore = threading.Semaphore(0)
+        self._threads = set()
+        self._shutdown = False
+        self._shutdown_lock = threading.Lock()
+        self._thread_name_prefix = (thread_name_prefix or
+                                    ("ThreadPoolExecutor-%d" % self._counter()))
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
+
+    def _adjust_thread_count(self):
+        # if idle threads are available, don't spin new threads
+        if self._idle_semaphore.acquire(False):
+            return
+
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
+
+        num_threads = len(self._threads)
+        if num_threads < self._max_workers:
+            thread_name = '%s_%d' % (self._thread_name_prefix or self,
+                                     num_threads)
+            t = threading.Thread(name=thread_name, target=_worker,
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
+            t.daemon = True
+            t.start()
+            self._threads.add(t)
+            _threads_queues[t] = self._work_queue
+
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
+            self._shutdown = True
+            self._work_queue.put(None)
+        if wait:
+            for t in self._threads:
+                t.join(sys.maxint)
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__