diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 211dc4fe1..cbfee1cf5 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -55,35 +55,64 @@ std::shared_ptr> make_dynamic_observer_state(Args { return std::make_shared>>(std::forward(args)...); } +} // namespace rpp::details -template -std::shared_ptr> make_dynamic_observer_state_from_fns(Args&& ...args) +namespace rpp { - return make_dynamic_observer_state...>>(std::forward(args)...); -} - -template -class dynamic_state_observer : public details::typed_observer_tag +/** + * \brief Dynamic (type-erased) version of observer (comparing to specific_observer) + * \details It uses type-erasure mechanism to hide types of OnNext, OnError and OnCompleted callbacks. But it has higher cost in the terms of performance due to usage of heap. + * Use it only when you need to store observer as member variable or make copy of original subscriber. In other cases prefer using "auto" to avoid converting to dynamic_observer + * \tparam T is type of value handled by this observer + * \ingroup observers + */ +template +class dynamic_observer final : public details::typed_observer_tag { public: - template - requires (constraint::decayed_same_as && ...) - dynamic_state_observer(std::invocable auto&& on_next, - std::invocable auto&& on_error, - std::invocable auto&& on_completed, - TStates&& ... states) - : m_state{make_dynamic_observer_state_from_fns(std::forward(on_next), - std::forward(on_error), - std::forward(on_completed), - std::forward(states)...)} {} + template...> OnNext, + std::invocable...> OnError, + std::invocable...> OnCompleted> + dynamic_observer(OnNext&& on_next, + OnError&& on_error, + OnCompleted&& on_completed, + States&& ... states) + : m_state{details::make_dynamic_observer_state, + std::decay_t, + std::decay_t, + std::decay_t...>>( + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...)} {} + + template OnNext = utils::empty_function_t, + constraint::on_error_fn OnError = utils::rethrow_error_t, + constraint::on_completed_fn OnCompleted = utils::empty_function_t<>> + dynamic_observer(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) + : m_state{details::make_dynamic_observer_state, + std::decay_t, + std::decay_t>>( + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed))} {} - dynamic_state_observer(std::shared_ptr> state) - : m_state{ std::move(state) } {} + dynamic_observer(constraint::on_next_fn auto&& on_next, constraint::on_completed_fn auto&& on_completed) + : dynamic_observer{std::forward(on_next), + utils::rethrow_error_t{}, + std::forward(on_completed)} {} + dynamic_observer(std::shared_ptr> state) + : m_state{std::move(state)} {} template TObserver> - requires (!std::is_same_v, dynamic_state_observer>) - dynamic_state_observer(TObserver&& obs) + requires (!std::is_same_v, dynamic_observer>) + dynamic_observer(TObserver&& obs) : m_state{details::make_dynamic_observer_state>(std::forward(obs))} {} @@ -126,46 +155,13 @@ class dynamic_state_observer : public details::typed_observer_tag m_state->on_completed(); } -private: - std::shared_ptr> m_state{}; -}; -} // namespace rpp::details - -namespace rpp -{ -/** - * \brief Dynamic (type-erased) version of observer (comparing to specific_observer) - * \details It uses type-erasure mechanism to hide types of OnNext, OnError and OnCompleted callbacks. But it has higher cost in the terms of performance due to usage of heap. - * Use it only when you need to store observer as member variable or make copy of original subscriber. In other cases prefer using "auto" to avoid converting to dynamic_observer - * \tparam T is type of value handled by this observer - * \ingroup observers - */ -template -class dynamic_observer final : public details::dynamic_state_observer -{ -public: - template OnNext = utils::empty_function_t, - constraint::on_error_fn OnError = utils::rethrow_error_t, - constraint::on_completed_fn OnCompleted = utils::empty_function_t<>> - dynamic_observer(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) - : details::dynamic_state_observer{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)} {} - - dynamic_observer(constraint::on_next_fn auto&& on_next, constraint::on_completed_fn auto&& on_completed) - : details::dynamic_state_observer{std::forward(on_next), - utils::rethrow_error_t{}, - std::forward(on_completed)} {} - - template TObserver> - requires (!std::is_same_v, dynamic_observer>) - dynamic_observer(TObserver&& obs) - : details::dynamic_state_observer{std::forward(obs)} {} - /** * \brief Do nothing for rpp::dynamic_observer. Created only for unification of interfaces with rpp::specific_observer */ const dynamic_observer& as_dynamic() const { return *this; } + +private: + std::shared_ptr> m_state{}; }; template diff --git a/src/rpp/rpp/observers/specific_observer.hpp b/src/rpp/rpp/observers/specific_observer.hpp index fadcaa130..1140063a4 100644 --- a/src/rpp/rpp/observers/specific_observer.hpp +++ b/src/rpp/rpp/observers/specific_observer.hpp @@ -35,7 +35,6 @@ class specific_observer : public details::state_observer; - using base::base; public: template TOnNext = utils::empty_function_t, constraint::on_error_fn TOnError = utils::rethrow_error_t, @@ -50,6 +49,9 @@ class specific_observer : public details::state_observer(on_completed)} {} + specific_observer(const specific_observer&) = default; + specific_observer(specific_observer&&) noexcept = default; + /** * \brief Converting current rpp::specific_observer to rpp::dynamic_observer alternative with erasing of type (and using heap) * \return converted rpp::dynamic_observer diff --git a/src/rpp/rpp/observers/state_observer.hpp b/src/rpp/rpp/observers/state_observer.hpp index 77ec98fb0..dff338746 100644 --- a/src/rpp/rpp/observers/state_observer.hpp +++ b/src/rpp/rpp/observers/state_observer.hpp @@ -16,12 +16,24 @@ #include #include +#include + namespace rpp::details { -/** - * \brief Special type of specific_observer which has some state which this observer stores and pass to each callback. Actually it is base class for all observers. - */ +template +struct type_erased_observer_interface +{ + virtual ~type_erased_observer_interface() = default; + + virtual void on_next(const T& v) const = 0; + virtual void on_next(T&& v) const = 0; + virtual void on_error(const std::exception_ptr& err) const = 0; + virtual void on_completed() const = 0; + virtual void copy_to(void*) const = 0; + virtual void move_to(void*) = 0; +}; + template && std::invocable && std::invocable) -class state_observer : public details::typed_observer_tag +class type_erased_observer final : public type_erased_observer_interface { public: template requires (constraint::decayed_same_as && ...) - state_observer(std::invocable auto&& on_next, - std::invocable auto&& on_error, - std::invocable auto&& on_completed, - TStates&& ...states) - : m_on_next{std::forward(on_next)} - , m_on_err{std::forward(on_error)} - , m_on_completed{std::forward(on_completed)} - , m_state{std::forward(states)...} {} - - /** - * \brief Observable calls this methods to notify observer about new value. - * - * \note obtains value by const-reference to original object. - */ + type_erased_observer(std::invocable auto&& on_next, + std::invocable auto&& on_error, + std::invocable auto&& on_completed, + TStates&& ... states) + : m_data{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...} {} + + void on_next(const T& v) const override + { + std::apply([&v, this](const States& ...states) { m_data.on_next(v, states...); }, m_data.state); + } + + void on_next(T&& v) const override + { + std::apply([&v, this](const States& ...states) { m_data.on_next(std::move(v), states...); }, m_data.state); + } + + void on_error(const std::exception_ptr& err) const override + { + std::apply([&err, this](const States& ...states) { m_data.on_err(err, states...); }, m_data.state); + } + + void on_completed() const override + { + std::apply(m_data.on_completed, m_data.state); + } + + void copy_to(void* ptr) const override + { + std::construct_at(static_cast(ptr), *this); + } + + void move_to(void* ptr) override + { + std::construct_at(static_cast(ptr), std::move(*this)); + } + +private: + // struct needed due to [[no_unique_address]] can conflicts with vtbl + struct data + { + data(auto&& on_next, auto&& on_err, auto&& on_completed, auto&& ...state) + : on_next{std::forward(on_next)} + , on_err{std::forward(on_err)} + , on_completed{std::forward(on_completed)} + , state{std::forward(state)...} {} + + RPP_NO_UNIQUE_ADDRESS OnNext on_next; + RPP_NO_UNIQUE_ADDRESS OnError on_err; + RPP_NO_UNIQUE_ADDRESS OnCompleted on_completed; + + RPP_NO_UNIQUE_ADDRESS std::tuple state; + }; + + data m_data{}; +}; + +template +class state_observer_base : public typed_observer_tag +{ +public: + template...> OnNext, + std::invocable...> OnError, + std::invocable...> OnCompleted> + requires(sizeof(type_erased_observer, + std::decay_t, + std::decay_t, + std::decay_t...>) == StateSize && + alignof(type_erased_observer, + std::decay_t, + std::decay_t, + std::decay_t...>) == StateAlignment) + state_observer_base(OnNext&& on_next, + OnError&& on_error, + OnCompleted&& on_completed, + States&& ... states) + { + std::construct_at(reinterpret_cast, + std::decay_t, + std::decay_t, + std::decay_t...>*>(m_state), + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...); + } + + state_observer_base(const state_observer_base& other) + { + other.cast()->copy_to(m_state); + } + + state_observer_base(state_observer_base&& other) noexcept + { + other.cast()->move_to(m_state); + } + + state_observer_base& operator=(const state_observer_base& other) + { + if (this == &other) + return *this; + std::destroy_at(cast()); + other.cast()->copy_to(m_state); + return *this; + } + + state_observer_base& operator=(state_observer_base&& other) noexcept + { + if (this == &other) + return *this; + + std::destroy_at(cast()); + other.cast()->move_to(m_state); + + return *this; + } + + ~state_observer_base() noexcept + { + std::destroy_at(cast()); + } + void on_next(const T& v) const { - std::apply([&v, this](const States& ...states) { m_on_next(v, states...); }, m_state); + cast()->on_next(v); } - /** - * \brief Observable calls this methods to notify observer about new value. - * - * \note obtains value by rvalue-reference to original object - */ void on_next(T&& v) const { - std::apply([&v, this](const States& ...states) { m_on_next(std::move(v), states...); }, m_state); + cast()->on_next(std::move(v)); } - /** - * \brief Observable calls this method to notify observer about some error during generation next data. - * \warning Obtaining this call means no any further on_next or on_completed calls - * \param err details of error - */ void on_error(const std::exception_ptr& err) const { - std::apply([&err, this](const States& ...states) { m_on_err(err, states...); }, m_state); + cast()->on_error(err); } - /** - * \brief Observable calls this method to notify observer about finish of work. - * \warning Obtaining this call means no any further on_next calls - */ void on_completed() const { - std::apply(m_on_completed, m_state); + cast()->on_completed(); } private: - RPP_NO_UNIQUE_ADDRESS OnNext m_on_next; - RPP_NO_UNIQUE_ADDRESS OnError m_on_err; - RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed; + type_erased_observer_interface* cast() + { + return reinterpret_cast*>(m_state); + } + + const type_erased_observer_interface* cast() const + { + return reinterpret_cast*>(m_state); + } - RPP_NO_UNIQUE_ADDRESS std::tuple m_state; +private: + alignas(StateAlignment) std::byte m_state[StateSize]{}; }; -template -state_observer(TOnNext, Args...)->state_observer>, TOnNext, Args...>; -} // namespace rpp::details \ No newline at end of file +template +using state_observer_with_state = state_observer_base; + +template + requires (std::invocable && + std::invocable && + std::invocable) +class state_observer : public state_observer_with_state> +{ + using state_observer_with_state>::state_observer_with_state; +}; + +template +state_observer(OnNext, Args...) -> state_observer>, OnNext, Args...>; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 77d13297e..01f8b8c66 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -37,11 +37,6 @@ struct buffer_state clear_and_reserve_buckets(); } - buffer_state(const buffer_state& other) = delete; - buffer_state(buffer_state&&) noexcept = default; - buffer_state& operator=(const buffer_state&) = delete; - buffer_state& operator=(buffer_state&&) noexcept = default; - void clear_and_reserve_buckets() const { buckets.clear(); diff --git a/src/rpp/rpp/operators/details/subscriber_with_state.hpp b/src/rpp/rpp/operators/details/subscriber_with_state.hpp index cba345817..5596449ca 100644 --- a/src/rpp/rpp/operators/details/subscriber_with_state.hpp +++ b/src/rpp/rpp/operators/details/subscriber_with_state.hpp @@ -50,7 +50,7 @@ auto create_subscriber_with_dynamic_state(composite_subscription sub, OnCompleted&& on_completed, States&&... states) { - using TObs = dynamic_state_observer...>; + using TObs = dynamic_observer; return make_specific_subscriber(std::move(sub), std::forward(on_next), std::forward(on_error), diff --git a/src/rpp/rpp/operators/fwd/subscribe.hpp b/src/rpp/rpp/operators/fwd/subscribe.hpp index bf09c5866..8bde0d340 100644 --- a/src/rpp/rpp/operators/fwd/subscribe.hpp +++ b/src/rpp/rpp/operators/fwd/subscribe.hpp @@ -20,9 +20,9 @@ struct member_overload * \return subscription on this observable which can be used to unsubscribe */ template TSub> - auto subscribe(TSub&& subscriber) const + auto subscribe(const TSub& subscriber) const { - return subscribe_impl(std::forward(subscriber)); + return subscribe_impl(subscriber); } /** @@ -102,11 +102,5 @@ struct member_overload { return static_cast(this)->subscribe_impl(subscriber); } - - template Obs> - auto subscribe_impl(specific_subscriber&& subscriber) const - { - return static_cast(this)->subscribe_impl(std::move(subscriber)); - } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_observer.cpp b/src/tests/rpp/test_observer.cpp index cd4517182..925320b8d 100644 --- a/src/tests/rpp/test_observer.cpp +++ b/src/tests/rpp/test_observer.cpp @@ -214,7 +214,8 @@ TEST_CASE("observer size should be equal to size of callbacks", "[observer]") { auto empty_observer = rpp::specific_observer{}; - CHECK(sizeof(empty_observer) == 1); + // 2x vtable + CHECK(sizeof(empty_observer) == 2*sizeof(void*)); } SECTION("dynamic_observer")