From 518d73870a8682cfe33aaa13888272e42c678d95 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 13 Jan 2024 23:38:44 +0300 Subject: [PATCH 1/4] reduce get_observer() calls --- src/rpp/rpp/operators/group_by.hpp | 53 ++++++++++++++---------------- src/rpp/rpp/operators/window.hpp | 37 ++++++++++++--------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index 6cfdcc756..fe2bd96ca 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -72,8 +72,10 @@ struct group_by_observer_strategy RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector; RPP_NO_UNIQUE_ADDRESS KeyComparator comparator; - mutable std::map, KeyComparator> key_to_subject{}; - std::shared_ptr disposable = std::make_shared(); + using subject_observer = decltype(std::declval>().get_observer()); + + mutable std::map key_to_observer{}; + std::shared_ptr disposable = std::make_shared(); RPP_CALL_DURING_CONSTRUCTION( { @@ -93,55 +95,48 @@ struct group_by_observer_strategy template TT> void on_next(TT&& val) const { - const auto subject = deduce_subject(observer, val); - if (!subject) - return; - - const auto& subject_obs = subject->get_observer(); - if (!subject_obs.is_disposed()) - subject_obs.on_next(value_selector(std::forward(val))); + const auto subject_observer = deduce_observer(observer, val); + if (subject_observer && !subject_observer->is_disposed()) + subject_observer->on_next(value_selector(std::forward(val))); } void on_error(const std::exception_ptr& err) const { - for (const auto& [key, subject] : key_to_subject) - subject.get_observer().on_error(err); + for (const auto& [key, subject_observer] : key_to_observer) + subject_observer.on_error(err); observer.on_error(err); } void on_completed() const { - for (const auto& [key, subject] : key_to_subject) - subject.get_observer().on_completed(); + for (const auto& [key, subject_observer] : key_to_observer) + subject_observer.on_completed(); observer.on_completed(); } private: template TT> - const subjects::publish_subject* deduce_subject(const rpp::constraint::observer auto& obs, const TT& val) const + const subject_observer* deduce_observer(const rpp::constraint::observer auto& obs, const TT& val) const { - auto key = key_selector(utils::as_const(val)); + const auto key = key_selector(utils::as_const(val)); + + if (const auto itr = key_to_observer.find(key); itr != key_to_observer.cend()) + return &itr->second; if (obs.is_disposed()) - { - const auto itr = key_to_subject.find(key); - return itr == key_to_subject.cend() ? nullptr : &itr->second; - } + return nullptr; - auto [itr, inserted] = key_to_subject.try_emplace(key); + const subjects::publish_subject subj{}; - if (inserted) - { - disposable->add(rpp::disposable_wrapper::from_weak(itr->second.get_disposable().get_original())); - obs.on_next(rpp::grouped_observable_group_by{ - std::move(key), - group_by_observable_strategy{itr->second, disposable} - }); - } + disposable->add(rpp::disposable_wrapper::from_weak(subj.get_disposable().get_original())); + obs.on_next(rpp::grouped_observable_group_by{ + key, + group_by_observable_strategy{subj, disposable} + }); - return &itr->second; + return &key_to_observer.emplace(key, subj.get_observer()).first->second; } }; diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index d459948bc..33d77f795 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -43,7 +43,6 @@ class window_observer_strategy , m_window_size{std::max(size_t{1}, count)} { m_observer.set_upstream(m_disposble->add_ref()); - m_disposble->add(m_subject.get_disposable()); } template @@ -52,36 +51,35 @@ class window_observer_strategy // need to send new subject due to NEW item appeared (we avoid sending new subjects if no any new items) if (m_items_in_current_window == m_window_size) { - if (m_subject.get_disposable().is_disposed()) - { - m_subject = Subject{m_disposble}; - m_disposble->add(m_subject.get_disposable()); - } - - m_observer.on_next(m_subject.get_observable()); + Subject subject{m_disposble}; + m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()}); + m_observer.on_next(subject.get_observable()); m_items_in_current_window = 0; } ++m_items_in_current_window; - m_subject.get_observer().on_next(std::forward(v)); + m_subject_data->observer.on_next(std::forward(v)); // cleanup current subject, but don't send due to wait for new value if (m_items_in_current_window == m_window_size) { - m_subject.get_observer().on_completed(); - m_disposble->remove(m_subject.get_disposable()); + m_subject_data->observer.on_completed(); + m_disposble->remove(m_subject_data->disposable); + m_subject_data.reset(); } } void on_error(const std::exception_ptr& err) const { - m_subject.get_observer().on_error(err); + if (m_subject_data) + m_subject_data->observer.on_error(err); m_observer.on_error(err); } void on_completed() const { - m_subject.get_observer().on_completed(); + if (m_subject_data) + m_subject_data->observer.on_completed(); m_observer.on_completed(); } @@ -92,9 +90,16 @@ class window_observer_strategy private: std::shared_ptr m_disposble = std::make_shared(); RPP_NO_UNIQUE_ADDRESS TObserver m_observer; - mutable Subject m_subject{m_disposble}; - const size_t m_window_size; - mutable size_t m_items_in_current_window = m_window_size; + + struct subject_data + { + decltype(std::declval().get_observer()) observer; + rpp::disposable_wrapper disposable; + }; + + mutable std::optional m_subject_data; + const size_t m_window_size; + mutable size_t m_items_in_current_window = m_window_size; }; struct window_t : public operators::details::operator_observable_strategy_different_types, size_t> From a35689f91e309bf08c81f03cebd5c48d13001128 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 13 Jan 2024 23:55:48 +0300 Subject: [PATCH 2/4] fix --- src/rpp/rpp/operators/window.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 33d77f795..71e7fb6d9 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -53,6 +53,7 @@ class window_observer_strategy { Subject subject{m_disposble}; m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()}); + m_disposble->add(m_subject_data.disposable); m_observer.on_next(subject.get_observable()); m_items_in_current_window = 0; } From 1ca3904b7cb4143972a77686a84241ddd50f1e54 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 13 Jan 2024 23:56:54 +0300 Subject: [PATCH 3/4] fix --- src/rpp/rpp/operators/window.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 71e7fb6d9..04ebeb14f 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -58,11 +58,10 @@ class window_observer_strategy m_items_in_current_window = 0; } - ++m_items_in_current_window; m_subject_data->observer.on_next(std::forward(v)); // cleanup current subject, but don't send due to wait for new value - if (m_items_in_current_window == m_window_size) + if (++m_items_in_current_window == m_window_size) { m_subject_data->observer.on_completed(); m_disposble->remove(m_subject_data->disposable); From 9afaee5cdf6056f803a70e15633ce23ead0f6c0c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 13 Jan 2024 23:59:24 +0300 Subject: [PATCH 4/4] fix --- src/rpp/rpp/operators/window.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 04ebeb14f..06475c76f 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -53,7 +53,7 @@ class window_observer_strategy { Subject subject{m_disposble}; m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()}); - m_disposble->add(m_subject_data.disposable); + m_disposble->add(m_subject_data->disposable); m_observer.on_next(subject.get_observable()); m_items_in_current_window = 0; }