From 6bb13558dc8789485cd1e877a5a6beddd9031e2a Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Thu, 17 Mar 2022 16:13:34 +0000 Subject: [PATCH 1/6] Allow on-the-fly adjustment of the effective thread pool size --- include/core/CStaticThreadPool.h | 8 +++- lib/core/CStaticThreadPool.cc | 28 +++++++++---- lib/core/unittest/CStaticThreadPoolTest.cc | 48 +++++++++++++++++++--- 3 files changed, 70 insertions(+), 14 deletions(-) diff --git a/include/core/CStaticThreadPool.h b/include/core/CStaticThreadPool.h index f48fa88d5..9731b3ecb 100644 --- a/include/core/CStaticThreadPool.h +++ b/include/core/CStaticThreadPool.h @@ -46,6 +46,11 @@ class CORE_EXPORT CStaticThreadPool { CStaticThreadPool& operator=(const CStaticThreadPool&) = delete; CStaticThreadPool& operator=(CStaticThreadPool&&) = delete; + //! Adjust the number of threads which are being used by the pool. + //! + //! \note \p threads should be in the range [1, pool size]. + void numberThreadsInUse(std::size_t threads); + //! Schedule a Callable type to be executed by a thread in the pool. //! //! \note This forwards the task to the queue. @@ -88,9 +93,10 @@ class CORE_EXPORT CStaticThreadPool { // This doesn't have to be atomic because it is always only set to true, // always set straight before it is checked on each worker in the pool // and tearing can't happen for single byte writes. - bool m_Done = false; + bool m_Done{false}; std::atomic_bool m_Busy; std::atomic m_Cursor; + std::atomic m_NumberThreadsInUse; TWrappedTaskQueueVec m_TaskQueues; TThreadVec m_Pool; }; diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc index f27eedea1..2d1dc48be 100644 --- a/lib/core/CStaticThreadPool.cc +++ b/lib/core/CStaticThreadPool.cc @@ -25,6 +25,7 @@ std::size_t computeSize(std::size_t hint) { CStaticThreadPool::CStaticThreadPool(std::size_t size) : m_Busy{false}, m_Cursor{0}, m_TaskQueues{computeSize(size)} { + m_NumberThreadsInUse.store(m_TaskQueues.size()); m_Pool.reserve(m_TaskQueues.size()); for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) { try { @@ -40,9 +41,14 @@ CStaticThreadPool::~CStaticThreadPool() { this->shutdown(); } +void CStaticThreadPool::numberThreadsInUse(std::size_t threads) { + threads = std::max(std::min(threads, m_Pool.size()), 1UL); + m_NumberThreadsInUse.store(threads); +} + void CStaticThreadPool::schedule(TTask&& task_) { // Only block if every queue is full. - std::size_t size{m_TaskQueues.size()}; + std::size_t size{m_NumberThreadsInUse.load()}; std::size_t i{m_Cursor.load()}; std::size_t end{i + size}; CWrappedTask task{std::forward(task_)}; @@ -72,8 +78,8 @@ bool CStaticThreadPool::busy() const { return m_Busy.load(); } -void CStaticThreadPool::busy(bool value) { - m_Busy.store(value); +void CStaticThreadPool::busy(bool busy) { + m_Busy.store(busy); } void CStaticThreadPool::shutdown() { @@ -105,7 +111,6 @@ void CStaticThreadPool::worker(std::size_t id) { }; TOptionalTask task; - std::size_t size{m_TaskQueues.size()}; while (m_Done == false) { // We maintain "worker count" queues and each worker has an affinity to a @@ -115,11 +120,18 @@ void CStaticThreadPool::worker(std::size_t id) { // if everything is working well we have essentially no contention between // workers on queue reads. - for (std::size_t i = 0; i < size; ++i) { - task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed); - if (task != boost::none) { - break; + std::size_t size{m_NumberThreadsInUse.load()}; + + // Only steal work if the thread is in use. + if (id < size) { + for (std::size_t i = 0; i < size; ++i) { + task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed); + if (task != boost::none) { + break; + } } + } else { + task = boost::none; } if (task == boost::none) { task = m_TaskQueues[id].pop(); diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 4bafcdeb0..9dbd87766 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -18,7 +18,9 @@ #include #include +#include #include +#include BOOST_AUTO_TEST_SUITE(CStaticThreadPoolTest) @@ -72,7 +74,7 @@ BOOST_AUTO_TEST_CASE(testScheduleDelayMinimisation) { LOG_DEBUG(<< "Time to schedule " << timeToSchedule); //BOOST_TEST_REQUIRE(timeToSchedule <= 1); } - BOOST_REQUIRE_EQUAL(200u, counter.load()); + BOOST_REQUIRE_EQUAL(200, counter.load()); } BOOST_AUTO_TEST_CASE(testThroughputStability) { @@ -108,11 +110,11 @@ BOOST_AUTO_TEST_CASE(testThroughputStability) { uint64_t timeToSchedule{watch.stop()}; LOG_DEBUG(<< "Time to schedule " << timeToSchedule); - //BOOST_TEST_REQUIRE(timeToSchedule >= 330); + //BOOST_TEST_REQUIRE(timeToSchedule >= 300); //BOOST_TEST_REQUIRE(timeToSchedule <= 350); } - BOOST_REQUIRE_EQUAL(2000u, counter.load()); + BOOST_REQUIRE_EQUAL(2000, counter.load()); // The best we can achieve is 2000ms ignoring all overheads. std::uint64_t totalTime{totalTimeWatch.stop()}; @@ -148,7 +150,7 @@ BOOST_AUTO_TEST_CASE(testManyTasksThroughput) { } } - BOOST_REQUIRE_EQUAL(10000u, counter.load()); + BOOST_REQUIRE_EQUAL(10000, counter.load()); // We have 1400ms of delays so the best we can achieve here is 700ms elapsed. std::uint64_t totalTime{watch.stop()}; @@ -202,7 +204,43 @@ BOOST_AUTO_TEST_CASE(testWithExceptions) { } // We didn't lose any real tasks. - BOOST_REQUIRE_EQUAL(200u, counter.load()); + BOOST_REQUIRE_EQUAL(200, counter.load()); +} + +BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { + + // Start a threadpool then change the number of threads and check we aren't + // getting execution on more than the specified number of distinct threads. + + core::CStaticThreadPool pool{8}; + + std::mutex mutex; + std::size_t numberProcessedTasks{0}; + std::unordered_set executionThreads; + + for (std::size_t numberThreadsInUse : {5, 6, 2}) { + + pool.numberThreadsInUse(numberThreadsInUse); + + for (std::size_t i = 0; i < 200; ++i) { + pool.schedule([&] { + std::scoped_lock lock{mutex}; + ++numberProcessedTasks; + executionThreads.insert(std::this_thread::get_id()); + }); + } + + for (;;) { + std::scoped_lock lock{mutex}; + if (numberProcessedTasks == 200) { + LOG_DEBUG(<< "# threads used = " << executionThreads.size()); + numberProcessedTasks = 0; + executionThreads.clear(); + BOOST_REQUIRE_EQUAL(numberThreadsInUse, executionThreads.size()); + break; + } + } + } } BOOST_AUTO_TEST_SUITE_END() From f047acdece21675beec11357850a4b73d3defdd6 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Thu, 17 Mar 2022 17:15:30 +0000 Subject: [PATCH 2/6] Review comments --- lib/core/CStaticThreadPool.cc | 2 +- lib/core/unittest/CStaticThreadPoolTest.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc index 2d1dc48be..ed0a57b13 100644 --- a/lib/core/CStaticThreadPool.cc +++ b/lib/core/CStaticThreadPool.cc @@ -42,7 +42,7 @@ CStaticThreadPool::~CStaticThreadPool() { } void CStaticThreadPool::numberThreadsInUse(std::size_t threads) { - threads = std::max(std::min(threads, m_Pool.size()), 1UL); + threads = std::max(std::min(threads, m_Pool.size()), std::size_t{1}); m_NumberThreadsInUse.store(threads); } diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 9dbd87766..0c2097ab2 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -15,12 +15,12 @@ #include #include +#include #include #include #include #include -#include BOOST_AUTO_TEST_SUITE(CStaticThreadPoolTest) @@ -216,7 +216,7 @@ BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { std::mutex mutex; std::size_t numberProcessedTasks{0}; - std::unordered_set executionThreads; + boost::unordered_set executionThreads; for (std::size_t numberThreadsInUse : {5, 6, 2}) { From b05c9207ddeba1c0dcc0855da550a7a0415d08d5 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Thu, 17 Mar 2022 17:18:11 +0000 Subject: [PATCH 3/6] Correct test --- lib/core/unittest/CStaticThreadPoolTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 0c2097ab2..b111b5ed4 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -234,9 +234,9 @@ BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { std::scoped_lock lock{mutex}; if (numberProcessedTasks == 200) { LOG_DEBUG(<< "# threads used = " << executionThreads.size()); + BOOST_REQUIRE_EQUAL(numberThreadsInUse, executionThreads.size()); numberProcessedTasks = 0; executionThreads.clear(); - BOOST_REQUIRE_EQUAL(numberThreadsInUse, executionThreads.size()); break; } } From 67741daa74ff048027cc54489da02b584dd47ce7 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Thu, 17 Mar 2022 18:27:10 +0000 Subject: [PATCH 4/6] Build fix --- lib/core/unittest/CStaticThreadPoolTest.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index b111b5ed4..0245c3048 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -212,11 +212,13 @@ BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { // Start a threadpool then change the number of threads and check we aren't // getting execution on more than the specified number of distinct threads. + using TThreadIdUSet = boost::unordered_set>; + core::CStaticThreadPool pool{8}; std::mutex mutex; std::size_t numberProcessedTasks{0}; - boost::unordered_set executionThreads; + TThreadIdUSet executionThreads; for (std::size_t numberThreadsInUse : {5, 6, 2}) { From 0b8e39b9a6d4181dbf1d4472a74c9372a86447b2 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Thu, 17 Mar 2022 22:59:33 +0000 Subject: [PATCH 5/6] Test fix --- lib/core/unittest/CStaticThreadPoolTest.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 0245c3048..195315ec0 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -236,7 +236,8 @@ BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { std::scoped_lock lock{mutex}; if (numberProcessedTasks == 200) { LOG_DEBUG(<< "# threads used = " << executionThreads.size()); - BOOST_REQUIRE_EQUAL(numberThreadsInUse, executionThreads.size()); + // A subset of threads can steal all the work. + BOOST_REQUIRE(numberThreadsInUse <= executionThreads.size()); numberProcessedTasks = 0; executionThreads.clear(); break; From e241b29ca35d3b25e65ee832ca3fd7cc39c83cf5 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Fri, 18 Mar 2022 09:41:34 +0000 Subject: [PATCH 6/6] Typo --- lib/core/unittest/CStaticThreadPoolTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 195315ec0..c6ea2045a 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -237,7 +237,7 @@ BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { if (numberProcessedTasks == 200) { LOG_DEBUG(<< "# threads used = " << executionThreads.size()); // A subset of threads can steal all the work. - BOOST_REQUIRE(numberThreadsInUse <= executionThreads.size()); + BOOST_REQUIRE(executionThreads.size() <= numberThreadsInUse); numberProcessedTasks = 0; executionThreads.clear(); break;