From 85730625f17d3b52d7cf1f4a739a46fa4ddd040c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 20:24:50 +0300 Subject: [PATCH 1/5] add thread_pool --- src/examples/rpp/doxygen/thread_pool.cpp | 31 ++++++++ src/rpp/rpp/schedulers.hpp | 1 + src/rpp/rpp/schedulers/fwd.hpp | 1 + src/rpp/rpp/schedulers/thread_pool.hpp | 97 ++++++++++++++++++++++++ 4 files changed, 130 insertions(+) create mode 100644 src/examples/rpp/doxygen/thread_pool.cpp create mode 100644 src/rpp/rpp/schedulers/thread_pool.hpp diff --git a/src/examples/rpp/doxygen/thread_pool.cpp b/src/examples/rpp/doxygen/thread_pool.cpp new file mode 100644 index 000000000..3377933ee --- /dev/null +++ b/src/examples/rpp/doxygen/thread_pool.cpp @@ -0,0 +1,31 @@ +#include + +#include + +/** + * \example thread_pool.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + //! [thread_pool] + const auto scheduler = rpp::schedulers::thread_pool{4}; + rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8) + | rpp::operators::flat_map([scheduler](int value) { return rpp::source::just(scheduler, value) + | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; }); + + // Output: (can be in any order but same correlation between thread and values) + // [thread_1] 1 + // [thread_2] 2 + // [thread_3] 3 + // [thread_4] 4 + // [thread_1] 5 + // [thread_2] 6 + // [thread_3] 7 + // [thread_4] 8 + //! thread_pool] + + return 0; +} diff --git a/src/rpp/rpp/schedulers.hpp b/src/rpp/rpp/schedulers.hpp index b72d9d901..1035e0fdc 100644 --- a/src/rpp/rpp/schedulers.hpp +++ b/src/rpp/rpp/schedulers.hpp @@ -21,3 +21,4 @@ #include #include #include +#include diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index 31fcd4c76..d190d603a 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -167,6 +167,7 @@ namespace rpp::schedulers class current_thread; class new_thread; class run_loop; + class thread_pool; namespace defaults { diff --git a/src/rpp/rpp/schedulers/thread_pool.hpp b/src/rpp/rpp/schedulers/thread_pool.hpp new file mode 100644 index 000000000..d2d0e9678 --- /dev/null +++ b/src/rpp/rpp/schedulers/thread_pool.hpp @@ -0,0 +1,97 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +namespace rpp::schedulers +{ + /** + * @brief Scheduler owning static thread pool of workers and using "some" thread from this pool on `create_worker` call + * @warning Expected to use this scheduler as local variable to share same threads between different operators or as static variable + * + * @par Examples + * @snippet thread_pool.cpp thread_pool + * + * @ingroup schedulers + */ + class thread_pool final + { + using original_worker = decltype(new_thread::create_worker()); + + class worker_strategy + { + public: + worker_strategy(const original_worker& original_worker) + : m_original_worker{original_worker} + { + } + + template Fn> + void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_original_worker.schedule(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + template Fn> + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_original_worker.schedule(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } + static rpp::schedulers::time_point now() { return original_worker::now(); } + + private: + original_worker m_original_worker; + }; + + public: + explicit thread_pool(size_t threads_count = std::thread::hardware_concurrency()) + : m_state{std::make_shared(threads_count)} + { + } + + rpp::schedulers::worker create_worker() const + { + return rpp::schedulers::worker{m_state->get()}; + } + + private: + class state + { + public: + explicit state(size_t threads_count) + { + threads_count = std::max(size_t{1}, threads_count); + workers.reserve(threads_count); + for (size_t i = 0; i < threads_count; ++i) + workers.emplace_back(new_thread::create_worker()); + } + + ~state() + { + for (const auto& worker : workers) + worker.get_disposable().dispose(); + } + + const original_worker& get() { return workers[index++ % workers.size()]; } + + private: + std::vector workers{}; + size_t index{}; + }; + + std::shared_ptr m_state{}; + }; +} // namespace rpp::schedulers From 016a6f94d2cdf47b8812632d82ca64d033345892 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 21 Apr 2024 22:59:41 +0300 Subject: [PATCH 2/5] add test --- src/rpp/rpp/schedulers/thread_pool.hpp | 6 ----- src/tests/rpp/test_scheduler.cpp | 31 ++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/schedulers/thread_pool.hpp b/src/rpp/rpp/schedulers/thread_pool.hpp index d2d0e9678..d3579d589 100644 --- a/src/rpp/rpp/schedulers/thread_pool.hpp +++ b/src/rpp/rpp/schedulers/thread_pool.hpp @@ -79,12 +79,6 @@ namespace rpp::schedulers workers.emplace_back(new_thread::create_worker()); } - ~state() - { - for (const auto& worker : workers) - worker.get_disposable().dispose(); - } - const original_worker& get() { return workers[index++ % workers.size()]; } private: diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 33db92ee5..ca57f4072 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -52,7 +52,7 @@ static std::string simulate_nested_scheduling(auto worker, const auto& obs, std: out.push_back("Task 3 runs "s + get_thread_id_as_string()); return rpp::schedulers::optional_delay_from_now{}; }, - obs); + abs); out.push_back("Task 2 ends "s + get_thread_id_as_string()); return rpp::schedulers::optional_delay_from_now{}; @@ -348,7 +348,7 @@ TEST_CASE("Immediate scheduler") } } -TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread) +TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread, rpp::schedulers::thread_pool) { auto d = rpp::composite_disposable_wrapper::make(); auto mock_obs = mock_observer_strategy{}; @@ -945,3 +945,30 @@ TEST_CASE("current_thread inside new_thread") REQUIRE(done->load()); CHECK(current_thread_invoked->load()); } + +TEST_CASE("thread_pool uses multiple threads") +{ + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + + auto scheduler = rpp::schedulers::thread_pool{3}; + + const auto get_thread_id = [&scheduler, &obs]() { + std::promise promise{}; + scheduler.create_worker().schedule([&promise](const auto&) { + promise.set_value(std::this_thread::get_id()); + return rpp::schedulers::optional_delay_from_now{}; + }, obs); + return promise.get_future().get(); + }; + + const auto thread_1_value = get_thread_id(); + const auto thread_2_value = get_thread_id(); + const auto thread_3_value = get_thread_id(); + CHECK(thread_1_value != thread_2_value); + CHECK(thread_1_value != thread_3_value); + CHECK(thread_2_value != thread_3_value); + + CHECK(thread_1_value == get_thread_id()); + CHECK(thread_2_value == get_thread_id()); + CHECK(thread_3_value == get_thread_id()); +} From 61da5a57e6984e7848961d1050cf53754dbffc34 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 21 Apr 2024 19:59:57 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rpp/test_scheduler.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index ca57f4072..66fe33ad3 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -957,10 +957,11 @@ TEST_CASE("thread_pool uses multiple threads") scheduler.create_worker().schedule([&promise](const auto&) { promise.set_value(std::this_thread::get_id()); return rpp::schedulers::optional_delay_from_now{}; - }, obs); + }, + obs); return promise.get_future().get(); }; - + const auto thread_1_value = get_thread_id(); const auto thread_2_value = get_thread_id(); const auto thread_3_value = get_thread_id(); From 92c6ac77938fee0c23ffb7f1d2d4238e5b66d4e7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 21 Apr 2024 23:12:13 +0300 Subject: [PATCH 4/5] extend tests --- src/examples/rpp/doxygen/thread_pool.cpp | 20 ++++++++++++- src/rpp/rpp/schedulers.hpp | 1 + src/rpp/rpp/schedulers/computational.hpp | 37 ++++++++++++++++++++++++ src/rpp/rpp/schedulers/fwd.hpp | 1 + src/tests/rpp/test_scheduler.cpp | 34 ++++++++++++++++++++-- 5 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 src/rpp/rpp/schedulers/computational.hpp diff --git a/src/examples/rpp/doxygen/thread_pool.cpp b/src/examples/rpp/doxygen/thread_pool.cpp index 3377933ee..5897a751a 100644 --- a/src/examples/rpp/doxygen/thread_pool.cpp +++ b/src/examples/rpp/doxygen/thread_pool.cpp @@ -25,7 +25,25 @@ int main() // NOLINT(bugprone-exception-escape) // [thread_2] 6 // [thread_3] 7 // [thread_4] 8 - //! thread_pool] + //! [thread_pool] + + //! [computational] + rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8) + | rpp::operators::flat_map([](int value) { return rpp::source::just(rpp::schedulers::computational{}, value) + | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; }); + + // Output: (can be in any order but same correlation between thread and values) + // [thread_1] 1 + // [thread_2] 2 + // [thread_3] 3 + // [thread_4] 4 + // [thread_1] 5 + // [thread_2] 6 + // [thread_3] 7 + // [thread_4] 8 + //! [computational] return 0; } diff --git a/src/rpp/rpp/schedulers.hpp b/src/rpp/rpp/schedulers.hpp index 1035e0fdc..4bd4fed03 100644 --- a/src/rpp/rpp/schedulers.hpp +++ b/src/rpp/rpp/schedulers.hpp @@ -22,3 +22,4 @@ #include #include #include +#include diff --git a/src/rpp/rpp/schedulers/computational.hpp b/src/rpp/rpp/schedulers/computational.hpp new file mode 100644 index 000000000..b90653706 --- /dev/null +++ b/src/rpp/rpp/schedulers/computational.hpp @@ -0,0 +1,37 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +namespace rpp::schedulers +{ + /** + * @brief Scheduler owning static thread pool of workers and using "some" thread from this pool on `create_worker` call + * @warning Actually it is static variable to `thread_pool` scheduler + * @note Expected to pass to this scheduler intensive CPU bound tasks with relatevely small duration of execution (to be sure that no any thread with tasks from some other operators would be blocked on that task) + * + * @par Examples + * @snippet thread_pool.cpp computational + * + * @ingroup schedulers + */ + class computational final + { + public: + static auto create_worker() + { + static thread_pool tp{}; + return tp.create_worker(); + } + }; +} \ No newline at end of file diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index d190d603a..a4896ffa4 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -168,6 +168,7 @@ namespace rpp::schedulers class new_thread; class run_loop; class thread_pool; + class computational; namespace defaults { diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 66fe33ad3..40b7117f6 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -52,7 +52,7 @@ static std::string simulate_nested_scheduling(auto worker, const auto& obs, std: out.push_back("Task 3 runs "s + get_thread_id_as_string()); return rpp::schedulers::optional_delay_from_now{}; }, - abs); + obs); out.push_back("Task 2 ends "s + get_thread_id_as_string()); return rpp::schedulers::optional_delay_from_now{}; @@ -348,7 +348,7 @@ TEST_CASE("Immediate scheduler") } } -TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread, rpp::schedulers::thread_pool) +TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread) { auto d = rpp::composite_disposable_wrapper::make(); auto mock_obs = mock_observer_strategy{}; @@ -973,3 +973,33 @@ TEST_CASE("thread_pool uses multiple threads") CHECK(thread_2_value == get_thread_id()); CHECK(thread_3_value == get_thread_id()); } + + +TEST_CASE("thread_pool shares same thread") +{ + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + + auto scheduler = rpp::schedulers::thread_pool{1}; + + std::atomic_bool first_job_done{}; + + scheduler.create_worker().schedule([&first_job_done](const auto&) { + while (!first_job_done) + std::this_thread::yield(); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + std::promise second_task_executed_promise{}; + scheduler.create_worker().schedule([&second_task_executed_promise](const auto&) { + second_task_executed_promise.set_value(true); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + auto f = second_task_executed_promise.get_future(); + + CHECK(f.wait_for(std::chrono::seconds{1}) == std::future_status::timeout); + first_job_done.store(true); + + CHECK(f.get()); +} \ No newline at end of file From d364b3b05536e42c540c34d7acf545321aea816b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 21 Apr 2024 20:11:55 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/examples/rpp/doxygen/thread_pool.cpp | 2 +- src/rpp/rpp/schedulers.hpp | 2 +- src/rpp/rpp/schedulers/computational.hpp | 3 ++- src/tests/rpp/test_scheduler.cpp | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/examples/rpp/doxygen/thread_pool.cpp b/src/examples/rpp/doxygen/thread_pool.cpp index 5897a751a..91d52c242 100644 --- a/src/examples/rpp/doxygen/thread_pool.cpp +++ b/src/examples/rpp/doxygen/thread_pool.cpp @@ -30,7 +30,7 @@ int main() // NOLINT(bugprone-exception-escape) //! [computational] rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8) | rpp::operators::flat_map([](int value) { return rpp::source::just(rpp::schedulers::computational{}, value) - | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) + | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{}); }) | rpp::operators::as_blocking() | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; }); diff --git a/src/rpp/rpp/schedulers.hpp b/src/rpp/rpp/schedulers.hpp index 4bd4fed03..7e02a08f8 100644 --- a/src/rpp/rpp/schedulers.hpp +++ b/src/rpp/rpp/schedulers.hpp @@ -17,9 +17,9 @@ * @ingroup rpp */ +#include #include #include #include #include #include -#include diff --git a/src/rpp/rpp/schedulers/computational.hpp b/src/rpp/rpp/schedulers/computational.hpp index b90653706..3f892bd15 100644 --- a/src/rpp/rpp/schedulers/computational.hpp +++ b/src/rpp/rpp/schedulers/computational.hpp @@ -11,6 +11,7 @@ #pragma once #include + #include namespace rpp::schedulers @@ -34,4 +35,4 @@ namespace rpp::schedulers return tp.create_worker(); } }; -} \ No newline at end of file +} // namespace rpp::schedulers diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 40b7117f6..be184450b 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -1002,4 +1002,4 @@ TEST_CASE("thread_pool shares same thread") first_job_done.store(true); CHECK(f.get()); -} \ No newline at end of file +}