diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index e712f6628..671232cdf 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -56,7 +56,7 @@ namespace rpp::operators::details private: template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index cb5a28ee3..6b063e897 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include @@ -42,8 +42,8 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer() { return pointer_under_lock{m_observer}; } - pointer_under_lock> get_queue() { return pointer_under_lock{m_queue}; } + rpp::utils::pointer_under_lock get_observer() { return m_observer; } + rpp::utils::pointer_under_lock> get_queue() { return m_queue; } std::atomic& stage() { return m_stage; } @@ -97,9 +97,9 @@ namespace rpp::operators::details } private: - value_with_mutex m_observer; - value_with_mutex> m_queue; - std::atomic m_stage{}; + rpp::utils::value_with_mutex m_observer; + rpp::utils::value_with_mutex> m_queue; + std::atomic m_stage{}; }; template diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 13339e901..4e2a5d1b7 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include namespace rpp::operators::details { @@ -69,7 +69,7 @@ namespace rpp::operators::details return std::exchange(m_value_to_be_emitted, std::optional{}); } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } private: void schedule() @@ -104,9 +104,9 @@ namespace rpp::operators::details return v; } - value_with_mutex m_observer; - RPP_NO_UNIQUE_ADDRESS Worker m_worker; - rpp::schedulers::duration m_period; + rpp::utils::value_with_mutex m_observer; + RPP_NO_UNIQUE_ADDRESS Worker m_worker; + rpp::schedulers::duration m_period; std::mutex m_mutex{}; std::optional m_time_when_value_should_be_emitted{}; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 206c44727..1d7995cdd 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -15,8 +15,8 @@ #include #include #include -#include #include +#include #include @@ -31,7 +31,7 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } bool decrement_on_completed() { @@ -40,7 +40,7 @@ namespace rpp::operators::details } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; }; diff --git a/src/rpp/rpp/operators/details/utils.hpp b/src/rpp/rpp/operators/details/utils.hpp deleted file mode 100644 index 8c6a0d4f6..000000000 --- a/src/rpp/rpp/operators/details/utils.hpp +++ /dev/null @@ -1,59 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2023 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus -// - -#pragma once - -#include - -namespace rpp::operators::details -{ - template - struct value_with_mutex - { - value_with_mutex() = default; - - explicit value_with_mutex(const T& v) - : value{v} - { - } - - explicit value_with_mutex(T&& v) - : value{std::move(v)} - { - } - - T value{}; - std::mutex mutex{}; - }; - - template - class pointer_under_lock - { - public: - explicit pointer_under_lock(value_with_mutex& value) - : pointer_under_lock{value.value, value.mutex} - { - } - - pointer_under_lock(T& val, std::mutex& mutex) - : m_ptr{&val} - , m_lock{mutex} - { - } - - T* operator->() { return m_ptr; } - - const T* operator->() const { return m_ptr; } - - private: - T* m_ptr; - std::scoped_lock m_lock; - }; -} // namespace rpp::operators::details \ No newline at end of file diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index 5d1d887bf..e9b32e9f9 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 80634c823..511f43f1d 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -15,9 +15,9 @@ #include #include #include -#include #include #include +#include #include @@ -38,11 +38,11 @@ namespace rpp::operators::details // just need atomicity, not guarding anything bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } private: - value_with_mutex m_observer{}; - std::atomic_size_t m_on_completed_needed{1}; + rpp::utils::value_with_mutex m_observer{}; + std::atomic_size_t m_on_completed_needed{1}; }; template diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index a789cd7e8..15b017ee7 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include namespace rpp::operators::details { @@ -33,13 +33,13 @@ namespace rpp::operators::details switch_on_next_state_t(const switch_on_next_state_t&) = delete; switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete; - pointer_under_lock get_observer() + rpp::utils::pointer_under_lock get_observer() { - return pointer_under_lock{m_observer_with_mutex}; + return m_observer_with_mutex; } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; }; template diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 04a9b1e23..acb43f37f 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -14,8 +14,8 @@ #include #include -#include #include +#include namespace rpp::operators::details { @@ -33,10 +33,10 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer() { return pointer_under_lock{m_observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer() { return m_observer_with_mutex; } private: - value_with_mutex m_observer_with_mutex{}; + rpp::utils::value_with_mutex m_observer_with_mutex{}; }; template diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 494a66ca7..beda2ed36 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -13,9 +13,9 @@ #include #include -#include #include #include +#include namespace rpp::operators::details { @@ -35,14 +35,14 @@ namespace rpp::operators::details , m_fallback{fallback} { } - pointer_under_lock get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; } + rpp::utils::pointer_under_lock get_observer_with_timeout_under_lock() { return m_observer_with_timeout; } const TFallbackObservable& get_fallback() const { return m_fallback; } rpp::schedulers::duration get_period() const { return m_period; } private: - value_with_mutex m_observer_with_timeout; + rpp::utils::value_with_mutex m_observer_with_timeout; const rpp::schedulers::duration m_period; const TFallbackObservable m_fallback; diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 5a67b28ad..c05fb3dd6 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -16,8 +16,8 @@ #include #include #include -#include #include +#include #include @@ -50,7 +50,7 @@ namespace rpp::operators::details { } - rpp::operators::details::pointer_under_lock get_state_under_lock() { return rpp::operators::details::pointer_under_lock{m_state}; } + rpp::utils::pointer_under_lock get_state_under_lock() { return rpp::utils::pointer_under_lock{m_state}; } template auto get_closing(T&& v) const @@ -67,8 +67,8 @@ namespace rpp::operators::details } private: - rpp::operators::details::value_with_mutex m_state{}; - RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; + rpp::utils::value_with_mutex m_state{}; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; }; template diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c966bbda5..e6e8140ff 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -15,8 +15,8 @@ #include #include #include -#include #include +#include #include @@ -32,16 +32,16 @@ namespace rpp::operators::details { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return observer_with_mutex; } - rpp::utils::tuple>...>& get_values() { return values; } + rpp::utils::tuple>...>& get_values() { return values; } const TSelector& get_selector() const { return selector; } private: - value_with_mutex observer_with_mutex{}; - rpp::utils::tuple>...> values{}; - RPP_NO_UNIQUE_ADDRESS TSelector selector; + rpp::utils::value_with_mutex observer_with_mutex{}; + rpp::utils::tuple>...> values{}; + RPP_NO_UNIQUE_ADDRESS TSelector selector; }; template @@ -62,9 +62,8 @@ namespace rpp::operators::details template void on_next(T&& v) const { - auto& [value, mutex] = disposable->get_values().template get(); - std::scoped_lock lock{mutex}; - value.emplace(std::forward(v)); + auto locked_value = disposable->get_values().template get().lock(); + locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const @@ -99,11 +98,11 @@ namespace rpp::operators::details template void on_next(T&& v) const { - auto result = disposable->get_values().apply([this, &v](value_with_mutex>&... vals) -> std::optional { - auto lock = std::scoped_lock{vals.mutex...}; + auto result = disposable->get_values().apply([this, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { + auto lock = std::scoped_lock{vals.get_mutex()...}; - if ((vals.value.has_value() && ...)) - return disposable->get_selector()(rpp::utils::as_const(std::forward(v)), rpp::utils::as_const(vals.value.value())...); + if ((vals.get_value_unsafe().has_value() && ...)) + return disposable->get_selector()(rpp::utils::as_const(std::forward(v)), rpp::utils::as_const(vals.get_value_unsafe().value())...); return std::nullopt; }); diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index 10664c5ac..ae0247314 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -57,7 +57,7 @@ namespace rpp::operators::details private: template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, std::deque&... values) { if ((!values.empty() && ...)) { diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index bf5ed2934..49cb7b52b 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -17,5 +17,6 @@ * \ingroup rpp */ +#include #include #include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp new file mode 100644 index 000000000..9219c9a44 --- /dev/null +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -0,0 +1,138 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace rpp::subjects::details +{ + template + class behavior_subject_base + { + class behavior_state final : public subject_state + { + public: + behavior_state(const Type& v) + : m_value{v} + { + } + behavior_state(Type&& v) + : m_value{std::move(v)} + { + } + + rpp::utils::pointer_under_lock get_value() { return rpp::utils::pointer_under_lock{m_value}; } + + private: + rpp::utils::value_with_mutex m_value{}; + }; + + struct observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr state; + + void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } + + bool is_disposed() const noexcept { return state->is_disposed(); } + + void on_next(const Type& v) const + { + *state->get_value() = v; + state->on_next(v); + } + + void on_error(const std::exception_ptr& err) const { state->on_error(err); } + + void on_completed() const { state->on_completed(); } + }; + + public: + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + + explicit behavior_subject_base(const Type& value) + : m_state{disposable_wrapper_impl::make(value)} + { + } + + explicit behavior_subject_base(Type&& value) + : m_state{disposable_wrapper_impl::make(std::move(value))} + { + } + + auto get_observer() const + { + return rpp::observer{m_state.lock()}; + } + + auto get_observable() const + { + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { + const auto locked = state.lock(); + if (!locked->is_disposed()) + { + auto v = *locked->get_value(); + observer.on_next(std::move(v)); + } + locked->on_subscribe(std::forward(observer)); + }); + } + + rpp::disposable_wrapper get_disposable() const + { + return m_state; + } + + private: + disposable_wrapper_impl m_state; + }; +} // namespace rpp::subjects::details + +namespace rpp::subjects +{ + /** + * @brief Same as rpp::subjects::publish_subject but keeps last value (or default) and emits it to newly subscribed observer + * + * @tparam Type value provided by this subject + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ + template + class behavior_subject final : public details::behavior_subject_base + { + public: + using details::behavior_subject_base::behavior_subject_base; + }; + + /** + * @brief Same as rpp::subjects::behavior_subject but on_next/on_error/on_completed calls are serialized via mutex. + * @details When you are using ordinary rpp::subjects::behavior_subject, then you must take care not to call its on_next method (or its other on methods) in async way. + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ + template + class serialized_behavior_subject final : public details::behavior_subject_base + { + public: + using details::behavior_subject_base::behavior_subject_base; + }; +} // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 22daa29ed..76dd683ba 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -32,6 +32,13 @@ namespace rpp::subjects class serialized_replay_subject; + template + class behavior_subject; + + template + class serialized_behavior_subject; + + } // namespace rpp::subjects namespace rpp::constraint diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 65c33865a..091af0d75 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -15,6 +15,7 @@ #include #include +#include namespace rpp::utils { @@ -253,6 +254,65 @@ namespace rpp::utils static constexpr void try_lock() {} }; + template + class value_with_mutex + { + public: + value_with_mutex() = default; + + explicit value_with_mutex(const T& v) + : m_value{v} + { + } + + explicit value_with_mutex(T&& v) + : m_value{std::move(v)} + { + } + + class pointer_under_lock + { + public: + pointer_under_lock(value_with_mutex&& value) = delete; + + pointer_under_lock(value_with_mutex& value) + : pointer_under_lock{value.m_value, value.m_mutex} + { + } + + private: + pointer_under_lock(T& val, std::mutex& mutex) + : m_ptr{&val} + , m_lock{mutex} + { + } + + public: + T* operator->() { return m_ptr; } + const T* operator->() const { return m_ptr; } + + T& operator*() { return *m_ptr; } + const T& operator*() const { return *m_ptr; } + + private: + T* m_ptr; + std::scoped_lock m_lock; + }; + + pointer_under_lock lock() { return *this; } + + std::mutex& get_mutex() { return m_mutex; } + T& get_value_unsafe() { return m_value; } + + private: + T m_value{}; + std::mutex m_mutex{}; + }; + + template + using pointer_under_lock = typename value_with_mutex::pointer_under_lock; + + #define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \ __VA_ARGS__; \ return rpp::utils::none{}; \ diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index a50469f66..89bc9e901 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -13,11 +13,13 @@ #include #include #include +#include #include #include #include "copy_count_tracker.hpp" #include "mock_observer.hpp" +#include "snitch_logging.hpp" #include @@ -231,9 +233,14 @@ TEST_CASE("publish subject caches error/completed") } } -TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject) +TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject, rpp::subjects::serialized_behavior_subject) { - auto subj = TestType{}; + auto subj = []() { + if constexpr (std::same_as>) + return TestType{0}; + else + return TestType{}; + }(); SECTION("call on_next from 2 threads") { @@ -427,4 +434,57 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer }); } +} + +TEMPLATE_TEST_CASE("replay subject multicasts values and replay", "", rpp::subjects::behavior_subject, rpp::subjects::serialized_behavior_subject) +{ + const auto mock_1 = mock_observer_strategy{}; + const auto subj = TestType{10}; + + SECTION("subscribe to subject with default") + { + subj.get_observable().subscribe(mock_1); + CHECK(mock_1.get_received_values() == std::vector{10}); + + SECTION("emit value and subscribe other observer") + { + const auto mock_2 = mock_observer_strategy{}; + + subj.get_observer().on_next(5); + + CHECK(mock_1.get_received_values() == std::vector{10, 5}); + CHECK(mock_2.get_received_values() == std::vector{}); + + subj.get_observable().subscribe(mock_2); + + CHECK(mock_2.get_received_values() == std::vector{5}); + + SECTION("emit one more value and subscribe one more other observer") + { + const auto mock_3 = mock_observer_strategy{}; + subj.get_observer().on_next(1); + + CHECK(mock_1.get_received_values() == std::vector{10, 5, 1}); + CHECK(mock_2.get_received_values() == std::vector{5, 1}); + CHECK(mock_3.get_received_values() == std::vector{}); + + subj.get_observable().subscribe(mock_3); + + CHECK(mock_3.get_received_values() == std::vector{1}); + } + } + + SECTION("subject keeps error") + { + subj.get_observer().on_error(std::exception_ptr{}); + CHECK(mock_1.get_on_error_count() == 1); + + const auto mock_4 = mock_observer_strategy{}; + subj.get_observable().subscribe(mock_4); + + CHECK(mock_4.get_received_values() == std::vector{}); + CHECK(mock_4.get_on_error_count() == 1); + CHECK(mock_4.get_on_completed_count() == 0); + } + } } \ No newline at end of file