From 5782347d7689b2e278d55afc7d5b95606e9f1690 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 11 Jan 2023 23:01:07 +0300 Subject: [PATCH 1/6] try another one --- src/rpp/rpp/observers/dynamic_observer.hpp | 95 ++++---- src/rpp/rpp/observers/state_observer.hpp | 220 ++++++++++++++---- src/rpp/rpp/operators/buffer.hpp | 5 - .../details/subscriber_with_state.hpp | 2 +- src/rpp/rpp/operators/fwd/subscribe.hpp | 10 +- src/tests/rpp/test_observer.cpp | 7 +- 6 files changed, 227 insertions(+), 112 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 211dc4fe1..d87d685b6 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -61,29 +61,53 @@ std::shared_ptr> make_dynamic_observer_state_from { return make_dynamic_observer_state...>>(std::forward(args)...); } +} // namespace rpp::details -template -class dynamic_state_observer : public details::typed_observer_tag +namespace rpp +{ +/** + * \brief Dynamic (type-erased) version of observer (comparing to specific_observer) + * \details It uses type-erasure mechanism to hide types of OnNext, OnError and OnCompleted callbacks. But it has higher cost in the terms of performance due to usage of heap. + * Use it only when you need to store observer as member variable or make copy of original subscriber. In other cases prefer using "auto" to avoid converting to dynamic_observer + * \tparam T is type of value handled by this observer + * \ingroup observers + */ +template +class dynamic_observer final : public details::typed_observer_tag { public: - template - requires (constraint::decayed_same_as && ...) - dynamic_state_observer(std::invocable auto&& on_next, - std::invocable auto&& on_error, - std::invocable auto&& on_completed, - TStates&& ... states) - : m_state{make_dynamic_observer_state_from_fns(std::forward(on_next), - std::forward(on_error), - std::forward(on_completed), - std::forward(states)...)} {} + template...> OnNext, + std::invocable...> OnError, + std::invocable...> OnCompleted> + dynamic_observer(OnNext&& on_next, + OnError&& on_error, + OnCompleted&& on_completed, + States&& ... states) + : m_state{details::make_dynamic_observer_state_from_fns(std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...)} {} + + template OnNext = utils::empty_function_t, + constraint::on_error_fn OnError = utils::rethrow_error_t, + constraint::on_completed_fn OnCompleted = utils::empty_function_t<>> + dynamic_observer(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) + : m_state{details::make_dynamic_observer_state_from_fns(std::forward(on_next), + std::forward(on_error), + std::forward(on_completed))} {} - dynamic_state_observer(std::shared_ptr> state) - : m_state{ std::move(state) } {} + dynamic_observer(constraint::on_next_fn auto&& on_next, constraint::on_completed_fn auto&& on_completed) + : dynamic_observer{std::forward(on_next), + utils::rethrow_error_t{}, + std::forward(on_completed)} {} + dynamic_observer(std::shared_ptr> state) + : m_state{std::move(state)} {} template TObserver> - requires (!std::is_same_v, dynamic_state_observer>) - dynamic_state_observer(TObserver&& obs) + requires (!std::is_same_v, dynamic_observer>) + dynamic_observer(TObserver&& obs) : m_state{details::make_dynamic_observer_state>(std::forward(obs))} {} @@ -126,46 +150,13 @@ class dynamic_state_observer : public details::typed_observer_tag m_state->on_completed(); } -private: - std::shared_ptr> m_state{}; -}; -} // namespace rpp::details - -namespace rpp -{ -/** - * \brief Dynamic (type-erased) version of observer (comparing to specific_observer) - * \details It uses type-erasure mechanism to hide types of OnNext, OnError and OnCompleted callbacks. But it has higher cost in the terms of performance due to usage of heap. - * Use it only when you need to store observer as member variable or make copy of original subscriber. In other cases prefer using "auto" to avoid converting to dynamic_observer - * \tparam T is type of value handled by this observer - * \ingroup observers - */ -template -class dynamic_observer final : public details::dynamic_state_observer -{ -public: - template OnNext = utils::empty_function_t, - constraint::on_error_fn OnError = utils::rethrow_error_t, - constraint::on_completed_fn OnCompleted = utils::empty_function_t<>> - dynamic_observer(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) - : details::dynamic_state_observer{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)} {} - - dynamic_observer(constraint::on_next_fn auto&& on_next, constraint::on_completed_fn auto&& on_completed) - : details::dynamic_state_observer{std::forward(on_next), - utils::rethrow_error_t{}, - std::forward(on_completed)} {} - - template TObserver> - requires (!std::is_same_v, dynamic_observer>) - dynamic_observer(TObserver&& obs) - : details::dynamic_state_observer{std::forward(obs)} {} - /** * \brief Do nothing for rpp::dynamic_observer. Created only for unification of interfaces with rpp::specific_observer */ const dynamic_observer& as_dynamic() const { return *this; } + +private: + std::shared_ptr> m_state{}; }; template diff --git a/src/rpp/rpp/observers/state_observer.hpp b/src/rpp/rpp/observers/state_observer.hpp index 77ec98fb0..7204a7d0d 100644 --- a/src/rpp/rpp/observers/state_observer.hpp +++ b/src/rpp/rpp/observers/state_observer.hpp @@ -16,12 +16,24 @@ #include #include +#include + namespace rpp::details { -/** - * \brief Special type of specific_observer which has some state which this observer stores and pass to each callback. Actually it is base class for all observers. - */ +template +struct type_erased_observer_interface +{ + virtual ~type_erased_observer_interface() = default; + + virtual void on_next(const T& v) const = 0; + virtual void on_next(T&& v) const = 0; + virtual void on_error(const std::exception_ptr& err) const = 0; + virtual void on_completed() const = 0; + virtual void copy_to(void*) const = 0; + virtual void move_to(void*) = 0; +}; + template && std::invocable && std::invocable) -class state_observer : public details::typed_observer_tag +class type_erased_observer final : public type_erased_observer_interface { public: template requires (constraint::decayed_same_as && ...) - state_observer(std::invocable auto&& on_next, - std::invocable auto&& on_error, - std::invocable auto&& on_completed, - TStates&& ...states) - : m_on_next{std::forward(on_next)} - , m_on_err{std::forward(on_error)} - , m_on_completed{std::forward(on_completed)} - , m_state{std::forward(states)...} {} - - /** - * \brief Observable calls this methods to notify observer about new value. - * - * \note obtains value by const-reference to original object. - */ + type_erased_observer(std::invocable auto&& on_next, + std::invocable auto&& on_error, + std::invocable auto&& on_completed, + TStates&& ... states) + : m_data{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...} {} + + void on_next(const T& v) const override + { + std::apply([&v, this](const States& ...states) { m_data.on_next(v, states...); }, m_data.state); + } + + void on_next(T&& v) const override + { + std::apply([&v, this](const States& ...states) { m_data.on_next(std::move(v), states...); }, m_data.state); + } + + void on_error(const std::exception_ptr& err) const override + { + std::apply([&err, this](const States& ...states) { m_data.on_err(err, states...); }, m_data.state); + } + + void on_completed() const override + { + std::apply(m_data.on_completed, m_data.state); + } + + void copy_to(void* ptr) const override + { + std::construct_at(static_cast(ptr), *this); + } + + void move_to(void* ptr) override + { + std::construct_at(static_cast(ptr), std::move(*this)); + } + +private: + // struct needed due to [[no_unique_address]] can conflicts with vtbl + struct data + { + data(auto&& on_next, auto&& on_err, auto&& on_completed, auto&& ...state) + : on_next{std::forward(on_next)} + , on_err{std::forward(on_err)} + , on_completed{std::forward(on_completed)} + , state{std::forward(state)...} {} + + RPP_NO_UNIQUE_ADDRESS OnNext on_next; + RPP_NO_UNIQUE_ADDRESS OnError on_err; + RPP_NO_UNIQUE_ADDRESS OnCompleted on_completed; + + RPP_NO_UNIQUE_ADDRESS std::tuple state; + }; + + data m_data{}; +}; + +template +class state_observer_base : public typed_observer_tag +{ +public: + template...> OnNext, + std::invocable...> OnError, + std::invocable...> OnCompleted> + requires(sizeof(type_erased_observer, + std::decay_t, + std::decay_t, + std::decay_t...>) == StateSize && + alignof(type_erased_observer, + std::decay_t, + std::decay_t, + std::decay_t...>) == StateAlignment) + state_observer_base(OnNext&& on_next, + OnError&& on_error, + OnCompleted&& on_completed, + States&& ... states) + { + std::construct_at(reinterpret_cast, + std::decay_t, + std::decay_t, + std::decay_t...>*>(m_state), + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(states)...); + } + + state_observer_base(const state_observer_base& other) + { + other.cast()->copy_to(m_state); + } + + state_observer_base(state_observer_base&& other) noexcept + { + other.cast()->move_to(m_state); + } + + state_observer_base& operator=(const state_observer_base& other) + { + if (this == &other) + return *this; + std::destroy_at(cast()); + other.cast()->copy_to(m_state); + return *this; + } + + state_observer_base& operator=(state_observer_base&& other) noexcept + { + if (this == &other) + return *this; + + std::destroy_at(cast()); + other.cast()->move_to(m_state); + + return *this; + } + + ~state_observer_base() noexcept + { + std::destroy_at(cast()); + } + void on_next(const T& v) const { - std::apply([&v, this](const States& ...states) { m_on_next(v, states...); }, m_state); + cast()->on_next(v); } - /** - * \brief Observable calls this methods to notify observer about new value. - * - * \note obtains value by rvalue-reference to original object - */ void on_next(T&& v) const { - std::apply([&v, this](const States& ...states) { m_on_next(std::move(v), states...); }, m_state); + cast()->on_next(std::move(v)); } - /** - * \brief Observable calls this method to notify observer about some error during generation next data. - * \warning Obtaining this call means no any further on_next or on_completed calls - * \param err details of error - */ void on_error(const std::exception_ptr& err) const { - std::apply([&err, this](const States& ...states) { m_on_err(err, states...); }, m_state); + cast()->on_error(err); } - /** - * \brief Observable calls this method to notify observer about finish of work. - * \warning Obtaining this call means no any further on_next calls - */ void on_completed() const { - std::apply(m_on_completed, m_state); + cast()->on_completed(); } private: - RPP_NO_UNIQUE_ADDRESS OnNext m_on_next; - RPP_NO_UNIQUE_ADDRESS OnError m_on_err; - RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed; + type_erased_observer_interface* cast() + { + return reinterpret_cast*>(m_state); + } + + const type_erased_observer_interface* cast() const + { + return reinterpret_cast*>(m_state); + } - RPP_NO_UNIQUE_ADDRESS std::tuple m_state; +private: + alignas(StateAlignment) std::byte m_state[StateSize]{}; }; -template -state_observer(TOnNext, Args...)->state_observer>, TOnNext, Args...>; -} // namespace rpp::details \ No newline at end of file +template +using state_observer_with_state = state_observer_base; + +template + requires (std::invocable && + std::invocable && + std::invocable) +using state_observer = state_observer_with_state>; + +template +state_observer_base(OnNext, OnError, OnCompleted, States...) -> state_observer>, OnNext, OnError, OnCompleted, States...>; + + +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 77d13297e..01f8b8c66 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -37,11 +37,6 @@ struct buffer_state clear_and_reserve_buckets(); } - buffer_state(const buffer_state& other) = delete; - buffer_state(buffer_state&&) noexcept = default; - buffer_state& operator=(const buffer_state&) = delete; - buffer_state& operator=(buffer_state&&) noexcept = default; - void clear_and_reserve_buckets() const { buckets.clear(); diff --git a/src/rpp/rpp/operators/details/subscriber_with_state.hpp b/src/rpp/rpp/operators/details/subscriber_with_state.hpp index cba345817..5596449ca 100644 --- a/src/rpp/rpp/operators/details/subscriber_with_state.hpp +++ b/src/rpp/rpp/operators/details/subscriber_with_state.hpp @@ -50,7 +50,7 @@ auto create_subscriber_with_dynamic_state(composite_subscription sub, OnCompleted&& on_completed, States&&... states) { - using TObs = dynamic_state_observer...>; + using TObs = dynamic_observer; return make_specific_subscriber(std::move(sub), std::forward(on_next), std::forward(on_error), diff --git a/src/rpp/rpp/operators/fwd/subscribe.hpp b/src/rpp/rpp/operators/fwd/subscribe.hpp index bf09c5866..8bde0d340 100644 --- a/src/rpp/rpp/operators/fwd/subscribe.hpp +++ b/src/rpp/rpp/operators/fwd/subscribe.hpp @@ -20,9 +20,9 @@ struct member_overload * \return subscription on this observable which can be used to unsubscribe */ template TSub> - auto subscribe(TSub&& subscriber) const + auto subscribe(const TSub& subscriber) const { - return subscribe_impl(std::forward(subscriber)); + return subscribe_impl(subscriber); } /** @@ -102,11 +102,5 @@ struct member_overload { return static_cast(this)->subscribe_impl(subscriber); } - - template Obs> - auto subscribe_impl(specific_subscriber&& subscriber) const - { - return static_cast(this)->subscribe_impl(std::move(subscriber)); - } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_observer.cpp b/src/tests/rpp/test_observer.cpp index cd4517182..590aebb36 100644 --- a/src/tests/rpp/test_observer.cpp +++ b/src/tests/rpp/test_observer.cpp @@ -140,7 +140,7 @@ SCENARIO("State observer copy-count for state", "[observer]") auto state = copy_count_tracker{}; auto make_observer = [](auto&& state) { - auto observer = rpp::details::state_observer{[](int, const copy_count_tracker&) {}, + auto observer = rpp::details::state_observer_base{[](int, const copy_count_tracker&) {}, [](const std::exception_ptr&, const copy_count_tracker&) {}, [](const copy_count_tracker&) {}, std::forward(state)}; @@ -180,7 +180,7 @@ SCENARIO("State proxy calls to subscriber", "[observer]") GIVEN("state_observer") { - auto state_observer = rpp::details::state_observer{[](int v, rpp::dynamic_subscriber sub) + auto state_observer = rpp::details::state_observer_base{[](int v, rpp::dynamic_subscriber sub) { sub.on_next(v); }, @@ -214,7 +214,8 @@ TEST_CASE("observer size should be equal to size of callbacks", "[observer]") { auto empty_observer = rpp::specific_observer{}; - CHECK(sizeof(empty_observer) == 1); + // 2x vtable + CHECK(sizeof(empty_observer) == 2*sizeof(void*)); } SECTION("dynamic_observer") From 014ed60b235137c9e2dcefd75ee949d5a3702ab3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 11 Jan 2023 23:07:31 +0300 Subject: [PATCH 2/6] try to fix --- src/rpp/rpp/observers/state_observer.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/observers/state_observer.hpp b/src/rpp/rpp/observers/state_observer.hpp index 7204a7d0d..0cfe542b8 100644 --- a/src/rpp/rpp/observers/state_observer.hpp +++ b/src/rpp/rpp/observers/state_observer.hpp @@ -212,10 +212,10 @@ template using state_observer_with_state = state_observer_base; template + typename OnNext, + typename OnError, + typename OnCompleted, + constraint::decayed_type ...States> requires (std::invocable && std::invocable && std::invocable) From db930520d29dab5d5a65a22a83bbb210da2158b6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 11 Jan 2023 23:56:03 +0300 Subject: [PATCH 3/6] compile --- src/rpp/rpp/observers/dynamic_observer.hpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index d87d685b6..afdb584e2 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -55,12 +55,6 @@ std::shared_ptr> make_dynamic_observer_state(Args { return std::make_shared>>(std::forward(args)...); } - -template -std::shared_ptr> make_dynamic_observer_state_from_fns(Args&& ...args) -{ - return make_dynamic_observer_state...>>(std::forward(args)...); -} } // namespace rpp::details namespace rpp @@ -84,7 +78,13 @@ class dynamic_observer final : public details::typed_observer_tag OnError&& on_error, OnCompleted&& on_completed, States&& ... states) - : m_state{details::make_dynamic_observer_state_from_fns(std::forward(on_next), + : m_state{details::make_dynamic_observer_state, + std::decay_t, + std::decay_t, + std::decay_t...>>( + std::forward(on_next), std::forward(on_error), std::forward(on_completed), std::forward(states)...)} {} @@ -93,7 +93,11 @@ class dynamic_observer final : public details::typed_observer_tag constraint::on_error_fn OnError = utils::rethrow_error_t, constraint::on_completed_fn OnCompleted = utils::empty_function_t<>> dynamic_observer(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {}) - : m_state{details::make_dynamic_observer_state_from_fns(std::forward(on_next), + : m_state{details::make_dynamic_observer_state, + std::decay_t, + std::decay_t>(std::forward(on_next), std::forward(on_error), std::forward(on_completed))} {} From a5a1f8971c3de601867c3cdcaaea66a7a7a93c4e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 12 Jan 2023 12:38:57 +0300 Subject: [PATCH 4/6] Update dynamic_observer.hpp --- src/rpp/rpp/observers/dynamic_observer.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index afdb584e2..cbfee1cf5 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -97,7 +97,8 @@ class dynamic_observer final : public details::typed_observer_tag details::state_observer, std::decay_t, - std::decay_t>(std::forward(on_next), + std::decay_t>>( + std::forward(on_next), std::forward(on_error), std::forward(on_completed))} {} From 9132f3eb3b74b881bac5b519f8c6cc517b10b533 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 12 Jan 2023 14:08:57 +0300 Subject: [PATCH 5/6] Update state_observer.hpp --- src/rpp/rpp/observers/state_observer.hpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/observers/state_observer.hpp b/src/rpp/rpp/observers/state_observer.hpp index 0cfe542b8..ab7d00f02 100644 --- a/src/rpp/rpp/observers/state_observer.hpp +++ b/src/rpp/rpp/observers/state_observer.hpp @@ -221,11 +221,10 @@ template) using state_observer = state_observer_with_state>; -template -state_observer_base(OnNext, OnError, OnCompleted, States...) -> state_observer>, OnNext, OnError, OnCompleted, States...>; +template +state_observer_base(OnNext, Args...) -> state_observer_base>, + sizeof(type_erased_observer>, OnNext, Args...>), + alignof(type_erased_observer>, OnNext, Args...>)>; } // namespace rpp::details From 2f354a9613958fd06ff85d586cb06d138b5eb39d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 18 Jan 2023 22:46:21 +0300 Subject: [PATCH 6/6] temp --- src/rpp/rpp/observers/specific_observer.hpp | 4 +++- src/rpp/rpp/observers/state_observer.hpp | 11 +++++------ src/tests/rpp/test_observer.cpp | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/rpp/rpp/observers/specific_observer.hpp b/src/rpp/rpp/observers/specific_observer.hpp index fadcaa130..1140063a4 100644 --- a/src/rpp/rpp/observers/specific_observer.hpp +++ b/src/rpp/rpp/observers/specific_observer.hpp @@ -35,7 +35,6 @@ class specific_observer : public details::state_observer; - using base::base; public: template TOnNext = utils::empty_function_t, constraint::on_error_fn TOnError = utils::rethrow_error_t, @@ -50,6 +49,9 @@ class specific_observer : public details::state_observer(on_completed)} {} + specific_observer(const specific_observer&) = default; + specific_observer(specific_observer&&) noexcept = default; + /** * \brief Converting current rpp::specific_observer to rpp::dynamic_observer alternative with erasing of type (and using heap) * \return converted rpp::dynamic_observer diff --git a/src/rpp/rpp/observers/state_observer.hpp b/src/rpp/rpp/observers/state_observer.hpp index ab7d00f02..dff338746 100644 --- a/src/rpp/rpp/observers/state_observer.hpp +++ b/src/rpp/rpp/observers/state_observer.hpp @@ -219,12 +219,11 @@ template && std::invocable && std::invocable) -using state_observer = state_observer_with_state>; +class state_observer : public state_observer_with_state> +{ + using state_observer_with_state>::state_observer_with_state; +}; template -state_observer_base(OnNext, Args...) -> state_observer_base>, - sizeof(type_erased_observer>, OnNext, Args...>), - alignof(type_erased_observer>, OnNext, Args...>)>; - - +state_observer(OnNext, Args...) -> state_observer>, OnNext, Args...>; } // namespace rpp::details diff --git a/src/tests/rpp/test_observer.cpp b/src/tests/rpp/test_observer.cpp index 590aebb36..925320b8d 100644 --- a/src/tests/rpp/test_observer.cpp +++ b/src/tests/rpp/test_observer.cpp @@ -140,7 +140,7 @@ SCENARIO("State observer copy-count for state", "[observer]") auto state = copy_count_tracker{}; auto make_observer = [](auto&& state) { - auto observer = rpp::details::state_observer_base{[](int, const copy_count_tracker&) {}, + auto observer = rpp::details::state_observer{[](int, const copy_count_tracker&) {}, [](const std::exception_ptr&, const copy_count_tracker&) {}, [](const copy_count_tracker&) {}, std::forward(state)}; @@ -180,7 +180,7 @@ SCENARIO("State proxy calls to subscriber", "[observer]") GIVEN("state_observer") { - auto state_observer = rpp::details::state_observer_base{[](int v, rpp::dynamic_subscriber sub) + auto state_observer = rpp::details::state_observer{[](int v, rpp::dynamic_subscriber sub) { sub.on_next(v); },