From 8de4787c5e4e5dfac6a5a902d022da1d4135ecd2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 14 Nov 2024 21:30:42 +0300 Subject: [PATCH 1/4] speedup switch_on_next --- src/rpp/rpp/operators/concat.hpp | 5 +-- src/rpp/rpp/operators/switch_on_next.hpp | 40 ++++++++++++++---------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 62fceac9f..5e8dcdf2f 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -12,6 +12,7 @@ #include +#include #include #include @@ -34,7 +35,7 @@ namespace rpp::operators::details }; template - class concat_disposable final : public rpp::composite_disposable + class concat_disposable final : public rpp::details::base_disposable , public rpp::details::enable_wrapper_from_this> { public: @@ -87,7 +88,7 @@ namespace rpp::operators::details return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); } - void composite_dispose_impl(interface_disposable::Mode) noexcept override + void base_dispose_impl(interface_disposable::Mode) noexcept override { for (auto& d : m_child_disposables) d.dispose(); diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 5ad52dd4e..8d791544e 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -13,14 +13,16 @@ #include #include -#include +#include #include #include +#include + namespace rpp::operators::details { template - class switch_on_next_state_t final : public refcount_disposable + class switch_on_next_state_t final : public rpp::details::base_disposable { public: template TObs> @@ -33,13 +35,21 @@ namespace rpp::operators::details switch_on_next_state_t(const switch_on_next_state_t&) = delete; switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete; - rpp::utils::pointer_under_lock get_observer() + rpp::utils::pointer_under_lock get_observer() { return m_observer_with_mutex; } + rpp::composite_disposable& get_base_child_disposable() { return m_base_child_disposable; } + rpp::utils::pointer_under_lock get_inner_child_disposable() { return m_inner_child_disposable; } + + private: + void base_dispose_impl(interface_disposable::Mode) noexcept override { - return m_observer_with_mutex; + get_base_child_disposable().dispose(); + get_inner_child_disposable()->dispose(); } private: - rpp::utils::value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; + rpp::composite_disposable m_base_child_disposable{}; + rpp::utils::value_with_mutex m_inner_child_disposable{composite_disposable_wrapper::empty()}; }; template @@ -68,12 +78,11 @@ namespace rpp::operators::details void on_completed() const { m_refcounted.dispose(); - if (m_state->is_disposed()) + if (m_state->get_base_child_disposable().is_disposed()) m_state->get_observer()->on_completed(); } void set_upstream(const disposable_wrapper& d) const { m_refcounted.add(d); } - bool is_disposed() const { return m_refcounted.is_disposed(); } private: @@ -98,9 +107,10 @@ namespace rpp::operators::details template void on_next(T&& v) const { - m_last_refcount.dispose(); - m_last_refcount = m_state->add_ref(); - std::forward(v).subscribe(switch_on_next_inner_observer_strategy{m_state, m_last_refcount}); + const auto inner = m_state->get_inner_child_disposable(); + inner->dispose(); + inner = rpp::composite_disposable_wrapper::make(); + std::forward(v).subscribe(switch_on_next_inner_observer_strategy{m_state, *inner}); } void on_error(const std::exception_ptr& err) const @@ -110,13 +120,13 @@ namespace rpp::operators::details void on_completed() const { - m_this_refcount.dispose(); - if (m_state->is_disposed()) + m_state->get_base_child_disposable().dispose(); + if (m_state->get_inner_child_disposable()->is_disposed()) m_state->get_observer()->on_completed(); } - void set_upstream(const disposable_wrapper& d) const { m_this_refcount.add(d); } - bool is_disposed() const { return m_this_refcount.is_disposed(); } + void set_upstream(const disposable_wrapper& d) const { m_state->get_base_child_disposable().add(d); } + bool is_disposed() const { return m_state->get_base_child_disposable().is_disposed(); } private: static std::shared_ptr> init_state(TObserver&& observer) @@ -129,8 +139,6 @@ namespace rpp::operators::details private: std::shared_ptr> m_state; - rpp::composite_disposable_wrapper m_this_refcount = m_state->add_ref(); - mutable rpp::composite_disposable_wrapper m_last_refcount = composite_disposable_wrapper::empty(); }; struct switch_on_next_t : lift_operator From 6427359ea5608323519d729024ac86fa3001a8d5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 14 Nov 2024 21:51:25 +0300 Subject: [PATCH 2/4] compile fix --- src/rpp/rpp/operators/switch_on_next.hpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 8d791544e..5133fdb9f 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -58,9 +58,9 @@ namespace rpp::operators::details public: static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - switch_on_next_inner_observer_strategy(const std::shared_ptr>& state, const composite_disposable_wrapper& refcounted) + switch_on_next_inner_observer_strategy(const std::shared_ptr>& state, composite_disposable_wrapper&& refcounted) : m_state{state} - , m_refcounted{refcounted} + , m_refcounted{std::move(refcounted)} { } @@ -107,10 +107,13 @@ namespace rpp::operators::details template void on_next(T&& v) const { - const auto inner = m_state->get_inner_child_disposable(); - inner->dispose(); - inner = rpp::composite_disposable_wrapper::make(); - std::forward(v).subscribe(switch_on_next_inner_observer_strategy{m_state, *inner}); + auto new_inner = rpp::composite_disposable_wrapper::make(); + { + auto inner = m_state->get_inner_child_disposable(); + inner->dispose(); + *inner = new_inner; + } + std::forward(v).subscribe(switch_on_next_inner_observer_strategy{m_state, std::move(new_inner)}); } void on_error(const std::exception_ptr& err) const From ce7cacf5244f92bd8f15ff68065711dae10b8571 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 15 Nov 2024 21:19:36 +0300 Subject: [PATCH 3/4] add tests --- src/rpp/rpp/operators/switch_on_next.hpp | 3 ++ src/tests/rpp/test_switch_on_next.cpp | 36 ++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 5133fdb9f..b88ebaf6a 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -111,6 +111,9 @@ namespace rpp::operators::details { auto inner = m_state->get_inner_child_disposable(); inner->dispose(); + if (m_state->is_disposed()) + return; + *inner = new_inner; } std::forward(v).subscribe(switch_on_next_inner_observer_strategy{m_state, std::move(new_inner)}); diff --git a/src/tests/rpp/test_switch_on_next.cpp b/src/tests/rpp/test_switch_on_next.cpp index 18613c985..6078ac049 100644 --- a/src/tests/rpp/test_switch_on_next.cpp +++ b/src/tests/rpp/test_switch_on_next.cpp @@ -21,6 +21,7 @@ #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" TEST_CASE("switch_on_next switches observable after obtaining new one") { @@ -164,6 +165,41 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") } } } + SUBCASE("switch_on_next completes right") + { + mock_observer mock{}; + trompeloeil::sequence s{}; + + rpp::subjects::publish_subject> subj{}; + + subj.get_observable() | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); + SUBCASE("on_completed from base") + { + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + } + + SUBCASE("on_completed from inner + then from base") + { + subj.get_observer().on_next(rpp::source::empty()); + + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + } + + SUBCASE("on_completed from base + then from inner") + { + subj.get_observer().on_next(rpp::source::empty()); + subj.get_observer().on_next(rpp::source::never()); + + rpp::subjects::publish_subject inner{}; + subj.get_observer().on_next(inner.get_observable()); + subj.get_observer().on_completed(); + + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); + inner.get_observer().on_completed(); + } + } } TEST_CASE("switch_on_next doesn't produce extra copies") From 5462ce1d465fdee0828a87e295fe69215e301800 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 15 Nov 2024 21:42:03 +0300 Subject: [PATCH 4/4] fix tets --- src/tests/rpp/test_switch_on_next.cpp | 89 ++++++++++----------------- 1 file changed, 31 insertions(+), 58 deletions(-) diff --git a/src/tests/rpp/test_switch_on_next.cpp b/src/tests/rpp/test_switch_on_next.cpp index 6078ac049..eebdf2c81 100644 --- a/src/tests/rpp/test_switch_on_next.cpp +++ b/src/tests/rpp/test_switch_on_next.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include #include @@ -25,19 +24,19 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") { - auto mock = mock_observer_strategy(); + mock_observer mock{}; + trompeloeil::sequence s{}; + SUBCASE("just observable of just observables") { auto observable = rpp::source::just(rpp::source::just(1), rpp::source::just(2), rpp::source::just(3)); - SUBCASE("subscribe on it via switch_on_next") + SUBCASE("subscribe on it via switch_on_next - obtains values as from concat") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); - SUBCASE("obtains values as from concat") - { - CHECK(mock.get_received_values() == std::vector{1, 2, 3}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 1); - } } } SUBCASE("just observable of just observables where second is error") @@ -45,15 +44,12 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(), rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(), rpp::source::just(3).as_dynamic()); - SUBCASE("subscribe on it via switch_on_next") + SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but stops on error") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); - SUBCASE("obtains values as from concat but stops on error") - { - CHECK(mock.get_received_values() == std::vector{1}); - CHECK(mock.get_on_error_count() == 1); - CHECK(mock.get_on_completed_count() == 0); - } } } SUBCASE("just observable of just observables where second is completed") @@ -61,15 +57,13 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(), rpp::source::empty().as_dynamic(), rpp::source::just(3).as_dynamic()); - SUBCASE("subscribe on it via switch_on_next") + SUBCASE("subscribe on it via switch_on_next - obtains values as from concat") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); + observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); - SUBCASE("obtains values as from concat") - { - CHECK(mock.get_received_values() == std::vector{1, 3}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 1); - } } } SUBCASE("just observable of just observables where second is never") @@ -77,15 +71,12 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(), rpp::source::never().as_dynamic(), rpp::source::just(3).as_dynamic()); - SUBCASE("subscribe on it via switch_on_next") + SUBCASE("subscribe on it via switch_on_next - obtains values as from concat") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); - SUBCASE("obtains values as from concat") - { - CHECK(mock.get_received_values() == std::vector{1, 3}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 1); - } } } SUBCASE("just observable of just observables where last is never") @@ -93,15 +84,12 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(), rpp::source::just(3).as_dynamic(), rpp::source::never().as_dynamic()); - SUBCASE("subscribe on it via switch_on_next") + SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but no complete") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s); + observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); - SUBCASE("obtains values as from concat but no complete") - { - CHECK(mock.get_received_values() == std::vector{1, 3}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 0); - } } } SUBCASE("subject of just subjects") @@ -121,12 +109,9 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") SUBCASE("Only value from first subject obtained") { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); subj_1.get_observer().on_next(1); subj_2.get_observer().on_next(2); - - CHECK(mock.get_received_values() == std::vector{1}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 0); } SUBCASE("send second subject and send values for all subjects") { @@ -135,31 +120,22 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") SUBCASE("Only value from second subject obtained") { subj_1.get_observer().on_next(1); - subj_2.get_observer().on_next(2); - CHECK(mock.get_received_values() == std::vector{2}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 0); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s); + subj_2.get_observer().on_next(2); } } SUBCASE("original subject completes but provided send value") { subj_of_subjects.get_observer().on_completed(); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s); subj_1.get_observer().on_next(1); subj_2.get_observer().on_next(2); - SUBCASE("value obtained") - { - CHECK(mock.get_received_values() == std::vector{1}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 0); - } SUBCASE("subject sends on_completed") { + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); subj_1.get_observer().on_completed(); - SUBCASE("subsriber completed") - { - CHECK(mock.get_on_completed_count() == 1); - } } } } @@ -167,9 +143,6 @@ TEST_CASE("switch_on_next switches observable after obtaining new one") } SUBCASE("switch_on_next completes right") { - mock_observer mock{}; - trompeloeil::sequence s{}; - rpp::subjects::publish_subject> subj{}; subj.get_observable() | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);