From e4b23f6caa3290cd7a7a4bcc689424082d9b9b08 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 10 Dec 2022 23:36:59 +0300 Subject: [PATCH 1/7] Initial implementation for debounce --- src/examples/rpp/doxygen/debounce.cpp | 0 .../rpp/observables/interface_observable.hpp | 1 + src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/debounce.hpp | 147 ++++++++++++++++++ src/rpp/rpp/operators/fwd.hpp | 1 + src/rpp/rpp/operators/fwd/debounce.hpp | 60 +++++++ src/tests/rpp/test_debounce.cpp | 89 +++++++++++ 7 files changed, 299 insertions(+) create mode 100644 src/examples/rpp/doxygen/debounce.cpp create mode 100644 src/rpp/rpp/operators/debounce.hpp create mode 100644 src/rpp/rpp/operators/fwd/debounce.hpp create mode 100644 src/tests/rpp/test_debounce.cpp diff --git a/src/examples/rpp/doxygen/debounce.cpp b/src/examples/rpp/doxygen/debounce.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index ed276c578..1cebbeb72 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -39,6 +39,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1febf5363..da700d740 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -38,6 +38,7 @@ * \ingroup operators */ +#include #include #include #include diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp new file mode 100644 index 000000000..70b30daf4 --- /dev/null +++ b/src/rpp/rpp/operators/debounce.hpp @@ -0,0 +1,147 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // required due to operator uses lift +#include +#include +#include +#include +#include + +#include + +#include +#include + +IMPLEMENTATION_FILE(debounce_tag); + +namespace rpp::details +{ +template +class debounce_state : early_unsubscribe_state +{ +public: + debounce_state(schedulers::duration period, const Scheduler& scheduler, const composite_subscription& subscription_of_subscriber) + : early_unsubscribe_state(subscription_of_subscriber) + , m_period{period} + , m_worker{scheduler.create_worker(children_subscriptions)} {} + + std::optional emplace_safe(auto&& v) + { + std::lock_guard lock{m_mutex}; + m_value_to_be_emitted.emplace(std::forward(v)); + const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); + m_time_when_value_should_be_emitted = m_worker.now() + m_period; + return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional{}; + } + + std::variant extract_value_or_time(schedulers::time_point expected_time) + { + std::lock_guard lock{m_mutex}; + if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value()) + return std::monostate{}; + + if (m_time_when_value_should_be_emitted != expected_time) + return m_time_when_value_should_be_emitted.value() - Worker::now(); + + m_time_when_value_should_be_emitted.reset(); + auto v = std::move(m_value_to_be_emitted).value(); + m_value_to_be_emitted.reset(); + return v; + } + + std::optional extract_value() + { + std::lock_guard lock{m_mutex}; + std::optional res{}; + m_value_to_be_emitted.swap(res); + return res; + } + +private: + using Worker = std::decay_t>; + + schedulers::duration m_period; + Worker m_worker; + std::mutex m_mutex{}; + std::optional m_time_when_value_should_be_emitted{}; + std::optional m_value_to_be_emitted{}; +}; + +struct debounce_on_next +{ + template + void operator()(Value&& v, const auto& subscriber, const auto& state_ptr) const + { + if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward(v))) + { + state_ptr->worker.schedule(time_to_schedule.value(), + [expected_time = time_to_schedule.value(), =]() mutable -> rpp::schedulers::optional_duration + { + auto value_or_duration = state_ptr->extract_value_or_time(expected_time); + if (auto* duration = std::get_if(&value_or_duration)) + return *duration; + + if (auto* value = std::get_if>(&value_or_duration)) + subscriber.on_next(std::move(*value)); + return {}; + }); + } + } +}; + +using debounce_on_error = early_unsubscribe_on_error; + +struct debounce_on_completed +{ + void operator()(const auto& subscriber, const auto& state_ptr) const + { + state_ptr->children_subscriptions.unsubscribe(); + + if (auto v = state_ptr->extract_value()) + subscriber.on_next(std::move(v.value())); + + subscriber.on_completed(); + } +}; + +template +struct debounce_state_with_serialized_spinlock : debounce_state +{ + using debounce_state::debounce_state; + + // spinlock because most part of time there is only one thread would be active + utils::spinlock spinlock{}; +}; + +template +struct debounce_impl +{ + schedulers::duration period; + TScheduler scheduler; + + template TSub> + auto operator()(TSub&& in_subscriber) const + { + auto state = std::make_shared>(period, scheduler, in_subscriber.get_subscription()); + // change subscriber to serialized to avoid manual using of mutex + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); + + return create_subscriber_with_state(state->children_subscriptions, + debounce_on_next{}, + debounce_on_error{}, + debounce_on_completed{}, + std::move(subscriber), + std::move(state)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 8fdf700a3..0c72e24bb 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/debounce.hpp b/src/rpp/rpp/operators/fwd/debounce.hpp new file mode 100644 index 000000000..b6031f578 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/debounce.hpp @@ -0,0 +1,60 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +namespace rpp::details +{ +struct debounce_tag; +} + +namespace rpp::details +{ +template +struct debounce_impl; + +template +struct member_overload +{ + /** + * \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset. + * + * \marble debounce + { + source observable : +--1-2-----3-4--| + operator "debounce(4)" : +--------2------4| + } + * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. + * \param scheduler is scheduler used to run timer for debounce + * \return new specific_observable with the debounce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet debounce.cpp debounce + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/debounce.html + */ + template + auto debounce(schedulers::duration period,const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + { + return static_cast(this)->template lift(debounce_impl{period, scheduler}); + } + + template + auto debounce(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(debounce_impl{period, scheduler}); + } +}; +} // namespace rpp::details diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp new file mode 100644 index 000000000..3972ca919 --- /dev/null +++ b/src/tests/rpp/test_debounce.cpp @@ -0,0 +1,89 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include +#include +#include +#include +#include + +SCENARIO("debounce emit only items where timeout reached", "[operators][debounce]") +{ + auto debounce_delay = std::chrono::seconds{2}; + test_scheduler scheduler{}; + auto start = s_current_time; + + GIVEN("subject of items and subscriber subscribed on it via debounce") + { + auto mock = mock_observer{}; + auto subj = rpp::subjects::publish_subject{}; + subj.get_observable().debounce(debounce_delay, scheduler).subscribe(mock); + WHEN("emit value") + { + subj.get_subscriber().on_next(1); + THEN("delay scheduled action to track period") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + AND_WHEN("scheduler reached delayed time") + { + scheduler.time_advance(debounce_delay); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions() == == std::vector{start+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + AND_WHEN("emit on completed") + { + subj.get_subscriber().on_completed(); + THEN("emission reached mock with on completed without schedulable exection") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + AND_WHEN("new value emitted before scheduler reached requested time") + { + scheduler.time_advance(debounce_delay / 2); + subj.get_subscriber().on_next(2); + THEN("nothing changed immediately") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + AND_WHEN("scheduler reached originally requested time") + { + scheduler.time_advance(debounce_delay / 2); + THEN("delay re-schedule schedulable to new delay timepoint") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(scheduler.get_executions().empty()); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + } + } +} From 8c8e0b2ec64c370690b54cb118112fbde200f063 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 13:22:53 +0300 Subject: [PATCH 2/7] Fixes + examples --- src/examples/rpp/doxygen/debounce.cpp | 42 ++++++++++++++++++++++++++ src/rpp/rpp/operators/debounce.hpp | 37 ++++++++++++----------- src/rpp/rpp/operators/fwd/debounce.hpp | 4 +-- src/tests/rpp/test_debounce.cpp | 16 ++++++++-- 4 files changed, 78 insertions(+), 21 deletions(-) diff --git a/src/examples/rpp/doxygen/debounce.cpp b/src/examples/rpp/doxygen/debounce.cpp index e69de29bb..56cc93454 100644 --- a/src/examples/rpp/doxygen/debounce.cpp +++ b/src/examples/rpp/doxygen/debounce.cpp @@ -0,0 +1,42 @@ +#include + +#include + +/** + * \example debounce.cpp + **/ +int main() +{ + { + //! [debounce] + auto start = rpp::schedulers::clock_type::now(); + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 5, 6, 9, 10) + .flat_map([](int v) + { + return rpp::source::just(v) + .delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{}); + }) + .tap([&](int v) + { + std::cout << "Sent value " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }) + .debounce(std::chrono::milliseconds{700}, rpp::schedulers::current_thread{}) + .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + + + // Output: + // Sent value 1 at 504 + // Sent value 2 at 1009 + // new value 2 + // Sent value 5 at 2505 + // Sent value 6 at 3010 + // new value 6 + // Sent value 9 at 4507 + // Sent value 10 at 5007 + // new value 10 + // completed + //! [debounce] + } + return 0; +} diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 70b30daf4..19638a6e6 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -19,7 +19,7 @@ #include -#include +#include #include IMPLEMENTATION_FILE(debounce_tag); @@ -27,7 +27,7 @@ IMPLEMENTATION_FILE(debounce_tag); namespace rpp::details { template -class debounce_state : early_unsubscribe_state +class debounce_state : public early_unsubscribe_state { public: debounce_state(schedulers::duration period, const Scheduler& scheduler, const composite_subscription& subscription_of_subscriber) @@ -44,14 +44,15 @@ class debounce_state : early_unsubscribe_state return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional{}; } - std::variant extract_value_or_time(schedulers::time_point expected_time) + std::variant extract_value_or_time() { std::lock_guard lock{m_mutex}; if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value()) return std::monostate{}; - if (m_time_when_value_should_be_emitted != expected_time) - return m_time_when_value_should_be_emitted.value() - Worker::now(); + const auto now = m_worker.now(); + if (m_time_when_value_should_be_emitted > now) + return m_time_when_value_should_be_emitted.value() - now; m_time_when_value_should_be_emitted.reset(); auto v = std::move(m_value_to_be_emitted).value(); @@ -67,8 +68,10 @@ class debounce_state : early_unsubscribe_state return res; } + using Worker = decltype(std::declval().create_worker(std::declval())); + const Worker& get_worker() const {return m_worker;} + private: - using Worker = std::decay_t>; schedulers::duration m_period; Worker m_worker; @@ -84,17 +87,17 @@ struct debounce_on_next { if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward(v))) { - state_ptr->worker.schedule(time_to_schedule.value(), - [expected_time = time_to_schedule.value(), =]() mutable -> rpp::schedulers::optional_duration - { - auto value_or_duration = state_ptr->extract_value_or_time(expected_time); - if (auto* duration = std::get_if(&value_or_duration)) - return *duration; - - if (auto* value = std::get_if>(&value_or_duration)) - subscriber.on_next(std::move(*value)); - return {}; - }); + state_ptr->get_worker().schedule(time_to_schedule.value(), + [=]() mutable -> schedulers::optional_duration + { + auto value_or_duration = state_ptr->extract_value_or_time(); + if (auto* duration = std::get_if(&value_or_duration)) + return *duration; + + if (auto* value = std::get_if>(&value_or_duration)) + subscriber.on_next(std::move(*value)); + return {}; + }); } } }; diff --git a/src/rpp/rpp/operators/fwd/debounce.hpp b/src/rpp/rpp/operators/fwd/debounce.hpp index b6031f578..0b86f6247 100644 --- a/src/rpp/rpp/operators/fwd/debounce.hpp +++ b/src/rpp/rpp/operators/fwd/debounce.hpp @@ -31,8 +31,8 @@ struct member_overload * * \marble debounce { - source observable : +--1-2-----3-4--| - operator "debounce(4)" : +--------2------4| + source observable : +--1-2-----3---| + operator "debounce(4)" : +--------2-----3| } * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. * \param scheduler is scheduler used to run timer for debounce diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index 3972ca919..515afcb1b 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -41,7 +41,7 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce THEN("emission reached mock") { CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); - CHECK(scheduler.get_executions() == == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); CHECK(mock.get_received_values() == std::vector{1}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); @@ -77,11 +77,23 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce THEN("delay re-schedule schedulable to new delay timepoint") { CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); - CHECK(scheduler.get_executions().empty()); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); CHECK(mock.get_total_on_next_count() == 0); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); } + AND_WHEN("scheduler reached delayed time") + { + scheduler.time_advance(debounce_delay/2); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay, start+debounce_delay/2+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } } } } From cebd71bc434a35b58bc6d43ca172198a06370409 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 13:23:06 +0300 Subject: [PATCH 3/7] status --- docs/Implementation Status.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 89d16a1da..d96a6f08f 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -62,7 +62,7 @@ ### Filtering - [x] filter - [x] take -- [ ] debounce +- [x] debounce - [ ] distinct - [ ] distinct - [x] distinct_until_changed From c4cc0d5965335d5257a46dab4ff13817c7a9b7f7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 19:56:41 +0300 Subject: [PATCH 4/7] compile fix --- src/rpp/rpp/operators/debounce.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 19638a6e6..b559193dc 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -96,7 +96,8 @@ struct debounce_on_next if (auto* value = std::get_if>(&value_or_duration)) subscriber.on_next(std::move(*value)); - return {}; + + return std::nullopt; }); } } From 1a18b60dd0e83267d58bb11a2eb821aed403cbdb Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 20:10:17 +0300 Subject: [PATCH 5/7] fix sanitizer --- src/rpp/rpp/operators/debounce.hpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index b559193dc..013b0d8c4 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -88,9 +88,13 @@ struct debounce_on_next if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward(v))) { state_ptr->get_worker().schedule(time_to_schedule.value(), - [=]() mutable -> schedulers::optional_duration + [subscriber, weak_state=std::weak_ptr{state_ptr}]() mutable -> schedulers::optional_duration { - auto value_or_duration = state_ptr->extract_value_or_time(); + auto locked_state = weak_state.lock(); + if (!locked_state) + return std::nullopt; + + auto value_or_duration = locked_state->extract_value_or_time(); if (auto* duration = std::get_if(&value_or_duration)) return *duration; From 8d1cd4189f11312b61ebde5915e497214a5ef54c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 20:56:09 +0300 Subject: [PATCH 6/7] update --- src/rpp/rpp/operators/debounce.hpp | 51 +++++++++++-------- .../details/serialized_subscriber.hpp | 45 ++++++++++++---- 2 files changed, 63 insertions(+), 33 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 013b0d8c4..27190ce48 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -39,7 +39,7 @@ class debounce_state : public early_unsubscribe_state { std::lock_guard lock{m_mutex}; m_value_to_be_emitted.emplace(std::forward(v)); - const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); + const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value(); m_time_when_value_should_be_emitted = m_worker.now() + m_period; return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional{}; } @@ -62,17 +62,16 @@ class debounce_state : public early_unsubscribe_state std::optional extract_value() { - std::lock_guard lock{m_mutex}; + std::lock_guard lock{m_mutex}; std::optional res{}; m_value_to_be_emitted.swap(res); return res; } using Worker = decltype(std::declval().create_worker(std::declval())); - const Worker& get_worker() const {return m_worker;} + const Worker& get_worker() const { return m_worker; } private: - schedulers::duration m_period; Worker m_worker; std::mutex m_mutex{}; @@ -83,23 +82,19 @@ class debounce_state : public early_unsubscribe_state struct debounce_on_next { template - void operator()(Value&& v, const auto& subscriber, const auto& state_ptr) const + void operator()(Value&& v, const auto& state_ptr) const { if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward(v))) { state_ptr->get_worker().schedule(time_to_schedule.value(), - [subscriber, weak_state=std::weak_ptr{state_ptr}]() mutable -> schedulers::optional_duration + [state_ptr]() mutable -> schedulers::optional_duration { - auto locked_state = weak_state.lock(); - if (!locked_state) - return std::nullopt; - - auto value_or_duration = locked_state->extract_value_or_time(); + auto value_or_duration = state_ptr->extract_value_or_time(); if (auto* duration = std::get_if(&value_or_duration)) return *duration; if (auto* value = std::get_if>(&value_or_duration)) - subscriber.on_next(std::move(*value)); + state_ptr->subscriber.on_next(std::move(*value)); return std::nullopt; }); @@ -107,28 +102,43 @@ struct debounce_on_next } }; -using debounce_on_error = early_unsubscribe_on_error; +struct debounce_on_error +{ + void operator()(const std::exception_ptr& err, const auto& state) const + { + state->children_subscriptions.unsubscribe(); + state->subscriber.on_error(err); + } +}; struct debounce_on_completed { - void operator()(const auto& subscriber, const auto& state_ptr) const + void operator()(const auto& state_ptr) const { state_ptr->children_subscriptions.unsubscribe(); if (auto v = state_ptr->extract_value()) - subscriber.on_next(std::move(v.value())); + state_ptr->subscriber.on_next(std::move(v.value())); - subscriber.on_completed(); + state_ptr->subscriber.on_completed(); } }; -template +template struct debounce_state_with_serialized_spinlock : debounce_state { - using debounce_state::debounce_state; + debounce_state_with_serialized_spinlock(auto&& sub, + schedulers::duration period, + const Scheduler& scheduler, + const composite_subscription& subscription_of_subscriber) + : debounce_state{std::move(period), scheduler, subscription_of_subscriber} + , subscriber(make_serialized_subscriber(std::forward(sub), std::ref(spinlock))) {} // spinlock because most part of time there is only one thread would be active utils::spinlock spinlock{}; + + using InnerSub = decltype(make_serialized_subscriber(std::declval(), std::declval>())); + InnerSub subscriber; }; template @@ -140,15 +150,12 @@ struct debounce_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared>(period, scheduler, in_subscriber.get_subscription()); - // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); + auto state = std::make_shared>>(std::forward(in_subscriber), period, scheduler, in_subscriber.get_subscription()); return create_subscriber_with_state(state->children_subscriptions, debounce_on_next{}, debounce_on_error{}, debounce_on_completed{}, - std::move(subscriber), std::move(state)); } }; diff --git a/src/rpp/rpp/operators/details/serialized_subscriber.hpp b/src/rpp/rpp/operators/details/serialized_subscriber.hpp index 966d9a488..76d01c6f5 100644 --- a/src/rpp/rpp/operators/details/serialized_subscriber.hpp +++ b/src/rpp/rpp/operators/details/serialized_subscriber.hpp @@ -18,34 +18,44 @@ namespace rpp::details { +template +auto lock(const std::shared_ptr& ptr) +{ + return std::lock_guard{*ptr}; +} + +template +auto lock(const std::reference_wrapper& ref) +{ + return std::lock_guard{ref.get()}; +} + struct forwarding_on_next_under_lock { - template - void operator()(T&& v, const auto& subscriber, const std::shared_ptr& primitive) const + template + void operator()(T&& v, const auto& subscriber, const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_next(std::forward(v)); } }; struct forwarding_on_error_under_lock { - template - void operator()(const std::exception_ptr& err, - const auto& subscriber, - const std::shared_ptr& primitive) const + void operator()(const std::exception_ptr& err, + const auto& subscriber, + const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_error(err); } }; struct forwarding_on_completed_under_lock { - template - void operator()(const auto& subscriber, const std::shared_ptr& primitive) const + void operator()(const auto& subscriber, const auto& primitive) const { - std::lock_guard lock{*primitive}; + auto lock_guard = lock(primitive); subscriber.on_completed(); } }; @@ -62,4 +72,17 @@ auto make_serialized_subscriber(TSub&& std::forward(subscriber), primitive); } + +template +auto make_serialized_subscriber(TSub&& subscriber, + std::reference_wrapper primitive) +{ + auto sub = subscriber.get_subscription(); + return create_subscriber_with_state>>(std::move(sub), + forwarding_on_next_under_lock{}, + forwarding_on_error_under_lock{}, + forwarding_on_completed_under_lock{}, + std::forward(subscriber), + primitive); +} } // namespace rpp::details From 5371122941186c547c72d7ed9d0b6611ebf11b14 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Dec 2022 21:58:50 +0300 Subject: [PATCH 7/7] Fix --- src/tests/rpp/test_debounce.cpp | 16 +++++++++++++++- src/tests/rpp/test_delay.cpp | 14 ++++++++++++++ src/tests/utils/test_scheduler.hpp | 13 ++++++++----- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index 515afcb1b..8ff4c3033 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -22,7 +22,8 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce GIVEN("subject of items and subscriber subscribed on it via debounce") { auto mock = mock_observer{}; - auto subj = rpp::subjects::publish_subject{}; + std::optional> optional_subj{rpp::subjects::publish_subject{}}; + auto& subj = optional_subj.value(); subj.get_observable().debounce(debounce_delay, scheduler).subscribe(mock); WHEN("emit value") { @@ -96,6 +97,19 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce } } } + AND_WHEN("subject destoryed and then schedulable reaches schedulable") + { + optional_subj.reset(); + scheduler.time_advance(debounce_delay); + THEN("emission reached mock") + { + CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay}); + CHECK(scheduler.get_executions() == std::vector{start+debounce_delay}); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } } } } diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index bdd78ea7c..779da8cf5 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -9,6 +9,7 @@ // Project home: https://github.com/victimsnino/ReactivePlusPlus #include "mock_observer.hpp" +#include #include #include @@ -148,5 +149,18 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] } } } + WHEN("subscribe on subject via delay via test_scheduler, sent value") + { + subj.get_observable() + .delay(std::chrono::seconds{30000}, test_scheduler{}) + .subscribe(mock); + + subj.get_subscriber().on_next(1); + + AND_THEN("no memory leak") + { + // checked via sanitizer + } + } } } diff --git a/src/tests/utils/test_scheduler.hpp b/src/tests/utils/test_scheduler.hpp index 6f9abb0c9..3b96bcd81 100644 --- a/src/tests/utils/test_scheduler.hpp +++ b/src/tests/utils/test_scheduler.hpp @@ -58,23 +58,26 @@ class test_scheduler final : public rpp::schedulers::details::scheduler_tag class worker_strategy { public: - worker_strategy(std::shared_ptr state) + worker_strategy(std::weak_ptr state) : m_state{state} { } void defer_at(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) const { - if (m_state->sub.is_subscribed()) + if (auto locked = m_state.lock()) { - m_state->schedule(time_point, std::forward(fn)); - m_state->drain(); + if (locked->sub.is_subscribed()) + { + locked->schedule(time_point, std::forward(fn)); + locked->drain(); + } } } static rpp::schedulers::time_point now() { return s_current_time; } private: - std::shared_ptr m_state; + std::weak_ptr m_state; }; test_scheduler() {}