Skip to content

Commit eb45911

Browse files
iUnknwnpitrou
authored andcommitted
Switched idle count to semaphor, Updated tests
As suggested by reviewer tomMoral, swapped lock-protected counter with a semaphore to track the number of unused threads. Adjusted test_threads_terminate to wait for completiton of the previous future before submitting a new one (and checking the number of threads used). Also added a new test to confirm the thread pool can be saturated.
1 parent 6c95433 commit eb45911

2 files changed

Lines changed: 12 additions & 15 deletions

File tree

Lib/concurrent/futures/thread.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def _worker(executor_reference, work_queue, initializer, initargs):
8484
# attempt to increment idle count
8585
executor = executor_reference()
8686
if executor is not None:
87-
executor._increase_idle_count()
87+
executor._idle_semaphore.release()
8888
del executor
8989
continue
9090

@@ -140,8 +140,7 @@ def __init__(self, max_workers=None, thread_name_prefix='',
140140

141141
self._max_workers = max_workers
142142
self._work_queue = queue.SimpleQueue()
143-
self._idle_lock = threading.Lock()
144-
self._idle_count = 0
143+
self._idle_semaphore = threading.Semaphore(0)
145144
self._threads = set()
146145
self._broken = False
147146
self._shutdown = False
@@ -188,10 +187,8 @@ def submit(*args, **kwargs):
188187

189188
def _adjust_thread_count(self):
190189
#if idle threads are available, don't spin new threads
191-
with self._idle_lock:
192-
if self._idle_count > 0:
193-
self._idle_count -= 1
194-
return
190+
if self._idle_semaphore.acquire(timeout=0):
191+
return
195192

196193
# When the executor gets lost, the weakref callback will wake up
197194
# the worker threads.
@@ -212,10 +209,6 @@ def weakref_cb(_, q=self._work_queue):
212209
self._threads.add(t)
213210
_threads_queues[t] = self._work_queue
214211

215-
def _increase_idle_count(self):
216-
with self._idle_lock:
217-
self._idle_count += 1
218-
219212
def _initializer_failed(self):
220213
with self._shutdown_lock:
221214
self._broken = ('A thread initializer failed, the thread pool '

Lib/test/test_concurrent_futures.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,10 @@ def _prime_executor(self):
346346
pass
347347

348348
def test_threads_terminate(self):
349-
self.executor.submit(mul, 21, 2)
350-
self.executor.submit(mul, 6, 7)
351-
self.executor.submit(mul, 3, 14)
352-
self.assertTrue(len(self.executor._threads) < 3)
349+
self.executor.submit(mul, 21, 2).result()
350+
self.executor.submit(mul, 6, 7).result()
351+
self.executor.submit(mul, 3, 14).result()
352+
self.assertTrue(len(self.executor._threads) == 1)
353353
self.executor.shutdown()
354354
for t in self.executor._threads:
355355
t.join()
@@ -753,6 +753,10 @@ def test_default_workers(self):
753753
self.assertEqual(executor._max_workers,
754754
(os.cpu_count() or 1) * 5)
755755

756+
def test_saturation(self):
757+
list(self.executor.map(lambda x: mul(0, x), range(100 * self.executor._max_workers)))
758+
self.assertEqual(len(self.executor._threads), self.executor._max_workers)
759+
756760

757761
class ProcessPoolExecutorTest(ExecutorTest):
758762

0 commit comments

Comments
 (0)