diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 4362f2ea4..686daec58 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -62,6 +62,8 @@ namespace rpp::operators::details return; } + state->clear(); + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -89,7 +91,6 @@ namespace rpp::operators::details { if (state->count) --state->count.value(); - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 3b0a9e0f1..bf481db07 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -56,9 +56,11 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; + drain(state); } @@ -74,7 +76,7 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) const { state->add(d); } bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; @@ -121,7 +123,6 @@ namespace rpp::operators::details { while (!state->is_disposed()) { - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index 8ee5f537e..ef0d6ab94 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -75,6 +75,8 @@ namespace rpp::details void on_completed() const { locally_disposed = true; + state->clear(); + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -95,7 +97,6 @@ namespace rpp::details } using value_type = rpp::utils::extract_observable_type_t>; - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/tests/rpp/test_concat.cpp b/src/tests/rpp/test_concat.cpp index 270054477..655b80e1e 100644 --- a/src/tests/rpp/test_concat.cpp +++ b/src/tests/rpp/test_concat.cpp @@ -248,6 +248,21 @@ TEST_CASE("concat as operator async completiton") subj.get_observer().on_completed(); } +TEST_CASE("concat disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)); + REQUIRE_CALL(*mock, on_completed()); + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_completed(); + CHECK(d.is_disposed()); + })) | rpp::ops::subscribe(mock); +} + TEST_CASE("concat doesn't produce extra copies") { copy_count_tracker tracker{}; diff --git a/src/tests/rpp/test_repeat.cpp b/src/tests/rpp/test_repeat.cpp index b7ddaf35f..56af00c4f 100644 --- a/src/tests/rpp/test_repeat.cpp +++ b/src/tests/rpp/test_repeat.cpp @@ -134,7 +134,7 @@ TEST_CASE("repeat doesn't produce extra copies") TEST_CASE("repeat satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::repeat()); + test_operator_with_disposable(rpp::ops::repeat(1)); } diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp index d31d7b962..19acfa6b5 100644 --- a/src/tests/rpp/test_retry.cpp +++ b/src/tests/rpp/test_retry.cpp @@ -173,6 +173,22 @@ TEST_CASE("retry handles stack overflow") | rpp::operators::subscribe(mock); } +TEST_CASE("retry disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)); + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_error({}); + CHECK(d.is_disposed()); + })) | rpp::ops::retry(1) + | rpp::ops::subscribe(mock); +} + TEST_CASE("retry doesn't produce extra copies") { SECTION("retry(2)") diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 0aa705171..0e4ac96bc 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -248,6 +248,24 @@ TEST_CASE("repeat_when does not stack overflow") | rpp::operators::subscribe(mock); } +TEST_CASE("retry_when disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)); + + size_t i = 0; + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_error({}); + CHECK(d.is_disposed()); + })) | rpp::ops::retry_when([&i](const std::exception_ptr& e) { return i++ ? rpp::source::error(e).as_dynamic() : rpp::source::just(1).as_dynamic(); }) + | rpp::ops::subscribe(mock); +} + TEST_CASE("retry_when doesn't produce extra copies") { SECTION("retry_when(empty_notifier)") @@ -264,20 +282,13 @@ TEST_CASE("retry_when doesn't produce extra copies") TEST_CASE("retry_when satisfies disposable contracts") { - auto observable_disposable = rpp::composite_disposable_wrapper::make(); - { - auto observable = observable_with_disposable(observable_disposable); - auto op = rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }); - test_operator_with_disposable(op); - test_operator_finish_before_dispose(op); + test_operator_with_disposable(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); })); + test_operator_finish_before_dispose(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); })); - test_operator_over_observable_with_disposable( - [](auto observable) { - return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) - | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); - }); - } - - CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); + test_operator_over_observable_with_disposable( + [](auto observable) { + return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) + | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); + }); } diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 826c9f918..49e451f13 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -99,6 +99,26 @@ void test_operator_over_observable_with_disposable(auto&& op) CHECK(observable_disposable.is_disposed()); } + SECTION("operator disposes disposable on_error") + { + op(rpp::source::create([](auto&& obs) { + const auto d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d); + obs.on_error({}); + CHECK(d.is_disposed()); + })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + } + + SECTION("operator disposes disposable on_completed") + { + op(rpp::source::create([](auto&& obs) { + const auto d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d); + obs.on_completed(); + CHECK(d.is_disposed()); + })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + } + SECTION("set_upstream with fixed_disposable_strategy_selector<1>") { CHECK_NOTHROW(op(rpp::observable>>{})