diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index deaf499d1..408de7bbd 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -19,24 +19,73 @@ namespace rpp::details::observers { - template - struct member_ptr_caller_impl; - - template - struct member_ptr_caller_impl + template + class observer_vtable { - static R call(void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } + public: + void set_upstream(const disposable_wrapper& d) noexcept { m_vtable.set_upstream_ptr(this, d); } + bool is_disposed() const noexcept { return m_vtable.is_disposed_ptr(this); } + + void on_next(const Type& v) const noexcept { m_vtable.on_next_lvalue_ptr(this, v); } + void on_next(Type&& v) const noexcept { m_vtable.on_next_rvalue_ptr(this, std::move(v)); } + void on_error(const std::exception_ptr& err) const noexcept { m_vtable.on_error_ptr(this, err); } + void on_completed() const noexcept { m_vtable.on_completed_ptr(this); } + + protected: + struct vtable_t + { + void (*const on_next_lvalue_ptr)(const observer_vtable*, const Type&){}; + void (*const on_next_rvalue_ptr)(const observer_vtable*, Type&&){}; + void (*const on_error_ptr)(const observer_vtable*, const std::exception_ptr&){}; + void (*const on_completed_ptr)(const observer_vtable*){}; + + void (*const set_upstream_ptr)(observer_vtable*, const disposable_wrapper&){}; + bool (*const is_disposed_ptr)(const observer_vtable*){}; + }; + + observer_vtable(vtable_t&& vtable) + : m_vtable{std::move(vtable)} + { + } + + const vtable_t m_vtable{}; }; - template - struct member_ptr_caller_impl + template + class type_erased_observer : public observer_vtable> { - static R call(const void* data, Args... args) noexcept(NoExcept) { return (static_cast(data)->*F)(static_cast(args)...); } - }; + using Type = rpp::utils::extract_observer_type_t; + using Base = observer_vtable; + using Vtable = typename Base::vtable_t; - template - using member_ptr_caller = member_ptr_caller_impl; + static constexpr const TObs& cast(const Base* ptr) + { + return static_cast(ptr)->m_observer; + } + static constexpr TObs& cast(Base* ptr) + { + return static_cast(ptr)->m_observer; + } + + public: + type_erased_observer(TObs&& observer) + : Base{Vtable{ + .on_next_lvalue_ptr = +[](const Base* b, const Type& v) { cast(b).on_next(v); }, + .on_next_rvalue_ptr = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); }, + .on_error_ptr = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); }, + .on_completed_ptr = +[](const Base* b) { cast(b).on_completed(); }, + .set_upstream_ptr = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); }, + .is_disposed_ptr = +[](const Base* b) { + return cast(b).is_disposed(); + }}} + , m_observer{std::move(observer)} + { + } + + private: + TObs m_observer; + }; template class dynamic_strategy final @@ -45,52 +94,20 @@ namespace rpp::details::observers template Strategy> requires (!rpp::constraint::decayed_same_as>) explicit dynamic_strategy(observer&& obs) - : m_forwarder{std::make_shared>(std::move(obs))} - , m_vtable{vtable::template create>()} + : m_observer{std::make_shared>>(std::move(obs))} { } - void set_upstream(const disposable_wrapper& d) noexcept { m_vtable->set_upstream(m_forwarder.get(), d); } - - bool is_disposed() const noexcept { return m_vtable->is_disposed(m_forwarder.get()); } + void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(d); } + bool is_disposed() const noexcept { return m_observer->is_disposed(); } - void on_next(const Type& v) const noexcept { m_vtable->on_next_lvalue(m_forwarder.get(), v); } - - void on_next(Type&& v) const noexcept { m_vtable->on_next_rvalue(m_forwarder.get(), std::move(v)); } - - void on_error(const std::exception_ptr& err) const noexcept { m_vtable->on_error(m_forwarder.get(), err); } - - void on_completed() const noexcept { m_vtable->on_completed(m_forwarder.get()); } - - private: - struct vtable - { - void (*on_next_lvalue)(const void*, const Type&){}; - void (*on_next_rvalue)(const void*, Type&&){}; - void (*on_error)(const void*, const std::exception_ptr&){}; - void (*on_completed)(const void*){}; - - void (*set_upstream)(void*, const disposable_wrapper&){}; - bool (*is_disposed)(const void*){}; - - template - static const vtable* create() noexcept - { - static vtable s_res{ - .on_next_lvalue = &member_ptr_caller(&Strategy::on_next)>::call, - .on_next_rvalue = &member_ptr_caller(&Strategy::on_next)>::call, - .on_error = &member_ptr_caller<&Strategy::on_error>::call, - .on_completed = &member_ptr_caller<&Strategy::on_completed>::call, - .set_upstream = &member_ptr_caller<&Strategy::set_upstream>::call, - .is_disposed = &member_ptr_caller<&Strategy::is_disposed>::call, - }; - return &s_res; - } - }; + void on_next(const Type& v) const noexcept { m_observer->on_next(v); } + void on_next(Type&& v) const noexcept { m_observer->on_next(std::move(v)); } + void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(err); } + void on_completed() const noexcept { m_observer->on_completed(); } private: - std::shared_ptr m_forwarder; - const vtable* m_vtable; + std::shared_ptr> m_observer; }; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index b902fa304..12969ffe3 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include namespace rpp::subjects::details { @@ -39,7 +39,36 @@ namespace rpp::subjects::details class subject_state : public composite_disposable , public rpp::details::enable_wrapper_from_this> { - using shared_observers = std::shared_ptr>>; + template + class disposable_with_observer : public rpp::details::observers::type_erased_observer + , public rpp::details::base_disposable + { + public: + disposable_with_observer(TObs&& observer, std::weak_ptr state) + : rpp::details::observers::type_erased_observer{std::move(observer)} + , m_state{std::move(state)} + { + } + + private: + void base_dispose_impl(interface_disposable::Mode) noexcept override + { + if (const auto shared = m_state.lock()) + { + std::unique_lock lock{shared->m_mutex}; + process_state_unsafe(shared->m_state, + [&](const shared_observers& observers) { + shared->m_state = cleanup_observers(observers, this); + }); + } + } + + std::weak_ptr m_state{}; + }; + + using observer = std::shared_ptr>; + using observers = std::deque; + using shared_observers = std::shared_ptr; using state_t = std::variant; public: @@ -54,13 +83,21 @@ namespace rpp::subjects::details process_state_unsafe( m_state, [&](const shared_observers& observers) { - auto new_observers = make_copy_of_subscribed_observers(true, observers); - auto observer_as_dynamic = std::forward(observer).as_dynamic(); - new_observers->push_back(observer_as_dynamic); - m_state = std::move(new_observers); + auto d = disposable_wrapper_impl>>::make(std::forward(observer), this->wrapper_from_this().lock()); + auto ptr = d.lock(); + if (!observers) + { + auto new_observers = std::make_shared(); + new_observers->emplace_back(ptr); + m_state = std::move(new_observers); + } + else + { + observers->emplace_back(ptr); + } lock.unlock(); - set_upstream(observer_as_dynamic); + ptr->set_upstream(d.as_weak()); }, [&](const std::exception_ptr& err) { lock.unlock(); @@ -74,9 +111,23 @@ namespace rpp::subjects::details void on_next(const Type& v) { + std::unique_lock observers_lock{m_mutex}; + + if (!std::holds_alternative(m_state)) + return; + + // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call + const auto observers = std::get(m_state); + if (!observers) + return; + + const auto begin = observers->cbegin(); + const auto end = observers->cend(); + + observers_lock.unlock(); + std::lock_guard lock{m_serialized_mutex}; - if (const auto observers = extract_observers_under_lock_if_there()) - rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); }); + std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); } void on_error(const std::exception_ptr& err) @@ -84,7 +135,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); + rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); }); } dispose(); } @@ -94,7 +145,7 @@ namespace rpp::subjects::details { std::lock_guard lock{m_serialized_mutex}; if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); + rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); }); } dispose(); } @@ -105,62 +156,26 @@ namespace rpp::subjects::details exchange_observers_under_lock_if_there(disposed{}); } - void set_upstream(rpp::dynamic_observer& obs) + static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable* to_delete) { - obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable( - [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) - { - if (const auto shared = weak.lock()) - { - std::unique_lock lock{shared->m_mutex}; - process_state_unsafe(shared->m_state, - [&](const shared_observers& observers) { - shared->m_state = make_copy_of_subscribed_observers(false, observers); - }); - } - })}); - } - - static shared_observers make_copy_of_subscribed_observers(bool add, const shared_observers& current_subs) - { - auto subs = std::make_shared>>(); - subs->reserve(deduce_new_size(add, current_subs)); + auto subs = std::make_shared(); if (current_subs) { std::copy_if(current_subs->cbegin(), current_subs->cend(), std::back_inserter(*subs), - rpp::utils::static_not_mem_fn<&dynamic_observer::is_disposed>{}); + [&to_delete](const observer& obs) { + return to_delete != obs.get() && !obs->is_disposed(); + }); } return subs; } - static size_t deduce_new_size(bool add, const shared_observers& current_subs) - { - if (!current_subs) - return add ? 1 : 0; - - if (add) - return current_subs->size() + 1; - - return std::max(current_subs->size(), size_t{1}) - 1; - } - static void process_state_unsafe(const state_t& state, const auto&... actions) { std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); } - shared_observers extract_observers_under_lock_if_there() - { - std::lock_guard lock{m_mutex}; - - if (!std::holds_alternative(m_state)) - return {}; - - return std::get(m_state); - } - shared_observers exchange_observers_under_lock_if_there(state_t&& new_val) { std::lock_guard lock{m_mutex}; @@ -172,7 +187,7 @@ namespace rpp::subjects::details } private: - state_t m_state{}; + state_t m_state; std::mutex m_mutex{}; RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{}; }; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 5f7ddbb74..04eaac90c 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -20,6 +20,7 @@ #include #include "copy_count_tracker.hpp" +#include "rpp_trompeloil.hpp" #include @@ -137,6 +138,38 @@ TEST_CASE("publish subject multicasts values") } } +TEST_CASE("subject can be modified from on_next call") +{ + rpp::subjects::publish_subject subject{}; + mock_observer inner_mock{}; + + SECTION("subscribe inside on_next") + { + subject.get_observable().subscribe([&subject, &inner_mock](int) { + subject.get_observable().subscribe(inner_mock); + }); + + subject.get_observer().on_next(1); + + REQUIRE_CALL(*inner_mock, on_next_lvalue(2)); + subject.get_observer().on_next(2); + } + + SECTION("unsubscribe inside on_next") + { + auto d = rpp::composite_disposable_wrapper::make(); + + subject.get_observable().subscribe([d](int) { + d.clear(); + }); + subject.get_observable().subscribe(d, inner_mock); + + REQUIRE_CALL(*inner_mock, on_next_lvalue(1)); + subject.get_observer().on_next(1); + subject.get_observer().on_next(2); + } +} + TEST_CASE("publish subject caches error/completed") { auto mock = mock_observer_strategy{};