diff --git a/include/core/CStaticThreadPool.h b/include/core/CStaticThreadPool.h index f48fa88d5..9731b3ecb 100644 --- a/include/core/CStaticThreadPool.h +++ b/include/core/CStaticThreadPool.h @@ -46,6 +46,11 @@ class CORE_EXPORT CStaticThreadPool { CStaticThreadPool& operator=(const CStaticThreadPool&) = delete; CStaticThreadPool& operator=(CStaticThreadPool&&) = delete; + //! Adjust the number of threads which are being used by the pool. + //! + //! \note \p threads should be in the range [1, pool size]. + void numberThreadsInUse(std::size_t threads); + //! Schedule a Callable type to be executed by a thread in the pool. //! //! \note This forwards the task to the queue. @@ -88,9 +93,10 @@ class CORE_EXPORT CStaticThreadPool { // This doesn't have to be atomic because it is always only set to true, // always set straight before it is checked on each worker in the pool // and tearing can't happen for single byte writes. - bool m_Done = false; + bool m_Done{false}; std::atomic_bool m_Busy; std::atomic<std::size_t> m_Cursor; + std::atomic<std::size_t> m_NumberThreadsInUse; TWrappedTaskQueueVec m_TaskQueues; TThreadVec m_Pool; }; diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc index f27eedea1..ed0a57b13 100644 --- a/lib/core/CStaticThreadPool.cc +++ b/lib/core/CStaticThreadPool.cc @@ -25,6 +25,7 @@ std::size_t computeSize(std::size_t hint) { CStaticThreadPool::CStaticThreadPool(std::size_t size) : m_Busy{false}, m_Cursor{0}, m_TaskQueues{computeSize(size)} { + m_NumberThreadsInUse.store(m_TaskQueues.size()); m_Pool.reserve(m_TaskQueues.size()); for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) { try { @@ -40,9 +41,14 @@ CStaticThreadPool::~CStaticThreadPool() { this->shutdown(); } +void CStaticThreadPool::numberThreadsInUse(std::size_t threads) { + threads = std::max(std::min(threads, m_Pool.size()), std::size_t{1}); + m_NumberThreadsInUse.store(threads); +} + void CStaticThreadPool::schedule(TTask&& task_) { // Only block if every queue is full. 
- std::size_t size{m_TaskQueues.size()}; + std::size_t size{m_NumberThreadsInUse.load()}; std::size_t i{m_Cursor.load()}; std::size_t end{i + size}; CWrappedTask task{std::forward<TTask>(task_)}; @@ -72,8 +78,8 @@ bool CStaticThreadPool::busy() const { return m_Busy.load(); } -void CStaticThreadPool::busy(bool value) { - m_Busy.store(value); +void CStaticThreadPool::busy(bool busy) { + m_Busy.store(busy); } void CStaticThreadPool::shutdown() { @@ -105,7 +111,6 @@ void CStaticThreadPool::worker(std::size_t id) { }; TOptionalTask task; - std::size_t size{m_TaskQueues.size()}; while (m_Done == false) { // We maintain "worker count" queues and each worker has an affinity to a @@ -115,11 +120,18 @@ // if everything is working well we have essentially no contention between // workers on queue reads. - for (std::size_t i = 0; i < size; ++i) { - task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed); - if (task != boost::none) { - break; + std::size_t size{m_NumberThreadsInUse.load()}; + + // Only steal work if the thread is in use. 
+ if (id < size) { + for (std::size_t i = 0; i < size; ++i) { + task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed); + if (task != boost::none) { + break; + } } + } else { + task = boost::none; } if (task == boost::none) { task = m_TaskQueues[id].pop(); diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 4bafcdeb0..c6ea2045a 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -15,9 +15,11 @@ #include #include +#include <boost/unordered_set.hpp> #include #include +#include <mutex> #include BOOST_AUTO_TEST_SUITE(CStaticThreadPoolTest) @@ -72,7 +74,7 @@ BOOST_AUTO_TEST_CASE(testScheduleDelayMinimisation) { LOG_DEBUG(<< "Time to schedule " << timeToSchedule); //BOOST_TEST_REQUIRE(timeToSchedule <= 1); } - BOOST_REQUIRE_EQUAL(200u, counter.load()); + BOOST_REQUIRE_EQUAL(200, counter.load()); } BOOST_AUTO_TEST_CASE(testThroughputStability) { @@ -108,11 +110,11 @@ BOOST_AUTO_TEST_CASE(testThroughputStability) { uint64_t timeToSchedule{watch.stop()}; LOG_DEBUG(<< "Time to schedule " << timeToSchedule); - //BOOST_TEST_REQUIRE(timeToSchedule >= 330); + //BOOST_TEST_REQUIRE(timeToSchedule >= 300); //BOOST_TEST_REQUIRE(timeToSchedule <= 350); } - BOOST_REQUIRE_EQUAL(2000u, counter.load()); + BOOST_REQUIRE_EQUAL(2000, counter.load()); // The best we can achieve is 2000ms ignoring all overheads. std::uint64_t totalTime{totalTimeWatch.stop()}; @@ -148,7 +150,7 @@ BOOST_AUTO_TEST_CASE(testManyTasksThroughput) { } } - BOOST_REQUIRE_EQUAL(10000u, counter.load()); + BOOST_REQUIRE_EQUAL(10000, counter.load()); // We have 1400ms of delays so the best we can achieve here is 700ms elapsed. std::uint64_t totalTime{watch.stop()}; @@ -202,7 +204,46 @@ BOOST_AUTO_TEST_CASE(testWithExceptions) { } // We didn't lose any real tasks. 
- BOOST_REQUIRE_EQUAL(200u, counter.load()); + BOOST_REQUIRE_EQUAL(200, counter.load()); +} + +BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) { + + // Start a threadpool then change the number of threads and check we aren't + // getting execution on more than the specified number of distinct threads. + + using TThreadIdUSet = boost::unordered_set<std::thread::id, std::hash<std::thread::id>>; + + core::CStaticThreadPool pool{8}; + + std::mutex mutex; + std::size_t numberProcessedTasks{0}; + TThreadIdUSet executionThreads; + + for (std::size_t numberThreadsInUse : {5, 6, 2}) { + + pool.numberThreadsInUse(numberThreadsInUse); + + for (std::size_t i = 0; i < 200; ++i) { + pool.schedule([&] { + std::scoped_lock lock{mutex}; + ++numberProcessedTasks; + executionThreads.insert(std::this_thread::get_id()); + }); + } + + for (;;) { + std::scoped_lock lock{mutex}; + if (numberProcessedTasks == 200) { + LOG_DEBUG(<< "# threads used = " << executionThreads.size()); + // A subset of threads can steal all the work. + BOOST_REQUIRE(executionThreads.size() <= numberThreadsInUse); + numberProcessedTasks = 0; + executionThreads.clear(); + break; + } + } + } } BOOST_AUTO_TEST_SUITE_END()