diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 89d16a1da..d96a6f08f 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -62,7 +62,7 @@ ### Filtering - [x] filter - [x] take -- [ ] debounce +- [x] debounce - [ ] distinct - [ ] distinct - [x] distinct_until_changed diff --git a/src/examples/rpp/doxygen/debounce.cpp b/src/examples/rpp/doxygen/debounce.cpp new file mode 100644 index 000000000..56cc93454 --- /dev/null +++ b/src/examples/rpp/doxygen/debounce.cpp @@ -0,0 +1,42 @@ +#include + +#include + +/** + * \example debounce.cpp + **/ +int main() +{ + { + //! [debounce] + auto start = rpp::schedulers::clock_type::now(); + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 5, 6, 9, 10) + .flat_map([](int v) + { + return rpp::source::just(v) + .delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{}); + }) + .tap([&](int v) + { + std::cout << "Sent value " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }) + .debounce(std::chrono::milliseconds{700}, rpp::schedulers::current_thread{}) + .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + + + // Output: + // Sent value 1 at 504 + // Sent value 2 at 1009 + // new value 2 + // Sent value 5 at 2505 + // Sent value 6 at 3010 + // new value 6 + // Sent value 9 at 4507 + // Sent value 10 at 5007 + // new value 10 + // completed + //! [debounce] + } + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index ed276c578..1cebbeb72 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -39,6 +39,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1febf5363..da700d740 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -38,6 +38,7 @@ * \ingroup operators */ +#include #include #include #include diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp new file mode 100644 index 000000000..27190ce48 --- /dev/null +++ b/src/rpp/rpp/operators/debounce.hpp @@ -0,0 +1,162 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // required due to operator uses lift +#include +#include +#include +#include +#include + +#include + +#include +#include + +IMPLEMENTATION_FILE(debounce_tag); + +namespace rpp::details +{ +template +class debounce_state : public early_unsubscribe_state +{ +public: + debounce_state(schedulers::duration period, const Scheduler& scheduler, const composite_subscription& subscription_of_subscriber) + : early_unsubscribe_state(subscription_of_subscriber) + , m_period{period} + , m_worker{scheduler.create_worker(children_subscriptions)} {} + + std::optional emplace_safe(auto&& v) + { + std::lock_guard lock{m_mutex}; + m_value_to_be_emitted.emplace(std::forward(v)); + const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); + m_time_when_value_should_be_emitted = m_worker.now() + m_period; + return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional{}; + } + + std::variant extract_value_or_time() + { + std::lock_guard lock{m_mutex}; + if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value()) + return std::monostate{}; + + const auto now = m_worker.now(); + if (m_time_when_value_should_be_emitted > now) + return m_time_when_value_should_be_emitted.value() - now; + + m_time_when_value_should_be_emitted.reset(); + auto v = std::move(m_value_to_be_emitted).value(); + m_value_to_be_emitted.reset(); + return v; + } + + std::optional extract_value() + { + std::lock_guard lock{m_mutex}; + std::optional res{}; + m_value_to_be_emitted.swap(res); + return res; + } + + using Worker = decltype(std::declval().create_worker(std::declval())); + const Worker& get_worker() const { return m_worker; } + +private: + schedulers::duration m_period; + Worker m_worker; + std::mutex m_mutex{}; + std::optional m_time_when_value_should_be_emitted{}; + std::optional m_value_to_be_emitted{}; +}; + +struct debounce_on_next +{ + template + void operator()(Value&& v, const auto& state_ptr) const + { + if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward(v))) + { + state_ptr->get_worker().schedule(time_to_schedule.value(), + [state_ptr]() mutable -> schedulers::optional_duration + { + auto value_or_duration = state_ptr->extract_value_or_time(); + if (auto* duration = std::get_if(&value_or_duration)) + return *duration; + + if (auto* value = std::get_if>(&value_or_duration)) + state_ptr->subscriber.on_next(std::move(*value)); + + return std::nullopt; + }); + } + } +}; + +struct debounce_on_error +{ + void operator()(const std::exception_ptr& err, const auto& state) const + { + state->children_subscriptions.unsubscribe(); + state->subscriber.on_error(err); + } +}; + +struct debounce_on_completed +{ + void operator()(const auto& state_ptr) const + { + state_ptr->children_subscriptions.unsubscribe(); + + if (auto v = state_ptr->extract_value()) + state_ptr->subscriber.on_next(std::move(v.value())); + + state_ptr->subscriber.on_completed(); + } +}; + +template +struct debounce_state_with_serialized_spinlock : debounce_state +{ + debounce_state_with_serialized_spinlock(auto&& sub, + schedulers::duration period, + const Scheduler& scheduler, + const composite_subscription& subscription_of_subscriber) + : debounce_state{std::move(period), scheduler, subscription_of_subscriber} + , subscriber(make_serialized_subscriber(std::forward(sub), std::ref(spinlock))) {} + + // spinlock because most part of time there is only one thread would be active + utils::spinlock spinlock{}; + + using InnerSub = decltype(make_serialized_subscriber(std::declval(), std::declval>())); + InnerSub subscriber; +}; + +template +struct debounce_impl +{ + schedulers::duration period; + TScheduler scheduler; + + template TSub> + auto operator()(TSub&& in_subscriber) const + { + auto state = std::make_shared>>(std::forward(in_subscriber), period, scheduler, in_subscriber.get_subscription()); + + return create_subscriber_with_state(state->children_subscriptions, + debounce_on_next{}, + debounce_on_error{}, + debounce_on_completed{}, + std::move(state)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/details/serialized_subscriber.hpp b/src/rpp/rpp/operators/details/serialized_subscriber.hpp index 966d9a488..76d01c6f5 100644 --- a/src/rpp/rpp/operators/details/serialized_subscriber.hpp +++ b/src/rpp/rpp/operators/details/serialized_subscriber.hpp @@ -18,34 +18,44 @@ namespace rpp::details { +template +auto lock(const std::shared_ptr& ptr) +{ + return std::lock_guard{*ptr}; +} + +template +auto lock(const std::reference_wrapper& ref) +{ + return std::lock_guard{ref.get()}; +} + struct forwarding_on_next_under_lock { - template - void operator()(T&& v, const auto& subscriber, const std::shared_ptr& primitive) const + template + void operator()(T&& v, const auto& subscriber, const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_next(std::forward(v)); } }; struct forwarding_on_error_under_lock { - template - void operator()(const std::exception_ptr& err, - const auto& subscriber, - const std::shared_ptr& primitive) const + void operator()(const std::exception_ptr& err, + const auto& subscriber, + const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_error(err); } }; struct forwarding_on_completed_under_lock { - template - void operator()(const auto& subscriber, const std::shared_ptr& primitive) const + void operator()(const auto& subscriber, const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_completed(); } }; @@ -62,4 +72,17 @@ auto make_serialized_subscriber(TSub&& std::forward(subscriber), primitive); } + +template +auto make_serialized_subscriber(TSub&& subscriber, + std::reference_wrapper primitive) +{ + auto sub = subscriber.get_subscription(); + return create_subscriber_with_state>>(std::move(sub), + forwarding_on_next_under_lock{}, + forwarding_on_error_under_lock{}, + forwarding_on_completed_under_lock{}, + std::forward(subscriber), + primitive); +} } // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 8fdf700a3..0c72e24bb 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/debounce.hpp b/src/rpp/rpp/operators/fwd/debounce.hpp new file mode 100644 index 000000000..0b86f6247 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/debounce.hpp @@ -0,0 +1,60 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +namespace rpp::details +{ +struct debounce_tag; +} + +namespace rpp::details +{ +template +struct debounce_impl; + +template +struct member_overload +{ + /** + * \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset. + * + * \marble debounce + { + source observable : +--1-2-----3---| + operator "debounce(4)" : +--------2-----3| + } + * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. + * \param scheduler is scheduler used to run timer for debounce + * \return new specific_observable with the debounce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet debounce.cpp debounce + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/debounce.html + */ + template + auto debounce(schedulers::duration period,const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + { + return static_cast(this)->template lift(debounce_impl{period, scheduler}); + } + + template + auto debounce(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(debounce_impl{period, scheduler}); + } +}; +} // namespace rpp::details diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp new file mode 100644 index 000000000..8ff4c3033 --- /dev/null +++ b/src/tests/rpp/test_debounce.cpp @@ -0,0 +1,115 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include +#include +#include +#include +#include + +SCENARIO("debounce emit only items where timeout reached", "[operators][debounce]") +{ + auto debounce_delay = std::chrono::seconds{2}; + test_scheduler scheduler{}; + auto start = s_current_time; + + GIVEN("subject of items and subscriber subscribed on it via debounce") + { + auto mock = mock_observer{}; + std::optional> optional_subj{rpp::subjects::publish_subject{}}; + auto& subj = optional_subj.value(); + subj.get_observable().debounce(debounce_delay, scheduler).subscribe(mock); + WHEN("emit value") + { + subj.get_subscriber().on_next(1); + THEN("delay scheduled action to track period") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + AND_WHEN("scheduler reached delayed time") + { + scheduler.time_advance(debounce_delay); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + AND_WHEN("emit on completed") + { + subj.get_subscriber().on_completed(); + THEN("emission reached mock with on completed without schedulable exection") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + AND_WHEN("new value emitted before scheduler reached requested time") + { + scheduler.time_advance(debounce_delay / 2); + subj.get_subscriber().on_next(2); + THEN("nothing changed immediately") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + AND_WHEN("scheduler reached originally requested time") + { + scheduler.time_advance(debounce_delay / 2); + THEN("delay re-schedule schedulable to new delay timepoint") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + AND_WHEN("scheduler reached delayed time") + { + scheduler.time_advance(debounce_delay/2); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + } + AND_WHEN("subject destoryed and then schedulable reaches schedulable") + { + optional_subj.reset(); + scheduler.time_advance(debounce_delay); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + } +} diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index bdd78ea7c..779da8cf5 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -9,6 +9,7 @@ // Project home: https://github.com/victimsnino/ReactivePlusPlus #include "mock_observer.hpp" +#include #include #include @@ -148,5 +149,18 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] } } } + WHEN("subscribe on subject via delay via test_scheduler, sent value") + { + subj.get_observable() + .delay(std::chrono::seconds{30000}, test_scheduler{}) + .subscribe(mock); + + subj.get_subscriber().on_next(1); + + AND_THEN("no memory leak") + { + // checked via sanitizer + } + } } } diff --git a/src/tests/utils/test_scheduler.hpp b/src/tests/utils/test_scheduler.hpp index 6f9abb0c9..3b96bcd81 100644 --- a/src/tests/utils/test_scheduler.hpp +++ b/src/tests/utils/test_scheduler.hpp @@ -58,23 +58,26 @@ class test_scheduler final : public rpp::schedulers::details::scheduler_tag class worker_strategy { public: - worker_strategy(std::shared_ptr state) + worker_strategy(std::weak_ptr state) : m_state{state} { } void defer_at(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) const { - if (m_state->sub.is_subscribed()) + if (auto locked = m_state.lock()) { - m_state->schedule(time_point, std::forward(fn)); - m_state->drain(); + if (locked->sub.is_subscribed()) + { + locked->schedule(time_point, std::forward(fn)); + locked->drain(); + } } } static rpp::schedulers::time_point now() { return s_current_time; } private: - std::shared_ptr m_state; + std::weak_ptr m_state; }; test_scheduler() {}