From 931e84292bb4dfa3c0c3884a321e072a93c53161 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 11 Oct 2024 23:17:12 +0300 Subject: [PATCH 1/3] merge and debounce --- .../disposables/details/base_disposable.hpp | 2 +- src/rpp/rpp/operators/debounce.hpp | 14 +++++++------ src/rpp/rpp/operators/merge.hpp | 2 +- src/tests/rpp/test_debounce.cpp | 20 +++++++++++++++++++ src/tests/rpp/test_merge.cpp | 14 +++++++++++++ 5 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/disposables/details/base_disposable.hpp b/src/rpp/rpp/disposables/details/base_disposable.hpp index 0cb4e3f35..693be0aa4 100644 --- a/src/rpp/rpp/disposables/details/base_disposable.hpp +++ b/src/rpp/rpp/disposables/details/base_disposable.hpp @@ -41,7 +41,7 @@ namespace rpp::details } protected: - virtual void base_dispose_impl(interface_disposable::Mode mode) noexcept = 0; + virtual void base_dispose_impl(interface_disposable::Mode ) noexcept {}; private: std::atomic_bool m_disposed{}; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 24c1a9d34..a8bc4fe7e 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -25,13 +25,13 @@ namespace rpp::operators::details { std::shared_ptr> state{}; - bool is_disposed() const { return state->get_observer_under_lock()->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } }; template - class debounce_state final : public std::enable_shared_from_this> + class debounce_state final : public rpp::details::enable_wrapper_from_this>, public rpp::details::base_disposable { using T = rpp::utils::extract_observer_type_t; @@ -79,7 +79,7 @@ namespace rpp::operators::details return std::nullopt; }, - debounce_state_wrapper{this->shared_from_this()}); + debounce_state_wrapper{this->wrapper_from_this().lock()}); } std::variant extract_value_or_time() @@ -120,7 +120,7 @@ namespace rpp::operators::details bool is_disposed() const { - return state->get_observer_under_lock()->is_disposed(); + return state->is_disposed(); } template @@ -153,7 +153,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = Prev; + using updated_disposable_strategy = typename Prev::template add<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; @@ -163,7 +163,9 @@ namespace rpp::operators::details { using worker_t = rpp::schedulers::utils::get_worker_t; - auto ptr = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = d.lock(); + ptr->get_observer_under_lock()->set_upstream(d.as_weak()); return rpp::observer, worker_t>>{std::move(ptr)}; } }; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 791a65f84..660b5acd1 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details bool is_disposed() const { - return m_state->get_observer_under_lock()->is_disposed(); + return m_state->get_disposable().is_disposed(); } void on_error(const std::exception_ptr& err) const diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index c3ad94788..d34340e93 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -120,6 +120,26 @@ TEST_CASE("debounce emit only items where timeout reached") } } + +TEST_CASE("debounce is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::schedulers::test_scheduler scheduler{}; + + rpp::source::create([&observer](auto&& obs){ + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::debounce(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe([&observer](int){ + CHECK(observer); + CHECK(!observer->is_disposed()); + }); + scheduler.time_advance(std::chrono::seconds{1}); +} + + TEST_CASE("debounce forwards error") { auto mock = mock_observer_strategy{}; diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 1df94ad63..b37430162 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -258,6 +258,20 @@ TEST_CASE("merge dispose inner_disposable immediately") | rpp::ops::subscribe([](int) {}); } +TEST_CASE("merge is not deadlocking is_disposed") +{ + std::optional> observer{}; + rpp::source::create([&observer](auto&& obs){ + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::ops::merge_with(rpp::source::never()) + | rpp::ops::subscribe([&observer](int){ + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("merge doesn't produce extra copies") { SUBCASE("send value by copy") From b712b55a108993abf5bcc4d9e2e47746f73cb469 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 11 Oct 2024 23:21:15 +0300 Subject: [PATCH 2/3] fix rest --- .../rpp/operators/details/combining_strategy.hpp | 9 ++++++--- src/tests/rpp/test_combine_latest.cpp | 15 +++++++++++++++ src/tests/rpp/test_zip.cpp | 15 +++++++++++++++ 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 4aa03b752..01bc39ac8 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details { template - class combining_state + class combining_state : public rpp::details::enable_wrapper_from_this>, public rpp::details::base_disposable { public: explicit combining_state(Observer&& observer, size_t on_completed_needed) @@ -57,7 +57,7 @@ namespace rpp::operators::details bool is_disposed() const { - return state->get_observer_under_lock()->is_disposed(); + return state->is_disposed(); } void on_error(const std::exception_ptr& err) const @@ -103,7 +103,10 @@ namespace rpp::operators::details { using State = TState...>; - const auto state = std::make_shared(std::forward(observer), selector); + const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); + auto state = d.lock(); + state->get_observer_under_lock()->set_upstream(d.as_weak()); + subscribe>(state, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index 6ab894dce..d8c849f83 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -138,6 +138,21 @@ TEST_CASE("combine_latest handles race condition") } } +TEST_CASE("combine_latest is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::source::create([&observer](auto&& obs){ + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::combine_latest(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto){ + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("combine_latest satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make(); diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index 269f324d0..82543fb8f 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -175,6 +175,21 @@ TEST_CASE("zip doesn't produce extra copies") } } +TEST_CASE("zip is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::source::create([&observer](auto&& obs){ + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::zip(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto){ + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("zip satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make(); From 1862c5608f60a84a200f2ff6bc0b86d7a8b2161b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 11 Oct 2024 20:21:08 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/disposables/details/base_disposable.hpp | 2 +- src/rpp/rpp/operators/debounce.hpp | 5 +++-- src/rpp/rpp/operators/details/combining_strategy.hpp | 7 ++++--- src/tests/rpp/test_combine_latest.cpp | 12 ++++++------ src/tests/rpp/test_debounce.cpp | 12 ++++++------ src/tests/rpp/test_merge.cpp | 12 ++++++------ src/tests/rpp/test_zip.cpp | 12 ++++++------ 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/rpp/rpp/disposables/details/base_disposable.hpp b/src/rpp/rpp/disposables/details/base_disposable.hpp index 693be0aa4..6d09755aa 100644 --- a/src/rpp/rpp/disposables/details/base_disposable.hpp +++ b/src/rpp/rpp/disposables/details/base_disposable.hpp @@ -41,7 +41,7 @@ namespace rpp::details } protected: - virtual void base_dispose_impl(interface_disposable::Mode ) noexcept {}; + virtual void base_dispose_impl(interface_disposable::Mode) noexcept {}; private: std::atomic_bool m_disposed{}; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index a8bc4fe7e..8af013f03 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -31,7 +31,8 @@ namespace rpp::operators::details }; template - class debounce_state final : public rpp::details::enable_wrapper_from_this>, public rpp::details::base_disposable + class debounce_state final : public rpp::details::enable_wrapper_from_this> + , public rpp::details::base_disposable { using T = rpp::utils::extract_observer_type_t; @@ -163,7 +164,7 @@ namespace rpp::operators::details { using worker_t = rpp::schedulers::utils::get_worker_t; - auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); auto ptr = d.lock(); ptr->get_observer_under_lock()->set_upstream(d.as_weak()); return rpp::observer, worker_t>>{std::move(ptr)}; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 01bc39ac8..b07327b92 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,7 +22,8 @@ namespace rpp::operators::details { template - class combining_state : public rpp::details::enable_wrapper_from_this>, public rpp::details::base_disposable + class combining_state : public rpp::details::enable_wrapper_from_this> + , public rpp::details::base_disposable { public: explicit combining_state(Observer&& observer, size_t on_completed_needed) @@ -103,8 +104,8 @@ namespace rpp::operators::details { using State = TState...>; - const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); - auto state = d.lock(); + const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); + auto state = d.lock(); state->get_observer_under_lock()->set_upstream(d.as_weak()); subscribe>(state, std::index_sequence_for{}, observables...); diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index d8c849f83..f9358c960 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -142,15 +142,15 @@ TEST_CASE("combine_latest is not deadlocking is_disposed") { std::optional> observer{}; - rpp::source::create([&observer](auto&& obs){ + rpp::source::create([&observer](auto&& obs) { observer = std::forward(obs).as_dynamic(); observer->on_next(1); }) - | rpp::operators::combine_latest(rpp::source::just(1)) - | rpp::ops::subscribe([&observer](auto){ - CHECK(observer); - CHECK(!observer->is_disposed()); - }); + | rpp::operators::combine_latest(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); } TEST_CASE("combine_latest satisfies disposable contracts") diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index d34340e93..28d49b1bc 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -127,15 +127,15 @@ TEST_CASE("debounce is not deadlocking is_disposed") rpp::schedulers::test_scheduler scheduler{}; - rpp::source::create([&observer](auto&& obs){ + rpp::source::create([&observer](auto&& obs) { observer = std::forward(obs).as_dynamic(); observer->on_next(1); }) - | rpp::operators::debounce(std::chrono::seconds{1}, scheduler) - | rpp::ops::subscribe([&observer](int){ - CHECK(observer); - CHECK(!observer->is_disposed()); - }); + | rpp::operators::debounce(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe([&observer](int) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); scheduler.time_advance(std::chrono::seconds{1}); } diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index b37430162..8c982300c 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -261,15 +261,15 @@ TEST_CASE("merge dispose inner_disposable immediately") TEST_CASE("merge is not deadlocking is_disposed") { std::optional> observer{}; - rpp::source::create([&observer](auto&& obs){ + rpp::source::create([&observer](auto&& obs) { observer = std::forward(obs).as_dynamic(); observer->on_next(1); }) - | rpp::ops::merge_with(rpp::source::never()) - | rpp::ops::subscribe([&observer](int){ - CHECK(observer); - CHECK(!observer->is_disposed()); - }); + | rpp::ops::merge_with(rpp::source::never()) + | rpp::ops::subscribe([&observer](int) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); } TEST_CASE("merge doesn't produce extra copies") diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index 82543fb8f..6e084da80 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -179,15 +179,15 @@ TEST_CASE("zip is not deadlocking is_disposed") { std::optional> observer{}; - rpp::source::create([&observer](auto&& obs){ + rpp::source::create([&observer](auto&& obs) { observer = std::forward(obs).as_dynamic(); observer->on_next(1); }) - | rpp::operators::zip(rpp::source::just(1)) - | rpp::ops::subscribe([&observer](auto){ - CHECK(observer); - CHECK(!observer->is_disposed()); - }); + | rpp::operators::zip(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); } TEST_CASE("zip satisfies disposable contracts")