diff --git a/CMakePresets.json b/CMakePresets.json index 08335a799..c392d9ed0 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -47,7 +47,7 @@ "inherits" : ["ci-flags"], "hidden": true, "cacheVariables": { - "CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew" + "CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew /bigobj" } }, { diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 55b0f4f62..23024edab 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -146,6 +146,31 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) }); } + + SECTION("from array of 1 - create + as_blocking + subscribe + new_thread") + { + std::array vals{123}; + TEST_RPP([&]() { + (rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + (rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + + SECTION("from array of 1000 - create + as_blocking + subscribe + new_thread") + { + std::array vals{}; + TEST_RPP([&]() { + (rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + (rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + SECTION("concat_as_source of just(1 immediate) create + subscribe") { TEST_RPP([&]() { diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 74aad619b..6ac994884 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -87,7 +87,7 @@ namespace rpp::schedulers friend class new_thread; class worker_strategy; - inline static thread_local std::optional> s_queue{}; + inline static thread_local details::schedulables_queue* s_queue{}; struct is_queue_is_empty { @@ -97,38 +97,43 @@ namespace rpp::schedulers }; - static void drain_current_queue() + static void drain_queue() noexcept { - drain_queue(s_queue); - } - - static void drain_queue(std::optional>& queue) - { - while (!queue->is_empty()) + while (s_queue && !s_queue->is_empty()) { - auto top = queue->pop(); + auto top = s_queue->pop(); if (top->is_disposed()) continue; - std::optional timepoint{top->get_timepoint()}; - // immediate like scheduling - do - { - if (timepoint && !top->is_disposed()) - details::sleep_until(top->get_timepoint()); - - if (top->is_disposed()) - timepoint.reset(); - else - timepoint = (*top)(); - - } while (queue->is_empty() && timepoint.has_value()); + details::sleep_until(top->get_timepoint()); - if (timepoint.has_value()) - queue->emplace(timepoint.value(), std::move(top)); + while (true) + { + if (const auto res = top->make_advanced_call()) + { + if (!top->is_disposed()) + { + if (s_queue->is_empty()) + { + if (const auto d = std::get_if(&res->get())) + { + std::this_thread::sleep_for(d->value); + } + else + { + details::sleep_until(top->handle_advanced_call(res.value())); + } + continue; + } + + s_queue->emplace(top->handle_advanced_call(res.value()), std::move(top)); + } + } + break; + } } - queue.reset(); + s_queue = nullptr; } class worker_strategy @@ -140,26 +145,20 @@ namespace rpp::schedulers if (handler.is_disposed()) return; - auto& queue = s_queue; - const bool someone_owns_queue = queue.has_value(); - std::optional timepoint{}; - if (!someone_owns_queue) + if (!s_queue) { - queue.emplace(); + details::schedulables_queue queue{}; + s_queue = &queue; - timepoint = details::immediate_scheduling_while_condition(duration, is_queue_is_empty{queue.value()}, fn, handler, args...); + const auto timepoint = details::immediate_scheduling_while_condition(duration, is_queue_is_empty{queue}, fn, handler, args...); if (!timepoint || handler.is_disposed()) - return drain_queue(queue); - } - else - { - timepoint = now() + duration; - } + return drain_queue(); - queue->emplace(timepoint.value(), std::forward(fn), std::forward(handler), std::forward(args)...); + s_queue->emplace(timepoint.value(), std::forward(fn), std::forward(handler), std::forward(args)...); + return drain_queue(); + } - if (!someone_owns_queue) - drain_queue(queue); + s_queue->emplace(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); } template Fn> @@ -168,7 +167,7 @@ namespace rpp::schedulers if (handler.is_disposed()) return; - if (s_queue.has_value()) + if (s_queue) { s_queue->emplace(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } @@ -183,15 +182,33 @@ namespace rpp::schedulers static rpp::schedulers::time_point now() { return details::now(); } }; - public: - static rpp::utils::finally_action own_queue_and_drain_finally_if_not_owned() + private: + class own_queue_guard { - const bool someone_owns_queue = s_queue.has_value(); + public: + own_queue_guard() + : m_clear_on_destruction{!s_queue} + { + if (m_clear_on_destruction) + s_queue = &m_queue; + } + ~own_queue_guard() + { + if (m_clear_on_destruction) + drain_queue(); + } + own_queue_guard(const own_queue_guard&) = delete; + own_queue_guard(own_queue_guard&&) = delete; - if (!someone_owns_queue) - s_queue.emplace(); + private: + details::schedulables_queue m_queue{}; + bool m_clear_on_destruction{}; + }; - return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>}; + public: + static own_queue_guard own_queue_and_drain_finally_if_not_owned() + { + return own_queue_guard{}; } static rpp::schedulers::worker create_worker() diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 44b4c6870..26f424430 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -17,12 +17,15 @@ #include #include +#include "rpp/utils/functors.hpp" + #include #include #include #include #include #include +#include namespace rpp::schedulers::details { @@ -36,8 +39,41 @@ namespace rpp::schedulers::details virtual ~schedulable_base() noexcept = default; - virtual std::optional operator()() noexcept = 0; - virtual bool is_disposed() const noexcept = 0; + virtual std::optional operator()() noexcept = 0; + + class advanced_call + { + public: + advanced_call(std::variant data) + : m_data{data} + { + } + + const std::variant& get() const { return m_data; } + + auto visit(const auto& fn) const + { + return std::visit(fn, m_data); + } + + bool can_run_immediately() const noexcept + { + return visit(rpp::utils::overloaded{[](const delay_to&) { + return false; + }, + [](const auto& d) { + return d.value == rpp::schedulers::duration::zero(); + }}); + } + + private: + std::variant m_data; + }; + + virtual std::optional make_advanced_call() noexcept = 0; + virtual time_point handle_advanced_call(const advanced_call&) noexcept = 0; + + virtual bool is_disposed() const noexcept = 0; time_point get_timepoint() const { return m_time_point; } @@ -54,6 +90,22 @@ namespace rpp::schedulers::details m_next = std::move(next); } + protected: + template + auto get_advanced_call_handler() const + { + return rpp::utils::overloaded{ + [](const delay_from_now& v) { + return NowStrategy::now() + v.value; + }, + [this](const delay_from_this_timepoint& v) { + return get_timepoint() + v.value; + }, + [](const delay_to& v) { + return v.value; + }}; + } + private: std::shared_ptr m_next{}; time_point m_time_point; @@ -77,21 +129,7 @@ namespace rpp::schedulers::details try { if (const auto res = m_args.apply(m_fn)) - { - if constexpr (constraint::schedulable_delay_from_now_fn) - { - return NowStrategy::now() + res->value; - } - else if constexpr (constraint::schedulable_delay_to_fn) - { - return res->value; - } - else - { - static_assert(constraint::schedulable_delay_from_this_timepoint_fn); - return get_timepoint() + res->value; - } - } + return get_advanced_call_handler()(res.value()); } catch (...) { @@ -100,6 +138,25 @@ namespace rpp::schedulers::details return std::nullopt; } + std::optional make_advanced_call() noexcept override + { + try + { + if (const auto res = m_args.apply(m_fn)) + return advanced_call{res.value()}; + } + catch (...) + { + m_args.template get<0>().on_error(std::current_exception()); + } + return std::nullopt; + } + + time_point handle_advanced_call(const advanced_call& v) noexcept override + { + return v.visit(get_advanced_call_handler()); + } + bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); } private: @@ -151,7 +208,7 @@ namespace rpp::schedulers::details schedulables_queue& operator=(const schedulables_queue& other) = delete; schedulables_queue& operator=(schedulables_queue&& other) noexcept = default; - schedulables_queue(std::shared_ptr shared_data) + schedulables_queue(std::weak_ptr shared_data) : m_shared_data{std::move(shared_data)} { } @@ -162,8 +219,6 @@ namespace rpp::schedulers::details using schedulable_type = specific_schedulable, std::decay_t, std::decay_t...>; emplace_impl(std::make_shared(timepoint, std::forward(fn), std::forward(handler), std::forward(args)...)); - if (m_shared_data) - m_shared_data->cv.notify_all(); } void emplace(const time_point& timepoint, std::shared_ptr&& schedulable) @@ -173,8 +228,6 @@ namespace rpp::schedulers::details schedulable->set_timepoint(timepoint); emplace_impl(std::move(schedulable)); - if (m_shared_data) - m_shared_data->cv.notify_all(); } bool is_empty() const { return !m_head; } @@ -193,7 +246,13 @@ namespace rpp::schedulers::details void emplace_impl(std::shared_ptr&& schedulable) { // needed in case of new_thread and current_thread shares same queue - optional_mutex mutex{m_shared_data ? &m_shared_data->mutex : nullptr}; + const auto s = m_shared_data.lock(); + const rpp::utils::finally_action _{[&] { + if (s) + s->cv.notify_one(); + }}; + + optional_mutex mutex{s ? &s->mutex : nullptr}; std::lock_guard lock{mutex}; if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint()) @@ -215,7 +274,7 @@ namespace rpp::schedulers::details } private: - std::shared_ptr m_head{}; - std::shared_ptr m_shared_data{}; + std::shared_ptr m_head{}; + std::weak_ptr m_shared_data{}; }; } // namespace rpp::schedulers::details diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index 6ceabe5b5..49361ef2c 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -32,21 +32,13 @@ namespace rpp::schedulers class disposable final : public rpp::details::base_disposable { public: - disposable() - { - // just waiting - while (!m_state->queue_ptr.load(std::memory_order::seq_cst)) - { - }; - } + disposable() = default; template Fn> void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - std::lock_guard lock{m_state->mutex}; - // guarded by lock - if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst)) - queue->emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->has_fresh_data.store(true); } private: @@ -57,7 +49,7 @@ namespace rpp::schedulers { std::lock_guard lock{m_state->mutex}; - m_state->is_disposed.store(true, std::memory_order::relaxed); + m_state->is_disposed = true; } m_state->cv.notify_all(); @@ -69,58 +61,70 @@ namespace rpp::schedulers struct state_t : public details::shared_queue_data { - std::atomic*> queue_ptr{}; - std::atomic_bool is_disposed{}; + details::schedulables_queue queue{}; + bool is_disposed{}; + std::atomic_bool has_fresh_data{false}; }; static void data_thread(std::shared_ptr state) { - auto& queue = current_thread::s_queue; - state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst); + current_thread::s_queue = &state->queue; while (true) { std::unique_lock lock{state->mutex}; - - if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst)) + if (state->queue.is_empty() && state->is_disposed) break; - state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); }); + state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_disposed; }); - if (queue->is_empty()) + if (state->queue.is_empty()) break; - if (queue->top()->is_disposed()) + if (state->queue.top()->is_disposed()) { - queue->pop(); + state->queue.pop(); continue; } - if (details::s_last_now_time < queue->top()->get_timepoint()) + if (details::s_last_now_time < state->queue.top()->get_timepoint()) { - if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint()) + if (const auto now = worker_strategy::now(); now < state->queue.top()->get_timepoint()) { - state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); }); + state->cv.wait_for(lock, state->queue.top()->get_timepoint() - now, [&] { return state->queue.top()->is_disposed() || worker_strategy::now() >= state->queue.top()->get_timepoint(); }); continue; } } - auto top = queue->pop(); + auto top = state->queue.pop(); + state->has_fresh_data.store(!state->queue.is_empty()); lock.unlock(); - if (const auto timepoint = (*top)()) - if (!top->is_disposed()) - queue->emplace(timepoint.value(), std::move(top)); + while (true) + { + if (const auto res = top->make_advanced_call()) + { + if (!top->is_disposed()) + { + if (res->can_run_immediately() && !state->has_fresh_data.load()) + continue; + + state->queue.emplace(top->handle_advanced_call(res.value()), std::move(top)); + } + } + break; + } } - std::unique_lock lock{state->mutex}; - state->queue_ptr.store(nullptr, std::memory_order::seq_cst); - queue.reset(); + current_thread::s_queue = nullptr; } private: std::shared_ptr m_state = std::make_shared(); - std::thread m_thread{&data_thread, m_state}; + + RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue(m_state)); + + std::thread m_thread{&data_thread, m_state}; }; public: