From 9be4007b21a7a9181f65bdc5e2554490084e8323 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 20 Jan 2024 19:58:15 +0300 Subject: [PATCH] Fix incorrect usage of subject_state --- .../rpp/subjects/details/subject_state.hpp | 4 ++-- src/rpp/rpp/subjects/serialized_subject.hpp | 21 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 652c01bc7..656e32f52 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -35,8 +35,8 @@ struct disposed }; template -class subject_state final : public std::enable_shared_from_this> - , public composite_disposable +class subject_state : public std::enable_shared_from_this> + , public composite_disposable { using shared_observers = std::shared_ptr>>; using state_t = std::variant; diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp index 76b28d536..56bd3220d 100644 --- a/src/rpp/rpp/subjects/serialized_subject.hpp +++ b/src/rpp/rpp/subjects/serialized_subject.hpp @@ -23,39 +23,38 @@ namespace rpp::subjects::details template class serialized_strategy { - struct serialized_state + struct serialized_state final : public subject_state { std::mutex mutex{}; - subject_state state{}; }; struct observer_strategy { std::shared_ptr state{}; - void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); } + void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } bool is_disposed() const noexcept { - return state->state.is_disposed(); + return state->is_disposed(); } void on_next(const Type& v) const { std::lock_guard lock{state->mutex}; - state->state.on_next(v); + state->on_next(v); } void on_error(const std::exception_ptr& err) const { std::lock_guard lock{state->mutex}; - state->state.on_error(err); + state->on_error(err); } void on_completed() const { std::lock_guard lock{state->mutex}; - state->state.on_completed(); + state->on_completed(); } }; @@ -65,18 +64,18 @@ class serialized_strategy auto get_observer() const { - return rpp::observer>{composite_disposable_wrapper{std::shared_ptr>{m_state, &m_state->state}}, observer_strategy{m_state}}; + return rpp::observer>{get_disposable(), observer_strategy{m_state}}; } template TObs> void on_subscribe(TObs&& observer) const { - m_state->state.on_subscribe(std::forward(observer)); + m_state->on_subscribe(std::forward(observer)); } - rpp::disposable_wrapper get_disposable() const + rpp::composite_disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper{m_state->state}; + return rpp::composite_disposable_wrapper{m_state}; } private: