From e3706c862574d1bb22bb09cd6047f8bcf3f010dd Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Nov 2024 23:38:47 +0300 Subject: [PATCH 01/20] Partially revert commit --- .../rpp/disposables/refcount_disposable.hpp | 13 +-- src/rpp/rpp/operators/combine_latest.hpp | 22 ++--- src/rpp/rpp/operators/concat.hpp | 58 +++++++------ src/rpp/rpp/operators/debounce.hpp | 65 ++++++++------- src/rpp/rpp/operators/delay.hpp | 83 ++++++++++--------- .../operators/details/combining_strategy.hpp | 40 ++++----- src/rpp/rpp/operators/merge.hpp | 49 ++++++----- .../rpp/operators/on_error_resume_next.hpp | 37 +++++++-- src/rpp/rpp/operators/retry.hpp | 18 ++-- src/rpp/rpp/operators/retry_when.hpp | 26 +++--- src/rpp/rpp/operators/take_until.hpp | 19 +++-- src/rpp/rpp/operators/with_latest_from.hpp | 50 ++++++----- src/rpp/rpp/operators/zip.hpp | 18 ++-- src/rpp/rpp/sources/concat.hpp | 17 ++-- 14 files changed, 274 insertions(+), 241 deletions(-) diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index 90396fc14..c41521cde 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -54,12 +54,7 @@ namespace rpp friend class details::refocunt_disposable_inner; refcount_disposable() = default; - enum class Mode : bool - { - WeakRefStrongSource, - StrongRefRefSource - }; - composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource); + composite_disposable_wrapper add_ref(); private: std::atomic m_refcount{0}; @@ -96,7 +91,7 @@ namespace rpp::details namespace rpp { - inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode) + inline composite_disposable_wrapper refcount_disposable::add_ref() { auto current_value = m_refcount.load(std::memory_order::seq_cst); while (true) @@ -107,8 +102,8 @@ namespace rpp // just need atomicity, not guarding anything if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { - auto inner = composite_disposable_wrapper::make(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak()); - add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner); + auto inner = composite_disposable_wrapper::make(wrapper_from_this()); + add(inner.as_weak()); return inner; } } diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index a843647bb..2be977ac2 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -19,11 +19,11 @@ namespace rpp::operators::details { template - class combine_latest_state final : public combining_state + class combine_latest_disposable final : public combining_disposable { public: - explicit combine_latest_state(Observer&& observer, const TSelector& selector) - : combining_state(std::move(observer), sizeof...(Args)) + explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -40,23 +40,23 @@ namespace rpp::operators::details template struct combine_latest_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::state; + using combining_observer_strategy>::disposable; template void on_next(T&& v) const { // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one - const auto observer = state->get_observer_under_lock(); - state->get_values().template get().emplace(std::forward(v)); + const auto observer = disposable->get_observer_under_lock(); + disposable->get_values().template get().emplace(std::forward(v)); - state->get_values().apply(&apply_impl, state, observer); + disposable->get_values().apply(&apply_impl, disposable, observer); } private: - template - static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) + template + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); @@ -64,7 +64,7 @@ namespace rpp::operators::details }; template - struct combine_latest_t : public combining_operator_t + struct combine_latest_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index fbe29fced..68534f80d 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -34,33 +34,29 @@ namespace rpp::operators::details }; template - class concat_state_t final : public std::enable_shared_from_this> + class concat_disposable final : public rpp::refcount_disposable { public: - concat_state_t(TObserver&& observer) + concat_disposable(TObserver&& observer) : m_observer{std::move(observer)} { - const auto d = disposable_wrapper_impl::make(); - m_disposable = d.lock(); - get_observer()->set_upstream(d); } rpp::utils::pointer_under_lock get_observer() { return m_observer; } rpp::utils::pointer_under_lock> get_queue() { return m_queue; } - const std::shared_ptr& get_disposable() const { return m_disposable; } std::atomic& stage() { return m_stage; } void drain(rpp::composite_disposable_wrapper refcounted) { - while (!m_disposable->is_disposed()) + while (!is_disposed()) { const auto observable = get_observable(); if (!observable) { stage().store(ConcatStage::None, std::memory_order::relaxed); refcounted.dispose(); - if (m_disposable->is_disposed()) + if (is_disposed()) get_observer()->on_completed(); return; } @@ -78,13 +74,12 @@ namespace rpp::operators::details drain(refcounted); } - private: bool handle_observable_impl(const rpp::constraint::decayed_same_as auto& observable, rpp::composite_disposable_wrapper refcounted) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); refcounted.clear(); - observable.subscribe(concat_inner_observer_strategy{this->shared_from_this(), std::move(refcounted)}); + observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); @@ -102,7 +97,6 @@ namespace rpp::operators::details } private: - std::shared_ptr m_disposable{}; rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; @@ -111,23 +105,23 @@ namespace rpp::operators::details template struct concat_observer_strategy_base { - concat_observer_strategy_base(std::shared_ptr> state, rpp::composite_disposable_wrapper refcounted) - : state{std::move(state)} + concat_observer_strategy_base(std::shared_ptr> disposable, rpp::composite_disposable_wrapper refcounted) + : disposable{std::move(disposable)} , refcounted{std::move(refcounted)} { } - concat_observer_strategy_base(std::shared_ptr> state) - : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)} + concat_observer_strategy_base(std::shared_ptr> disposable) + : concat_observer_strategy_base{disposable, disposable->add_ref()} { } - std::shared_ptr> state; - rpp::composite_disposable_wrapper refcounted; + std::shared_ptr> disposable; + rpp::composite_disposable_wrapper refcounted; void on_error(const std::exception_ptr& err) const { - state->get_observer()->on_error(err); + disposable->get_observer()->on_error(err); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } @@ -146,18 +140,18 @@ namespace rpp::operators::details template void on_next(T&& v) const { - base::state->get_observer()->on_next(std::forward(v)); + base::disposable->get_observer()->on_next(std::forward(v)); } void on_completed() const { ConcatStage current{ConcatStage::Draining}; - if (base::state->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) + if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) return; assert(current == ConcatStage::Processing); - base::state->drain(base::refcounted); + base::disposable->drain(base::refcounted); } }; @@ -168,7 +162,7 @@ namespace rpp::operators::details static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; concat_observer_strategy(TObserver&& observer) - : base{std::make_shared>(std::move(observer))} + : base{init_state(std::move(observer))} { } @@ -176,17 +170,27 @@ namespace rpp::operators::details void on_next(T&& v) const { ConcatStage current = ConcatStage::None; - if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)); + if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) + base::disposable->handle_observable(std::forward(v), base::disposable->add_ref()); else - base::state->get_queue()->push(std::forward(v)); + base::disposable->get_queue()->push(std::forward(v)); } void on_completed() const { base::refcounted.dispose(); - if (base::state->get_disposable()->is_disposed()) - base::state->get_observer()->on_completed(); + if (base::disposable->is_disposed()) + base::disposable->get_observer()->on_completed(); + } + + + private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); + return ptr; } }; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 3130fe9e8..8e0d3e916 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -12,32 +12,33 @@ #include +#include #include #include namespace rpp::operators::details { - template - class debounce_state; + template + class debounce_disposable; - template - struct debounce_state_wrapper + template + struct debounce_disposable_wrapper { - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } - void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } + void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); } }; - template - class debounce_state final : public rpp::details::enable_wrapper_from_this> - , public rpp::details::base_disposable + template + class debounce_disposable final : public rpp::composite_disposable_impl + , public rpp::details::enable_wrapper_from_this> { using T = rpp::utils::extract_observer_type_t; public: - debounce_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) + debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) : m_observer(std::move(in_observer)) , m_worker{std::move(in_worker)} , m_period{period} @@ -70,17 +71,17 @@ namespace rpp::operators::details { m_worker.schedule( m_time_when_value_should_be_emitted.value(), - [](const debounce_state_wrapper& handler) -> schedulers::optional_delay_to { - auto value_or_duration = handler.state->extract_value_or_time(); + [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { + auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) return schedulers::optional_delay_to{*timepoint}; if (auto* value = std::get_if(&value_or_duration)) - handler.state->get_observer_under_lock()->on_next(std::move(*value)); + handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); return std::nullopt; }, - debounce_state_wrapper{this->wrapper_from_this().lock()}); + debounce_disposable_wrapper{this->wrapper_from_this().lock()}); } std::variant extract_value_or_time() @@ -107,38 +108,38 @@ namespace rpp::operators::details std::optional m_value_to_be_emitted{}; }; - template + template struct debounce_observer_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - state->emplace_safe(std::forward(v)); + disposable->emplace_safe(std::forward(v)); } void on_error(const std::exception_ptr& err) const noexcept { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const noexcept { - const auto observer = state->get_observer_under_lock(); - if (const auto value = state->extract_value()) + const auto observer = disposable->get_observer_under_lock(); + if (const auto value = disposable->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -154,20 +155,22 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = typename Prev::template add<1>; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift(Observer&& observer) const + template + auto lift_with_disposable_strategy(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::disposables_container; - auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = d.lock(); - ptr->get_observer_under_lock()->set_upstream(d.as_weak()); - return rpp::observer, worker_t>>{std::move(ptr)}; + + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 8b924ee4b..010eb968a 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -34,12 +35,12 @@ namespace rpp::operators::details rpp::schedulers::time_point time_point{}; }; - template - struct delay_state final + template + struct delay_disposable final : public rpp::composite_disposable_impl { using T = rpp::utils::extract_observer_type_t; - delay_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) + delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) : observer(std::move(in_observer)) , worker{std::move(in_worker)} , delay{delay} @@ -55,31 +56,30 @@ namespace rpp::operators::details bool is_active{}; }; - template - struct delay_state_wrapper + template + struct delay_disposable_wrapper { - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } - void on_error(const std::exception_ptr& err) const { state->observer.on_error(err); } + void on_error(const std::exception_ptr& err) const { disposable->observer.on_error(err); } }; - template + template struct delay_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - std::shared_ptr> state{}; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->observer.set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->observer.is_disposed(); + return disposable->is_disposed(); } template @@ -104,59 +104,59 @@ namespace rpp::operators::details { if (const auto tp = emplace_safe(std::forward(value))) { - state->worker.schedule( + disposable->worker.schedule( tp.value(), - [](const delay_state_wrapper& wrapper) { return drain_queue(wrapper.state); }, - delay_state_wrapper{state}); + [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, + delay_disposable_wrapper{disposable}); } } template std::optional emplace_safe(TT&& item) const { - std::lock_guard lock{state->mutex}; + std::lock_guard lock{disposable->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) { - state->queue = std::queue>>{}; - state->observer.on_error(std::forward(item)); + disposable->queue = std::queue>>{}; + disposable->observer.on_error(std::forward(item)); return std::nullopt; } else { - const auto tp = state->worker.now() + state->delay; - state->queue.emplace(std::forward(item), tp); - if (!state->is_active) + const auto tp = disposable->worker.now() + disposable->delay; + disposable->queue.emplace(std::forward(item), tp); + if (!disposable->is_active) { - state->is_active = true; + disposable->is_active = true; return tp; } return std::nullopt; } } - static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& state) + static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& disposable) { while (true) { - std::unique_lock lock{state->mutex}; - if (state->queue.empty()) + std::unique_lock lock{disposable->mutex}; + if (disposable->queue.empty()) { - state->is_active = false; + disposable->is_active = false; return std::nullopt; } - auto& top = state->queue.front(); - if (top.time_point > state->worker.now()) + auto& top = disposable->queue.front(); + if (top.time_point > disposable->worker.now()) return schedulers::optional_delay_to{top.time_point}; auto item = std::move(top.value); - state->queue.pop(); + disposable->queue.pop(); lock.unlock(); - std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { state->observer.on_next(std::move(v)); }, - [&](const std::exception_ptr& err) { state->observer.on_error(err); }, + std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { disposable->observer.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { disposable->observer.on_error(err); }, [&](rpp::utils::none) { - state->observer.on_completed(); + disposable->observer.on_completed(); }}, std::move(item)); } @@ -173,18 +173,21 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = Prev; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift(Observer&& observer) const + template + auto lift_with_disposable_strategy(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::disposables_container; - auto state = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); - return rpp::observer, worker_t, ClearOnError>>{std::move(state)}; + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->observer.set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container, ClearOnError>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 89fbe5f60..af3ca98ee 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -22,11 +23,10 @@ namespace rpp::operators::details { template - class combining_state : public rpp::details::enable_wrapper_from_this> - , public rpp::details::base_disposable + class combining_disposable : public composite_disposable { public: - explicit combining_state(Observer&& observer, size_t on_completed_needed) + explicit combining_disposable(Observer&& observer, size_t on_completed_needed) : m_observer_with_mutex{std::move(observer)} , m_on_completed_needed{on_completed_needed} { @@ -46,36 +46,36 @@ namespace rpp::operators::details std::atomic_size_t m_on_completed_needed; }; - template + template struct combining_observer_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - std::shared_ptr state{}; + std::shared_ptr disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->is_disposed(); + return disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (state->decrement_on_completed()) - state->get_observer_under_lock()->on_completed(); + if (disposable->decrement_on_completed()) + disposable->get_observer_under_lock()->on_completed(); } }; - template typename TState, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> struct combining_operator_t { RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; @@ -92,7 +92,7 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = ::rpp::details::observables::default_disposables_strategy; // TODO: sum of Prev + TObservables + using updated_optimal_disposables_strategy = ::rpp::details::observables::fixed_disposables_strategy<0>; template auto lift(Observer&& observer) const @@ -104,21 +104,21 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using State = TState...>; + using Disposable = TDisposable...>; - const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); - auto state = d.lock(); - state->get_observer_under_lock()->set_upstream(d.as_weak()); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(state, std::index_sequence_for{}, observables...); + subscribe>(locked, std::index_sequence_for{}, observables...); - return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; } template - static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, TStrategy...>>{state})); + (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index ed5a4ae61..ee480a924 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -23,13 +24,12 @@ namespace rpp::operators::details { template - class merge_state final + class merge_disposable final : public composite_disposable { public: - merge_state(TObserver&& observer) + merge_disposable(TObserver&& observer) : m_observer(std::move(observer)) { - get_observer_under_lock()->set_upstream(m_disposable); } // just need atomicity, not guarding anything @@ -40,11 +40,8 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } - const rpp::composite_disposable_wrapper& get_disposable() const { return m_disposable; } - private: rpp::utils::value_with_mutex m_observer{}; - rpp::composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); std::atomic_size_t m_on_completed_needed{1}; }; @@ -52,50 +49,49 @@ namespace rpp::operators::details struct merge_observer_base_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - merge_observer_base_strategy(std::shared_ptr>&& state) - : m_state{std::move(state)} + merge_observer_base_strategy(std::shared_ptr>&& disposable) + : m_disposable{std::move(disposable)} { } - merge_observer_base_strategy(const std::shared_ptr>& state) - : m_state{state} + merge_observer_base_strategy(const std::shared_ptr>& disposable) + : m_disposable{disposable} { } void set_upstream(const rpp::disposable_wrapper& d) const { - m_state->get_disposable().add(d); + m_disposable->add(d); m_disposables.push_back(d); } bool is_disposed() const { - return m_state->get_disposable().is_disposed(); + return m_disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - m_state->get_observer_under_lock()->on_error(err); + m_disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (m_state->decrement_on_completed()) + if (m_disposable->decrement_on_completed()) { - m_state->get_observer_under_lock()->on_completed(); + m_disposable->get_observer_under_lock()->on_completed(); } else { for (const auto& v : m_disposables) { - m_state->get_disposable().remove(v); + m_disposable->remove(v); } } } protected: - std::shared_ptr> m_state; + std::shared_ptr> m_disposable; mutable std::vector m_disposables{}; }; @@ -107,7 +103,7 @@ namespace rpp::operators::details template void on_next(T&& v) const { - merge_observer_base_strategy::m_state->get_observer_under_lock()->on_next(std::forward(v)); + merge_observer_base_strategy::m_disposable->get_observer_under_lock()->on_next(std::forward(v)); } }; @@ -116,15 +112,24 @@ namespace rpp::operators::details { public: explicit merge_observer_strategy(TObserver&& observer) - : merge_observer_base_strategy{std::make_shared>(std::move(observer))} + : merge_observer_base_strategy{init_state(std::move(observer))} { } template void on_next(T&& v) const { - merge_observer_base_strategy::m_state->increment_on_completed(); - std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_state}}); + merge_observer_base_strategy::m_disposable->increment_on_completed(); + std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_disposable}}); + } + + private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer_under_lock()->set_upstream(d.as_weak()); + return ptr; } }; diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 87ce69773..aa5930969 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -17,6 +17,18 @@ namespace rpp::operators::details { + template + struct on_error_resume_next_disposable final : public rpp::composite_disposable + { + on_error_resume_next_disposable(TObserver&& observer) + : rpp::composite_disposable{} + , observer(std::move(observer)) + { + } + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + }; + template struct on_error_resume_next_inner_observer_strategy { @@ -52,43 +64,52 @@ namespace rpp::operators::details static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) - : state{std::make_shared(std::move(observer))} + : state{init_state(std::move(observer))} , selector{selector} { } - std::shared_ptr state; - RPP_NO_UNIQUE_ADDRESS Selector selector; + std::shared_ptr> state; + RPP_NO_UNIQUE_ADDRESS Selector selector; template void on_next(T&& v) const { - state->on_next(std::forward(v)); + state->observer.on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { try { - selector(err).subscribe(on_error_resume_next_inner_observer_strategy{state}); + selector(err).subscribe(on_error_resume_next_inner_observer_strategy{std::shared_ptr(state, &state->observer)}); } catch (...) { - state->on_error(std::current_exception()); + state->observer.on_error(std::current_exception()); } + state->dispose(); } void on_completed() const { - state->on_completed(); + state->observer.on_completed(); } void set_upstream(const disposable_wrapper& d) const { - state->set_upstream(d); + state->add(d); } bool is_disposed() const { return state->is_disposed(); } + + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->observer.set_upstream(d.as_weak()); + return ptr; + } }; template diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 73797acdc..cb6700cde 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -18,7 +18,7 @@ namespace rpp::operators::details { template - struct retry_state_t final + struct retry_state_t final : public rpp::composite_disposable { retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) : count{count} @@ -26,7 +26,6 @@ namespace rpp::operators::details , observable(observable) { - observer.set_upstream(disposable); } std::optional count; @@ -34,8 +33,6 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Observable observable; - - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -64,7 +61,7 @@ namespace rpp::operators::details return; } - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -80,16 +77,16 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) const { - state->disposable.add(d); + state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { if (state->count) --state->count.value(); @@ -126,7 +123,10 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 79155d8aa..b1bea7e8e 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -23,14 +23,13 @@ namespace rpp::operators::details template - struct retry_when_state final + struct retry_when_state final : public rpp::composite_disposable { - retry_when_state(TObserver&& in_observer, const TObservable& observable, const TNotifier& notifier) - : observer(std::move(in_observer)) + retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(observer)) , observable(observable) , notifier(notifier) { - observer.set_upstream(disposable); } std::atomic_bool is_inside_drain{}; @@ -38,8 +37,6 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -59,7 +56,7 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -79,9 +76,9 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) const { state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; templateobserver.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->disposable.is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { state->is_inside_drain.store(true, std::memory_order::seq_cst); try @@ -160,7 +157,10 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 6a9933953..a9b6d03bb 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -13,21 +13,22 @@ #include #include +#include #include #include namespace rpp::operators::details { template - class take_until_state final + class take_until_disposable final : public rpp::composite_disposable { public: - take_until_state(TObserver&& observer) + take_until_disposable(TObserver&& observer) : m_observer_with_mutex(std::move(observer)) { } - take_until_state(const TObserver& observer) + take_until_disposable(const TObserver& observer) : m_observer_with_mutex(observer) { } @@ -48,7 +49,7 @@ namespace rpp::operators::details { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr> state; + std::shared_ptr> state; void on_error(const std::exception_ptr& err) const { @@ -62,9 +63,9 @@ namespace rpp::operators::details state->get_observer()->on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->get_observer()->set_upstream(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->get_observer()->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template @@ -103,12 +104,14 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template auto lift(Observer&& observer) const { - auto ptr = std::make_shared>>(std::forward(observer)); + const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); return rpp::observer>>(std::move(ptr)); diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index f952d43f8..f55bfa3c9 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -23,57 +23,53 @@ namespace rpp::operators::details { template - class with_latest_from_state final + class with_latest_from_disposable final : public composite_disposable { public: - explicit with_latest_from_state(Observer&& observer, const TSelector& selector) + explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) : m_observer_with_mutex{std::move(observer)} , m_selector{selector} { - get_observer_under_lock()->set_upstream(m_disposable); } rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } rpp::utils::tuple>...>& get_values() { return m_values; } - const TSelector& get_selector() const { return m_selector; } - const composite_disposable_wrapper& get_disposable() const { return m_disposable; } + const TSelector& get_selector() const { return m_selector; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; rpp::utils::tuple>...> m_values{}; - composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template struct with_latest_from_inner_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - std::shared_ptr> state{}; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_disposable().add(d); + disposable->add(d); } bool is_disposed() const { - return state->get_disposable().is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - auto locked_value = state->get_values().template get().lock(); + auto locked_value = disposable->get_values().template get().lock(); locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -83,26 +79,26 @@ namespace rpp::operators::details requires std::invocable struct with_latest_from_observer_strategy { - using Disposable = with_latest_from_state; + using Disposable = with_latest_from_disposable; using Result = std::invoke_result_t; static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr state{}; + std::shared_ptr disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_disposable().add(d); + disposable->add(d); } bool is_disposed() const { - return state->get_disposable().is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - auto result = state->get_values().apply([&d = this->state, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { + auto result = disposable->get_values().apply([&d = this->disposable, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { auto lock = std::scoped_lock{vals.get_mutex()...}; if ((vals.get_value_unsafe().has_value() && ...)) @@ -111,17 +107,17 @@ namespace rpp::operators::details }); if (result.has_value()) - state->get_observer_under_lock()->on_next(std::move(result).value()); + disposable->get_observer_under_lock()->on_next(std::move(result).value()); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - state->get_observer_under_lock()->on_completed(); + disposable->get_observer_under_lock()->on_completed(); } }; @@ -142,7 +138,7 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template auto lift(Observer&& observer) const @@ -154,18 +150,20 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using State = with_latest_from_state...>; + using Disposable = with_latest_from_disposable...>; - auto ptr = std::make_shared(std::forward(observer), selector); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); subscribe(ptr, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template - static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{state})); + (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{disposable})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index ec29fd04e..71d0369d0 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -21,11 +21,11 @@ namespace rpp::operators::details { template - class zip_state final : public combining_state + class zip_disposable final : public combining_disposable { public: - explicit zip_state(Observer&& observer, const TSelector& selector) - : combining_state(std::move(observer), sizeof...(Args)) + explicit zip_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -42,17 +42,17 @@ namespace rpp::operators::details template struct zip_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::state; + using combining_observer_strategy>::disposable; template void on_next(T&& v) const { - const auto observer = state->get_observer_under_lock(); - state->get_pendings().template get().push_back(std::forward(v)); + const auto observer = disposable->get_observer_under_lock(); + disposable->get_pendings().template get().push_back(std::forward(v)); - state->get_pendings().apply(&apply_impl, state, observer); + disposable->get_pendings().apply(&apply_impl, disposable, observer); } private: @@ -68,7 +68,7 @@ namespace rpp::operators::details }; template - struct zip_t : public combining_operator_t + struct zip_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index d8e74722d..b22f86c85 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -22,13 +22,12 @@ namespace rpp::details { template - struct concat_state_t + struct concat_state_t : public rpp::composite_disposable { concat_state_t(TObserver&& in_observer, const PackedContainer& in_container) : observer(std::move(in_observer)) , container(in_container) { - observer.set_upstream(disposable); try { itr = std::cbegin(container); @@ -41,7 +40,6 @@ namespace rpp::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS PackedContainer container; - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); std::optional itr{}; std::atomic is_inside_drain{}; }; @@ -69,14 +67,14 @@ namespace rpp::details state->observer.on_error(err); } - void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } void on_completed() const { locally_disposed = true; - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -88,7 +86,7 @@ namespace rpp::details template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { if (state->itr.value() == std::cend(state->container)) { @@ -133,7 +131,10 @@ namespace rpp::details template Strategy> void subscribe(observer&& obs) const { - drain(std::make_shared, PackedContainer>>(std::move(obs), container)); + const auto d = disposable_wrapper_impl, PackedContainer>>::make(std::move(obs), container); + const auto state = d.lock(); + state->observer.set_upstream(d.as_weak()); + drain(state); } }; From b79ce00b5f8565b1a4ea12d702b38033466a22a7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Nov 2024 23:46:40 +0300 Subject: [PATCH 02/20] fix --- src/rpp/rpp/operators/debounce.hpp | 2 +- src/rpp/rpp/operators/delay.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 8e0d3e916..5d4e0cb82 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -161,7 +161,7 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; template - auto lift_with_disposable_strategy(Observer&& observer) const + auto lift_with_disposables_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; using container = typename DisposableStrategy::disposables_container; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 010eb968a..ce7dd6a4e 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -179,7 +179,7 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; template - auto lift_with_disposable_strategy(Observer&& observer) const + auto lift_with_disposables_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; using container = typename DisposableStrategy::disposables_container; From c4decc81d2591387349d606cd18f4accb207165b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Nov 2024 23:49:13 +0300 Subject: [PATCH 03/20] fix --- src/rpp/rpp/observables/details/chain_strategy.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index 9bb42e98c..1aac6a425 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -52,7 +52,10 @@ namespace rpp::details::observables else if constexpr (rpp::constraint::operator_lift) m_strategies.subscribe(m_strategy.template lift(std::forward(observer))); else + { + static_assert(rpp::constraint::operator_subscribe); m_strategy.subscribe(std::forward(observer), m_strategies); + } } private: From 652e362a9fa8ded69ba959249cdd7c7eea3751dd Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Nov 2024 23:56:07 +0300 Subject: [PATCH 04/20] fix --- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index af3ca98ee..8484528e7 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -92,7 +92,7 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = ::rpp::details::observables::fixed_disposables_strategy<0>; + using updated_optimal_disposables_strategy = ::rpp::details::observables::fixed_disposables_strategy<1>; template auto lift(Observer&& observer) const From 1cccfe4897922169f9d4743eb9b17d870012066b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 4 Nov 2024 00:10:25 +0300 Subject: [PATCH 05/20] fix --- src/tests/utils/disposable_observable.hpp | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index b457d07f0..afb2f66cf 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -82,7 +82,7 @@ void test_operator_over_observable_finish_before_dispose(auto&& op) template void test_operator_over_observable_with_disposable(auto&& op) { - SUBCASE("operator disposes disposable") + SUBCASE("operator disposes disposable but not too early") { auto observable_disposable = rpp::composite_disposable_wrapper::make(); std::optional> saved_observer{}; @@ -94,20 +94,6 @@ void test_operator_over_observable_with_disposable(auto&& op) auto observer_disposable = rpp::composite_disposable_wrapper::make(); op(observable) | rpp::ops::subscribe(observer_disposable, [](const auto&) {}); - observer_disposable.dispose(); - CHECK(observable_disposable.is_disposed()); - } - - SUBCASE("operator doesn't disposes disposable too early") - { - auto observable_disposable = rpp::composite_disposable_wrapper::make(); - auto observable = rpp::source::create([&observable_disposable](auto&& obs) { - obs.set_upstream(observable_disposable); - }); - - auto observer_disposable = rpp::composite_disposable_wrapper::make(); - op(observable) | rpp::ops::subscribe(observer_disposable, [](const auto&) {}); - CHECK(!observable_disposable.is_disposed()); observer_disposable.dispose(); CHECK(observable_disposable.is_disposed()); From 017bd10cdb3512595207235461752559589d070d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 4 Nov 2024 23:15:16 +0300 Subject: [PATCH 06/20] update deps --- cmake/dependencies.cmake | 4 ++-- conanfile.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index bdfb62880..469278cf2 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -116,13 +116,13 @@ endif() # ===================== Tests =================== if (RPP_BUILD_TESTS) rpp_fetch_library(doctest https://github.com/doctest/doctest.git v2.4.11) - rpp_fetch_library(trompeloeil https://github.com/rollbear/trompeloeil.git main) + rpp_fetch_library(trompeloeil https://github.com/rollbear/trompeloeil.git v48) endif() # ==================== Nanobench ================= if (RPP_BUILD_BENCHMARKS) - rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git master) + rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git v4.3.11) endif() # ==================== ASIO ===================== diff --git a/conanfile.py b/conanfile.py index 67937cd5a..f8fbc9a53 100644 --- a/conanfile.py +++ b/conanfile.py @@ -25,7 +25,7 @@ class RppConan(ConanFile): def requirements(self): if self.options.with_tests: - self.requires("trompeloeil/47") + self.requires("trompeloeil/48") self.requires("doctest/2.4.11") if self.options.with_benchmarks: From c525a796e3c343a29fe24d0461a66c5ea9768ca6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 4 Nov 2024 23:58:54 +0300 Subject: [PATCH 07/20] new changes --- src/benchmarks/benchmarks.cpp | 20 ++++++++++++------- src/rpp/rpp/operators/concat.hpp | 8 +++++--- src/rpp/rpp/operators/delay.hpp | 4 +++- .../operators/details/combining_strategy.hpp | 1 + src/rpp/rpp/operators/merge.hpp | 3 ++- src/rpp/rpp/operators/take_until.hpp | 3 +-- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index f882ebc2f..228a33434 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -65,6 +65,7 @@ namespace rpp } } // namespace rpp +#ifdef RPP_BUILD_RXCPP namespace rxcpp { template @@ -73,16 +74,17 @@ namespace rxcpp return rxcpp::observable<>::from(rxcpp::identity_immediate(), std::forward(vals)...); } } // namespace rxcpp +#endif int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) { - auto bench = ankerl::nanobench::Bench{}.output(nullptr).warmup(3); - const auto args = std::span{argv, static_cast(argc)}; - const auto benchmark = find_argument("--benchmark=", args); - const auto section = find_argument("--section=", args); - const auto disable_rxcpp = find_argument("--disable_rxcpp", args).has_value(); - const auto disable_rpp = find_argument("--disable_rpp", args).has_value(); - const auto dump = find_argument("--dump=", args); + auto bench = ankerl::nanobench::Bench{}.output(nullptr).warmup(3); + const auto args = std::span{argv, static_cast(argc)}; + const auto benchmark = find_argument("--benchmark=", args); + const auto section = find_argument("--section=", args); + [[maybe_unused]] const auto disable_rxcpp = find_argument("--disable_rxcpp", args).has_value(); + const auto disable_rpp = find_argument("--disable_rpp", args).has_value(); + const auto dump = find_argument("--dump=", args); BENCHMARK("General") { @@ -687,8 +689,10 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } { +#ifdef RPP_BUILD_RXCPP rxcpp::subjects::subject rxcpp_subj{}; rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }, [] {})); +#endif TEST_RXCPP([&]() { rxcpp_subj.get_subscriber().on_next(1); }); @@ -728,11 +732,13 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) } { +#ifdef RPP_BUILD_RXCPP rxcpp::subjects::subject rxcpp_subj{}; for (size_t i = 0; i < 100; ++i) { rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } +#endif TEST_RXCPP([&] { for (size_t i = 0; i < 100; ++i) rxcpp_subj.get_subscriber().on_next(i); diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 68534f80d..60e20d7dd 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -78,7 +78,6 @@ namespace rpp::operators::details bool handle_observable_impl(const rpp::constraint::decayed_same_as auto& observable, rpp::composite_disposable_wrapper refcounted) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); - refcounted.clear(); observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; @@ -132,7 +131,7 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy : public concat_observer_strategy_base { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; using base = concat_observer_strategy_base; using base::concat_observer_strategy_base; @@ -145,6 +144,8 @@ namespace rpp::operators::details void on_completed() const { + base::refcounted.clear(); + ConcatStage current{ConcatStage::Draining}; if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) return; @@ -158,7 +159,8 @@ namespace rpp::operators::details template struct concat_observer_strategy : public concat_observer_strategy_base { - using base = concat_observer_strategy_base; + using base = concat_observer_strategy_base; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; concat_observer_strategy(TObserver&& observer) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index ce7dd6a4e..3e3945dac 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details template struct delay_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const @@ -91,11 +91,13 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const noexcept { emplace(err); + disposable->clear(); } void on_completed() const noexcept { emplace(rpp::utils::none{}); + disposable->clear(); } private: diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 8484528e7..8bdbb6430 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -49,6 +49,7 @@ namespace rpp::operators::details template struct combining_observer_strategy { + // `Auto` due to we have to dispose disposables during on_completed anyway static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; std::shared_ptr disposable{}; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index ee480a924..f14203ad0 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -48,7 +48,7 @@ namespace rpp::operators::details template struct merge_observer_base_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; merge_observer_base_strategy(std::shared_ptr>&& disposable) : m_disposable{std::move(disposable)} { @@ -86,6 +86,7 @@ namespace rpp::operators::details for (const auto& v : m_disposables) { m_disposable->remove(v); + v.dispose(); } } } diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index a9b6d03bb..e74d6b79c 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -33,7 +33,6 @@ namespace rpp::operators::details { } - void stop() { m_stopped = true; } bool is_stopped() const { return m_stopped; } bool stop_return_was_stopped() { return m_stopped.exchange(true); } @@ -47,7 +46,7 @@ namespace rpp::operators::details template struct take_until_observer_strategy_base { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; std::shared_ptr> state; From e4d2545f7a4353b8b36e6444437e8d68a5bb9155 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 4 Nov 2024 21:02:05 +0000 Subject: [PATCH 08/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .github/workflows/ci v2.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index ce296a023..52c7f878f 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -35,7 +35,7 @@ jobs: - uses: actions/checkout@v4 with: fetch-depth: 0 - + - name: Install deps if: matrix.config.os == 'ubuntu-latest' run: | From f54ed0d18b355cac88240ccdd146502e2e8e190c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 18:06:27 +0300 Subject: [PATCH 09/20] try to fix jinja2 --- .github/workflows/ci v2.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 8d13c8b85..5c178ccd4 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -40,6 +40,7 @@ jobs: if: matrix.config.os == 'ubuntu-latest' run: | pip3 install jinja2 + sudo apt-get install python3-jinja2 - name: get conan uses: turtlebrowser/get-conan@main From cf7d6661f55ded1014ee254d7e021d5cd4e4858a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 18:21:00 +0300 Subject: [PATCH 10/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 5c178ccd4..9b1de686d 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -40,7 +40,8 @@ jobs: if: matrix.config.os == 'ubuntu-latest' run: | pip3 install jinja2 - sudo apt-get install python3-jinja2 + pip install jinja2 + sudo apt-get install python3-jinja2 python-jinja2 - name: get conan uses: turtlebrowser/get-conan@main From 48936b905424680f90afbc2bc261e7f0ee892f24 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 23:14:49 +0300 Subject: [PATCH 11/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 9b1de686d..68f56eb2b 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -41,7 +41,7 @@ jobs: run: | pip3 install jinja2 pip install jinja2 - sudo apt-get install python3-jinja2 python-jinja2 + sudo apt-get install python3-jinja2 - name: get conan uses: turtlebrowser/get-conan@main From db6f58c0aff6ad2720fd7a8934cdad1dd7bf9839 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 23:35:18 +0300 Subject: [PATCH 12/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 68f56eb2b..152cec8a7 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -39,8 +39,8 @@ jobs: - name: Install deps if: matrix.config.os == 'ubuntu-latest' run: | - pip3 install jinja2 - pip install jinja2 + sudo pip3 install jinja2 + sudo pip install jinja2 sudo apt-get install python3-jinja2 - name: get conan From a3145c7ab53f050c3652ef0275b5bc4b0fe417f3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 23:50:10 +0300 Subject: [PATCH 13/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 152cec8a7..5f08c1722 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -39,9 +39,9 @@ jobs: - name: Install deps if: matrix.config.os == 'ubuntu-latest' run: | - sudo pip3 install jinja2 - sudo pip install jinja2 sudo apt-get install python3-jinja2 + sudo pip3 install jinja2 --upgrade + sudo pip install jinja2 --upgrade - name: get conan uses: turtlebrowser/get-conan@main From 793aac4a8a7e3957a47786a6d42019d4478e7ee5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 5 Nov 2024 23:58:47 +0300 Subject: [PATCH 14/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 5f08c1722..5938da418 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -36,6 +36,12 @@ jobs: with: fetch-depth: 0 + - name: Install Qt + # if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' + uses: jurplel/install-qt-action@v4 + with: + cache: true + - name: Install deps if: matrix.config.os == 'ubuntu-latest' run: | @@ -57,12 +63,6 @@ jobs: restore-keys: deps-${{ matrix.config.name }}-${{ matrix.build_type.config }} lookup-only: true - - name: Install Qt - if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' - uses: jurplel/install-qt-action@v4 - with: - cache: true - - name: conan detect profile if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' run: | From af429e45e038d9e63681d5451bfad21a01d944cb Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 6 Nov 2024 00:00:07 +0300 Subject: [PATCH 15/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 5938da418..a3818085f 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -42,13 +42,6 @@ jobs: with: cache: true - - name: Install deps - if: matrix.config.os == 'ubuntu-latest' - run: | - sudo apt-get install python3-jinja2 - sudo pip3 install jinja2 --upgrade - sudo pip install jinja2 --upgrade - - name: get conan uses: turtlebrowser/get-conan@main From 3b02d7333b8b2fae95d1f2c75bef3b5c2c630949 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 6 Nov 2024 00:08:09 +0300 Subject: [PATCH 16/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index a3818085f..dff033c8a 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -36,11 +36,10 @@ jobs: with: fetch-depth: 0 - - name: Install Qt - # if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' - uses: jurplel/install-qt-action@v4 - with: - cache: true + - name: Update deps + if: matrix.config.os == 'ubuntu-latest' + run: | + sudo apt-get update - name: get conan uses: turtlebrowser/get-conan@main @@ -56,6 +55,12 @@ jobs: restore-keys: deps-${{ matrix.config.name }}-${{ matrix.build_type.config }} lookup-only: true + - name: Install Qt + if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' + uses: jurplel/install-qt-action@v4 + with: + cache: true + - name: conan detect profile if: steps.check_cache.outputs.cache-hit != 'true' || github.event_name == 'schedule' run: | From c6fe6daa6d5e92583f9f720c602f3622cc37ff3f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 6 Nov 2024 00:11:29 +0300 Subject: [PATCH 17/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index dff033c8a..6105e54bb 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -36,10 +36,8 @@ jobs: with: fetch-depth: 0 - - name: Update deps - if: matrix.config.os == 'ubuntu-latest' - run: | - sudo apt-get update + - name: Setup Python + uses: actions/setup-python@v5 - name: get conan uses: turtlebrowser/get-conan@main From 29c8e1705f79c320a5036cd678f7740cd26e4192 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Nov 2024 21:11:36 +0000 Subject: [PATCH 18/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .github/workflows/ci v2.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 6105e54bb..64a604532 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -36,7 +36,7 @@ jobs: with: fetch-depth: 0 - - name: Setup Python + - name: Setup Python uses: actions/setup-python@v5 - name: get conan From a3714792dfd0afcb05719f2753b85a27dae3ff09 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 6 Nov 2024 00:12:55 +0300 Subject: [PATCH 19/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index 64a604532..d3efbb4b4 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -36,8 +36,8 @@ jobs: with: fetch-depth: 0 - - name: Setup Python - uses: actions/setup-python@v5 + - uses: actions/setup-python@v5 + with: { python-version: "3.8" } - name: get conan uses: turtlebrowser/get-conan@main From 6095b8d9d977590df81650b1c89e0a76e512efb7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 6 Nov 2024 00:22:56 +0300 Subject: [PATCH 20/20] Update ci v2.yml --- .github/workflows/ci v2.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci v2.yml b/.github/workflows/ci v2.yml index d3efbb4b4..8d4208783 100644 --- a/.github/workflows/ci v2.yml +++ b/.github/workflows/ci v2.yml @@ -37,7 +37,8 @@ jobs: fetch-depth: 0 - uses: actions/setup-python@v5 - with: { python-version: "3.8" } + with: + python-version: '3.6.x - 3.11.x' - name: get conan uses: turtlebrowser/get-conan@main