diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 77d13297e..58d1c37a3 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -26,54 +26,53 @@ namespace rpp::details { /// A non-copyable class that provides a copyable on_next for the subscriber and /// allows copies of on_next(s) to share the same states. -template +template struct buffer_state { - /// \param count Number of items being bundled. Note when count == 0, we'll - /// treat the behavior like when count == 1. - explicit buffer_state(size_t count) + template TSub> + explicit buffer_state(size_t count, TSub&& sub) : max(std::max(size_t{1}, count)) + , subscriber{std::forward(sub)} { clear_and_reserve_buckets(); } buffer_state(const buffer_state& other) = delete; - buffer_state(buffer_state&&) noexcept = default; - buffer_state& operator=(const buffer_state&) = delete; - buffer_state& operator=(buffer_state&&) noexcept = default; + buffer_state(buffer_state&&) noexcept = delete; - void clear_and_reserve_buckets() const + void clear_and_reserve_buckets() { buckets.clear(); buckets.reserve(max); } - const size_t max; - mutable buffer_bundle_type buckets; + const size_t max; + buffer_bundle_type buckets; + Subscriber subscriber; }; struct buffer_on_next { - template - void operator()(auto&& value, const auto& subscriber, const buffer_state& state) const + template + void operator()(T&& value, const std::shared_ptr>& state) const { - state.buckets.push_back(std::forward(value)); - if (state.buckets.size() == state.max) + state->buckets.push_back(std::forward(value)); + if (state->buckets.size() == state->max) { - subscriber.on_next(std::move(state.buckets)); - state.clear_and_reserve_buckets(); + state->subscriber.on_next(std::move(state->buckets)); + state->clear_and_reserve_buckets(); } } }; struct buffer_on_completed { - template - void operator()(const auto& subscriber, const buffer_state& state) const + template + void operator()(const std::shared_ptr>& state) const { - if (!state.buckets.empty()) - subscriber.on_next(std::move(state.buckets)); - subscriber.on_completed(); + if (!state->buckets.empty()) + state->subscriber.on_next(std::move(state->buckets)); + state->subscriber.on_completed(); } }; @@ -86,14 +85,13 @@ struct buffer_impl auto operator()(TSub&& subscriber) const { auto subscription = subscriber.get_subscription(); + auto state = std::make_shared>>(count, std::forward(subscriber)); - // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state - return create_subscriber_with_dynamic_state(std::move(subscription), - buffer_on_next{}, - utils::forwarding_on_error{}, - buffer_on_completed{}, - std::forward(subscriber), - buffer_state{count}); + return create_subscriber_with_state(std::move(subscription), + buffer_on_next{}, + utils::forwarding_on_error{}, + buffer_on_completed{}, + std::move(state)); } }; diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index adb55bcf0..bc12e8283 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -62,31 +63,37 @@ struct get struct forwarding_on_next { - void operator()(auto&& v, const auto& sub, const auto&...) const { sub.on_next(std::forward(v)); } + template> TSub> + void operator()(T&& v, const TSub& sub, const auto&...) const { sub.on_next(std::forward(v)); } + + template> TObs> + void operator()(T&& v, const TObs& obs, const auto&...) const { obs.on_next(std::forward(v)); } + + template + void operator()(T&& v, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_next(std::forward(v)); } }; struct forwarding_on_error { - void operator()(const std::exception_ptr& err, const auto& sub, const auto&...) const { sub.on_error(err); } -}; + template + void operator()(const std::exception_ptr& err, const TSub& sub, const auto&...) const { sub.on_error(err); } -struct forwarding_on_completed -{ - void operator()(const auto& sub, const auto&...) const { sub.on_completed(); } -}; + template + void operator()(const std::exception_ptr& err, const TObs& obs, const auto&...) const { obs.on_error(err); } -struct forwarding_on_next_for_pointer -{ - void operator()(auto&& v, const auto& sub) const { sub->on_next(std::forward(v)); } + template + void operator()(const std::exception_ptr& err, const std::shared_ptr& state, const auto&...) const { state->subscriber.on_error(err); } }; -struct forwarding_on_error_for_pointer +struct forwarding_on_completed { - void operator()(const std::exception_ptr& err, const auto& sub) const { sub->on_error(err); } -}; + template + void operator()(const TSub& sub, const auto&...) const { sub.on_completed(); } -struct forwarding_on_completed_for_pointer -{ - void operator()(const auto& sub) const { sub->on_completed(); } + template + void operator()(const TObs& obs, const auto&...) const { obs.on_completed(); } + + template + void operator()(const std::shared_ptr& state, const auto&...) const { state->subscriber.on_completed(); } }; } // namespace rpp::utils