diff --git a/docs/readme.md b/docs/readme.md
index 07efaafa4..bf43eaeeb 100644
--- a/docs/readme.md
+++ b/docs/readme.md
@@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that:
2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed.
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially
3. When you implement your own operator via `create` be careful to **follow this contract**!
-4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use synchronized subjects instead if can't guarantee serial emissions!
+4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions!
It means, that for example:
diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp
index 334a50809..4024f0426 100644
--- a/src/rpp/rpp/observables/observable.hpp
+++ b/src/rpp/rpp/observables/observable.hpp
@@ -37,6 +37,8 @@ template St
class observable
{
public:
+ using value_type = Type;
+
using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t;
template
diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp
index e15eae7c7..3e5791c32 100644
--- a/src/rpp/rpp/subjects.hpp
+++ b/src/rpp/rpp/subjects.hpp
@@ -17,4 +17,5 @@
* \ingroup rpp
*/
-#include
\ No newline at end of file
+#include
+#include
\ No newline at end of file
diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp
index a83f4a30d..bc55f27e6 100644
--- a/src/rpp/rpp/subjects/fwd.hpp
+++ b/src/rpp/rpp/subjects/fwd.hpp
@@ -31,6 +31,9 @@ class base_subject;
template
class publish_strategy;
+
+template
+class serialized_strategy;
}
namespace rpp::subjects
@@ -49,6 +52,15 @@ namespace rpp::subjects
*/
template
using publish_subject = details::base_subject>;
+
+/**
+ * @brief Same as rpp::subjects::publish_subject, but on_next/on_error/on_completed calls are serialized via mutex
+ *
+ * @ingroup subjects
+ * @see https://reactivex.io/documentation/subject.html
+ */
+template
+using serialized_subject = details::base_subject>;
}
namespace rpp::subjects::utils
diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp
index 56e5bc4db..427e8c763 100644
--- a/src/rpp/rpp/subjects/publish_subject.hpp
+++ b/src/rpp/rpp/subjects/publish_subject.hpp
@@ -14,6 +14,7 @@
#include
#include
#include
+#include
#include
diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp
new file mode 100644
index 000000000..b9f8911ff
--- /dev/null
+++ b/src/rpp/rpp/subjects/serialized_subject.hpp
@@ -0,0 +1,85 @@
+// ReactivePlusPlus library
+//
+// Copyright Aleksey Loginov 2022 - present.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// https://www.boost.org/LICENSE_1_0.txt)
+//
+// Project home: https://github.com/victimsnino/ReactivePlusPlus
+
+#pragma once
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+
+namespace rpp::subjects::details
+{
+template
+class serialized_strategy
+{
+ struct serialized_state
+ {
+ std::mutex mutex{};
+ subject_state state{};
+ };
+
+ struct observer_strategy
+ {
+ std::shared_ptr state{};
+
+ void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); }
+
+ bool is_disposed() const noexcept
+ {
+ return state->state.is_disposed();
+ }
+
+ void on_next(const Type& v) const
+ {
+ std::lock_guard lock{state->mutex};
+ state->state.on_next(v);
+ }
+
+ void on_error(const std::exception_ptr& err) const
+ {
+ std::lock_guard lock{state->mutex};
+ state->state.on_error(err);
+ }
+
+ void on_completed() const
+ {
+ std::lock_guard lock{state->mutex};
+ state->state.on_completed();
+ }
+ };
+
+public:
+
+ using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>;
+
+ auto get_observer() const
+ {
+ return rpp::observer>{composite_disposable_wrapper{std::shared_ptr>{m_state, &m_state->state}}, observer_strategy{m_state}};
+ }
+
+ template TObs>
+ void on_subscribe(TObs&& observer) const
+ {
+ m_state->state.on_subscribe(std::forward(observer));
+ }
+
+ rpp::disposable_wrapper get_disposable() const
+ {
+ return rpp::disposable_wrapper{m_state->state};
+ }
+
+private:
+ std::shared_ptr m_state = std::make_shared();
+};
+} // namespace rpp::subjects::details
\ No newline at end of file
diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp
index aa6d08fc0..efd00c749 100644
--- a/src/tests/rpp/test_subjects.cpp
+++ b/src/tests/rpp/test_subjects.cpp
@@ -10,10 +10,16 @@
#include
+#include
+#include
+#include
#include
+#include
+
#include "mock_observer.hpp"
-#include
+
+#include
TEST_CASE("publish subject multicasts values")
{
@@ -223,4 +229,30 @@ TEST_CASE("publish subject caches error/completed")
}
}
}
+}
+
+TEST_CASE("serialized_subject handles race condition")
+{
+ auto subj = rpp::subjects::serialized_subject{};
+ SECTION("call on_next from 2 threads")
+ {
+ bool on_error_called{};
+ rpp::source::create([&](auto&& obs)
+ {
+ subj.get_observable().subscribe(std::forward(obs));
+ subj.get_observer().on_next(1);
+ })
+ | rpp::operators::as_blocking()
+ | rpp::operators::subscribe([&](int)
+ {
+ CHECK(!on_error_called);
+ std::thread{[&]{ subj.get_observer().on_error({}); }}.detach();
+
+ std::this_thread::sleep_for(std::chrono::seconds{1});
+ CHECK(!on_error_called);
+ },
+ [&](const std::exception_ptr&){ on_error_called = true; });
+
+ CHECK(on_error_called);
+ }
}
\ No newline at end of file