Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 26 additions & 28 deletions src/rpp/rpp/operators/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,54 +26,53 @@ namespace rpp::details
{
/// A non-copyable class that provides a copyable on_next for the subscriber and
/// allows copies of on_next(s) to share the same states.
template<constraint::decayed_type UpstreamType>
template<constraint::decayed_type UpstreamType, constraint::subscriber Subscriber>
struct buffer_state
{
/// \param count Number of items being bundled. Note when count == 0, we'll
/// treat the behavior like when count == 1.
explicit buffer_state(size_t count)
template<constraint::decayed_same_as<Subscriber> TSub>
explicit buffer_state(size_t count, TSub&& sub)
: max(std::max(size_t{1}, count))
, subscriber{std::forward<TSub>(sub)}
{
clear_and_reserve_buckets();
}

buffer_state(const buffer_state& other) = delete;
buffer_state(buffer_state&&) noexcept = default;
buffer_state& operator=(const buffer_state&) = delete;
buffer_state& operator=(buffer_state&&) noexcept = default;
buffer_state(buffer_state&&) noexcept = delete;

void clear_and_reserve_buckets() const
void clear_and_reserve_buckets()
{
buckets.clear();
buckets.reserve(max);
}

const size_t max;
mutable buffer_bundle_type<UpstreamType> buckets;
const size_t max;
buffer_bundle_type<UpstreamType> buckets;
Subscriber subscriber;
};

struct buffer_on_next
{
template<constraint::decayed_type UpstreamType>
void operator()(auto&& value, const auto& subscriber, const buffer_state<UpstreamType>& state) const
template<typename T, constraint::decayed_type UpstreamType, constraint::subscriber Subscriber>
void operator()(T&& value, const std::shared_ptr<buffer_state<UpstreamType, Subscriber>>& state) const
{
state.buckets.push_back(std::forward<decltype(value)>(value));
if (state.buckets.size() == state.max)
state->buckets.push_back(std::forward<T>(value));
if (state->buckets.size() == state->max)
{
subscriber.on_next(std::move(state.buckets));
state.clear_and_reserve_buckets();
state->subscriber.on_next(std::move(state->buckets));
state->clear_and_reserve_buckets();
}
}
};

struct buffer_on_completed
{
template<constraint::decayed_type UpstreamType>
void operator()(const auto& subscriber, const buffer_state<UpstreamType>& state) const
template<constraint::decayed_type UpstreamType, constraint::subscriber Subscriber>
void operator()(const std::shared_ptr<buffer_state<UpstreamType, Subscriber>>& state) const
{
if (!state.buckets.empty())
subscriber.on_next(std::move(state.buckets));
subscriber.on_completed();
if (!state->buckets.empty())
state->subscriber.on_next(std::move(state->buckets));
state->subscriber.on_completed();
}
};

Expand All @@ -86,14 +85,13 @@ struct buffer_impl
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
auto state = std::make_shared<buffer_state<Type, std::decay_t<TSub>>>(count, std::forward<TSub>(subscriber));

// dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state
return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
buffer_on_next{},
utils::forwarding_on_error{},
buffer_on_completed{},
std::forward<TSub>(subscriber),
buffer_state<Type>{count});
return create_subscriber_with_state<Type>(std::move(subscription),
buffer_on_next{},
utils::forwarding_on_error{},
buffer_on_completed{},
std::move(state));
}
};

Expand Down
39 changes: 23 additions & 16 deletions src/rpp/rpp/utils/functors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <rpp/subscribers/constraints.hpp>

#include <exception>
#include <memory>
#include <utility>
#include <tuple>

Expand Down Expand Up @@ -62,31 +63,37 @@ struct get

struct forwarding_on_next
{
void operator()(auto&& v, const auto& sub, const auto&...) const { sub.on_next(std::forward<decltype(v)>(v)); }
template<typename T, constraint::subscriber_of_type<std::decay_t<T>> TSub>
void operator()(T&& v, const TSub& sub, const auto&...) const { sub.on_next(std::forward<T>(v)); }

template<typename T, constraint::observer_of_type<std::decay_t<T>> TObs>
void operator()(T&& v, const TObs& obs, const auto&...) const { obs.on_next(std::forward<T>(v)); }

template<typename T, typename State>
void operator()(T&& v, const std::shared_ptr<State>& state, const auto&...) const { state->subscriber.on_next(std::forward<T>(v)); }
};

struct forwarding_on_error
{
void operator()(const std::exception_ptr& err, const auto& sub, const auto&...) const { sub.on_error(err); }
};
template<constraint::subscriber TSub>
void operator()(const std::exception_ptr& err, const TSub& sub, const auto&...) const { sub.on_error(err); }

struct forwarding_on_completed
{
void operator()(const auto& sub, const auto&...) const { sub.on_completed(); }
};
template<constraint::observer TObs>
void operator()(const std::exception_ptr& err, const TObs& obs, const auto&...) const { obs.on_error(err); }

struct forwarding_on_next_for_pointer
{
void operator()(auto&& v, const auto& sub) const { sub->on_next(std::forward<decltype(v)>(v)); }
template<typename State>
void operator()(const std::exception_ptr& err, const std::shared_ptr<State>& state, const auto&...) const { state->subscriber.on_error(err); }
};

struct forwarding_on_error_for_pointer
struct forwarding_on_completed
{
void operator()(const std::exception_ptr& err, const auto& sub) const { sub->on_error(err); }
};
template<constraint::subscriber TSub>
void operator()(const TSub& sub, const auto&...) const { sub.on_completed(); }

struct forwarding_on_completed_for_pointer
{
void operator()(const auto& sub) const { sub->on_completed(); }
template<constraint::observer TObs>
void operator()(const TObs& obs, const auto&...) const { obs.on_completed(); }

template<typename State>
void operator()(const std::shared_ptr<State>& state, const auto&...) const { state->subscriber.on_completed(); }
};
} // namespace rpp::utils