Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/core/CStaticThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class CORE_EXPORT CStaticThreadPool {
CStaticThreadPool& operator=(const CStaticThreadPool&) = delete;
CStaticThreadPool& operator=(CStaticThreadPool&&) = delete;

//! Adjust the number of threads which are being used by the pool.
//!
//! \note \p threads should be in the range [1, pool size].
void numberThreadsInUse(std::size_t threads);

//! Schedule a Callable type to be executed by a thread in the pool.
//!
//! \note This forwards the task to the queue.
Expand Down Expand Up @@ -88,9 +93,10 @@ class CORE_EXPORT CStaticThreadPool {
// This doesn't have to be atomic because it is always only set to true,
// always set straight before it is checked on each worker in the pool
// and tearing can't happen for single byte writes.
bool m_Done = false;
bool m_Done{false};
std::atomic_bool m_Busy;
std::atomic<std::uint64_t> m_Cursor;
std::atomic<std::size_t> m_NumberThreadsInUse;
TWrappedTaskQueueVec m_TaskQueues;
TThreadVec m_Pool;
};
Expand Down
28 changes: 20 additions & 8 deletions lib/core/CStaticThreadPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ std::size_t computeSize(std::size_t hint) {

CStaticThreadPool::CStaticThreadPool(std::size_t size)
: m_Busy{false}, m_Cursor{0}, m_TaskQueues{computeSize(size)} {
m_NumberThreadsInUse.store(m_TaskQueues.size());
m_Pool.reserve(m_TaskQueues.size());
for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
try {
Expand All @@ -40,9 +41,14 @@ CStaticThreadPool::~CStaticThreadPool() {
this->shutdown();
}

//! Publish the number of worker threads the pool should actually use.
//!
//! The request is clamped into [1, pool size] before being stored, so a
//! caller can never disable every thread or exceed the pool's capacity.
void CStaticThreadPool::numberThreadsInUse(std::size_t threads) {
    std::size_t upperBound{m_Pool.size()};
    if (threads > upperBound) {
        threads = upperBound;
    }
    if (threads < 1) {
        threads = 1;
    }
    m_NumberThreadsInUse.store(threads);
}

void CStaticThreadPool::schedule(TTask&& task_) {
// Only block if every queue is full.
std::size_t size{m_TaskQueues.size()};
std::size_t size{m_NumberThreadsInUse.load()};
std::size_t i{m_Cursor.load()};
std::size_t end{i + size};
CWrappedTask task{std::forward<TTask>(task_)};
Expand Down Expand Up @@ -72,8 +78,8 @@ bool CStaticThreadPool::busy() const {
return m_Busy.load();
}

void CStaticThreadPool::busy(bool value) {
m_Busy.store(value);
void CStaticThreadPool::busy(bool busy) {
m_Busy.store(busy);
}

void CStaticThreadPool::shutdown() {
Expand Down Expand Up @@ -105,7 +111,6 @@ void CStaticThreadPool::worker(std::size_t id) {
};

TOptionalTask task;
std::size_t size{m_TaskQueues.size()};

while (m_Done == false) {
// We maintain "worker count" queues and each worker has an affinity to a
Expand All @@ -115,11 +120,18 @@ void CStaticThreadPool::worker(std::size_t id) {
// if everything is working well we have essentially no contention between
// workers on queue reads.

for (std::size_t i = 0; i < size; ++i) {
task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
if (task != boost::none) {
break;
std::size_t size{m_NumberThreadsInUse.load()};

// Only steal work if the thread is in use.
if (id < size) {
for (std::size_t i = 0; i < size; ++i) {
task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
if (task != boost::none) {
break;
}
}
} else {
task = boost::none;
}
if (task == boost::none) {
task = m_TaskQueues[id].pop();
Expand Down
51 changes: 46 additions & 5 deletions lib/core/unittest/CStaticThreadPoolTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#include <test/CRandomNumbers.h>

#include <boost/test/unit_test.hpp>
#include <boost/unordered_set.hpp>

#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>

BOOST_AUTO_TEST_SUITE(CStaticThreadPoolTest)
Expand Down Expand Up @@ -72,7 +74,7 @@ BOOST_AUTO_TEST_CASE(testScheduleDelayMinimisation) {
LOG_DEBUG(<< "Time to schedule " << timeToSchedule);
//BOOST_TEST_REQUIRE(timeToSchedule <= 1);
}
BOOST_REQUIRE_EQUAL(200u, counter.load());
BOOST_REQUIRE_EQUAL(200, counter.load());
}

BOOST_AUTO_TEST_CASE(testThroughputStability) {
Expand Down Expand Up @@ -108,11 +110,11 @@ BOOST_AUTO_TEST_CASE(testThroughputStability) {

uint64_t timeToSchedule{watch.stop()};
LOG_DEBUG(<< "Time to schedule " << timeToSchedule);
//BOOST_TEST_REQUIRE(timeToSchedule >= 330);
//BOOST_TEST_REQUIRE(timeToSchedule >= 300);
//BOOST_TEST_REQUIRE(timeToSchedule <= 350);
}

BOOST_REQUIRE_EQUAL(2000u, counter.load());
BOOST_REQUIRE_EQUAL(2000, counter.load());

// The best we can achieve is 2000ms ignoring all overheads.
std::uint64_t totalTime{totalTimeWatch.stop()};
Expand Down Expand Up @@ -148,7 +150,7 @@ BOOST_AUTO_TEST_CASE(testManyTasksThroughput) {
}
}

BOOST_REQUIRE_EQUAL(10000u, counter.load());
BOOST_REQUIRE_EQUAL(10000, counter.load());

// We have 1400ms of delays so the best we can achieve here is 700ms elapsed.
std::uint64_t totalTime{watch.stop()};
Expand Down Expand Up @@ -202,7 +204,46 @@ BOOST_AUTO_TEST_CASE(testWithExceptions) {
}

// We didn't lose any real tasks.
BOOST_REQUIRE_EQUAL(200u, counter.load());
BOOST_REQUIRE_EQUAL(200, counter.load());
}

BOOST_AUTO_TEST_CASE(testNumberThreadsInUse) {

    // Start a threadpool then change the number of threads and check we aren't
    // getting execution on more than the specified number of distinct threads.

    using TThreadIdUSet = boost::unordered_set<std::thread::id, std::hash<std::thread::id>>;

    core::CStaticThreadPool pool{8};

    // Guards numberProcessedTasks and executionThreads, which are written by
    // the pool's worker threads and read by this test thread.
    std::mutex mutex;
    std::size_t numberProcessedTasks{0};
    TThreadIdUSet executionThreads;

    for (std::size_t numberThreadsInUse : {5, 6, 2}) {

        pool.numberThreadsInUse(numberThreadsInUse);

        for (std::size_t i = 0; i < 200; ++i) {
            pool.schedule([&] {
                std::scoped_lock<std::mutex> lock{mutex};
                ++numberProcessedTasks;
                executionThreads.insert(std::this_thread::get_id());
            });
        }

        // Wait for all 200 tasks to complete. Yield between polls, outside
        // the lock, so this busy-wait doesn't contend with the worker tasks
        // for the mutex or starve the pool threads of CPU.
        for (;;) {
            {
                std::scoped_lock<std::mutex> lock{mutex};
                if (numberProcessedTasks == 200) {
                    LOG_DEBUG(<< "# threads used = " << executionThreads.size());
                    // A subset of threads can steal all the work so we only
                    // check we didn't exceed the requested thread count.
                    BOOST_TEST_REQUIRE(executionThreads.size() <= numberThreadsInUse);
                    numberProcessedTasks = 0;
                    executionThreads.clear();
                    break;
                }
            }
            std::this_thread::yield();
        }
    }
}

BOOST_AUTO_TEST_SUITE_END()