diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 408de7bbd..deaf499d1 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -19,73 +19,24 @@ namespace rpp::details::observers { - template - class observer_vtable - { - public: - void set_upstream(const disposable_wrapper& d) noexcept { m_vtable.set_upstream_ptr(this, d); } - bool is_disposed() const noexcept { return m_vtable.is_disposed_ptr(this); } - - void on_next(const Type& v) const noexcept { m_vtable.on_next_lvalue_ptr(this, v); } - void on_next(Type&& v) const noexcept { m_vtable.on_next_rvalue_ptr(this, std::move(v)); } - void on_error(const std::exception_ptr& err) const noexcept { m_vtable.on_error_ptr(this, err); } - void on_completed() const noexcept { m_vtable.on_completed_ptr(this); } - - protected: - struct vtable_t - { - void (*const on_next_lvalue_ptr)(const observer_vtable*, const Type&){}; - void (*const on_next_rvalue_ptr)(const observer_vtable*, Type&&){}; - void (*const on_error_ptr)(const observer_vtable*, const std::exception_ptr&){}; - void (*const on_completed_ptr)(const observer_vtable*){}; - - void (*const set_upstream_ptr)(observer_vtable*, const disposable_wrapper&){}; - bool (*const is_disposed_ptr)(const observer_vtable*){}; - }; - - observer_vtable(vtable_t&& vtable) - : m_vtable{std::move(vtable)} - { - } + template + struct member_ptr_caller_impl; - const vtable_t m_vtable{}; + template + struct member_ptr_caller_impl + { + static R call(void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } }; - template - class type_erased_observer : public observer_vtable> + template + struct member_ptr_caller_impl { - using Type = rpp::utils::extract_observer_type_t; - using Base = observer_vtable; - using Vtable = typename Base::vtable_t; - - static constexpr const TObs& cast(const Base* ptr) - { - return static_cast(ptr)->m_observer; - } - - static constexpr TObs& cast(Base* ptr) - { - return static_cast(ptr)->m_observer; - } + static R call(const void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } + }; - public: - type_erased_observer(TObs&& observer) - : Base{Vtable{ - .on_next_lvalue_ptr = +[](const Base* b, const Type& v) { cast(b).on_next(v); }, - .on_next_rvalue_ptr = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); }, - .on_error_ptr = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, - .on_completed_ptr = +[](const Base* b) { cast(b).on_completed(); }, - .set_upstream_ptr = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, - .is_disposed_ptr = +[](const Base* b) { - return cast(b).is_disposed(); - }}} - , m_observer{std::move(observer)} - { - } + template + using member_ptr_caller = member_ptr_caller_impl; - private: - TObs m_observer; - }; template class dynamic_strategy final @@ -94,20 +45,52 @@ namespace rpp::details::observers template Strategy> requires (!rpp::constraint::decayed_same_as>) explicit dynamic_strategy(observer&& obs) - : m_observer{std::make_shared>>(std::move(obs))} + : m_forwarder{std::make_shared>(std::move(obs))} + , m_vtable{vtable::template create>()} { } - void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(d); } - bool is_disposed() const noexcept { return m_observer->is_disposed(); } + void set_upstream(const disposable_wrapper& d) noexcept { m_vtable->set_upstream(m_forwarder.get(), d); } + + bool is_disposed() const noexcept { return m_vtable->is_disposed(m_forwarder.get()); } - void on_next(const Type& v) const noexcept { m_observer->on_next(v); } - void on_next(Type&& v) const noexcept { m_observer->on_next(std::move(v)); } - void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(err); } - void on_completed() const noexcept { m_observer->on_completed(); } + void on_next(const Type& v) const noexcept { m_vtable->on_next_lvalue(m_forwarder.get(), v); } + + void on_next(Type&& v) const noexcept { m_vtable->on_next_rvalue(m_forwarder.get(), std::move(v)); } + + void on_error(const std::exception_ptr& err) const noexcept { m_vtable->on_error(m_forwarder.get(), err); } + + void on_completed() const noexcept { m_vtable->on_completed(m_forwarder.get()); } + + private: + struct vtable + { + void (*on_next_lvalue)(const void*, const Type&){}; + void (*on_next_rvalue)(const void*, Type&&){}; + void (*on_error)(const void*, const std::exception_ptr&){}; + void (*on_completed)(const void*){}; + + void (*set_upstream)(void*, const disposable_wrapper&){}; + bool (*is_disposed)(const void*){}; + + template + static const vtable* create() noexcept + { + static vtable s_res{ + .on_next_lvalue = &member_ptr_caller(&Strategy::on_next)>::call, + .on_next_rvalue = &member_ptr_caller(&Strategy::on_next)>::call, + .on_error = &member_ptr_caller<&Strategy::on_error>::call, + .on_completed = &member_ptr_caller<&Strategy::on_completed>::call, + .set_upstream = &member_ptr_caller<&Strategy::set_upstream>::call, + .is_disposed = &member_ptr_caller<&Strategy::is_disposed>::call, + }; + return &s_res; + } + }; private: - std::shared_ptr> m_observer; + std::shared_ptr m_forwarder; + const vtable* m_vtable; }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 12969ffe3..b902fa304 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -20,10 +20,10 @@ #include #include -#include #include #include #include +#include namespace rpp::subjects::details { @@ -39,36 +39,7 @@ namespace rpp::subjects::details class subject_state : public composite_disposable , public rpp::details::enable_wrapper_from_this> { - template - class disposable_with_observer : public rpp::details::observers::type_erased_observer - , public rpp::details::base_disposable - { - public: - disposable_with_observer(TObs&& observer, std::weak_ptr state) - : rpp::details::observers::type_erased_observer{std::move(observer)} - , m_state{std::move(state)} - { - } - - private: - void base_dispose_impl(interface_disposable::Mode) noexcept override - { - if (const auto shared = m_state.lock()) - { - std::unique_lock lock{shared->m_mutex}; - process_state_unsafe(shared->m_state, - [&](const shared_observers& observers) { - shared->m_state = cleanup_observers(observers, this); - }); - } - } - - std::weak_ptr m_state{}; - }; - - using observer = std::shared_ptr>; - using observers = std::deque; - using shared_observers = std::shared_ptr; + using shared_observers = std::shared_ptr>>; using state_t = std::variant; public: @@ -83,21 +54,13 @@ namespace rpp::subjects::details process_state_unsafe( m_state, [&](const shared_observers& observers) { - auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().lock()); - auto ptr = d.lock(); - if (!observers) - { - auto new_observers = std::make_shared(); - new_observers->emplace_back(ptr); - m_state = std::move(new_observers); - } - else - { - observers->emplace_back(ptr); - } + auto new_observers = make_copy_of_subscribed_observers(true, observers); + auto observer_as_dynamic = std::forward(observer).as_dynamic(); + new_observers->push_back(observer_as_dynamic); + m_state = std::move(new_observers); lock.unlock(); - ptr->set_upstream(d.as_weak()); + set_upstream(observer_as_dynamic); }, [&](const std::exception_ptr& err) { lock.unlock(); @@ -111,23 +74,9 @@ namespace rpp::subjects::details void on_next(const Type& v) { - std::unique_lock observers_lock{m_mutex}; - - if (!std::holds_alternative(m_state)) - return; - - // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call - const auto observers = std::get(m_state); - if (!observers) - return; - - const auto begin = observers->cbegin(); - const auto end = observers->cend(); - - observers_lock.unlock(); - std::lock_guard lock{m_serialized_mutex}; - std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); + if (const auto observers = extract_observers_under_lock_if_there()) + rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); }); } void on_error(const std::exception_ptr& err) @@ -135,7 +84,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); }); + rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); } dispose(); } @@ -145,7 +94,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); }); + rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); } dispose(); } @@ -156,26 +105,62 @@ namespace rpp::subjects::details exchange_observers_under_lock_if_there(disposed{}); } - static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable* to_delete) + void set_upstream(rpp::dynamic_observer& obs) { - auto subs = std::make_shared(); + obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable( + [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) + { + if (const auto shared = weak.lock()) + { + std::unique_lock lock{shared->m_mutex}; + process_state_unsafe(shared->m_state, + [&](const shared_observers& observers) { + shared->m_state = make_copy_of_subscribed_observers(false, observers); + }); + } + })}); + } + + static shared_observers make_copy_of_subscribed_observers(bool add, const shared_observers& current_subs) + { + auto subs = std::make_shared>>(); + subs->reserve(deduce_new_size(add, current_subs)); if (current_subs) { std::copy_if(current_subs->cbegin(), current_subs->cend(), std::back_inserter(*subs), - [&to_delete](const observer& obs) { - return to_delete != obs.get() && !obs->is_disposed(); - }); + rpp::utils::static_not_mem_fn<&dynamic_observer::is_disposed>{}); } return subs; } + static size_t deduce_new_size(bool add, const shared_observers& current_subs) + { + if (!current_subs) + return add ? 1 : 0; + + if (add) + return current_subs->size() + 1; + + return std::max(current_subs->size(), size_t{1}) - 1; + } + static void process_state_unsafe(const state_t& state, const auto&... actions) { std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); } + shared_observers extract_observers_under_lock_if_there() + { + std::lock_guard lock{m_mutex}; + + if (!std::holds_alternative(m_state)) + return {}; + + return std::get(m_state); + } + shared_observers exchange_observers_under_lock_if_there(state_t&& new_val) { std::lock_guard lock{m_mutex}; @@ -187,7 +172,7 @@ namespace rpp::subjects::details } private: - state_t m_state; + state_t m_state{}; std::mutex m_mutex{}; RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{}; }; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 04eaac90c..5f7ddbb74 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -20,7 +20,6 @@ #include #include "copy_count_tracker.hpp" -#include "rpp_trompeloil.hpp" #include @@ -138,38 +137,6 @@ TEST_CASE("publish subject multicasts values") } } -TEST_CASE("subject can be modified from on_next call") -{ - rpp::subjects::publish_subject subject{}; - mock_observer inner_mock{}; - - SECTION("subscribe inside on_next") - { - subject.get_observable().subscribe([&subject, &inner_mock](int) { - subject.get_observable().subscribe(inner_mock); - }); - - subject.get_observer().on_next(1); - - REQUIRE_CALL(*inner_mock, on_next_lvalue(2)); - subject.get_observer().on_next(2); - } - - SECTION("unsubscribe inside on_next") - { - auto d = rpp::composite_disposable_wrapper::make(); - - subject.get_observable().subscribe([d](int) { - d.clear(); - }); - subject.get_observable().subscribe(d, inner_mock); - - REQUIRE_CALL(*inner_mock, on_next_lvalue(1)); - subject.get_observer().on_next(1); - subject.get_observer().on_next(2); - } -} - TEST_CASE("publish subject caches error/completed") { auto mock = mock_observer_strategy{};