From d70d315ef4ddb0aa7628b9a81995c452d14ae96e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 30 Aug 2024 13:04:37 +0300 Subject: [PATCH 1/6] fix infinite looping --- src/rpp/rpp/operators/take_until.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index ed2aca2c2..a3857cd40 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -65,7 +65,7 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return state->is_stopped() || state->is_disposed(); } }; template From 556a3c56a87efe84385c489fc32b9254d3d95d7c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 30 Aug 2024 13:13:56 +0300 Subject: [PATCH 2/6] add test --- src/tests/rpp/test_take_until.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index 7d65a96a8..a8465faf5 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -14,9 +14,15 @@ #include #include #include +#include +#include +#include +#include +#include #include #include #include +#include #include #include #include @@ -239,3 +245,22 @@ TEST_CASE("take_until dispose after completion") CHECK(log == std::vector{"completed", "finally"}); } + +TEST_CASE("take_until infinite loop") +{ + rpp::source::create([](auto&& obs) { + obs.on_next(1); + obs.on_completed(); + }) + | rpp::ops::map([](int) { + const auto delay = rpp::source::timer(std::chrono::seconds{1}, rpp::schedulers::current_thread{}); + return rpp::source::never() + | rpp::ops::take_until(delay) + | rpp::ops::start_with(1); + }) + | rpp::ops::concat() + | rpp::ops::repeat() + | rpp::ops::take(5) + | rpp::ops::subscribe([](int) {}); + ; +} \ No newline at end of file From 76d5a8dae9d94445a22497cc0bfc6dabf3d66dcf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 30 Aug 2024 10:14:09 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rpp/test_take_until.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index a8465faf5..f301b61d6 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -13,20 +13,20 @@ #include #include +#include #include #include -#include -#include #include +#include #include #include #include #include -#include #include #include #include #include +#include #include #include "copy_count_tracker.hpp" @@ -263,4 +263,4 @@ TEST_CASE("take_until infinite loop") | rpp::ops::take(5) | rpp::ops::subscribe([](int) {}); ; -} \ No newline at end of file +} From 3be78e45c955b729b8e3b97e25559226e0889ca3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 30 Aug 2024 23:52:54 +0300 Subject: [PATCH 4/6] update tests and fix --- src/rpp/rpp/operators/retry.hpp | 3 ++- src/rpp/rpp/operators/retry_when.hpp | 5 +++-- src/rpp/rpp/operators/take_until.hpp | 2 +- src/rpp/rpp/sources/concat.hpp | 3 ++- src/tests/rpp/test_take_until.cpp | 4 ++-- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 4362f2ea4..686daec58 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -62,6 +62,8 @@ namespace rpp::operators::details return; } + state->clear(); + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -89,7 +91,6 @@ namespace rpp::operators::details { if (state->count) --state->count.value(); - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 3b0a9e0f1..bf481db07 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -56,9 +56,11 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; + drain(state); } @@ -74,7 +76,7 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) const { state->add(d); } bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; @@ -121,7 +123,6 @@ namespace rpp::operators::details { while (!state->is_disposed()) { - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index a3857cd40..ed2aca2c2 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -65,7 +65,7 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->is_stopped() || state->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index 8ee5f537e..ef0d6ab94 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -75,6 +75,8 @@ namespace rpp::details void on_completed() const { locally_disposed = true; + state->clear(); + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -95,7 +97,6 @@ namespace rpp::details } using value_type = rpp::utils::extract_observable_type_t>; - state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try { diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index f301b61d6..bbeb79473 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -253,14 +253,14 @@ TEST_CASE("take_until infinite loop") obs.on_completed(); }) | rpp::ops::map([](int) { - const auto delay = rpp::source::timer(std::chrono::seconds{1}, rpp::schedulers::current_thread{}); + const auto delay = rpp::source::timer(std::chrono::nanoseconds{100}, rpp::schedulers::current_thread{}); return rpp::source::never() | rpp::ops::take_until(delay) | rpp::ops::start_with(1); }) | rpp::ops::concat() | rpp::ops::repeat() - | rpp::ops::take(5) + | rpp::ops::take(2) | rpp::ops::subscribe([](int) {}); ; } From 80d88ca3f4d9ecbf4a84ca8ed6620326cba3b585 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 31 Aug 2024 00:14:09 +0300 Subject: [PATCH 5/6] make tests mor correct --- src/tests/rpp/test_concat.cpp | 15 +++++++++++++++ src/tests/rpp/test_retry.cpp | 16 ++++++++++++++++ src/tests/rpp/test_retry_when.cpp | 18 ++++++++++++++++++ src/tests/rpp/test_take_until.cpp | 25 ------------------------- 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/src/tests/rpp/test_concat.cpp b/src/tests/rpp/test_concat.cpp index 270054477..655b80e1e 100644 --- a/src/tests/rpp/test_concat.cpp +++ b/src/tests/rpp/test_concat.cpp @@ -248,6 +248,21 @@ TEST_CASE("concat as operator async completiton") subj.get_observer().on_completed(); } +TEST_CASE("concat disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)); + REQUIRE_CALL(*mock, on_completed()); + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_completed(); + CHECK(d.is_disposed()); + })) | rpp::ops::subscribe(mock); +} + TEST_CASE("concat doesn't produce extra copies") { copy_count_tracker tracker{}; diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp index d31d7b962..19acfa6b5 100644 --- a/src/tests/rpp/test_retry.cpp +++ b/src/tests/rpp/test_retry.cpp @@ -173,6 +173,22 @@ TEST_CASE("retry handles stack overflow") | rpp::operators::subscribe(mock); } +TEST_CASE("retry disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)); + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_error({}); + CHECK(d.is_disposed()); + })) | rpp::ops::retry(1) + | rpp::ops::subscribe(mock); +} + TEST_CASE("retry doesn't produce extra copies") { SECTION("retry(2)") diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 0aa705171..f7be88265 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -248,6 +248,24 @@ TEST_CASE("repeat_when does not stack overflow") | rpp::operators::subscribe(mock); } +TEST_CASE("retry_when disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)); + + size_t i = 0; + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_error({}); + CHECK(d.is_disposed()); + })) | rpp::ops::retry_when([&i](const std::exception_ptr& e) { return i++ ? rpp::source::error(e).as_dynamic() : rpp::source::just(1).as_dynamic(); }) + | rpp::ops::subscribe(mock); +} + TEST_CASE("retry_when doesn't produce extra copies") { SECTION("retry_when(empty_notifier)") diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index bbeb79473..7d65a96a8 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -13,12 +13,7 @@ #include #include -#include #include -#include -#include -#include -#include #include #include #include @@ -26,7 +21,6 @@ #include #include #include -#include #include #include "copy_count_tracker.hpp" @@ -245,22 +239,3 @@ TEST_CASE("take_until dispose after completion") CHECK(log == std::vector{"completed", "finally"}); } - -TEST_CASE("take_until infinite loop") -{ - rpp::source::create([](auto&& obs) { - obs.on_next(1); - obs.on_completed(); - }) - | rpp::ops::map([](int) { - const auto delay = rpp::source::timer(std::chrono::nanoseconds{100}, rpp::schedulers::current_thread{}); - return rpp::source::never() - | rpp::ops::take_until(delay) - | rpp::ops::start_with(1); - }) - | rpp::ops::concat() - | rpp::ops::repeat() - | rpp::ops::take(2) - | rpp::ops::subscribe([](int) {}); - ; -} From 84207eef4c2ee992574ecb92cfc0530efb14b429 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 31 Aug 2024 21:43:01 +0300 Subject: [PATCH 6/6] cover with tests --- src/tests/rpp/test_repeat.cpp | 2 +- src/tests/rpp/test_retry_when.cpp | 21 +++++++-------------- src/tests/utils/disposable_observable.hpp | 20 ++++++++++++++++++++ 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/tests/rpp/test_repeat.cpp b/src/tests/rpp/test_repeat.cpp index b7ddaf35f..56af00c4f 100644 --- a/src/tests/rpp/test_repeat.cpp +++ b/src/tests/rpp/test_repeat.cpp @@ -134,7 +134,7 @@ TEST_CASE("repeat doesn't produce extra copies") TEST_CASE("repeat satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::repeat()); + test_operator_with_disposable(rpp::ops::repeat(1)); } diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index f7be88265..0e4ac96bc 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -282,20 +282,13 @@ TEST_CASE("retry_when doesn't produce extra copies") TEST_CASE("retry_when satisfies disposable contracts") { - auto observable_disposable = rpp::composite_disposable_wrapper::make(); - { - auto observable = observable_with_disposable(observable_disposable); - auto op = rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }); - - test_operator_with_disposable(op); - test_operator_finish_before_dispose(op); - test_operator_over_observable_with_disposable( - [](auto observable) { - return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) - | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); - }); - } + test_operator_with_disposable(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); })); + test_operator_finish_before_dispose(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); })); - CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); + test_operator_over_observable_with_disposable( + [](auto observable) { + return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) + | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); + }); } diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 826c9f918..49e451f13 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -99,6 +99,26 @@ void test_operator_over_observable_with_disposable(auto&& op) CHECK(observable_disposable.is_disposed()); } + SECTION("operator disposes disposable on_error") + { + op(rpp::source::create([](auto&& obs) { + const auto d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d); + obs.on_error({}); + CHECK(d.is_disposed()); + })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + } + + SECTION("operator disposes disposable on_completed") + { + op(rpp::source::create([](auto&& obs) { + const auto d = rpp::composite_disposable_wrapper::make(); + obs.set_upstream(d); + obs.on_completed(); + CHECK(d.is_disposed()); + })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); + } + SECTION("set_upstream with fixed_disposable_strategy_selector<1>") { CHECK_NOTHROW(op(rpp::observable>>{})