Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 24 additions & 29 deletions src/rpp/rpp/operators/group_by.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ struct group_by_observer_strategy
RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;

mutable std::map<TKey, subjects::publish_subject<Type>, KeyComparator> key_to_subject{};
std::shared_ptr<refcount_disposable> disposable = std::make_shared<refcount_disposable>();
using subject_observer = decltype(std::declval<subjects::publish_subject<Type>>().get_observer());

mutable std::map<TKey, subject_observer, KeyComparator> key_to_observer{};
std::shared_ptr<refcount_disposable> disposable = std::make_shared<refcount_disposable>();

RPP_CALL_DURING_CONSTRUCTION(
{
Expand All @@ -93,55 +95,48 @@ struct group_by_observer_strategy
template<rpp::constraint::decayed_same_as<T> TT>
void on_next(TT&& val) const
{
const auto subject = deduce_subject(observer, val);
if (!subject)
return;

const auto& subject_obs = subject->get_observer();
if (!subject_obs.is_disposed())
subject_obs.on_next(value_selector(std::forward<TT>(val)));
const auto subject_observer = deduce_observer(observer, val);
if (subject_observer && !subject_observer->is_disposed())
subject_observer->on_next(value_selector(std::forward<TT>(val)));
}

void on_error(const std::exception_ptr& err) const
{
for (const auto& [key, subject] : key_to_subject)
subject.get_observer().on_error(err);
for (const auto& [key, subject_observer] : key_to_observer)
subject_observer.on_error(err);

observer.on_error(err);
}

void on_completed() const
{
for (const auto& [key, subject] : key_to_subject)
subject.get_observer().on_completed();
for (const auto& [key, subject_observer] : key_to_observer)
subject_observer.on_completed();

observer.on_completed();
}

private:
template<rpp::constraint::decayed_same_as<T> TT>
const subjects::publish_subject<Type>* deduce_subject(const rpp::constraint::observer auto& obs, const TT& val) const
const subject_observer* deduce_observer(const rpp::constraint::observer auto& obs, const TT& val) const
{
auto key = key_selector(utils::as_const(val));
const auto key = key_selector(utils::as_const(val));

if (const auto itr = key_to_observer.find(key); itr != key_to_observer.cend())
return &itr->second;

if (obs.is_disposed())
{
const auto itr = key_to_subject.find(key);
return itr == key_to_subject.cend() ? nullptr : &itr->second;
}
return nullptr;

auto [itr, inserted] = key_to_subject.try_emplace(key);
const subjects::publish_subject<Type> subj{};

if (inserted)
{
disposable->add(rpp::disposable_wrapper::from_weak(itr->second.get_disposable().get_original()));
obs.on_next(rpp::grouped_observable_group_by<TKey, Type>{
std::move(key),
group_by_observable_strategy<Type>{itr->second, disposable}
});
}
disposable->add(rpp::disposable_wrapper::from_weak(subj.get_disposable().get_original()));
obs.on_next(rpp::grouped_observable_group_by<TKey, Type>{
key,
group_by_observable_strategy<Type>{subj, disposable}
});

return &itr->second;
return &key_to_observer.emplace(key, subj.get_observer()).first->second;
}
};

Expand Down
41 changes: 23 additions & 18 deletions src/rpp/rpp/operators/window.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class window_observer_strategy
, m_window_size{std::max(size_t{1}, count)}
{
m_observer.set_upstream(m_disposble->add_ref());
m_disposble->add(m_subject.get_disposable());
}

template<typename T>
Expand All @@ -52,36 +51,35 @@ class window_observer_strategy
// need to send new subject due to NEW item appeared (we avoid sending new subjects if no any new items)
if (m_items_in_current_window == m_window_size)
{
if (m_subject.get_disposable().is_disposed())
{
m_subject = Subject{m_disposble};
m_disposble->add(m_subject.get_disposable());
}

m_observer.on_next(m_subject.get_observable());
Subject subject{m_disposble};
m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()});
m_disposble->add(m_subject_data->disposable);
m_observer.on_next(subject.get_observable());
m_items_in_current_window = 0;
}

++m_items_in_current_window;
m_subject.get_observer().on_next(std::forward<T>(v));
m_subject_data->observer.on_next(std::forward<T>(v));

// cleanup current subject, but don't send due to wait for new value
if (m_items_in_current_window == m_window_size)
if (++m_items_in_current_window == m_window_size)
{
m_subject.get_observer().on_completed();
m_disposble->remove(m_subject.get_disposable());
m_subject_data->observer.on_completed();
m_disposble->remove(m_subject_data->disposable);
m_subject_data.reset();
}
}

void on_error(const std::exception_ptr& err) const
{
m_subject.get_observer().on_error(err);
if (m_subject_data)
m_subject_data->observer.on_error(err);
m_observer.on_error(err);
}

void on_completed() const
{
m_subject.get_observer().on_completed();
if (m_subject_data)
m_subject_data->observer.on_completed();
m_observer.on_completed();
}

Expand All @@ -92,9 +90,16 @@ class window_observer_strategy
private:
std::shared_ptr<refcount_disposable> m_disposble = std::make_shared<refcount_disposable>();
RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
mutable Subject m_subject{m_disposble};
const size_t m_window_size;
mutable size_t m_items_in_current_window = m_window_size;

struct subject_data
{
decltype(std::declval<Subject>().get_observer()) observer;
rpp::disposable_wrapper disposable;
};

mutable std::optional<subject_data> m_subject_data;
const size_t m_window_size;
mutable size_t m_items_in_current_window = m_window_size;
};

struct window_t : public operators::details::operator_observable_strategy_different_types<window_observer_strategy, rpp::utils::types<>, size_t>
Expand Down