diff --git a/src/rpp/rpp/observers/details/disposable_strategy.hpp b/src/rpp/rpp/observers/details/disposable_strategy.hpp index 30db40c10..f47e4cf7c 100644 --- a/src/rpp/rpp/observers/details/disposable_strategy.hpp +++ b/src/rpp/rpp/observers/details/disposable_strategy.hpp @@ -103,4 +103,15 @@ namespace rpp::details::observers static void dispose() {} }; + + struct locally_disposable_strategy + { + static void add(const rpp::disposable_wrapper&) {} + + bool is_disposed() const noexcept { return state; } + + void dispose() const { state = true; } + + mutable bool state{}; + }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/details/fwd.hpp b/src/rpp/rpp/observers/details/fwd.hpp index 377b9f295..327f873ae 100644 --- a/src/rpp/rpp/observers/details/fwd.hpp +++ b/src/rpp/rpp/observers/details/fwd.hpp @@ -29,6 +29,11 @@ namespace rpp::details::observers */ struct none_disposable_strategy; + /** + * @brief Just bool over is_disposed/dispose logic with no any add logic + */ + struct locally_disposable_strategy; + /** * @brief Dynamic disposable logic based on pre-allocated vector */ diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 053725378..be31d36f4 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -111,6 +111,7 @@ namespace rpp::operators::details template struct concat_observer_strategy_base { + concat_observer_strategy_base(std::shared_ptr> state, rpp::composite_disposable_wrapper refcounted) : state{std::move(state)} , refcounted{std::move(refcounted)} @@ -138,6 +139,8 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy : public concat_observer_strategy_base { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + using base = concat_observer_strategy_base; using base::concat_observer_strategy_base; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 612df5c35..e01797f8e 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -68,6 +68,8 @@ namespace rpp::operators::details template struct delay_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index b07327b92..e36fec474 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -49,6 +49,8 @@ namespace rpp::operators::details template struct combining_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 72c1897a1..7ae88b988 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -51,6 +51,8 @@ namespace rpp::operators::details template struct merge_observer_base_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + merge_observer_base_strategy(std::shared_ptr>&& state) : m_state{std::move(state)} { diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index dbeb667f8..b0001b52d 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -44,10 +44,9 @@ namespace rpp::operators::details template struct retry_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr> state; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -57,7 +56,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; if (state->count == 0) { state->observer.on_error(err); @@ -74,7 +72,6 @@ namespace rpp::operators::details void on_completed() const { - locally_disposed = true; state->observer.on_completed(); } @@ -83,7 +80,7 @@ namespace rpp::operators::details state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } }; template diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c446ca27b..1d93135b9 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -50,6 +50,8 @@ namespace rpp::operators::details template struct with_latest_from_inner_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index a3a661f40..ba5349161 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -52,10 +52,9 @@ namespace rpp::details template struct concat_source_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; std::shared_ptr> state{}; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -65,17 +64,15 @@ namespace rpp::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; state->observer.on_error(err); } void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } void on_completed() const { - locally_disposed = true; state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index cafbbab05..1f3bf6fb8 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -21,6 +21,7 @@ #include #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" namespace { @@ -232,9 +233,32 @@ TEST_CASE("delay delays observable's emissions") } } +TEST_CASE("delay is not disposing early") +{ + mock_observer mock{}; + trompeloeil::sequence s{}; + + rpp::schedulers::test_scheduler scheduler{}; + + std::optional d{}; + rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); + obs.on_completed(); + CHECK(obs.is_disposed()); + }) + | rpp::ops::delay(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe(mock); + + CHECK(!d->is_disposed()); + REQUIRE_CALL(*mock, on_completed()).LR_WITH(!d->is_disposed()).IN_SEQUENCE(s); + scheduler.time_advance(std::chrono::seconds{1}); + CHECK(d->is_disposed()); +} + TEST_CASE("delay satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::delay(std::chrono::seconds{0}, manual_scheduler{})); + test_operator_with_disposable(rpp::ops::delay(std::chrono::seconds{0}, rpp::schedulers::immediate{})); } TEST_CASE("observe_on forward error immediately") diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 8c982300c..11506846c 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -216,18 +216,21 @@ TEST_CASE_TEMPLATE("merge handles race condition", TestType, rpp::memory_model:: extracted_obs.emplace(std::forward(obs).as_dynamic()); }); + auto test = [&](auto source) { SUBCASE("subscribe on it") { SUBCASE("on_error can't interleave with on_next") { + std::mutex m{}; source | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { REQUIRE(extracted_obs.has_value()); CHECK(!on_error_called); - std::thread{[extracted_obs] + std::thread{[extracted_obs,&m] { + std::lock_guard lock{m}; extracted_obs->on_error(std::exception_ptr{}); }}.detach(); std::this_thread::sleep_for(std::chrono::seconds{1}); diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index e5b527b95..933c9fdc2 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -115,22 +115,30 @@ void test_operator_over_observable_with_disposable(auto&& op) SUBCASE("operator disposes disposable on_error") { - op(rpp::source::create([](auto&& obs) { - const auto d = rpp::composite_disposable_wrapper::make(); - obs.set_upstream(d); + std::optional d{}; + op(rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); obs.on_error({}); - CHECK(d.is_disposed()); + CHECK(obs.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + + CHECK(d); + CHECK(d->is_disposed()); } SUBCASE("operator disposes disposable on_completed") { - op(rpp::source::create([](auto&& obs) { - const auto d = rpp::composite_disposable_wrapper::make(); - obs.set_upstream(d); + std::optional d{}; + op(rpp::source::create([&d](auto&& obs) { + d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d.value()); obs.on_completed(); - CHECK(d.is_disposed()); + CHECK(obs.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + + CHECK(d); + CHECK(d->is_disposed()); } SUBCASE("set_upstream with fixed_disposable_strategy_selector<1>")