From 0586dc9bc8210db50248d28c5680fc6c98f4c183 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 7 Dec 2022 23:54:21 +0300 Subject: [PATCH 1/7] make queue-based scheduling to delay --- src/rpp/rpp/operators/delay.hpp | 163 ++++++++++++++++++++++---------- 1 file changed, 114 insertions(+), 49 deletions(-) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 33ef7d16b..ef5916a48 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -16,66 +16,129 @@ #include // create_subscriber_with_state #include // own forwarding #include // constraint::subscriber_of_type +#include + +#include IMPLEMENTATION_FILE(delay_tag); namespace rpp::details { -/** - * Functor (type-erasure) of "delay" for on_next operator. - */ -struct delay_on_next +struct completion {}; + +template +class queue_based_worker final : std::enable_shared_from_this> { - schedulers::duration delay; +public: + queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber) + : m_delay{delay} + , m_worker{std::move(worker)} + , m_subscriber{subscriber} {} - void operator()(auto&& value, const auto& subscriber, const auto& worker) const + queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber) + : m_delay{delay} + , m_worker{std::move(worker)} + , m_subscriber{std::move(subscriber)} {} + + struct on_next { - worker.schedule(delay, - [value = std::forward(value), subscriber]() - { - subscriber.on_next(std::move(value)); - return schedulers::optional_duration{}; - }); - } -}; + void operator()(auto&& value, const std::shared_ptr>& state) const + { + state->emplace(std::forward(value)); + } + }; -/** - * Functor (type-erasure) of "delay" for on_error operator. - */ -struct delay_on_error -{ - void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const + struct on_error { - // on-error must be delivered as soon as possible - worker.schedule([err, subscriber]() - { - subscriber.on_error(err); - return schedulers::optional_duration{}; - }); + void operator()(const std::exception_ptr& err, const std::shared_ptr>& state) const + { + state->emplace(err); + } + }; + + struct on_completed + { + void operator()(const std::shared_ptr>& state) const + { + state->emplace(completion{}); + } + }; + +private: + void emplace(std::variant&& item) + { + if (const auto timepoint = emplace_safe(std::move(item))) + { + m_worker.schedule(timepoint.value(), + [state = this->shared_from_this()]()-> schedulers::optional_duration + { + return state->drain_queue(); + }); + } } -}; -/** - * Functor (type-erasure) of "delay" for on_completed operator. - */ -struct delay_on_completed -{ - schedulers::duration delay; + std::optional emplace_safe(std::variant&& item) + { + std::lock_guard lock{m_mutex}; + m_queue.emplace(++m_current_id, m_worker.now()+m_delay, std::move(item)); + if (!m_active && m_queue.size() == 1) + { + m_active = true; + return m_queue.top().time; + } + return {}; + } - void operator()(const auto& subscriber, const auto& worker) const + schedulers::optional_duration drain_queue() { - worker.schedule(delay, - [subscriber]() - { - subscriber.on_completed(); - return schedulers::optional_duration{}; - }); + const auto now = m_worker.now(); + while (true) + { + std::unique_lock lock{m_mutex}; + if (m_queue.empty()) + { + m_active = false; + return {}; + } + + auto& top = m_queue.top(); + if (top.time > now) + return top.time - now; + + auto item = std::move(top.item); + m_queue.pop(); + lock.unlock(); + + std::visit(utils::overloaded + { + [&](T&& v) { m_subscriber.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { m_subscriber.on_error(err); }, + [&](completion) { m_subscriber.on_completed(); } + }, + std::move(item)); + } } + +private: + struct emission + { + size_t id{}; + schedulers::time_point time{}; + std::variant item{}; + + bool operator<(const emission& other) const { return std::tie(time, id) >= std::tie(other.time, other.id); } + }; + + schedulers::duration m_delay; + Worker m_worker; + Subscriber m_subscriber; + std::mutex m_mutex{}; + size_t m_current_id{}; + std::priority_queue m_queue{}; + bool m_active{}; }; -/** - * \brief Functor of OperatorFn for "combine_latest" operator (used by "lift"). - */ + template struct delay_impl { @@ -86,14 +149,16 @@ struct delay_impl auto operator()(TSub&& subscriber) const { auto worker = scheduler.create_worker(subscriber.get_subscription()); - auto subscription = subscriber.get_subscription().make_child(); + + using state_t = queue_based_worker, std::decay_t>; + auto state = std::make_shared(delay, std::move(worker), std::forward(subscriber)); + return create_subscriber_with_state(std::move(subscription), - delay_on_next{delay}, - delay_on_error{}, - delay_on_completed{delay}, - std::forward(subscriber), - std::move(worker)); + typename state_t::on_next{}, + typename state_t::on_error{}, + typename state_t::on_completed{}, + std::move(state)); } }; } // namespace rpp::details From 78f2221ca3c8fa5cafba8db3b2a18f616faf912a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 00:08:37 +0300 Subject: [PATCH 2/7] fix tests --- src/rpp/rpp/operators/delay.hpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index ef5916a48..b6b45b4f7 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -27,7 +27,7 @@ namespace rpp::details struct completion {}; template -class queue_based_worker final : std::enable_shared_from_this> +class queue_based_worker final : public std::enable_shared_from_this> { public: queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber) @@ -79,8 +79,9 @@ class queue_based_worker final : std::enable_shared_from_this emplace_safe(std::variant&& item) { - std::lock_guard lock{m_mutex}; - m_queue.emplace(++m_current_id, m_worker.now()+m_delay, std::move(item)); + std::lock_guard lock{m_mutex}; + const auto delay = std::holds_alternative(item) ? schedulers::duration{0} : m_delay ; + m_queue.emplace(++m_current_id, m_worker.now()+delay, std::move(item)); if (!m_active && m_queue.size() == 1) { m_active = true; @@ -91,7 +92,6 @@ class queue_based_worker final : std::enable_shared_from_this now) return top.time - now; From e185974017ab10b48100341a6602e308e2548d30 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 00:13:25 +0300 Subject: [PATCH 3/7] compile fix --- src/rpp/rpp/operators/delay.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index b6b45b4f7..979e972da 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -123,6 +123,11 @@ class queue_based_worker final : public std::enable_shared_from_this&& item) + : id{id} + , time{std::move(time)} + , item{std::move(item)} {} + size_t id{}; schedulers::time_point time{}; std::variant item{}; From 0b2ba04a92d67d66df68af039551eb37c31e656c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 12:52:51 +0300 Subject: [PATCH 4/7] Cover with tests --- src/rpp/rpp/operators/delay.hpp | 17 +++--- src/tests/rpp/test_delay.cpp | 44 ++++++++++++++-- src/tests/rpp/test_observe_on.cpp | 86 ++++++++++++++++++++++++++++--- 3 files changed, 129 insertions(+), 18 deletions(-) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 979e972da..b50e1f850 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -65,9 +65,10 @@ class queue_based_worker final : public std::enable_shared_from_this&& item) + template + void emplace(TT&& item) { - if (const auto timepoint = emplace_safe(std::move(item))) + if (const auto timepoint = emplace_safe(std::forward(item))) { m_worker.schedule(timepoint.value(), [state = this->shared_from_this()]()-> schedulers::optional_duration @@ -77,11 +78,12 @@ class queue_based_worker final : public std::enable_shared_from_this emplace_safe(std::variant&& item) + template + std::optional emplace_safe(TT&& item) { std::lock_guard lock{m_mutex}; - const auto delay = std::holds_alternative(item) ? schedulers::duration{0} : m_delay ; - m_queue.emplace(++m_current_id, m_worker.now()+delay, std::move(item)); + const auto delay = std::is_same_v> ? schedulers::duration{0} : m_delay; + m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward(item)); if (!m_active && m_queue.size() == 1) { m_active = true; @@ -123,10 +125,11 @@ class queue_based_worker final : public std::enable_shared_from_this&& item) + template + emission(size_t id, schedulers::time_point time, TT&& item) : id{id} , time{std::move(time)} - , item{std::move(item)} {} + , item{std::forward(item)} {} size_t id{}; schedulers::time_point time{}; diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index dfafa9ae7..bdd78ea7c 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -13,17 +13,18 @@ #include #include #include +#include #include #include #include SCENARIO("delay mirrors both source observable and trigger observable", "[delay]") { + auto mock = mock_observer{}; std::chrono::milliseconds delay_duration{300}; GIVEN("observable of -1-|") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::just(1) @@ -60,7 +61,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -x") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) @@ -87,7 +87,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -|") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::empty() @@ -111,4 +110,43 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] CHECK(mock.get_on_error_count() == 0); } } + + GIVEN("subject with items") + { + auto subj = rpp::subjects::publish_subject{}; + + WHEN("subscribe on subject via delay and doing recursive submit from another thread") + { + THEN("all values obtained in the same thread") + { + auto current_thread = std::this_thread::get_id(); + + auto sub = subj.get_observable() + .delay(delay_duration, rpp::schedulers::trampoline{}) + .subscribe([&](int v) + { + CHECK(std::this_thread::get_id() == current_thread); + + mock.on_next(v); + + if (v == 1) + { + std::thread{[&]{subj.get_subscriber().on_next(2);}}.join(); + + THEN("no recursive on_next calls") + { + CHECK(mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + AND_THEN("all values obtained") + { + CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + } + } + } + } } diff --git a/src/tests/rpp/test_observe_on.cpp b/src/tests/rpp/test_observe_on.cpp index 2ce5d90df..4af1d70cf 100644 --- a/src/tests/rpp/test_observe_on.cpp +++ b/src/tests/rpp/test_observe_on.cpp @@ -1,10 +1,10 @@ // ReactivePlusPlus library -// +// // Copyright Aleksey Loginov 2022 - present. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // https://www.boost.org/LICENSE_1_0.txt) -// +// // Project home: https://github.com/victimsnino/ReactivePlusPlus #include "copy_count_tracker.hpp" @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -39,8 +40,8 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" CHECK(mock.get_received_values() == vals); CHECK(mock.get_on_completed_count() == 1); - CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed - CHECK(scheduler.get_executions() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed + CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed + CHECK(scheduler.get_executions() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed } } } @@ -85,6 +86,75 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" } } } + + GIVEN("subject with items") + { + auto mock = mock_observer{}; + auto subj = rpp::subjects::publish_subject{}; + WHEN("subscribe on subject via observe_on and doing recursive submit") + { + auto sub = subj.get_observable() + .observe_on(scheduler) + .subscribe([&](int v) + { + mock.on_next(v); + + if (v == 1) + { + subj.get_subscriber().on_next(2); + THEN("no direct schedule to scheduler after recursive on_next") + { + CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); + CHECK(scheduler.get_executions() == std::vector{ initial_time }); + CHECK(mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + THEN("second job executed without extra schedule") + { + CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); + CHECK(scheduler.get_executions() == std::vector{ initial_time }); + CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + } + } + + WHEN("subscribe on subject via observe_on trampoline and doing recursive submit from another thread") + { + THEN("all values obtained in the same thread") + { + auto current_thread = std::this_thread::get_id(); + + auto sub = subj.get_observable() + .observe_on(rpp::schedulers::trampoline{}) + .subscribe([&](int v) + { + CHECK(std::this_thread::get_id() == current_thread); + + mock.on_next(v); + + if (v == 1) + { + std::thread{[&]{subj.get_subscriber().on_next(2);}}.join(); + + THEN("no recursive on_next calls") + { + CHECK(mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + AND_THEN("all values obtained") + { + CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + } + } + } + } } SCENARIO("observe_on with immediate doesn't produce a lot of copies", "[operators][observe_on][track_copy]") @@ -95,9 +165,9 @@ SCENARIO("observe_on with immediate doesn't produce a lot of copies", "[operator WHEN("subscribe on it via scheduler") { tracker.get_observable().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 1 extra copy") + THEN("only 2 extra copies") { - CHECK(tracker.get_copy_count() == 1); + CHECK(tracker.get_copy_count() == 2); CHECK(tracker.get_move_count() == 0); } } @@ -108,9 +178,9 @@ SCENARIO("observe_on with immediate doesn't produce a lot of copies", "[operator WHEN("subscribe on it via scheduler") { tracker.get_observable_for_move().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 1 extra move") + THEN("only 1 extra copy and 1 extra move") { - CHECK(tracker.get_copy_count() == 0); + CHECK(tracker.get_copy_count() == 1); CHECK(tracker.get_move_count() == 1); } } From b5aa4e2f33f35109441fadaddf33eb6717cb62c5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 18:18:16 +0300 Subject: [PATCH 5/7] compile fix --- src/tests/rpp/test_observe_on.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tests/rpp/test_observe_on.cpp b/src/tests/rpp/test_observe_on.cpp index 4af1d70cf..49f246ae9 100644 --- a/src/tests/rpp/test_observe_on.cpp +++ b/src/tests/rpp/test_observe_on.cpp @@ -89,7 +89,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" GIVEN("subject with items") { - auto mock = mock_observer{}; + auto int_mock = mock_observer{}; auto subj = rpp::subjects::publish_subject{}; WHEN("subscribe on subject via observe_on and doing recursive submit") { @@ -97,7 +97,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" .observe_on(scheduler) .subscribe([&](int v) { - mock.on_next(v); + int_mock.on_next(v); if (v == 1) { @@ -106,7 +106,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" { CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); CHECK(scheduler.get_executions() == std::vector{ initial_time }); - CHECK(mock.get_received_values() == std::vector{1}); + CHECK(int_mock.get_received_values() == std::vector{1}); } } }); @@ -117,7 +117,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" { CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); CHECK(scheduler.get_executions() == std::vector{ initial_time }); - CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + CHECK(int_mock.get_received_values() == std::vector{ 1, 2 }); } } @@ -133,7 +133,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" { CHECK(std::this_thread::get_id() == current_thread); - mock.on_next(v); + int_mock.on_next(v); if (v == 1) { @@ -141,7 +141,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" THEN("no recursive on_next calls") { - CHECK(mock.get_received_values() == std::vector{1}); + CHECK(int_mock.get_received_values() == std::vector{1}); } } }); @@ -150,7 +150,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" AND_THEN("all values obtained") { - CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + CHECK(int_mock.get_received_values() == std::vector{ 1, 2 }); } } } From 5af4225c40247b8e210b0aa11922b09c1f98a53b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 22:55:18 +0300 Subject: [PATCH 6/7] remove test --- src/tests/rpp/test_observe_on.cpp | 32 +------------------------------ 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/src/tests/rpp/test_observe_on.cpp b/src/tests/rpp/test_observe_on.cpp index 49f246ae9..8b8480b1a 100644 --- a/src/tests/rpp/test_observe_on.cpp +++ b/src/tests/rpp/test_observe_on.cpp @@ -155,34 +155,4 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" } } } -} - -SCENARIO("observe_on with immediate doesn't produce a lot of copies", "[operators][observe_on][track_copy]") -{ - GIVEN("observable with value by copy") - { - auto tracker = copy_count_tracker{}; - WHEN("subscribe on it via scheduler") - { - tracker.get_observable().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 2 extra copies") - { - CHECK(tracker.get_copy_count() == 2); - CHECK(tracker.get_move_count() == 0); - } - } - } - GIVEN("observable with value by move") - { - auto tracker = copy_count_tracker{}; - WHEN("subscribe on it via scheduler") - { - tracker.get_observable_for_move().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 1 extra copy and 1 extra move") - { - CHECK(tracker.get_copy_count() == 1); - CHECK(tracker.get_move_count() == 1); - } - } - } -} +} \ No newline at end of file From 434e148eceeb069029ed25ffd09c875de5a29e60 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Dec 2022 23:12:27 +0300 Subject: [PATCH 7/7] compile fix --- src/tests/rppqt/test_main_thread_scheduler.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tests/rppqt/test_main_thread_scheduler.cpp b/src/tests/rppqt/test_main_thread_scheduler.cpp index ff18d3987..2210202de 100644 --- a/src/tests/rppqt/test_main_thread_scheduler.cpp +++ b/src/tests/rppqt/test_main_thread_scheduler.cpp @@ -9,6 +9,7 @@ #include #include +#include #include