Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
- [ ] backpressure ???

### Error handling
- [ ] catch
- [x] catch
- [ ] retry

### Utility
Expand Down
24 changes: 23 additions & 1 deletion src/benchmarks/rpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,4 +1077,26 @@ TEST_CASE("single-threaded locks")
}
return target;
};
}
}

TEST_CASE("on_error_resume_next")
{
BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rpp::observable::create<int>([](const auto& subscriber)
{
subscriber.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
auto subscriber = rpp::specific_subscriber{[](const int&) {}};

meter.measure([&]
{
return obs
.on_error_resume_next([](auto&&)
{
return rpp::observable::just(1);
})
.subscribe(subscriber);
});
};
}
21 changes: 21 additions & 0 deletions src/benchmarks/rxcpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,3 +1045,24 @@ TEST_CASE("trampoline scheduler")
};
}

TEST_CASE("on_error_resume_next")
{
BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rxcpp::sources::create<int>([](const auto& subscriber)
{
subscriber.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
auto subscriber = rxcpp::make_subscriber<int>();

meter.measure([&](int)
{
return obs
.on_error_resume_next([](auto&&)
{
return rxcpp::observable<>::just(1);
})
.subscribe(subscriber);
});
};
}
32 changes: 32 additions & 0 deletions src/examples/doxygen/on_error_resume_next.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example on_error_resume_next.cpp
**/
int main()
{
//! [on_error_resume_next]
rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""}))
.on_error_resume_next([](auto&&)
{
return rpp::observable::just(1, 2, 3);
})
.subscribe([&](int v)
{
std::cout << "-" << v;
},
[&](auto&&)
{
std::cout << "-x";
},
[&]()
{
std::cout << "-|" << std::endl;
});
// source: -x
// output: -1-2-3-|
//! [on_error_resume_next]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct RPP_EMPTY_BASES interface_observable
, details::member_overload<Type, SpecificObservable, details::merge_tag>
, details::member_overload<Type, SpecificObservable, details::multicast_tag>
, details::member_overload<Type, SpecificObservable, details::observe_on_tag>
, details::member_overload<Type, SpecificObservable, details::on_error_resume_next_tag>
, details::member_overload<Type, SpecificObservable, details::publish_tag>
, details::member_overload<Type, SpecificObservable, details::repeat_tag>
, details::member_overload<Type, SpecificObservable, details::sample_tag>
Expand Down
12 changes: 10 additions & 2 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@

#include <rpp/operators/concat.hpp>


/**
* \defgroup utility_operators Utility Operators
* \brief Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour
Expand All @@ -102,4 +101,13 @@

#include <rpp/operators/multicast.hpp>
#include <rpp/operators/publish.hpp>
#include <rpp/operators/ref_count.hpp>
#include <rpp/operators/ref_count.hpp>

/**
* \defgroup error_handling_operators Error handling Operators
* \brief Error handling operators Operators that help to recover from error notifications from an Observable.
* \see https://reactivex.io/documentation/operators.html#error
* \ingroup operators
*/

#include <rpp/operators/on_error_resume_next.hpp>
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <rpp/operators/fwd/merge.hpp>
#include <rpp/operators/fwd/multicast.hpp>
#include <rpp/operators/fwd/observe_on.hpp>
#include <rpp/operators/fwd/on_error_resume_next.hpp>
#include <rpp/operators/fwd/publish.hpp>
#include <rpp/operators/fwd/ref_count.hpp>
#include <rpp/operators/fwd/repeat.hpp>
Expand Down
84 changes: 84 additions & 0 deletions src/rpp/rpp/operators/fwd/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// TC Wang 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/observables/constraints.hpp>
#include <rpp/observables/details/member_overload.hpp>
#include <rpp/utils/exceptions.hpp>
#include <rpp/utils/function_traits.hpp>
#include <rpp/utils/functors.hpp>

#include <exception>

namespace rpp::details
{
struct on_error_resume_next_tag;
}

namespace rpp::details
{

template<typename Fn>
concept resume_callable = std::invocable<Fn, std::exception_ptr> && constraint::observable<utils::decayed_invoke_result_t<Fn, std::exception_ptr>>;

template<constraint::decayed_type Type, rpp::details::resume_callable ResumeCallable>
struct on_error_resume_next_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, on_error_resume_next_tag>
{

/**
* \brief Recover from an on_error notification by continuing the sequence without error.
* \details The operator intercepts an on_error notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items.
* \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all.
*
* \marble on_error_resume_next
{
source observable : +-1-x
operator "on_error_resume_next: -9-9-|" : +-1-9-9-|
}
*
* \param resume_callable A callable that is given an error pointer and shall return an Observable.
* \return new specific_observable with the on_error_resume_next operator as most recent operator.
* \warning #include <rpp/operators/on_error_resume_next.hpp>
*
* \par Examples
* \snippet on_error_resume_next.cpp on_error_resume_next
*
* \ingroup error_handling_operators
* \see https://reactivex.io/documentation/operators/on_error_resume_next.html
*/
template<rpp::details::resume_callable ResumeCallable>
auto on_error_resume_next(ResumeCallable&& resume_callable) const& requires is_header_included<on_error_resume_next_tag, ResumeCallable>
{
return cast_this()->template lift<Type>(on_error_resume_next_impl<Type, std::decay_t<ResumeCallable>>{std::forward<ResumeCallable>(resume_callable)});
}

template<rpp::details::resume_callable ResumeCallable>
auto on_error_resume_next(ResumeCallable&& resume_callable) && requires is_header_included<on_error_resume_next_tag, ResumeCallable>
{
return move_this().template lift<Type>(on_error_resume_next_impl<Type, std::decay_t<ResumeCallable>>{std::forward<ResumeCallable>(resume_callable)});
}

private:
const SpecificObservable* cast_this() const
{
return static_cast<const SpecificObservable*>(this);
}

SpecificObservable&& move_this()
{
return std::move(*static_cast<SpecificObservable*>(this));
}
};

} // namespace rpp::details
67 changes: 67 additions & 0 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// TC Wang 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/defs.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/on_error_resume_next.hpp>
#include <rpp/subscribers/constraints.hpp>

IMPLEMENTATION_FILE(on_error_resume_next_tag);

namespace rpp::details
{

/**
* Functor (type-erasure) of "on_error_resume_next" for on_error operator.
*/
struct on_error_resume_next_on_error
{
template<rpp::details::resume_callable ResumeCallable>
void operator()(const std::exception_ptr& err,
const auto& subscriber,
const ResumeCallable& resume_callable) const
{
using Type = rpp::utils::extract_subscriber_type_t<decltype(subscriber)>;

// Subscribe to next_observable
resume_callable(err).subscribe(create_subscriber_with_state<Type>(subscriber.get_subscription(),
rpp::utils::forwarding_on_next{},
rpp::utils::forwarding_on_error{},
rpp::utils::forwarding_on_completed{},
subscriber));
}
};

/**
* \brief Functor of OperatorFn for "on_error_resume_next" operator (used by "lift").
*/
template<constraint::decayed_type Type, rpp::details::resume_callable ResumeCallable>
struct on_error_resume_next_impl
{
RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& downstream_subscriber) const
{
// Child subscription is for keeping the downstream subscriber's subscription alive when upstream sends on_error event.
auto subscription = downstream_subscriber.get_subscription().make_child();

return create_subscriber_with_state<Type>(std::move(subscription),
rpp::utils::forwarding_on_next{},
on_error_resume_next_on_error{},
rpp::utils::forwarding_on_completed{},
std::forward<TSub>(downstream_subscriber),
m_resume_callable);
}
};
} // namespace rpp::details
Loading