From e626509be690f283a620129591dbeec08c4595a4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 20 Nov 2022 22:59:44 +0300 Subject: [PATCH] Fix issue with execution after unsubscribe --- src/rpp/rpp/schedulers/details/worker.hpp | 5 ++++- src/rpp/rpp/schedulers/new_thread_scheduler.hpp | 9 +++++++++ src/rpp/rpp/schedulers/run_loop_scheduler.hpp | 5 +++++ src/rpp/rpp/schedulers/trampoline_scheduler.hpp | 5 +++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 3cb76443e..b30cf9f73 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -38,7 +38,10 @@ class schedulable_wrapper void operator()() { - if (auto duration = m_fn()) + if (!m_strategy.is_subscribed()) + return; + + if (const auto duration = m_fn()) { m_time_point = std::max(m_strategy.now(), m_time_point + duration.value()); diff --git a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp index a36e2a895..edb5ae12a 100644 --- a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp +++ b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp @@ -44,6 +44,13 @@ class new_thread final : public details::scheduler_tag m_state = shared; } + bool is_subscribed() const + { + if (const auto locked = m_state.lock()) + return locked->is_subscribed(); + return false; + } + void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const { defer_at(time_point, new_thread_schedulable{*this, time_point, std::forward(fn)}); @@ -65,6 +72,8 @@ class new_thread final : public details::scheduler_tag state(const state&) = delete; state(state&&) noexcept = delete; + bool is_subscribed() const { return m_sub->is_subscribed(); } + void defer_at(time_point time_point, new_thread_schedulable&& fn) { if (m_sub->is_subscribed()) diff --git a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp index d43603cac..71bceba9c 100644 --- a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp +++ b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp @@ -41,6 +41,11 @@ class run_loop final : public details::scheduler_tag , m_sub{sub} {} + bool is_subscribed() const + { + return m_sub.is_subscribed(); + } + void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const { defer_at(time_point, run_loop_schedulable{*this, time_point, std::forward(fn)}); diff --git a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp index 11c13df1a..fefbe288c 100644 --- a/src/rpp/rpp/schedulers/trampoline_scheduler.hpp +++ b/src/rpp/rpp/schedulers/trampoline_scheduler.hpp @@ -48,6 +48,11 @@ class trampoline final : public details::scheduler_tag explicit worker_strategy(const rpp::composite_subscription& subscription) : m_sub{ subscription } {} + bool is_subscribed() const + { + return m_sub.is_subscribed(); + } + void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const { if (!m_sub.is_subscribed())