Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,9 @@ namespace rpp::schedulers
};
}

~disposable() override
{
if (!m_thread.joinable())
return;

{
std::lock_guard lock{m_state->mutex};
m_state->is_destroying.store(true, std::memory_order::relaxed);
}
m_state->cv.notify_all();
m_thread.detach();
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
if (is_disposed())
return;

std::lock_guard lock{m_state->mutex};
// guarded by lock
if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst))
Expand Down Expand Up @@ -87,24 +71,23 @@ namespace rpp::schedulers
{
std::atomic<details::schedulables_queue<current_thread::worker_strategy>*> queue_ptr{};
std::atomic_bool is_disposed{};
std::atomic_bool is_destroying{};
};

static void data_thread(std::shared_ptr<state_t> state)
{
auto& queue = current_thread::s_queue;
state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst);

while (!state->is_disposed.load(std::memory_order::seq_cst))
while (true)
{
std::unique_lock lock{state->mutex};

if (state->is_destroying.load(std::memory_order::seq_cst) && queue->is_empty())
if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst))
break;

state->cv.wait(lock, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || !queue->is_empty() || state->is_destroying.load(std::memory_order::seq_cst); });
state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); });

if (state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst))
if (queue->is_empty())
break;

if (queue->top()->is_disposed())
Expand All @@ -117,7 +100,7 @@ namespace rpp::schedulers
{
if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint())
{
state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst) || worker_strategy::now() >= queue->top()->get_timepoint(); });
state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); });
continue;
}
}
Expand All @@ -126,7 +109,8 @@ namespace rpp::schedulers
lock.unlock();

if (const auto timepoint = (*top)())
queue->emplace(timepoint.value(), std::move(top));
if (!top->is_disposed())
queue->emplace(timepoint.value(), std::move(top));
}

std::unique_lock lock{state->mutex};
Expand Down
50 changes: 50 additions & 0 deletions src/tests/rpp/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,4 +895,54 @@ TEST_CASE("different delaying strategies")
CHECK(scheduler.get_schedulings() == std::vector{now, now + delay});
CHECK(scheduler.get_executions() == std::vector{now});
}
}

TEST_CASE("current_thread inside new_thread")
{
auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()};
auto d = rpp::composite_disposable_wrapper::make();
auto obs = std::optional{mock_observer_strategy<int>{}.get_observer(d).as_dynamic()};
auto started = std::make_shared<std::atomic_bool>();
auto done = std::make_shared<std::atomic_bool>();

worker->schedule([&](const auto&) {
thread_local rpp::utils::finally_action th{[done] {
done->store(true);
}};
return rpp::schedulers::optional_delay_from_now{};
},
obs.value());

auto current_thread_invoked = std::make_shared<std::atomic_bool>();

worker->schedule([&](const auto& obs) {
worker->get_disposable().dispose();

rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&) {
current_thread_invoked->store(true);
return rpp::schedulers::optional_delay_from_now{};
},
obs);

if (current_thread_invoked->load())
throw std::runtime_error{"current_thread was invoked"};

started->store(true);

return rpp::schedulers::optional_delay_from_now{};
},
obs.value());

while (!started->load())
{
}

worker.reset();
obs.reset();
d = rpp::composite_disposable_wrapper::empty();

std::this_thread::sleep_for(std::chrono::seconds{1});

REQUIRE(done->load());
CHECK(current_thread_invoked->load());
}