From d77a6285986bfa9d0aa24fdc7b5dd777e9ba12bc Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 27 Oct 2022 23:43:03 +0300 Subject: [PATCH] Simplify templates for current_thread a bit --- .../rpp/observables/specific_observable.hpp | 31 +++++-------------- .../rpp/schedulers/trampoline_scheduler.hpp | 13 +++++++- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/src/rpp/rpp/observables/specific_observable.hpp b/src/rpp/rpp/observables/specific_observable.hpp index 28e8533a5..9803a185f 100644 --- a/src/rpp/rpp/observables/specific_observable.hpp +++ b/src/rpp/rpp/observables/specific_observable.hpp @@ -73,38 +73,23 @@ class specific_observable : public interface_observable TSub> void actual_subscribe(const TSub& subscriber) const { - // will be scheduled immediately -> reference can be passed - const auto safe_subscribe = [&] - { - try - { - m_on_subscribe(subscriber); - } - catch (...) - { - if (subscriber.is_subscribed()) - subscriber.on_error(std::current_exception()); - else - throw; - } - return schedulers::optional_duration{}; - }; - // take ownership over current thread as early as possible to delay all next "current_thread" schedulings. For example, scheduling of emissions from "just" to delay it till whole chain is subscribed and ready to listened emissions // For example, if we have // rpp::source::just(rpp::schedulers::current_thread{}, 1,2).combine_latest(rpp::source::just(rpp::schedulers::current_thread{}, 1,2)) // // then we expect to see emissions like (1,1) (2,1) (2,2) instead of (2,1) (2,2). TO do it we need to "take ownership" over queue to prevent ANY immediate schedulings from ANY next subscriptions - if (schedulers::current_thread::is_queue_owned()) + const auto drain_on_exit_if_needed = schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + try { - safe_subscribe(); + m_on_subscribe(subscriber); } - else + catch (...) { - // we need to submit work into queue to take ownership over it. We can submit work with time_point "zero" due to anyway queue is empty and it doesn't make sense, but we can take performance boost due to avoiding extra calls to "now" - schedulers::current_thread::create_worker(subscriber.get_subscription()).schedule(schedulers::time_point{}, safe_subscribe); + if (subscriber.is_subscribed()) + subscriber.on_error(std::current_exception()); + else + throw; } - } private: diff --git a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp index 5da72d0ac..11c13df1a 100644 --- a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp +++ b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp @@ -87,6 +87,9 @@ class trampoline final : public details::scheduler_tag static void drain_queue() { + if (!s_queue.has_value()) + return; + auto reset_at_final = utils::finally_action{ [] { s_queue.reset(); } }; std::optional function{}; @@ -152,7 +155,15 @@ class trampoline final : public details::scheduler_tag inline static thread_local std::optional> s_queue{}; public: - static bool is_queue_owned() { return s_queue.has_value(); } + static utils::finally_action own_queue_and_drain_finally_if_not_owned() + { + const bool someone_owns_queue = s_queue.has_value(); + + if (!someone_owns_queue) + s_queue = std::priority_queue{}; + + return {!someone_owns_queue ? &drain_queue : +[] {}}; + } static auto create_worker(const rpp::composite_subscription& sub = composite_subscription{}) {