From 2e1ce1404e188a52b008cb9e191584d403e3dc6c Mon Sep 17 00:00:00 2001 From: TC Wang Date: Thu, 22 Sep 2022 09:15:42 -0700 Subject: [PATCH 1/9] Add "on_error_resume_next" operator. --- docs/Implementation Status.md | 2 +- src/benchmarks/rpp_benchmark.cpp | 24 +++- src/benchmarks/rxcpp_benchmark.cpp | 21 ++++ src/examples/doxygen/on_error_resume_next.cpp | 32 +++++ .../rpp/observables/interface_observable.hpp | 1 + src/rpp/rpp/operators.hpp | 12 +- src/rpp/rpp/operators/fwd.hpp | 1 + .../operators/fwd/on_error_resume_next.hpp | 84 +++++++++++++ .../rpp/operators/on_error_resume_next.hpp | 85 +++++++++++++ src/tests/test_on_error_resume_next.cpp | 118 ++++++++++++++++++ 10 files changed, 376 insertions(+), 4 deletions(-) create mode 100644 src/examples/doxygen/on_error_resume_next.cpp create mode 100644 src/rpp/rpp/operators/fwd/on_error_resume_next.hpp create mode 100644 src/rpp/rpp/operators/on_error_resume_next.hpp create mode 100644 src/tests/test_on_error_resume_next.cpp diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index a938dbeca..4f8ceae55 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -119,7 +119,7 @@ - [ ] backpressure ??? ### Error handling -- [ ] catch +- [x] catch - [ ] retry ### Utility diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 40e604426..8e1fb0695 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -1077,4 +1077,26 @@ TEST_CASE("single-threaded locks") } return target; }; -} \ No newline at end of file +} + +TEST_CASE("on_error_resume_next") +{ + BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rpp::observable::create([](const auto& subscriber) + { + subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + auto subscriber = rpp::specific_subscriber{[](const int&) {}}; + + meter.measure([&] + { + return obs + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1); + }) + .subscribe(subscriber); + }); + }; +} diff --git a/src/benchmarks/rxcpp_benchmark.cpp b/src/benchmarks/rxcpp_benchmark.cpp index 361fe72b8..6438bcfd6 100644 --- a/src/benchmarks/rxcpp_benchmark.cpp +++ b/src/benchmarks/rxcpp_benchmark.cpp @@ -1045,3 +1045,24 @@ TEST_CASE("trampoline scheduler") }; } +TEST_CASE("on_error_resume_next") +{ + BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rxcpp::sources::create([](const auto& subscriber) + { + subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + auto subscriber = rxcpp::make_subscriber(); + + meter.measure([&](int) + { + return obs + .on_error_resume_next([](auto&&) + { + return rxcpp::observable<>::just(1); + }) + .subscribe(subscriber); + }); + }; +} diff --git a/src/examples/doxygen/on_error_resume_next.cpp b/src/examples/doxygen/on_error_resume_next.cpp new file mode 100644 index 000000000..7668c8219 --- /dev/null +++ b/src/examples/doxygen/on_error_resume_next.cpp @@ -0,0 +1,32 @@ +#include + +#include + +/** + * \example on_error_resume_next.cpp + **/ +int main() +{ + //! [on_error_resume_next] + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1, 2, 3); + }) + .subscribe([&](int v) + { + std::cout << "-" << v; + }, + [&](auto&&) + { + std::cout << "-x"; + }, + [&]() + { + std::cout << "-|" << std::endl; + }); + // source: -x + // output: -1-2-3-| + //! [on_error_resume_next] + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index e7bcfbfef..0208be9e3 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -53,6 +53,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1ddc4f268..dbd94e1c4 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -79,7 +79,6 @@ #include - /** * \defgroup utility_operators Utility Operators * \brief Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour @@ -102,4 +101,13 @@ #include #include -#include \ No newline at end of file +#include + +/** + * \defgroup error_handling_operators Error handling Operators + * \brief Error handling operators Operators that help to recover from error notifications from an Observable. + * \see https://reactivex.io/documentation/operators.html#error + * \ingroup operators + */ + +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 4dd8e1c3f..22f9d49ad 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp new file mode 100644 index 000000000..75c60e6de --- /dev/null +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -0,0 +1,84 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace rpp::details +{ +struct on_error_resume_next_tag; +} + +namespace rpp::details +{ + +template +concept resume_callable = std::invocable && constraint::observable>; + +template +struct on_error_resume_next_impl; + +template +struct member_overload +{ + + /** + * \brief Recover from an onError notification by continuing the sequence without error. + * \details The operator intercepts an on_error notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items. + * \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all. + * + * \marble on_error_resume_next + { + source observable : +-1-x + operator "on_error_resume_next: -9-9-|" : +-1-9-9-| + } + * + * \param resume_callable A callable that is given an error pointer and shall return an Observable. + * \return new specific_observable with the on_error_resume_next operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet on_error_resume_next.cpp on_error_resume_next + * + * \ingroup error_handling_operators + * \see https://reactivex.io/documentation/operators/on_error_resume_next.html + */ + template + auto on_error_resume_next(ResumeCallable&& resume_callable) const& requires is_header_included + { + return cast_this()->template lift(on_error_resume_next_impl{std::forward(resume_callable)}); + } + + template + auto on_error_resume_next(ResumeCallable&& resume_callable) && requires is_header_included + { + return move_this().template lift(on_error_resume_next_impl{std::forward(resume_callable)}); + } + +private: + const SpecificObservable* cast_this() const + { + return static_cast(this); + } + + SpecificObservable&& move_this() + { + return std::move(*static_cast(this)); + } +}; + +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp new file mode 100644 index 000000000..f19ae9064 --- /dev/null +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -0,0 +1,85 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include // create_subscriber_with_state +#include +#include + +IMPLEMENTATION_FILE(on_error_resume_next_tag); + +namespace rpp::details +{ + +template +struct on_error_resume_state +{ + rpp::composite_subscription m_subscription; + ResumeCallable m_resume_callable; + + explicit on_error_resume_state(rpp::composite_subscription subscription, + const ResumeCallable& callable) : m_subscription(std::move(subscription)) + , m_resume_callable(callable) {}; + explicit on_error_resume_state() = delete; +}; + +/** + * Functor (type-erasure) of "on_error_resume_next" for on_error operator. + */ +struct on_error_resume_next_on_error +{ + template + void operator()(const std::exception_ptr& err, + const auto& subscriber, + const std::shared_ptr>& state) const + { + using Type = rpp::utils::extract_subscriber_type_t; + + auto new_observable = state->m_resume_callable(err); + + // Subscribe to next_observable + new_observable.subscribe(create_subscriber_with_state(state->m_subscription, + rpp::utils::forwarding_on_next{}, + rpp::utils::forwarding_on_error{}, + rpp::utils::forwarding_on_completed{}, + subscriber)); + } +}; + +/** + * \brief Functor of OperatorFn for "on_error_resume_next" operator (used by "lift"). + */ +template +struct on_error_resume_next_impl +{ + RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; + + explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::move(callable)) {}; + explicit on_error_resume_next_impl() = delete; + + template TSub> + auto operator()(TSub&& downstream_subscriber) const + { + // Child subscription is for keeping the downstream subscriber's subscription alive when upstream sends on_error event. + auto state = std::make_shared>(downstream_subscriber.get_subscription(), m_resume_callable); + auto subscription = downstream_subscriber.get_subscription().make_child(); + + return create_subscriber_with_state(std::move(subscription), + rpp::utils::forwarding_on_next{}, + on_error_resume_next_on_error{}, + rpp::utils::forwarding_on_completed{}, + std::forward(downstream_subscriber), + std::move(state)); + } +}; +} // namespace rpp::details diff --git a/src/tests/test_on_error_resume_next.cpp b/src/tests/test_on_error_resume_next.cpp new file mode 100644 index 000000000..2b2e1ef3c --- /dev/null +++ b/src/tests/test_on_error_resume_next.cpp @@ -0,0 +1,118 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "mock_observer.hpp" + +#include +#include +#include +#include +#include + +SCENARIO("on_error_resume_next captures error by subscribing to new observable", "[on_error_resume_next]") +{ + GIVEN("observable of -x") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1, 2, 3); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + + GIVEN("observable of -x and new observable emits error") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})); + }) + .subscribe(mock); + + THEN("should see -x") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + + GIVEN("observable of -x and new observable completes immediately") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::source::empty(); + }) + .subscribe(mock); + + THEN("should see -|") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } +} + +SCENARIO("on_error_resume_next forwards on_next and on_completed", "[on_error_resume_next]") +{ + GIVEN("observable of -1-2-3-|") + { + auto mock = mock_observer{}; + + rpp::source::just(1, 2, 3) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(4); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + + GIVEN("observable of -|") + { + auto mock = mock_observer{}; + + rpp::source::empty() + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1); + }) + .subscribe(mock); + + THEN("should see -|") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } +} From 6e65d85aeb9c496997c0815ebc843e3e50ffb571 Mon Sep 17 00:00:00 2001 From: TC Wang Date: Tue, 11 Oct 2022 17:38:14 -0700 Subject: [PATCH 2/9] Remove redundant deleted default constructor. --- src/rpp/rpp/operators/on_error_resume_next.hpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index f19ae9064..564eaead0 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -30,7 +30,6 @@ struct on_error_resume_state explicit on_error_resume_state(rpp::composite_subscription subscription, const ResumeCallable& callable) : m_subscription(std::move(subscription)) , m_resume_callable(callable) {}; - explicit on_error_resume_state() = delete; }; /** @@ -65,7 +64,6 @@ struct on_error_resume_next_impl RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::move(callable)) {}; - explicit on_error_resume_next_impl() = delete; template TSub> auto operator()(TSub&& downstream_subscriber) const From 11a6cb873f0674b79f0ce50d5c967b286da890e6 Mon Sep 17 00:00:00 2001 From: TC Wang Date: Tue, 11 Oct 2022 17:39:40 -0700 Subject: [PATCH 3/9] Fix typo in comment. --- src/rpp/rpp/operators/fwd/on_error_resume_next.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp index 75c60e6de..489240d19 100644 --- a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -37,7 +37,7 @@ struct member_overload { /** - * \brief Recover from an onError notification by continuing the sequence without error. + * \brief Recover from an on_error notification by continuing the sequence without error. * \details The operator intercepts an on_error notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items. * \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all. * From dddd300d967b9e6a6c1af0c922b893ebc1c49cfd Mon Sep 17 00:00:00 2001 From: TC Wang Date: Tue, 11 Oct 2022 17:45:12 -0700 Subject: [PATCH 4/9] Remove redundant shared_ptr allocation. --- .../rpp/operators/on_error_resume_next.hpp | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 564eaead0..19045ae60 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -21,17 +21,6 @@ IMPLEMENTATION_FILE(on_error_resume_next_tag); namespace rpp::details { -template -struct on_error_resume_state -{ - rpp::composite_subscription m_subscription; - ResumeCallable m_resume_callable; - - explicit on_error_resume_state(rpp::composite_subscription subscription, - const ResumeCallable& callable) : m_subscription(std::move(subscription)) - , m_resume_callable(callable) {}; -}; - /** * Functor (type-erasure) of "on_error_resume_next" for on_error operator. */ @@ -40,14 +29,15 @@ struct on_error_resume_next_on_error template void operator()(const std::exception_ptr& err, const auto& subscriber, - const std::shared_ptr>& state) const + const ResumeCallable& resume_callable) const { using Type = rpp::utils::extract_subscriber_type_t; - auto new_observable = state->m_resume_callable(err); + auto new_observable = resume_callable(err); // Subscribe to next_observable - new_observable.subscribe(create_subscriber_with_state(state->m_subscription, + auto subscription = subscriber.get_subscription(); + new_observable.subscribe(create_subscriber_with_state(std::move(subscription), rpp::utils::forwarding_on_next{}, rpp::utils::forwarding_on_error{}, rpp::utils::forwarding_on_completed{}, @@ -69,7 +59,6 @@ struct on_error_resume_next_impl auto operator()(TSub&& downstream_subscriber) const { // Child subscription is for keeping the downstream subscriber's subscription alive when upstream sends on_error event. - auto state = std::make_shared>(downstream_subscriber.get_subscription(), m_resume_callable); auto subscription = downstream_subscriber.get_subscription().make_child(); return create_subscriber_with_state(std::move(subscription), @@ -77,7 +66,7 @@ struct on_error_resume_next_impl on_error_resume_next_on_error{}, rpp::utils::forwarding_on_completed{}, std::forward(downstream_subscriber), - std::move(state)); + m_resume_callable); } }; } // namespace rpp::details From dd11df1fd9517811e91636569575ef2aead02669 Mon Sep 17 00:00:00 2001 From: TC Wang Date: Wed, 12 Oct 2022 08:29:44 -0700 Subject: [PATCH 5/9] Specify std::decay_t so that cv qualifier is removed and variable is copyable. --- src/rpp/rpp/operators/fwd/on_error_resume_next.hpp | 4 ++-- src/rpp/rpp/operators/on_error_resume_next.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp index 489240d19..20565e4ea 100644 --- a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -60,13 +60,13 @@ struct member_overload template auto on_error_resume_next(ResumeCallable&& resume_callable) const& requires is_header_included { - return cast_this()->template lift(on_error_resume_next_impl{std::forward(resume_callable)}); + return cast_this()->template lift(on_error_resume_next_impl>{std::forward(resume_callable)}); } template auto on_error_resume_next(ResumeCallable&& resume_callable) && requires is_header_included { - return move_this().template lift(on_error_resume_next_impl{std::forward(resume_callable)}); + return move_this().template lift(on_error_resume_next_impl>{std::forward(resume_callable)}); } private: diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 19045ae60..a7fa8fb33 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -53,7 +53,7 @@ struct on_error_resume_next_impl { RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; - explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::move(callable)) {}; + explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::forward(callable)) {}; template TSub> auto operator()(TSub&& downstream_subscriber) const From 8e25be1216c23dc2acf1b1b46d710cfa26e3e4f7 Mon Sep 17 00:00:00 2001 From: TC Wang Date: Wed, 12 Oct 2022 08:35:47 -0700 Subject: [PATCH 6/9] Reduce amount of copying. --- src/rpp/rpp/operators/on_error_resume_next.hpp | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index a7fa8fb33..1e5c66f7d 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -33,15 +33,12 @@ struct on_error_resume_next_on_error { using Type = rpp::utils::extract_subscriber_type_t; - auto new_observable = resume_callable(err); - // Subscribe to next_observable - auto subscription = subscriber.get_subscription(); - new_observable.subscribe(create_subscriber_with_state(std::move(subscription), - rpp::utils::forwarding_on_next{}, - rpp::utils::forwarding_on_error{}, - rpp::utils::forwarding_on_completed{}, - subscriber)); + resume_callable(err).subscribe(create_subscriber_with_state(subscriber.get_subscription(), + rpp::utils::forwarding_on_next{}, + rpp::utils::forwarding_on_error{}, + rpp::utils::forwarding_on_completed{}, + subscriber)); } }; From 3069a2811fb19fa55bfdbd9042c09feeb214969c Mon Sep 17 00:00:00 2001 From: TC Wang Date: Wed, 12 Oct 2022 12:12:33 -0700 Subject: [PATCH 7/9] Add more test case(s). --- src/tests/test_on_error_resume_next.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/tests/test_on_error_resume_next.cpp b/src/tests/test_on_error_resume_next.cpp index 2b2e1ef3c..5bd0be135 100644 --- a/src/tests/test_on_error_resume_next.cpp +++ b/src/tests/test_on_error_resume_next.cpp @@ -11,6 +11,8 @@ #include "mock_observer.hpp" #include +#include +#include #include #include #include @@ -74,6 +76,28 @@ SCENARIO("on_error_resume_next captures error by subscribing to new observable", CHECK(mock.get_on_error_count() == 0); } } + + GIVEN("observable of -1-2-x and new observable completes immediately") + { + auto mock = mock_observer{}; + + rpp::source::just(rpp::source::just(1).as_dynamic(), + rpp::source::just(2).as_dynamic(), + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic()) + .concat() + .on_error_resume_next([](auto&&) + { + return rpp::source::just(3); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } } SCENARIO("on_error_resume_next forwards on_next and on_completed", "[on_error_resume_next]") From bb77066fbe170a480b4f22f9c9396b23384fef6d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 12 Oct 2022 22:42:34 +0300 Subject: [PATCH 8/9] Update src/rpp/rpp/operators/on_error_resume_next.hpp --- src/rpp/rpp/operators/on_error_resume_next.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 1e5c66f7d..bad8070d7 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -50,7 +50,8 @@ struct on_error_resume_next_impl { RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; - explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::forward(callable)) {}; + explicit on_error_resume_next_impl(const ResumeCallable& callable) : m_resume_callable(callable) {}; + explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::move(callable)) {}; template TSub> auto operator()(TSub&& downstream_subscriber) const From b46604901401a54ded4048e0b9b326b00d3b4134 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 12 Oct 2022 23:07:40 +0300 Subject: [PATCH 9/9] Update on_error_resume_next.hpp --- src/rpp/rpp/operators/on_error_resume_next.hpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index bad8070d7..90df7ae98 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -50,9 +50,6 @@ struct on_error_resume_next_impl { RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; - explicit on_error_resume_next_impl(const ResumeCallable& callable) : m_resume_callable(callable) {}; - explicit on_error_resume_next_impl(ResumeCallable&& callable) : m_resume_callable(std::move(callable)) {}; - template TSub> auto operator()(TSub&& downstream_subscriber) const {