Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,24 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}
SECTION("create(on_error())+retry(1)+subscribe")
{
TEST_RPP([&]() {
rpp::source::create<int>([&](auto&& observer) {
observer.on_error({});
})
| rpp::operators::retry(1)
| rpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); });
});

TEST_RXCPP([&]() {
rxcpp::observable<>::create<int>([&](auto&& observer) {
observer.on_error({});
})
| rxcpp::operators::retry(1)
| rxcpp::operators::subscribe<int>([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); });
});
}
} // BENCHMARK("Error Handling Operators")

BENCHMARK("Subjects")
Expand Down
30 changes: 30 additions & 0 deletions src/examples/rpp/doxygen/retry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <rpp/rpp.hpp>

#include <exception>
#include <iostream>

/**
* @example retry.cpp
**/
int main()
{
//! [retry]
rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({}))
| rpp::operators::retry(2)
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 error
//! [retry]

//! [retry_infinitely]
rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({}))
| rpp::operators::retry()
| rpp::operators::take(10)
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 1 completed
//! [retry_infinitely]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@
*/

#include <rpp/operators/on_error_resume_next.hpp>
#include <rpp/operators/retry.hpp>
4 changes: 4 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ namespace rpp::operators

auto repeat();

auto retry(size_t count);

auto retry();

template<typename InitialValue, typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
auto scan(InitialValue&& initial_value, Fn&& accumulator);
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace rpp::operators::details
disposable.dispose();
}

void set_upstream(const disposable_wrapper& d)
void set_upstream(const disposable_wrapper& d) const
{
disposable.add(d);
}
Expand Down
182 changes: 182 additions & 0 deletions src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver, constraint::decayed_type Observable>
struct retry_state_t final : public rpp::composite_disposable
{
retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional<size_t> count)
: count{count}
, observer(std::move(in_observer))
, observable(observable)

{
}

std::optional<size_t> count;
std::atomic<bool> is_inside_drain{};

RPP_NO_UNIQUE_ADDRESS TObserver observer;
RPP_NO_UNIQUE_ADDRESS Observable observable;
};

template<rpp::constraint::observer TObserver, typename TObservable>
void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state);

template<rpp::constraint::observer TObserver, typename TObservable>
struct retry_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
{
state->observer.on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
if (state->count == 0)
{
state->observer.on_error(err);
state->dispose();
return;
}

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

drain(state);
}

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

void set_upstream(const disposable_wrapper& d) const
{
state->add(d);
}

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
};

template<rpp::constraint::observer TObserver, typename TObservable>
void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state)
{
while (!state->is_disposed())
{
if (state->count)
--state->count.value();
state->clear();
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
using value_type = rpp::utils::extract_observer_type_t<TObserver>;
state->observable.subscribe(observer<value_type, retry_observer_strategy<TObserver, TObservable>>{state});

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;
}
catch (...)
{
state->observer.on_error(std::current_exception());
return;
}
}
}

struct retry_t
{
const std::optional<size_t> count{};

template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer TObserver, typename TObservable>
void subscribe(TObserver&& observer, TObservable&& observble) const
{
const auto d = disposable_wrapper_impl<retry_state_t<std::decay_t<TObserver>, std::decay_t<TObservable>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observble), count ? count.value() + 1 : count);
auto ptr = d.lock();

ptr->observer.set_upstream(d.as_weak());
drain(ptr);
}
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries.
*
* @marble retry
{
source observable : +-1-x
operator "retry:(2)" : +-1-1-1-x
}
*
* @param count is the number of retries
*
* @warning #include <rpp/operators/retry.hpp>
*
* @par Examples:
* @snippet retry.cpp retry
*
* @ingroup error_handling_operators
* @see https://reactivex.io/documentation/operators/retry.html
*/
inline auto retry(size_t count)
{
return details::retry_t{count};
}

/**
* @brief The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit.
*
* @marble infinite_retry
{
source observable : +-1-x
operator "retry:()" : +-1-1-1-1-1-1-1-1-1-1-1->
}
*
* @warning #include <rpp/operators/retry.hpp>
*
* @par Examples:
* @snippet retry.cpp retry_infinitely
*
* @ingroup error_handling_operators
* @see https://reactivex.io/documentation/operators/retry.html
*/
inline auto retry()
{
return details::retry_t{};
}
} // namespace rpp::operators
20 changes: 20 additions & 0 deletions src/tests/rpp/test_repeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"
#include "rpp_trompeloil.hpp"


TEST_CASE("repeat resubscribes")
Expand Down Expand Up @@ -135,3 +136,22 @@ TEST_CASE("repeat satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::repeat());
}


TEST_CASE("repeat handles stack overflow")
{
mock_observer<int> mock{};
trompeloeil::sequence seq;

constexpr size_t count = 500000;

REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq);

rpp::source::create<int>([](const auto& obs) {
obs.on_next(1);
obs.on_completed();
})
| rpp::operators::repeat(count)
| rpp::operators::subscribe(mock);
}
Loading