From 7e5625cff35b0f2465cc81cd4bf18b5b9288d20e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 00:23:19 +0300 Subject: [PATCH 01/10] implement simplest retry --- .../rpp/operators/on_error_resume_next.hpp | 2 +- src/rpp/rpp/operators/retry.hpp | 148 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/rpp/rpp/operators/retry.hpp diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 6d064cadc..78a4d6d3f 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -60,7 +60,7 @@ namespace rpp::operators::details disposable.dispose(); } - void set_upstream(const disposable_wrapper& d) + void set_upstream(const disposable_wrapper& d) const { disposable.add(d); } diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp new file mode 100644 index 000000000..463a0c972 --- /dev/null +++ b/src/rpp/rpp/operators/retry.hpp @@ -0,0 +1,148 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ + template + struct retry_state_t : public rpp::composite_disposable + { + retry_state_t(TObserver&& in_observer, const Observable& observable, size_t count) + : observer(std::move(in_observer)) + , observable(observable) + , count{count} + { + } + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS Observable observable; + size_t count; + std::atomic is_inside_drain{}; + }; + + template + struct retry_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> state; + mutable bool locally_disposed{}; + + template + void on_next(T&& v) const + { + state->observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + locally_disposed = true; + if (state->count == 0) + { + state->observer.on_error(err); + state->dispose(); + return; + } + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + + drain(state); + } + + void on_completed() const + { + locally_disposed = true; + state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const + { + state->add(d); + } + + bool is_disposed() const { return locally_disposed || state->is_disposed(); } + }; + + template + void drain(const std::shared_ptr, std::decay_t>>& state) + { + while (!state->is_disposed()) + { + --state->count; + state->clear(); + state->is_inside_drain.store(true, std::memory_order::seq_cst); + try + { + using value_type = rpp::utils::extract_observable_type_t; + state->observable.subscribe(observer>{state}); + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + } + catch (...) + { + state->observer.on_error(std::current_exception()); + return; + } + } + } + + struct retry_t + { + const size_t count{}; + + template + struct operator_traits + { + using result_type = T; + }; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(TObserver&& observer, TObservable&& observble) const + { + const auto state = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count + 1); + state->observer.set_upstream(state->disposable.as_weak()); + drain(state); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief If an error occurs, resubscribe to the same observable. Repeat it up to the specified count. + * + * @marble retry + { + source observable : +-1-x + operator "retry:(2)" : +-1-1-1-x + } + * + * @param count is the number of retries + * + * @warning #include + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + inline auto retry(size_t count) + { + return details::retry_t{count}; + } +} // namespace rpp::operators From 8a01f797945af0952e26a28c2677e243687b007a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 00:26:06 +0300 Subject: [PATCH 02/10] fix --- src/rpp/rpp/operators/retry.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 463a0c972..26f49a999 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -33,6 +33,9 @@ namespace rpp::operators::details std::atomic is_inside_drain{}; }; + template + void drain(const std::shared_ptr, std::decay_t>>& state); + template struct retry_observer_strategy { @@ -56,6 +59,7 @@ namespace rpp::operators::details state->dispose(); return; } + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; From 1377b6c1d36a35ac8593cc8cc2e9d56f899fda36 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 23:39:30 +0300 Subject: [PATCH 03/10] extend --- src/rpp/rpp/operators/retry.hpp | 24 +++++----- src/tests/rpp/test_retry.cpp | 77 +++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 11 deletions(-) create mode 100644 src/tests/rpp/test_retry.cpp diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 26f49a999..1b0f22a0b 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -18,7 +18,7 @@ namespace rpp::operators::details { template - struct retry_state_t : public rpp::composite_disposable + struct retry_state_t final : public rpp::composite_disposable { retry_state_t(TObserver&& in_observer, const Observable& observable, size_t count) : observer(std::move(in_observer)) @@ -33,10 +33,10 @@ namespace rpp::operators::details std::atomic is_inside_drain{}; }; - template - void drain(const std::shared_ptr, std::decay_t>>& state); + template + void drain(const std::shared_ptr>& state); - template + template struct retry_observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; @@ -80,8 +80,8 @@ namespace rpp::operators::details bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; - template - void drain(const std::shared_ptr, std::decay_t>>& state) + template + void drain(const std::shared_ptr>& state) { while (!state->is_disposed()) { @@ -90,7 +90,7 @@ namespace rpp::operators::details state->is_inside_drain.store(true, std::memory_order::seq_cst); try { - using value_type = rpp::utils::extract_observable_type_t; + using value_type = rpp::utils::extract_observer_type_t; state->observable.subscribe(observer>{state}); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) @@ -117,12 +117,14 @@ namespace rpp::operators::details template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template + template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto state = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count + 1); - state->observer.set_upstream(state->disposable.as_weak()); - drain(state); + const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count + 1); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); + drain(ptr); } }; } // namespace rpp::operators::details diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp new file mode 100644 index 000000000..f680b6d30 --- /dev/null +++ b/src/tests/rpp/test_retry.cpp @@ -0,0 +1,77 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" + +TEST_CASE("retry handles errors properly") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + SECTION("observable 1-x-2") + { + const auto observable = rpp::source::concat(rpp::source::just(1), rpp::source::error({}), rpp::source::just(2)); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(1)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(1) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + } +} + +TEST_CASE("retry handles stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count + 1).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_error({}); + }) + | rpp::operators::retry(count) + | rpp::operators::subscribe(mock); +} From 29af3bafce390bd425a3a726a3ee5c6db0fc84a8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 23:43:53 +0300 Subject: [PATCH 04/10] add stack overflow test for repeat --- src/tests/rpp/test_repeat.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/tests/rpp/test_repeat.cpp b/src/tests/rpp/test_repeat.cpp index 9d1635d13..b7ddaf35f 100644 --- a/src/tests/rpp/test_repeat.cpp +++ b/src/tests/rpp/test_repeat.cpp @@ -18,6 +18,7 @@ #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" TEST_CASE("repeat resubscribes") @@ -135,3 +136,22 @@ TEST_CASE("repeat satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::repeat()); } + + +TEST_CASE("repeat handles stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_completed(); + }) + | rpp::operators::repeat(count) + | rpp::operators::subscribe(mock); +} From 556ef37810e3cb772134af61920aa74007991130 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 23:49:34 +0300 Subject: [PATCH 05/10] extend tests --- src/tests/rpp/test_retry.cpp | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp index f680b6d30..6af08efda 100644 --- a/src/tests/rpp/test_retry.cpp +++ b/src/tests/rpp/test_retry.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" @@ -53,6 +54,44 @@ TEST_CASE("retry handles errors properly") REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + } + SECTION("observable 1-|") + { + const auto observable = rpp::source::just(1); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + } + SECTION("observable 1->") + { + const auto observable = rpp::source::concat(rpp::source::just(1), rpp::source::never()); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); } } @@ -75,3 +114,26 @@ TEST_CASE("retry handles stack overflow") | rpp::operators::retry(count) | rpp::operators::subscribe(mock); } + +TEST_CASE("retry doesn't produce extra copies") +{ + SECTION("retry(2)") + { + copy_count_tracker::test_operator(rpp::ops::retry(2), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } +} + +TEST_CASE("retry satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::retry(1)); + + test_operator_over_observable_with_disposable([](const auto& observable) { + return rpp::source::concat(observable, rpp::source::error({})) | rpp::ops::retry(10); + }); +} From 2036193a8516ae447d2d93c6259d991e73fbdb6e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Aug 2024 23:57:50 +0300 Subject: [PATCH 06/10] extend with infinite retry --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 4 ++++ src/rpp/rpp/operators/retry.hpp | 40 +++++++++++++++++++++++++-------- src/tests/rpp/test_retry.cpp | 39 ++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index bc7c75bee..53b225d0a 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -129,3 +129,4 @@ */ #include +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index a8d73803e..8651ee668 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -106,6 +106,10 @@ namespace rpp::operators auto repeat(); + auto retry(size_t count); + + auto retry(); + template requires (!utils::is_not_template_callable || std::same_as, std::invoke_result_t &&, rpp::utils::convertible_to_any>>) auto scan(InitialValue&& initial_value, Fn&& accumulator); diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 1b0f22a0b..ef437086c 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -20,17 +20,19 @@ namespace rpp::operators::details template struct retry_state_t final : public rpp::composite_disposable { - retry_state_t(TObserver&& in_observer, const Observable& observable, size_t count) - : observer(std::move(in_observer)) + retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) + : count{count} + , observer(std::move(in_observer)) , observable(observable) - , count{count} + { } + std::optional count; + std::atomic is_inside_drain{}; + RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Observable observable; - size_t count; - std::atomic is_inside_drain{}; }; template @@ -85,7 +87,8 @@ namespace rpp::operators::details { while (!state->is_disposed()) { - --state->count; + if (state->count) + --state->count.value(); state->clear(); state->is_inside_drain.store(true, std::memory_order::seq_cst); try @@ -106,7 +109,7 @@ namespace rpp::operators::details struct retry_t { - const size_t count{}; + const std::optional count{}; template struct operator_traits @@ -120,7 +123,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count + 1); + const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); auto ptr = d.lock(); ptr->observer.set_upstream(d.as_weak()); @@ -132,7 +135,7 @@ namespace rpp::operators::details namespace rpp::operators { /** - * @brief If an error occurs, resubscribe to the same observable. Repeat it up to the specified count. + * @brief Resubscribe to the same observable specified number of counts in case of obtaining errors from underlying observable * * @marble retry { @@ -151,4 +154,23 @@ namespace rpp::operators { return details::retry_t{count}; } + + /** + * @brief Resubscribe to the same observable (without any limits) in case of obtaining errors from underlying observable + * + * @marble infinite_retry + { + source observable : +-1-x + operator "retry:()" : +-1-1-1-1-1-1-1-1-1-1-1-> + } + * + * @warning #include + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + inline auto retry() + { + return details::retry_t{}; + } } // namespace rpp::operators diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp index 6af08efda..450f5a549 100644 --- a/src/tests/rpp/test_retry.cpp +++ b/src/tests/rpp/test_retry.cpp @@ -56,6 +56,20 @@ TEST_CASE("retry handles errors properly") observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); } + + SECTION("retry()") + { + auto d = rpp::composite_disposable_wrapper::make(); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).LR_SIDE_EFFECT(d.dispose()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(d, mock); + } } SECTION("observable 1-|") { @@ -76,6 +90,14 @@ TEST_CASE("retry handles errors properly") observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); } + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); + } } SECTION("observable 1->") { @@ -94,6 +116,13 @@ TEST_CASE("retry handles errors properly") observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); } + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); + } } } @@ -127,6 +156,16 @@ TEST_CASE("retry doesn't produce extra copies") .move_count = 1} // 1 move to final subscriber }); } + SECTION("retry()") + { + copy_count_tracker::test_operator(rpp::ops::retry(), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } } TEST_CASE("retry satisfies disposable contracts") From 5d01be4b47f597108cb70422ad7ad82778b4322f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 21 Aug 2024 18:36:18 +0300 Subject: [PATCH 07/10] add doxygen --- src/examples/rpp/doxygen/retry.cpp | 30 ++++++++++++++++++++++++++++++ src/rpp/rpp/operators/retry.hpp | 6 ++++++ 2 files changed, 36 insertions(+) create mode 100644 src/examples/rpp/doxygen/retry.cpp diff --git a/src/examples/rpp/doxygen/retry.cpp b/src/examples/rpp/doxygen/retry.cpp new file mode 100644 index 000000000..4daecab4f --- /dev/null +++ b/src/examples/rpp/doxygen/retry.cpp @@ -0,0 +1,30 @@ +#include + +#include +#include + +/** + * @example retry.cpp + **/ +int main() +{ + //! [retry] + rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error({})) + | rpp::operators::retry(2) + | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, + [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + // Output: 1 2 3 1 2 3 1 2 3 error + //! [retry] + + //! [retry_infinitely] + rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error({})) + | rpp::operators::retry() + | rpp::operators::take(10) + | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, + [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + // Output: 1 2 3 1 2 3 1 2 3 1 completed + //! [retry_infinitely] + return 0; +} diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index ef437086c..5ab79fac0 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -147,6 +147,9 @@ namespace rpp::operators * * @warning #include * + * @par Examples: + * @snippet retry.cpp retry + * * @ingroup error_handling_operators * @see https://reactivex.io/documentation/operators/retry.html */ @@ -166,6 +169,9 @@ namespace rpp::operators * * @warning #include * + * @par Examples: + * @snippet retry.cpp retry_infinitely + * * @ingroup error_handling_operators * @see https://reactivex.io/documentation/operators/retry.html */ From 1b3379a7670a0783d0e9cb4891032a038eddfdd9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 21 Aug 2024 18:45:46 +0300 Subject: [PATCH 08/10] benchmarks + doc --- src/benchmarks/benchmarks.cpp | 18 ++++++++++++++++++ src/rpp/rpp/operators/retry.hpp | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index f404f06b1..0cca9060b 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -655,6 +655,24 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + SECTION("create(on_next(1), on_error())+retry(1)+subscribe") + { + TEST_RPP([&]() { + rpp::source::create([&](auto&& observer) { + observer.on_error({}); + }) + | rpp::operators::retry(1) + | rpp::operators::subscribe([](int) { }, [](const std::exception_ptr& e){ ankerl::nanobench::doNotOptimizeAway(e);}); + }); + + TEST_RXCPP([&]() { + rxcpp::observable<>::create([&](auto&& observer) { + observer.on_error({}); + }) + | rxcpp::operators::retry(1) + | rxcpp::operators::subscribe([](int) { }, [](const std::exception_ptr& e){ ankerl::nanobench::doNotOptimizeAway(e);}); + }); + } } // BENCHMARK("Error Handling Operators") BENCHMARK("Subjects") diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 5ab79fac0..4362f2ea4 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -135,7 +135,7 @@ namespace rpp::operators::details namespace rpp::operators { /** - * @brief Resubscribe to the same observable specified number of counts in case of obtaining errors from underlying observable + * @brief The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries. * * @marble retry { @@ -159,7 +159,7 @@ namespace rpp::operators } /** - * @brief Resubscribe to the same observable (without any limits) in case of obtaining errors from underlying observable + * @brief The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit. * * @marble infinite_retry { From c329b09c552a4e87b28e9e06a68e7509a6c86121 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 15:47:11 +0000 Subject: [PATCH 09/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/benchmarks/benchmarks.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 0cca9060b..02e7e8b16 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -662,7 +662,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) observer.on_error({}); }) | rpp::operators::retry(1) - | rpp::operators::subscribe([](int) { }, [](const std::exception_ptr& e){ ankerl::nanobench::doNotOptimizeAway(e);}); + | rpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); }); }); TEST_RXCPP([&]() { @@ -670,7 +670,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) observer.on_error({}); }) | rxcpp::operators::retry(1) - | rxcpp::operators::subscribe([](int) { }, [](const std::exception_ptr& e){ ankerl::nanobench::doNotOptimizeAway(e);}); + | rxcpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); }); }); } } // BENCHMARK("Error Handling Operators") From 676b8cbaf2d0a71aa9f8859158a2f5b430d538b6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 21 Aug 2024 18:49:24 +0300 Subject: [PATCH 10/10] rename --- src/benchmarks/benchmarks.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 02e7e8b16..f882ebc2f 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -655,7 +655,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } - SECTION("create(on_next(1), on_error())+retry(1)+subscribe") + SECTION("create(on_error())+retry(1)+subscribe") { TEST_RPP([&]() { rpp::source::create([&](auto&& observer) {