diff --git a/src/rpp/rpp/disposables/details/base_disposable.hpp b/src/rpp/rpp/disposables/details/base_disposable.hpp index 0cb4e3f35..6d09755aa 100644 --- a/src/rpp/rpp/disposables/details/base_disposable.hpp +++ b/src/rpp/rpp/disposables/details/base_disposable.hpp @@ -41,7 +41,7 @@ namespace rpp::details } protected: - virtual void base_dispose_impl(interface_disposable::Mode mode) noexcept = 0; + virtual void base_dispose_impl(interface_disposable::Mode) noexcept {}; private: std::atomic_bool m_disposed{}; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 24c1a9d34..8af013f03 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -25,13 +25,14 @@ namespace rpp::operators::details { std::shared_ptr> state{}; - bool is_disposed() const { return state->get_observer_under_lock()->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } }; template - class debounce_state final : public std::enable_shared_from_this> + class debounce_state final : public rpp::details::enable_wrapper_from_this> + , public rpp::details::base_disposable { using T = rpp::utils::extract_observer_type_t; @@ -79,7 +80,7 @@ namespace rpp::operators::details return std::nullopt; }, - debounce_state_wrapper{this->shared_from_this()}); + debounce_state_wrapper{this->wrapper_from_this().lock()}); } std::variant extract_value_or_time() @@ -120,7 +121,7 @@ namespace rpp::operators::details bool is_disposed() const { - return state->get_observer_under_lock()->is_disposed(); + return state->is_disposed(); } template @@ -153,7 +154,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = Prev; + using updated_disposable_strategy = typename Prev::template add<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; @@ -163,7 +164,9 @@ namespace rpp::operators::details { using worker_t = rpp::schedulers::utils::get_worker_t; - auto ptr = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = d.lock(); + ptr->get_observer_under_lock()->set_upstream(d.as_weak()); return rpp::observer, worker_t>>{std::move(ptr)}; } }; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 4aa03b752..b07327b92 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,7 +22,8 @@ namespace rpp::operators::details { template - class combining_state + class combining_state : public rpp::details::enable_wrapper_from_this> + , public rpp::details::base_disposable { public: explicit combining_state(Observer&& observer, size_t on_completed_needed) @@ -57,7 +58,7 @@ namespace rpp::operators::details bool is_disposed() const { - return state->get_observer_under_lock()->is_disposed(); + return state->is_disposed(); } void on_error(const std::exception_ptr& err) const @@ -103,7 +104,10 @@ namespace rpp::operators::details { using State = TState...>; - const auto state = std::make_shared(std::forward(observer), selector); + const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); + auto state = d.lock(); + state->get_observer_under_lock()->set_upstream(d.as_weak()); + subscribe>(state, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 791a65f84..660b5acd1 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details bool is_disposed() const { - return m_state->get_observer_under_lock()->is_disposed(); + return m_state->get_disposable().is_disposed(); } void on_error(const std::exception_ptr& err) const diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index 6ab894dce..f9358c960 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -138,6 +138,21 @@ TEST_CASE("combine_latest handles race condition") } } +TEST_CASE("combine_latest is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::source::create([&observer](auto&& obs) { + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::combine_latest(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("combine_latest satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make(); diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index c3ad94788..28d49b1bc 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -120,6 +120,26 @@ TEST_CASE("debounce emit only items where timeout reached") } } + +TEST_CASE("debounce is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::schedulers::test_scheduler scheduler{}; + + rpp::source::create([&observer](auto&& obs) { + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::debounce(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe([&observer](int) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); + scheduler.time_advance(std::chrono::seconds{1}); +} + + TEST_CASE("debounce forwards error") { auto mock = mock_observer_strategy{}; diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 1df94ad63..8c982300c 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -258,6 +258,20 @@ TEST_CASE("merge dispose inner_disposable immediately") | rpp::ops::subscribe([](int) {}); } +TEST_CASE("merge is not deadlocking is_disposed") +{ + std::optional> observer{}; + rpp::source::create([&observer](auto&& obs) { + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::ops::merge_with(rpp::source::never()) + | rpp::ops::subscribe([&observer](int) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("merge doesn't produce extra copies") { SUBCASE("send value by copy") diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index 269f324d0..6e084da80 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -175,6 +175,21 @@ TEST_CASE("zip doesn't produce extra copies") } } +TEST_CASE("zip is not deadlocking is_disposed") +{ + std::optional> observer{}; + + rpp::source::create([&observer](auto&& obs) { + observer = std::forward(obs).as_dynamic(); + observer->on_next(1); + }) + | rpp::operators::zip(rpp::source::just(1)) + | rpp::ops::subscribe([&observer](auto) { + CHECK(observer); + CHECK(!observer->is_disposed()); + }); +} + TEST_CASE("zip satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make();