From b10255ecadce212abca7d5332b3e176f7310ea6e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 12 Mar 2024 10:09:09 +0300 Subject: [PATCH 1/3] add test --- src/tests/rpp/test_scheduler.cpp | 45 ++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index bd0d07d37..f21a72f3e 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -895,4 +895,49 @@ TEST_CASE("different delaying strategies") CHECK(scheduler.get_schedulings() == std::vector{now, now + delay}); CHECK(scheduler.get_executions() == std::vector{now}); } +} + +TEST_CASE("current_thread inside new_thread") +{ + auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()}; + auto d = rpp::composite_disposable_wrapper::make(); + auto obs = std::optional{mock_observer_strategy{}.get_observer(d).as_dynamic()}; + auto started = std::make_shared(); + auto done = std::make_shared(); + + worker->schedule([&](const auto&) { + thread_local rpp::utils::finally_action th{[done] { + done->store(true); + }}; + return rpp::schedulers::optional_delay_from_now{}; + }, obs.value()); + + auto current_thread_invoked = std::make_shared(); + + worker->schedule([&](const auto& obs) { + worker->get_disposable().dispose(); + + rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&){ + current_thread_invoked->store(true); + return rpp::schedulers::optional_delay_from_now{}; + }, obs); + + if (current_thread_invoked->load()) + throw std::runtime_error{"current_thread was invoked"}; + + started->store(true); + + return rpp::schedulers::optional_delay_from_now{}; + }, obs.value()); + + while (!started->load()){} + + worker.reset(); + obs.reset(); + d = rpp::composite_disposable_wrapper::empty(); + + std::this_thread::sleep_for(std::chrono::seconds{1}); + + REQUIRE(done->load()); + CHECK(current_thread_invoked->load()); } \ No newline at end of file From 9fed43897cdd43875eaed28eb6965caecec6542d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 12 Mar 2024 15:49:00 +0300 Subject: [PATCH 2/3] fix new_thread vs current_thread --- src/rpp/rpp/schedulers/new_thread.hpp | 30 +++++++-------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index 4aa176dec..6ceabe5b5 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -40,25 +40,9 @@ namespace rpp::schedulers }; } - ~disposable() override - { - if (!m_thread.joinable()) - return; - - { - std::lock_guard lock{m_state->mutex}; - m_state->is_destroying.store(true, std::memory_order::relaxed); - } - m_state->cv.notify_all(); - m_thread.detach(); - } - template Fn> void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - if (is_disposed()) - return; - std::lock_guard lock{m_state->mutex}; // guarded by lock if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst)) @@ -87,7 +71,6 @@ namespace rpp::schedulers { std::atomic*> queue_ptr{}; std::atomic_bool is_disposed{}; - std::atomic_bool is_destroying{}; }; static void data_thread(std::shared_ptr state) @@ -95,16 +78,16 @@ namespace rpp::schedulers auto& queue = current_thread::s_queue; state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst); - while (!state->is_disposed.load(std::memory_order::seq_cst)) + while (true) { std::unique_lock lock{state->mutex}; - if (state->is_destroying.load(std::memory_order::seq_cst) && queue->is_empty()) + if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst)) break; - state->cv.wait(lock, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || !queue->is_empty() || state->is_destroying.load(std::memory_order::seq_cst); }); + state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); }); - if (state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst)) + if (queue->is_empty()) break; if (queue->top()->is_disposed()) @@ -117,7 +100,7 @@ namespace rpp::schedulers { if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint()) { - state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst) || worker_strategy::now() >= queue->top()->get_timepoint(); }); + state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); }); continue; } } @@ -126,7 +109,8 @@ namespace rpp::schedulers lock.unlock(); if (const auto timepoint = (*top)()) - queue->emplace(timepoint.value(), std::move(top)); + if (!top->is_disposed()) + queue->emplace(timepoint.value(), std::move(top)); } std::unique_lock lock{state->mutex}; From 2f742840a1239562b06b30ce20caa186837c13c5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Mar 2024 12:50:43 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rpp/test_scheduler.cpp | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index f21a72f3e..3e40eeec8 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -899,28 +899,30 @@ TEST_CASE("different delaying strategies") TEST_CASE("current_thread inside new_thread") { - auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()}; - auto d = rpp::composite_disposable_wrapper::make(); - auto obs = std::optional{mock_observer_strategy{}.get_observer(d).as_dynamic()}; + auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()}; + auto d = rpp::composite_disposable_wrapper::make(); + auto obs = std::optional{mock_observer_strategy{}.get_observer(d).as_dynamic()}; auto started = std::make_shared(); - auto done = std::make_shared(); + auto done = std::make_shared(); worker->schedule([&](const auto&) { thread_local rpp::utils::finally_action th{[done] { done->store(true); }}; return rpp::schedulers::optional_delay_from_now{}; - }, obs.value()); + }, + obs.value()); auto current_thread_invoked = std::make_shared(); worker->schedule([&](const auto& obs) { worker->get_disposable().dispose(); - rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&){ + rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&) { current_thread_invoked->store(true); return rpp::schedulers::optional_delay_from_now{}; - }, obs); + }, + obs); if (current_thread_invoked->load()) throw std::runtime_error{"current_thread was invoked"}; @@ -928,9 +930,12 @@ TEST_CASE("current_thread inside new_thread") started->store(true); return rpp::schedulers::optional_delay_from_now{}; - }, obs.value()); + }, + obs.value()); - while (!started->load()){} + while (!started->load()) + { + } worker.reset(); obs.reset();