Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/rpp/rpp/schedulers/details/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ class schedulable_wrapper

void operator()()
{
if (auto duration = m_fn())
if (!m_strategy.is_subscribed())
return;

if (const auto duration = m_fn())
{
m_time_point = std::max(m_strategy.now(), m_time_point + duration.value());

Expand Down
9 changes: 9 additions & 0 deletions src/rpp/rpp/schedulers/new_thread_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class new_thread final : public details::scheduler_tag
m_state = shared;
}

bool is_subscribed() const
{
if (const auto locked = m_state.lock())
return locked->is_subscribed();
return false;
}

void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
{
defer_at(time_point, new_thread_schedulable{*this, time_point, std::forward<decltype(fn)>(fn)});
Expand All @@ -65,6 +72,8 @@ class new_thread final : public details::scheduler_tag
state(const state&) = delete;
state(state&&) noexcept = delete;

bool is_subscribed() const { return m_sub->is_subscribed(); }

void defer_at(time_point time_point, new_thread_schedulable&& fn)
{
if (m_sub->is_subscribed())
Expand Down
5 changes: 5 additions & 0 deletions src/rpp/rpp/schedulers/run_loop_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class run_loop final : public details::scheduler_tag
, m_sub{sub}
{}

bool is_subscribed() const
{
return m_sub.is_subscribed();
}

void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
{
defer_at(time_point, run_loop_schedulable{*this, time_point, std::forward<decltype(fn)>(fn)});
Expand Down
5 changes: 5 additions & 0 deletions src/rpp/rpp/schedulers/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class trampoline final : public details::scheduler_tag
explicit worker_strategy(const rpp::composite_subscription& subscription)
: m_sub{ subscription } {}

bool is_subscribed() const
{
return m_sub.is_subscribed();
}

void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
{
if (!m_sub.is_subscribed())
Expand Down