diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 78bfd6908..41e53a75a 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -20,7 +20,7 @@ #include #include -#include +#include #include #include #include @@ -67,7 +67,7 @@ namespace rpp::subjects::details }; using observer = std::shared_ptr>; - using observers = std::deque; + using observers = std::list; using shared_observers = std::shared_ptr; using state_t = std::variant; @@ -112,22 +112,21 @@ namespace rpp::subjects::details void on_next(const Type& v) { std::unique_lock observers_lock{m_mutex}; + process_state_unsafe(m_state, [&](shared_observers observers) { + if (!observers) + return; - if (!std::holds_alternative(m_state)) - return; + auto itr = observers->cbegin(); + const auto size = observers->size(); - // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call - const auto observers = std::get(m_state); - if (!observers) - return; + observers_lock.unlock(); - const auto begin = observers->cbegin(); - const auto end = observers->cend(); - - observers_lock.unlock(); - - std::lock_guard lock{m_serialized_mutex}; - std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); + std::lock_guard lock{m_serialized_mutex}; + for (size_t i = 0; i < size; ++i) + { + (*(itr++))->on_next(v); + } + }); } void on_error(const std::exception_ptr& err) @@ -171,19 +170,18 @@ namespace rpp::subjects::details return subs; } - static void process_state_unsafe(const state_t& state, const auto&... actions) + static auto process_state_unsafe(const state_t& state, const auto&... actions) { - std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); + return std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); } shared_observers exchange_observers_under_lock_if_there(state_t&& new_val) { std::lock_guard lock{m_mutex}; - if (!std::holds_alternative(m_state)) - return {}; - - return std::get(std::exchange(m_state, std::move(new_val))); + return process_state_unsafe(m_state, [&](shared_observers observers) { + m_state = std::move(new_val); + return observers; }, [](auto) { return shared_observers{}; }); } private: diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 3293d46fe..8a18a559d 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -169,6 +169,26 @@ TEST_CASE("subject can be modified from on_next call") } } +TEST_CASE("subject handles addition from inside on_next properly") +{ + rpp::subjects::publish_subject subject{}; + + SUBCASE("subscribe inside on_next") + { + int value = {}; + subject.get_observable().subscribe([&subject, &value](int v) { + for (int i = 0; i < 100; ++i) + subject.get_observable().subscribe([](int) {}); + value = v; + }); + + for (int i = 0; i < 100; ++i) + subject.get_observer().on_next(i); + + REQUIRE(value == 99); + } +} + TEST_CASE("publish subject caches error/completed") { auto mock = mock_observer_strategy{};