From cbfc8a05d81686f4e236da87fc74c172d6c9104d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 12 Dec 2022 22:17:35 +0300 Subject: [PATCH 1/3] Use shared_ptr for buffer --- src/rpp/rpp/operators/buffer.hpp | 48 +++++++++++++++----------------- src/rpp/rpp/utils/functors.hpp | 32 ++++++++++----------- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 77d13297e..7261779e2 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -26,21 +26,19 @@ namespace rpp::details { /// A non-copyable class that provides a copyable on_next for the subscriber and /// allows copies of on_next(s) to share the same states. -template +template Subscriber> struct buffer_state { - /// \param count Number of items being bundled. Note when count == 0, we'll - /// treat the behavior like when count == 1. - explicit buffer_state(size_t count) + template TSub> + explicit buffer_state(size_t count, TSub&& sub) : max(std::max(size_t{1}, count)) + , subscriber{std::forward(sub)} { clear_and_reserve_buckets(); } buffer_state(const buffer_state& other) = delete; - buffer_state(buffer_state&&) noexcept = default; - buffer_state& operator=(const buffer_state&) = delete; - buffer_state& operator=(buffer_state&&) noexcept = default; + buffer_state(buffer_state&&) noexcept = delete; void clear_and_reserve_buckets() const { @@ -48,32 +46,33 @@ struct buffer_state buckets.reserve(max); } - const size_t max; - mutable buffer_bundle_type buckets; + const size_t max; + buffer_bundle_type buckets; + Subscriber subscriber; }; struct buffer_on_next { - template - void operator()(auto&& value, const auto& subscriber, const buffer_state& state) const + template Subscriber> + void operator()(T&& value, const std::shared_ptr>& state) const { - state.buckets.push_back(std::forward(value)); + state->buckets.push_back(std::forward(value)); if (state.buckets.size() == state.max) { - subscriber.on_next(std::move(state.buckets)); - state.clear_and_reserve_buckets(); + state->subscriber.on_next(std::move(state.buckets)); + state->clear_and_reserve_buckets(); } } }; struct buffer_on_completed { - template - void operator()(const auto& subscriber, const buffer_state& state) const + template Subscriber> + void operator()(const buffer_state& state) const { if (!state.buckets.empty()) - subscriber.on_next(std::move(state.buckets)); - subscriber.on_completed(); + state->subscriber.on_next(std::move(state.buckets)); + state->subscriber.on_completed(); } }; @@ -86,14 +85,13 @@ struct buffer_impl auto operator()(TSub&& subscriber) const { auto subscription = subscriber.get_subscription(); + auto state = std::make_shared>>(count, std::forward(subscriber)); - // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state - return create_subscriber_with_dynamic_state(std::move(subscription), - buffer_on_next{}, - utils::forwarding_on_error{}, - buffer_on_completed{}, - std::forward(subscriber), - buffer_state{count}); + return create_subscriber_with_state(std::move(subscription), + buffer_on_next{}, + utils::forwarding_on_error{}, + buffer_on_completed{}, + std::move(state)); } }; diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index adb55bcf0..f3605f218 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -62,31 +63,28 @@ struct get struct forwarding_on_next { - void operator()(auto&& v, const auto& sub, const auto&...) const { sub.on_next(std::forward(v)); } -}; + template> TSub> + void operator()(T&& v, const TSub& sub, const auto&...) const { sub.on_next(std::forward(v)); } -struct forwarding_on_error -{ - void operator()(const std::exception_ptr& err, const auto& sub, const auto&...) const { sub.on_error(err); } + template + void operator()(T&& v, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_next(std::forward(v)); } }; -struct forwarding_on_completed +struct forwarding_on_error { - void operator()(const auto& sub, const auto&...) const { sub.on_completed(); } -}; + template + void operator()(const std::exception_ptr& err, const TSub& sub, const auto&...) const { sub.on_error(err); } -struct forwarding_on_next_for_pointer -{ - void operator()(auto&& v, const auto& sub) const { sub->on_next(std::forward(v)); } + template + void operator()(const std::exception_ptr& err, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_error(err); } }; -struct forwarding_on_error_for_pointer +struct forwarding_on_completed { - void operator()(const std::exception_ptr& err, const auto& sub) const { sub->on_error(err); } -}; + template + void operator()(const TSub& sub, const auto&...) const { sub.on_completed(); } -struct forwarding_on_completed_for_pointer -{ - void operator()(const auto& sub) const { sub->on_completed(); } + template + void operator()(const std::shared_ptr& state, const auto&...) const { state->subscriber.on_completed(); } }; } // namespace rpp::utils From 056e69cff0b33a6af9fa44dfa6be53adc26ebb9e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 12 Dec 2022 22:21:57 +0300 Subject: [PATCH 2/3] Fix --- src/rpp/rpp/operators/buffer.hpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 7261779e2..58d1c37a3 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -26,7 +26,7 @@ namespace rpp::details { /// A non-copyable class that provides a copyable on_next for the subscriber and /// allows copies of on_next(s) to share the same states. -template Subscriber> +template struct buffer_state { template TSub> @@ -40,7 +40,7 @@ struct buffer_state buffer_state(const buffer_state& other) = delete; buffer_state(buffer_state&&) noexcept = delete; - void clear_and_reserve_buckets() const + void clear_and_reserve_buckets() { buckets.clear(); buckets.reserve(max); @@ -53,13 +53,13 @@ struct buffer_state struct buffer_on_next { - template Subscriber> + template void operator()(T&& value, const std::shared_ptr>& state) const { state->buckets.push_back(std::forward(value)); - if (state.buckets.size() == state.max) + if (state->buckets.size() == state->max) { - state->subscriber.on_next(std::move(state.buckets)); + state->subscriber.on_next(std::move(state->buckets)); state->clear_and_reserve_buckets(); } } @@ -67,11 +67,11 @@ struct buffer_on_next struct buffer_on_completed { - template Subscriber> - void operator()(const buffer_state& state) const + template + void operator()(const std::shared_ptr>& state) const { - if (!state.buckets.empty()) - state->subscriber.on_next(std::move(state.buckets)); + if (!state->buckets.empty()) + state->subscriber.on_next(std::move(state->buckets)); state->subscriber.on_completed(); } }; From 8893ba913871e6f2ff2b54f0df44a0886da4b454 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 12 Dec 2022 22:37:21 +0300 Subject: [PATCH 3/3] fix --- src/rpp/rpp/utils/functors.hpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index f3605f218..bc12e8283 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -66,6 +66,9 @@ struct forwarding_on_next template> TSub> void operator()(T&& v, const TSub& sub, const auto&...) const { sub.on_next(std::forward(v)); } + template> TObs> + void operator()(T&& v, const TObs& obs, const auto&...) const { obs.on_next(std::forward(v)); } + template void operator()(T&& v, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_next(std::forward(v)); } }; @@ -75,6 +78,9 @@ struct forwarding_on_error template void operator()(const std::exception_ptr& err, const TSub& sub, const auto&...) const { sub.on_error(err); } + template + void operator()(const std::exception_ptr& err, const TObs& obs, const auto&...) const { obs.on_error(err); } + template void operator()(const std::exception_ptr& err, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_error(err); } }; @@ -84,6 +90,9 @@ struct forwarding_on_completed template void operator()(const TSub& sub, const auto&...) const { sub.on_completed(); } + template + void operator()(const TObs& obs, const auto&...) const { obs.on_completed(); } + template void operator()(const std::shared_ptr& state, const auto&...) const { state->subscriber.on_completed(); } };