From 29c6e422446d6b9d31c55b73029ae84faeda930b Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 10 Apr 2024 23:16:25 +0200 Subject: [PATCH 1/2] Call dispose after on_error/on_completed --- src/rpp/rpp/operators/concat.hpp | 2 +- .../operators/details/combining_strategy.hpp | 4 +- src/rpp/rpp/operators/merge.hpp | 4 +- src/rpp/rpp/operators/switch_on_next.hpp | 4 +- src/rpp/rpp/operators/with_latest_from.hpp | 6 +-- src/rpp/rpp/sources/concat.hpp | 1 + src/rpp/rpp/utils/functors.hpp | 1 + src/tests/rpp/test_combine_latest.cpp | 4 +- src/tests/rpp/test_merge.cpp | 4 +- ...est_from.cpp => test_with_latest_from.cpp} | 4 +- src/tests/rpp/test_zip.cpp | 4 +- src/tests/utils/disposable_observable.hpp | 38 +++++++++++++++++++ 12 files changed, 62 insertions(+), 14 deletions(-) rename src/tests/rpp/{test_with_lastest_from.cpp => test_with_latest_from.cpp} (97%) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 0802a5666..be9737c97 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -121,8 +121,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - state->dispose(); state->get_observer()->on_error(err); + state->dispose(); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 099dd2c30..f7f5152c5 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -62,16 +62,16 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } void on_completed() const { if (disposable->decrement_on_completed()) { - disposable->dispose(); disposable->get_observer_under_lock()->on_completed(); + disposable->dispose(); } } }; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 511f43f1d..2332f6df2 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -71,8 +71,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_disposable->dispose(); m_disposable->get_observer_under_lock()->on_error(err); + m_disposable->dispose(); } void on_completed() const @@ -83,8 +83,8 @@ namespace rpp::operators::details { m_disposable->remove(v); } - m_disposable->dispose(); m_disposable->get_observer_under_lock()->on_completed(); + m_disposable->dispose(); } } diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index fc845d0c8..9cd494e83 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -62,8 +62,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_state->dispose(); m_state->get_observer()->on_error(err); + m_state->dispose(); } void on_completed() const @@ -106,8 +106,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_state->dispose(); m_state->get_observer()->on_error(err); + m_state->dispose(); } void on_completed() const diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index e6e8140ff..68963c718 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -68,8 +68,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -112,14 +112,14 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } void on_completed() const { - disposable->dispose(); disposable->get_observer_under_lock()->on_completed(); + disposable->dispose(); } }; diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index 27bfd3e15..4a5e37a49 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -90,6 +90,7 @@ namespace rpp::details if (state->itr.value() == std::cend(state->container)) { state->observer.on_completed(); + state->dispose(); return; } diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index 3b36c1a9b..23ce2ef79 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -10,6 +10,7 @@ #pragma once +#include #include namespace rpp::utils diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index 35269214b..42c7b73d0 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -159,8 +159,10 @@ TEST_CASE("combine_latest satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::combine_latest(observable); - test_operator_with_disposable(rpp::ops::combine_latest(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 5ddcbf837..55e8511a8 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -285,8 +285,10 @@ TEST_CASE("merge satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::merge_with(observable); - test_operator_with_disposable(rpp::ops::merge_with(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_latest_from.cpp similarity index 97% rename from src/tests/rpp/test_with_lastest_from.cpp rename to src/tests/rpp/test_with_latest_from.cpp index 01d464876..44a4a8133 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_latest_from.cpp @@ -198,8 +198,10 @@ TEST_CASE("with_latest_from satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::with_latest_from(observable); - test_operator_with_disposable(rpp::ops::with_latest_from(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index d6de75d2f..206db4c79 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -180,8 +180,10 @@ TEST_CASE("zip satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::zip(observable); - test_operator_with_disposable(rpp::ops::zip(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 52fc47552..dbee906fd 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -45,6 +45,38 @@ struct wrapped_observable_strategy_no_set_upstream auto subscribe(auto&&) const {} }; +template +void test_operator_over_observable_finish_before_dispose(auto&& op) +{ + SECTION("operator calls on_error before dispose") + { + bool callback_called = false; + auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept { + callback_called = true; + }); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + obs.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + + op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called](const std::exception_ptr&) { CHECK(!callback_called); }); + } + + SECTION("operator calls on_completed before dispose") + { + bool callback_called = false; + auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept { + callback_called = true; + }); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + obs.on_completed(); + }); + + op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called]() { CHECK(!callback_called); }); + } +} + template void test_operator_over_observable_with_disposable(auto&& op) { @@ -112,3 +144,9 @@ void test_operator_with_disposable(auto&& op) { test_operator_over_observable_with_disposable([op](auto&& observable) { return observable | op; }); } + +template +void test_operator_finish_before_dispose(auto&& op) +{ + test_operator_over_observable_finish_before_dispose([op](auto&& observable) { return observable | op; }); +} From de31a2849ec48c74c5cfb301cea3688d95caa7d9 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 10 Apr 2024 23:41:51 +0200 Subject: [PATCH 2/2] Add on_error_resume_next fix --- src/rpp/rpp/operators/on_error_resume_next.hpp | 5 ++--- src/tests/rpp/test_on_error_resume_next.cpp | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 95b259a7b..6d064cadc 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -38,8 +38,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable.dispose(); - std::optional> selector_obs; try { @@ -53,12 +51,13 @@ namespace rpp::operators::details { std::move(selector_obs).value().subscribe(std::move(observer)); } + disposable.dispose(); } void on_completed() const { - disposable.dispose(); observer.on_completed(); + disposable.dispose(); } void set_upstream(const disposable_wrapper& d) diff --git a/src/tests/rpp/test_on_error_resume_next.cpp b/src/tests/rpp/test_on_error_resume_next.cpp index 993ad03c6..b71fcc959 100644 --- a/src/tests/rpp/test_on_error_resume_next.cpp +++ b/src/tests/rpp/test_on_error_resume_next.cpp @@ -168,9 +168,10 @@ TEST_CASE("on_error_resume_next satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); }); - test_operator_with_disposable( - rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); })); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);