Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rpp/rpp/subjects/details/subject_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ struct disposed
};

template<rpp::constraint::decayed_type Type>
class subject_state final : public std::enable_shared_from_this<subject_state<Type>>
, public composite_disposable
class subject_state : public std::enable_shared_from_this<subject_state<Type>>
, public composite_disposable
{
using shared_observers = std::shared_ptr<std::vector<rpp::dynamic_observer<Type>>>;
using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
Expand Down
21 changes: 10 additions & 11 deletions src/rpp/rpp/subjects/serialized_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,38 @@ namespace rpp::subjects::details
template<rpp::constraint::decayed_type Type>
class serialized_strategy
{
struct serialized_state
struct serialized_state final : public subject_state<Type>
{
std::mutex mutex{};
subject_state<Type> state{};
};

struct observer_strategy
{
std::shared_ptr<serialized_state> state{};

void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); }
void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }

bool is_disposed() const noexcept
{
return state->state.is_disposed();
return state->is_disposed();
}

void on_next(const Type& v) const
{
std::lock_guard lock{state->mutex};
state->state.on_next(v);
state->on_next(v);
}

void on_error(const std::exception_ptr& err) const
{
std::lock_guard lock{state->mutex};
state->state.on_error(err);
state->on_error(err);
}

void on_completed() const
{
std::lock_guard lock{state->mutex};
state->state.on_completed();
state->on_completed();
}
};

Expand All @@ -65,18 +64,18 @@ class serialized_strategy

auto get_observer() const
{
return rpp::observer<Type, rpp::details::with_external_disposable<observer_strategy>>{composite_disposable_wrapper{std::shared_ptr<subject_state<Type>>{m_state, &m_state->state}}, observer_strategy{m_state}};
return rpp::observer<Type, rpp::details::with_external_disposable<observer_strategy>>{get_disposable(), observer_strategy{m_state}};
}

template<rpp::constraint::observer_of_type<Type> TObs>
void on_subscribe(TObs&& observer) const
{
m_state->state.on_subscribe(std::forward<TObs>(observer));
m_state->on_subscribe(std::forward<TObs>(observer));
}

rpp::disposable_wrapper get_disposable() const
rpp::composite_disposable_wrapper get_disposable() const
{
return rpp::disposable_wrapper{m_state->state};
return rpp::composite_disposable_wrapper{m_state};
}

private:
Expand Down