diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index a938dbeca..4f8ceae55 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -119,7 +119,7 @@ - [ ] backpressure ??? ### Error handling -- [ ] catch +- [x] catch - [ ] retry ### Utility diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 40e604426..8e1fb0695 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -1077,4 +1077,26 @@ TEST_CASE("single-threaded locks") } return target; }; -} \ No newline at end of file +} + +TEST_CASE("on_error_resume_next") +{ + BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rpp::observable::create([](const auto& subscriber) + { + subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + auto subscriber = rpp::specific_subscriber{[](const int&) {}}; + + meter.measure([&] + { + return obs + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1); + }) + .subscribe(subscriber); + }); + }; +} diff --git a/src/benchmarks/rxcpp_benchmark.cpp b/src/benchmarks/rxcpp_benchmark.cpp index 361fe72b8..6438bcfd6 100644 --- a/src/benchmarks/rxcpp_benchmark.cpp +++ b/src/benchmarks/rxcpp_benchmark.cpp @@ -1045,3 +1045,24 @@ TEST_CASE("trampoline scheduler") }; } +TEST_CASE("on_error_resume_next") +{ + BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rxcpp::sources::create([](const auto& subscriber) + { + subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + auto subscriber = rxcpp::make_subscriber(); + + meter.measure([&](int) + { + return obs + .on_error_resume_next([](auto&&) + { + return rxcpp::observable<>::just(1); + }) + .subscribe(subscriber); + }); + }; +} diff --git a/src/examples/doxygen/on_error_resume_next.cpp b/src/examples/doxygen/on_error_resume_next.cpp new file mode 100644 index 000000000..7668c8219 --- /dev/null +++ b/src/examples/doxygen/on_error_resume_next.cpp @@ -0,0 +1,32 @@ +#include + +#include + +/** + * \example on_error_resume_next.cpp + **/ +int main() +{ + //! [on_error_resume_next] + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1, 2, 3); + }) + .subscribe([&](int v) + { + std::cout << "-" << v; + }, + [&](auto&&) + { + std::cout << "-x"; + }, + [&]() + { + std::cout << "-|" << std::endl; + }); + // source: -x + // output: -1-2-3-| + //! [on_error_resume_next] + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index e7bcfbfef..0208be9e3 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -53,6 +53,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1ddc4f268..dbd94e1c4 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -79,7 +79,6 @@ #include - /** * \defgroup utility_operators Utility Operators * \brief Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour @@ -102,4 +101,13 @@ #include #include -#include \ No newline at end of file +#include + +/** + * \defgroup error_handling_operators Error handling Operators + * \brief Error handling operators Operators that help to recover from error notifications from an Observable. + * \see https://reactivex.io/documentation/operators.html#error + * \ingroup operators + */ + +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 4dd8e1c3f..22f9d49ad 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp new file mode 100644 index 000000000..20565e4ea --- /dev/null +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -0,0 +1,84 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace rpp::details +{ +struct on_error_resume_next_tag; +} + +namespace rpp::details +{ + +template +concept resume_callable = std::invocable && constraint::observable>; + +template +struct on_error_resume_next_impl; + +template +struct member_overload +{ + + /** + * \brief Recover from an on_error notification by continuing the sequence without error. + * \details The operator intercepts an on_error notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items. + * \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all. + * + * \marble on_error_resume_next + { + source observable : +-1-x + operator "on_error_resume_next: -9-9-|" : +-1-9-9-| + } + * + * \param resume_callable A callable that is given an error pointer and shall return an Observable. + * \return new specific_observable with the on_error_resume_next operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet on_error_resume_next.cpp on_error_resume_next + * + * \ingroup error_handling_operators + * \see https://reactivex.io/documentation/operators/on_error_resume_next.html + */ + template + auto on_error_resume_next(ResumeCallable&& resume_callable) const& requires is_header_included + { + return cast_this()->template lift(on_error_resume_next_impl>{std::forward(resume_callable)}); + } + + template + auto on_error_resume_next(ResumeCallable&& resume_callable) && requires is_header_included + { + return move_this().template lift(on_error_resume_next_impl>{std::forward(resume_callable)}); + } + +private: + const SpecificObservable* cast_this() const + { + return static_cast(this); + } + + SpecificObservable&& move_this() + { + return std::move(*static_cast(this)); + } +}; + +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp new file mode 100644 index 000000000..90df7ae98 --- /dev/null +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -0,0 +1,67 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include // create_subscriber_with_state +#include +#include + +IMPLEMENTATION_FILE(on_error_resume_next_tag); + +namespace rpp::details +{ + +/** + * Functor (type-erasure) of "on_error_resume_next" for on_error operator. + */ +struct on_error_resume_next_on_error +{ + template + void operator()(const std::exception_ptr& err, + const auto& subscriber, + const ResumeCallable& resume_callable) const + { + using Type = rpp::utils::extract_subscriber_type_t; + + // Subscribe to next_observable + resume_callable(err).subscribe(create_subscriber_with_state(subscriber.get_subscription(), + rpp::utils::forwarding_on_next{}, + rpp::utils::forwarding_on_error{}, + rpp::utils::forwarding_on_completed{}, + subscriber)); + } +}; + +/** + * \brief Functor of OperatorFn for "on_error_resume_next" operator (used by "lift"). + */ +template +struct on_error_resume_next_impl +{ + RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; + + template TSub> + auto operator()(TSub&& downstream_subscriber) const + { + // Child subscription is for keeping the downstream subscriber's subscription alive when upstream sends on_error event. + auto subscription = downstream_subscriber.get_subscription().make_child(); + + return create_subscriber_with_state(std::move(subscription), + rpp::utils::forwarding_on_next{}, + on_error_resume_next_on_error{}, + rpp::utils::forwarding_on_completed{}, + std::forward(downstream_subscriber), + m_resume_callable); + } +}; +} // namespace rpp::details diff --git a/src/tests/test_on_error_resume_next.cpp b/src/tests/test_on_error_resume_next.cpp new file mode 100644 index 000000000..5bd0be135 --- /dev/null +++ b/src/tests/test_on_error_resume_next.cpp @@ -0,0 +1,142 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// TC Wang 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "mock_observer.hpp" + +#include +#include +#include +#include +#include +#include +#include + +SCENARIO("on_error_resume_next captures error by subscribing to new observable", "[on_error_resume_next]") +{ + GIVEN("observable of -x") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1, 2, 3); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + + GIVEN("observable of -x and new observable emits error") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})); + }) + .subscribe(mock); + + THEN("should see -x") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + + GIVEN("observable of -x and new observable completes immediately") + { + auto mock = mock_observer{}; + + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) + .on_error_resume_next([](auto&&) + { + return rpp::source::empty(); + }) + .subscribe(mock); + + THEN("should see -|") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + + GIVEN("observable of -1-2-x and new observable completes immediately") + { + auto mock = mock_observer{}; + + rpp::source::just(rpp::source::just(1).as_dynamic(), + rpp::source::just(2).as_dynamic(), + rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic()) + .concat() + .on_error_resume_next([](auto&&) + { + return rpp::source::just(3); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } +} + +SCENARIO("on_error_resume_next forwards on_next and on_completed", "[on_error_resume_next]") +{ + GIVEN("observable of -1-2-3-|") + { + auto mock = mock_observer{}; + + rpp::source::just(1, 2, 3) + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(4); + }) + .subscribe(mock); + + THEN("should see -1-2-3-|") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + + GIVEN("observable of -|") + { + auto mock = mock_observer{}; + + rpp::source::empty() + .on_error_resume_next([](auto&&) + { + return rpp::observable::just(1); + }) + .subscribe(mock); + + THEN("should see -|") + { + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } +}