diff --git a/src/examples/rpp/doxygen/thread_pool.cpp b/src/examples/rpp/doxygen/thread_pool.cpp new file mode 100644 index 000000000..91d52c242 --- /dev/null +++ b/src/examples/rpp/doxygen/thread_pool.cpp @@ -0,0 +1,49 @@ +#include + +#include + +/** + * \example thread_pool.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + //! [thread_pool] + const auto scheduler = rpp::schedulers::thread_pool{4}; + rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8) + | rpp::operators::flat_map([scheduler](int value) { return rpp::source::just(scheduler, value) + | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; }); + + // Output: (can be in any order but same correlation between thread and values) + // [thread_1] 1 + // [thread_2] 2 + // [thread_3] 3 + // [thread_4] 4 + // [thread_1] 5 + // [thread_2] 6 + // [thread_3] 7 + // [thread_4] 8 + //! [thread_pool] + + //! [computational] + rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8) + | rpp::operators::flat_map([](int value) { return rpp::source::just(rpp::schedulers::computational{}, value) + | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; }); + + // Output: (can be in any order but same correlation between thread and values) + // [thread_1] 1 + // [thread_2] 2 + // [thread_3] 3 + // [thread_4] 4 + // [thread_1] 5 + // [thread_2] 6 + // [thread_3] 7 + // [thread_4] 8 + //! [computational] + + return 0; +} diff --git a/src/rpp/rpp/schedulers.hpp b/src/rpp/rpp/schedulers.hpp index b72d9d901..7e02a08f8 100644 --- a/src/rpp/rpp/schedulers.hpp +++ b/src/rpp/rpp/schedulers.hpp @@ -17,7 +17,9 @@ * @ingroup rpp */ +#include #include #include #include #include +#include diff --git a/src/rpp/rpp/schedulers/computational.hpp b/src/rpp/rpp/schedulers/computational.hpp new file mode 100644 index 000000000..3f892bd15 --- /dev/null +++ b/src/rpp/rpp/schedulers/computational.hpp @@ -0,0 +1,38 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +namespace rpp::schedulers +{ + /** + * @brief Scheduler owning static thread pool of workers and using "some" thread from this pool on `create_worker` call + * @warning Actually it is static variable to `thread_pool` scheduler + * @note Expected to pass to this scheduler intensive CPU bound tasks with relatevely small duration of execution (to be sure that no any thread with tasks from some other operators would be blocked on that task) + * + * @par Examples + * @snippet thread_pool.cpp computational + * + * @ingroup schedulers + */ + class computational final + { + public: + static auto create_worker() + { + static thread_pool tp{}; + return tp.create_worker(); + } + }; +} // namespace rpp::schedulers diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index 31fcd4c76..a4896ffa4 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -167,6 +167,8 @@ namespace rpp::schedulers class current_thread; class new_thread; class run_loop; + class thread_pool; + class computational; namespace defaults { diff --git a/src/rpp/rpp/schedulers/thread_pool.hpp b/src/rpp/rpp/schedulers/thread_pool.hpp new file mode 100644 index 000000000..d3579d589 --- /dev/null +++ b/src/rpp/rpp/schedulers/thread_pool.hpp @@ -0,0 +1,91 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +namespace rpp::schedulers +{ + /** + * @brief Scheduler owning static thread pool of workers and using "some" thread from this pool on `create_worker` call + * @warning Expected to use this scheduler as local variable to share same threads between different operators or as static variable + * + * @par Examples + * @snippet thread_pool.cpp thread_pool + * + * @ingroup schedulers + */ + class thread_pool final + { + using original_worker = decltype(new_thread::create_worker()); + + class worker_strategy + { + public: + worker_strategy(const original_worker& original_worker) + : m_original_worker{original_worker} + { + } + + template Fn> + void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_original_worker.schedule(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + template Fn> + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_original_worker.schedule(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } + static rpp::schedulers::time_point now() { return original_worker::now(); } + + private: + original_worker m_original_worker; + }; + + public: + explicit thread_pool(size_t threads_count = std::thread::hardware_concurrency()) + : m_state{std::make_shared(threads_count)} + { + } + + rpp::schedulers::worker create_worker() const + { + return rpp::schedulers::worker{m_state->get()}; + } + + private: + class state + { + public: + explicit state(size_t threads_count) + { + threads_count = std::max(size_t{1}, threads_count); + workers.reserve(threads_count); + for (size_t i = 0; i < threads_count; ++i) + workers.emplace_back(new_thread::create_worker()); + } + + const original_worker& get() { return workers[index++ % workers.size()]; } + + private: + std::vector workers{}; + size_t index{}; + }; + + std::shared_ptr m_state{}; + }; +} // namespace rpp::schedulers diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 33db92ee5..be184450b 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -945,3 +945,61 @@ TEST_CASE("current_thread inside new_thread") REQUIRE(done->load()); CHECK(current_thread_invoked->load()); } + +TEST_CASE("thread_pool uses multiple threads") +{ + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + + auto scheduler = rpp::schedulers::thread_pool{3}; + + const auto get_thread_id = [&scheduler, &obs]() { + std::promise promise{}; + scheduler.create_worker().schedule([&promise](const auto&) { + promise.set_value(std::this_thread::get_id()); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + return promise.get_future().get(); + }; + + const auto thread_1_value = get_thread_id(); + const auto thread_2_value = get_thread_id(); + const auto thread_3_value = get_thread_id(); + CHECK(thread_1_value != thread_2_value); + CHECK(thread_1_value != thread_3_value); + CHECK(thread_2_value != thread_3_value); + + CHECK(thread_1_value == get_thread_id()); + CHECK(thread_2_value == get_thread_id()); + CHECK(thread_3_value == get_thread_id()); +} + + +TEST_CASE("thread_pool shares same thread") +{ + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + + auto scheduler = rpp::schedulers::thread_pool{1}; + + std::atomic_bool first_job_done{}; + + scheduler.create_worker().schedule([&first_job_done](const auto&) { + while (!first_job_done) + std::this_thread::yield(); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + std::promise second_task_executed_promise{}; + scheduler.create_worker().schedule([&second_task_executed_promise](const auto&) { + second_task_executed_promise.set_value(true); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + auto f = second_task_executed_promise.get_future(); + + CHECK(f.wait_for(std::chrono::seconds{1}) == std::future_status::timeout); + first_job_done.store(true); + + CHECK(f.get()); +}