diff --git a/old_build/_deps/snitch-src b/old_build/_deps/snitch-src new file mode 160000 index 000000000..2f6230890 --- /dev/null +++ b/old_build/_deps/snitch-src @@ -0,0 +1 @@ +Subproject commit 2f623089022ba09d4bcaf21772684b94a160a1e0 diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index bf39f8303..13339e901 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -55,7 +55,7 @@ namespace rpp::operators::details { std::lock_guard lock{m_mutex}; m_value_to_be_emitted.emplace(std::forward(v)); - const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); + const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value(); m_time_when_value_should_be_emitted = m_worker.now() + m_period; if (need_to_scheduled) { @@ -75,7 +75,7 @@ namespace rpp::operators::details void schedule() { m_worker.schedule( - m_period, + m_time_when_value_should_be_emitted.value(), [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) @@ -211,4 +211,4 @@ namespace rpp::operators { return details::debounce_t>{period, std::forward(scheduler)}; } -} // namespace rpp::operators \ No newline at end of file +} // namespace rpp::operators diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 92613d771..69860f835 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -106,17 +106,17 @@ namespace rpp::operators::details template void emplace(TT&& value) const { - if (const auto delay = emplace_safe(std::forward(value))) + if (const auto tp = emplace_safe(std::forward(value))) { disposable->worker.schedule( - delay.value(), + tp.value(), [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, delay_disposable_wrapper{disposable}); } } template - std::optional emplace_safe(TT&& item) const + std::optional emplace_safe(TT&& item) const { std::lock_guard lock{disposable->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) @@ -127,11 +127,12 @@ namespace rpp::operators::details } else { - disposable->queue.emplace(std::forward(item), disposable->worker.now() + disposable->delay); + const auto tp = disposable->worker.now() + disposable->delay; + disposable->queue.emplace(std::forward(item), tp); if (!disposable->is_active) { disposable->is_active = true; - return disposable->delay; + return tp; } return std::nullopt; } diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index b1f2b2e0f..ccc97170f 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -162,6 +162,22 @@ namespace rpp::schedulers drain_queue(queue); } + template Fn> + static void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) + { + if (handler.is_disposed()) + return; + + if (s_queue.has_value()) + { + s_queue->emplace(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + } + else + { + defer_for(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); + } + } + static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } static rpp::schedulers::time_point now() { return details::now(); } diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 09e5eba14..e97fa0562 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -41,7 +41,19 @@ namespace rpp::schedulers template Fn> void schedule(const duration delay, Fn&& fn, Handler&& handler, Args&&... args) const { - m_strategy.defer_for(delay, std::forward(fn), std::forward(handler), std::forward(args)...); + if constexpr (constraint::defer_for_strategy) + m_strategy.defer_for(delay, std::forward(fn), std::forward(handler), std::forward(args)...); + else + schedule(now() + delay, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + template Fn> + void schedule(const time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + if constexpr (constraint::defer_to_strategy) + m_strategy.defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + else + schedule(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); } rpp::disposable_wrapper get_disposable() const diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index b4405f511..3e8bd5a9a 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -122,7 +122,7 @@ namespace rpp::schedulers::constraint }; template - concept strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { + concept defer_for_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { { s.defer_for(duration{}, std::declval(), handler) } -> std::same_as; @@ -132,6 +132,23 @@ namespace rpp::schedulers::constraint { s.defer_for(duration{}, std::declval(), handler) } -> std::same_as; + }; + + template + concept defer_to_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) { + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + { + s.defer_to(time_point{}, std::declval(), handler) + } -> std::same_as; + }; + + template + concept strategy = (defer_for_strategy || defer_to_strategy)&&requires(const S& s, const details::fake_schedulable_handler& handler) { { s.get_disposable() } -> rpp::constraint::any_of; diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index ba9c47f71..d8bba7fa3 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -52,7 +52,7 @@ namespace rpp::schedulers } template Fn> - void defer_at(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) + void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { if (is_disposed()) return; @@ -142,9 +142,9 @@ namespace rpp::schedulers worker_strategy() = default; template Fn> - void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const { - m_state.lock()->defer_at(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state.lock()->defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } rpp::disposable_wrapper get_disposable() const { return m_state; } diff --git a/src/rpp/rpp/schedulers/run_loop.hpp b/src/rpp/rpp/schedulers/run_loop.hpp index ca1d88e71..8233800ae 100644 --- a/src/rpp/rpp/schedulers/run_loop.hpp +++ b/src/rpp/rpp/schedulers/run_loop.hpp @@ -114,10 +114,10 @@ namespace rpp::schedulers } template Fn> - void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const { if (const auto shared = m_state.lock()) - shared->emplace_and_notify(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + shared->emplace_and_notify(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index 7a2d72c3f..6c15c582f 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -45,14 +45,7 @@ namespace rpp::details observer.set_upstream(std::move(d)); } - if constexpr (std::is_same_v) - { - worker.schedule(initial - worker.now(), interval_schedulable{}, std::forward(observer), period, size_t{}); - } - else - { - worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); - } + worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); } }; } // namespace rpp::details