From f3bfba318f863f6ecb8e2f2475c711168a07f303 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 14 Oct 2024 14:36:44 +0300 Subject: [PATCH 1/8] Fix early disposing inside delay --- src/rpp/rpp/operators/delay.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 612df5c35..2b8436b0e 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -68,6 +68,8 @@ namespace rpp::operators::details template struct delay_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const From 9000d5ecfb9f389aa8f2d8e1cedb4b6de5330868 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 14 Oct 2024 22:49:21 +0300 Subject: [PATCH 2/8] tests --- src/tests/rpp/test_delay.cpp | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index cafbbab05..1f71157c5 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -21,6 +21,7 @@ #include #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" namespace { @@ -232,9 +233,31 @@ TEST_CASE("delay delays observable's emissions") } } +TEST_CASE("delay is not disposing early") +{ + mock_observer mock{}; + trompeloeil::sequence s{}; + + rpp::schedulers::test_scheduler scheduler{}; + + std::optional d{}; + rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); + obs.on_completed(); + }) + | rpp::ops::delay(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe(mock); + + CHECK(!d->is_disposed()); + REQUIRE_CALL(*mock, on_completed()).LR_WITH(!d->is_disposed()).IN_SEQUENCE(s); + scheduler.time_advance(std::chrono::seconds{1}); + CHECK(d->is_disposed()); +} + TEST_CASE("delay satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::delay(std::chrono::seconds{0}, manual_scheduler{})); + test_operator_with_disposable(rpp::ops::delay(std::chrono::seconds{0}, rpp::schedulers::immediate{})); } TEST_CASE("observe_on forward error immediately") From 51d39ebbc7cc1e2ccdce5d6e5e863f194680db3b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 19:49:01 +0000 Subject: [PATCH 3/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rpp/test_delay.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index 1f71157c5..4d9ab0191 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -235,7 +235,7 @@ TEST_CASE("delay delays observable's emissions") TEST_CASE("delay is not disposing early") { - mock_observer mock{}; + mock_observer mock{}; trompeloeil::sequence s{}; rpp::schedulers::test_scheduler scheduler{}; @@ -246,8 +246,8 @@ TEST_CASE("delay is not disposing early") obs.set_upstream(d.value()); obs.on_completed(); }) - | rpp::ops::delay(std::chrono::seconds{1}, scheduler) - | rpp::ops::subscribe(mock); + | rpp::ops::delay(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe(mock); CHECK(!d->is_disposed()); REQUIRE_CALL(*mock, on_completed()).LR_WITH(!d->is_disposed()).IN_SEQUENCE(s); From 906250a9e2095b21ab167aaa085909d05ec328f0 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 14 Oct 2024 23:14:12 +0300 Subject: [PATCH 4/8] modify some other strategies --- src/rpp/rpp/observers/details/disposable_strategy.hpp | 11 +++++++++++ src/rpp/rpp/observers/details/fwd.hpp | 5 +++++ src/rpp/rpp/operators/concat.hpp | 3 +++ src/rpp/rpp/operators/delay.hpp | 2 +- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 ++ src/rpp/rpp/operators/merge.hpp | 2 ++ src/rpp/rpp/operators/retry.hpp | 7 ++----- src/rpp/rpp/operators/with_latest_from.hpp | 2 ++ src/rpp/rpp/sources/concat.hpp | 7 ++----- src/tests/rpp/test_delay.cpp | 1 + src/tests/utils/disposable_observable.hpp | 1 + 11 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/rpp/rpp/observers/details/disposable_strategy.hpp b/src/rpp/rpp/observers/details/disposable_strategy.hpp index 30db40c10..f47e4cf7c 100644 --- a/src/rpp/rpp/observers/details/disposable_strategy.hpp +++ b/src/rpp/rpp/observers/details/disposable_strategy.hpp @@ -103,4 +103,15 @@ namespace rpp::details::observers static void dispose() {} }; + + struct locally_disposable_strategy + { + static void add(const rpp::disposable_wrapper&) {} + + bool is_disposed() const noexcept { return state; } + + void dispose() const { state = true; } + + mutable bool state{}; + }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/details/fwd.hpp b/src/rpp/rpp/observers/details/fwd.hpp index 377b9f295..327f873ae 100644 --- a/src/rpp/rpp/observers/details/fwd.hpp +++ b/src/rpp/rpp/observers/details/fwd.hpp @@ -29,6 +29,11 @@ namespace rpp::details::observers */ struct none_disposable_strategy; + /** + * @brief Just bool over is_disposed/dispose logic with no any add logic + */ + struct locally_disposable_strategy; + /** * @brief Dynamic disposable logic based on pre-allocated vector */ diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 053725378..be31d36f4 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -111,6 +111,7 @@ namespace rpp::operators::details template struct concat_observer_strategy_base { + concat_observer_strategy_base(std::shared_ptr> state, rpp::composite_disposable_wrapper refcounted) : state{std::move(state)} , refcounted{std::move(refcounted)} @@ -138,6 +139,8 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy : public concat_observer_strategy_base { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + using base = concat_observer_strategy_base; using base::concat_observer_strategy_base; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 2b8436b0e..e01797f8e 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -68,7 +68,7 @@ namespace rpp::operators::details template struct delay_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr> state{}; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index b07327b92..158e7cd1f 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -49,6 +49,8 @@ namespace rpp::operators::details template struct combining_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 72c1897a1..7ae88b988 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -51,6 +51,8 @@ namespace rpp::operators::details template struct merge_observer_base_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + merge_observer_base_strategy(std::shared_ptr>&& state) : m_state{std::move(state)} { diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index dbeb667f8..b0001b52d 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -44,10 +44,9 @@ namespace rpp::operators::details template struct retry_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr> state; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -57,7 +56,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; if (state->count == 0) { state->observer.on_error(err); @@ -74,7 +72,6 @@ namespace rpp::operators::details void on_completed() const { - locally_disposed = true; state->observer.on_completed(); } @@ -83,7 +80,7 @@ namespace rpp::operators::details state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } }; template diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c446ca27b..1d93135b9 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -50,6 +50,8 @@ namespace rpp::operators::details template struct with_latest_from_inner_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index a3a661f40..ba5349161 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -52,10 +52,9 @@ namespace rpp::details template struct concat_source_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr> state{}; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -65,17 +64,15 @@ namespace rpp::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; state->observer.on_error(err); } void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } void on_completed() const { - locally_disposed = true; state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index 4d9ab0191..1f3bf6fb8 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -245,6 +245,7 @@ TEST_CASE("delay is not disposing early") d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d.value()); obs.on_completed(); + CHECK(obs.is_disposed()); }) | rpp::ops::delay(std::chrono::seconds{1}, scheduler) | rpp::ops::subscribe(mock); diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index e5b527b95..798c666d7 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -129,6 +129,7 @@ void test_operator_over_observable_with_disposable(auto&& op) const auto d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d); obs.on_completed(); + CHECK(obs.is_disposed()); CHECK(d.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); } From 6484db5d8cd22b912a9eebd4d8a8b69b85bd9b96 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 14 Oct 2024 23:29:37 +0300 Subject: [PATCH 5/8] one more --- src/tests/utils/disposable_observable.hpp | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 798c666d7..0a5608068 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -115,23 +115,30 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_error") { - op(rpp::source::create([](auto&& obs) { - const auto d = rpp::composite_disposable_wrapper::make(); - obs.set_upstream(d); + std::optional d{}: + op(rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); obs.on_error({}); - CHECK(d.is_disposed()); + CHECK(obs.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + + CHECK(d); + CHECK(d->is_disposed()); } SUBCASE("operator disposes disposable on_completed") { - op(rpp::source::create([](auto&& obs) { - const auto d = rpp::composite_disposable_wrapper::make(); - obs.set_upstream(d); + std::optional d{}: + op(rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); obs.on_completed(); CHECK(obs.is_disposed()); - CHECK(d.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + + CHECK(d); + CHECK(d->is_disposed()); } SUBCASE("set_upstream with fixed_disposable_strategy_selector<1>") From 31ec72097b840ffd622a7236657cd6a9b5dbcbb1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:29:16 +0000 Subject: [PATCH 6/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/utils/disposable_observable.hpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 0a5608068..b5ceb4b9a 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -115,8 +115,7 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_error") { - std::optional d{}: - op(rpp::source::create([&d](auto&& obs) { + std::optional d{} : op(rpp::source::create([&d](auto&& obs) { d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d.value()); obs.on_error({}); @@ -129,8 +128,7 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_completed") { - std::optional d{}: - op(rpp::source::create([&d](auto&& obs) { + std::optional d{} : op(rpp::source::create([&d](auto&& obs) { d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d.value()); obs.on_completed(); From 82927fe2eae5115c99c85ecf1c91e034a8e90176 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 15 Oct 2024 00:32:28 +0300 Subject: [PATCH 7/8] FIX --- src/tests/utils/disposable_observable.hpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index b5ceb4b9a..933c9fdc2 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -115,7 +115,8 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_error") { - std::optional d{} : op(rpp::source::create([&d](auto&& obs) { + std::optional d{}; + op(rpp::source::create([&d](auto&& obs) { d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d.value()); obs.on_error({}); @@ -128,7 +129,8 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_completed") { - std::optional d{} : op(rpp::source::create([&d](auto&& obs) { + std::optional d{}; + op(rpp::source::create([&d](auto&& obs) { d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d.value()); obs.on_completed(); From bc99305943e02c2081d87f6c8eaf1a5298b27d2d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 15 Oct 2024 18:22:01 +0300 Subject: [PATCH 8/8] fix --- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 +- src/tests/rpp/test_merge.cpp | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 158e7cd1f..e36fec474 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -49,7 +49,7 @@ namespace rpp::operators::details template struct combining_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr state{}; diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 8c982300c..11506846c 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -216,18 +216,21 @@ TEST_CASE_TEMPLATE("merge handles race condition", TestType, rpp::memory_model:: extracted_obs.emplace(std::forward(obs).as_dynamic()); }); + auto test = [&](auto source) { SUBCASE("subscribe on it") { SUBCASE("on_error can't interleave with on_next") { + std::mutex m{}; source | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { REQUIRE(extracted_obs.has_value()); CHECK(!on_error_called); - std::thread{[extracted_obs] + std::thread{[extracted_obs,&m] { + std::lock_guard lock{m}; extracted_obs->on_error(std::exception_ptr{}); }}.detach(); std::this_thread::sleep_for(std::chrono::seconds{1});