Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .clangd
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CompileFlags:
CompilationDatabase: "build/"
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
#include <rpp/operators/finally.hpp>
#include <rpp/operators/observe_on.hpp>
#include <rpp/operators/repeat.hpp>
#include <rpp/operators/repeat_when.hpp>
#include <rpp/operators/subscribe_on.hpp>
#include <rpp/operators/tap.hpp>
#include <rpp/operators/timeout.hpp>
Expand Down
122 changes: 122 additions & 0 deletions src/rpp/rpp/operators/details/repeating_strategy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/observables/fwd.hpp>
#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/constraints.hpp>
#include <rpp/utils/utils.hpp>

#include "rpp/observers/fwd.hpp"

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
struct repeating_state final : public rpp::composite_disposable
{
repeating_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier)
: observer(std::move(observer))
, observable(observable)
, notifier(notifier)
{
}

std::atomic_bool is_inside_drain{};

RPP_NO_UNIQUE_ADDRESS TObserver observer;
RPP_NO_UNIQUE_ADDRESS TObservable observable;
RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
};

template<typename TStrategy, rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
void drain(const std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>>& state);

template<typename TOuterStrategy,
rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
struct repeating_inner_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&&) const
{
locally_disposed = true;
state->clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

drain<TOuterStrategy>(state);
}

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
state->observer.on_error(err);
}

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

void set_upstream(const disposable_wrapper& d) const { state->add(d); }

Check warning on line 82 in src/rpp/rpp/operators/details/repeating_strategy.hpp

View check run for this annotation

Codecov / codecov/patch

src/rpp/rpp/operators/details/repeating_strategy.hpp#L82

Added line #L82 was not covered by tests

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
};

template<rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
struct repeating_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;

void set_upstream(const disposable_wrapper& d) { state->add(d); }

bool is_disposed() const { return state->is_disposed(); }
};

template<typename TStrategy, rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
void drain(const std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>>& state)
{
while (!state->is_disposed())
{
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
using value_type = rpp::utils::extract_observer_type_t<TObserver>;
state->observable.subscribe(rpp::observer<value_type, TStrategy>{state});

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;
}
catch (...)
{
state->observer.on_error(std::current_exception());
return;
}
}
}
} // namespace rpp::operators::details
4 changes: 4 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ namespace rpp::operators
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
auto on_error_resume_next(Selector&& selector);

template<typename Notifier>
requires rpp::constraint::observable<std::invoke_result_t<Notifier>>
auto repeat_when(Notifier&& notifier);

template<typename Notifier>
requires rpp::constraint::observable<std::invoke_result_t<Notifier, std::exception_ptr>>
auto retry_when(Notifier&& notifier);
Expand Down
100 changes: 100 additions & 0 deletions src/rpp/rpp/operators/repeat_when.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/observables/fwd.hpp>
#include <rpp/operators/fwd.hpp>

#include <rpp/operators/details/repeating_strategy.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
struct repeat_when_impl_strategy final : public repeating_observer_strategy<TObserver, TObservable, TNotifier>
{
using self_type = repeat_when_impl_strategy<TObserver, TObservable, TNotifier>;

using repeating_observer_strategy<TObserver, TObservable, TNotifier>::state;

template<typename T>
void on_next(T&& v) const
{
state->observer.on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
state->observer.on_error(err);
}

void on_completed() const
{
try
{
state->notifier().subscribe(repeating_inner_observer_strategy<self_type, TObserver, TObservable, TNotifier>{this->state});
}
catch (...)
{
state->observer.on_error(std::current_exception());
}
}
};

template<rpp::constraint::decayed_type TNotifier>
struct repeat_when_t
{
RPP_NO_UNIQUE_ADDRESS TNotifier notifier;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposables_strategy Prev>
using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;

template<rpp::constraint::observer TObserver, typename TObservable>
void subscribe(TObserver&& observer, TObservable&& observable) const
{
const auto d = disposable_wrapper_impl<repeating_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier);
auto ptr = d.lock();

ptr->observer.set_upstream(d.as_weak());

drain<repeat_when_impl_strategy<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>(ptr);
}
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief If observable completes, invoke @p notifier and when returned observable emits a value
* resubscribe to the source observable. If the notifier throws or returns an error/empty
* observable, then error/completed emission is forwarded to original subscription.
*
* @param notifier callable taking no arguments and returning observable notifying when to resubscribe
*
* @note `#include <rpp/operators/repeat_when.hpp>`
*
* @ingroup creational_operators
* @see https://reactivex.io/documentation/operators/repeat.html
*/
template<typename TNotifier>
requires rpp::constraint::observable<std::invoke_result_t<TNotifier>>
auto repeat_when(TNotifier&& notifier)
{
return details::repeat_when_t<std::decay_t<TNotifier>>{std::forward<TNotifier>(notifier)};
}
} // namespace rpp::operators
Loading
Loading