diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp new file mode 100644 index 000000000..b5cfe0add --- /dev/null +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -0,0 +1,222 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace rpp::subjects::details +{ +template +class replay_strategy +{ + struct replay_state final : public subject_state + { + replay_state(std::optional count, std::optional duration) + : count(count) + , duration(duration) + { + } + + auto collect_duration() + { + if (duration.has_value()) + { + auto now = rpp::schedulers::clock_type::now(); + while (!values.empty() && (now - values.front().second > duration.value())) + { + values.pop_front(); + } + return now; + } + return rpp::schedulers::clock_type::time_point{}; + } + + void collect_bound() + { + if (count.has_value()) + { + if (values.size() == count.value()) + { + values.pop_front(); + } + } + } + + template + void collect(T&& v) + { + std::unique_lock lock{list_mutex}; + collect_bound(); + const auto time_point = collect_duration(); + + values.emplace_back(std::forward(v), time_point); + } + + std::optional count; + std::optional duration; + + std::list> values{}; + + std::mutex list_mutex{}; + std::mutex serialized_mutex{}; + }; + + struct observer_strategy + { + std::shared_ptr state; + + template + void collect_and_on_next(T&& v) const + requires Serialized + { + state->collect(std::forward(v)); + + std::unique_lock lock{state->serialized_mutex}; + state->on_next(state->values.back().first); + } + + template + void collect_and_on_next(T&& v) const + { + state->collect(std::forward(v)); + state->on_next(state->values.back().first); + } + + void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } + + bool is_disposed() const noexcept + { + return state->is_disposed(); + } + + void on_next(Type&& v) const + { + collect_and_on_next(std::move(v)); + } + + void on_next(const Type& v) const + { + collect_and_on_next(v); + } + + void on_error(const std::exception_ptr& err) const + requires Serialized + { + std::unique_lock lock{state->serialized_mutex}; + state->on_error(err); + } + + void on_error(const std::exception_ptr& err) const + { + state->on_error(err); + } + + void on_completed() const + requires Serialized + { + std::unique_lock lock{state->serialized_mutex}; + state->on_completed(); + } + + void on_completed() const + { + state->on_completed(); + } + }; + +public: + replay_strategy() + : m_state(std::make_shared(std::nullopt, std::nullopt)) + { + } + + replay_strategy(size_t count) + : m_state{std::make_shared(std::max(1, count), std::nullopt)} + { + } + + replay_strategy(size_t count, rpp::schedulers::duration duration) + : m_state{std::make_shared(std::max(1, count), duration)} + { + } + + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + auto get_observer() const + { + return rpp::observer{observer_strategy{m_state}}; + } + + template TObs> + void on_subscribe(TObs&& observer) const + { + { + std::unique_lock lock{m_state->list_mutex}; + m_state->collect_duration(); + for (const auto& value : m_state->values) + { + observer.on_next(value.first); + } + } + m_state->on_subscribe(std::forward(observer)); + } + + rpp::disposable_wrapper get_disposable() const + { + return rpp::disposable_wrapper{m_state}; + } + +private: + std::shared_ptr m_state = std::make_shared(); +}; +} + +namespace rpp::subjects +{ +/** + * @brief Same as rpp::subjects::publish_subject but send all earlier emitted values to any new observers. + * + * @param count maximum element count of the replay buffer (optional) + * @param duration maximum time length the replay buffer (optional) + * + * @tparam Type value provided by this subject + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ +template +class replay_subject final : public details::base_subject> +{ +public: + using details::base_subject>::base_subject; +}; + +/** + * @brief Same as rpp::subjects::replay_subject but on_next/on_error/on_completed calls are serialized via mutex. + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ +template +class serialized_replay_subject final : public details::base_subject> +{ +public: + using details::base_subject>::base_subject; +}; +} \ No newline at end of file diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index efd00c749..a37a1c61e 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -10,13 +10,14 @@ #include -#include #include -#include -#include +#include #include +#include +#include +#include - +#include "copy_count_tracker.hpp" #include "mock_observer.hpp" #include @@ -231,28 +232,200 @@ TEST_CASE("publish subject caches error/completed") } } -TEST_CASE("serialized_subject handles race condition") +TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_subject, rpp::subjects::serialized_replay_subject) { - auto subj = rpp::subjects::serialized_subject{}; + auto subj = TestType{}; + SECTION("call on_next from 2 threads") { bool on_error_called{}; - rpp::source::create([&](auto&& obs) - { + rpp::source::create([&](auto&& obs) { subj.get_observable().subscribe(std::forward(obs)); subj.get_observer().on_next(1); }) | rpp::operators::as_blocking() - | rpp::operators::subscribe([&](int) - { + | rpp::operators::subscribe([&](int) { CHECK(!on_error_called); std::thread{[&]{ subj.get_observer().on_error({}); }}.detach(); std::this_thread::sleep_for(std::chrono::seconds{1}); - CHECK(!on_error_called); - }, - [&](const std::exception_ptr&){ on_error_called = true; }); + CHECK(!on_error_called); }, + [&](const std::exception_ptr&) { on_error_called = true; }); CHECK(on_error_called); } +} + +TEMPLATE_TEST_CASE("replay subject multicasts values and replay", "", rpp::subjects::replay_subject, rpp::subjects::serialized_replay_subject) +{ + SECTION("replay subject") + { + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + auto mock_3 = mock_observer_strategy{}; + + auto sub = TestType{}; + + SECTION("subscribe multiple observers") + { + sub.get_observable().subscribe(mock_1.get_observer()); + sub.get_observable().subscribe(mock_2.get_observer()); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observers obtain values") + { + auto validate = [](auto mock) { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + }; + validate(mock_1); + validate(mock_2); + } + + sub.get_observable().subscribe(mock_3.get_observer()); + + SECTION("observer obtains replayed values") + { + CHECK(mock_3.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_3.get_total_on_next_count() == 3); + CHECK(mock_3.get_on_error_count() == 0); + CHECK(mock_3.get_on_completed_count() == 0); + } + + sub.get_observer().on_next(4); + + SECTION("observers stil obtain values") + { + auto validate = [](auto mock) { + CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4}); + CHECK(mock.get_total_on_next_count() == 4); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + }; + validate(mock_1); + validate(mock_2); + validate(mock_3); + } + } + } + + SECTION("bounded replay subject") + { + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + + size_t bound = 1; + auto sub = TestType{bound}; + + SECTION("subscribe multiple observers") + { + sub.get_observable().subscribe(mock_1.get_observer()); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observer obtains values") + { + CHECK(mock_1.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_1.get_total_on_next_count() == 3); + CHECK(mock_1.get_on_error_count() == 0); + CHECK(mock_1.get_on_completed_count() == 0); + } + + sub.get_observable().subscribe(mock_2.get_observer()); + + SECTION("observer obtains latest replayed values") + { + CHECK(mock_2.get_received_values() == std::vector{3}); + CHECK(mock_2.get_total_on_next_count() == 1); + CHECK(mock_2.get_on_error_count() == 0); + CHECK(mock_2.get_on_completed_count() == 0); + } + } + } + + SECTION("bounded replay subject with duration") + { + using namespace std::chrono_literals; + + auto mock_1 = mock_observer_strategy{}; + auto mock_2 = mock_observer_strategy{}; + + size_t bound = 2; + auto duration = 5ms; + auto sub = TestType{bound, duration}; + + SECTION("subscribe multiple observers") + { + sub.get_observable().subscribe(mock_1.get_observer()); + + sub.get_observer().on_next(1); + sub.get_observer().on_next(2); + sub.get_observer().on_next(3); + + SECTION("observer obtains values") + { + CHECK(mock_1.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock_1.get_total_on_next_count() == 3); + CHECK(mock_1.get_on_error_count() == 0); + CHECK(mock_1.get_on_completed_count() == 0); + } + + std::this_thread::sleep_for(duration); + + sub.get_observable().subscribe(mock_2.get_observer()); + + SECTION("subject replay only non expired values") + { + CHECK(mock_2.get_received_values() == std::vector{}); + CHECK(mock_2.get_total_on_next_count() == 0); + CHECK(mock_2.get_on_error_count() == 0); + CHECK(mock_2.get_on_completed_count() == 0); + } + } + } +} + +TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp::subjects::replay_subject, rpp::subjects::serialized_replay_subject) +{ + SECTION("on_next by rvalue") + { + auto sub = TestType{}; + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 0); + CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer + }); + + sub.get_observer().on_next(copy_count_tracker{}); + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 0); + CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer + }); + } + + SECTION("on_next by lvalue") + { + copy_count_tracker tracker{}; + auto sub = TestType{}; + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer + CHECK(tracker.get_move_count() == 0); + }); + + sub.get_observer().on_next(tracker); + + sub.get_observable().subscribe([](const copy_count_tracker& tracker) { + CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer + CHECK(tracker.get_move_count() == 0); + }); + } } \ No newline at end of file