From 936e465295c7a96f8c3e2445e0e05c8419e000c2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 5 Jan 2024 23:17:37 +0300 Subject: [PATCH 1/4] Init --- docs/readme.md | 2 +- src/rpp/rpp/subjects.hpp | 3 +- src/rpp/rpp/subjects/fwd.hpp | 12 +++ src/rpp/rpp/subjects/publish_subject.hpp | 1 - src/rpp/rpp/subjects/serialized_subject.hpp | 83 +++++++++++++++++++++ src/tests/rpp/test_subjects.cpp | 5 ++ 6 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 src/rpp/rpp/subjects/serialized_subject.hpp diff --git a/docs/readme.md b/docs/readme.md index 07efaafa4..bf43eaeeb 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that: 2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed.
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially 3. When you implement your own operator via `create` be careful to **follow this contract**! -4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use synchronized subjects instead if can't guarantee serial emissions! +4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions! It means, that for example: diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index e15eae7c7..3e5791c32 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -17,4 +17,5 @@ * \ingroup rpp */ -#include \ No newline at end of file +#include +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index a83f4a30d..bc55f27e6 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -31,6 +31,9 @@ class base_subject; template class publish_strategy; + +template +class serialized_strategy; } namespace rpp::subjects @@ -49,6 +52,15 @@ namespace rpp::subjects */ template using publish_subject = details::base_subject>; + +/** + * @brief Same as rpp::subjects::publish_subject, but on_next/on_error/on_completed calls are serialized via mutex + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ +template +using serialized_subject = details::base_subject>; } namespace rpp::subjects::utils diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 56e5bc4db..4015d2ea0 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -12,7 +12,6 @@ #include #include -#include #include #include diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp new file mode 100644 index 000000000..53ff22308 --- /dev/null +++ b/src/rpp/rpp/subjects/serialized_subject.hpp @@ -0,0 +1,83 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include + +#include +#include + +#include + +namespace rpp::subjects::details +{ +template +class serialized_strategy +{ + struct serialized_state + { + std::mutex mutex{}; + subject_state state{}; + }; + + struct observer_strategy + { + std::shared_ptr state{}; + + void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); } + + bool is_disposed() const noexcept + { + return state->state.is_disposed(); + } + + void on_next(const Type& v) const + { + std::lock_guard lock{state->mutex}; + state->state.on_next(v); + } + + void on_error(const std::exception_ptr& err) const + { + std::lock_guard lock{state->mutex}; + state->state.on_error(err); + } + + void on_completed() const + { + std::lock_guard lock{state->mutex}; + state->state.on_completed(); + } + }; + +public: + + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + + auto get_observer() const + { + return rpp::observer>{composite_disposable_wrapper{m_state->state}, observer_strategy{m_state}}; + } + + template TObs> + void on_subscribe(TObs&& observer) const + { + m_state->state.on_subscribe(std::forward(observer)); + } + + rpp::disposable_wrapper get_disposable() const + { + return rpp::disposable_wrapper{m_state->state}; + } + +private: + std::shared_ptr m_state = std::make_shared(); +}; +} // namespace rpp::subjects::details \ No newline at end of file diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index aa6d08fc0..ae3631849 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -223,4 +223,9 @@ TEST_CASE("publish subject caches error/completed") } } } +} + +TEST_CASE("serialized_subject handles race condition") +{ + } \ No newline at end of file From e7813e0ccf248385e01c9380629e2a1190d9bd1f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 5 Jan 2024 23:27:49 +0300 Subject: [PATCH 2/4] finalize --- src/rpp/rpp/subjects/publish_subject.hpp | 2 ++ src/rpp/rpp/subjects/serialized_subject.hpp | 4 ++- src/tests/rpp/test_subjects.cpp | 29 +++++++++++++++++++-- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 4015d2ea0..427e8c763 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp index 53ff22308..b9f8911ff 100644 --- a/src/rpp/rpp/subjects/serialized_subject.hpp +++ b/src/rpp/rpp/subjects/serialized_subject.hpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include @@ -63,7 +65,7 @@ class serialized_strategy auto get_observer() const { - return rpp::observer>{composite_disposable_wrapper{m_state->state}, observer_strategy{m_state}}; + return rpp::observer>{composite_disposable_wrapper{std::shared_ptr>{m_state, &m_state->state}}, observer_strategy{m_state}}; } template TObs> diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index ae3631849..cb415d1bc 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -10,10 +10,14 @@ #include +#include +#include +#include #include +#include + #include "mock_observer.hpp" -#include TEST_CASE("publish subject multicasts values") { @@ -227,5 +231,26 @@ TEST_CASE("publish subject caches error/completed") TEST_CASE("serialized_subject handles race condition") { - + auto subj = rpp::subjects::serialized_subject{}; + SECTION("call on_next from 2 threads") + { + bool on_error_called{}; + rpp::source::create([&](auto&& obs) + { + subj.get_observable().subscribe(std::forward(obs)); + subj.get_observer().on_next(1); + }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([&](int) + { + CHECK(!on_error_called); + std::thread{[&]{ subj.get_observer().on_error({}); }}.detach(); + + std::this_thread::sleep_for(std::chrono::seconds{1}); + CHECK(!on_error_called); + }, + [&](const std::exception_ptr&){ on_error_called = true; }); + + CHECK(on_error_called); + } } \ No newline at end of file From f74679f266b2e72635b89fcb0f196baec2197cd7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 5 Jan 2024 23:39:17 +0300 Subject: [PATCH 3/4] mac compile --- src/tests/rpp/test_subjects.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index cb415d1bc..efd00c749 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -19,6 +19,8 @@ #include "mock_observer.hpp" +#include + TEST_CASE("publish subject multicasts values") { auto mock_1 = mock_observer_strategy{}; From 31087adb65f1a1d8c188de3792c7aad3bc770d82 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 5 Jan 2024 23:47:02 +0300 Subject: [PATCH 4/4] minor update --- src/rpp/rpp/observables/observable.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 334a50809..4024f0426 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -37,6 +37,8 @@ template St class observable { public: + using value_type = Type; + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; template