Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that:
2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed. <br>
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially
3. When you implement your own operator via `create` be careful to **follow this contract**!
4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use synchronized subjects instead if can't guarantee serial emissions!
4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions!

It means, that for example:

Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ template<constraint::decayed_type Type, constraint::observable_strategy<Type> St
class observable
{
public:
using value_type = Type;

using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t<Strategy>;

template<typename... Args>
Expand Down
3 changes: 2 additions & 1 deletion src/rpp/rpp/subjects.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
* \ingroup rpp
*/

#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/serialized_subject.hpp>
12 changes: 12 additions & 0 deletions src/rpp/rpp/subjects/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class base_subject;

template<rpp::constraint::decayed_type Type>
class publish_strategy;

template<rpp::constraint::decayed_type Type>
class serialized_strategy;
}

namespace rpp::subjects
Expand All @@ -49,6 +52,15 @@ namespace rpp::subjects
*/
template<rpp::constraint::decayed_type Type>
using publish_subject = details::base_subject<Type, details::publish_strategy<Type>>;

/**
* @brief Same as rpp::subjects::publish_subject, but on_next/on_error/on_completed calls are serialized via mutex
*
* @ingroup subjects
* @see https://reactivex.io/documentation/subject.html
*/
template<rpp::constraint::decayed_type Type>
using serialized_subject = details::base_subject<Type, details::serialized_strategy<Type>>;
}

namespace rpp::subjects::utils
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/subjects/publish_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <rpp/observers/observer.hpp>
#include <rpp/subjects/details/base_subject.hpp>
#include <rpp/subjects/details/subject_state.hpp>
#include <rpp/disposables/disposable_wrapper.hpp>

#include <memory>

Expand Down
85 changes: 85 additions & 0 deletions src/rpp/rpp/subjects/serialized_subject.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/subjects/fwd.hpp>

#include <rpp/observers/observer.hpp>
#include <rpp/subjects/details/base_subject.hpp>
#include <rpp/subjects/details/subject_state.hpp>
#include <rpp/disposables/disposable_wrapper.hpp>

#include <memory>

namespace rpp::subjects::details
{
template<rpp::constraint::decayed_type Type>
class serialized_strategy
{
struct serialized_state
{
std::mutex mutex{};
subject_state<Type> state{};
};

struct observer_strategy
{
std::shared_ptr<serialized_state> state{};

void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); }

bool is_disposed() const noexcept
{
return state->state.is_disposed();
}

void on_next(const Type& v) const
{
std::lock_guard lock{state->mutex};
state->state.on_next(v);
}

void on_error(const std::exception_ptr& err) const
{
std::lock_guard lock{state->mutex};
state->state.on_error(err);
}

void on_completed() const
{
std::lock_guard lock{state->mutex};
state->state.on_completed();
}
};

public:

using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t<subject_state<Type>>;

auto get_observer() const
{
return rpp::observer<Type, rpp::details::with_external_disposable<observer_strategy>>{composite_disposable_wrapper{std::shared_ptr<subject_state<Type>>{m_state, &m_state->state}}, observer_strategy{m_state}};
}

template<rpp::constraint::observer_of_type<Type> TObs>
void on_subscribe(TObs&& observer) const
{
m_state->state.on_subscribe(std::forward<TObs>(observer));
}

rpp::disposable_wrapper get_disposable() const
{
return rpp::disposable_wrapper{m_state->state};
}

private:
std::shared_ptr<serialized_state> m_state = std::make_shared<serialized_state>();
};
} // namespace rpp::subjects::details
34 changes: 33 additions & 1 deletion src/tests/rpp/test_subjects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@

#include <snitch/snitch.hpp>

#include <rpp/operators/as_blocking.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/subjects/serialized_subject.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/sources/create.hpp>


#include "mock_observer.hpp"
#include <rpp/disposables/composite_disposable.hpp>

#include <thread>

TEST_CASE("publish subject multicasts values")
{
Expand Down Expand Up @@ -223,4 +229,30 @@ TEST_CASE("publish subject caches error/completed")
}
}
}
}

TEST_CASE("serialized_subject handles race condition")
{
auto subj = rpp::subjects::serialized_subject<int>{};
SECTION("call on_next from 2 threads")
{
bool on_error_called{};
rpp::source::create<int>([&](auto&& obs)
{
subj.get_observable().subscribe(std::forward<decltype(obs)>(obs));
subj.get_observer().on_next(1);
})
| rpp::operators::as_blocking()
| rpp::operators::subscribe([&](int)
{
CHECK(!on_error_called);
std::thread{[&]{ subj.get_observer().on_error({}); }}.detach();

std::this_thread::sleep_for(std::chrono::seconds{1});
CHECK(!on_error_called);
},
[&](const std::exception_ptr&){ on_error_called = true; });

CHECK(on_error_called);
}
}