Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/rpp/rpp/observers/details/disposables_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,17 @@ namespace rpp::details::observers

static constexpr void dispose() {}
};

class boolean_disposables_strategy
{
public:
static constexpr void add(const rpp::disposable_wrapper&) {}

bool is_disposed() const noexcept { return m_is_disposed; }

void dispose() const { m_is_disposed = true; }

private:
mutable bool m_is_disposed{};
};
} // namespace rpp::details::observers
12 changes: 10 additions & 2 deletions src/rpp/rpp/observers/details/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ namespace rpp::details::observers
// No any disposables logic for observer expected
None = 1,
// Use external (passed to constructor) composite_disposable_wrapper as disposable
External = 2
External = 2,
// Observer just controls is_disposed or not but upstreams handled via observer_strategy
Boolean = 3
};

namespace constraint
Expand All @@ -47,6 +49,10 @@ namespace rpp::details::observers
* @brief No any disposable logic at all. Used only inside proxy-forwarding operators where extra disposable logic not requires
*/
struct none_disposables_strategy;
/**
* @brief Just control is_disposed or not via boolean and ignore upstreams at all
*/
class boolean_disposables_strategy;

/**
* @brief Keep disposables inside dynamic_disposables_container container (based on std::vector)
Expand All @@ -66,14 +72,16 @@ namespace rpp::details::observers
template<disposables_mode mode>
consteval auto* deduce_optimal_disposables_strategy()
{
static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External);
static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External || mode == disposables_mode::Boolean);

if constexpr (mode == disposables_mode::Auto)
return static_cast<default_disposables_strategy*>(nullptr);
else if constexpr (mode == disposables_mode::None)
return static_cast<none_disposables_strategy*>(nullptr);
else if constexpr (mode == disposables_mode::External)
return static_cast<composite_disposable_wrapper*>(nullptr);
else if constexpr (mode == disposables_mode::Boolean)
return static_cast<boolean_disposables_strategy*>(nullptr);
else
return static_cast<void*>(nullptr);
}
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;

std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -128,13 +127,11 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
disposable->get_observer()->on_error(err);
}

void on_completed() const
{
locally_disposed = true;
disposable->get_inner_child_disposable().clear();

ConcatStage current{ConcatStage::Draining};
Expand All @@ -148,7 +145,7 @@ namespace rpp::operators::details

void set_upstream(const disposable_wrapper& d) const { disposable->get_inner_child_disposable().add(d); }

bool is_disposed() const { return locally_disposed || disposable->get_inner_child_disposable().is_disposed(); }
bool is_disposed() const { return disposable->get_inner_child_disposable().is_disposed(); }
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace rpp::operators::details
template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container, bool ClearOnError>
struct delay_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
std::shared_ptr<delay_disposable<Observer, Worker, Container>> disposable{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver>
struct merge_observer_base_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
merge_observer_base_strategy(std::shared_ptr<merge_disposable<TObserver>>&& disposable)
: m_disposable{std::move(disposable)}
{
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver, typename TObservable>
struct retry_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;

std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -54,7 +53,6 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
if (state->count == 0)
{
state->observer.on_error(err);
Expand All @@ -71,7 +69,6 @@ namespace rpp::operators::details

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

Expand All @@ -80,7 +77,7 @@ namespace rpp::operators::details
state->add(d);
}

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
bool is_disposed() const { return state->is_disposed(); }
};

template<rpp::constraint::observer TObserver, typename TObservable>
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ namespace rpp::details
template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
struct concat_source_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;

std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -63,17 +62,15 @@ namespace rpp::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
state->observer.on_error(err);
}

void set_upstream(const disposable_wrapper& d) { state->add(d); }

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
bool is_disposed() const { return state->is_disposed(); }

void on_completed() const
{
locally_disposed = true;
state->clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
Expand Down
2 changes: 2 additions & 0 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ void test_operator_over_observable_with_disposable(auto&& op)
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
obs.on_error({});
CHECK(obs.is_disposed());
CHECK(d.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
}
Expand All @@ -115,6 +116,7 @@ void test_operator_over_observable_with_disposable(auto&& op)
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
obs.on_completed();
CHECK(obs.is_disposed());
CHECK(d.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
}
Expand Down