From 867f101cbbdcc3a423250e91c20939c81ad25be8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 13 Sep 2024 22:57:09 +0300 Subject: [PATCH 01/17] add test --- src/tests/utils/disposable_observable.hpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 49e451f13..c8124077f 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -99,6 +99,21 @@ void test_operator_over_observable_with_disposable(auto&& op) CHECK(observable_disposable.is_disposed()); } + SECTION("operator doesn't disposes disposable too early") + { + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + auto observable = rpp::source::create([&observable_disposable](auto&& obs) { + obs.set_upstream(observable_disposable); + }); + + auto observer_disposable = rpp::composite_disposable_wrapper::make(); + op(observable) | rpp::ops::subscribe(observer_disposable, [](const auto&) {}); + + CHECK(!observable_disposable.is_disposed()); + observer_disposable.dispose(); + CHECK(observable_disposable.is_disposed()); + } + SECTION("operator disposes disposable on_error") { op(rpp::source::create([](auto&& obs) { From bd836f84f8258cb95cdd79b2af49049dac58786b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 00:01:08 +0300 Subject: [PATCH 02/17] partial fix --- src/rpp/rpp/operators/combine_latest.hpp | 22 ++++----- .../operators/details/combining_strategy.hpp | 45 +++++++++---------- src/rpp/rpp/operators/zip.hpp | 18 ++++---- 3 files changed, 40 insertions(+), 45 deletions(-) diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 671232cdf..2beeffc94 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -19,11 +19,11 @@ namespace rpp::operators::details { template - class combine_latest_disposable final : public combining_disposable + class combine_latest_state final : public combining_state { public: - explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) + explicit combine_latest_state(Observer&& observer, const TSelector& selector) + : combining_state(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -40,23 +40,23 @@ namespace rpp::operators::details template struct combine_latest_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::disposable; + using combining_observer_strategy>::state; template void on_next(T&& v) const { // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one - const auto observer = disposable->get_observer_under_lock(); - disposable->get_values().template get().emplace(std::forward(v)); + const auto observer = state->get_observer_under_lock(); + state->get_values().template get().emplace(std::forward(v)); - disposable->get_values().apply(&apply_impl, disposable, observer); + state->get_values().apply(&apply_impl, state, observer); } private: - template - static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) + template + static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); @@ -64,7 +64,7 @@ namespace rpp::operators::details }; template - struct combine_latest_t : public combining_operator_t + struct combine_latest_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index f7f5152c5..d3a0b1c92 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,12 +22,13 @@ namespace rpp::operators::details { - template - class combining_disposable : public composite_disposable + template + class combining_state { public: - explicit combining_disposable(Observer&& observer) + explicit combining_state(Observer&& observer, size_t on_completed_needed) : m_observer_with_mutex{std::move(observer)} + , m_on_completed_needed{on_completed_needed} { } @@ -42,41 +43,37 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex m_observer_with_mutex{}; - std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + std::atomic_size_t m_on_completed_needed; }; - template + template struct combining_observer_strategy { - std::shared_ptr disposable{}; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_observer_under_lock()->set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_observer_under_lock()->is_disposed(); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (disposable->decrement_on_completed()) - { - disposable->get_observer_under_lock()->on_completed(); - disposable->dispose(); - } + if (state->decrement_on_completed()) + state->get_observer_under_lock()->on_completed(); } }; - template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + template typename TState, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> struct combining_operator_t { RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; @@ -93,7 +90,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = ::rpp::details::observables::default_disposable_strategy_selector; // TODO: sum of Prev + TObservables template auto lift(Observer&& observer) const @@ -105,20 +102,18 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using Disposable = TDisposable...>; + using State = TState...>; - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto locked = disposable.lock(); - locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); + const auto state = std::make_shared(std::forward(observer), selector); + subscribe>(state, std::index_sequence_for{}, observables...); - return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; } template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); + (..., observables.subscribe(rpp::observer, TStrategy...>>{state})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index f4be887e3..f785dc7ad 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -21,11 +21,11 @@ namespace rpp::operators::details { template - class zip_disposable final : public combining_disposable + class zip_state final : public combining_state { public: - explicit zip_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) + explicit zip_state(Observer&& observer, const TSelector& selector) + : combining_state(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -42,17 +42,17 @@ namespace rpp::operators::details template struct zip_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::disposable; + using combining_observer_strategy>::state; template void on_next(T&& v) const { - const auto observer = disposable->get_observer_under_lock(); - disposable->get_pendings().template get().push_back(std::forward(v)); + const auto observer = state->get_observer_under_lock(); + state->get_pendings().template get().push_back(std::forward(v)); - disposable->get_pendings().apply(&apply_impl, disposable, observer); + state->get_pendings().apply(&apply_impl, state, observer); } private: @@ -68,7 +68,7 @@ namespace rpp::operators::details }; template - struct zip_t : public combining_operator_t + struct zip_t : public combining_operator_t { }; } // namespace rpp::operators::details From 0719f84ba3ddca8d9c8755cfa19dfb8bdfe7c7f2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 21:27:56 +0300 Subject: [PATCH 03/17] fix concat --- .../rpp/disposables/refcount_disposable.hpp | 11 ++++-- src/rpp/rpp/operators/concat.hpp | 34 ++++++++----------- src/rpp/rpp/sources/concat.hpp | 21 ++++++------ 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index c41521cde..10e327886 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -54,7 +54,12 @@ namespace rpp friend class details::refocunt_disposable_inner; refcount_disposable() = default; - composite_disposable_wrapper add_ref(); + enum class Mode : bool + { + Weak, + Strong + }; + composite_disposable_wrapper add_ref(Mode mode = Mode::Weak); private: std::atomic m_refcount{0}; @@ -91,7 +96,7 @@ namespace rpp::details namespace rpp { - inline composite_disposable_wrapper refcount_disposable::add_ref() + inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode) { auto current_value = m_refcount.load(std::memory_order::seq_cst); while (true) @@ -103,7 +108,7 @@ namespace rpp if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { auto inner = composite_disposable_wrapper::make(wrapper_from_this()); - add(inner.as_weak()); + add(mode == Mode::Weak ? inner.as_weak() : inner); return inner; } } diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index b750b5e15..65e8ecde2 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -34,29 +34,33 @@ namespace rpp::operators::details }; template - class concat_state_t final : public rpp::refcount_disposable + class concat_state_t final : public std::enable_shared_from_this> { public: concat_state_t(TObserver&& observer) : m_observer{std::move(observer)} { + const auto d = disposable_wrapper_impl::make(); + m_disposable = d.lock(); + get_observer()->set_upstream(d); } rpp::utils::pointer_under_lock get_observer() { return m_observer; } rpp::utils::pointer_under_lock> get_queue() { return m_queue; } + const std::shared_ptr& get_disposable() const { return m_disposable; } std::atomic& stage() { return m_stage; } void drain(rpp::composite_disposable_wrapper refcounted) { - while (!is_disposed()) + while (!m_disposable->is_disposed()) { const auto observable = get_observable(); if (!observable) { stage().store(ConcatStage::None, std::memory_order::relaxed); refcounted.dispose(); - if (is_disposed()) + if (m_disposable->is_disposed()) get_observer()->on_completed(); return; } @@ -74,12 +78,13 @@ namespace rpp::operators::details drain(refcounted); } + private: bool handle_observable_impl(const rpp::constraint::decayed_same_as auto& observable, rpp::composite_disposable_wrapper refcounted) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); refcounted.clear(); - observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); + observable.subscribe(concat_inner_observer_strategy{this->shared_from_this(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); @@ -97,6 +102,7 @@ namespace rpp::operators::details } private: + std::shared_ptr m_disposable{}; rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; @@ -112,7 +118,7 @@ namespace rpp::operators::details } concat_observer_strategy_base(std::shared_ptr> state) - : concat_observer_strategy_base{state, state->add_ref()} + : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::Strong)} { } @@ -122,7 +128,7 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { state->get_observer()->on_error(err); - state->dispose(); + state->get_disposable()->dispose(); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } @@ -162,7 +168,7 @@ namespace rpp::operators::details using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; concat_observer_strategy(TObserver&& observer) - : base{init_state(std::move(observer))} + : base{std::make_shared>(std::move(observer))} { } @@ -171,7 +177,7 @@ namespace rpp::operators::details { ConcatStage current = ConcatStage::None; if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable(std::forward(v), base::state->add_ref()); + base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::Strong)); else base::state->get_queue()->push(std::forward(v)); } @@ -179,19 +185,9 @@ namespace rpp::operators::details void on_completed() const { base::refcounted.dispose(); - if (base::state->is_disposed()) + if (base::state->get_disposable()->is_disposed()) base::state->get_observer()->on_completed(); } - - - private: - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->get_observer()->set_upstream(d.as_weak()); - return ptr; - } }; struct concat_t : lift_operator diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index ef0d6ab94..85b0dac97 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -22,12 +22,13 @@ namespace rpp::details { template - struct concat_state_t : public rpp::composite_disposable + struct concat_state_t { concat_state_t(TObserver&& in_observer, const PackedContainer& in_container) : observer(std::move(in_observer)) , container(in_container) { + observer.set_upstream(disposable); try { itr = std::cbegin(container); @@ -35,12 +36,13 @@ namespace rpp::details catch (...) { this->observer.on_error(std::current_exception()); - this->dispose(); + this->disposable.dispose(); } } RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS PackedContainer container; + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); std::optional itr{}; std::atomic is_inside_drain{}; }; @@ -68,14 +70,14 @@ namespace rpp::details state->observer.on_error(err); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } void on_completed() const { locally_disposed = true; - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -87,12 +89,12 @@ namespace rpp::details template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { if (state->itr.value() == std::cend(state->container)) { state->observer.on_completed(); - state->dispose(); + state->disposable.dispose(); return; } @@ -130,10 +132,7 @@ namespace rpp::details template Strategy> void subscribe(observer&& obs) const { - const auto d = disposable_wrapper_impl, PackedContainer>>::make(std::move(obs), container); - const auto state = d.lock(); - state->observer.set_upstream(d.as_weak()); - drain(state); + drain(std::make_shared, PackedContainer>>(std::move(obs), container)); } }; From 0be57d396408e049737220447e2c30653bcfa037 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 21:33:43 +0300 Subject: [PATCH 04/17] Debounce --- src/rpp/rpp/operators/debounce.hpp | 61 +++++++++++++----------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 1803c6db3..6d36b4dac 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -18,27 +18,26 @@ namespace rpp::operators::details { - template - class debounce_disposable; + template + class debounce_state; - template - struct debounce_disposable_wrapper + template + struct debounce_state_wrapper { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; - bool is_disposed() const { return disposable->is_disposed(); } + bool is_disposed() const { return state->get_observer_under_lock()->is_disposed(); } - void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); } + void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } }; - template - class debounce_disposable final : public rpp::composite_disposable_impl - , public rpp::details::enable_wrapper_from_this> + template + class debounce_state final : public std::enable_shared_from_this> { using T = rpp::utils::extract_observer_type_t; public: - debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) + debounce_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) : m_observer(std::move(in_observer)) , m_worker{std::move(in_worker)} , m_period{period} @@ -71,17 +70,17 @@ namespace rpp::operators::details { m_worker.schedule( m_time_when_value_should_be_emitted.value(), - [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { - auto value_or_duration = handler.disposable->extract_value_or_time(); + [](const debounce_state_wrapper& handler) -> schedulers::optional_delay_to { + auto value_or_duration = handler.state->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) return schedulers::optional_delay_to{*timepoint}; if (auto* value = std::get_if(&value_or_duration)) - handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); + handler.state->get_observer_under_lock()->on_next(std::move(*value)); return std::nullopt; }, - debounce_disposable_wrapper{this->wrapper_from_this().lock()}); + debounce_state_wrapper{this->shared_from_this()}); } std::variant extract_value_or_time() @@ -108,41 +107,38 @@ namespace rpp::operators::details std::optional m_value_to_be_emitted{}; }; - template + template struct debounce_observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_observer_under_lock()->set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_observer_under_lock()->is_disposed(); } template void on_next(T&& v) const { - disposable->emplace_safe(std::forward(v)); + state->emplace_safe(std::forward(v)); } void on_error(const std::exception_ptr& err) const noexcept { - disposable->dispose(); - disposable->get_observer_under_lock()->on_error(err); + state->get_observer_under_lock()->on_error(err); } void on_completed() const noexcept { - disposable->dispose(); - const auto value = disposable->extract_value(); - const auto observer = disposable->get_observer_under_lock(); - if (value) + const auto observer = state->get_observer_under_lock(); + if (const auto value = state->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -158,21 +154,18 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = Prev; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::disposable_container; - const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = disposable.lock(); - ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); - return rpp::observer, worker_t, container>>{std::move(ptr)}; + auto ptr = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + return rpp::observer, worker_t>>{std::move(ptr)}; } }; } // namespace rpp::operators::details From 48fddad2744e60591d7767cad2d3b61ca4f95e73 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 21:39:24 +0300 Subject: [PATCH 05/17] fix delay --- src/rpp/rpp/operators/delay.hpp | 77 ++++++++++++++++----------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 649844ca0..83c0c3b58 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -35,12 +35,12 @@ namespace rpp::operators::details rpp::schedulers::time_point time_point{}; }; - template - struct delay_disposable final : public rpp::composite_disposable_impl + template + struct delay_state final { using T = rpp::utils::extract_observer_type_t; - delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) + delay_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) : observer(std::move(in_observer)) , worker{std::move(in_worker)} , delay{delay} @@ -56,29 +56,29 @@ namespace rpp::operators::details bool is_active{}; }; - template - struct delay_disposable_wrapper + template + struct delay_state_wrapper { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; - bool is_disposed() const { return disposable->is_disposed(); } + bool is_disposed() const { return state->observer.is_disposed(); } - void on_error(const std::exception_ptr& err) const { disposable->observer.on_error(err); } + void on_error(const std::exception_ptr& err) const { state->observer.on_error(err); } }; - template + template struct delay_observer_strategy { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->observer.set_upstream(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->observer.is_disposed(); } template @@ -103,59 +103,59 @@ namespace rpp::operators::details { if (const auto tp = emplace_safe(std::forward(value))) { - disposable->worker.schedule( + state->worker.schedule( tp.value(), - [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, - delay_disposable_wrapper{disposable}); + [](const delay_state_wrapper& wrapper) { return drain_queue(wrapper.state); }, + delay_state_wrapper{state}); } } template std::optional emplace_safe(TT&& item) const { - std::lock_guard lock{disposable->mutex}; + std::lock_guard lock{state->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) { - disposable->queue = std::queue>>{}; - disposable->observer.on_error(std::forward(item)); + state->queue = std::queue>>{}; + state->observer.on_error(std::forward(item)); return std::nullopt; } else { - const auto tp = disposable->worker.now() + disposable->delay; - disposable->queue.emplace(std::forward(item), tp); - if (!disposable->is_active) + const auto tp = state->worker.now() + state->delay; + state->queue.emplace(std::forward(item), tp); + if (!state->is_active) { - disposable->is_active = true; + state->is_active = true; return tp; } return std::nullopt; } } - static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& disposable) + static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& state) { while (true) { - std::unique_lock lock{disposable->mutex}; - if (disposable->queue.empty()) + std::unique_lock lock{state->mutex}; + if (state->queue.empty()) { - disposable->is_active = false; + state->is_active = false; return std::nullopt; } - auto& top = disposable->queue.front(); - if (top.time_point > disposable->worker.now()) + auto& top = state->queue.front(); + if (top.time_point > state->worker.now()) return schedulers::optional_delay_to{top.time_point}; auto item = std::move(top.value); - disposable->queue.pop(); + state->queue.pop(); lock.unlock(); - std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { disposable->observer.on_next(std::move(v)); }, - [&](const std::exception_ptr& err) { disposable->observer.on_error(err); }, + std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { state->observer.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { state->observer.on_error(err); }, [&](rpp::utils::none) { - disposable->observer.on_completed(); + state->observer.on_completed(); }}, std::move(item)); } @@ -172,21 +172,18 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = Prev; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::disposable_container; - const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = disposable.lock(); - ptr->observer.set_upstream(disposable.as_weak()); - return rpp::observer, worker_t, container, ClearOnError>>{std::move(ptr)}; + auto state = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); + return rpp::observer, worker_t, ClearOnError>>{std::move(state)}; } }; } // namespace rpp::operators::details From fd29804b9995fad24a056316fbf15dd3d18e2d74 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 15 Sep 2024 18:43:21 +0000 Subject: [PATCH 06/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/concat.hpp | 2 +- src/rpp/rpp/operators/debounce.hpp | 4 ++-- src/rpp/rpp/operators/delay.hpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 65e8ecde2..70bda64e9 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -47,7 +47,7 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer() { return m_observer; } rpp::utils::pointer_under_lock> get_queue() { return m_queue; } - const std::shared_ptr& get_disposable() const { return m_disposable; } + const std::shared_ptr& get_disposable() const { return m_disposable; } std::atomic& stage() { return m_stage; } diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 6d36b4dac..a9546a62e 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -138,7 +138,7 @@ namespace rpp::operators::details void on_completed() const noexcept { const auto observer = state->get_observer_under_lock(); - if (const auto value = state->extract_value()) + if (const auto value = state->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -162,7 +162,7 @@ namespace rpp::operators::details template auto lift(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; auto ptr = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); return rpp::observer, worker_t>>{std::move(ptr)}; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 83c0c3b58..4bfb9df22 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -180,7 +180,7 @@ namespace rpp::operators::details template auto lift(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; auto state = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); return rpp::observer, worker_t, ClearOnError>>{std::move(state)}; From 30d7a1e72a02036c2df8beb4d189cf65adf6cfb1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 21:57:35 +0300 Subject: [PATCH 07/17] fix merge --- src/rpp/rpp/operators/merge.hpp | 53 +++++++++++++++------------------ 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index d7715098c..89ce4ab60 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -24,12 +23,13 @@ namespace rpp::operators::details { template - class merge_disposable final : public composite_disposable + class merge_state final { public: - merge_disposable(TObserver&& observer) + merge_state(TObserver&& observer) : m_observer(std::move(observer)) { + get_observer_under_lock()->set_upstream(m_disposable); } // just need atomicity, not guarding anything @@ -40,56 +40,60 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } + const rpp::composite_disposable_wrapper& get_disposable() const { return m_disposable; } + private: rpp::utils::value_with_mutex m_observer{}; + rpp::composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); std::atomic_size_t m_on_completed_needed{1}; }; template struct merge_observer_base_strategy { - merge_observer_base_strategy(std::shared_ptr>&& disposable) - : m_disposable{std::move(disposable)} + merge_observer_base_strategy(std::shared_ptr>&& state) + : m_state{std::move(state)} { } - merge_observer_base_strategy(const std::shared_ptr>& disposable) - : m_disposable{disposable} + merge_observer_base_strategy(const std::shared_ptr>& state) + : m_state{state} { } void set_upstream(const rpp::disposable_wrapper& d) const { - m_disposable->add(d); + m_state->get_disposable().add(d); m_disposables.push_back(d); } bool is_disposed() const { - return m_disposable->is_disposed(); + return m_state->get_observer_under_lock()->is_disposed(); } void on_error(const std::exception_ptr& err) const { - m_disposable->get_observer_under_lock()->on_error(err); - m_disposable->dispose(); + m_state->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (m_disposable->decrement_on_completed()) + if (m_state->decrement_on_completed()) + { + m_state->get_observer_under_lock()->on_completed(); + } + else { for (const auto& v : m_disposables) { - m_disposable->remove(v); + m_state->get_disposable().remove(v); } - m_disposable->get_observer_under_lock()->on_completed(); - m_disposable->dispose(); } } protected: - std::shared_ptr> m_disposable; + std::shared_ptr> m_state; mutable std::vector m_disposables{}; }; @@ -101,7 +105,7 @@ namespace rpp::operators::details template void on_next(T&& v) const { - merge_observer_base_strategy::m_disposable->get_observer_under_lock()->on_next(std::forward(v)); + merge_observer_base_strategy::m_state->get_observer_under_lock()->on_next(std::forward(v)); } }; @@ -110,24 +114,15 @@ namespace rpp::operators::details { public: explicit merge_observer_strategy(TObserver&& observer) - : merge_observer_base_strategy{init_state(std::move(observer))} + : merge_observer_base_strategy{std::make_shared>(std::move(observer))} { } template void on_next(T&& v) const { - merge_observer_base_strategy::m_disposable->increment_on_completed(); - std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_disposable}}); - } - - private: - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->get_observer_under_lock()->set_upstream(d.as_weak()); - return ptr; + merge_observer_base_strategy::m_state->increment_on_completed(); + std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_state}}); } }; From 48837bd2bb73e0d7c528b4f89fd36a13ce8c241c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 15 Sep 2024 18:57:21 +0000 Subject: [PATCH 08/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/merge.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 89ce4ab60..791a65f84 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -93,7 +93,7 @@ namespace rpp::operators::details } protected: - std::shared_ptr> m_state; + std::shared_ptr> m_state; mutable std::vector m_disposables{}; }; From c959bb25d7b4b2a4f62f498c6262665afe5a718e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:01:14 +0300 Subject: [PATCH 09/17] includes --- src/rpp/rpp/operators/debounce.hpp | 1 - src/rpp/rpp/operators/delay.hpp | 1 - src/rpp/rpp/operators/details/combining_strategy.hpp | 1 - 3 files changed, 3 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index a9546a62e..24c1a9d34 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -12,7 +12,6 @@ #include -#include #include #include diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 4bfb9df22..f92106414 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index d3a0b1c92..4aa03b752 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include From 0d3643cf9ecebb02263118870687b64ba0b69ec9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:06:51 +0300 Subject: [PATCH 10/17] on_error_resume --- .../rpp/operators/on_error_resume_next.hpp | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index c010613ca..23d1c820b 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -17,18 +17,6 @@ namespace rpp::operators::details { - template - struct on_error_resume_next_disposable final : public rpp::composite_disposable - { - on_error_resume_next_disposable(TObserver&& observer) - : rpp::composite_disposable{} - , observer(std::move(observer)) - { - } - - RPP_NO_UNIQUE_ADDRESS TObserver observer; - }; - template struct on_error_resume_next_inner_observer_strategy { @@ -64,53 +52,43 @@ namespace rpp::operators::details using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) - : state{init_state(std::move(observer))} + : state{std::make_shared(std::move(observer))} , selector{selector} { } - std::shared_ptr> state; - RPP_NO_UNIQUE_ADDRESS Selector selector; + std::shared_ptr state; + RPP_NO_UNIQUE_ADDRESS Selector selector; template void on_next(T&& v) const { - state->observer.on_next(std::forward(v)); + state->on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { try { - selector(err).subscribe(on_error_resume_next_inner_observer_strategy{std::shared_ptr(state, &state->observer)}); + selector(err).subscribe(on_error_resume_next_inner_observer_strategy{state}); } catch (...) { - state->observer.on_error(std::current_exception()); + state->on_error(std::current_exception()); } - state->dispose(); } void on_completed() const { - state->observer.on_completed(); - state->dispose(); + state->on_completed(); } void set_upstream(const disposable_wrapper& d) const { - state->add(d); + state->set_upstream(d); } bool is_disposed() const { return state->is_disposed(); } - - static std::shared_ptr> init_state(TObserver&& observer) - { - const auto d = disposable_wrapper_impl>::make(std::move(observer)); - auto ptr = d.lock(); - ptr->observer.set_upstream(d.as_weak()); - return ptr; - } }; template @@ -135,7 +113,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::atomic_dynamic_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; }; } // namespace rpp::operators::details From ab2a111a0c27ecbbc0a3635af9580b928d58516b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:10:36 +0300 Subject: [PATCH 11/17] retry --- src/rpp/rpp/operators/retry.hpp | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 686daec58..89a34dca7 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -18,7 +18,7 @@ namespace rpp::operators::details { template - struct retry_state_t final : public rpp::composite_disposable + struct retry_state_t final { retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) : count{count} @@ -26,6 +26,7 @@ namespace rpp::operators::details , observable(observable) { + observer.set_upstream(disposable); } std::optional count; @@ -33,6 +34,8 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Observable observable; + + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -58,11 +61,10 @@ namespace rpp::operators::details if (state->count == 0) { state->observer.on_error(err); - state->dispose(); return; } - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -78,16 +80,16 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) const { - state->add(d); + state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { if (state->count) --state->count.value(); @@ -124,10 +126,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); - auto ptr = d.lock(); - - ptr->observer.set_upstream(d.as_weak()); + const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); drain(ptr); } }; From 826dce540539be0172c1cfb4f458d3dec941510a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:13:38 +0300 Subject: [PATCH 12/17] retry_when --- src/rpp/rpp/operators/retry_when.hpp | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index bf481db07..c09633f51 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -23,13 +23,14 @@ namespace rpp::operators::details template - struct retry_when_state final : public rpp::composite_disposable + struct retry_when_state final { - retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) - : observer(std::move(observer)) + retry_when_state(TObserver&& in_observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(in_observer)) , observable(observable) , notifier(notifier) { + observer.set_upstream(disposable); } std::atomic_bool is_inside_drain{}; @@ -37,6 +38,9 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); + }; template @@ -56,7 +60,7 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; - state->clear(); + state->disposable.clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -76,9 +80,9 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { state->add(d); } + void set_upstream(const disposable_wrapper& d) const { state->disposable.add(d); } - bool is_disposed() const { return locally_disposed || state->is_disposed(); } + bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } }; templateobserver.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return state->disposable.is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->is_disposed()) + while (!state->disposable.is_disposed()) { state->is_inside_drain.store(true, std::memory_order::seq_cst); try @@ -157,10 +161,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); - auto ptr = d.lock(); - - ptr->observer.set_upstream(d.as_weak()); + const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); drain(ptr); } }; From dc2e3edaac1780fe50885425f60410563567d5c2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:28:58 +0300 Subject: [PATCH 13/17] take_until --- src/rpp/rpp/operators/retry_when.hpp | 1 - src/rpp/rpp/operators/take_until.hpp | 19 ++++++++----------- src/tests/rpp/test_take_until.cpp | 6 ++++-- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index c09633f51..62c2ae595 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -40,7 +40,6 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TNotifier notifier; rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); - }; template diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index ed2aca2c2..c8c20e8e9 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -13,22 +13,21 @@ #include #include -#include #include #include namespace rpp::operators::details { template - class take_until_disposable final : public rpp::composite_disposable + class take_until_state final { public: - take_until_disposable(TObserver&& observer) + take_until_state(TObserver&& observer) : m_observer_with_mutex(std::move(observer)) { } - take_until_disposable(const TObserver& observer) + take_until_state(const TObserver& observer) : m_observer_with_mutex(observer) { } @@ -49,7 +48,7 @@ namespace rpp::operators::details { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> state; + std::shared_ptr> state; void on_error(const std::exception_ptr& err) const { @@ -63,9 +62,9 @@ namespace rpp::operators::details state->get_observer()->on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->add(d); } + void set_upstream(const disposable_wrapper& d) { state->get_observer()->set_upstream(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return state->get_observer()->is_disposed(); } }; template @@ -104,14 +103,12 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; template auto lift(Observer&& observer) const { - const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); - auto ptr = d.lock(); - ptr->get_observer()->set_upstream(d.as_weak()); + auto ptr = std::make_shared>>(std::forward(observer)); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); return rpp::observer>>(std::move(ptr)); diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index 7d65a96a8..46509393d 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -14,10 +14,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -146,7 +148,7 @@ TEST_CASE("take_until can handle race condition") SECTION("on_completed shall not interleave with on_next") { - rpp::source::interval(std::chrono::milliseconds{200}, rpp::schedulers::current_thread{}) + rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) | rpp::ops::take_until(subject.get_observable()) | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { @@ -173,7 +175,7 @@ TEST_CASE("take_until can handle race condition") SECTION("on_error shall not interleave with on_next") { - rpp::source::interval(std::chrono::milliseconds{200}, rpp::schedulers::current_thread{}) + rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) | rpp::ops::take_until(subject.get_observable()) | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { From 5d6fafd7a0fae4baef479ffb4beadd0720d8b839 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 15 Sep 2024 22:32:15 +0300 Subject: [PATCH 14/17] with_latest_from --- src/rpp/rpp/operators/with_latest_from.hpp | 48 +++++++++++----------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 78ae5bafe..5c553d6c6 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -23,13 +23,14 @@ namespace rpp::operators::details { template - class with_latest_from_disposable final : public composite_disposable + class with_latest_from_state final { public: - explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) + explicit with_latest_from_state(Observer&& observer, const TSelector& selector) : m_observer_with_mutex{std::move(observer)} , m_selector{selector} { + get_observer_under_lock()->set_upstream(m_disposable); } rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } @@ -37,39 +38,40 @@ namespace rpp::operators::details rpp::utils::tuple>...>& get_values() { return m_values; } const TSelector& get_selector() const { return m_selector; } + const composite_disposable_wrapper& get_disposable() const { return m_disposable; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; rpp::utils::tuple>...> m_values{}; + composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template struct with_latest_from_inner_observer_strategy { - std::shared_ptr> disposable{}; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_disposable().add(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_disposable().is_disposed(); } template void on_next(T&& v) const { - auto locked_value = disposable->get_values().template get().lock(); + auto locked_value = state->get_values().template get().lock(); locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -79,26 +81,26 @@ namespace rpp::operators::details requires std::invocable struct with_latest_from_observer_strategy { - using Disposable = with_latest_from_disposable; + using Disposable = with_latest_from_state; using Result = std::invoke_result_t; using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr disposable{}; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const { - disposable->add(d); + state->get_disposable().add(d); } bool is_disposed() const { - return disposable->is_disposed(); + return state->get_disposable().is_disposed(); } template void on_next(T&& v) const { - auto result = disposable->get_values().apply([&d = this->disposable, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { + auto result = state->get_values().apply([&d = this->state, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { auto lock = std::scoped_lock{vals.get_mutex()...}; if ((vals.get_value_unsafe().has_value() && ...)) @@ -107,19 +109,17 @@ namespace rpp::operators::details }); if (result.has_value()) - disposable->get_observer_under_lock()->on_next(std::move(result).value()); + state->get_observer_under_lock()->on_next(std::move(result).value()); } void on_error(const std::exception_ptr& err) const { - disposable->get_observer_under_lock()->on_error(err); - disposable->dispose(); + state->get_observer_under_lock()->on_error(err); } void on_completed() const { - disposable->get_observer_under_lock()->on_completed(); - disposable->dispose(); + state->get_observer_under_lock()->on_completed(); } }; @@ -140,7 +140,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; template auto lift(Observer&& observer) const @@ -152,20 +152,18 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using Disposable = with_latest_from_disposable...>; + using State = with_latest_from_state...>; - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto ptr = disposable.lock(); - ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + auto ptr = std::make_shared(std::forward(observer), selector); subscribe(ptr, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{disposable})); + (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{state})); } }; } // namespace rpp::operators::details From 4b80c5f0623b4761031f6a1852f85303c59fab2f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 15 Sep 2024 19:31:45 +0000 Subject: [PATCH 15/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/retry.hpp | 2 +- src/rpp/rpp/operators/retry_when.hpp | 2 +- src/rpp/rpp/operators/with_latest_from.hpp | 2 +- src/tests/rpp/test_take_until.cpp | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 89a34dca7..037ac7fb7 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -126,7 +126,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 62c2ae595..d5ad56a69 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -160,7 +160,7 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); + const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 5c553d6c6..1e0eff555 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -37,7 +37,7 @@ namespace rpp::operators::details rpp::utils::tuple>...>& get_values() { return m_values; } - const TSelector& get_selector() const { return m_selector; } + const TSelector& get_selector() const { return m_selector; } const composite_disposable_wrapper& get_disposable() const { return m_disposable; } private: diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index 46509393d..5ab1b77cf 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -17,9 +17,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -148,7 +148,7 @@ TEST_CASE("take_until can handle race condition") SECTION("on_completed shall not interleave with on_next") { - rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) + rpp::source::concat(rpp::source::just(1) | rpp::ops::take(1), rpp::source::never()) | rpp::ops::take_until(subject.get_observable()) | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { From 15a664e591146e143fa4c4edb2439afec152293e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 16 Sep 2024 21:54:52 +0300 Subject: [PATCH 16/17] try fix --- src/rpp/rpp/disposables/refcount_disposable.hpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index 10e327886..90396fc14 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -56,10 +56,10 @@ namespace rpp enum class Mode : bool { - Weak, - Strong + WeakRefStrongSource, + StrongRefRefSource }; - composite_disposable_wrapper add_ref(Mode mode = Mode::Weak); + composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource); private: std::atomic m_refcount{0}; @@ -107,8 +107,8 @@ namespace rpp // just need atomicity, not guarding anything if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { - auto inner = composite_disposable_wrapper::make(wrapper_from_this()); - add(mode == Mode::Weak ? inner.as_weak() : inner); + auto inner = composite_disposable_wrapper::make(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak()); + add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner); return inner; } } From 499ba4f293864bd526373777f0626eec50b5a501 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 16 Sep 2024 22:02:26 +0300 Subject: [PATCH 17/17] one more --- src/rpp/rpp/operators/concat.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 70bda64e9..c4cc12823 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -118,7 +118,7 @@ namespace rpp::operators::details } concat_observer_strategy_base(std::shared_ptr> state) - : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::Strong)} + : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)} { } @@ -177,7 +177,7 @@ namespace rpp::operators::details { ConcatStage current = ConcatStage::None; if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::Strong)); + base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)); else base::state->get_queue()->push(std::forward(v)); }