diff --git a/src/examples/rpp/doxygen/retry_when.cpp b/src/examples/rpp/doxygen/retry_when.cpp new file mode 100644 index 000000000..4fcf9a1e3 --- /dev/null +++ b/src/examples/rpp/doxygen/retry_when.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include + +/** + * @example retry_when.cpp + **/ + +int main() +{ + //! [retry_when delay] + size_t retry_count = 0; + rpp::source::create([&retry_count](const auto& sub) { + if (++retry_count != 4) + { + sub.on_error({}); + } + else + { + sub.on_next(std::string{"success"}); + sub.on_completed(); + } + }) + | rpp::operators::retry_when([](const std::exception_ptr&) { + return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); + // Source observable is resubscribed after 5 seconds on each error emission + //! [retry_when delay] + + //! [retry_when] + retry_count = 0; + rpp::source::create([&retry_count](const auto& sub) { + if (++retry_count != 4) + { + sub.on_error({}); + } + else + { + sub.on_next(std::string{"success"}); + sub.on_completed(); + } + }) + | rpp::operators::retry_when([](const std::exception_ptr& ep) { + try + { + std::rethrow_exception(ep); + } + catch (const std::runtime_error&) + { + return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); + } + catch (...) + { + throw; + } + }) + | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); + // Source observable is resubscribed after 5 seconds only on particular error emissions + //! [retry_when] + return 0; +} diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 53b225d0a..015d6c889 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -130,3 +130,4 @@ #include #include +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 8651ee668..24d8ffce8 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -186,6 +186,10 @@ namespace rpp::operators requires rpp::constraint::observable> auto on_error_resume_next(Selector&& selector); + template + requires rpp::constraint::observable> + auto retry_when(Notifier&& notifier); + template requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables); diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp new file mode 100644 index 000000000..f875e562b --- /dev/null +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -0,0 +1,202 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace rpp::operators::details +{ + template + struct retry_when_state final : public rpp::composite_disposable + { + retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(observer)) + , observable(observable) + , notifier(notifier) + { + } + + std::atomic_bool is_inside_drain{}; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + }; + + template + void drain(const std::shared_ptr>& state); + + template + struct retry_when_impl_inner_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> state; + mutable bool locally_disposed{}; + + template + void on_next(T&&) const + { + locally_disposed = true; + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + drain(state); + } + + void on_error(const std::exception_ptr& err) const + { + locally_disposed = true; + state->observer.on_error(err); + } + + void on_completed() const + { + locally_disposed = true; + state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { state->add(d); } + + bool is_disposed() const { return locally_disposed || state->is_disposed(); } + }; + + template + struct retry_when_impl_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> state; + + template + void on_next(T&& v) const + { + state->observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + std::optional> notifier_obs; + try + { + notifier_obs.emplace(state->notifier(err)); + } + catch (...) + { + state->observer.on_error(std::current_exception()); + } + if (notifier_obs.has_value()) + { + std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{state}); + } + } + + void on_completed() const + { + state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { state->add(d); } + + bool is_disposed() const { return state->is_disposed(); } + }; + + template + void drain(const std::shared_ptr>& state) + { + while (!state->is_disposed()) + { + state->clear(); + state->is_inside_drain.store(true, std::memory_order::seq_cst); + try + { + using value_type = rpp::utils::extract_observer_type_t; + state->observable.subscribe(rpp::observer>{state}); + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + } + catch (...) + { + state->observer.on_error(std::current_exception()); + return; + } + } + } + + template + struct retry_when_t + { + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + template + struct operator_traits + { + using result_type = T; + }; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(TObserver&& observer, TObservable&& observable) const + { + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); + drain(ptr); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief If an error occurs, invoke @p notifier and when returned observable emits a value + * resubscribe to the source observable. If the notifier throws or returns an error/empty + * observable, then error/completed emission is forwarded to original subscription. + * + * @param notifier callable taking a std::exception_ptr and returning observable notifying when to resubscribe + * + * @warning retry_when along with other re-subscribing operators needs to be carefully used with + * hot observables, as re-subscribing to a hot observable can have unwanted behaviors. For example, + * a hot observable behind a replay subject can indefinitely yield an error on each re-subscription + * and using retry_when on it would lead to an infinite execution. + * + * @warning #include + * + * @par Examples: + * @snippet retry_when.cpp retry_when delay + * @snippet retry_when.cpp retry_when + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + template + requires rpp::constraint::observable> + auto retry_when(TNotifier&& notifier) + { + return details::retry_when_t>{std::forward(notifier)}; + } +} // namespace rpp::operators diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp new file mode 100644 index 000000000..4d5632ee3 --- /dev/null +++ b/src/tests/rpp/test_retry_when.cpp @@ -0,0 +1,245 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" + + +TEST_CASE("retry_when resubscribes on notifier emission") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + SECTION("observable without error emission") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + ++subscribe_count; + sub.on_next(std::string{"1"}); + sub.on_completed(); + }); + SECTION("observer obtains values from observable") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + } + + SECTION("observable with 1 error") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + if (subscribe_count++ == 0) + { + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + } + else + { + sub.on_next(std::string{"1"}); + sub.on_completed(); + } + }); + + SECTION("original observable is subscribed twice and observer receives one emission") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); + } + + SECTION("original observable is subscribed twice and observer receives one emission, notifier emits on new_thread") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); + } + + SECTION("original observable is subscribed twice and observer receives only one emission") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1, 2, 3); }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); + } + + SECTION("original observable is subscribed only once and observer receives error emission") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + throw std::runtime_error{"error"}; + return rpp::source::just(1); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + + SECTION("original observable is subscribed only once and observer receives error emission") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + return rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"})); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + + SECTION("original observable is subscribed only once and observer receives completed emission") + { + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + return rpp::source::empty(); + }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + } + + SECTION("observable with 4 errors") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + if (subscribe_count++ < 4) + { + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + } + else + { + sub.on_next(std::string{"1"}); + sub.on_completed(); + } + }); + + SECTION("original observable is subscribed 5 times and observer receives one emission") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 4 + 1); + } + + SECTION("original observable is subscribed twice and observer receives one emission, notifier emits on new_thread") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 4 + 1); + } + } +} + +TEST_CASE("repeat_when does not stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_error({}); + }) + | rpp::operators::retry_when([i = count](const std::exception_ptr& ep) mutable { + if (--i != 0) + return rpp::source::just(rpp::schedulers::immediate{}, 1).as_dynamic(); // Use immediate scheduler for recursion + return rpp::source::error(ep).as_dynamic(); }) + | rpp::operators::subscribe(mock); +} + +TEST_CASE("retry_when doesn't produce extra copies") +{ + SECTION("retry_when(empty_notifier)") + { + copy_count_tracker::test_operator(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } +} + +TEST_CASE("retry_when satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }); + + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); + + test_operator_over_observable_with_disposable( + [](auto observable) { + return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) + | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); + }); + } + + CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); +}