From fc14edc980442a5abc3e4c09d6b861cfd21d8598 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 3 Jun 2024 22:00:07 +0300 Subject: [PATCH 01/13] initial benchmarks --- cmake/dependencies.cmake | 10 +++++++--- src/benchmarks/benchmarks.cpp | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 6430e41b4..697745e08 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -46,11 +46,15 @@ macro(rpp_fetch_library_extended NAME URL TAG TARGET_NAME) Include(FetchContent) set(BUILD_SHARED_LIBS OFF CACHE INTERNAL "Build SHARED libraries") + Set(FETCHCONTENT_QUIET FALSE) + FetchContent_Declare( ${NAME} GIT_REPOSITORY ${URL} GIT_TAG ${TAG} - GIT_SHALLOW TRUE + GIT_SHALLOW TRUE + GIT_PROGRESS TRUE + GIT_SUBMODULES "" ) FetchContent_MakeAvailable(${NAME}) @@ -60,7 +64,7 @@ endmacro() macro(rpp_fetch_library NAME URL TAG) find_package(${NAME} QUIET) if (NOT ${NAME}_FOUND) - message("-- RPP: Fetching ${NAME}...") + message("-- RPP: Fetching ${NAME} from ${URL} by ${TAG}...") rpp_fetch_library_extended(${NAME} ${URL} ${TAG} ${NAME}) endif() endmacro() @@ -68,7 +72,7 @@ endmacro() # ==================== RXCPP ======================= if (RPP_BUILD_RXCPP AND RPP_BUILD_BENCHMARKS) set(RXCPP_DISABLE_TESTS_AND_EXAMPLES 1) - rpp_fetch_library(rxcpp https://github.com/ReactiveX/RxCpp.git origin/main) + rpp_fetch_library(rxcpp https://github.com/ReactiveX/RxCpp.git main) endif() # ===================== Tests =================== diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index e4ce85dc3..3ed8d7cdc 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -661,6 +661,23 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } } + SECTION("subscribe 1000 observers to publish_subject") + { + TEST_RPP([&]{ + rpp::subjects::publish_subject s{}; + for(size_t i =0; i < 1000; ++i) { + s.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); + } + s.get_observer().on_next(1); + }); + TEST_RXCPP([&]{ + rxcpp::subjects::subject s{}; + for(size_t i =0; i < 1000; ++i) { + s.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); + } + s.get_subscriber().on_next(1); + }); + } } // BENCHMARK("Subjects") BENCHMARK("Scenarios") From 384a27e96f535f85b6e7319962a7cf69ce1d08e1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Jun 2024 19:00:36 +0000 Subject: [PATCH 02/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/benchmarks/benchmarks.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 3ed8d7cdc..b2c738a91 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -663,16 +663,18 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) } SECTION("subscribe 1000 observers to publish_subject") { - TEST_RPP([&]{ + TEST_RPP([&] { rpp::subjects::publish_subject s{}; - for(size_t i =0; i < 1000; ++i) { + for (size_t i = 0; i < 1000; ++i) + { s.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } s.get_observer().on_next(1); }); - TEST_RXCPP([&]{ + TEST_RXCPP([&] { rxcpp::subjects::subject s{}; - for(size_t i =0; i < 1000; ++i) { + for (size_t i = 0; i < 1000; ++i) + { s.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } s.get_subscriber().on_next(1); From f81072f2f18007fd8f2b1b22346b723aa65170c1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 3 Jun 2024 22:08:32 +0300 Subject: [PATCH 03/13] more tests --- src/benchmarks/benchmarks.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index b2c738a91..ff55af4bc 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -680,6 +680,30 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) s.get_subscriber().on_next(1); }); } + SECTION("100 on_next to 1000 observers to publish_subject") + { + { + rpp::subjects::publish_subject rpp_subj{}; + for(size_t i =0; i < 1000; ++i) { + rpp_subj.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); + } + TEST_RPP([&]{ + for(size_t i =0; i < 100; ++i) + rpp_subj.get_observer().on_next(i); + }); + } + + { + rxcpp::subjects::subject rxcpp_subj{}; + for(size_t i =0; i < 1000; ++i) { + rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); + } + TEST_RXCPP([&]{ + for(size_t i =0; i < 100; ++i) + rxcpp_subj.get_subscriber().on_next(i); + }); + } + } } // BENCHMARK("Subjects") BENCHMARK("Scenarios") From edea0ba7aec3deefe4677dc60e80b41303efcdee Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 3 Jun 2024 23:13:45 +0300 Subject: [PATCH 04/13] simplify benchmark --- src/benchmarks/benchmarks.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index ff55af4bc..9e6618c8e 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -661,11 +661,11 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } } - SECTION("subscribe 1000 observers to publish_subject") + SECTION("subscribe 100 observers to publish_subject") { TEST_RPP([&] { rpp::subjects::publish_subject s{}; - for (size_t i = 0; i < 1000; ++i) + for (size_t i = 0; i < 100; ++i) { s.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } @@ -673,18 +673,18 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); TEST_RXCPP([&] { rxcpp::subjects::subject s{}; - for (size_t i = 0; i < 1000; ++i) + for (size_t i = 0; i < 100; ++i) { s.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } s.get_subscriber().on_next(1); }); } - SECTION("100 on_next to 1000 observers to publish_subject") + SECTION("100 on_next to 100 observers to publish_subject") { { rpp::subjects::publish_subject rpp_subj{}; - for(size_t i =0; i < 1000; ++i) { + for(size_t i =0; i < 100; ++i) { rpp_subj.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } TEST_RPP([&]{ @@ -695,7 +695,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) { rxcpp::subjects::subject rxcpp_subj{}; - for(size_t i =0; i < 1000; ++i) { + for(size_t i =0; i < 100; ++i) { rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } TEST_RXCPP([&]{ From 4018426a4049a5fe1829561cbef43925af1e0943 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Jun 2024 20:14:03 +0000 Subject: [PATCH 05/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/benchmarks/benchmarks.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 9e6618c8e..a90f0c962 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -684,22 +684,24 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) { { rpp::subjects::publish_subject rpp_subj{}; - for(size_t i =0; i < 100; ++i) { + for (size_t i = 0; i < 100; ++i) + { rpp_subj.get_observable().subscribe(rpp::make_lambda_observer([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } - TEST_RPP([&]{ - for(size_t i =0; i < 100; ++i) + TEST_RPP([&] { + for (size_t i = 0; i < 100; ++i) rpp_subj.get_observer().on_next(i); }); } { rxcpp::subjects::subject rxcpp_subj{}; - for(size_t i =0; i < 100; ++i) { + for (size_t i = 0; i < 100; ++i) + { rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber([](int v) { ankerl::nanobench::doNotOptimizeAway(v); })); } - TEST_RXCPP([&]{ - for(size_t i =0; i < 100; ++i) + TEST_RXCPP([&] { + for (size_t i = 0; i < 100; ++i) rxcpp_subj.get_subscriber().on_next(i); }); } From cf99daea29ab217b2fb81bab909bbfa3b3defbbe Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 3 Jun 2024 23:33:03 +0300 Subject: [PATCH 06/13] speedup --- .../rpp/subjects/details/subject_state.hpp | 91 ++++++++----------- src/tests/rpp/test_subjects.cpp | 31 +++++++ 2 files changed, 70 insertions(+), 52 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index b902fa304..db67a4ef9 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include namespace rpp::subjects::details { @@ -39,7 +39,8 @@ namespace rpp::subjects::details class subject_state : public composite_disposable , public rpp::details::enable_wrapper_from_this> { - using shared_observers = std::shared_ptr>>; + using observers = std::deque>>; + using shared_observers = std::shared_ptr; using state_t = std::variant; public: @@ -54,13 +55,24 @@ namespace rpp::subjects::details process_state_unsafe( m_state, [&](const shared_observers& observers) { - auto new_observers = make_copy_of_subscribed_observers(true, observers); + auto d = rpp::disposable_wrapper{make_callback_disposable( + [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) + { + if (const auto shared = weak.lock()) + { + std::unique_lock lock{shared->m_mutex}; + process_state_unsafe(shared->m_state, + [&](const shared_observers& observers) { + shared->m_state = cleanup_observers(observers); + }); + } + })}; + auto observer_as_dynamic = std::forward(observer).as_dynamic(); - new_observers->push_back(observer_as_dynamic); - m_state = std::move(new_observers); + observers->emplace_back(d.as_weak(), observer_as_dynamic); lock.unlock(); - set_upstream(observer_as_dynamic); + observer_as_dynamic.set_upstream(std::move(d)); }, [&](const std::exception_ptr& err) { lock.unlock(); @@ -74,9 +86,20 @@ namespace rpp::subjects::details void on_next(const Type& v) { + std::unique_lock observers_lock{m_mutex}; + + if (!std::holds_alternative(m_state)) + return; + + // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call + const auto observers = std::get(m_state); + const auto begin = observers->cbegin(); + const auto end = observers->cend(); + + observers_lock.unlock(); + std::lock_guard lock{m_serialized_mutex}; - if (const auto observers = extract_observers_under_lock_if_there()) - rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); }); + std::for_each(begin, end, [&](const auto& pair) { pair.second.on_next(v); }); } void on_error(const std::exception_ptr& err) @@ -84,7 +107,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); + rpp::utils::for_each(*observers, [&](const auto& pair) { pair.second.on_error(err); }); } dispose(); } @@ -94,7 +117,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); + rpp::utils::for_each(*observers, [](const auto& pair) { pair.second.on_completed(); }); } dispose(); } @@ -105,62 +128,26 @@ namespace rpp::subjects::details exchange_observers_under_lock_if_there(disposed{}); } - void set_upstream(rpp::dynamic_observer& obs) + static shared_observers cleanup_observers(const shared_observers& current_subs) { - obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable( - [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) - { - if (const auto shared = weak.lock()) - { - std::unique_lock lock{shared->m_mutex}; - process_state_unsafe(shared->m_state, - [&](const shared_observers& observers) { - shared->m_state = make_copy_of_subscribed_observers(false, observers); - }); - } - })}); - } - - static shared_observers make_copy_of_subscribed_observers(bool add, const shared_observers& current_subs) - { - auto subs = std::make_shared>>(); - subs->reserve(deduce_new_size(add, current_subs)); + auto subs = std::make_shared(); if (current_subs) { std::copy_if(current_subs->cbegin(), current_subs->cend(), std::back_inserter(*subs), - rpp::utils::static_not_mem_fn<&dynamic_observer::is_disposed>{}); + [](const auto& pair) { + return !pair.first.is_disposed() && !pair.second.is_disposed(); + }); } return subs; } - static size_t deduce_new_size(bool add, const shared_observers& current_subs) - { - if (!current_subs) - return add ? 1 : 0; - - if (add) - return current_subs->size() + 1; - - return std::max(current_subs->size(), size_t{1}) - 1; - } - static void process_state_unsafe(const state_t& state, const auto&... actions) { std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); } - shared_observers extract_observers_under_lock_if_there() - { - std::lock_guard lock{m_mutex}; - - if (!std::holds_alternative(m_state)) - return {}; - - return std::get(m_state); - } - shared_observers exchange_observers_under_lock_if_there(state_t&& new_val) { std::lock_guard lock{m_mutex}; @@ -172,7 +159,7 @@ namespace rpp::subjects::details } private: - state_t m_state{}; + state_t m_state = std::make_shared(); std::mutex m_mutex{}; RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{}; }; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 5f7ddbb74..b3e880c4a 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -20,6 +20,7 @@ #include #include "copy_count_tracker.hpp" +#include "rpp_trompeloil.hpp" #include @@ -137,6 +138,36 @@ TEST_CASE("publish subject multicasts values") } } +TEST_CASE("subject can be modified from on_next call") +{ + rpp::subjects::publish_subject subject{}; + mock_observer inner_mock{}; + + SECTION("subscribe inside on_next") { + subject.get_observable().subscribe([&subject, &inner_mock](int) { + subject.get_observable().subscribe(inner_mock); + }); + + subject.get_observer().on_next(1); + + REQUIRE_CALL(*inner_mock, on_next_lvalue(2)); + subject.get_observer().on_next(2); + } + + SECTION("unsubscribe inside on_next") { + auto d = rpp::composite_disposable_wrapper::make(); + + subject.get_observable().subscribe([d](int) { + d.clear(); + }); + subject.get_observable().subscribe(d, inner_mock); + + REQUIRE_CALL(*inner_mock, on_next_lvalue(1)); + subject.get_observer().on_next(1); + subject.get_observer().on_next(2); + } +} + TEST_CASE("publish subject caches error/completed") { auto mock = mock_observer_strategy{}; From 10c870a7d935b271f43681482ea52934049bdbf4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Jun 2024 20:33:18 +0000 Subject: [PATCH 07/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/subjects/details/subject_state.hpp | 8 ++++---- src/tests/rpp/test_subjects.cpp | 8 +++++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index db67a4ef9..c760791d5 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include namespace rpp::subjects::details { @@ -93,8 +93,8 @@ namespace rpp::subjects::details // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call const auto observers = std::get(m_state); - const auto begin = observers->cbegin(); - const auto end = observers->cend(); + const auto begin = observers->cbegin(); + const auto end = observers->cend(); observers_lock.unlock(); @@ -137,7 +137,7 @@ namespace rpp::subjects::details current_subs->cend(), std::back_inserter(*subs), [](const auto& pair) { - return !pair.first.is_disposed() && !pair.second.is_disposed(); + return !pair.first.is_disposed() && !pair.second.is_disposed(); }); } return subs; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index b3e880c4a..04eaac90c 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -141,9 +141,10 @@ TEST_CASE("publish subject multicasts values") TEST_CASE("subject can be modified from on_next call") { rpp::subjects::publish_subject subject{}; - mock_observer inner_mock{}; + mock_observer inner_mock{}; - SECTION("subscribe inside on_next") { + SECTION("subscribe inside on_next") + { subject.get_observable().subscribe([&subject, &inner_mock](int) { subject.get_observable().subscribe(inner_mock); }); @@ -154,7 +155,8 @@ TEST_CASE("subject can be modified from on_next call") subject.get_observer().on_next(2); } - SECTION("unsubscribe inside on_next") { + SECTION("unsubscribe inside on_next") + { auto d = rpp::composite_disposable_wrapper::make(); subject.get_observable().subscribe([d](int) { From 4831042fd4af75555c359ad969ed6fcf84105651 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 4 Jun 2024 23:44:41 +0300 Subject: [PATCH 08/13] dynamic_observer --- src/rpp/rpp/observers/dynamic_observer.hpp | 96 +++++++++---------- .../rpp/subjects/details/subject_state.hpp | 63 +++++++----- 2 files changed, 87 insertions(+), 72 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index deaf499d1..3378046b8 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -19,24 +19,51 @@ namespace rpp::details::observers { - template - struct member_ptr_caller_impl; - - template - struct member_ptr_caller_impl + template + struct observer_vtable { - static R call(void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } + void (*on_next_lvalue)(const observer_vtable*, const Type&){}; + void (*on_next_rvalue)(const observer_vtable*, Type&&){}; + void (*on_error)(const observer_vtable*, const std::exception_ptr&){}; + void (*on_completed)(const observer_vtable*){}; + + void (*set_upstream)(observer_vtable*, const disposable_wrapper&){}; + bool (*is_disposed)(const observer_vtable*){}; }; - template - struct member_ptr_caller_impl + template + class type_erased_observer : public observer_vtable> { - static R call(const void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } - }; + using Type = rpp::utils::extract_observer_type_t; + using Base = observer_vtable; + + static constexpr const TObs& cast(const Base* ptr) + { + return static_cast(ptr)->m_observer; + } + + static constexpr TObs& cast(Base* ptr) + { + return static_cast(ptr)->m_observer; + } - template - using member_ptr_caller = member_ptr_caller_impl; + public: + type_erased_observer(TObs&& observer) + : Base{ + .on_next_lvalue = +[](const Base* b, const Type& v) { cast(b).on_next(v); }, + .on_next_rvalue = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); }, + .on_error = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, + .on_completed = +[](const Base* b) { cast(b).on_completed(); }, + .set_upstream = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, + .is_disposed = +[](const Base* b) { return cast(b).is_disposed(); } + } + , m_observer{std::move(observer)} + { + } + private: + TObs m_observer; + }; template class dynamic_strategy final @@ -45,52 +72,23 @@ namespace rpp::details::observers template Strategy> requires (!rpp::constraint::decayed_same_as>) explicit dynamic_strategy(observer&& obs) - : m_forwarder{std::make_shared>(std::move(obs))} - , m_vtable{vtable::template create>()} + : m_observer{std::make_shared>>(std::move(obs))} { } - void set_upstream(const disposable_wrapper& d) noexcept { m_vtable->set_upstream(m_forwarder.get(), d); } + void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(m_observer.get(), d); } - bool is_disposed() const noexcept { return m_vtable->is_disposed(m_forwarder.get()); } + bool is_disposed() const noexcept { return m_observer->is_disposed(m_observer.get()); } - void on_next(const Type& v) const noexcept { m_vtable->on_next_lvalue(m_forwarder.get(), v); } + void on_next(const Type& v) const noexcept { m_observer->on_next_lvalue(m_observer.get(), v); } - void on_next(Type&& v) const noexcept { m_vtable->on_next_rvalue(m_forwarder.get(), std::move(v)); } + void on_next(Type&& v) const noexcept { m_observer->on_next_rvalue(m_observer.get(), std::move(v)); } - void on_error(const std::exception_ptr& err) const noexcept { m_vtable->on_error(m_forwarder.get(), err); } - - void on_completed() const noexcept { m_vtable->on_completed(m_forwarder.get()); } - - private: - struct vtable - { - void (*on_next_lvalue)(const void*, const Type&){}; - void (*on_next_rvalue)(const void*, Type&&){}; - void (*on_error)(const void*, const std::exception_ptr&){}; - void (*on_completed)(const void*){}; - - void (*set_upstream)(void*, const disposable_wrapper&){}; - bool (*is_disposed)(const void*){}; - - template - static const vtable* create() noexcept - { - static vtable s_res{ - .on_next_lvalue = &member_ptr_caller(&Strategy::on_next)>::call, - .on_next_rvalue = &member_ptr_caller(&Strategy::on_next)>::call, - .on_error = &member_ptr_caller<&Strategy::on_error>::call, - .on_completed = &member_ptr_caller<&Strategy::on_completed>::call, - .set_upstream = &member_ptr_caller<&Strategy::set_upstream>::call, - .is_disposed = &member_ptr_caller<&Strategy::is_disposed>::call, - }; - return &s_res; - } - }; + void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(m_observer.get(), err); } + void on_completed() const noexcept { m_observer->on_completed(m_observer.get()); } private: - std::shared_ptr m_forwarder; - const vtable* m_vtable; + std::shared_ptr> m_observer; }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index c760791d5..71714ce8b 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -39,7 +39,35 @@ namespace rpp::subjects::details class subject_state : public composite_disposable , public rpp::details::enable_wrapper_from_this> { - using observers = std::deque>>; + template + class disposable_with_observer : public rpp::details::observers::type_erased_observer + , public rpp::details::base_disposable + { + public: + disposable_with_observer(TObs&& observer, disposable_wrapper_impl state) + : rpp::details::observers::type_erased_observer{std::move(observer)} + , m_state{std::move(state)} + { + } + + private: + void base_dispose_impl(interface_disposable::Mode) noexcept override + { + if (const auto shared = m_state.lock()) + { + std::unique_lock lock{shared->m_mutex}; + process_state_unsafe(shared->m_state, + [&](const shared_observers& observers) { + shared->m_state = cleanup_observers(observers, this); + }); + } + } + + rpp::disposable_wrapper_impl m_state{}; + }; + + using observer = std::shared_ptr>; + using observers = std::deque; using shared_observers = std::shared_ptr; using state_t = std::variant; @@ -55,24 +83,13 @@ namespace rpp::subjects::details process_state_unsafe( m_state, [&](const shared_observers& observers) { - auto d = rpp::disposable_wrapper{make_callback_disposable( - [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) - { - if (const auto shared = weak.lock()) - { - std::unique_lock lock{shared->m_mutex}; - process_state_unsafe(shared->m_state, - [&](const shared_observers& observers) { - shared->m_state = cleanup_observers(observers); - }); - } - })}; - - auto observer_as_dynamic = std::forward(observer).as_dynamic(); - observers->emplace_back(d.as_weak(), observer_as_dynamic); + auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().as_weak()); + auto ptr = d.lock(); + + observers->emplace_back(ptr); lock.unlock(); - observer_as_dynamic.set_upstream(std::move(d)); + ptr->set_upstream(ptr.get(), d.as_weak()); }, [&](const std::exception_ptr& err) { lock.unlock(); @@ -99,7 +116,7 @@ namespace rpp::subjects::details observers_lock.unlock(); std::lock_guard lock{m_serialized_mutex}; - std::for_each(begin, end, [&](const auto& pair) { pair.second.on_next(v); }); + std::for_each(begin, end, [&](const observer& obs) { obs->on_next_lvalue(obs.get(), v); }); } void on_error(const std::exception_ptr& err) @@ -107,7 +124,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const auto& pair) { pair.second.on_error(err); }); + rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(obs.get(), err); }); } dispose(); } @@ -117,7 +134,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, [](const auto& pair) { pair.second.on_completed(); }); + rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(obs.get()); }); } dispose(); } @@ -128,7 +145,7 @@ namespace rpp::subjects::details exchange_observers_under_lock_if_there(disposed{}); } - static shared_observers cleanup_observers(const shared_observers& current_subs) + static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable* to_delete) { auto subs = std::make_shared(); if (current_subs) @@ -136,8 +153,8 @@ namespace rpp::subjects::details std::copy_if(current_subs->cbegin(), current_subs->cend(), std::back_inserter(*subs), - [](const auto& pair) { - return !pair.first.is_disposed() && !pair.second.is_disposed(); + [&to_delete](const observer& obs) { + return to_delete != obs.get() && !obs->is_disposed(obs.get()); }); } return subs; From 3fa7ef98a724429ca8e5bc7e76b98f5cc340a6b1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 4 Jun 2024 20:45:14 +0000 Subject: [PATCH 09/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/observers/dynamic_observer.hpp | 6 ++++-- src/rpp/rpp/subjects/details/subject_state.hpp | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 3378046b8..b08dedd47 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -55,8 +55,9 @@ namespace rpp::details::observers .on_error = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, .on_completed = +[](const Base* b) { cast(b).on_completed(); }, .set_upstream = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, - .is_disposed = +[](const Base* b) { return cast(b).is_disposed(); } - } + .is_disposed = +[](const Base* b) { + return cast(b).is_disposed(); + }} , m_observer{std::move(observer)} { } @@ -87,6 +88,7 @@ namespace rpp::details::observers void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(m_observer.get(), err); } void on_completed() const noexcept { m_observer->on_completed(m_observer.get()); } + private: std::shared_ptr> m_observer; }; diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 71714ce8b..a1aca1251 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -41,7 +41,7 @@ namespace rpp::subjects::details { template class disposable_with_observer : public rpp::details::observers::type_erased_observer - , public rpp::details::base_disposable + , public rpp::details::base_disposable { public: disposable_with_observer(TObs&& observer, disposable_wrapper_impl state) @@ -57,9 +57,9 @@ namespace rpp::subjects::details { std::unique_lock lock{shared->m_mutex}; process_state_unsafe(shared->m_state, - [&](const shared_observers& observers) { - shared->m_state = cleanup_observers(observers, this); - }); + [&](const shared_observers& observers) { + shared->m_state = cleanup_observers(observers, this); + }); } } @@ -83,7 +83,7 @@ namespace rpp::subjects::details process_state_unsafe( m_state, [&](const shared_observers& observers) { - auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().as_weak()); + auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().as_weak()); auto ptr = d.lock(); observers->emplace_back(ptr); From 92420cda8ee15fdf8190e7fb1ba2342f9a3f7d50 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 5 Jun 2024 22:28:31 +0300 Subject: [PATCH 10/13] simplify --- src/rpp/rpp/observers/dynamic_observer.hpp | 65 ++++++++++++------- .../rpp/subjects/details/subject_state.hpp | 25 ++++--- 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index b08dedd47..1ca301eea 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -20,15 +20,33 @@ namespace rpp::details::observers { template - struct observer_vtable + class observer_vtable { - void (*on_next_lvalue)(const observer_vtable*, const Type&){}; - void (*on_next_rvalue)(const observer_vtable*, Type&&){}; - void (*on_error)(const observer_vtable*, const std::exception_ptr&){}; - void (*on_completed)(const observer_vtable*){}; + public: + void set_upstream(const disposable_wrapper& d) noexcept { m_vtable.set_upstream_ptr(this, d); } + bool is_disposed() const noexcept { return m_vtable.is_disposed_ptr(this); } + + void on_next(const Type& v) const noexcept { m_vtable.on_next_lvalue_ptr(this, v); } + void on_next(Type&& v) const noexcept { m_vtable.on_next_rvalue_ptr(this, std::move(v)); } + void on_error(const std::exception_ptr& err) const noexcept { m_vtable.on_error_ptr(this, err); } + void on_completed() const noexcept { m_vtable.on_completed_ptr(this); } + + protected: + struct vtable_t + { + void (*const on_next_lvalue_ptr)(const observer_vtable*, const Type&){}; + void (*const on_next_rvalue_ptr)(const observer_vtable*, Type&&){}; + void (*const on_error_ptr)(const observer_vtable*, const std::exception_ptr&){}; + void (*const on_completed_ptr)(const observer_vtable*){}; + + void (*const set_upstream_ptr)(observer_vtable*, const disposable_wrapper&){}; + bool (*const is_disposed_ptr)(const observer_vtable*){}; + }; - void (*set_upstream)(observer_vtable*, const disposable_wrapper&){}; - bool (*is_disposed)(const observer_vtable*){}; + observer_vtable(vtable_t&& vtable) + : m_vtable{std::move(vtable)}{} + + const vtable_t m_vtable{}; }; template @@ -36,6 +54,7 @@ namespace rpp::details::observers { using Type = rpp::utils::extract_observer_type_t; using Base = observer_vtable; + using Vtable = typename Base::vtable_t; static constexpr const TObs& cast(const Base* ptr) { @@ -49,15 +68,15 @@ namespace rpp::details::observers public: type_erased_observer(TObs&& observer) - : Base{ - .on_next_lvalue = +[](const Base* b, const Type& v) { cast(b).on_next(v); }, - .on_next_rvalue = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); }, - .on_error = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, - .on_completed = +[](const Base* b) { cast(b).on_completed(); }, - .set_upstream = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, - .is_disposed = +[](const Base* b) { + : Base{Vtable{ + .on_next_lvalue_ptr = +[](const Base* b, const Type& v) { cast(b).on_next(v); }, + .on_next_rvalue_ptr = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); }, + .on_error_ptr = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, + .on_completed_ptr = +[](const Base* b) { cast(b).on_completed(); }, + .set_upstream_ptr = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, + .is_disposed_ptr = +[](const Base* b) { return cast(b).is_disposed(); - }} + }}} , m_observer{std::move(observer)} { } @@ -77,17 +96,13 @@ namespace rpp::details::observers { } - void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(m_observer.get(), d); } - - bool is_disposed() const noexcept { return m_observer->is_disposed(m_observer.get()); } - - void on_next(const Type& v) const noexcept { m_observer->on_next_lvalue(m_observer.get(), v); } - - void on_next(Type&& v) const noexcept { m_observer->on_next_rvalue(m_observer.get(), std::move(v)); } - - void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(m_observer.get(), err); } + void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(d); } + bool is_disposed() const noexcept { return m_observer->is_disposed(); } - void on_completed() const noexcept { m_observer->on_completed(m_observer.get()); } + void on_next(const Type& v) const noexcept { m_observer->on_next(v); } + void on_next(Type&& v) const noexcept { m_observer->on_next(std::move(v)); } + void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(err); } + void on_completed() const noexcept { m_observer->on_completed(); } private: std::shared_ptr> m_observer; diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index a1aca1251..b60427c86 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -44,7 +44,7 @@ namespace rpp::subjects::details , public rpp::details::base_disposable { public: - disposable_with_observer(TObs&& observer, disposable_wrapper_impl state) + disposable_with_observer(TObs&& observer, std::weak_ptr state) : rpp::details::observers::type_erased_observer{std::move(observer)} , m_state{std::move(state)} { @@ -63,7 +63,7 @@ namespace rpp::subjects::details } } - rpp::disposable_wrapper_impl m_state{}; + std::weak_ptr m_state{}; }; using observer = std::shared_ptr>; @@ -82,14 +82,16 @@ namespace rpp::subjects::details std::unique_lock lock{m_mutex}; process_state_unsafe( m_state, - [&](const shared_observers& observers) { - auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().as_weak()); + [&](shared_observers& observers) { + auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().lock()); auto ptr = d.lock(); + if (!observers) + observers = std::make_shared(); observers->emplace_back(ptr); lock.unlock(); - ptr->set_upstream(ptr.get(), d.as_weak()); + ptr->set_upstream(d.as_weak()); }, [&](const std::exception_ptr& err) { lock.unlock(); @@ -110,13 +112,16 @@ namespace rpp::subjects::details // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call const auto observers = std::get(m_state); + if (!observers) + return; + const auto begin = observers->cbegin(); const auto end = observers->cend(); observers_lock.unlock(); std::lock_guard lock{m_serialized_mutex}; - std::for_each(begin, end, [&](const observer& obs) { obs->on_next_lvalue(obs.get(), v); }); + std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); } void on_error(const std::exception_ptr& err) @@ -124,7 +129,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(obs.get(), err); }); + rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); }); } dispose(); } @@ -134,7 +139,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(obs.get()); }); + rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); }); } dispose(); } @@ -154,7 +159,7 @@ namespace rpp::subjects::details current_subs->cend(), std::back_inserter(*subs), [&to_delete](const observer& obs) { - return to_delete != obs.get() && !obs->is_disposed(obs.get()); + return to_delete != obs.get() && !obs->is_disposed(); }); } return subs; @@ -176,7 +181,7 @@ namespace rpp::subjects::details } private: - state_t m_state = std::make_shared(); + state_t m_state; std::mutex m_mutex{}; RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{}; }; From 97aa670552f4b1c69f788fff37e445b3cb9b7d54 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 6 Jun 2024 06:35:49 +0000 Subject: [PATCH 11/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/observers/dynamic_observer.hpp | 8 +++++--- src/rpp/rpp/subjects/details/subject_state.hpp | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 1ca301eea..408de7bbd 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -44,7 +44,9 @@ namespace rpp::details::observers }; observer_vtable(vtable_t&& vtable) - : m_vtable{std::move(vtable)}{} + : m_vtable{std::move(vtable)} + { + } const vtable_t m_vtable{}; }; @@ -52,8 +54,8 @@ namespace rpp::details::observers template class type_erased_observer : public observer_vtable> { - using Type = rpp::utils::extract_observer_type_t; - using Base = observer_vtable; + using Type = rpp::utils::extract_observer_type_t; + using Base = observer_vtable; using Vtable = typename Base::vtable_t; static constexpr const TObs& cast(const Base* ptr) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index b60427c86..fc40101fa 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -115,8 +115,8 @@ namespace rpp::subjects::details if (!observers) return; - const auto begin = observers->cbegin(); - const auto end = observers->cend(); + const auto begin = observers->cbegin(); + const auto end = observers->cend(); observers_lock.unlock(); From 0f723a16b841139475121e33c24b89bcc33f9182 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 6 Jun 2024 09:43:06 +0300 Subject: [PATCH 12/13] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index fc40101fa..edc28c12d 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -82,13 +82,16 @@ namespace rpp::subjects::details std::unique_lock lock{m_mutex}; process_state_unsafe( m_state, - [&](shared_observers& observers) { + [&](const shared_observers& observers) { auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().lock()); auto ptr = d.lock(); - - if (!observers) - observers = std::make_shared(); - observers->emplace_back(ptr); + if (!observers) { + auto new_observers = std::make_shared(); + new_observers->emplace_back(ptr); + m_state = std::move(new_observers); + } else { + observers->emplace_back(ptr); + } lock.unlock(); ptr->set_upstream(d.as_weak()); From 3c18e20db5e30e33f6aa5681a8963aebdd7fc1ac Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 6 Jun 2024 06:43:22 +0000 Subject: [PATCH 13/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/subjects/details/subject_state.hpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index edc28c12d..12969ffe3 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -85,11 +85,14 @@ namespace rpp::subjects::details [&](const shared_observers& observers) { auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().lock()); auto ptr = d.lock(); - if (!observers) { + if (!observers) + { auto new_observers = std::make_shared(); new_observers->emplace_back(ptr); m_state = std::move(new_observers); - } else { + } + else + { observers->emplace_back(ptr); }