From e0548f1a919ecea8e077a243f1e6ba2ed88ded17 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 7 Jun 2025 23:43:33 +0200 Subject: [PATCH 1/2] repeat_when operator --- .clangd | 2 + .../operators/details/repeating_strategy.hpp | 122 ++++++++++++ src/rpp/rpp/operators/repeat_when.hpp | 100 ++++++++++ src/rpp/rpp/operators/retry_when.hpp | 105 +---------- src/tests/rpp/test_repeat_when.cpp | 174 ++++++++++++++++++ 5 files changed, 406 insertions(+), 97 deletions(-) create mode 100644 .clangd create mode 100644 src/rpp/rpp/operators/details/repeating_strategy.hpp create mode 100644 src/rpp/rpp/operators/repeat_when.hpp create mode 100644 src/tests/rpp/test_repeat_when.cpp diff --git a/.clangd b/.clangd new file mode 100644 index 000000000..0c5abfdb1 --- /dev/null +++ b/.clangd @@ -0,0 +1,2 @@ +CompileFlags: + CompilationDatabase: "build/" diff --git a/src/rpp/rpp/operators/details/repeating_strategy.hpp b/src/rpp/rpp/operators/details/repeating_strategy.hpp new file mode 100644 index 000000000..67a60570f --- /dev/null +++ b/src/rpp/rpp/operators/details/repeating_strategy.hpp @@ -0,0 +1,122 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "rpp/observers/fwd.hpp" + +namespace rpp::operators::details +{ + template + struct repeating_state final : public rpp::composite_disposable + { + repeating_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(observer)) + , observable(observable) + , notifier(notifier) + { + } + + std::atomic_bool is_inside_drain{}; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + }; + + template + void drain(const std::shared_ptr>& state); + + template + struct repeating_inner_observer_strategy + { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + + std::shared_ptr> state; + mutable bool locally_disposed{}; + + template + void on_next(T&&) const + { + locally_disposed = true; + state->clear(); + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + + drain(state); + } + + void on_error(const std::exception_ptr& err) const + { + locally_disposed = true; + state->observer.on_error(err); + } + + void on_completed() const + { + locally_disposed = true; + state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const { state->add(d); } + + bool is_disposed() const { return locally_disposed || state->is_disposed(); } + }; + + template + struct repeating_observer_strategy + { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + + std::shared_ptr> state; + + void set_upstream(const disposable_wrapper& d) { state->add(d); } + + bool is_disposed() const { return state->is_disposed(); } + }; + + template + void drain(const std::shared_ptr>& state) + { + while (!state->is_disposed()) + { + state->is_inside_drain.store(true, std::memory_order::seq_cst); + try + { + using value_type = rpp::utils::extract_observer_type_t; + state->observable.subscribe(rpp::observer{state}); + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + } + catch (...) + { + state->observer.on_error(std::current_exception()); + return; + } + } + } +} // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/repeat_when.hpp b/src/rpp/rpp/operators/repeat_when.hpp new file mode 100644 index 000000000..ccb4d1550 --- /dev/null +++ b/src/rpp/rpp/operators/repeat_when.hpp @@ -0,0 +1,100 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include + +namespace rpp::operators::details +{ + template + struct repeat_when_impl_strategy final : public repeating_observer_strategy + { + using self_type = repeat_when_impl_strategy; + + using repeating_observer_strategy::state; + + template + void on_next(T&& v) const + { + state->observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + state->observer.on_error(err); + } + + void on_completed() const + { + try + { + state->notifier().subscribe(repeating_inner_observer_strategy{this->state}); + } + catch (...) + { + state->observer.on_error(std::current_exception()); + } + } + }; + + template + struct repeat_when_t + { + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + template + struct operator_traits + { + using result_type = T; + }; + + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; + + template + void subscribe(TObserver&& observer, TObservable&& observable) const + { + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); + + drain, std::decay_t, std::decay_t>>(ptr); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief If observable completes, invoke @p notifier and when returned observable emits a value + * resubscribe to the source observable. If the notifier throws or returns an error/empty + * observable, then error/completed emission is forwarded to original subscription. + * + * @param notifier callable taking no arguments and returning observable notifying when to resubscribe + * + * @note `#include ` + * + * @ingroup creational_operators + * @see https://reactivex.io/documentation/operators/repeat.html + */ + template + requires rpp::constraint::observable> + auto repeat_when(TNotifier&& notifier) + { + return details::repeat_when_t>{std::forward(notifier)}; + } +} // namespace rpp::operators diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index b1bea7e8e..0c7e3906f 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -13,82 +13,18 @@ #include #include -#include -#include -#include -#include +#include namespace rpp::operators::details { template - struct retry_when_state final : public rpp::composite_disposable + struct retry_when_impl_strategy final : public repeating_observer_strategy { - retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) - : observer(std::move(observer)) - , observable(observable) - , notifier(notifier) - { - } + using self_type = retry_when_impl_strategy; - std::atomic_bool is_inside_drain{}; - - RPP_NO_UNIQUE_ADDRESS TObserver observer; - RPP_NO_UNIQUE_ADDRESS TObservable observable; - RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - }; - - template - void drain(const std::shared_ptr>& state); - - template - struct retry_when_impl_inner_strategy - { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - - std::shared_ptr> state; - mutable bool locally_disposed{}; - - template - void on_next(T&&) const - { - locally_disposed = true; - state->clear(); - - if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) - return; - - drain(state); - } - - void on_error(const std::exception_ptr& err) const - { - locally_disposed = true; - state->observer.on_error(err); - } - - void on_completed() const - { - locally_disposed = true; - state->observer.on_completed(); - } - - void set_upstream(const disposable_wrapper& d) const { state->add(d); } - - bool is_disposed() const { return locally_disposed || state->is_disposed(); } - }; - - template - struct retry_when_impl_strategy - { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - - std::shared_ptr> state; + using repeating_observer_strategy::state; template void on_next(T&& v) const @@ -100,7 +36,7 @@ namespace rpp::operators::details { try { - state->notifier(err).subscribe(retry_when_impl_inner_strategy{state}); + state->notifier(err).subscribe(repeating_inner_observer_strategy{this->state}); } catch (...) { @@ -112,34 +48,8 @@ namespace rpp::operators::details { state->observer.on_completed(); } - - void set_upstream(const disposable_wrapper& d) { state->add(d); } - - bool is_disposed() const { return state->is_disposed(); } }; - template - void drain(const std::shared_ptr>& state) - { - while (!state->is_disposed()) - { - state->is_inside_drain.store(true, std::memory_order::seq_cst); - try - { - using value_type = rpp::utils::extract_observer_type_t; - state->observable.subscribe(rpp::observer>{state}); - - if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) - return; - } - catch (...) - { - state->observer.on_error(std::current_exception()); - return; - } - } - } - template struct retry_when_t { @@ -157,11 +67,12 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); auto ptr = d.lock(); ptr->observer.set_upstream(d.as_weak()); - drain(ptr); + + drain, std::decay_t, std::decay_t>>(ptr); } }; } // namespace rpp::operators::details diff --git a/src/tests/rpp/test_repeat_when.cpp b/src/tests/rpp/test_repeat_when.cpp new file mode 100644 index 000000000..303a4568d --- /dev/null +++ b/src/tests/rpp/test_repeat_when.cpp @@ -0,0 +1,174 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" + +#include + +TEST_CASE("repeat_when resubscribes on notifier emission") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + sub.on_next(std::to_string(++subscribe_count)); + sub.on_completed(); + }); + + SUBCASE("empty notifier") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::repeat_when([]() { return rpp::source::empty(); }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + + SUBCASE("notifier resubscribes once") + { + size_t i = 0; + + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue("2")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::repeat_when([&i]() { + if (i++ != 0) + return rpp::source::empty().as_dynamic(); + return rpp::source::just(1).as_dynamic(); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); + } + + SUBCASE("notifier resubscribes multiple times") + { + size_t i = 0; + + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue("2")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue("3")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue("4")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::repeat_when([&i]() { + if (i++ >= 3) + return rpp::source::empty().as_dynamic(); + return rpp::source::just(1).as_dynamic(); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 4); + } + + SUBCASE("notifier throws") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable + | rpp::operators::repeat_when([]() { + throw std::runtime_error{""}; + return rpp::source::just(1); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } +} + +TEST_CASE("repeat_when does not stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_completed(); + }) + | rpp::operators::repeat_when([i = count]() mutable { + if (--i != 0) + return rpp::source::just(rpp::schedulers::immediate{}, 1).as_dynamic(); // Use immediate scheduler for recursion + return rpp::source::empty().as_dynamic(); }) + | rpp::operators::subscribe(mock); +} + +TEST_CASE("repeat_when disposes on looping") +{ + mock_observer mock{}; + REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2); + REQUIRE_CALL(*mock, on_completed()); + + size_t i = 0; + + rpp::source::concat(rpp::source::create([](auto&& subscriber) { + auto d = rpp::composite_disposable_wrapper::make(); + subscriber.set_upstream(d); + subscriber.on_next(1); + subscriber.on_completed(); + CHECK(d.is_disposed()); + })) | rpp::ops::repeat_when([&i]() { return i++ ? rpp::source::empty().as_dynamic() : rpp::source::just(1).as_dynamic(); }) + | rpp::ops::subscribe(mock); +} + +TEST_CASE("repeat_when doesn't produce extra copies") +{ + SUBCASE("repeat_when(empty_notifier)") + { + copy_count_tracker::test_operator(rpp::ops::repeat_when([]() { return rpp::source::empty(); }), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } +} + +TEST_CASE("repeat_when satisfies disposable contracts") +{ + + test_operator_with_disposable(rpp::ops::repeat_when([]() { return rpp::source::empty(); })); + test_operator_finish_before_dispose(rpp::ops::repeat_when([]() { return rpp::source::empty(); })); + + test_operator_over_observable_with_disposable( + [](auto observable) { + auto c = std::make_shared(); + return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) + | rpp::ops::repeat_when([c]() -> rpp::dynamic_observable { if ((*c)++ ==0) return rpp::source::just(1); return rpp::source::empty(); }); + }); +} From 405906065a6504a030fdd20cc87b921981d7d626 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 7 Jun 2025 23:54:31 +0200 Subject: [PATCH 2/2] Add to fwd headers --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 4 ++++ src/rpp/rpp/operators/retry_when.hpp | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 89744d67e..5caa5ee30 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -174,6 +174,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 5f3b591f6..f8fa23c13 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -184,6 +184,10 @@ namespace rpp::operators requires rpp::constraint::observable> auto on_error_resume_next(Selector&& selector); + template + requires rpp::constraint::observable> + auto repeat_when(Notifier&& notifier); + template requires rpp::constraint::observable> auto retry_when(Notifier&& notifier); diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 0c7e3906f..601b6ee4d 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -97,7 +97,7 @@ namespace rpp::operators * @snippet retry_when.cpp retry_when delay * @snippet retry_when.cpp retry_when * - * @ingroup error_handling_operators + * @ingroup utility_operators * @see https://reactivex.io/documentation/operators/retry.html */ template