diff --git a/src/rpp/rpp/disposables/callback_disposable.hpp b/src/rpp/rpp/disposables/callback_disposable.hpp index 6e7442e10..646ecd730 100644 --- a/src/rpp/rpp/disposables/callback_disposable.hpp +++ b/src/rpp/rpp/disposables/callback_disposable.hpp @@ -12,6 +12,7 @@ #include #include +#include namespace rpp { @@ -42,8 +43,8 @@ class callback_disposable final : public details::base_disposable }; template -auto make_callback_disposable(Fn&& invocable) +disposable_wrapper make_callback_disposable(Fn&& invocable) { - return std::make_shared>>(std::forward(invocable)); + return disposable_wrapper::make>>(std::forward(invocable)); } } // namespace rpp \ No newline at end of file diff --git a/src/rpp/rpp/disposables/composite_disposable.hpp b/src/rpp/rpp/disposables/composite_disposable.hpp index ae41b93e0..ee6130a56 100644 --- a/src/rpp/rpp/disposables/composite_disposable.hpp +++ b/src/rpp/rpp/disposables/composite_disposable.hpp @@ -17,7 +17,6 @@ #include #include -#include namespace rpp { @@ -64,7 +63,7 @@ class composite_disposable_impl : public interface_composite_disposable void add(disposable_wrapper disposable) override { - if (disposable.is_disposed() || disposable.get_original().get() == this) + if (disposable.is_disposed() || disposable.lock().get() == this) return; while (true) diff --git a/src/rpp/rpp/disposables/disposable_wrapper.hpp b/src/rpp/rpp/disposables/disposable_wrapper.hpp index 99d6a9778..073b842c3 100644 --- a/src/rpp/rpp/disposables/disposable_wrapper.hpp +++ b/src/rpp/rpp/disposables/disposable_wrapper.hpp @@ -10,147 +10,227 @@ #pragma once +#include #include #include +#include #include #include -namespace rpp +namespace rpp::details { -/** - * @brief Wrapper over disposable_ptr to prevent manual checking over nullptr/is_disposed() - * @details Can keep weak_ptr in case of not owning disposable - * - * @ingroup disposables - */ template -class disposable_wrapper_impl +class enable_wrapper_from_this; + +template +class auto_dispose_wrapper final { - struct weak_tag +public: + static_assert(std::derived_from); + + template + requires (std::constructible_from && !rpp::constraint::variadic_decayed_same_as) + explicit auto_dispose_wrapper(TArgs&&... args) + : m_data{std::forward(args)...} { - }; + } - template TT = TDisposable> - explicit disposable_wrapper_impl(weak_tag, std::weak_ptr disposable) - requires std::derived_from - : m_disposable{std::move(disposable)} + auto_dispose_wrapper(const auto_dispose_wrapper&) = delete; + auto_dispose_wrapper(auto_dispose_wrapper&&) noexcept = delete; + + ~auto_dispose_wrapper() noexcept { + // static_cast(m_data).dispose_impl(rpp::interface_disposable::Mode::Destroying); } + TDisposable* get() { return &m_data; } + +private: + RPP_NO_UNIQUE_ADDRESS TDisposable m_data; +}; + +class disposable_wrapper_base +{ public: - disposable_wrapper_impl() = default; + bool operator==(const disposable_wrapper_base& other) const + { + return get().first == other.get().first; + } - template TT = TDisposable> - disposable_wrapper_impl(std::shared_ptr&& disposable) - requires std::derived_from - : m_disposable{std::static_pointer_cast(std::move(disposable))} + bool is_disposed() const noexcept { + if (const auto locked = get().first) + return locked->is_disposed(); + return true; } - template TT = TDisposable> - disposable_wrapper_impl(const std::shared_ptr& disposable) - requires std::derived_from - : m_disposable{std::static_pointer_cast(disposable)} + void dispose() const noexcept { + if (const auto locked = get().first) + locked->dispose(); } - static disposable_wrapper_impl from_shared(std::shared_ptr disposable) +protected: + explicit disposable_wrapper_base(std::shared_ptr&& disposable) + : m_disposable{std::move(disposable)} { - return disposable_wrapper_impl{std::move(disposable)}; } - static disposable_wrapper_impl from_weak(std::weak_ptr disposable) + explicit disposable_wrapper_base(std::weak_ptr&& disposable) + : m_disposable{std::move(disposable)} { - return disposable_wrapper_impl{weak_tag{}, std::move(disposable)}; } - bool operator==(const disposable_wrapper_impl& other) const + disposable_wrapper_base() = default; + + std::pair, bool> get() const noexcept { - return raw_pointer() == other.raw_pointer(); + if (const auto ptr_ptr = std::get_if>(&m_disposable)) + return {*ptr_ptr, true}; + + if (const auto ptr_ptr = std::get_if>(&m_disposable)) + return {ptr_ptr->lock(), false}; + + return {nullptr, true}; } - bool is_disposed() const noexcept +private: + std::variant, std::weak_ptr> m_disposable; +}; + +} + +namespace rpp +{ +/** + * @brief Wrapper to keep disposable. Any disposable have to be created right from this wrapper with help of `make` function. + * @details Member functions is safe to call even if internal disposable is gone. Also it provides access to "raw" shared_ptr and it can be nullptr in case of disposable empty/ptr gone. + * @details Can keep weak_ptr in case of not owning disposable + * + * @ingroup disposables + */ +template +class disposable_wrapper_impl final : public details::disposable_wrapper_base +{ + using TDefaultMake = std::conditional_t, composite_disposable, TDisposable>; +public: + template + friend class disposable_wrapper_impl; + + template + friend class details::enable_wrapper_from_this; + + bool operator==(const disposable_wrapper_impl&) const = default; + + /** + * @brief Way to create disposable_wrapper. Passed `TTarget` type can be any type derived from `TDisposable`. + */ + template TTarget = TDefaultMake, typename... TArgs> + requires (std::constructible_from) + static disposable_wrapper_impl make(TArgs&& ...args) { - if (const auto locked = get_original()) - return locked->is_disposed(); - return true; + const auto ptr = std::make_shared>(std::forward(args)...); + auto base_ptr = std::shared_ptr{ptr, static_cast(ptr->get())}; + if constexpr (rpp::utils::is_base_of_v) + { + base_ptr->set_weak_self(std::weak_ptr(base_ptr)); + } + return disposable_wrapper_impl{std::static_pointer_cast(std::move(base_ptr))}; } - void dispose() const noexcept + /** + * @brief Creates disposable_wrapper which behaves like disposed disposable + */ + static disposable_wrapper_impl empty() { - if (const auto locked = get_original()) - locked->dispose(); + return disposable_wrapper_impl{}; + } + + template + disposable_wrapper add(Fn&& invocable) requires std::derived_from + { + auto d = make_callback_disposable(std::forward(invocable)); + add(d); + return d; } - void add(disposable_wrapper other) const - requires std::derived_from + void add(disposable_wrapper other) const requires std::derived_from { - if (const auto locked = get_original()) + if (const auto locked = lock()) locked->add(std::move(other)); else other.dispose(); } - void remove(const disposable_wrapper& other) const - requires std::derived_from + void remove(const disposable_wrapper& other) const requires std::derived_from { - if (const auto locked = get_original()) + if (const auto locked = lock()) locked->remove(other); } - void clear() const - requires std::derived_from + void clear() const requires std::derived_from { - if (const auto locked = get_original()) + if (const auto locked = lock()) locked->clear(); } - std::shared_ptr get_original() const noexcept + std::shared_ptr lock() const noexcept { - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return *ptr_ptr; - - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return ptr_ptr->lock(); - - return nullptr; + return std::static_pointer_cast(get().first); } - operator disposable_wrapper_impl() const + disposable_wrapper_impl as_weak() const { - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return disposable_wrapper::from_shared(*ptr_ptr); - - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return disposable_wrapper::from_weak(*ptr_ptr); - - return rpp::disposable_wrapper{}; + auto [locked, is_shared] = get(); + if (is_shared) + return disposable_wrapper_impl{std::weak_ptr{locked}}; + return *this; } - bool has_underlying() const + template + requires rpp::constraint::static_pointer_convertible_to + operator disposable_wrapper_impl() const { - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return ptr_ptr->use_count() != 0; - - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return ptr_ptr->use_count() != 0; - - return false; + auto [locked, is_shared] = get(); + if (!locked) + return rpp::disposable_wrapper_impl::empty(); + + auto res = disposable_wrapper_impl{std::move(locked)}; + if (is_shared) + return res; + + return res.as_weak(); } private: - const TDisposable* raw_pointer() const { - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - return ptr_ptr->get(); + using details::disposable_wrapper_base::disposable_wrapper_base; +}; +} + +namespace rpp::details +{ +template +class enable_wrapper_from_this +{ +public: + template + friend class rpp::disposable_wrapper_impl; - if (const auto ptr_ptr = std::get_if>(&m_disposable)) - if (const auto shared = ptr_ptr->lock()) - return shared.get(); +protected: + enable_wrapper_from_this() = default; - return nullptr; + void set_weak_self(std::weak_ptr weak) + { + m_weak = std::move(weak); + } + +public: + disposable_wrapper_impl wrapper_from_this() const + { + return disposable_wrapper_impl(std::static_pointer_cast(m_weak.lock())); } private: - std::variant, std::weak_ptr> m_disposable; + std::weak_ptr m_weak{}; }; -} +} \ No newline at end of file diff --git a/src/rpp/rpp/disposables/fwd.hpp b/src/rpp/rpp/disposables/fwd.hpp index 9fffc4b10..5963bbd65 100644 --- a/src/rpp/rpp/disposables/fwd.hpp +++ b/src/rpp/rpp/disposables/fwd.hpp @@ -56,5 +56,5 @@ class callback_disposable; class refcount_disposable; template -auto make_callback_disposable(Fn&& invocable); +disposable_wrapper make_callback_disposable(Fn&& invocable); } // namespace rpp \ No newline at end of file diff --git a/src/rpp/rpp/disposables/interface_composite_disposable.hpp b/src/rpp/rpp/disposables/interface_composite_disposable.hpp index e670c5003..dffd39f85 100644 --- a/src/rpp/rpp/disposables/interface_composite_disposable.hpp +++ b/src/rpp/rpp/disposables/interface_composite_disposable.hpp @@ -11,7 +11,6 @@ #pragma once #include -#include #include namespace rpp diff --git a/src/rpp/rpp/disposables/interface_disposable.hpp b/src/rpp/rpp/disposables/interface_disposable.hpp index dcc6825cb..a18cc3d01 100644 --- a/src/rpp/rpp/disposables/interface_disposable.hpp +++ b/src/rpp/rpp/disposables/interface_disposable.hpp @@ -12,12 +12,8 @@ #include -#include - namespace rpp { -using disposable_ptr = std::shared_ptr; - /** * @brief Interface of disposable * diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index 041378f8b..3d714cece 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -17,31 +17,19 @@ #include #include -#include #include +namespace rpp::details +{ +class refocunt_disposable_inner; +} + namespace rpp { -class refcount_disposable : public std::enable_shared_from_this +class refcount_disposable : public rpp::details::enable_wrapper_from_this , public rpp::composite_disposable { - class refocunt_disposable_inner final : public rpp::composite_disposable, public std::enable_shared_from_this - { - public: - refocunt_disposable_inner(std::shared_ptr state) - : m_state{std::move(state)} {} - - void dispose_impl() noexcept override - { - m_state->remove(rpp::disposable_wrapper::from_shared(shared_from_this())); - m_state->release(); - m_state.reset(); - } - - private: - std::shared_ptr m_state; - }; - + void release() { auto current_value = m_refcount.load(std::memory_order::seq_cst); @@ -63,28 +51,56 @@ class refcount_disposable : public std::enable_shared_from_this(shared_from_this()); - add(rpp::disposable_wrapper::from_weak(inner)); - return composite_disposable_wrapper{inner}; - } - } - } + composite_disposable_wrapper add_ref(); private: std::atomic m_refcount{0}; constexpr static size_t s_disposed = std::numeric_limits::max(); }; } // namespace rpp + +namespace rpp::details +{ +class refocunt_disposable_inner final : public rpp::composite_disposable, public rpp::details::enable_wrapper_from_this +{ +public: + refocunt_disposable_inner(disposable_wrapper_impl state) + : m_state{std::move(state)} {} + + void dispose_impl() noexcept override + { + m_state.remove(this->wrapper_from_this()); + if (const auto locked = m_state.lock()) + locked->release(); + m_state = disposable_wrapper_impl::empty(); + } + +private: + disposable_wrapper_impl m_state; +}; + +} + +namespace rpp +{ +inline composite_disposable_wrapper refcount_disposable::add_ref() +{ + auto current_value = m_refcount.load(std::memory_order::seq_cst); + while (true) + { + if (current_value == s_disposed) + return composite_disposable_wrapper::empty(); + + // just need atomicity, not guarding anything + if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) + { + auto inner = composite_disposable_wrapper::make(wrapper_from_this()); + add(inner.as_weak()); + return inner; + } + } +} +} // namespace rpp \ No newline at end of file diff --git a/src/rpp/rpp/observables/blocking_observable.hpp b/src/rpp/rpp/observables/blocking_observable.hpp index 619dc017c..f3e2163ca 100644 --- a/src/rpp/rpp/observables/blocking_observable.hpp +++ b/src/rpp/rpp/observables/blocking_observable.hpp @@ -63,12 +63,13 @@ class blocking_strategy template ObserverStrategy> void subscribe(observer&& obs) const { - auto d = std::make_shared(); + auto d = disposable_wrapper_impl::make(); obs.set_upstream(d); m_original.subscribe(std::move(obs)); - if (!d->is_disposed()) - d->wait(); + if (!d.is_disposed()) + if (const auto locked = d.lock()) + locked->wait(); } private: diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index e770535c0..7198fcd3f 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -28,8 +28,8 @@ struct ref_count_on_subscribe_t disposable{}; + std::mutex mutex{}; + disposable_wrapper_impl disposable = disposable_wrapper_impl::empty(); }; std::shared_ptr m_state = std::make_shared(); @@ -44,19 +44,19 @@ struct ref_count_on_subscribe_t> on_subscribe() const + std::pair on_subscribe() const { std::unique_lock lock(m_state->mutex); - if (m_state->disposable && !m_state->disposable->is_disposed()) - return {m_state->disposable->add_ref(), std::nullopt}; + if (!m_state->disposable.is_disposed()) + return {m_state->disposable.lock()->add_ref(), composite_disposable_wrapper::empty()}; - m_state->disposable = std::make_shared(); - return {m_state->disposable->add_ref(), m_state->disposable}; + m_state->disposable = disposable_wrapper_impl::make(); + return {m_state->disposable.lock()->add_ref(), m_state->disposable}; } }; } @@ -95,19 +95,16 @@ class connectable_observable final : public decltype(std::declval().get * @snippet connect.cpp connect * */ - rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper = {}) const + rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper = composite_disposable_wrapper::make()) const { std::unique_lock lock(m_state->mutex); if (m_subject.get_disposable().is_disposed()) - return {}; + return rpp::disposable_wrapper::empty(); if (!m_state->disposable.is_disposed()) return m_state->disposable; - if (!wrapper.has_underlying()) - wrapper = rpp::composite_disposable_wrapper{std::make_shared()}; - m_state->disposable = wrapper; lock.unlock(); @@ -162,7 +159,7 @@ class connectable_observable final : public decltype(std::declval().get struct state_t { std::mutex mutex{}; - rpp::composite_disposable_wrapper disposable{}; + rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::empty(); }; std::shared_ptr m_state = std::make_shared(); diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 5f702221f..30828eae8 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -93,14 +93,14 @@ class observable * * @par Example * \code{.cpp} - * auto disposable = std::make_shared(); + * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() * | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}) * | rpp::operators::subscribe(disposable, rpp::make_lambda_observer([](int) { std::cout << "NEW VALUE" << std::endl; })); * * std::this_thread::sleep_for(std::chrono::seconds(1)); - * disposable->dispose(); + * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); * \endcode * @@ -142,8 +142,8 @@ class observable [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer&& observer) const { if (!observer.is_disposed()) - return subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); - return {}; + return subscribe(rpp::composite_disposable_wrapper::make>(), std::move(observer)); + return composite_disposable_wrapper::empty(); } /** @@ -157,7 +157,7 @@ class observable requires (!constraint::observer) [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const { - return subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::forward(observer_strategy)); + return subscribe(rpp::composite_disposable_wrapper::make>(), std::forward(observer_strategy)); } /** @@ -171,7 +171,7 @@ class observable */ [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(dynamic_observer observer) const { - return subscribe>(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); + return subscribe>(rpp::composite_disposable_wrapper::make>(), std::move(observer)); } /** @@ -214,7 +214,7 @@ class observable std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> [[nodiscard("Use returned disposable or use subscribe(on_next, on_error, on_completed) instead")]] composite_disposable_wrapper subscribe_with_disposable(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const { - auto res = rpp::composite_disposable_wrapper{std::make_shared>()}; + auto res = rpp::composite_disposable_wrapper::make>(); subscribe(make_lambda_observer(res, std::forward(on_next), std::forward(on_error), @@ -246,14 +246,14 @@ class observable * * @par Example * \code{.cpp} - * auto disposable = std::make_shared(); + * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() * | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}) * | rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; }); * * std::this_thread::sleep_for(std::chrono::seconds(1)); - * disposable->dispose(); + * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); * \endcode * @@ -281,14 +281,14 @@ class observable * * @par Example * \code{.cpp} - * auto disposable = std::make_shared(); + * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() * | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}) * | rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; }); * * std::this_thread::sleep_for(std::chrono::seconds(1)); - * disposable->dispose(); + * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); * \endcode * diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 64d8e0c04..17d62352d 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -124,15 +124,16 @@ struct combine_latest_t using ExpectedValue = typename observable_chain_strategy::value_type; using Disposable = combine_latest_disposable...>; - auto disposable = std::make_shared(std::forward(observer), selector); - disposable->get_observer_under_lock()->set_upstream(rpp::disposable_wrapper::from_weak(disposable)); - subscribe>(disposable, std::index_sequence_for{}, observables...); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); + subscribe>(locked, std::index_sequence_for{}, observables...); - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(disposable)}); + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); } template - static void subscribe(std::shared_ptr...>> disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { (..., observables.subscribe(rpp::observer, combine_latest_observer_strategy...>>{disposable})); } diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 212424506..d8de38db9 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -80,7 +80,7 @@ class concat_state_t final : public rpp::refcount_disposable { stage().store(ConcatStage::Draining, std::memory_order::relaxed); refcounted.clear(); - observable.subscribe(concat_inner_observer_strategy{std::static_pointer_cast(shared_from_this()), std::move(refcounted)}); + observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); @@ -163,9 +163,8 @@ struct concat_observer_strategy : public concat_observer_strategy_base>(std::move(observer))} + : base{init_state(std::move(observer))} { - base::state->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(base::state)); } template @@ -184,6 +183,16 @@ struct concat_observer_strategy : public concat_observer_strategy_baseis_disposed()) base::state->get_observer()->on_completed(); } + + +private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); + return ptr; + } }; struct concat_t: public operators::details::template_operator_observable_strategy diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 1fa112aa7..9541efbc7 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -32,8 +32,7 @@ struct debounce_disposable_wrapper }; template -class debounce_disposable final : public rpp::composite_disposable_impl - , public std::enable_shared_from_this> +class debounce_disposable final : public rpp::composite_disposable_impl, public rpp::details::enable_wrapper_from_this> { using T = rpp::utils::extract_observer_type_t; @@ -86,7 +85,7 @@ class debounce_disposable final : public rpp::composite_disposable_impl{this->shared_from_this()}); + debounce_disposable_wrapper{this->wrapper_from_this().lock()}); } std::variant extract_value_or_time() @@ -171,9 +170,10 @@ struct debounce_t using worker_t = rpp::schedulers::utils::get_worker_t; using container = typename DisposableStrategy::template add::disposable_container; - auto disposable = std::make_shared, worker_t, container>>(std::forward(observer), scheduler.create_worker(), duration); - disposable->get_observer_under_lock()->set_upstream(rpp::disposable_wrapper::from_weak(disposable)); - return rpp::observer, worker_t, container>>{std::move(disposable)}; + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container>>{std::move(ptr)}; } }; } diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 5df788c93..5959f23a8 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -184,9 +184,10 @@ struct delay_t using worker_t = rpp::schedulers::utils::get_worker_t; using container = typename DisposableStrategy::template add::disposable_container; - auto disposable = std::make_shared, worker_t, container>>(std::forward(observer), scheduler.create_worker(), duration); - disposable->observer.set_upstream(rpp::disposable_wrapper::from_weak(disposable)); - return rpp::observer, worker_t, container, ClearOnError>>{std::move(disposable)}; + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->observer.set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container, ClearOnError>>{std::move(ptr)}; } }; } diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index de40f9eaa..1a5cf81c7 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -43,31 +43,32 @@ class forwarding_strategy public: using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>; - explicit forwarding_strategy(std::shared_ptr refcount) + explicit forwarding_strategy(disposable_wrapper_impl refcount) : m_refcount{std::move(refcount)} { } auto get_observer() const { - return rpp::observer{observer_strategy{m_state}}; + return rpp::observer{m_state.lock()}; } template TObs> void on_subscribe(TObs&& observer) const { - observer.set_upstream(m_refcount->add_ref()); - m_state->on_subscribe(std::forward(observer)); + if (const auto locked = m_refcount.lock()) + observer.set_upstream(locked->add_ref()); + m_state.lock()->on_subscribe(std::forward(observer)); } rpp::composite_disposable_wrapper get_disposable() const { - return rpp::composite_disposable_wrapper::from_weak(m_state); + return m_state.as_weak(); } private: - std::shared_ptr> m_state = std::make_shared>(); - std::shared_ptr m_refcount{}; + disposable_wrapper_impl m_refcount; + disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; template diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index fe2bd96ca..75de84d6f 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -75,12 +75,12 @@ struct group_by_observer_strategy using subject_observer = decltype(std::declval>().get_observer()); mutable std::map key_to_observer{}; - std::shared_ptr disposable = std::make_shared(); - - RPP_CALL_DURING_CONSTRUCTION( + std::shared_ptr disposable = [&] { - observer.set_upstream(disposable->add_ref()); - }); + auto ptr = disposable_wrapper_impl::make().lock(); + observer.set_upstream(ptr->add_ref()); + return ptr; + }(); void set_upstream(const rpp::disposable_wrapper& d) const { @@ -130,7 +130,7 @@ struct group_by_observer_strategy const subjects::publish_subject subj{}; - disposable->add(rpp::disposable_wrapper::from_weak(subj.get_disposable().get_original())); + disposable->add(subj.get_disposable().as_weak()); obs.on_next(rpp::grouped_observable_group_by{ key, group_by_observable_strategy{subj, disposable} diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 61d7f59a3..0571ebe97 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -109,9 +109,8 @@ class merge_observer_strategy final : public merge_observer_base_strategy{std::make_shared>(std::move(observer))} + : merge_observer_base_strategy{init_state(std::move(observer))} { - merge_observer_base_strategy::m_disposable->get_observer_under_lock()->set_upstream(disposable_wrapper::from_weak(merge_observer_base_strategy::m_disposable)); } template @@ -120,6 +119,15 @@ class merge_observer_strategy final : public merge_observer_base_strategy::m_disposable->increment_on_completed(); std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_disposable}}); } + +private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer_under_lock()->set_upstream(d.as_weak()); + return ptr; + } }; struct merge_t diff --git a/src/rpp/rpp/operators/subscribe.hpp b/src/rpp/rpp/operators/subscribe.hpp index a3709e289..58eca9c2e 100644 --- a/src/rpp/rpp/operators/subscribe.hpp +++ b/src/rpp/rpp/operators/subscribe.hpp @@ -365,7 +365,7 @@ auto subscribe(rpp::composite_disposable_wrapper d, OnNext&& on_next, OnComplete template ObserverStrategy> auto subscribe_with_disposable(observer&& observer) { - return subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, std::move(observer)); + return subscribe(composite_disposable_wrapper::make(), std::move(observer)); } /** @@ -378,7 +378,7 @@ auto subscribe_with_disposable(observer&& observer) template auto subscribe_with_disposable(dynamic_observer observer) { - return subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, std::move(observer)); + return subscribe(composite_disposable_wrapper::make(), std::move(observer)); } /** @@ -391,7 +391,7 @@ auto subscribe_with_disposable(dynamic_observer observer) template OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> auto subscribe_with_disposable(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) { - return subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, std::forward(on_next), std::forward(on_error), std::forward(on_completed)); + return subscribe(composite_disposable_wrapper::make(), std::forward(on_next), std::forward(on_error), std::forward(on_completed)); } /** @@ -404,6 +404,6 @@ auto subscribe_with_disposable(OnNext&& on_next = {}, OnError&& on_error = {}, O template OnCompleted> auto subscribe_with_disposable(OnNext&& on_next, OnCompleted&& on_completed) { - return subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)); + return subscribe(composite_disposable_wrapper::make(), std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)); } } \ No newline at end of file diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index d5d46df83..c2ee5cb35 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -79,7 +79,7 @@ class switch_on_next_inner_observer_strategy private: std::shared_ptr> m_state; - rpp::composite_disposable_wrapper m_refcounted{}; + rpp::composite_disposable_wrapper m_refcounted; }; template @@ -88,18 +88,9 @@ class switch_on_next_observer_strategy public: using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - switch_on_next_observer_strategy(const TObserver& obs) - : m_state{std::make_shared>(obs)} - { - m_state->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(m_state)); - m_this_refcount = m_state->add_ref(); - } - switch_on_next_observer_strategy(TObserver&& obs) - : m_state{std::make_shared>(std::move(obs))} + : m_state{init_state(std::move(obs))} { - m_state->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(m_state)); - m_this_refcount = m_state->add_ref(); } switch_on_next_observer_strategy(const switch_on_next_observer_strategy&) = delete; @@ -127,13 +118,20 @@ class switch_on_next_observer_strategy } void set_upstream(const disposable_wrapper& d) const { m_this_refcount.add(d); } - bool is_disposed() const { return m_this_refcount.is_disposed(); } - +private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); + return ptr; + } + private: std::shared_ptr> m_state; - rpp::composite_disposable_wrapper m_this_refcount{}; - mutable rpp::composite_disposable_wrapper m_last_refcount{}; + rpp::composite_disposable_wrapper m_this_refcount = m_state->add_ref(); + mutable rpp::composite_disposable_wrapper m_last_refcount = composite_disposable_wrapper::empty(); }; struct switch_on_next_t : public operators::details::operator_observable_strategy diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index f7b7a90b3..05a92feaf 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -20,7 +20,7 @@ namespace rpp::operators::details { template -class take_until_disposable : public rpp::composite_disposable +class take_until_disposable final : public rpp::composite_disposable { public: take_until_disposable(TObserver&& observer) @@ -44,7 +44,7 @@ struct take_until_observer_strategy_base { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - RPP_NO_UNIQUE_ADDRESS std::shared_ptr> state; + std::shared_ptr> state; void on_error(const std::exception_ptr& err) const { @@ -98,15 +98,16 @@ struct take_until_t template void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const { - auto d = std::make_shared>>(std::forward(observer)); - d->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(d)); + const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observable.subscribe(take_until_throttle_observer_strategy>{d}); + observable.subscribe(take_until_throttle_observer_strategy>{ptr}); using expected_value = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer>>(std::move(d))); + observable_strategy.subscribe(rpp::observer>>(std::move(ptr))); } }; } diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index db1abbb92..30d6d0ef2 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -42,7 +42,7 @@ class window_observer_strategy : m_observer{std::move(observer)} , m_window_size{std::max(size_t{1}, count)} { - m_observer.set_upstream(m_disposble->add_ref()); + m_observer.set_upstream(m_disposable->add_ref()); } template @@ -51,9 +51,9 @@ class window_observer_strategy // need to send new subject due to NEW item appeared (we avoid sending new subjects if no any new items) if (m_items_in_current_window == m_window_size) { - Subject subject{m_disposble}; + Subject subject{m_disposable->wrapper_from_this()}; m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()}); - m_disposble->add(m_subject_data->disposable); + m_disposable->add(m_subject_data->disposable); m_observer.on_next(subject.get_observable()); m_items_in_current_window = 0; } @@ -64,7 +64,7 @@ class window_observer_strategy if (++m_items_in_current_window == m_window_size) { m_subject_data->observer.on_completed(); - m_disposble->remove(m_subject_data->disposable); + m_disposable->remove(m_subject_data->disposable); m_subject_data.reset(); } } @@ -83,12 +83,12 @@ class window_observer_strategy m_observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { m_disposble->add(d); } + void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); } - bool is_disposed() const { return m_disposble->is_disposed(); } + bool is_disposed() const { return m_disposable->is_disposed(); } private: - std::shared_ptr m_disposble = std::make_shared(); + std::shared_ptr m_disposable = disposable_wrapper_impl::make().lock(); RPP_NO_UNIQUE_ADDRESS TObserver m_observer; struct subject_data diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 440ed008b..78ca57b10 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -59,7 +59,7 @@ struct window_toggle_state auto on_new_subject(const Subject& subject) { const auto locked_state = get_state_under_lock(); - const auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); + auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); locked_state->observer.on_next(subject.get_observable()); return ptr; } @@ -111,10 +111,10 @@ struct window_toggle_opening_observer_strategy template void on_next(T&& v) const { - typename TState::Subject subject{disposable}; - const auto ptr = state->on_new_subject(subject); + typename TState::Subject subject{disposable->wrapper_from_this()}; + auto ptr = state->on_new_subject(subject); disposable->add(subject.get_disposable()); - state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), ptr}); + state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), std::move(ptr)}); } void on_error(const std::exception_ptr& err) const @@ -173,7 +173,7 @@ class window_toggle_observer_strategy bool is_disposed() const { return m_disposable->is_disposed(); } private: - std::shared_ptr m_disposable = std::make_shared(); + std::shared_ptr m_disposable = disposable_wrapper_impl::make().lock(); std::shared_ptr m_state; }; diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 1714cb076..94900b5fa 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -151,17 +151,18 @@ struct with_latest_from_t { using Disposable = with_latest_from_disposable...>; - auto disposable = std::make_shared(std::forward(observer), selector); - disposable->get_observer_under_lock()->set_upstream(rpp::disposable_wrapper::from_weak(disposable)); - subscribe(disposable, std::index_sequence_for{}, observables...); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + subscribe(ptr, std::index_sequence_for{}, observables...); using ExpectedValue = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(disposable)}); + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}); } template - static void subscribe(std::shared_ptr...>> disposable, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{disposable})); } diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 1330991e3..2d9db0f7b 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -47,7 +47,7 @@ class worker rpp::disposable_wrapper get_disposable() const { if constexpr (is_none_disposable) - return {}; + return disposable_wrapper::empty(); else return m_strategy.get_disposable(); } diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index d80a9af8f..81c48280f 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -144,15 +144,15 @@ class new_thread template Fn> void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const { - m_state->defer_at(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state.lock()->defer_at(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); } - rpp::disposable_wrapper get_disposable() const { return rpp::disposable_wrapper{m_state}; } + rpp::disposable_wrapper get_disposable() const { return m_state; } static rpp::schedulers::time_point now() { return details::now(); } private: - std::shared_ptr m_state = std::make_shared(); + disposable_wrapper_impl m_state = disposable_wrapper_impl::make(); }; static rpp::schedulers::worker create_worker() diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index fa9e2544d..9c1e25514 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -23,7 +23,6 @@ namespace rpp::details { template struct concat_state_t : public rpp::composite_disposable - , public std::enable_shared_from_this> { concat_state_t(TObserver&& in_observer, const PackedContainer& in_container) : observer(std::move(in_observer)) @@ -129,8 +128,9 @@ struct concat_strategy template Strategy> void subscribe(observer&& obs) const { - auto state = std::make_shared, PackedContainer>>(std::move(obs), container); - state->observer.set_upstream(rpp::disposable_wrapper::from_weak(state)); + const auto d = disposable_wrapper_impl, PackedContainer>>::make(std::move(obs), container); + const auto state = d.lock(); + state->observer.set_upstream(d.as_weak()); drain(state); } }; diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 15258f9ab..066c91913 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -46,22 +46,22 @@ class publish_strategy auto get_observer() const { - return rpp::observer{observer_strategy{m_state}}; + return rpp::observer{m_state.lock()}; } template TObs> void on_subscribe(TObs&& observer) const { - m_state->on_subscribe(std::forward(observer)); + m_state.lock()->on_subscribe(std::forward(observer)); } rpp::disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper{m_state}; + return m_state; } private: - std::shared_ptr> m_state = std::make_shared>(); + disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; } // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index b5cfe0add..5b83d7662 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -142,17 +142,17 @@ class replay_strategy public: replay_strategy() - : m_state(std::make_shared(std::nullopt, std::nullopt)) + : m_state(disposable_wrapper_impl::make(std::nullopt, std::nullopt)) { } replay_strategy(size_t count) - : m_state{std::make_shared(std::max(1, count), std::nullopt)} + : m_state{disposable_wrapper_impl::make(std::max(1, count), std::nullopt)} { } replay_strategy(size_t count, rpp::schedulers::duration duration) - : m_state{std::make_shared(std::max(1, count), duration)} + : m_state{disposable_wrapper_impl::make(std::max(1, count), duration)} { } @@ -160,30 +160,33 @@ class replay_strategy auto get_observer() const { - return rpp::observer{observer_strategy{m_state}}; + return rpp::observer{m_state.lock()}; } template TObs> void on_subscribe(TObs&& observer) const { + if (const auto locked = m_state.lock()) { - std::unique_lock lock{m_state->list_mutex}; - m_state->collect_duration(); - for (const auto& value : m_state->values) { - observer.on_next(value.first); + std::unique_lock lock{locked->list_mutex}; + locked->collect_duration(); + for (const auto& value : locked->values) + { + observer.on_next(value.first); + } } + locked->on_subscribe(std::forward(observer)); } - m_state->on_subscribe(std::forward(observer)); } rpp::disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper{m_state}; + return m_state; } private: - std::shared_ptr m_state = std::make_shared(); + disposable_wrapper_impl m_state; }; } diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp index 07cad6eaa..37cbefb88 100644 --- a/src/rpp/rpp/subjects/serialized_subject.hpp +++ b/src/rpp/rpp/subjects/serialized_subject.hpp @@ -66,22 +66,22 @@ class serialized_strategy auto get_observer() const { - return rpp::observer{observer_strategy{m_state}}; + return rpp::observer{m_state.lock()}; } template TObs> void on_subscribe(TObs&& observer) const { - m_state->on_subscribe(std::forward(observer)); + m_state.lock()->on_subscribe(std::forward(observer)); } rpp::composite_disposable_wrapper get_disposable() const { - return rpp::composite_disposable_wrapper{m_state}; + return m_state; } private: - std::shared_ptr m_state = std::make_shared(); + disposable_wrapper_impl m_state = disposable_wrapper_impl::make(); }; } // namespace rpp::subjects::details diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index 22c96c8ab..9846adb9b 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -30,6 +30,9 @@ concept decayed_any_of = (decayed_same_as || ...); template concept variadic_decayed_same_as = sizeof...(Types) == 1 && (decayed_same_as && ...); +template +concept static_pointer_convertible_to = requires { static_cast(std::declval()); }; + template concept iterable = requires(R& rng) { diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index a4a4f75ee..59fb11e6f 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -160,12 +160,12 @@ TEST_CASE("combine_latest handles race condition") TEST_CASE("combine_latest satisfies disposable contracts") { - auto observable_disposable = std::make_shared(); + auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); test_operator_with_disposable(rpp::ops::combine_latest(observable)); } - CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1); + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } \ No newline at end of file diff --git a/src/tests/rpp/test_concat.cpp b/src/tests/rpp/test_concat.cpp index 8a289cfa9..6c18e8f69 100644 --- a/src/tests/rpp/test_concat.cpp +++ b/src/tests/rpp/test_concat.cpp @@ -148,10 +148,10 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me } SECTION("concat stoped if disposed") { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::concat(rpp::source::just(1), - rpp::source::create([&](auto&& obs) { d->dispose(); obs.on_completed(); }), + rpp::source::create([&](auto&& obs) { d.dispose(); obs.on_completed(); }), rpp::source::create([&](auto&&) { FAIL("Shouldn't be called"); }), rpp::source::just(3)); observable.subscribe(rpp::composite_disposable_wrapper{d}, mock); @@ -163,20 +163,20 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me SECTION("concat tracks actual upstream") { - auto d = std::make_shared(); - auto d1 = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); + auto d1 = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::concat(rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{d1}); - CHECK(!d->is_disposed()); - CHECK(!d1->is_disposed()); + CHECK(!d.is_disposed()); + CHECK(!d1.is_disposed()); - d->dispose(); + d.dispose(); - CHECK(d->is_disposed()); - CHECK(d1->is_disposed()); + CHECK(d.is_disposed()); + CHECK(d1.is_disposed()); })); observable.subscribe(rpp::composite_disposable_wrapper{d}, mock); @@ -184,9 +184,9 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me SECTION("concat tracks actual upstream for 2 upstreams") { - auto d = std::make_shared(); - auto d1 = std::make_shared(); - auto d2 = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); + auto d1 = rpp::composite_disposable_wrapper::make(); + auto d2 = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::concat(rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{d1}); obs.on_completed(); }), @@ -194,14 +194,14 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me { obs.set_upstream(rpp::disposable_wrapper{d2}); - CHECK(!d->is_disposed()); - CHECK(d1->is_disposed()); - CHECK(!d2->is_disposed()); + CHECK(!d.is_disposed()); + CHECK(d1.is_disposed()); + CHECK(!d2.is_disposed()); - d->dispose(); + d.dispose(); - CHECK(d->is_disposed()); - CHECK(d2->is_disposed()); + CHECK(d.is_disposed()); + CHECK(d2.is_disposed()); })); observable.subscribe(rpp::composite_disposable_wrapper{d}, mock); @@ -299,10 +299,10 @@ TEMPLATE_TEST_CASE("concat as operator", "", rpp::schedulers::current_thread, rp } SECTION("concat stoped if disposed") { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::just(TestType{}, rpp::source::just(TestType{}, 1).as_dynamic(), - rpp::source::create([&](auto&& obs) { obs.on_next(2); d->dispose(); obs.on_completed(); }).as_dynamic(), + rpp::source::create([&](auto&& obs) { obs.on_next(2); d.dispose(); obs.on_completed(); }).as_dynamic(), rpp::source::create([&](auto&&) { FAIL("Shouldn't be called"); }).as_dynamic(), rpp::source::just(TestType{}, 3).as_dynamic()) | rpp::operators::concat(); @@ -314,43 +314,43 @@ TEMPLATE_TEST_CASE("concat as operator", "", rpp::schedulers::current_thread, rp } SECTION("concat tracks actual upstream") { - auto d = std::make_shared(); - auto d1 = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); + auto d1 = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::just(TestType{}, rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{d1}); - CHECK(!d->is_disposed()); - CHECK(!d1->is_disposed()); + CHECK(!d.is_disposed()); + CHECK(!d1.is_disposed()); - d->dispose(); + d.dispose(); - CHECK(d->is_disposed()); - CHECK(d1->is_disposed()); + CHECK(d.is_disposed()); + CHECK(d1.is_disposed()); })) | rpp::operators::concat(); observable.subscribe(rpp::composite_disposable_wrapper{d}, mock); } SECTION("concat tracks actual upstream for 2 upstreams") { - auto d = std::make_shared(); - auto d1 = std::make_shared(); - auto d2 = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); + auto d1 = rpp::composite_disposable_wrapper::make(); + auto d2 = rpp::composite_disposable_wrapper::make(); auto observable = rpp::source::just(TestType{}, rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{d1}); obs.on_completed(); }).as_dynamic(), rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{d2}); - CHECK(!d->is_disposed()); - CHECK(d1->is_disposed()); - CHECK(!d2->is_disposed()); + CHECK(!d.is_disposed()); + CHECK(d1.is_disposed()); + CHECK(!d2.is_disposed()); - d->dispose(); + d.dispose(); - CHECK(d->is_disposed()); - CHECK(d2->is_disposed()); + CHECK(d.is_disposed()); + CHECK(d2.is_disposed()); }).as_dynamic()) | rpp::operators::concat(); diff --git a/src/tests/rpp/test_connectable_observable.cpp b/src/tests/rpp/test_connectable_observable.cpp index a27a256a1..c899e23b2 100644 --- a/src/tests/rpp/test_connectable_observable.cpp +++ b/src/tests/rpp/test_connectable_observable.cpp @@ -21,7 +21,7 @@ TEST_CASE("connectable observable") { auto mock = mock_observer_strategy{}; - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); SECTION("source and connectable observable from it") { @@ -36,7 +36,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_total_on_next_count() == 0); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(!d->is_disposed()); + CHECK(!d.is_disposed()); } SECTION("call connect") { @@ -46,7 +46,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_received_values() == std::vector{ 1 }); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); CHECK(sub_connectable.is_disposed()); } } @@ -59,7 +59,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_received_values() == std::vector{ 1 }); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); CHECK(sub_connectable.is_disposed()); } } @@ -73,7 +73,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_received_values() == std::vector{ 10 }); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); CHECK(sub_connectable.is_disposed()); } } @@ -104,7 +104,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_received_values() == std::vector{ 1 }); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(!d->is_disposed()); + CHECK(!d.is_disposed()); CHECK(!sub_connectable.is_disposed()); } } @@ -117,7 +117,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_total_on_next_count() == 0); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(!d->is_disposed()); + CHECK(!d.is_disposed()); CHECK(sub_connectable.is_disposed()); CHECK(!source.get_disposable().is_disposed()); } @@ -131,7 +131,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_total_on_next_count() == 1); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(!d->is_disposed()); + CHECK(!d.is_disposed()); CHECK(sub_connectable.is_disposed()); CHECK(!source.get_disposable().is_disposed()); } @@ -145,7 +145,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_total_on_next_count() == 0); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); CHECK(sub_connectable.is_disposed()); } SECTION("connect again and send values") @@ -157,7 +157,7 @@ TEST_CASE("connectable observable") CHECK(mock.get_total_on_next_count() == 0); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); CHECK(sub_connectable.is_disposed()); CHECK(new_sub_connectable.is_disposed()); } @@ -246,7 +246,7 @@ TEST_CASE("ref_count") SECTION("subscribe on it without ref_count and with ref_count") { observable.subscribe(observer_1); - auto sub = std::make_shared(); + auto sub = rpp::composite_disposable_wrapper::make(); observable.ref_count().subscribe(rpp::composite_disposable_wrapper{sub}, observer_2); SECTION("send value") { @@ -266,7 +266,7 @@ TEST_CASE("ref_count") } SECTION("unsubscribe observer with ref_count and send value") { - sub->dispose(); + sub.dispose(); subj.get_observer().on_next(1); SECTION("no observers obtain values") { @@ -281,7 +281,7 @@ TEST_CASE("ref_count") } SECTION("subscribe via ref_count again and send value") { - sub = std::make_shared(); + sub = rpp::composite_disposable_wrapper::make(); observable.ref_count().subscribe(rpp::composite_disposable_wrapper{sub},observer_2); subj.get_observer().on_next(1); SECTION("both observers obtain values") @@ -302,7 +302,7 @@ TEST_CASE("ref_count") SECTION("subscribe both with ref_count") { observable.ref_count().subscribe(observer_1); - auto sub = std::make_shared(); + auto sub = rpp::composite_disposable_wrapper::make(); observable.ref_count().subscribe(rpp::composite_disposable_wrapper{sub}, observer_2); SECTION("send value") { @@ -322,7 +322,7 @@ TEST_CASE("ref_count") } SECTION("unsubscribe 1 observer with ref_count and send value") { - sub->dispose(); + sub.dispose(); subj.get_observer().on_next(1); SECTION("first observer obtains values") { diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index 9cb28872c..854207327 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -40,7 +40,7 @@ namespace s_test_queue.emplace(rpp::schedulers::time_point{duration}, std::forward(fn), std::forward(handler), std::forward(args)...); } - static rpp::disposable_wrapper get_disposable() {return rpp::disposable_wrapper{std::make_shared()}; } + static rpp::disposable_wrapper get_disposable() {return rpp::disposable_wrapper::make(); } static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } }; diff --git a/src/tests/rpp/test_disposables.cpp b/src/tests/rpp/test_disposables.cpp index 5203a6a43..25e9772a1 100644 --- a/src/tests/rpp/test_disposables.cpp +++ b/src/tests/rpp/test_disposables.cpp @@ -27,7 +27,7 @@ struct custom_disposable : public rpp::interface_disposable TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dynamic_disposables_container<0>, rpp::details::disposables::static_disposables_container<1>) { - auto d = rpp::composite_disposable_wrapper{std::make_shared>()}; + auto d = rpp::composite_disposable_wrapper::make>(); CHECK(!d.is_disposed()); @@ -48,36 +48,36 @@ TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dyna SECTION("add other disposable") { - auto other = std::make_shared(); - CHECK(!other->is_disposed()); + auto other = rpp::composite_disposable_wrapper::make(); + CHECK(!other.is_disposed()); d.add(other); SECTION("calling dispose on original disposable forces both of them to be disposed") { d.dispose(); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); CHECK(d.is_disposed()); } SECTION("calling clear on original disposable forces inner to be disposed") { d.clear(); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); CHECK(!d.is_disposed()); - other = std::make_shared(); - CHECK(!other->is_disposed()); + other = rpp::composite_disposable_wrapper::make(); + CHECK(!other.is_disposed()); d.add(other); - CHECK(!other->is_disposed()); + CHECK(!other.is_disposed()); d.clear(); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); CHECK(!d.is_disposed()); } SECTION("calling clear on disposed disposable") { d.dispose(); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); CHECK(d.is_disposed()); d.clear(); } @@ -86,24 +86,24 @@ TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dyna { d.remove(other); d.dispose(); - CHECK(!other->is_disposed()); + CHECK(!other.is_disposed()); CHECK(d.is_disposed()); } SECTION("calling dispose on other disposable forces only other to be disposed") { - other->dispose(); - CHECK(other->is_disposed()); + other.dispose(); + CHECK(other.is_disposed()); CHECK(!d.is_disposed()); } } SECTION("add disposed disposable") { - auto other = std::make_shared(); - other->dispose(); + auto other = rpp::composite_disposable_wrapper::make(); + other.dispose(); d.add(other); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); CHECK(!d.is_disposed()); } @@ -113,27 +113,61 @@ TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dyna SECTION("adding non disposed disposable to empty forces it to be disposed") { - auto other = std::make_shared(); - CHECK(!other->is_disposed()); + auto other = rpp::composite_disposable_wrapper::make(); + CHECK(!other.is_disposed()); d.add(other); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); } } SECTION("empty disposable") { - d = rpp::composite_disposable_wrapper{}; + d = rpp::composite_disposable_wrapper::empty(); CHECK(d.is_disposed()); d.dispose(); SECTION("adding non disposed disposable to empty forces it to be disposed") { - auto other = std::make_shared(); - CHECK(!other->is_disposed()); + auto other = rpp::composite_disposable_wrapper::make(); + CHECK(!other.is_disposed()); d.add(other); - CHECK(other->is_disposed()); + CHECK(other.is_disposed()); } } + // SECTION("disposable dispose on destruction") + // { + // { + // auto other = rpp::composite_disposable_wrapper::make(); + // CHECK(!other.is_disposed()); + // CHECK(!d.is_disposed()); + // other.add(d); + // CHECK(!other.is_disposed()); + // CHECK(!d.is_disposed()); + // } + // CHECK(d.is_disposed()); + // } + + SECTION("add callback_disposable") + { + size_t invoked_count{}; + d.add([&invoked_count]()noexcept { + ++invoked_count; + }); + CHECK(invoked_count == 0); + d.dispose(); + CHECK(invoked_count == 1); + } + + SECTION("add callback_disposable to disposed disposable") + { + d.dispose(); + + size_t invoked_count{}; + d.add([&invoked_count]()noexcept { + ++invoked_count; + }); + CHECK(invoked_count == 1); + } SECTION("add self") { d.add(d); @@ -155,48 +189,48 @@ TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dyna TEST_CASE("refcount disposable dispose underlying in case of reaching zero") { - auto refcount = std::make_shared(); - auto refcounted = refcount->add_ref(); - auto underlying = std::make_shared(); - refcount->add(underlying); + auto refcount = rpp::disposable_wrapper_impl::make(); + auto refcounted = refcount.lock()->add_ref(); + auto underlying = rpp::disposable_wrapper_impl::make(); + refcount.add(underlying); - CHECK(!underlying->is_disposed()); + CHECK(!underlying.is_disposed()); CHECK(!refcounted.is_disposed()); - CHECK(!refcount->is_disposed()); + CHECK(!refcount.is_disposed()); SECTION("disposing refcounted as is disposes underlying") { refcounted.dispose(); - CHECK(underlying->dispose_count == 1); + CHECK(underlying.lock()->dispose_count == 1); CHECK(refcounted.is_disposed()); - CHECK(refcount->is_disposed()); + CHECK(refcount.is_disposed()); SECTION("additional disposing does nothing") { refcounted.dispose(); - CHECK(underlying->dispose_count == 1); + CHECK(underlying.lock()->dispose_count == 1); CHECK(refcounted.is_disposed()); - CHECK(refcount->is_disposed()); + CHECK(refcount.is_disposed()); } SECTION("addref and disposing does nothing") { - auto d = refcount->add_ref(); + auto d = refcount.lock()->add_ref(); CHECK(d.is_disposed()); refcounted.dispose(); - CHECK(underlying->dispose_count == 1); + CHECK(underlying.lock()->dispose_count == 1); CHECK(refcounted.is_disposed()); - CHECK(refcount->is_disposed()); + CHECK(refcount.is_disposed()); } } SECTION("disposing added to underlying not disposes refcount") { - underlying->dispose(); + underlying.dispose(); - CHECK(underlying->dispose_count == 1); - CHECK(!refcount->is_disposed()); + CHECK(underlying.lock()->dispose_count == 1); + CHECK(!refcount.is_disposed()); CHECK(!refcounted.is_disposed()); } @@ -205,34 +239,34 @@ TEST_CASE("refcount disposable dispose underlying in case of reaching zero") size_t count = 5; std::vector disposables{}; for (size_t i = 0; i < count; ++i) - disposables.push_back(refcount->add_ref()); + disposables.push_back(refcount.lock()->add_ref()); - CHECK(!refcount->is_disposed()); + CHECK(!refcount.is_disposed()); CHECK(!refcounted.is_disposed()); for (size_t i = 0; i < 10*count; ++i) refcounted.dispose(); CHECK(refcounted.is_disposed()); - CHECK(!underlying->is_disposed()); + CHECK(!underlying.lock()->is_disposed()); for(auto& d : disposables) { - CHECK(!underlying->is_disposed()); + CHECK(!underlying.lock()->is_disposed()); CHECK(!d.is_disposed()); d.dispose(); CHECK(d.is_disposed()); } - CHECK(underlying->dispose_count == 1); + CHECK(underlying.lock()->dispose_count == 1); } } TEST_CASE("composite_disposable correctly handles exception") { - auto d = rpp::composite_disposable_wrapper{std::make_shared>>()}; - auto d1 = rpp::composite_disposable_wrapper{std::make_shared()}; - auto d2 = rpp::composite_disposable_wrapper{std::make_shared()}; + auto d = rpp::composite_disposable_wrapper::make>>(); + auto d1 = rpp::composite_disposable_wrapper::make(); + auto d2 = rpp::composite_disposable_wrapper::make(); d.add(d1); CHECK_THROWS_AS(d.add(d2), rpp::utils::more_disposables_than_expected); CHECK(!d1.is_disposed()); @@ -247,8 +281,8 @@ TEST_CASE("static_disposable_container works as expected") { rpp::details::disposables::static_disposables_container<2> container{}; - auto d1 = rpp::composite_disposable_wrapper{std::make_shared()}; - auto d2 = rpp::composite_disposable_wrapper{std::make_shared()}; + auto d1 = rpp::composite_disposable_wrapper::make(); + auto d2 = rpp::composite_disposable_wrapper::make(); SECTION("dispose empty") { diff --git a/src/tests/rpp/test_group_by.cpp b/src/tests/rpp/test_group_by.cpp index 9064730e0..6bce9d4b6 100644 --- a/src/tests/rpp/test_group_by.cpp +++ b/src/tests/rpp/test_group_by.cpp @@ -107,7 +107,7 @@ TEST_CASE("group_by emits grouped seqences of values with identity key selector" TEST_CASE("group_by keeps subscription till anyone subscribed") { - auto observable_upstream = std::make_shared(); + auto observable_upstream = rpp::composite_disposable_wrapper::make(); std::optional> extracted{}; auto observable = rpp::source::create([&](auto&& obs) @@ -123,53 +123,53 @@ TEST_CASE("group_by keeps subscription till anyone subscribed") size_t on_completed_count = 0; auto on_error = [&](const std::exception_ptr&){++on_error_count;}; auto on_completed = [&](){++on_completed_count;}; - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); observable.subscribe(d, [&](const auto& observable) { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); observable.subscribe(d, [](auto){}, on_error, on_completed); disposables.push_back(d); }, on_error, on_completed); REQUIRE(extracted.has_value()); - REQUIRE(!d->is_disposed()); - REQUIRE(!observable_upstream->is_disposed()); + REQUIRE(!d.is_disposed()); + REQUIRE(!observable_upstream.is_disposed()); REQUIRE(disposables.size() == 2); REQUIRE(rpp::utils::all_of(disposables, [](const auto& d){return !d.is_disposed();})); SECTION("dispose root") { - d->dispose(); + d.dispose(); REQUIRE(rpp::utils::all_of(disposables, [](const auto& d){return !d.is_disposed();})); - REQUIRE(!observable_upstream->is_disposed()); + REQUIRE(!observable_upstream.is_disposed()); } SECTION("disposing other disposables") { rpp::utils::for_each(disposables, std::mem_fn(&rpp::composite_disposable_wrapper::dispose)); - REQUIRE(!d->is_disposed()); - REQUIRE(!observable_upstream->is_disposed()); + REQUIRE(!d.is_disposed()); + REQUIRE(!observable_upstream.is_disposed()); } SECTION("dispose all") { - d->dispose(); + d.dispose(); rpp::utils::for_each(disposables, std::mem_fn(&rpp::composite_disposable_wrapper::dispose)); - REQUIRE(observable_upstream->is_disposed()); - REQUIRE(d->is_disposed()); + REQUIRE(observable_upstream.is_disposed()); + REQUIRE(d.is_disposed()); REQUIRE(rpp::utils::all_of(disposables, [](const auto& d){return d.is_disposed();})); } SECTION("send on_error") { extracted->on_error(std::make_exception_ptr(std::runtime_error{""})); - REQUIRE(d->is_disposed()); - REQUIRE(observable_upstream->is_disposed()); + REQUIRE(d.is_disposed()); + REQUIRE(observable_upstream.is_disposed()); REQUIRE(rpp::utils::all_of(disposables, [](const auto& d){return d.is_disposed();})); REQUIRE(on_error_count == disposables.size()+1); } SECTION("send on_completed") { extracted->on_completed(); - REQUIRE(d->is_disposed()); - REQUIRE(observable_upstream->is_disposed()); + REQUIRE(d.is_disposed()); + REQUIRE(observable_upstream.is_disposed()); REQUIRE(rpp::utils::all_of(disposables, [](const auto& d){return d.is_disposed();})); REQUIRE(on_completed_count == disposables.size()+1); } diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 3a8fcda14..0655dda3e 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -260,10 +260,10 @@ TEMPLATE_TEST_CASE("merge handles race condition", "", rpp::memory_model::use_st TEST_CASE("merge dispose inner_disposable immediately") { rpp::source::create([](auto&& d){ - auto disposable = std::make_shared(); + auto disposable = rpp::composite_disposable_wrapper::make(); d.set_upstream(rpp::disposable_wrapper{disposable}); d.on_completed(); - CHECK(disposable->is_disposed()); + CHECK(disposable.is_disposed()); }) | rpp::ops::merge_with(rpp::source::never()) | rpp::ops::subscribe([](int){}); @@ -292,11 +292,11 @@ TEST_CASE("merge doesn't produce extra copies") TEST_CASE("merge satisfies disposable contracts") { - auto observable_disposable = std::make_shared(); + auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); test_operator_with_disposable(rpp::ops::merge_with(observable)); } - CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1); + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } \ No newline at end of file diff --git a/src/tests/rpp/test_observables.cpp b/src/tests/rpp/test_observables.cpp index 21c18c751..ffa4da6f3 100644 --- a/src/tests/rpp/test_observables.cpp +++ b/src/tests/rpp/test_observables.cpp @@ -52,28 +52,28 @@ TEST_CASE("create observable works properly as observable") SECTION("subscribe disposed callbacks") { - observable.subscribe(rpp::composite_disposable_wrapper{}, [](int) {}, [](const std::exception_ptr&) {}, []() {}); + observable.subscribe(rpp::composite_disposable_wrapper::empty(), [](int) {}, [](const std::exception_ptr&) {}, []() {}); CHECK(on_subscribe_called == 0u); } SECTION("subscribe disposed observer") { - observable.subscribe(rpp::composite_disposable_wrapper{}, rpp::make_lambda_observer([](int) {}, [](const std::exception_ptr&) {}, []() {})); + observable.subscribe(rpp::composite_disposable_wrapper::empty(), rpp::make_lambda_observer([](int) {}, [](const std::exception_ptr&) {}, []() {})); CHECK(on_subscribe_called == 0u); } SECTION("subscribe non-disposed callbacks") { - observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, [](int) {}, [](const std::exception_ptr&) {}, []() {}); + observable.subscribe(rpp::composite_disposable_wrapper::make(), [](int) {}, [](const std::exception_ptr&) {}, []() {}); CHECK(on_subscribe_called == 1u); } SECTION("subscribe non-disposed observer") { - observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, rpp::make_lambda_observer([](int) {}, [](const std::exception_ptr&) {}, []() {})); + observable.subscribe(rpp::composite_disposable_wrapper::make(), rpp::make_lambda_observer([](int) {}, [](const std::exception_ptr&) {}, []() {})); CHECK(on_subscribe_called == 1u); } diff --git a/src/tests/rpp/test_observers.cpp b/src/tests/rpp/test_observers.cpp index 84e9d6908..9e6bc7dc7 100644 --- a/src/tests/rpp/test_observers.cpp +++ b/src/tests/rpp/test_observers.cpp @@ -84,11 +84,11 @@ TEST_CASE("as_dynamic keeps disposing") SECTION("set upstream, convert to dynamic and dispose") { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); observer.set_upstream(rpp::disposable_wrapper{d}); auto dynamic = std::forward(observer).as_dynamic(); dynamic.on_completed(); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); } SECTION("convert to dynamic, copy and dispose") @@ -106,20 +106,20 @@ TEST_CASE("as_dynamic keeps disposing") } SECTION("observer with disposable") { - check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper{std::make_shared()}, [](int) {}, [](const std::exception_ptr&) {}, []() {})); + check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper::make(), [](int) {}, [](const std::exception_ptr&) {}, []() {})); } SECTION("observer with disposed disposable") { - check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper{std::make_shared()}, [](int) {}, [](const std::exception_ptr&) {}, []() {})); + check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper::make(), [](int) {}, [](const std::exception_ptr&) {}, []() {})); } } TEST_CASE("observer disposes disposable on termination callbacks") { - auto d = rpp::composite_disposable_wrapper{std::make_shared()}; + auto d = rpp::composite_disposable_wrapper::make(); auto observer = rpp::make_lambda_observer(d, [](int) {}, [](const std::exception_ptr&) {}, []() {}); - auto upstream = rpp::disposable_wrapper{std::make_shared()}; + auto upstream = rpp::disposable_wrapper::make(); observer.set_upstream(upstream); CHECK(!d.is_disposed()); @@ -149,7 +149,7 @@ TEST_CASE("set_upstream without base disposable makes it main disposalbe") auto test_observer = [](auto&& observer) { - auto upstream = rpp::disposable_wrapper{std::make_shared()}; + auto upstream = rpp::disposable_wrapper::make(); observer.set_upstream(upstream); CHECK(!upstream.is_disposed()); CHECK(!observer.is_disposed()); @@ -180,37 +180,37 @@ TEST_CASE("set_upstream can be called multiple times") { auto check = [](auto&& observer) { - auto d1 = std::make_shared(); + auto d1 = rpp::composite_disposable_wrapper::make(); observer.set_upstream(rpp::disposable_wrapper{d1}); - CHECK(d1->is_disposed() == observer.is_disposed()); - auto d2 = std::make_shared(); + CHECK(d1.is_disposed() == observer.is_disposed()); + auto d2 = rpp::composite_disposable_wrapper::make(); observer.set_upstream(rpp::disposable_wrapper{d2}); - CHECK(d1->is_disposed() == observer.is_disposed()); - CHECK(d2->is_disposed() == observer.is_disposed()); + CHECK(d1.is_disposed() == observer.is_disposed()); + CHECK(d2.is_disposed() == observer.is_disposed()); observer.on_completed(); - CHECK(d1->is_disposed()); - CHECK(d2->is_disposed()); + CHECK(d1.is_disposed()); + CHECK(d2.is_disposed()); }; SECTION("observer") check(rpp::make_lambda_observer([](int) {}, [](const std::exception_ptr&) {}, []() {})); SECTION("observer with disposable") - check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper{std::make_shared()}, [](int) {}, [](const std::exception_ptr&) {}, []() {})); + check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper::make(), [](int) {}, [](const std::exception_ptr&) {}, []() {})); SECTION("observer with empty disposable") - check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper{}, [](int) {}, [](const std::exception_ptr&) {}, []() {})); + check(rpp::make_lambda_observer(rpp::composite_disposable_wrapper::empty(), [](int) {}, [](const std::exception_ptr&) {}, []() {})); } TEST_CASE("set_upstream depends on base disposable") { - auto d = rpp::composite_disposable_wrapper{std::make_shared()}; + auto d = rpp::composite_disposable_wrapper::make(); auto original_observer = rpp::make_lambda_observer(d, [](int) {}, [](const std::exception_ptr&) {}, []() {}); auto test_observer = [&d](auto&& observer) { - auto upstream = rpp::disposable_wrapper{std::make_shared()}; + auto upstream = rpp::disposable_wrapper::make(); CHECK(!d.is_disposed()); CHECK(!upstream.is_disposed()); @@ -246,11 +246,11 @@ TEST_CASE("set_upstream depends on base disposable") TEST_CASE("set_upstream disposing when empty base disposable") { - auto original_observer = rpp::make_lambda_observer(rpp::composite_disposable_wrapper{}, [](int) {}, [](const std::exception_ptr&) {}, []() {}); + auto original_observer = rpp::make_lambda_observer(rpp::composite_disposable_wrapper::empty(), [](int) {}, [](const std::exception_ptr&) {}, []() {}); auto test_observer = [](auto&& observer) { - auto upstream = rpp::disposable_wrapper{std::make_shared()}; + auto upstream = rpp::disposable_wrapper::make(); CHECK(!upstream.is_disposed()); CHECK(observer.is_disposed()); diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 6f7906245..5bdac5ab0 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -163,7 +163,7 @@ static std::string simulate_complex_scheduling_with_delay(const auto& worker, co TEST_CASE("Immediate scheduler") { auto scheduler = rpp::schedulers::immediate{}; - auto d = rpp::composite_disposable_wrapper{std::make_shared()}; + auto d = rpp::composite_disposable_wrapper::make(); auto mock_obs = mock_observer_strategy{}; auto obs = mock_obs.get_observer(d).as_dynamic(); @@ -349,7 +349,7 @@ TEST_CASE("Immediate scheduler") TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread) { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); auto mock_obs = mock_observer_strategy{}; auto obs = std::optional{mock_obs.get_observer(d).as_dynamic()}; @@ -389,7 +389,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, { worker.reset(); obs.reset(); - d.reset(); + d = rpp::composite_disposable_wrapper::empty(); while(!done->load()){}; }; @@ -552,7 +552,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, SECTION("scheduler does nothing with disposed observer") { - d->dispose(); + d.dispose(); worker->schedule([&call_count](const auto&) -> rpp::schedulers::optional_delay_from_now { ++call_count; @@ -568,7 +568,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, { worker->schedule([&call_count, d, worker](const auto& obs) -> rpp::schedulers::optional_delay_from_now { - d->dispose(); + d.dispose(); worker->schedule([&call_count](const auto&) -> rpp::schedulers::optional_delay_from_now { ++call_count; @@ -589,7 +589,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, worker->schedule([&call_count, d](const auto&) -> rpp::schedulers::optional_delay_from_now { if (++call_count > 1) - d->dispose(); + d.dispose(); return rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{1}}; }, obs.value()); @@ -605,7 +605,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, worker->schedule([&call_count, d](const auto&) -> rpp::schedulers::optional_delay_from_now { if (++call_count > 1) - d->dispose(); + d.dispose(); return rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{1}}; }, obs); @@ -625,7 +625,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, worker->schedule([&call_count, d](const auto&) -> rpp::schedulers::optional_delay_from_now { if (++call_count > 1) - d->dispose(); + d.dispose(); return rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{1}}; }, obs); return std::nullopt; @@ -647,7 +647,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, return rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{1}}; }, obs); - d->dispose(); + d.dispose(); return rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{1}}; }, obs.value()); @@ -746,7 +746,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") { auto scheduler = rpp::schedulers::run_loop{}; auto worker = scheduler.create_worker(); - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); auto obs = mock_observer_strategy{}.get_observer(d).as_dynamic(); SECTION("submit 3 tasks to run_loop") @@ -755,7 +755,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") size_t schedulable_2_executed_count{}; size_t schedulable_3_executed_count{}; worker.schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now {++schedulable_1_executed_count; return {}; }, obs); - worker.schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now {++schedulable_2_executed_count; d->dispose(); return {}; }, obs); + worker.schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now {++schedulable_2_executed_count; d.dispose(); return {}; }, obs); worker.schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now {++schedulable_3_executed_count; return {}; }, obs); SECTION("nothing happens but scheduler has schedulable to dispatch") @@ -763,7 +763,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") CHECK(schedulable_1_executed_count == 0); CHECK(schedulable_2_executed_count == 0); CHECK(schedulable_3_executed_count == 0); - CHECK(d->is_disposed() == false); + CHECK(d.is_disposed() == false); CHECK(scheduler.is_empty() == false); CHECK(scheduler.is_any_ready_schedulable() == true); } @@ -775,7 +775,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") CHECK(schedulable_1_executed_count == 1); CHECK(schedulable_2_executed_count == 0); CHECK(schedulable_3_executed_count == 0); - CHECK(d->is_disposed() == false); + CHECK(d.is_disposed() == false); CHECK(scheduler.is_empty() == false); CHECK(scheduler.is_any_ready_schedulable() == true); @@ -787,7 +787,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") CHECK(schedulable_1_executed_count == 1); CHECK(schedulable_2_executed_count == 1); CHECK(schedulable_3_executed_count == 0); - CHECK(d->is_disposed() == true); + CHECK(d.is_disposed() == true); CHECK(scheduler.is_empty() == false); CHECK(scheduler.is_any_ready_schedulable() == true); } @@ -799,7 +799,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") CHECK(schedulable_1_executed_count == 1); CHECK(schedulable_2_executed_count == 1); CHECK(schedulable_3_executed_count == 0); - CHECK(d->is_disposed() == true); + CHECK(d.is_disposed() == true); CHECK(scheduler.is_empty() == true); CHECK(scheduler.is_any_ready_schedulable() == false); } @@ -819,7 +819,7 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") SECTION("only first schedulable dispatched") { CHECK(schedulable_1_executed_count == 1); - CHECK(d->is_disposed() == false); + CHECK(d.is_disposed() == false); CHECK(scheduler.is_empty() == true); CHECK(scheduler.is_any_ready_schedulable() == false); diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index a37a1c61e..f1e22a354 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -31,8 +31,8 @@ TEST_CASE("publish subject multicasts values") auto sub = rpp::subjects::publish_subject{}; SECTION("subscribe multiple observers") { - auto dis_1 = std::make_shared(); - auto dis_2 = std::make_shared(); + auto dis_1 = rpp::composite_disposable_wrapper::make(); + auto dis_2 = rpp::composite_disposable_wrapper::make(); sub.get_observable().subscribe(mock_1.get_observer(dis_1)); sub.get_observable().subscribe(mock_2.get_observer(dis_2)); @@ -114,10 +114,10 @@ TEST_CASE("publish subject multicasts values") SECTION("first subscriber unsubscribes and then emit value") { // 1 native, 1 inside subject - CHECK(dis_1.use_count() == 2); - dis_1->dispose(); + // CHECK(dis_1.use_count() == 2); + dis_1.dispose(); // only this 1 native - CHECK(dis_1.use_count() == 1); + // CHECK(dis_1.use_count() == 1); sub.get_observer().on_next(1); SECTION("observers obtain value") diff --git a/src/tests/rpp/test_subscribe.cpp b/src/tests/rpp/test_subscribe.cpp index e6a141d49..a3351415f 100644 --- a/src/tests/rpp/test_subscribe.cpp +++ b/src/tests/rpp/test_subscribe.cpp @@ -29,16 +29,16 @@ TEMPLATE_TEST_CASE("subscribe as operator", "", rpp::memory_model::use_stack, rp SECTION("subscribe observer strategy with disposable") { - static_assert(std::is_same_v()}, mock)), rpp::composite_disposable_wrapper>); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::make(), mock); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe observer strategy with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{}, mock); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::empty(), mock); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -52,16 +52,16 @@ TEMPLATE_TEST_CASE("subscribe as operator", "", rpp::memory_model::use_stack, rp SECTION("subscribe observer with disposable") { - static_assert(std::is_same_v()}, mock.get_observer())), rpp::composite_disposable_wrapper>); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock.get_observer()); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::make(), mock.get_observer()); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe observer with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{}, mock.get_observer()); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::empty(), mock.get_observer()); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -75,16 +75,16 @@ TEMPLATE_TEST_CASE("subscribe as operator", "", rpp::memory_model::use_stack, rp SECTION("subscribe dynamic observer with disposable") { - static_assert(std::is_same_v()}, mock.get_observer().as_dynamic())), rpp::composite_disposable_wrapper>); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock.get_observer().as_dynamic()); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::make(), mock.get_observer().as_dynamic()); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe dynamic observer with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{}, mock.get_observer().as_dynamic()); + static_assert(std::is_same_v); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::empty(), mock.get_observer().as_dynamic()); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -98,16 +98,16 @@ TEMPLATE_TEST_CASE("subscribe as operator", "", rpp::memory_model::use_stack, rp SECTION("subscribe lambdas with disposable") { - static_assert(std::is_same_v()}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); + static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::make(), [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe lambdas with disposed disposable") { - static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); - auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper{}, [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); + static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); + auto d = observable | rpp::operators::subscribe(rpp::composite_disposable_wrapper::empty(), [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -127,16 +127,16 @@ TEMPLATE_TEST_CASE("subscribe as member", "", rpp::memory_model::use_stack, rpp: SECTION("subscribe observer strategy with disposable") { - static_assert(std::is_same_v()}, mock)), rpp::composite_disposable_wrapper>); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::make(), mock); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe observer strategy with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{}, mock); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::empty(), mock); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -150,16 +150,16 @@ TEMPLATE_TEST_CASE("subscribe as member", "", rpp::memory_model::use_stack, rpp: SECTION("subscribe observer with disposable") { - static_assert(std::is_same_v()}, mock.get_observer())), rpp::composite_disposable_wrapper>); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock.get_observer()); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::make(), mock.get_observer()); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe observer with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{}, mock.get_observer()); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::empty(), mock.get_observer()); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -173,16 +173,16 @@ TEMPLATE_TEST_CASE("subscribe as member", "", rpp::memory_model::use_stack, rpp: SECTION("subscribe dynamic observer with disposable") { - static_assert(std::is_same_v()}, mock.get_observer().as_dynamic())), rpp::composite_disposable_wrapper>); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, mock.get_observer().as_dynamic()); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::make(), mock.get_observer().as_dynamic()); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe dynamic observer with disposed disposable") { - static_assert(std::is_same_v); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{}, mock.get_observer().as_dynamic()); + static_assert(std::is_same_v); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::empty(), mock.get_observer().as_dynamic()); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } @@ -196,16 +196,16 @@ TEMPLATE_TEST_CASE("subscribe as member", "", rpp::memory_model::use_stack, rpp: SECTION("subscribe lambdas with disposable") { - static_assert(std::is_same_v()}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{std::make_shared()}, [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); + static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::make(), [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); CHECK(d.is_disposed()); CHECK(mock.get_received_values() == std::vector{1}); } SECTION("subscribe lambdas with disposed disposable") { - static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); - auto d = observable.subscribe(rpp::composite_disposable_wrapper{}, [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); + static_assert(std::is_same_v{}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{})), rpp::composite_disposable_wrapper>); + auto d = observable.subscribe(rpp::composite_disposable_wrapper::empty(), [&mock](const auto& v){ mock.on_next(v);}, rpp::utils::empty_function_t{}, rpp::utils::empty_function_t<>{}); CHECK(d.is_disposed()); CHECK(mock.get_received_values().empty()); } diff --git a/src/tests/rpp/test_subscribe_on.cpp b/src/tests/rpp/test_subscribe_on.cpp index 67cf14639..e32e12727 100644 --- a/src/tests/rpp/test_subscribe_on.cpp +++ b/src/tests/rpp/test_subscribe_on.cpp @@ -42,7 +42,7 @@ TEST_CASE("subscribe_on schedules job in another scheduler") auto obs = rpp::source::create([&](auto&& sub) { thread_id.set_value(std::this_thread::get_id()); - sub.set_upstream(std::make_shared()); + sub.set_upstream(rpp::composite_disposable_wrapper::make()); sub.on_next(1); sub.on_completed(); }); @@ -77,7 +77,7 @@ TEST_CASE("subscribe_on schedules job in another scheduler") rpp::schedulers::current_thread::create_worker().schedule([&mock, &executed](const auto&) { - auto d = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); rpp::source::create([&](const auto&) { executed = true; @@ -86,10 +86,10 @@ TEST_CASE("subscribe_on schedules job in another scheduler") | rpp::ops::subscribe(mock.get_observer(d)); CHECK(!executed); - CHECK(!d->is_disposed()); - d->dispose(); + CHECK(!d.is_disposed()); + d.dispose(); CHECK(!executed); - CHECK(d->is_disposed()); + CHECK(d.is_disposed()); return rpp::schedulers::optional_delay_from_now{}; }, mock); @@ -98,8 +98,8 @@ TEST_CASE("subscribe_on schedules job in another scheduler") SECTION("subscribe_on and then upstream updates upstream inside observer") { - auto d = std::make_shared(); - auto second = std::make_shared(); + auto d = rpp::composite_disposable_wrapper::make(); + auto second = rpp::composite_disposable_wrapper::make(); rpp::source::create([&](auto&& obs) { obs.set_upstream(rpp::disposable_wrapper{second}); @@ -107,11 +107,11 @@ TEST_CASE("subscribe_on schedules job in another scheduler") | rpp::ops::subscribe_on(rpp::schedulers::current_thread{}) | rpp::ops::subscribe(mock.get_observer(d)); - CHECK(!d->is_disposed()); - CHECK(!second->is_disposed()); - d->dispose(); - CHECK(d->is_disposed()); - CHECK(second->is_disposed()); + CHECK(!d.is_disposed()); + CHECK(!second.is_disposed()); + d.dispose(); + CHECK(d.is_disposed()); + CHECK(second.is_disposed()); } } diff --git a/src/tests/rpp/test_take.cpp b/src/tests/rpp/test_take.cpp index ce6877983..926bd026f 100644 --- a/src/tests/rpp/test_take.cpp +++ b/src/tests/rpp/test_take.cpp @@ -23,7 +23,7 @@ TEST_CASE("take operator limits emissions") int actually_values_sent{}; auto obs = rpp::source::create([&actually_values_sent](auto&& obs) { - auto upstream = rpp::disposable_wrapper{std::make_shared()}; + auto upstream = rpp::disposable_wrapper::make(); obs.set_upstream(upstream); while(!obs.is_disposed()) diff --git a/src/tests/rpp/test_window.cpp b/src/tests/rpp/test_window.cpp index 29f278e64..837890794 100644 --- a/src/tests/rpp/test_window.cpp +++ b/src/tests/rpp/test_window.cpp @@ -181,15 +181,15 @@ TEST_CASE("window subdivide observable into sub-observables") TEST_CASE("window disposes original disposable only when everything is disposed") { - auto source_disposable = std::make_shared(); + auto source_disposable = rpp::composite_disposable_wrapper::make(); auto obs = rpp::source::create([source_disposable](auto&& obs) { obs.set_upstream(source_disposable); obs.on_next(1); }); - auto observer_disposable = std::make_shared(); - auto inner_observer_disposable = std::make_shared(); + auto observer_disposable = rpp::composite_disposable_wrapper::make(); + auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); obs | rpp::ops::window(2) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_observable& new_obs) @@ -197,21 +197,21 @@ TEST_CASE("window disposes original disposable only when everything is disposed" new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); - CHECK(!source_disposable->is_disposed()); - CHECK(!observer_disposable->is_disposed()); - CHECK(!inner_observer_disposable->is_disposed()); + CHECK(!source_disposable.is_disposed()); + CHECK(!observer_disposable.is_disposed()); + CHECK(!inner_observer_disposable.is_disposed()); - observer_disposable->dispose(); + observer_disposable.dispose(); - CHECK(!source_disposable->is_disposed()); - CHECK(observer_disposable->is_disposed()); - CHECK(!inner_observer_disposable->is_disposed()); + CHECK(!source_disposable.is_disposed()); + CHECK(observer_disposable.is_disposed()); + CHECK(!inner_observer_disposable.is_disposed()); - inner_observer_disposable->dispose(); + inner_observer_disposable.dispose(); - CHECK(source_disposable->is_disposed()); - CHECK(observer_disposable->is_disposed()); - CHECK(inner_observer_disposable->is_disposed()); + CHECK(source_disposable.is_disposed()); + CHECK(observer_disposable.is_disposed()); + CHECK(inner_observer_disposable.is_disposed()); } TEST_CASE("window satisfies disposable contracts") diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 106b5dfd9..68747fe7c 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -188,15 +188,15 @@ TEST_CASE("window_toggle") TEST_CASE("window_toggle disposes original disposable only when everything is disposed") { - auto source_disposable = std::make_shared(); + auto source_disposable = rpp::composite_disposable_wrapper::make(); auto obs = rpp::source::create([source_disposable](auto&& obs) { obs.set_upstream(source_disposable); obs.on_next(1); }); - auto observer_disposable = std::make_shared(); - auto inner_observer_disposable = std::make_shared(); + auto observer_disposable = rpp::composite_disposable_wrapper::make(); + auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); obs | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); }) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) @@ -204,21 +204,21 @@ TEST_CASE("window_toggle disposes original disposable only when everything is di new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); - CHECK(!source_disposable->is_disposed()); - CHECK(!observer_disposable->is_disposed()); - CHECK(!inner_observer_disposable->is_disposed()); + CHECK(!source_disposable.is_disposed()); + CHECK(!observer_disposable.is_disposed()); + CHECK(!inner_observer_disposable.is_disposed()); - observer_disposable->dispose(); + observer_disposable.dispose(); - CHECK(!source_disposable->is_disposed()); - CHECK(observer_disposable->is_disposed()); - CHECK(!inner_observer_disposable->is_disposed()); + CHECK(!source_disposable.is_disposed()); + CHECK(observer_disposable.is_disposed()); + CHECK(!inner_observer_disposable.is_disposed()); - inner_observer_disposable->dispose(); + inner_observer_disposable.dispose(); - CHECK(source_disposable->is_disposed()); - CHECK(observer_disposable->is_disposed()); - CHECK(inner_observer_disposable->is_disposed()); + CHECK(source_disposable.is_disposed()); + CHECK(observer_disposable.is_disposed()); + CHECK(inner_observer_disposable.is_disposed()); } TEST_CASE("window_toggle satisfies disposable contracts") diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_lastest_from.cpp index a93f90791..3ef90199c 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_lastest_from.cpp @@ -184,12 +184,12 @@ TEST_CASE("with_latest_from handles race condition") TEST_CASE("with_latest_from satisfies disposable contracts") { - auto observable_disposable = std::make_shared(); + auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); test_operator_with_disposable(rpp::ops::with_latest_from(observable)); } - CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1); + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } \ No newline at end of file diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index 2bf849507..1acd8be22 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -32,7 +32,7 @@ struct wrapped_observable_strategy_set_upstream auto subscribe(auto&& observer) const { - observer.set_upstream(std::make_shared()); + observer.set_upstream(rpp::composite_disposable_wrapper::make()); } }; @@ -50,16 +50,16 @@ void test_operator_over_observable_with_disposable(auto&& op) { SECTION("operator disposes disposable") { - auto observable_disposable = std::make_shared(); + auto observable_disposable = rpp::composite_disposable_wrapper::make(); { auto observable = observable_with_disposable(observable_disposable); - auto observer_disposable = std::make_shared(); - op(observable) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [](const auto&){}); + auto observer_disposable = rpp::composite_disposable_wrapper::make(); + op(observable) | rpp::ops::subscribe(observer_disposable, [](const auto&){}); - observer_disposable->dispose(); + observer_disposable.dispose(); } - CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1); + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); } SECTION("set_upstream with fixed_disposable_strategy_selector<1>") diff --git a/src/tests/utils/test_scheduler.hpp b/src/tests/utils/test_scheduler.hpp index dbec93959..37f9cfd58 100644 --- a/src/tests/utils/test_scheduler.hpp +++ b/src/tests/utils/test_scheduler.hpp @@ -66,7 +66,7 @@ class test_scheduler final class worker_strategy { public: - worker_strategy(std::weak_ptr state) + worker_strategy(rpp::disposable_wrapper_impl state) : m_state{std::move(state)} { } template Fn> @@ -84,29 +84,29 @@ class test_scheduler final static rpp::schedulers::time_point now() { return s_current_time; } - rpp::disposable_wrapper get_disposable() const { return rpp::disposable_wrapper::from_weak(m_state); } + rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); } private: - std::weak_ptr m_state; + rpp::disposable_wrapper_impl m_state; }; test_scheduler() = default; rpp::schedulers::worker create_worker() const { - return rpp::schedulers::worker{m_state}; + return rpp::schedulers::worker{m_state.as_weak()}; } - const auto& get_schedulings() const { return m_state->schedulings; } - const auto& get_executions() const { return m_state->executions; } + const auto& get_schedulings() const { return m_state.lock()->schedulings; } + const auto& get_executions() const { return m_state.lock()->executions; } static rpp::schedulers::time_point now() { return s_current_time; } void time_advance(rpp::schedulers::duration dur) const { s_current_time += dur; - m_state->drain(); + m_state.lock()->drain(); } private: - std::shared_ptr m_state = std::make_shared(); + rpp::disposable_wrapper_impl m_state = rpp::disposable_wrapper_impl::make(); };