diff --git a/src/rpp/rpp/observers/details/disposables_strategy.hpp b/src/rpp/rpp/observers/details/disposables_strategy.hpp index 966079736..568e935da 100644 --- a/src/rpp/rpp/observers/details/disposables_strategy.hpp +++ b/src/rpp/rpp/observers/details/disposables_strategy.hpp @@ -55,4 +55,17 @@ namespace rpp::details::observers static constexpr void dispose() {} }; + + class boolean_disposables_strategy + { + public: + static constexpr void add(const rpp::disposable_wrapper&) {} + + bool is_disposed() const noexcept { return m_is_disposed; } + + void dispose() const { m_is_disposed = true; } + + private: + mutable bool m_is_disposed{}; + }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/details/fwd.hpp b/src/rpp/rpp/observers/details/fwd.hpp index 0ac6be5be..6232b0fdc 100644 --- a/src/rpp/rpp/observers/details/fwd.hpp +++ b/src/rpp/rpp/observers/details/fwd.hpp @@ -25,7 +25,9 @@ namespace rpp::details::observers // No any disposables logic for observer expected None = 1, // Use external (passed to constructor) composite_disposable_wrapper as disposable - External = 2 + External = 2, + // Observer just controls is_disposed or not but upstreams handled via observer_strategy + Boolean = 3 }; namespace constraint @@ -47,6 +49,10 @@ namespace rpp::details::observers * @brief No any disposable logic at all. Used only inside proxy-forwarding operators where extra disposable logic not requires */ struct none_disposables_strategy; + /** + * @brief Just control is_disposed or not via boolean and ignore upstreams at all + */ + class boolean_disposables_strategy; /** * @brief Keep disposables inside dynamic_disposables_container container (based on std::vector) @@ -66,7 +72,7 @@ namespace rpp::details::observers template consteval auto* deduce_optimal_disposables_strategy() { - static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External); + static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External || mode == disposables_mode::Boolean); if constexpr (mode == disposables_mode::Auto) return static_cast(nullptr); @@ -74,6 +80,8 @@ namespace rpp::details::observers return static_cast(nullptr); else if constexpr (mode == disposables_mode::External) return static_cast(nullptr); + else if constexpr (mode == disposables_mode::Boolean) + return static_cast(nullptr); else return static_cast(nullptr); } diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 5e8dcdf2f..fe3bdcd18 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -115,10 +115,9 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean; std::shared_ptr> disposable{}; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -128,13 +127,11 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; disposable->get_observer()->on_error(err); } void on_completed() const { - locally_disposed = true; disposable->get_inner_child_disposable().clear(); ConcatStage current{ConcatStage::Draining}; @@ -148,7 +145,7 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) const { disposable->get_inner_child_disposable().add(d); } - bool is_disposed() const { return locally_disposed || disposable->get_inner_child_disposable().is_disposed(); } + bool is_disposed() const { return disposable->get_inner_child_disposable().is_disposed(); } }; template diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 3e3945dac..38f488c94 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details template struct delay_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean; std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index f14203ad0..817cebb45 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -48,7 +48,7 @@ namespace rpp::operators::details template struct merge_observer_base_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean; merge_observer_base_strategy(std::shared_ptr>&& disposable) : m_disposable{std::move(disposable)} { diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index cb6700cde..0338c26ba 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -41,10 +41,9 @@ namespace rpp::operators::details template struct retry_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean; std::shared_ptr> state; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -54,7 +53,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; if (state->count == 0) { state->observer.on_error(err); @@ -71,7 +69,6 @@ namespace rpp::operators::details void on_completed() const { - locally_disposed = true; state->observer.on_completed(); } @@ -80,7 +77,7 @@ namespace rpp::operators::details state->add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index b22f86c85..7025ac4b7 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -50,10 +50,9 @@ namespace rpp::details template struct concat_source_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean; std::shared_ptr> state{}; - mutable bool locally_disposed{}; template void on_next(T&& v) const @@ -63,17 +62,15 @@ namespace rpp::details void on_error(const std::exception_ptr& err) const { - locally_disposed = true; state->observer.on_error(err); } void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } void on_completed() const { - locally_disposed = true; state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index afb2f66cf..97533fbc5 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -105,6 +105,7 @@ void test_operator_over_observable_with_disposable(auto&& op) const auto d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d); obs.on_error({}); + CHECK(obs.is_disposed()); CHECK(d.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); } @@ -115,6 +116,7 @@ void test_operator_over_observable_with_disposable(auto&& op) const auto d = rpp::composite_disposable_wrapper::make(); obs.set_upstream(d); obs.on_completed(); + CHECK(obs.is_disposed()); CHECK(d.is_disposed()); })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); }