Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,30 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
}
} // BENCHMARK("Aggregating Operators")

BENCHMARK("Error Handling Operators")
{
SECTION("create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe")
{
TEST_RPP([&]() {
rpp::source::create<int>([&](auto&& observer) {
observer.on_next(1);
observer.on_error(std::make_exception_ptr(std::runtime_error{""}));
})
| rpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rpp::immediate_just(2); })
| rpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
rxcpp::observable<>::create<int>([&](auto&& observer) {
observer.on_next(1);
observer.on_error(std::make_exception_ptr(std::runtime_error{""}));
})
| rxcpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rxcpp::immediate_just(2); })
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}
} // BENCHMARK("Error Handling Operators")

BENCHMARK("Subjects")
{
SECTION("publish_subject with 1 observer - on_next")
Expand Down
9 changes: 9 additions & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,12 @@

#include <rpp/operators/concat.hpp>
#include <rpp/operators/reduce.hpp>

/**
* @defgroup error_handling_operators Error Handling Operators
* @brief Operators that help to recover from error notifications from an Observable
* @see https://reactivex.io/documentation/operators.html#error
* @ingroup operators
*/

#include <rpp/operators/on_error_resume_next.hpp>
109 changes: 109 additions & 0 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Selector>
struct on_error_resume_next_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

RPP_NO_UNIQUE_ADDRESS mutable TObserver observer;
RPP_NO_UNIQUE_ADDRESS Selector selector;
// Manually control disposable to ensure observer is not used after move in on_error emission
mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make();

RPP_CALL_DURING_CONSTRUCTION(
{
observer.set_upstream(disposable);
});

template<typename T>
void on_next(T&& v) const
{
observer.on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
disposable.dispose();
selector(err).subscribe(std::move(observer));
}

void on_completed() const
{
disposable.dispose();
observer.on_completed();
}

void set_upstream(const disposable_wrapper& d)
{
disposable.add(d);
}

bool is_disposed() const { return disposable.is_disposed(); }
};

template<rpp::constraint::decayed_type Selector>
struct on_error_resume_next_t : lift_operator<on_error_resume_next_t<Selector>, Selector>
{
template<rpp::constraint::decayed_type T>
struct operator_traits
{
using selector_observable_result_type =
rpp::utils::extract_observable_type_t<std::invoke_result_t<Selector, std::exception_ptr>>;

static_assert(
rpp::constraint::decayed_same_as<selector_observable_result_type, T>,
"Selector observable result type is not the same as T");

using result_type = T;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = on_error_resume_next_observer_strategy<TObserver, Selector>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::atomic_dynamic_disposable_strategy_selector<1>;
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief If an error occurs, take the result from the Selector and subscribe to that instead.
*
* @marble on_error_resume_next
{
source observable : +-1-x
operator "on_error_resume_next: () => obs(-2-3-|)" : +-1-2-3-|
}
*
* @param selector callable taking a std::exception_ptr and returning observable to continue on
*
* @warning #include <rpp/operators/on_error_resume_next.hpp>
*
* @ingroup error_handling_operators
* @see https://reactivex.io/documentation/operators/catch.html
*/
template<typename Selector>
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
auto on_error_resume_next(Selector&& selector)
{
return details::on_error_resume_next_t<std::decay_t<Selector>>{std::forward<Selector>(selector)};
}
} // namespace rpp::operators
156 changes: 156 additions & 0 deletions src/tests/rpp/test_on_error_resume_next.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#include <snitch/snitch.hpp>

#include <rpp/operators/on_error_resume_next.hpp>
#include <rpp/schedulers/immediate.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/just.hpp>

#include "disposable_observable.hpp"
#include "mock_observer.hpp"

TEMPLATE_TEST_CASE("on_error_resume_next switches observable on error", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared)
{
auto mock = mock_observer_strategy<int>();
SECTION("observable without error emission")
{
auto obs = rpp::source::just<TestType>(rpp::schedulers::immediate{}, 1, 2, 3);
SECTION("subscribe")
{
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); })
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from observable")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_total_on_next_count() == 3);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("observable with one error emission")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_next(2);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
SECTION("subscribe")
{
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) {
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3);
})
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from both outer and inner observable")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_total_on_next_count() == 3);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("observable with two error emissions")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_next(2);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
SECTION("subscribe")
{
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) {
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3);
})
| rpp::ops::subscribe(mock);
SECTION("observer only receives values from first inner observable")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_total_on_next_count() == 3);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("inner observable different emission type")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_next(2);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
SECTION("subscribe")
{
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) {
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3);
})
| rpp::ops::subscribe(mock);
SECTION("observer only receives values from first inner observable")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_total_on_next_count() == 3);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("nested on_error_resume_next operators")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
SECTION("subscribe")
{
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) {
return rpp::source::create<int>([](const auto& sub) {
sub.on_next(2);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
})
| rpp::operators::on_error_resume_next([](const std::exception_ptr&) {
return rpp::source::create<int>([](const auto& sub) {
sub.on_next(3);
sub.on_completed();
});
});
})
| rpp::ops::subscribe(mock);
SECTION("observer receives values without any errors")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_total_on_next_count() == 3);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}

TEST_CASE("on_error_resume_next satisfies disposable contracts")
{
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);

test_operator_with_disposable<int>(
rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); }));
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
}