From ff5a9283de6e83d69c8224ada3c619513161a8c8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Dec 2022 22:59:46 +0300 Subject: [PATCH 1/6] Add behavior subject --- docs/Implementation Status.md | 2 +- src/rpp/rpp/subjects.hpp | 3 +- src/rpp/rpp/subjects/behavior_subject.hpp | 132 ++++++++++++++++++ src/rpp/rpp/subjects/details/base_subject.hpp | 8 +- src/rpp/rpp/subjects/fwd.hpp | 5 +- src/tests/rpp/test_subjects.cpp | 50 ++++++- 6 files changed, 195 insertions(+), 5 deletions(-) create mode 100644 src/rpp/rpp/subjects/behavior_subject.hpp diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index bbbac7ebb..89d16a1da 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -150,7 +150,7 @@ ## Subjects - [x] publish_subject +- [x] behavior_subject - [ ] serialized_subject - [ ] replay_subject -- [ ] publish_subject - [ ] async_subject diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index 58fc1aafd..0a1aa2893 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -17,4 +17,5 @@ * \ingroup rpp */ -#include \ No newline at end of file +#include +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp new file mode 100644 index 000000000..13f60752f --- /dev/null +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -0,0 +1,132 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include +#include +#include +#include + +namespace rpp::subjects::details +{ +template +class behavior_strategy +{ +public: + template TT> + behavior_strategy(TT&& v, const composite_subscription& sub) + : m_state{std::make_shared(std::forward(v))} + , m_sub{sub} + { + m_sub.add([state = std::weak_ptr{m_state}] + { + if (const auto locked = state.lock()) + locked->on_unsubscribe(); + }); + } + + void on_subscribe(const dynamic_subscriber& sub) const + { + if (m_sub.is_subscribed()) + sub.on_next(m_state->get_value()); + + m_state->on_subscribe(sub); + } + + auto get_subscriber() const + { + return rpp::make_specific_subscriber(m_sub, + [state = m_state](const T& v) + { + state->set_value(v); + state->on_next(v); + }, + [state = m_state](const std::exception_ptr& err) + { + state->on_error(err); + }, + [state = m_state]() + { + state->on_completed(); + }); + } + + T get_value() const + { + return m_state->get_value(); + } + +private: + class behavior_state : public subject_state + { + public: + behavior_state(const T& v) + : subject_state{} + , value{v} {} + + behavior_state(T&& v) + : subject_state{} + , value{std::move(v)} {} + + T get_value() + { + std::lock_guard lock{mutex}; + return value; + } + + + void set_value(const T& v) + { + std::lock_guard lock{mutex}; + value = v; + } + + private: + + std::mutex mutex; + T value; + }; + + std::shared_ptr m_state; + composite_subscription m_sub{}; +}; +} // namespace rpp::subjects::details + +namespace rpp::subjects +{ +/** + * \brief Subject which multicasts values to observers subscribed on it and sends last emitted value (or initial value) on subscribe. It contains two parts: subscriber and observable at the same time. + * + * \details Each subscriber obtains only last/initial value + values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new subscribers if any + * + * \warning this subject is not synchronized/serialized! It means, that expected to call callbacks of subscriber in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject. + * + * \tparam T value provided by this subject + * + * \ingroup subjects + * \see https://reactivex.io/documentation/subject.html + */ +template +class behavior_subject final : public details::base_subject> +{ +public: + behavior_subject(const T& initial_value, const composite_subscription& sub = composite_subscription{}) + : details::base_subject>{initial_value, sub} {} + + behavior_subject(T&& initial_value, const composite_subscription& sub = composite_subscription{}) + : details::base_subject>{std::move(initial_value), sub} {} + + T get_value() const + { + return details::base_subject>::get_strategy().get_value(); + } +}; +} // namespace rpp::subjects diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 99f022110..49887f924 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -38,7 +38,13 @@ class base_subject : public subject_tag }); } +protected: + base_subject(auto&& ...args) + : m_strategy{std::forward(args)...} {} + + const Strategy& get_strategy() const { return m_strategy; } + private: Strategy m_strategy{}; }; -} // namespace rpp::subjects::details \ No newline at end of file +} // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 3452e1c55..d7a0504e8 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -18,7 +18,7 @@ namespace rpp::subjects::details struct subject_tag; template -concept subject_strategy = std::constructible_from && requires(Strategy t) +concept subject_strategy = requires(Strategy t) { {t.get_subscriber()} -> rpp::constraint::subscriber; t.on_subscribe(std::declval>()); @@ -32,4 +32,7 @@ namespace rpp::subjects { template class publish_subject; + +template +class behavior_subject; } // namespace rpp::subjects diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index e3c921340..d047b9095 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -10,6 +10,7 @@ #include "mock_observer.hpp" #include +#include #include SCENARIO("publish subject multicasts values") @@ -231,4 +232,51 @@ SCENARIO("publish subject caches error/completed/unsubscribe") } } } -} \ No newline at end of file +} + +SCENARIO("behavior subject caches last emission") +{ + auto mock = mock_observer{}; + GIVEN("behavior subject") + { + auto subj = rpp::subjects::behavior_subject{100}; + WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits initial value on subscribe") + { + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + WHEN("send new value to subject") + { + subj.get_subscriber().on_next(5); + AND_WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits last value on subscribe") + { + CHECK(mock.get_received_values() == std::vector{5}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + WHEN("complete subject") + { + subj.get_subscriber().on_completed(); + AND_WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits completion on subscribe") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + } +} From 8153262b40c3173b9baf097295411aacab650586 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Dec 2022 23:10:06 +0300 Subject: [PATCH 2/6] compile fix --- src/rpp/rpp/subjects/behavior_subject.hpp | 6 +++--- src/rpp/rpp/subjects/details/base_subject.hpp | 3 +++ src/rpp/rpp/subjects/publish_subject.hpp | 5 +++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index 13f60752f..cdd11a5f2 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -21,10 +21,10 @@ template class behavior_strategy { public: - template TT> - behavior_strategy(TT&& v, const composite_subscription& sub) + template TT, rpp::constraint::decayed_same_as TSub> + behavior_strategy(TT&& v, TSub&& sub) : m_state{std::make_shared(std::forward(v))} - , m_sub{sub} + , m_sub{std::forward(sub)} { m_sub.add([state = std::weak_ptr{m_state}] { diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 49887f924..44c9a9d83 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -25,6 +25,9 @@ class base_subject : public subject_tag base_subject(const composite_subscription& sub = composite_subscription{}) : m_strategy{sub} {} + base_subject(composite_subscription&& sub = composite_subscription{}) + : m_strategy{std::move(sub)} {} + auto get_subscriber() const { return m_strategy.get_subscriber(); diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 0417de5af..81ffe9118 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -21,8 +21,9 @@ template class publish_strategy { public: - publish_strategy(const composite_subscription& sub) - : m_sub{sub} + template TSub> + publish_strategy(TSub&& sub) + : m_sub{std::forward(sub)} { m_sub.add([state = std::weak_ptr{m_state}] { From 36b21f8a742cf0186d6147aa9a0e33877b609416 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Dec 2022 23:16:32 +0300 Subject: [PATCH 3/6] one more --- src/rpp/rpp/subjects/details/base_subject.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 44c9a9d83..8ec297c6b 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -25,7 +25,7 @@ class base_subject : public subject_tag base_subject(const composite_subscription& sub = composite_subscription{}) : m_strategy{sub} {} - base_subject(composite_subscription&& sub = composite_subscription{}) + base_subject(composite_subscription&& sub) : m_strategy{std::move(sub)} {} auto get_subscriber() const From a6e1580e9d678dfde6b48785a211a5c5a96c2a39 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 5 Dec 2022 00:10:57 +0300 Subject: [PATCH 4/6] fix --- src/rpp/rpp/subjects/details/base_subject.hpp | 6 ------ src/rpp/rpp/subjects/publish_subject.hpp | 9 ++++++++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 8ec297c6b..003aea899 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -22,12 +22,6 @@ template Strategy> class base_subject : public subject_tag { public: - base_subject(const composite_subscription& sub = composite_subscription{}) - : m_strategy{sub} {} - - base_subject(composite_subscription&& sub) - : m_strategy{std::move(sub)} {} - auto get_subscriber() const { return m_strategy.get_subscriber(); diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 81ffe9118..ee71549fa 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -75,5 +75,12 @@ namespace rpp::subjects * \see https://reactivex.io/documentation/subject.html */ template -class publish_subject final : public details::base_subject>{}; +class publish_subject final : public details::base_subject>{ +public: + publish_subject(const composite_subscription& sub = composite_subscription{}) + : details::base_subject>{sub} {} + + publish_subject(const composite_subscription& sub = composite_subscription{}) + : details::base_subject>{std::move(sub)} {} +}; } // namespace rpp::subjects From 04ddbd3a43902a74d4b6e31a3b908548e65f25af Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 5 Dec 2022 00:15:09 +0300 Subject: [PATCH 5/6] typo -_- --- src/rpp/rpp/subjects/behavior_subject.hpp | 11 +++++++++-- src/rpp/rpp/subjects/publish_subject.hpp | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index cdd11a5f2..17d7a242d 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -118,12 +118,19 @@ template class behavior_subject final : public details::base_subject> { public: - behavior_subject(const T& initial_value, const composite_subscription& sub = composite_subscription{}) + behavior_subject(const T& initial_value, const composite_subscription& sub) : details::base_subject>{initial_value, sub} {} - behavior_subject(T&& initial_value, const composite_subscription& sub = composite_subscription{}) + behavior_subject(T&& initial_value, const composite_subscription& sub) : details::base_subject>{std::move(initial_value), sub} {} + behavior_subject(const T& initial_value, composite_subscription&& sub = composite_subscription{}) + : details::base_subject>{initial_value, std::move(sub)} {} + + behavior_subject(T&& initial_value, composite_subscription&& sub = composite_subscription{}) + : details::base_subject>{std::move(initial_value), std::move(sub)} {} + + T get_value() const { return details::base_subject>::get_strategy().get_value(); diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index ee71549fa..b0649eed2 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -77,10 +77,10 @@ namespace rpp::subjects template class publish_subject final : public details::base_subject>{ public: - publish_subject(const composite_subscription& sub = composite_subscription{}) + publish_subject(const composite_subscription& sub) : details::base_subject>{sub} {} - publish_subject(const composite_subscription& sub = composite_subscription{}) + publish_subject(composite_subscription&& sub = composite_subscription{}) : details::base_subject>{std::move(sub)} {} }; } // namespace rpp::subjects From e2540f2209b6a0132f4d1b130413bef9506567a6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 5 Dec 2022 09:21:55 +0300 Subject: [PATCH 6/6] Update publish_subject.hpp --- src/rpp/rpp/subjects/publish_subject.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index b0649eed2..f55199ffc 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -78,9 +78,9 @@ template class publish_subject final : public details::base_subject>{ public: publish_subject(const composite_subscription& sub) - : details::base_subject>{sub} {} + : details::base_subject>{sub} {} publish_subject(composite_subscription&& sub = composite_subscription{}) - : details::base_subject>{std::move(sub)} {} + : details::base_subject>{std::move(sub)} {} }; } // namespace rpp::subjects