From 95447f9c6667f6417759f62c3e4a0200aea6151d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 18 Feb 2024 23:01:38 +0300 Subject: [PATCH 1/4] conditionally use defer_for/defer_to --- old_build/_deps/snitch-src | 1 + src/rpp/rpp/schedulers/current_thread.hpp | 13 +++++++++++++ src/rpp/rpp/schedulers/details/worker.hpp | 15 ++++++++++++++- src/rpp/rpp/schedulers/fwd.hpp | 19 ++++++++++++++++++- src/rpp/rpp/schedulers/new_thread.hpp | 6 +++--- src/rpp/rpp/schedulers/run_loop.hpp | 4 ++-- src/rpp/rpp/sources/interval.hpp | 9 +-------- 7 files changed, 52 insertions(+), 15 deletions(-) create mode 160000 old_build/_deps/snitch-src diff --git a/old_build/_deps/snitch-src b/old_build/_deps/snitch-src new file mode 160000 index 000000000..2f6230890 --- /dev/null +++ b/old_build/_deps/snitch-src @@ -0,0 +1 @@ +Subproject commit 2f623089022ba09d4bcaf21772684b94a160a1e0 diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index b1f2b2e0f..24626bbc1 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -162,6 +162,19 @@ namespace rpp::schedulers drain_queue(queue); } + template Fn> + static void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) + { + if (handler.is_disposed()) + return; + + if (s_queue.has_value()) { + s_queue->emplace(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + } else { + defer_for(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); + } + } + static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } static rpp::schedulers::time_point now() { return details::now(); } diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 09e5eba14..4ceffece5 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -41,7 +41,20 @@ namespace rpp::schedulers template Fn> void schedule(const duration delay, Fn&& fn, Handler&& handler, Args&&... args) const { - m_strategy.defer_for(delay, std::forward(fn), std::forward(handler), std::forward(args)...); + if constexpr (constraint::defer_for_strategy) + m_strategy.defer_for(delay, std::forward(fn), std::forward(handler), std::forward(args)...); + else + schedule(now() + delay, std::forward(fn), std::forward(handler), std::forward(args)...); + + } + + template Fn> + void schedule(const time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + if constexpr (constraint::defer_to_strategy) + m_strategy.defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + else + schedule(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); } rpp::disposable_wrapper get_disposable() const diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index b4405f511..9e13b4814 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -122,7 +122,7 @@ namespace rpp::schedulers::constraint }; template - concept strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { + concept defer_for_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { { s.defer_for(duration{}, std::declval(), handler) } -> std::same_as; @@ -132,6 +132,23 @@ namespace rpp::schedulers::constraint { s.defer_for(duration{}, std::declval(), handler) } -> std::same_as; + }; + + template + concept defer_to_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + }; + + template + concept strategy = (defer_for_strategy || defer_to_strategy) && requires(const S& s, const details::fake_schedulable_handler& handler) { { s.get_disposable() } -> rpp::constraint::any_of; diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index ba9c47f71..d8bba7fa3 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -52,7 +52,7 @@ namespace rpp::schedulers } template Fn> - void defer_at(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) + void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { if (is_disposed()) return; @@ -142,9 +142,9 @@ namespace rpp::schedulers worker_strategy() = default; template Fn> - void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const { - m_state.lock()->defer_at(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state.lock()->defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } rpp::disposable_wrapper get_disposable() const { return m_state; } diff --git a/src/rpp/rpp/schedulers/run_loop.hpp b/src/rpp/rpp/schedulers/run_loop.hpp index ca1d88e71..8233800ae 100644 --- a/src/rpp/rpp/schedulers/run_loop.hpp +++ b/src/rpp/rpp/schedulers/run_loop.hpp @@ -114,10 +114,10 @@ namespace rpp::schedulers } template Fn> - void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const { if (const auto shared = m_state.lock()) - shared->emplace_and_notify(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + shared->emplace_and_notify(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index 7a2d72c3f..6c15c582f 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -45,14 +45,7 @@ namespace rpp::details observer.set_upstream(std::move(d)); } - if constexpr (std::is_same_v) - { - worker.schedule(initial - worker.now(), interval_schedulable{}, std::forward(observer), period, size_t{}); - } - else - { - worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); - } + worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); } }; } // namespace rpp::details From 3b69d43a66f99704f92d35eb16e4017d793c4fa6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 18 Feb 2024 20:02:39 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/schedulers/current_thread.hpp | 7 +++++-- src/rpp/rpp/schedulers/details/worker.hpp | 1 - src/rpp/rpp/schedulers/fwd.hpp | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 24626bbc1..ccc97170f 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -168,9 +168,12 @@ namespace rpp::schedulers if (handler.is_disposed()) return; - if (s_queue.has_value()) { + if (s_queue.has_value()) + { s_queue->emplace(tp, std::forward(fn), std::forward(handler), std::forward(args)...); - } else { + } + else + { defer_for(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); } } diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 4ceffece5..e97fa0562 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -45,7 +45,6 @@ namespace rpp::schedulers m_strategy.defer_for(delay, std::forward(fn), std::forward(handler), std::forward(args)...); else schedule(now() + delay, std::forward(fn), std::forward(handler), std::forward(args)...); - } template Fn> diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index 9e13b4814..3e8bd5a9a 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -148,7 +148,7 @@ namespace rpp::schedulers::constraint }; template - concept strategy = (defer_for_strategy || defer_to_strategy) && requires(const S& s, const details::fake_schedulable_handler& handler) { + concept strategy = (defer_for_strategy || defer_to_strategy)&&requires(const S& s, const details::fake_schedulable_handler& handler) { { s.get_disposable() } -> rpp::constraint::any_of; From 6772fc08a9a132a508a0184cb987c86159e3f228 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Feb 2024 00:20:23 +0300 Subject: [PATCH 3/4] use in operators --- src/rpp/rpp/operators/debounce.hpp | 4 ++-- src/rpp/rpp/operators/delay.hpp | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index bf39f8303..91e1e9665 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -55,7 +55,7 @@ namespace rpp::operators::details { std::lock_guard lock{m_mutex}; m_value_to_be_emitted.emplace(std::forward(v)); - const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); + const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value(); m_time_when_value_should_be_emitted = m_worker.now() + m_period; if (need_to_scheduled) { @@ -75,7 +75,7 @@ namespace rpp::operators::details void schedule() { m_worker.schedule( - m_period, + m_time_when_value_should_be_emitted, [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 92613d771..69860f835 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -106,17 +106,17 @@ namespace rpp::operators::details template void emplace(TT&& value) const { - if (const auto delay = emplace_safe(std::forward(value))) + if (const auto tp = emplace_safe(std::forward(value))) { disposable->worker.schedule( - delay.value(), + tp.value(), [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, delay_disposable_wrapper{disposable}); } } template - std::optional emplace_safe(TT&& item) const + std::optional emplace_safe(TT&& item) const { std::lock_guard lock{disposable->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) @@ -127,11 +127,12 @@ namespace rpp::operators::details } else { - disposable->queue.emplace(std::forward(item), disposable->worker.now() + disposable->delay); + const auto tp = disposable->worker.now() + disposable->delay; + disposable->queue.emplace(std::forward(item), tp); if (!disposable->is_active) { disposable->is_active = true; - return disposable->delay; + return tp; } return std::nullopt; } From 099fdf134ee3fe907b59a1911f7d057f59097592 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Feb 2024 10:01:22 +0300 Subject: [PATCH 4/4] Update debounce.hpp --- src/rpp/rpp/operators/debounce.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 91e1e9665..13339e901 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -75,7 +75,7 @@ namespace rpp::operators::details void schedule() { m_worker.schedule( - m_time_when_value_should_be_emitted, + m_time_when_value_should_be_emitted.value(), [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) @@ -211,4 +211,4 @@ namespace rpp::operators { return details::debounce_t>{period, std::forward(scheduler)}; } -} // namespace rpp::operators \ No newline at end of file +} // namespace rpp::operators