Don't spin up new threads if there are existing idle ones
Fixes #88.
diff --git a/CHANGES.rst b/CHANGES.rst
index 1f9db94..b5089cc 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,9 @@
+3.3.0
+=====
+
+- Backported bpo-24882: Let ThreadPoolExecutor reuse idle threads before creating new thread
+
+
3.2.0
=====
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index bb0ce9d..b5f832f 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -75,6 +75,12 @@
work_item.run()
# Delete references to object. See issue16284
del work_item
+
+ # attempt to increment idle count
+ executor = executor_reference()
+ if executor is not None:
+ executor._idle_semaphore.release()
+ del executor
continue
executor = executor_reference()
# Exit if:
@@ -112,6 +118,7 @@
self._max_workers = max_workers
self._work_queue = queue.Queue()
+ self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
@@ -132,12 +139,15 @@
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # if idle threads are available, don't spin new threads
+ if self._idle_semaphore.acquire(False):
+ return
+
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
- # TODO(bquinlan): Should avoid creating new threads if there are more
- # idle threads than items in the work queue.
+
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
diff --git a/setup.py b/setup.py
index 792c4f0..510f793 100755
--- a/setup.py
+++ b/setup.py
@@ -27,7 +27,7 @@
readme = f.read()
setup(name='futures',
- version='3.2.0',
+ version='3.3.0',
description='Backport of the concurrent.futures package from Python 3',
long_description=readme,
author='Brian Quinlan',
diff --git a/test_futures.py b/test_futures.py
index d5b3499..f51786d 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -206,7 +206,15 @@
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(3):
+ self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._threads), 3)
+ for i in range(3):
+ sem.release()
self.executor.shutdown()
for t in self.executor._threads:
t.join()
@@ -529,6 +537,27 @@
self.assertEqual(executor._max_workers,
(cpu_count() or 1) * 5)
+ def test_saturation(self):
+ executor = self.executor_type(4)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(15 * executor._max_workers):
+ executor.submit(acquire_lock, sem)
+ self.assertEqual(len(executor._threads), executor._max_workers)
+ for i in range(15 * executor._max_workers):
+ sem.release()
+ executor.shutdown(wait=True)
+
+ def test_idle_thread_reuse(self):
+ executor = self.executor_type()
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._threads), 1)
+ executor.shutdown(wait=True)
+
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
pass