diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index c41521cde..90396fc14 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -54,7 +54,12 @@ namespace rpp friend class details::refocunt_disposable_inner; refcount_disposable() = default; - composite_disposable_wrapper add_ref(); + enum class Mode : bool + { + WeakRefStrongSource, + StrongRefRefSource + }; + composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource); private: std::atomic m_refcount{0}; @@ -91,7 +96,7 @@ namespace rpp::details namespace rpp { - inline composite_disposable_wrapper refcount_disposable::add_ref() + inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode) { auto current_value = m_refcount.load(std::memory_order::seq_cst); while (true) @@ -102,8 +107,8 @@ namespace rpp // just need atomicity, not guarding anything if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { - auto inner = composite_disposable_wrapper::make(wrapper_from_this()); - add(inner.as_weak()); + auto inner = composite_disposable_wrapper::make(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak()); + add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner); return inner; } } diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 671232cdf..2beeffc94 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -19,11 +19,11 @@ namespace rpp::operators::details { template - class combine_latest_disposable final : public combining_disposable + class combine_latest_state final : public combining_state { public: - explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) + explicit combine_latest_state(Observer&& observer, const TSelector& selector) + : combining_state(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -40,23 +40,23 @@ namespace rpp::operators::details template struct combine_latest_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::disposable; + using combining_observer_strategy>::state; template void on_next(T&& v) const { // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one - const auto observer = disposable->get_observer_under_lock(); - disposable->get_values().template get().emplace(std::forward(v)); + const auto observer = state->get_observer_under_lock(); + state->get_values().template get().emplace(std::forward(v)); - disposable->get_values().apply(&apply_impl, disposable, observer); + state->get_values().apply(&apply_impl, state, observer); } private: - template - static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) + template + static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); @@ -64,7 +64,7 @@ namespace rpp::operators::details }; template - struct combine_latest_t : public combining_operator_t + struct combine_latest_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index b750b5e15..c4cc12823 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -34,29 +34,33 @@ namespace rpp::operators::details }; template - class concat_state_t final : public rpp::refcount_disposable + class concat_state_t final : public std::enable_shared_from_this> { public: concat_state_t(TObserver&& observer) : m_observer{std::move(observer)} { + const auto d = disposable_wrapper_impl::make(); + m_disposable = d.lock(); + get_observer()->set_upstream(d); } rpp::utils::pointer_under_lock get_observer() { return m_observer; } rpp::utils::pointer_under_lock> get_queue() { return m_queue; } + const std::shared_ptr& get_disposable() const { return m_disposable; } std::atomic& stage() { return m_stage; } void drain(rpp::composite_disposable_wrapper refcounted) { - while (!is_disposed()) + while (!m_disposable->is_disposed()) { const auto observable = get_observable(); if (!observable) { stage().store(ConcatStage::None, std::memory_order::relaxed); refcounted.dispose(); - if (is_disposed()) + if (m_disposable->is_disposed()) get_observer()->on_completed(); return; } @@ -74,12 +78,13 @@ namespace rpp::operators::details drain(refcounted); } + private: bool handle_observable_impl(const rpp::constraint::decayed_same_as auto& observable, rpp::composite_disposable_wrapper refcounted) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); refcounted.clear(); - observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); + observable.subscribe(concat_inner_observer_strategy{this->shared_from_this(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); @@ -97,6 +102,7 @@ namespace rpp::operators::details } private: + std::shared_ptr m_disposable{}; rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; @@ -112,7 +118,7 @@ namespace rpp::operators::details } concat_observer_strategy_base(std::shared_ptr> state) - : concat_observer_strategy_base{state, state->add_ref()} + : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)} { } @@ -122,7 +128,7 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { state->get_observer()->on_error(err); - state->dispose(); + state->get_disposable()->dispose(); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } @@ -162,7 +168,7 @@ namespace rpp::operators::details using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; concat_observer_strategy(TObserver&& observer) - : base{init_state(std::move(observer))} + : base{std::make_shared>(std::move(observer))} { } @@ -171,7 +177,7 @@ namespace rpp::operators::details { ConcatStage current = ConcatStage::None; if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable(std::forward(v), base::state->add_ref()); + base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)); else base::state->get_queue()->push(std::forward(v)); } @@ -179,19 +185,9 @@ namespace rpp::operators::details void on_completed() const { base::refcounted.dispose(); - if (base::state->is_disposed()) + if (base::state->get_disposable()->is_disposed()) base::state->get_observer()->on_completed(); } - - - private: - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->get_observer()->set_upstream(d.as_weak()); - return ptr; - } }; struct concat_t : lift_operator diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 1803c6db3..24c1a9d34 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -12,33 +12,31 @@ #include -#include #include #include namespace rpp::operators::details { - template - class debounce_disposable; + template + class debounce_state; - template - struct debounce_disposable_wrapper + template + struct debounce_state_wrapper { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; - bool is_disposed() const { return disposable->is_disposed(); } + bool is_disposed() const { return state->get_observer_under_lock()->is_disposed(); } - void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); } + void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } }; - template - class debounce_disposable final : public rpp::composite_disposable_impl - , public rpp::details::enable_wrapper_from_this> + template + class debounce_state final : public std::enable_shared_from_this> { using T = rpp::utils::extract_observer_type_t; public: - debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) + debounce_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) : m_observer(std::move(in_observer)) , m_worker{std::move(in_worker)} , m_period{period} @@ -71,17 +69,17 @@ namespace rpp::operators::details { m_worker.schedule( m_time_when_value_should_be_emitted.value(), - [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { - auto value_or_duration = handler.disposable->extract_value_or_time(); + [](const debounce_state_wrapper& handler) -> schedulers::optional_delay_to { + auto value_or_duration = handler.state->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) return schedulers::optional_delay_to{*timepoint}; if (auto* value = std::get_if(&value_or_duration)) - handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); + handler.state->get_observer_under_lock()->on_next(std::move(*value)); return std::nullopt; }, - debounce_disposable_wrapper{this->wrapper_from_this().lock()}); + debounce_state_wrapper{this->shared_from_this()}); } std::variant extract_value_or_time() @@ -108,41 +106,38 @@ namespace rpp::operators::details std::optional m_value_to_be_emitted{}; }; - template + template struct debounce_observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_observer_under_lock()->set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_observer_under_lock()->is_disposed(); } template void on_next(T&& v) const { - disposable->emplace_safe(std::forward(v)); + state->emplace_safe(std::forward(v)); } void on_error(const std::exception_ptr& err) const noexcept { - disposable->dispose(); - disposable->get_observer_under_lock()->on_error(err); + state->get_observer_under_lock()->on_error(err); } void on_completed() const noexcept { - disposable->dispose(); - const auto value = disposable->extract_value(); - const auto observer = disposable->get_observer_under_lock(); - if (value) + const auto observer = state->get_observer_under_lock(); + if (const auto value = state->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -158,21 +153,18 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = Prev; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::disposable_container; + using worker_t = rpp::schedulers::utils::get_worker_t; - const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = disposable.lock(); - ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); - return rpp::observer, worker_t, container>>{std::move(ptr)}; + auto ptr = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + return rpp::observer, worker_t>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 649844ca0..f92106414 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include @@ -35,12 +34,12 @@ namespace rpp::operators::details rpp::schedulers::time_point time_point{}; }; - template - struct delay_disposable final : public rpp::composite_disposable_impl + template + struct delay_state final { using T = rpp::utils::extract_observer_type_t; - delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) + delay_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) : observer(std::move(in_observer)) , worker{std::move(in_worker)} , delay{delay} @@ -56,29 +55,29 @@ namespace rpp::operators::details bool is_active{}; }; - template - struct delay_disposable_wrapper + template + struct delay_state_wrapper { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; - bool is_disposed() const { return disposable->is_disposed(); } + bool is_disposed() const { return state->observer.is_disposed(); } - void on_error(const std::exception_ptr& err) const { disposable->observer.on_error(err); } + void on_error(const std::exception_ptr& err) const { state->observer.on_error(err); } }; - template + template struct delay_observer_strategy { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->observer.set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->observer.is_disposed(); } template @@ -103,59 +102,59 @@ namespace rpp::operators::details { if (const auto tp = emplace_safe(std::forward(value))) { - disposable->worker.schedule( + state->worker.schedule( tp.value(), - [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, - delay_disposable_wrapper{disposable}); + [](const delay_state_wrapper& wrapper) { return drain_queue(wrapper.state); }, + delay_state_wrapper{state}); } } template std::optional emplace_safe(TT&& item) const { - std::lock_guard lock{disposable->mutex}; + std::lock_guard lock{state->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) { - disposable->queue = std::queue>>{}; - disposable->observer.on_error(std::forward(item)); + state->queue = std::queue>>{}; + state->observer.on_error(std::forward(item)); return std::nullopt; } else { - const auto tp = disposable->worker.now() + disposable->delay; - disposable->queue.emplace(std::forward(item), tp); - if (!disposable->is_active) + const auto tp = state->worker.now() + state->delay; + state->queue.emplace(std::forward(item), tp); + if (!state->is_active) { - disposable->is_active = true; + state->is_active = true; return tp; } return std::nullopt; } } - static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& disposable) + static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& state) { while (true) { - std::unique_lock lock{disposable->mutex}; - if (disposable->queue.empty()) + std::unique_lock lock{state->mutex}; + if (state->queue.empty()) { - disposable->is_active = false; + state->is_active = false; return std::nullopt; } - auto& top = disposable->queue.front(); - if (top.time_point > disposable->worker.now()) + auto& top = state->queue.front(); + if (top.time_point > state->worker.now()) return schedulers::optional_delay_to{top.time_point}; auto item = std::move(top.value); - disposable->queue.pop(); + state->queue.pop(); lock.unlock(); - std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { disposable->observer.on_next(std::move(v)); }, - [&](const std::exception_ptr& err) { disposable->observer.on_error(err); }, + std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { state->observer.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { state->observer.on_error(err); }, [&](rpp::utils::none) { - disposable->observer.on_completed(); + state->observer.on_completed(); }}, std::move(item)); } @@ -172,21 +171,18 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = Prev; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::disposable_container; + using worker_t = rpp::schedulers::utils::get_worker_t; - const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = disposable.lock(); - ptr->observer.set_upstream(disposable.as_weak()); - return rpp::observer, worker_t, container, ClearOnError>>{std::move(ptr)}; + auto state = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + return rpp::observer, worker_t, ClearOnError>>{std::move(state)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index f7f5152c5..4aa03b752 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -22,12 +21,13 @@ namespace rpp::operators::details { - template - class combining_disposable : public composite_disposable + template + class combining_state { public: - explicit combining_disposable(Observer&& observer) + explicit combining_state(Observer&& observer, size_t on_completed_needed) : m_observer_with_mutex{std::move(observer)} + , m_on_completed_needed{on_completed_needed} { } @@ -42,41 +42,37 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex m_observer_with_mutex{}; - std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + std::atomic_size_t m_on_completed_needed; }; - template + template struct combining_observer_strategy { - std::shared_ptr disposable{}; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_observer_under_lock()->set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_observer_under_lock()->is_disposed(); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (disposable->decrement_on_completed()) - { - disposable->get_observer_under_lock()->on_completed(); - disposable->dispose(); - } + if (state->decrement_on_completed()) + state->get_observer_under_lock()->on_completed(); } }; - template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + template typename TState, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> struct combining_operator_t { RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; @@ -93,7 +89,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = ::rpp::details::observables::default_disposable_strategy_selector; // TODO: sum of Prev + TObservables template auto lift(Observer&& observer) const @@ -105,20 +101,18 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using Disposable = TDisposable...>; + using State = TState...>; - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto locked = disposable.lock(); - locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); + const auto state = std::make_shared(std::forward(observer), selector); + subscribe>(state, std::index_sequence_for{}, observables...); - return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; } template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); + (..., observables.subscribe(rpp::observer, TStrategy...>>{state})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index d7715098c..791a65f84 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -24,12 +23,13 @@ namespace rpp::operators::details { template - class merge_disposable final : public composite_disposable + class merge_state final { public: - merge_disposable(TObserver&& observer) + merge_state(TObserver&& observer) : m_observer(std::move(observer)) { + get_observer_under_lock()->set_upstream(m_disposable); } // just need atomicity, not guarding anything @@ -40,56 +40,60 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } + const rpp::composite_disposable_wrapper& get_disposable() const { return m_disposable; } + private: rpp::utils::value_with_mutex m_observer{}; + rpp::composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); std::atomic_size_t m_on_completed_needed{1}; }; template struct merge_observer_base_strategy { - merge_observer_base_strategy(std::shared_ptr>&& disposable) - : m_disposable{std::move(disposable)} + merge_observer_base_strategy(std::shared_ptr>&& state) + : m_state{std::move(state)} { } - merge_observer_base_strategy(const std::shared_ptr>& disposable) - : m_disposable{disposable} + merge_observer_base_strategy(const std::shared_ptr>& state) + : m_state{state} { } void set_upstream(const rpp::disposable_wrapper& d) const { - m_disposable->add(d); + m_state->get_disposable().add(d); m_disposables.push_back(d); } bool is_disposed() const { - return m_disposable->is_disposed(); + return m_state->get_observer_under_lock()->is_disposed(); } void on_error(const std::exception_ptr& err) const { - m_disposable->get_observer_under_lock()->on_error(err); - m_disposable->dispose(); + m_state->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (m_disposable->decrement_on_completed()) + if (m_state->decrement_on_completed()) + { + m_state->get_observer_under_lock()->on_completed(); + } + else { for (const auto& v : m_disposables) { - m_disposable->remove(v); + m_state->get_disposable().remove(v); } - m_disposable->get_observer_under_lock()->on_completed(); - m_disposable->dispose(); } } protected: - std::shared_ptr> m_disposable; + std::shared_ptr> m_state; mutable std::vector m_disposables{}; }; @@ -101,7 +105,7 @@ namespace rpp::operators::details template void on_next(T&& v) const { - merge_observer_base_strategy::m_disposable->get_observer_under_lock()->on_next(std::forward(v)); + merge_observer_base_strategy::m_state->get_observer_under_lock()->on_next(std::forward(v)); } }; @@ -110,24 +114,15 @@ namespace rpp::operators::details { public: explicit merge_observer_strategy(TObserver&& observer) - : merge_observer_base_strategy{init_state(std::move(observer))} + : merge_observer_base_strategy{std::make_shared>(std::move(observer))} { } template void on_next(T&& v) const { - merge_observer_base_strategy::m_disposable->increment_on_completed(); - std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_disposable}}); - } - - private: - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->get_observer_under_lock()->set_upstream(d.as_weak()); - return ptr; + merge_observer_base_strategy::m_state->increment_on_completed(); + std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_state}}); } }; diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index c010613ca..23d1c820b 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -17,18 +17,6 @@ namespace rpp::operators::details { - template - struct on_error_resume_next_disposable final : public rpp::composite_disposable - { - on_error_resume_next_disposable(TObserver&& observer) - : rpp::composite_disposable{} - , observer(std::move(observer)) - { - } - - RPP_NO_UNIQUE_ADDRESS TObserver observer; - }; - template struct on_error_resume_next_inner_observer_strategy { @@ -64,53 +52,43 @@ namespace rpp::operators::details using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) - : state{init_state(std::move(observer))} + : state{std::make_shared(std::move(observer))} , selector{selector} { } - std::shared_ptr> state; - RPP_NO_UNIQUE_ADDRESS Selector selector; + std::shared_ptr state; + RPP_NO_UNIQUE_ADDRESS Selector selector; template void on_next(T&& v) const { - state->observer.on_next(std::forward(v)); + state->on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { try { - selector(err).subscribe(on_error_resume_next_inner_observer_strategy{std::shared_ptr(state, &state->observer)}); + selector(err).subscribe(on_error_resume_next_inner_observer_strategy{state}); } catch (...) { - state->observer.on_error(std::current_exception()); + state->on_error(std::current_exception()); } - state->dispose(); } void on_completed() const { - state->observer.on_completed(); - state->dispose(); + state->on_completed(); } void set_upstream(const disposable_wrapper& d) const { - state->add(d); + state->set_upstream(d); } bool is_disposed() const { return state->is_disposed(); } - - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->observer.set_upstream(d.as_weak()); - return ptr; - } }; template @@ -135,7 +113,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::atomic_dynamic_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 686daec58..037ac7fb7 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -18,7 +18,7 @@ namespace rpp::operators::details { template - struct retry_state_t final : public rpp::composite_disposable + struct retry_state_t final { retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) : count{count} @@ -26,6 +26,7 @@ namespace rpp::operators::details , observable(observable) { + observer.set_upstream(disposable); } std::optional count; @@ -33,6 +34,8 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Observable observable; + + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -58,11 +61,10 @@ namespace rpp::operators::details if (state->count == 0) { state->observer.on_error(err); - state->dispose(); return; } - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -78,16 +80,16 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) const { - state->add(d); + state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { if (state->count) --state->count.value(); @@ -124,10 +126,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); - auto ptr = d.lock(); - - ptr->observer.set_upstream(d.as_weak()); + const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index bf481db07..d5ad56a69 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -23,13 +23,14 @@ namespace rpp::operators::details template - struct retry_when_state final : public rpp::composite_disposable + struct retry_when_state final { - retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) - : observer(std::move(observer)) + retry_when_state(TObserver&& in_observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(in_observer)) , observable(observable) , notifier(notifier) { + observer.set_upstream(disposable); } std::atomic_bool is_inside_drain{}; @@ -37,6 +38,8 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -56,7 +59,7 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -76,9 +79,9 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { state->add(d); } + void set_upstream(const disposable_wrapper& d) const { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } }; templateobserver.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { state->is_inside_drain.store(true, std::memory_order::seq_cst); try @@ -157,10 +160,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); - auto ptr = d.lock(); - - ptr->observer.set_upstream(d.as_weak()); + const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index ed2aca2c2..c8c20e8e9 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -13,22 +13,21 @@ #include #include -#include #include #include namespace rpp::operators::details { template - class take_until_disposable final : public rpp::composite_disposable + class take_until_state final { public: - take_until_disposable(TObserver&& observer) + take_until_state(TObserver&& observer) : m_observer_with_mutex(std::move(observer)) { } - take_until_disposable(const TObserver& observer) + take_until_state(const TObserver& observer) : m_observer_with_mutex(observer) { } @@ -49,7 +48,7 @@ namespace rpp::operators::details { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> state; + std::shared_ptr> state; void on_error(const std::exception_ptr& err) const { @@ -63,9 +62,9 @@ namespace rpp::operators::details state->get_observer()->on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->get_observer()->set_upstream(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return state->get_observer()->is_disposed(); } }; template @@ -104,14 +103,12 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; template auto lift(Observer&& observer) const { - const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); - auto ptr = d.lock(); - ptr->get_observer()->set_upstream(d.as_weak()); + auto ptr = std::make_shared>>(std::forward(observer)); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); return rpp::observer>>(std::move(ptr)); diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 78ae5bafe..1e0eff555 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -23,53 +23,55 @@ namespace rpp::operators::details { template - class with_latest_from_disposable final : public composite_disposable + class with_latest_from_state final { public: - explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) + explicit with_latest_from_state(Observer&& observer, const TSelector& selector) : m_observer_with_mutex{std::move(observer)} , m_selector{selector} { + get_observer_under_lock()->set_upstream(m_disposable); } rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } rpp::utils::tuple>...>& get_values() { return m_values; } - const TSelector& get_selector() const { return m_selector; } + const TSelector& get_selector() const { return m_selector; } + const composite_disposable_wrapper& get_disposable() const { return m_disposable; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; rpp::utils::tuple>...> m_values{}; + composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template struct with_latest_from_inner_observer_strategy { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_disposable().add(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_disposable().is_disposed(); } template void on_next(T&& v) const { - auto locked_value = disposable->get_values().template get().lock(); + auto locked_value = state->get_values().template get().lock(); locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -79,26 +81,26 @@ namespace rpp::operators::details requires std::invocable struct with_latest_from_observer_strategy { - using Disposable = with_latest_from_disposable; + using Disposable = with_latest_from_state; using Result = std::invoke_result_t; using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr disposable{}; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_disposable().add(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_disposable().is_disposed(); } template void on_next(T&& v) const { - auto result = disposable->get_values().apply([&d = this->disposable, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { + auto result = state->get_values().apply([&d = this->state, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { auto lock = std::scoped_lock{vals.get_mutex()...}; if ((vals.get_value_unsafe().has_value() && ...)) @@ -107,19 +109,17 @@ namespace rpp::operators::details }); if (result.has_value()) - disposable->get_observer_under_lock()->on_next(std::move(result).value()); + state->get_observer_under_lock()->on_next(std::move(result).value()); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } void on_completed() const { - disposable->get_observer_under_lock()->on_completed(); - disposable->dispose(); + state->get_observer_under_lock()->on_completed(); } }; @@ -140,7 +140,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; template auto lift(Observer&& observer) const @@ -152,20 +152,18 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using Disposable = with_latest_from_disposable...>; + using State = with_latest_from_state...>; - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto ptr = disposable.lock(); - ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + auto ptr = std::make_shared(std::forward(observer), selector); subscribe(ptr, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{disposable})); + (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{state})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index f4be887e3..f785dc7ad 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -21,11 +21,11 @@ namespace rpp::operators::details { template - class zip_disposable final : public combining_disposable + class zip_state final : public combining_state { public: - explicit zip_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) + explicit zip_state(Observer&& observer, const TSelector& selector) + : combining_state(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -42,17 +42,17 @@ namespace rpp::operators::details template struct zip_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::disposable; + using combining_observer_strategy>::state; template void on_next(T&& v) const { - const auto observer = disposable->get_observer_under_lock(); - disposable->get_pendings().template get().push_back(std::forward(v)); + const auto observer = state->get_observer_under_lock(); + state->get_pendings().template get().push_back(std::forward(v)); - disposable->get_pendings().apply(&apply_impl, disposable, observer); + state->get_pendings().apply(&apply_impl, state, observer); } private: @@ -68,7 +68,7 @@ namespace rpp::operators::details }; template - struct zip_t : public combining_operator_t + struct zip_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index ef0d6ab94..85b0dac97 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -22,12 +22,13 @@ namespace rpp::details { template - struct concat_state_t : public rpp::composite_disposable + struct concat_state_t { concat_state_t(TObserver&& in_observer, const PackedContainer& in_container) : observer(std::move(in_observer)) , container(in_container) { + observer.set_upstream(disposable); try { itr = std::cbegin(container); @@ -35,12 +36,13 @@ namespace rpp::details catch (...) { this->observer.on_error(std::current_exception()); - this->dispose(); + this->disposable.dispose(); } } RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS PackedContainer container; + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); std::optional itr{}; std::atomic is_inside_drain{}; }; @@ -68,14 +70,14 @@ namespace rpp::details state->observer.on_error(err); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } void on_completed() const { locally_disposed = true; - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -87,12 +89,12 @@ namespace rpp::details template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { if (state->itr.value() == std::cend(state->container)) { state->observer.on_completed(); - state->dispose(); + state->disposable.dispose(); return; } @@ -130,10 +132,7 @@ namespace rpp::details template Strategy> void subscribe(observer&& obs) const { - const auto d = disposable_wrapper_impl, PackedContainer>>::make(std::move(obs), container); - const auto state = d.lock(); - state->observer.set_upstream(d.as_weak()); - drain(state); + drain(std::make_shared, PackedContainer>>(std::move(obs), container)); } }; diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index 7d65a96a8..5ab1b77cf 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -14,8 +14,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -146,7 +148,7 @@ TEST_CASE("take_until can handle race condition") SECTION("on_completed shall not interleave with on_next") { - rpp::source::interval(std::chrono::milliseconds{200}, rpp::schedulers::current_thread{}) + rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) | rpp::ops::take_until(subject.get_observable()) | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { @@ -173,7 +175,7 @@ TEST_CASE("take_until can handle race condition") SECTION("on_error shall not interleave with on_next") { - rpp::source::interval(std::chrono::milliseconds{200}, rpp::schedulers::current_thread{}) + rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) | rpp::ops::take_until(subject.get_observable()) | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 49e451f13..c8124077f 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -99,6 +99,21 @@ void test_operator_over_observable_with_disposable(auto&& op) CHECK(observable_disposable.is_disposed()); } + SECTION("operator doesn't disposes disposable too early") + { + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + }); + + auto observer_disposable = rpp::composite_disposable_wrapper::make(); + op(observable) | rpp::ops::subscribe(observer_disposable, [](const auto&) {}); + + CHECK(!observable_disposable.is_disposed()); + observer_disposable.dispose(); + CHECK(observable_disposable.is_disposed()); + } + SECTION("operator disposes disposable on_error") { op(rpp::source::create([](auto&& obs) {