From cf0559805e77544f6ae8c1479930d92752adff00 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 16 Apr 2024 23:55:47 +0300 Subject: [PATCH 01/15] add new_thread benchmark --- src/benchmarks/benchmarks.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 55b0f4f62..fb30bd631 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -146,6 +146,19 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } + + SECTION("from array of 1 - create + as_blocking + subscribe + new_thread") + { + std::array vals{123}; + TEST_RPP([&]() { + (rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + (rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + SECTION("concat_as_source of just(1 immediate) create + subscribe") { TEST_RPP([&]() { From 286fda3cff78acdc4eac48c3b15312a3726dd398 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 00:04:37 +0300 Subject: [PATCH 02/15] improve situation --- src/rpp/rpp/schedulers/current_thread.hpp | 77 +++++++++++++---------- src/rpp/rpp/schedulers/details/queue.hpp | 17 ++--- src/rpp/rpp/schedulers/new_thread.hpp | 52 +++++++-------- 3 files changed, 73 insertions(+), 73 deletions(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 74aad619b..6557fbbf1 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -87,7 +87,7 @@ namespace rpp::schedulers friend class new_thread; class worker_strategy; - inline static thread_local std::optional> s_queue{}; + inline static thread_local details::schedulables_queue* s_queue{}; struct is_queue_is_empty { @@ -97,16 +97,11 @@ namespace rpp::schedulers }; - static void drain_current_queue() + static void drain_queue() { - drain_queue(s_queue); - } - - static void drain_queue(std::optional>& queue) - { - while (!queue->is_empty()) + while (s_queue && !s_queue->is_empty()) { - auto top = queue->pop(); + auto top = s_queue->pop(); if (top->is_disposed()) continue; @@ -122,13 +117,13 @@ namespace rpp::schedulers else timepoint = (*top)(); - } while (queue->is_empty() && timepoint.has_value()); + } while (s_queue->is_empty() && timepoint.has_value()); if (timepoint.has_value()) - queue->emplace(timepoint.value(), std::move(top)); + s_queue->emplace(timepoint.value(), std::move(top)); } - queue.reset(); + s_queue = nullptr; } class worker_strategy @@ -140,26 +135,22 @@ namespace rpp::schedulers if (handler.is_disposed()) return; - auto& queue = s_queue; - const bool someone_owns_queue = queue.has_value(); - std::optional timepoint{}; - if (!someone_owns_queue) + if (!s_queue) { - queue.emplace(); + details::schedulables_queue queue{}; + s_queue = &queue; - timepoint = details::immediate_scheduling_while_condition(duration, is_queue_is_empty{queue.value()}, fn, handler, args...); + const auto timepoint = details::immediate_scheduling_while_condition(duration, is_queue_is_empty{queue}, fn, handler, args...); if (!timepoint || handler.is_disposed()) - return drain_queue(queue); - } - else - { - timepoint = now() + duration; - } + return drain_queue(); - queue->emplace(timepoint.value(), std::forward(fn), std::forward(handler), std::forward(args)...); + s_queue->emplace(timepoint.value(), std::forward(fn), std::forward(handler), std::forward(args)...); + drain_queue(); + return; + } - if (!someone_owns_queue) - drain_queue(queue); + s_queue->emplace(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + drain_queue(); } template Fn> @@ -168,7 +159,7 @@ namespace rpp::schedulers if (handler.is_disposed()) return; - if (s_queue.has_value()) + if (s_queue) { s_queue->emplace(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } @@ -183,15 +174,33 @@ namespace rpp::schedulers static rpp::schedulers::time_point now() { return details::now(); } }; - public: - static rpp::utils::finally_action own_queue_and_drain_finally_if_not_owned() + private: + class own_queue_guard { - const bool someone_owns_queue = s_queue.has_value(); + public: + own_queue_guard() + : m_clear_on_destruction{!s_queue} + { + if (m_clear_on_destruction) + s_queue = &m_queue; + } + ~own_queue_guard() + { + if (m_clear_on_destruction) + drain_queue(); + } + own_queue_guard(const own_queue_guard&) = delete; + own_queue_guard(own_queue_guard&&) = delete; - if (!someone_owns_queue) - s_queue.emplace(); + private: + details::schedulables_queue m_queue{}; + bool m_clear_on_destruction{}; + }; - return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>}; + public: + static own_queue_guard own_queue_and_drain_finally_if_not_owned() + { + return own_queue_guard{}; } static rpp::schedulers::worker create_worker() diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 44b4c6870..a0e8e470b 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -151,7 +151,7 @@ namespace rpp::schedulers::details schedulables_queue& operator=(const schedulables_queue& other) = delete; schedulables_queue& operator=(schedulables_queue&& other) noexcept = default; - schedulables_queue(std::shared_ptr shared_data) + schedulables_queue(std::weak_ptr shared_data) : m_shared_data{std::move(shared_data)} { } @@ -162,8 +162,8 @@ namespace rpp::schedulers::details using schedulable_type = specific_schedulable, std::decay_t, std::decay_t...>; emplace_impl(std::make_shared(timepoint, std::forward(fn), std::forward(handler), std::forward(args)...)); - if (m_shared_data) - m_shared_data->cv.notify_all(); + if (const auto s = m_shared_data.lock()) + s->cv.notify_all(); } void emplace(const time_point& timepoint, std::shared_ptr&& schedulable) @@ -173,8 +173,8 @@ namespace rpp::schedulers::details schedulable->set_timepoint(timepoint); emplace_impl(std::move(schedulable)); - if (m_shared_data) - m_shared_data->cv.notify_all(); + if (const auto s = m_shared_data.lock()) + s->cv.notify_all(); } bool is_empty() const { return !m_head; } @@ -193,7 +193,8 @@ namespace rpp::schedulers::details void emplace_impl(std::shared_ptr&& schedulable) { // needed in case of new_thread and current_thread shares same queue - optional_mutex mutex{m_shared_data ? &m_shared_data->mutex : nullptr}; + const auto s = m_shared_data.lock(); + optional_mutex mutex{s ? &s->mutex : nullptr}; std::lock_guard lock{mutex}; if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint()) @@ -215,7 +216,7 @@ namespace rpp::schedulers::details } private: - std::shared_ptr m_head{}; - std::shared_ptr m_shared_data{}; + std::shared_ptr m_head{}; + std::weak_ptr m_shared_data{}; }; } // namespace rpp::schedulers::details diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index 6ceabe5b5..deb55226c 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -32,21 +32,12 @@ namespace rpp::schedulers class disposable final : public rpp::details::base_disposable { public: - disposable() - { - // just waiting - while (!m_state->queue_ptr.load(std::memory_order::seq_cst)) - { - }; - } + disposable() = default; template Fn> void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - std::lock_guard lock{m_state->mutex}; - // guarded by lock - if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst)) - queue->emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); } private: @@ -57,7 +48,7 @@ namespace rpp::schedulers { std::lock_guard lock{m_state->mutex}; - m_state->is_disposed.store(true, std::memory_order::relaxed); + m_state->is_disposed = true; } m_state->cv.notify_all(); @@ -69,58 +60,57 @@ namespace rpp::schedulers struct state_t : public details::shared_queue_data { - std::atomic*> queue_ptr{}; - std::atomic_bool is_disposed{}; + details::schedulables_queue queue{}; + bool is_disposed{}; }; static void data_thread(std::shared_ptr state) { - auto& queue = current_thread::s_queue; - state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst); + current_thread::s_queue = &state->queue; while (true) { std::unique_lock lock{state->mutex}; - - if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst)) + if (state->queue.is_empty() && state->is_disposed) break; - state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); }); + state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_disposed; }); - if (queue->is_empty()) + if (state->queue.is_empty()) break; - if (queue->top()->is_disposed()) + if (state->queue.top()->is_disposed()) { - queue->pop(); + state->queue.pop(); continue; } - if (details::s_last_now_time < queue->top()->get_timepoint()) + if (details::s_last_now_time < state->queue.top()->get_timepoint()) { - if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint()) + if (const auto now = worker_strategy::now(); now < state->queue.top()->get_timepoint()) { - state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); }); + state->cv.wait_for(lock, state->queue.top()->get_timepoint() - now, [&] { return state->queue.top()->is_disposed() || worker_strategy::now() >= state->queue.top()->get_timepoint(); }); continue; } } - auto top = queue->pop(); + auto top = state->queue.pop(); lock.unlock(); if (const auto timepoint = (*top)()) if (!top->is_disposed()) - queue->emplace(timepoint.value(), std::move(top)); + state->queue.emplace(timepoint.value(), std::move(top)); } - std::unique_lock lock{state->mutex}; - state->queue_ptr.store(nullptr, std::memory_order::seq_cst); - queue.reset(); + current_thread::s_queue = nullptr; } private: std::shared_ptr m_state = std::make_shared(); - std::thread m_thread{&data_thread, m_state}; + + RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue(m_state)); + + std::thread m_thread{&data_thread, m_state}; }; public: From f0b41fd53aa6cd9263a755522c38668ed082c99d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 14:02:38 +0300 Subject: [PATCH 03/15] Update current_thread.hpp --- src/rpp/rpp/schedulers/current_thread.hpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 6557fbbf1..270c8beda 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -145,12 +145,10 @@ namespace rpp::schedulers return drain_queue(); s_queue->emplace(timepoint.value(), std::forward(fn), std::forward(handler), std::forward(args)...); - drain_queue(); - return; + return drain_queue(); } s_queue->emplace(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); - drain_queue(); } template Fn> From ca7dbe53f53e807e36f43782012c4b041373917a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 19:34:46 +0300 Subject: [PATCH 04/15] Update CMakePresets.json --- CMakePresets.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakePresets.json b/CMakePresets.json index 08335a799..c392d9ed0 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -47,7 +47,7 @@ "inherits" : ["ci-flags"], "hidden": true, "cacheVariables": { - "CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew" + "CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew /bigobj" } }, { From 3da8d58d87a354f7f1c6ee3a1e0fa33121d2a393 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 19:52:13 +0300 Subject: [PATCH 05/15] add 1000 values scheduling example --- src/benchmarks/benchmarks.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index fb30bd631..23024edab 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -159,6 +159,18 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } + SECTION("from array of 1000 - create + as_blocking + subscribe + new_thread") + { + std::array vals{}; + TEST_RPP([&]() { + (rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + (rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + SECTION("concat_as_source of just(1 immediate) create + subscribe") { TEST_RPP([&]() { From 266584c938df15d2327d73a304b8a18473a69fe5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 20:49:15 +0300 Subject: [PATCH 06/15] small improve --- src/rpp/rpp/schedulers/details/queue.hpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index a0e8e470b..86adcd168 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -162,8 +162,6 @@ namespace rpp::schedulers::details using schedulable_type = specific_schedulable, std::decay_t, std::decay_t...>; emplace_impl(std::make_shared(timepoint, std::forward(fn), std::forward(handler), std::forward(args)...)); - if (const auto s = m_shared_data.lock()) - s->cv.notify_all(); } void emplace(const time_point& timepoint, std::shared_ptr&& schedulable) @@ -173,8 +171,6 @@ namespace rpp::schedulers::details schedulable->set_timepoint(timepoint); emplace_impl(std::move(schedulable)); - if (const auto s = m_shared_data.lock()) - s->cv.notify_all(); } bool is_empty() const { return !m_head; } @@ -193,7 +189,12 @@ namespace rpp::schedulers::details void emplace_impl(std::shared_ptr&& schedulable) { // needed in case of new_thread and current_thread shares same queue - const auto s = m_shared_data.lock(); + const auto s = m_shared_data.lock(); + const rpp::utils::finally_action _{[&] { + if (s) + s->cv.notify_one(); + }}; + optional_mutex mutex{s ? &s->mutex : nullptr}; std::lock_guard lock{mutex}; From 94bffdbeef85066cca34d6c7058890168f627689 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 22:11:40 +0300 Subject: [PATCH 07/15] speedup --- src/rpp/rpp/schedulers/details/queue.hpp | 88 +++++++++++++++++++----- src/rpp/rpp/schedulers/new_thread.hpp | 20 +++++- 2 files changed, 88 insertions(+), 20 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 86adcd168..f31ae89ad 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -17,12 +17,15 @@ #include #include +#include "rpp/utils/functors.hpp" + #include #include #include #include #include #include +#include namespace rpp::schedulers::details { @@ -36,8 +39,40 @@ namespace rpp::schedulers::details virtual ~schedulable_base() noexcept = default; - virtual std::optional operator()() noexcept = 0; - virtual bool is_disposed() const noexcept = 0; + virtual std::optional operator()() noexcept = 0; + + class advanced_call + { + public: + advanced_call(std::variant data) + : m_data{std::move(data)} + { + } + + + auto visit(const auto& fn) const + { + return std::visit(fn, m_data); + } + + bool can_run_immediately() const noexcept + { + return visit(rpp::utils::overloaded{[](const delay_to&) { + return false; + }, + [](const auto& d) { + return d.value == rpp::schedulers::duration::zero(); + }}); + } + + private: + std::variant m_data; + }; + + virtual std::optional make_advanced_call() noexcept = 0; + virtual time_point handle_advanced_call(const advanced_call&) noexcept = 0; + + virtual bool is_disposed() const noexcept = 0; time_point get_timepoint() const { return m_time_point; } @@ -63,6 +98,20 @@ namespace rpp::schedulers::details requires constraint::schedulable_fn class specific_schedulable final : public schedulable_base { + auto get_advanced_call_handler() const + { + return rpp::utils::overloaded{ + [](const delay_from_now& v) { + return NowStrategy::now() + v.value; + }, + [this](const delay_from_this_timepoint& v) { + return get_timepoint() + v.value; + }, + [](const delay_to& v) { + return v.value; + }}; + } + public: template TFn, typename... TArgs> explicit specific_schedulable(const time_point& time_point, TFn&& in_fn, TArgs&&... in_args) @@ -77,21 +126,21 @@ namespace rpp::schedulers::details try { if (const auto res = m_args.apply(m_fn)) - { - if constexpr (constraint::schedulable_delay_from_now_fn) - { - return NowStrategy::now() + res->value; - } - else if constexpr (constraint::schedulable_delay_to_fn) - { - return res->value; - } - else - { - static_assert(constraint::schedulable_delay_from_this_timepoint_fn); - return get_timepoint() + res->value; - } - } + return get_advanced_call_handler()(res.value()); + } + catch (...) + { + m_args.template get<0>().on_error(std::current_exception()); + } + return std::nullopt; + } + + std::optional make_advanced_call() noexcept override + { + try + { + if (const auto res = m_args.apply(m_fn)) + return advanced_call{res.value()}; } catch (...) { @@ -100,6 +149,11 @@ namespace rpp::schedulers::details return std::nullopt; } + time_point handle_advanced_call(const advanced_call& v) noexcept override + { + return v.visit(get_advanced_call_handler()); + } + bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); } private: diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index deb55226c..49361ef2c 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -38,6 +38,7 @@ namespace rpp::schedulers void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { m_state->queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->has_fresh_data.store(true); } private: @@ -62,6 +63,7 @@ namespace rpp::schedulers { details::schedulables_queue queue{}; bool is_disposed{}; + std::atomic_bool has_fresh_data{false}; }; static void data_thread(std::shared_ptr state) @@ -95,11 +97,23 @@ namespace rpp::schedulers } auto top = state->queue.pop(); + state->has_fresh_data.store(!state->queue.is_empty()); lock.unlock(); - if (const auto timepoint = (*top)()) - if (!top->is_disposed()) - state->queue.emplace(timepoint.value(), std::move(top)); + while (true) + { + if (const auto res = top->make_advanced_call()) + { + if (!top->is_disposed()) + { + if (res->can_run_immediately() && !state->has_fresh_data.load()) + continue; + + state->queue.emplace(top->handle_advanced_call(res.value()), std::move(top)); + } + } + break; + } } current_thread::s_queue = nullptr; From fe5092aab8db96f6bfb283264bfd60a27bd9780d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 22:16:28 +0300 Subject: [PATCH 08/15] compile fix --- src/rpp/rpp/schedulers/details/queue.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index f31ae89ad..8df4339bf 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -45,7 +45,7 @@ namespace rpp::schedulers::details { public: advanced_call(std::variant data) - : m_data{std::move(data)} + : m_data{data} { } From eac37829480b8ff9bc0ee3006a4a114ec9a47dc6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 22:17:05 +0300 Subject: [PATCH 09/15] fix --- src/rpp/rpp/schedulers/current_thread.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 270c8beda..5c32c5129 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -97,7 +97,7 @@ namespace rpp::schedulers }; - static void drain_queue() + static void drain_queue() noexcept { while (s_queue && !s_queue->is_empty()) { From f393e784a12f813f7b234aae2bc213952d12c2fe Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 22:41:49 +0300 Subject: [PATCH 10/15] handle current_thread --- src/rpp/rpp/schedulers/current_thread.hpp | 37 ++++++++++++++--------- src/rpp/rpp/schedulers/details/queue.hpp | 1 + 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 5c32c5129..1e7fc18ab 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -105,22 +105,29 @@ namespace rpp::schedulers if (top->is_disposed()) continue; - std::optional timepoint{top->get_timepoint()}; - // immediate like scheduling - do + details::sleep_until(top->get_timepoint()); + + while (true) { - if (timepoint && !top->is_disposed()) - details::sleep_until(top->get_timepoint()); - - if (top->is_disposed()) - timepoint.reset(); - else - timepoint = (*top)(); - - } while (s_queue->is_empty() && timepoint.has_value()); - - if (timepoint.has_value()) - s_queue->emplace(timepoint.value(), std::move(top)); + if (const auto res = top->make_advanced_call()) + { + if (!top->is_disposed()) + { + if (s_queue->is_empty()) + { + if (const auto d = std::get_if(&res->get())) { + std::this_thread::sleep_for(d->value); + } else { + details::sleep_until(top->handle_advanced_call(res.value())); + } + continue; + } + + s_queue->emplace(top->handle_advanced_call(res.value()), std::move(top)); + } + } + break; + } } s_queue = nullptr; diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 8df4339bf..482ad797d 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -49,6 +49,7 @@ namespace rpp::schedulers::details { } + const std::variant& get() const { return m_data; } auto visit(const auto& fn) const { From 4e11c7acd96b6aa0f40da09caaee8b99401fd6e3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Apr 2024 19:41:03 +0000 Subject: [PATCH 11/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/schedulers/current_thread.hpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 1e7fc18ab..6ac994884 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -106,7 +106,7 @@ namespace rpp::schedulers continue; details::sleep_until(top->get_timepoint()); - + while (true) { if (const auto res = top->make_advanced_call()) @@ -115,9 +115,12 @@ namespace rpp::schedulers { if (s_queue->is_empty()) { - if (const auto d = std::get_if(&res->get())) { + if (const auto d = std::get_if(&res->get())) + { std::this_thread::sleep_for(d->value); - } else { + } + else + { details::sleep_until(top->handle_advanced_call(res.value())); } continue; From e472a643e2c0cb8129ee314a65cb87f9df3efea6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 23:22:56 +0300 Subject: [PATCH 12/15] add benchmarks --- src/benchmarks/benchmarks.cpp | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 23024edab..755797bf0 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -266,6 +266,44 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); }); } + SECTION("new_thread scheduler create worker + schedule") + { + TEST_RPP([&]() { + rpp::schedulers::new_thread::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_delay_from_now{}; }, rpp::make_lambda_observer([](int) {})); + }); + TEST_RXCPP([&]() { + rxcpp::observe_on_new_thread().create_coordinator().get_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + + SECTION("new_thread scheduler create worker + schedule + recursive schedule") + { + TEST_RPP( + [&]() { + const auto worker = rpp::schedulers::new_thread::create_worker(); + worker.schedule( + [&worker](auto&& v) { + worker.schedule( + [](const auto& v) { + ankerl::nanobench::doNotOptimizeAway(v); + return rpp::schedulers::optional_delay_from_now{}; + }, + std::move(v)); + return rpp::schedulers::optional_delay_from_now{}; + }, + rpp::make_lambda_observer([](int) {})); + }); + TEST_RXCPP( + [&]() { + const auto worker = rxcpp::observe_on_new_thread() + .create_coordinator() + .get_worker(); + + worker.schedule([&worker](const auto&) { + worker.schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + }); + } } // BENCHMARK("Schedulers") BENCHMARK("Combining Operators") From aa8296f1783bc3e1c7c57528eadce66dde673753 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 18 Apr 2024 23:24:35 +0300 Subject: [PATCH 13/15] adapt --- src/rpp/rpp/schedulers/details/queue.hpp | 25 ++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 482ad797d..a4afc7397 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -89,16 +89,8 @@ namespace rpp::schedulers::details next->set_next(std::move(m_next)); m_next = std::move(next); } - - private: - std::shared_ptr m_next{}; - time_point m_time_point; - }; - - template - requires constraint::schedulable_fn - class specific_schedulable final : public schedulable_base - { + protected: + template auto get_advanced_call_handler() const { return rpp::utils::overloaded{ @@ -113,6 +105,15 @@ namespace rpp::schedulers::details }}; } + private: + std::shared_ptr m_next{}; + time_point m_time_point; + }; + + template + requires constraint::schedulable_fn + class specific_schedulable final : public schedulable_base + { public: template TFn, typename... TArgs> explicit specific_schedulable(const time_point& time_point, TFn&& in_fn, TArgs&&... in_args) @@ -127,7 +128,7 @@ namespace rpp::schedulers::details try { if (const auto res = m_args.apply(m_fn)) - return get_advanced_call_handler()(res.value()); + return get_advanced_call_handler()(res.value()); } catch (...) { @@ -152,7 +153,7 @@ namespace rpp::schedulers::details time_point handle_advanced_call(const advanced_call& v) noexcept override { - return v.visit(get_advanced_call_handler()); + return v.visit(get_advanced_call_handler()); } bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); } From 4adea1994dca6d1c74b6a98cb3c98cb06642b640 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Apr 2024 20:23:50 +0000 Subject: [PATCH 14/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/schedulers/details/queue.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index a4afc7397..26f424430 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -89,6 +89,7 @@ namespace rpp::schedulers::details next->set_next(std::move(m_next)); m_next = std::move(next); } + protected: template auto get_advanced_call_handler() const From 3fefdf0690254d0661168b99a4a431bf26c37c3d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 19 Apr 2024 11:40:32 +0300 Subject: [PATCH 15/15] Update benchmarks.cpp --- src/benchmarks/benchmarks.cpp | 38 ----------------------------------- 1 file changed, 38 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 755797bf0..23024edab 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -266,44 +266,6 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); }); } - SECTION("new_thread scheduler create worker + schedule") - { - TEST_RPP([&]() { - rpp::schedulers::new_thread::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_delay_from_now{}; }, rpp::make_lambda_observer([](int) {})); - }); - TEST_RXCPP([&]() { - rxcpp::observe_on_new_thread().create_coordinator().get_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); }); - }); - } - - SECTION("new_thread scheduler create worker + schedule + recursive schedule") - { - TEST_RPP( - [&]() { - const auto worker = rpp::schedulers::new_thread::create_worker(); - worker.schedule( - [&worker](auto&& v) { - worker.schedule( - [](const auto& v) { - ankerl::nanobench::doNotOptimizeAway(v); - return rpp::schedulers::optional_delay_from_now{}; - }, - std::move(v)); - return rpp::schedulers::optional_delay_from_now{}; - }, - rpp::make_lambda_observer([](int) {})); - }); - TEST_RXCPP( - [&]() { - const auto worker = rxcpp::observe_on_new_thread() - .create_coordinator() - .get_worker(); - - worker.schedule([&worker](const auto&) { - worker.schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); }); - }); - }); - } } // BENCHMARK("Schedulers") BENCHMARK("Combining Operators")