diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 0802a5666..be9737c97 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -121,8 +121,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - state->dispose(); state->get_observer()->on_error(err); + state->dispose(); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 099dd2c30..f7f5152c5 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -62,16 +62,16 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } void on_completed() const { if (disposable->decrement_on_completed()) { - disposable->dispose(); disposable->get_observer_under_lock()->on_completed(); + disposable->dispose(); } } }; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 511f43f1d..2332f6df2 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -71,8 +71,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_disposable->dispose(); m_disposable->get_observer_under_lock()->on_error(err); + m_disposable->dispose(); } void on_completed() const @@ -83,8 +83,8 @@ namespace rpp::operators::details { m_disposable->remove(v); } - m_disposable->dispose(); m_disposable->get_observer_under_lock()->on_completed(); + m_disposable->dispose(); } } diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 95b259a7b..6d064cadc 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -38,8 +38,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable.dispose(); - std::optional> selector_obs; try { @@ -53,12 +51,13 @@ namespace rpp::operators::details { std::move(selector_obs).value().subscribe(std::move(observer)); } + disposable.dispose(); } void on_completed() const { - disposable.dispose(); observer.on_completed(); + disposable.dispose(); } void set_upstream(const disposable_wrapper& d) diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index fc845d0c8..9cd494e83 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -62,8 +62,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_state->dispose(); m_state->get_observer()->on_error(err); + m_state->dispose(); } void on_completed() const @@ -106,8 +106,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - m_state->dispose(); m_state->get_observer()->on_error(err); + m_state->dispose(); } void on_completed() const diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index e6e8140ff..68963c718 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -68,8 +68,8 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -112,14 +112,14 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_under_lock()->on_error(err); + disposable->dispose(); } void on_completed() const { - disposable->dispose(); disposable->get_observer_under_lock()->on_completed(); + disposable->dispose(); } }; diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index 27bfd3e15..4a5e37a49 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -90,6 +90,7 @@ namespace rpp::details if (state->itr.value() == std::cend(state->container)) { state->observer.on_completed(); + state->dispose(); return; } diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index 3b36c1a9b..23ce2ef79 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -10,6 +10,7 @@ #pragma once +#include #include namespace rpp::utils diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index 35269214b..42c7b73d0 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -159,8 +159,10 @@ TEST_CASE("combine_latest satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::combine_latest(observable); - test_operator_with_disposable(rpp::ops::combine_latest(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 5ddcbf837..55e8511a8 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -285,8 +285,10 @@ TEST_CASE("merge satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::merge_with(observable); - test_operator_with_disposable(rpp::ops::merge_with(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } diff --git a/src/tests/rpp/test_on_error_resume_next.cpp b/src/tests/rpp/test_on_error_resume_next.cpp index 993ad03c6..b71fcc959 100644 --- a/src/tests/rpp/test_on_error_resume_next.cpp +++ b/src/tests/rpp/test_on_error_resume_next.cpp @@ -168,9 +168,10 @@ TEST_CASE("on_error_resume_next satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); }); - test_operator_with_disposable( - rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); })); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_latest_from.cpp similarity index 97% rename from src/tests/rpp/test_with_lastest_from.cpp rename to src/tests/rpp/test_with_latest_from.cpp index 01d464876..44a4a8133 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_latest_from.cpp @@ -198,8 +198,10 @@ TEST_CASE("with_latest_from satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::with_latest_from(observable); - test_operator_with_disposable(rpp::ops::with_latest_from(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index d6de75d2f..206db4c79 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -180,8 +180,10 @@ TEST_CASE("zip satisfies disposable contracts") auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::zip(observable); - test_operator_with_disposable(rpp::ops::zip(observable)); + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); } CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 52fc47552..dbee906fd 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -45,6 +45,38 @@ struct wrapped_observable_strategy_no_set_upstream auto subscribe(auto&&) const {} }; +template +void test_operator_over_observable_finish_before_dispose(auto&& op) +{ + SECTION("operator calls on_error before dispose") + { + bool callback_called = false; + auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept { + callback_called = true; + }); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + obs.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + + op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called](const std::exception_ptr&) { CHECK(!callback_called); }); + } + + SECTION("operator calls on_completed before dispose") + { + bool callback_called = false; + auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept { + callback_called = true; + }); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + obs.on_completed(); + }); + + op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called]() { CHECK(!callback_called); }); + } +} + template void test_operator_over_observable_with_disposable(auto&& op) { @@ -112,3 +144,9 @@ void test_operator_with_disposable(auto&& op) { test_operator_over_observable_with_disposable([op](auto&& observable) { return observable | op; }); } + +template +void test_operator_finish_before_dispose(auto&& op) +{ + test_operator_over_observable_finish_before_dispose([op](auto&& observable) { return observable | op; }); +}