From a11e32e4f626f133523557f48aa4fd763b4452db Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 20 Jan 2024 15:27:25 +0100 Subject: [PATCH 1/4] Add replay_subject --- src/rpp/rpp/subjects/replay_subject.hpp | 158 ++++++++++++++++++++++++ src/tests/rpp/test_subjects.cpp | 143 ++++++++++++++++++++- 2 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 src/rpp/rpp/subjects/replay_subject.hpp diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp new file mode 100644 index 000000000..2a5567681 --- /dev/null +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -0,0 +1,158 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +#include +#include +#include + +#include +#include + +namespace rpp::subjects::details +{ +template +class replay_strategy +{ + struct replay_state + { + std::optional count; + std::optional duration; + + std::list values{}; + std::list time_points{}; + + std::mutex mutex{}; + subject_state state{}; + }; + + struct observer_strategy + { + std::shared_ptr state; + + void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); } + + bool is_disposed() const noexcept + { + return state->state.is_disposed(); + } + + void on_next(const Type& v) const + { + std::unique_lock lock{state->mutex}; + if (state->count.has_value()) + { + if (state->values.size() == state->count.value()) + { + state->values.pop_front(); + if (state->duration.has_value()) + { + state->time_points.pop_front(); + } + } + } + if (state->duration.has_value()) + { + auto now = rpp::schedulers::clock_type::now(); + while (!state->time_points.empty() && (now - state->time_points.front() > state->duration.value())) + { + state->values.pop_front(); + state->time_points.pop_front(); + } + state->time_points.push_back(now); + } + + state->values.push_back(v); + state->state.on_next(v); + } + + void on_error(const std::exception_ptr& err) const + { + std::unique_lock lock{state->mutex}; + state->state.on_error(err); + } + + void on_completed() const + { + std::unique_lock lock{state->mutex}; + state->state.on_completed(); + } + }; + +public: + replay_strategy() + : m_state(std::make_shared(std::nullopt, std::nullopt)) + { + } + + replay_strategy(size_t count) + : m_state{std::make_shared(count, std::nullopt)} + { + } + + replay_strategy(size_t count, rpp::schedulers::duration duration) + : m_state{std::make_shared(count, duration)} + { + } + + auto get_observer() const + { + return rpp::observer>{ + composite_disposable_wrapper{ + std::shared_ptr>{m_state, &m_state->state}}, + observer_strategy{m_state}}; + } + + template TObs> + void on_subscribe(TObs&& observer) const + { + { + std::unique_lock lock{m_state->mutex}; + for (const auto& value : m_state->values) + { + observer.on_next(value); + } + } + m_state->state.on_subscribe(std::forward(observer)); + } + + rpp::disposable_wrapper get_disposable() const + { + return rpp::disposable_wrapper{m_state->state}; + } + +private: + std::shared_ptr m_state = std::make_shared(); +}; +} + +namespace rpp::subjects +{ +/** + * @brief Same as rpp::subjects::serialized_subject but send all earlier emitted values to any new observers. + * + * @param count maximum element count of the replay buffer (optional) + * @param duration maximum time length the replay buffer (optional) + * + * @tparam Type value provided by this subject + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ +template +class replay_subject final : public details::base_subject> +{ +public: + using details::base_subject>::base_subject; +}; +} \ No newline at end of file diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index efd00c749..e57d60a09 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -12,11 +12,11 @@ #include #include +#include #include #include #include - #include "mock_observer.hpp" #include @@ -255,4 +255,145 @@ TEST_CASE("serialized_subject handles race condition") CHECK(on_error_called); } +} + + +TEST_CASE("replay subject multicasts values and replay") +{ + SECTION("replay subject") + { + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + auto mock_3 = mock_observer_strategy{}; + + auto sub = rpp::subjects::replay_subject{}; + + SECTION("subscribe multiple observers") + { + auto dis = std::make_shared(); + sub.get_observable().subscribe(mock_1.get_observer(dis)); + sub.get_observable().subscribe(mock_2.get_observer(dis)); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observers obtain values") + { + auto validate = [](auto mock) { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + }; + validate(mock_1); + validate(mock_2); + } + + sub.get_observable().subscribe(mock_3.get_observer(dis)); + + SECTION("observer obtains replayed values") + { + CHECK(mock_3.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_3.get_total_on_next_count() == 3); + CHECK(mock_3.get_on_error_count() == 0); + CHECK(mock_3.get_on_completed_count() == 0); + } + + sub.get_observer().on_next(4); + + SECTION("observers stil obtain values") + { + auto validate = [](auto mock) { + CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4}); + CHECK(mock.get_total_on_next_count() == 4); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + }; + validate(mock_1); + validate(mock_2); + validate(mock_3); + } + } + } + + SECTION("bounded replay subject") + { + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + + size_t bound = 1; + auto sub = rpp::subjects::replay_subject{bound}; + + SECTION("subscribe multiple observers") + { + auto dis = std::make_shared(); + sub.get_observable().subscribe(mock_1.get_observer(dis)); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observer obtains values") + { + CHECK(mock_1.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_1.get_total_on_next_count() == 3); + CHECK(mock_1.get_on_error_count() == 0); + CHECK(mock_1.get_on_completed_count() == 0); + } + + sub.get_observable().subscribe(mock_2.get_observer(dis)); + + SECTION("observer obtains latest replayed values") + { + CHECK(mock_2.get_received_values() == std::vector{3}); + CHECK(mock_2.get_total_on_next_count() == 1); + CHECK(mock_2.get_on_error_count() == 0); + CHECK(mock_2.get_on_completed_count() == 0); + } + } + } + + SECTION("bounded replay subject with duration") + { + using namespace std::chrono_literals; + + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + + size_t bound = 2; + auto duration = 5ms; + auto sub = rpp::subjects::replay_subject{bound, duration}; + + SECTION("subscribe multiple observers") + { + auto dis = std::make_shared(); + sub.get_observable().subscribe(mock_1.get_observer(dis)); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observer obtains values") + { + CHECK(mock_1.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_1.get_total_on_next_count() == 3); + CHECK(mock_1.get_on_error_count() == 0); + CHECK(mock_1.get_on_completed_count() == 0); + } + + std::this_thread::sleep_for(duration); + sub.get_observer().on_next(4); + + sub.get_observable().subscribe(mock_2.get_observer(dis)); + + SECTION("subject replay only non expired values") + { + CHECK(mock_2.get_received_values() == std::vector{4}); + CHECK(mock_2.get_total_on_next_count() == 1); + CHECK(mock_2.get_on_error_count() == 0); + CHECK(mock_2.get_on_completed_count() == 0); + } + } + } } \ No newline at end of file From b914f2ce4d639b042ba49eb904e822482fa0b177 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 20 Jan 2024 16:45:04 +0100 Subject: [PATCH 2/4] Add copy test --- src/tests/rpp/test_subjects.cpp | 36 ++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index e57d60a09..8f2584608 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -17,6 +17,7 @@ #include #include +#include "copy_count_tracker.hpp" #include "mock_observer.hpp" #include @@ -270,9 +271,8 @@ TEST_CASE("replay subject multicasts values and replay") SECTION("subscribe multiple observers") { - auto dis = std::make_shared(); - sub.get_observable().subscribe(mock_1.get_observer(dis)); - sub.get_observable().subscribe(mock_2.get_observer(dis)); + sub.get_observable().subscribe(mock_1.get_observer()); + sub.get_observable().subscribe(mock_2.get_observer()); sub.get_observer().on_next(1); sub.get_observer().on_next(2); @@ -290,7 +290,7 @@ TEST_CASE("replay subject multicasts values and replay") validate(mock_2); } - sub.get_observable().subscribe(mock_3.get_observer(dis)); + sub.get_observable().subscribe(mock_3.get_observer()); SECTION("observer obtains replayed values") { @@ -327,8 +327,7 @@ TEST_CASE("replay subject multicasts values and replay") SECTION("subscribe multiple observers") { - auto dis = std::make_shared(); - sub.get_observable().subscribe(mock_1.get_observer(dis)); + sub.get_observable().subscribe(mock_1.get_observer()); sub.get_observer().on_next(1); sub.get_observer().on_next(2); @@ -342,7 +341,7 @@ TEST_CASE("replay subject multicasts values and replay") CHECK(mock_1.get_on_completed_count() == 0); } - sub.get_observable().subscribe(mock_2.get_observer(dis)); + sub.get_observable().subscribe(mock_2.get_observer()); SECTION("observer obtains latest replayed values") { @@ -367,8 +366,7 @@ TEST_CASE("replay subject multicasts values and replay") SECTION("subscribe multiple observers") { - auto dis = std::make_shared(); - sub.get_observable().subscribe(mock_1.get_observer(dis)); + sub.get_observable().subscribe(mock_1.get_observer()); sub.get_observer().on_next(1); sub.get_observer().on_next(2); @@ -385,7 +383,7 @@ TEST_CASE("replay subject multicasts values and replay") std::this_thread::sleep_for(duration); sub.get_observer().on_next(4); - sub.get_observable().subscribe(mock_2.get_observer(dis)); + sub.get_observable().subscribe(mock_2.get_observer()); SECTION("subject replay only non expired values") { @@ -396,4 +394,22 @@ TEST_CASE("replay subject multicasts values and replay") } } } +} + +TEST_CASE("replay subject doesn't introduce additional copies") +{ + auto sub = rpp::subjects::replay_subject{}; + + SECTION("subscribe") + { + sub.get_observable().subscribe([](const copy_count_tracker& tracker){ + CHECK(tracker.get_copy_count(), 1); // 1 copy to internal replay buffer + }); + + sub.get_observer().on_next(copy_count_tracker{}); + + sub.get_observable().subscribe([](const copy_count_tracker& tracker){ + CHECK(tracker.get_copy_count(), 1); // 1 copy to internal replay buffer + }); + } } \ No newline at end of file From bedd14ae4211697c0df44fb269224e4002e5249e Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 24 Jan 2024 22:32:18 +0100 Subject: [PATCH 3/4] Address comments --- src/rpp/rpp/subjects/replay_subject.hpp | 158 +++++++++++++++++------- src/tests/rpp/test_subjects.cpp | 72 ++++++----- 2 files changed, 155 insertions(+), 75 deletions(-) diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index 2a5567681..3c989071d 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -18,74 +18,125 @@ #include #include +#include namespace rpp::subjects::details { -template +template class replay_strategy { - struct replay_state + struct replay_state final : public subject_state { + replay_state(std::optional count, std::optional duration) + : count(count) + , duration(duration) + { + } + + auto collect_duration() + { + if (duration.has_value()) + { + auto now = rpp::schedulers::clock_type::now(); + while (!values.empty() && (now - values.front().second > duration.value())) + { + values.pop_front(); + } + return now; + } + return rpp::schedulers::clock_type::time_point{}; + } + + void collect_bound() + { + if (count.has_value()) + { + if (values.size() == count.value()) + { + values.pop_front(); + } + } + } + + template + void collect(T&& v) + { + std::unique_lock lock{list_mutex}; + collect_bound(); + const auto time_point = collect_duration(); + + values.emplace_back(std::forward(v), time_point); + } + std::optional count; std::optional duration; - std::list values{}; - std::list time_points{}; + std::list> values{}; - std::mutex mutex{}; - subject_state state{}; + std::mutex list_mutex{}; + std::mutex serialized_mutex{}; }; struct observer_strategy { std::shared_ptr state; - void set_upstream(const disposable_wrapper& d) const noexcept { state->state.add(d); } + template + void collect_and_on_next(T&& v) const + requires Serialized + { + state->collect(std::forward(v)); + + std::unique_lock lock{state->serialized_mutex}; + state->on_next(v); + } + + template + void collect_and_on_next(T&& v) const + { + state->collect(std::forward(v)); + state->on_next(v); + } + + void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } bool is_disposed() const noexcept { - return state->state.is_disposed(); + return state->is_disposed(); + } + + void on_next(Type&& v) const + { + collect_and_on_next(std::move(v)); } void on_next(const Type& v) const { - std::unique_lock lock{state->mutex}; - if (state->count.has_value()) - { - if (state->values.size() == state->count.value()) - { - state->values.pop_front(); - if (state->duration.has_value()) - { - state->time_points.pop_front(); - } - } - } - if (state->duration.has_value()) - { - auto now = rpp::schedulers::clock_type::now(); - while (!state->time_points.empty() && (now - state->time_points.front() > state->duration.value())) - { - state->values.pop_front(); - state->time_points.pop_front(); - } - state->time_points.push_back(now); - } + collect_and_on_next(v); + } - state->values.push_back(v); - state->state.on_next(v); + void on_error(const std::exception_ptr& err) const + requires Serialized + { + std::unique_lock lock{state->serialized_mutex}; + state->on_error(err); } void on_error(const std::exception_ptr& err) const { - std::unique_lock lock{state->mutex}; - state->state.on_error(err); + state->on_error(err); } void on_completed() const + requires Serialized { - std::unique_lock lock{state->mutex}; - state->state.on_completed(); + std::unique_lock lock{state->serialized_mutex}; + state->on_completed(); + } + + void on_completed() const + { + state->on_completed(); } }; @@ -105,30 +156,30 @@ class replay_strategy { } + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + auto get_observer() const { - return rpp::observer>{ - composite_disposable_wrapper{ - std::shared_ptr>{m_state, &m_state->state}}, - observer_strategy{m_state}}; + return rpp::observer{observer_strategy{m_state}}; } template TObs> void on_subscribe(TObs&& observer) const { { - std::unique_lock lock{m_state->mutex}; + std::unique_lock lock{m_state->list_mutex}; + m_state->collect_duration(); for (const auto& value : m_state->values) { - observer.on_next(value); + observer.on_next(value.first); } } - m_state->state.on_subscribe(std::forward(observer)); + m_state->on_subscribe(std::forward(observer)); } rpp::disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper{m_state->state}; + return rpp::disposable_wrapper{m_state}; } private: @@ -139,7 +190,7 @@ class replay_strategy namespace rpp::subjects { /** - * @brief Same as rpp::subjects::serialized_subject but send all earlier emitted values to any new observers. + * @brief Same as rpp::subjects::publish_subject but send all earlier emitted values to any new observers. * * @param count maximum element count of the replay buffer (optional) * @param duration maximum time length the replay buffer (optional) @@ -150,9 +201,22 @@ namespace rpp::subjects * @see https://reactivex.io/documentation/subject.html */ template -class replay_subject final : public details::base_subject> +class replay_subject final : public details::base_subject> +{ +public: + using details::base_subject>::base_subject; +}; + +/** + * @brief Same as rpp::subjects::replay_subject but on_next/on_error/on_completed calls are serialized via mutex. + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ +template +class serialized_replay_subject final : public details::base_subject> { public: - using details::base_subject>::base_subject; + using details::base_subject>::base_subject; }; } \ No newline at end of file diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 8f2584608..a37a1c61e 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -10,12 +10,12 @@ #include -#include #include +#include +#include +#include #include #include -#include -#include #include "copy_count_tracker.hpp" #include "mock_observer.hpp" @@ -232,34 +232,31 @@ TEST_CASE("publish subject caches error/completed") } } -TEST_CASE("serialized_subject handles race condition") +TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_subject, rpp::subjects::serialized_replay_subject) { - auto subj = rpp::subjects::serialized_subject{}; + auto subj = TestType{}; + SECTION("call on_next from 2 threads") { bool on_error_called{}; - rpp::source::create([&](auto&& obs) - { + rpp::source::create([&](auto&& obs) { subj.get_observable().subscribe(std::forward(obs)); subj.get_observer().on_next(1); }) | rpp::operators::as_blocking() - | rpp::operators::subscribe([&](int) - { + | rpp::operators::subscribe([&](int) { CHECK(!on_error_called); std::thread{[&]{ subj.get_observer().on_error({}); }}.detach(); std::this_thread::sleep_for(std::chrono::seconds{1}); - CHECK(!on_error_called); - }, - [&](const std::exception_ptr&){ on_error_called = true; }); + CHECK(!on_error_called); }, + [&](const std::exception_ptr&) { on_error_called = true; }); CHECK(on_error_called); } } - -TEST_CASE("replay subject multicasts values and replay") +TEMPLATE_TEST_CASE("replay subject multicasts values and replay", "", rpp::subjects::replay_subject, rpp::subjects::serialized_replay_subject) { SECTION("replay subject") { @@ -267,7 +264,7 @@ TEST_CASE("replay subject multicasts values and replay") auto mock_2 = mock_observer_strategy{}; auto mock_3 = mock_observer_strategy{}; - auto sub = rpp::subjects::replay_subject{}; + auto sub = TestType{}; SECTION("subscribe multiple observers") { @@ -323,7 +320,7 @@ TEST_CASE("replay subject multicasts values and replay") auto mock_2 = mock_observer_strategy{}; size_t bound = 1; - auto sub = rpp::subjects::replay_subject{bound}; + auto sub = TestType{bound}; SECTION("subscribe multiple observers") { @@ -362,7 +359,7 @@ TEST_CASE("replay subject multicasts values and replay") size_t bound = 2; auto duration = 5ms; - auto sub = rpp::subjects::replay_subject{bound, duration}; + auto sub = TestType{bound, duration}; SECTION("subscribe multiple observers") { @@ -381,14 +378,13 @@ TEST_CASE("replay subject multicasts values and replay") } std::this_thread::sleep_for(duration); - sub.get_observer().on_next(4); sub.get_observable().subscribe(mock_2.get_observer()); SECTION("subject replay only non expired values") { - CHECK(mock_2.get_received_values() == std::vector{4}); - CHECK(mock_2.get_total_on_next_count() == 1); + CHECK(mock_2.get_received_values() == std::vector{}); + CHECK(mock_2.get_total_on_next_count() == 0); CHECK(mock_2.get_on_error_count() == 0); CHECK(mock_2.get_on_completed_count() == 0); } @@ -396,20 +392,40 @@ TEST_CASE("replay subject multicasts values and replay") } } -TEST_CASE("replay subject doesn't introduce additional copies") +TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp::subjects::replay_subject, rpp::subjects::serialized_replay_subject) { - auto sub = rpp::subjects::replay_subject{}; - - SECTION("subscribe") + SECTION("on_next by rvalue") { - sub.get_observable().subscribe([](const copy_count_tracker& tracker){ - CHECK(tracker.get_copy_count(), 1); // 1 copy to internal replay buffer + auto sub = TestType{}; + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 0); + CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer }); sub.get_observer().on_next(copy_count_tracker{}); - sub.get_observable().subscribe([](const copy_count_tracker& tracker){ - CHECK(tracker.get_copy_count(), 1); // 1 copy to internal replay buffer + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 0); + CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer + }); + } + + SECTION("on_next by lvalue") + { + copy_count_tracker tracker{}; + auto sub = TestType{}; + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer + CHECK(tracker.get_move_count() == 0); + }); + + sub.get_observer().on_next(tracker); + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer + CHECK(tracker.get_move_count() == 0); }); } } \ No newline at end of file From 9c4b00958dcd5421d5feaddb95645d491a98ec97 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 25 Jan 2024 22:32:36 +0100 Subject: [PATCH 4/4] Address comments --- src/rpp/rpp/subjects/replay_subject.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index 3c989071d..b5cfe0add 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -88,14 +88,14 @@ class replay_strategy state->collect(std::forward(v)); std::unique_lock lock{state->serialized_mutex}; - state->on_next(v); + state->on_next(state->values.back().first); } template void collect_and_on_next(T&& v) const { state->collect(std::forward(v)); - state->on_next(v); + state->on_next(state->values.back().first); } void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -147,12 +147,12 @@ class replay_strategy } replay_strategy(size_t count) - : m_state{std::make_shared(count, std::nullopt)} + : m_state{std::make_shared(std::max(1, count), std::nullopt)} { } replay_strategy(size_t count, rpp::schedulers::duration duration) - : m_state{std::make_shared(count, duration)} + : m_state{std::make_shared(std::max(1, count), duration)} { }