From e165f05291585de4600c976abc85c9bac3af259a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 18:14:00 +0300 Subject: [PATCH 01/10] extract common code --- src/rpp/rpp/operators/concat.hpp | 10 ++-- src/rpp/rpp/operators/debounce.hpp | 6 +- .../operators/details/combining_strategy.hpp | 6 +- src/rpp/rpp/operators/details/utils.hpp | 59 ------------------- src/rpp/rpp/operators/merge.hpp | 6 +- src/rpp/rpp/operators/switch_on_next.hpp | 8 +-- src/rpp/rpp/operators/take_until.hpp | 6 +- src/rpp/rpp/operators/timeout.hpp | 6 +- src/rpp/rpp/operators/window_toggle.hpp | 6 +- src/rpp/rpp/operators/with_latest_from.hpp | 12 ++-- src/rpp/rpp/utils/utils.hpp | 44 ++++++++++++++ 11 files changed, 77 insertions(+), 92 deletions(-) delete mode 100644 src/rpp/rpp/operators/details/utils.hpp diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index cb5a28ee3..45761c7ee 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include @@ -42,8 +42,8 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer() { return pointer_under_lock{m_observer}; } - pointer_under_lock> get_queue() { return pointer_under_lock{m_queue}; } + rpp::utils::pointer_under_lock get_observer() { return rpp::utils::pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock> get_queue() { return rpp::utils::pointer_under_lock{m_queue}; } std::atomic& stage() { return m_stage; } @@ -97,8 +97,8 @@ namespace rpp::operators::details } private: - value_with_mutex m_observer; - value_with_mutex> m_queue; + rpp::utils::value_with_mutex m_observer; + rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; }; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 13339e901..cf7ca56d3 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include namespace rpp::operators::details { @@ -69,7 +69,7 @@ namespace rpp::operators::details return std::exchange(m_value_to_be_emitted, std::optional{}); } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer}; } private: void schedule() @@ -104,7 +104,7 @@ namespace rpp::operators::details return v; } - value_with_mutex m_observer; + rpp::utils::value_with_mutex m_observer; RPP_NO_UNIQUE_ADDRESS Worker m_worker; rpp::schedulers::duration m_period; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 206c44727..fd6fba9bc 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include @@ -31,7 +31,7 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer_with_mutex}; } bool decrement_on_completed() { @@ -40,7 +40,7 @@ namespace rpp::operators::details } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; }; diff --git a/src/rpp/rpp/operators/details/utils.hpp b/src/rpp/rpp/operators/details/utils.hpp deleted file mode 100644 index 8c6a0d4f6..000000000 --- a/src/rpp/rpp/operators/details/utils.hpp +++ /dev/null @@ -1,59 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2023 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus -// - -#pragma once - -#include - -namespace rpp::operators::details -{ - template - struct value_with_mutex - { - value_with_mutex() = default; - - explicit value_with_mutex(const T& v) - : value{v} - { - } - - explicit value_with_mutex(T&& v) - : value{std::move(v)} - { - } - - T value{}; - std::mutex mutex{}; - }; - - template - class pointer_under_lock - { - public: - explicit pointer_under_lock(value_with_mutex& value) - : pointer_under_lock{value.value, value.mutex} - { - } - - pointer_under_lock(T& val, std::mutex& mutex) - : m_ptr{&val} - , m_lock{mutex} - { - } - - T* operator->() { return m_ptr; } - - const T* operator->() const { return m_ptr; } - - private: - T* m_ptr; - std::scoped_lock m_lock; - }; -} // namespace rpp::operators::details \ No newline at end of file diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 80634c823..0fab2099e 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include @@ -38,10 +38,10 @@ namespace rpp::operators::details // just need atomicity, not guarding anything bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer}; } private: - value_with_mutex m_observer{}; + rpp::utils::value_with_mutex m_observer{}; std::atomic_size_t m_on_completed_needed{1}; }; diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index a789cd7e8..3480932ed 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include namespace rpp::operators::details { @@ -33,13 +33,13 @@ namespace rpp::operators::details switch_on_next_state_t(const switch_on_next_state_t&) = delete; switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete; - pointer_under_lock get_observer() + rpp::utils::pointer_under_lock get_observer() { - return pointer_under_lock{m_observer_with_mutex}; + return rpp::utils::pointer_under_lock{m_observer_with_mutex}; } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; }; template diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 04a9b1e23..cd9e21292 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include namespace rpp::operators::details @@ -33,10 +33,10 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer() { return pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer() { return rpp::utils::pointer_under_lock{m_observer_with_mutex}; } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; }; template diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 494a66ca7..7c4fda71f 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include @@ -35,14 +35,14 @@ namespace rpp::operators::details , m_fallback{fallback} { } - pointer_under_lock get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; } + rpp::utils::pointer_under_lock get_observer_with_timeout_under_lock() { return rpp::utils::pointer_under_lock{m_observer_with_timeout}; } const TFallbackObservable& get_fallback() const { return m_fallback; } rpp::schedulers::duration get_period() const { return m_period; } private: - value_with_mutex m_observer_with_timeout; + rpp::utils::value_with_mutex m_observer_with_timeout; const rpp::schedulers::duration m_period; const TFallbackObservable m_fallback; diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 5a67b28ad..f5d375add 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include #include @@ -50,7 +50,7 @@ namespace rpp::operators::details { } - rpp::operators::details::pointer_under_lock get_state_under_lock() { return rpp::operators::details::pointer_under_lock{m_state}; } + rpp::utils::pointer_under_lock get_state_under_lock() { return rpp::utils::pointer_under_lock{m_state}; } template auto get_closing(T&& v) const @@ -67,7 +67,7 @@ namespace rpp::operators::details } private: - rpp::operators::details::value_with_mutex m_state{}; + rpp::utils::value_with_mutex m_state{}; RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; }; diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c966bbda5..12352df04 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include @@ -32,15 +32,15 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{observer_with_mutex}; } - rpp::utils::tuple>...>& get_values() { return values; } + rpp::utils::tuple>...>& get_values() { return values; } const TSelector& get_selector() const { return selector; } private: - value_with_mutex observer_with_mutex{}; - rpp::utils::tuple>...> values{}; + rpp::utils::value_with_mutex observer_with_mutex{}; + rpp::utils::tuple>...> values{}; RPP_NO_UNIQUE_ADDRESS TSelector selector; }; @@ -99,7 +99,7 @@ namespace rpp::operators::details template void on_next(T&& v) const { - auto result = disposable->get_values().apply([this, &v](value_with_mutex>&... vals) -> std::optional { + auto result = disposable->get_values().apply([this, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { auto lock = std::scoped_lock{vals.mutex...}; if ((vals.value.has_value() && ...)) diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 65c33865a..85196bc74 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -15,6 +15,7 @@ #include #include +#include namespace rpp::utils { @@ -253,6 +254,49 @@ namespace rpp::utils static constexpr void try_lock() {} }; + template + struct value_with_mutex + { + value_with_mutex() = default; + + explicit value_with_mutex(const T& v) + : value{v} + { + } + + explicit value_with_mutex(T&& v) + : value{std::move(v)} + { + } + + T value{}; + std::mutex mutex{}; + }; + + template + class pointer_under_lock + { + public: + explicit pointer_under_lock(value_with_mutex& value) + : pointer_under_lock{value.value, value.mutex} + { + } + + pointer_under_lock(T& val, std::mutex& mutex) + : m_ptr{&val} + , m_lock{mutex} + { + } + + T* operator->() { return m_ptr; } + + const T* operator->() const { return m_ptr; } + + private: + T* m_ptr; + std::scoped_lock m_lock; + }; + #define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \ __VA_ARGS__; \ return rpp::utils::none{}; \ From 5d0e409c8809d05145ae31e82ae478669c862821 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 18:14:05 +0300 Subject: [PATCH 02/10] add behavior subject --- src/rpp/rpp/subjects.hpp | 3 +- src/rpp/rpp/subjects/behavior_subject.hpp | 126 ++++++++++++++++++++++ src/rpp/rpp/subjects/fwd.hpp | 7 ++ 3 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 src/rpp/rpp/subjects/behavior_subject.hpp diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index bf5ed2934..42af54881 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -18,4 +18,5 @@ */ #include -#include \ No newline at end of file +#include +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp new file mode 100644 index 000000000..aaaf4d815 --- /dev/null +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -0,0 +1,126 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace rpp::subjects::details +{ + template + class behavior_subject_base + { + class behavior_state final : public subject_state + { + public: + behavior_state(){} + + rpp::utils::pointer_under_lock get_value() { return rpp::utils::pointer_under_lock{m_value}; } + + private: + rpp::utils::value_with_mutex m_value{}; + }; + + struct observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr state; + + void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } + + bool is_disposed() const noexcept { return state->is_disposed(); } + + void on_next(const Type& v) const + { + *state->get_value() = v; + state->on_next(v); + } + + void on_error(const std::exception_ptr& err) const { state->on_error(err); } + + void on_completed() const { state->on_completed(); } + }; + + public: + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + + behavior_subject_base() + : m_state{disposable_wrapper_impl::make()} + { + } + + auto get_observer() const + { + return rpp::observer{m_state.lock()}; + } + + auto get_observable() const + { + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { + const auto locked = state.lock(); + if (!locked->is_disposed()) + { + auto v = *locked->get_value(); + observer.on_next(std::move(v)); + } + locked->on_subscribe(std::forward(observer)); + }); + } + + rpp::disposable_wrapper get_disposable() const + { + return m_state; + } + + private: + disposable_wrapper_impl m_state; + }; +} // namespace rpp::subjects::details + +namespace rpp::subjects +{ + /** + * @brief Same as rpp::subjects::publish_subject but keeps last value (or default) and emits it to newly subscribed observer + * + * @tparam Type value provided by this subject + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ + template + class behavior_subject final : public details::behavior_subject_base + { + public: + using details::behavior_subject_base::behavior_subject_base; + }; + + /** + * @brief Same as rpp::subjects::behavior_subject but on_next/on_error/on_completed calls are serialized via mutex. + * @details When you are using ordinary rpp::subjects::behavior_subject, then you must take care not to call its on_next method (or its other on methods) in async way. + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ + template + class serialized_behavior_subject final : public details::behavior_subject_base + { + public: + using details::behavior_subject_base::behavior_subject_base; + }; +} // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 22daa29ed..76dd683ba 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -32,6 +32,13 @@ namespace rpp::subjects class serialized_replay_subject; + template + class behavior_subject; + + template + class serialized_behavior_subject; + + } // namespace rpp::subjects namespace rpp::constraint From e943ad5ad448fa4dea18ef28dc7b9fad1f88fa16 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 18:19:43 +0300 Subject: [PATCH 03/10] adapt value_with_mutex --- src/rpp/rpp/subjects/behavior_subject.hpp | 3 +- src/rpp/rpp/utils/utils.hpp | 55 +++++++++++++---------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index aaaf4d815..c691545bd 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -28,7 +28,8 @@ namespace rpp::subjects::details class behavior_state final : public subject_state { public: - behavior_state(){} + behavior_state(const Type& v) : m_value{v} {} + behavior_state(Type&& v) : m_value{std::move(v)} {} rpp::utils::pointer_under_lock get_value() { return rpp::utils::pointer_under_lock{m_value}; } diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 85196bc74..4a1d8fe6a 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -255,48 +255,57 @@ namespace rpp::utils }; template - struct value_with_mutex + class value_with_mutex { + public: value_with_mutex() = default; explicit value_with_mutex(const T& v) - : value{v} + : m_value{v} { } explicit value_with_mutex(T&& v) - : value{std::move(v)} + : m_value{std::move(v)} { } - T value{}; - std::mutex mutex{}; - }; - - template - class pointer_under_lock - { - public: - explicit pointer_under_lock(value_with_mutex& value) - : pointer_under_lock{value.value, value.mutex} + class pointer_under_lock { - } + public: + pointer_under_lock(value_with_mutex&& value) = default; - pointer_under_lock(T& val, std::mutex& mutex) - : m_ptr{&val} - , m_lock{mutex} - { - } + pointer_under_lock(value_with_mutex& value) + : pointer_under_lock{value.value, value.mutex} + { + } - T* operator->() { return m_ptr; } + private: + pointer_under_lock(T& val, std::mutex& mutex) + : m_ptr{&val} + , m_lock{mutex} + { + } + + public: + T* operator->() { return m_ptr; } + const T* operator->() const { return m_ptr; } + + private: + T* m_ptr; + std::scoped_lock m_lock; + }; - const T* operator->() const { return m_ptr; } private: - T* m_ptr; - std::scoped_lock m_lock; + T m_value{}; + std::mutex m_mutex{}; }; + template + using pointer_under_lock = typename value_with_mutex::pointer_under_lock; + + #define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \ __VA_ARGS__; \ return rpp::utils::none{}; \ From 854c9d60d8efca36bed9f46ef4483d6a553386ad Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 23:13:18 +0300 Subject: [PATCH 04/10] fix tests --- src/rpp/rpp/subjects/behavior_subject.hpp | 9 +++++++-- src/rpp/rpp/utils/utils.hpp | 7 +++++-- src/tests/rpp/test_subjects.cpp | 10 ++++++++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index c691545bd..a8ab83003 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -61,8 +61,13 @@ namespace rpp::subjects::details public: using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; - behavior_subject_base() - : m_state{disposable_wrapper_impl::make()} + explicit behavior_subject_base(const Type& value) + : m_state{disposable_wrapper_impl::make(value)} + { + } + + explicit behavior_subject_base(Type&& value) + : m_state{disposable_wrapper_impl::make(std::move(value))} { } diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 4a1d8fe6a..82dc65a10 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -273,10 +273,10 @@ namespace rpp::utils class pointer_under_lock { public: - pointer_under_lock(value_with_mutex&& value) = default; + pointer_under_lock(value_with_mutex&& value) = delete; pointer_under_lock(value_with_mutex& value) - : pointer_under_lock{value.value, value.mutex} + : pointer_under_lock{value.m_value, value.m_mutex} { } @@ -291,6 +291,9 @@ namespace rpp::utils T* operator->() { return m_ptr; } const T* operator->() const { return m_ptr; } + T& operator*() { return *m_ptr; } + const T& operator*() const { return *m_ptr; } + private: T* m_ptr; std::scoped_lock m_lock; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index a50469f66..ae20c5fa7 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "copy_count_tracker.hpp" #include "mock_observer.hpp" @@ -231,9 +232,14 @@ TEST_CASE("publish subject caches error/completed") } } -TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject) +TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject, rpp::subjects::serialized_behavior_subject) { - auto subj = TestType{}; + auto subj = []() { + if constexpr (std::same_as>) + return TestType{0}; + else + return TestType{}; + }(); SECTION("call on_next from 2 threads") { From c360092e9631cce629b817b9cf222c45f9b8f2d4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 23:50:54 +0300 Subject: [PATCH 05/10] tests --- src/tests/rpp/test_subjects.cpp | 54 +++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index ae20c5fa7..ccbcfde5b 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -19,6 +19,7 @@ #include "copy_count_tracker.hpp" #include "mock_observer.hpp" +#include "snitch_logging.hpp" #include @@ -433,4 +434,57 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer }); } +} + +TEMPLATE_TEST_CASE("replay subject multicasts values and replay", "", rpp::subjects::behavior_subject, rpp::subjects::serialized_behavior_subject) +{ + const auto mock_1 = mock_observer_strategy{}; + const auto subj = TestType{10}; + + SECTION("subscribe to subject with default") + { + subj.get_observable().subscribe(mock_1); + CHECK(mock_1.get_received_values() == std::vector{10}); + + SECTION("emit value and subscribe other observer") + { + const auto mock_2 = mock_observer_strategy{}; + + subj.get_observer().on_next(5); + + CHECK(mock_1.get_received_values() == std::vector{10, 5}); + CHECK(mock_2.get_received_values() == std::vector{}); + + subj.get_observable().subscribe(mock_2); + + CHECK(mock_2.get_received_values() == std::vector{5}); + + SECTION("emit one more value and subscribe one more other observer") + { + const auto mock_3 = mock_observer_strategy{}; + subj.get_observer().on_next(1); + + CHECK(mock_1.get_received_values() == std::vector{10, 5, 1}); + CHECK(mock_2.get_received_values() == std::vector{5, 1}); + CHECK(mock_3.get_received_values() == std::vector{}); + + subj.get_observable().subscribe(mock_3); + + CHECK(mock_3.get_received_values() == std::vector{1}); + } + } + + SECTION("subject keeps error") + { + subj.get_observer().on_error(std::exception_ptr{}); + CHECK(mock_1.get_on_error_count() == 1); + + const auto mock_4 = mock_observer_strategy{}; + subj.get_observable().subscribe(mock_4); + + CHECK(mock_4.get_received_values() == std::vector{}); + CHECK(mock_4.get_on_error_count() == 1); + CHECK(mock_4.get_on_completed_count() == 0); + } + } } \ No newline at end of file From 0aa13b917b48992e5403046a97d8ef707230852b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 Apr 2024 20:50:40 +0000 Subject: [PATCH 06/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/concat.hpp | 2 +- src/rpp/rpp/operators/debounce.hpp | 6 +++--- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 +- src/rpp/rpp/operators/merge.hpp | 4 ++-- src/rpp/rpp/operators/take_until.hpp | 2 +- src/rpp/rpp/operators/timeout.hpp | 2 +- src/rpp/rpp/operators/window_toggle.hpp | 6 +++--- src/rpp/rpp/operators/with_latest_from.hpp | 4 ++-- src/rpp/rpp/subjects.hpp | 4 ++-- src/rpp/rpp/subjects/behavior_subject.hpp | 10 ++++++++-- src/rpp/rpp/utils/utils.hpp | 6 +++--- src/tests/rpp/test_subjects.cpp | 4 ++-- 12 files changed, 29 insertions(+), 23 deletions(-) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 45761c7ee..d28508cbb 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -99,7 +99,7 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; - std::atomic m_stage{}; + std::atomic m_stage{}; }; template diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index cf7ca56d3..8cf2466c5 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -104,9 +104,9 @@ namespace rpp::operators::details return v; } - rpp::utils::value_with_mutex m_observer; - RPP_NO_UNIQUE_ADDRESS Worker m_worker; - rpp::schedulers::duration m_period; + rpp::utils::value_with_mutex m_observer; + RPP_NO_UNIQUE_ADDRESS Worker m_worker; + rpp::schedulers::duration m_period; std::mutex m_mutex{}; std::optional m_time_when_value_should_be_emitted{}; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index fd6fba9bc..1e4332c6e 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -15,8 +15,8 @@ #include #include #include -#include #include +#include #include diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 0fab2099e..02df3ecb8 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -15,9 +15,9 @@ #include #include #include -#include #include #include +#include #include @@ -42,7 +42,7 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex m_observer{}; - std::atomic_size_t m_on_completed_needed{1}; + std::atomic_size_t m_on_completed_needed{1}; }; template diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index cd9e21292..435cfaef9 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -14,8 +14,8 @@ #include #include -#include #include +#include namespace rpp::operators::details { diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 7c4fda71f..9f6c0e972 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -13,9 +13,9 @@ #include #include -#include #include #include +#include namespace rpp::operators::details { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index f5d375add..c05fb3dd6 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -16,8 +16,8 @@ #include #include #include -#include #include +#include #include @@ -67,8 +67,8 @@ namespace rpp::operators::details } private: - rpp::utils::value_with_mutex m_state{}; - RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; + rpp::utils::value_with_mutex m_state{}; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; }; template diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 12352df04..c13553552 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -15,8 +15,8 @@ #include #include #include -#include #include +#include #include @@ -41,7 +41,7 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex observer_with_mutex{}; rpp::utils::tuple>...> values{}; - RPP_NO_UNIQUE_ADDRESS TSelector selector; + RPP_NO_UNIQUE_ADDRESS TSelector selector; }; template diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index 42af54881..49cb7b52b 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -17,6 +17,6 @@ * \ingroup rpp */ +#include #include -#include -#include \ No newline at end of file +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index a8ab83003..9219c9a44 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -28,8 +28,14 @@ namespace rpp::subjects::details class behavior_state final : public subject_state { public: - behavior_state(const Type& v) : m_value{v} {} - behavior_state(Type&& v) : m_value{std::move(v)} {} + behavior_state(const Type& v) + : m_value{v} + { + } + behavior_state(Type&& v) + : m_value{std::move(v)} + { + } rpp::utils::pointer_under_lock get_value() { return rpp::utils::pointer_under_lock{m_value}; } diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 82dc65a10..209ebf347 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -288,12 +288,12 @@ namespace rpp::utils } public: - T* operator->() { return m_ptr; } + T* operator->() { return m_ptr; } const T* operator->() const { return m_ptr; } - T& operator*() { return *m_ptr; } + T& operator*() { return *m_ptr; } const T& operator*() const { return *m_ptr; } - + private: T* m_ptr; std::scoped_lock m_lock; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index ccbcfde5b..89bc9e901 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -13,9 +13,9 @@ #include #include #include +#include #include #include -#include #include "copy_count_tracker.hpp" #include "mock_observer.hpp" @@ -439,7 +439,7 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp TEMPLATE_TEST_CASE("replay subject multicasts values and replay", "", rpp::subjects::behavior_subject, rpp::subjects::serialized_behavior_subject) { const auto mock_1 = mock_observer_strategy{}; - const auto subj = TestType{10}; + const auto subj = TestType{10}; SECTION("subscribe to subject with default") { From a8dff460714fd17e50a1723872fec97aeaa852bf Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 2 Apr 2024 23:54:41 +0300 Subject: [PATCH 07/10] compile fix --- src/rpp/rpp/operators/concat.hpp | 4 ++-- src/rpp/rpp/operators/debounce.hpp | 2 +- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 +- src/rpp/rpp/operators/merge.hpp | 2 +- src/rpp/rpp/operators/switch_on_next.hpp | 2 +- src/rpp/rpp/operators/take_until.hpp | 2 +- src/rpp/rpp/operators/timeout.hpp | 2 +- src/rpp/rpp/operators/with_latest_from.hpp | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index d28508cbb..6b063e897 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -42,8 +42,8 @@ namespace rpp::operators::details { } - rpp::utils::pointer_under_lock get_observer() { return rpp::utils::pointer_under_lock{m_observer}; } - rpp::utils::pointer_under_lock> get_queue() { return rpp::utils::pointer_under_lock{m_queue}; } + rpp::utils::pointer_under_lock get_observer() { return m_observer; } + rpp::utils::pointer_under_lock> get_queue() { return m_queue; } std::atomic& stage() { return m_stage; } diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 8cf2466c5..4e2a5d1b7 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details return std::exchange(m_value_to_be_emitted, std::optional{}); } - rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } private: void schedule() diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 1e4332c6e..1d7995cdd 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -31,7 +31,7 @@ namespace rpp::operators::details { } - rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } bool decrement_on_completed() { diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 02df3ecb8..511f43f1d 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -38,7 +38,7 @@ namespace rpp::operators::details // just need atomicity, not guarding anything bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; } - rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } private: rpp::utils::value_with_mutex m_observer{}; diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 3480932ed..15b017ee7 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -35,7 +35,7 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer() { - return rpp::utils::pointer_under_lock{m_observer_with_mutex}; + return m_observer_with_mutex; } private: diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 435cfaef9..acb43f37f 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -33,7 +33,7 @@ namespace rpp::operators::details { } - rpp::utils::pointer_under_lock get_observer() { return rpp::utils::pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer() { return m_observer_with_mutex; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 9f6c0e972..beda2ed36 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -35,7 +35,7 @@ namespace rpp::operators::details , m_fallback{fallback} { } - rpp::utils::pointer_under_lock get_observer_with_timeout_under_lock() { return rpp::utils::pointer_under_lock{m_observer_with_timeout}; } + rpp::utils::pointer_under_lock get_observer_with_timeout_under_lock() { return m_observer_with_timeout; } const TFallbackObservable& get_fallback() const { return m_fallback; } diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c13553552..b657ba01a 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -32,7 +32,7 @@ namespace rpp::operators::details { } - rpp::utils::pointer_under_lock get_observer_under_lock() { return rpp::utils::pointer_under_lock{observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return observer_with_mutex; } rpp::utils::tuple>...>& get_values() { return values; } From f47e4a0687f402d11f4e7b3c13b61c2c2d7fc47d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 3 Apr 2024 00:03:03 +0300 Subject: [PATCH 08/10] fix --- src/rpp/rpp/operators/group_by.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index 5d1d887bf..e9b32e9f9 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include From e95a2c23b5fb512befa0b6b7603bc4e3cd580ec1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 3 Apr 2024 00:11:26 +0300 Subject: [PATCH 09/10] compile fixes --- src/rpp/rpp/operators/combine_latest.hpp | 2 +- src/rpp/rpp/operators/with_latest_from.hpp | 11 +++++------ src/rpp/rpp/operators/zip.hpp | 2 +- src/rpp/rpp/utils/utils.hpp | 3 +++ 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index e712f6628..671232cdf 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -56,7 +56,7 @@ namespace rpp::operators::details private: template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index b657ba01a..e6e8140ff 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -62,9 +62,8 @@ namespace rpp::operators::details template void on_next(T&& v) const { - auto& [value, mutex] = disposable->get_values().template get(); - std::scoped_lock lock{mutex}; - value.emplace(std::forward(v)); + auto locked_value = disposable->get_values().template get().lock(); + locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const @@ -100,10 +99,10 @@ namespace rpp::operators::details void on_next(T&& v) const { auto result = disposable->get_values().apply([this, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { - auto lock = std::scoped_lock{vals.mutex...}; + auto lock = std::scoped_lock{vals.get_mutex()...}; - if ((vals.value.has_value() && ...)) - return disposable->get_selector()(rpp::utils::as_const(std::forward(v)), rpp::utils::as_const(vals.value.value())...); + if ((vals.get_value_unsafe().has_value() && ...)) + return disposable->get_selector()(rpp::utils::as_const(std::forward(v)), rpp::utils::as_const(vals.get_value_unsafe().value())...); return std::nullopt; }); diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index 10664c5ac..ae0247314 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -57,7 +57,7 @@ namespace rpp::operators::details private: template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, std::deque&... values) { if ((!values.empty() && ...)) { diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 209ebf347..5d3ba9bb3 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -299,7 +299,10 @@ namespace rpp::utils std::scoped_lock m_lock; }; + pointer_under_lock lock() { return *this; } + std::mutex& get_mutex() { return m_mutex; } + T& get_value_unsafe() { return m_value; } private: T m_value{}; std::mutex m_mutex{}; From cee7337b7fd82a9637a805e318721c4902da1ad4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 Apr 2024 21:11:11 +0000 Subject: [PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/utils/utils.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 5d3ba9bb3..091af0d75 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -302,7 +302,8 @@ namespace rpp::utils pointer_under_lock lock() { return *this; } std::mutex& get_mutex() { return m_mutex; } - T& get_value_unsafe() { return m_value; } + T& get_value_unsafe() { return m_value; } + private: T m_value{}; std::mutex m_mutex{};