Skip to content
Merged
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"inherits" : ["ci-flags"],
"hidden": true,
"cacheVariables": {
"CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew"
"CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew /bigobj"
}
},
{
Expand Down
25 changes: 25 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
});
}


SECTION("from array of 1 - create + as_blocking + subscribe + new_thread")
{
std::array<int, 1> vals{123};
TEST_RPP([&]() {
(rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
(rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("from array of 1000 - create + as_blocking + subscribe + new_thread")
{
std::array<int, 1000> vals{};
TEST_RPP([&]() {
(rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
(rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("concat_as_source of just(1 immediate) create + subscribe")
{
TEST_RPP([&]() {
Expand Down
111 changes: 64 additions & 47 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace rpp::schedulers
friend class new_thread;
class worker_strategy;

inline static thread_local std::optional<details::schedulables_queue<worker_strategy>> s_queue{};
inline static thread_local details::schedulables_queue<worker_strategy>* s_queue{};

struct is_queue_is_empty
{
Expand All @@ -97,38 +97,43 @@ namespace rpp::schedulers
};


static void drain_current_queue()
static void drain_queue() noexcept
{
drain_queue(s_queue);
}

static void drain_queue(std::optional<details::schedulables_queue<worker_strategy>>& queue)
{
while (!queue->is_empty())
while (s_queue && !s_queue->is_empty())
{
auto top = queue->pop();
auto top = s_queue->pop();
if (top->is_disposed())
continue;

std::optional<time_point> timepoint{top->get_timepoint()};
// immediate like scheduling
do
{
if (timepoint && !top->is_disposed())
details::sleep_until(top->get_timepoint());

if (top->is_disposed())
timepoint.reset();
else
timepoint = (*top)();

} while (queue->is_empty() && timepoint.has_value());
details::sleep_until(top->get_timepoint());

if (timepoint.has_value())
queue->emplace(timepoint.value(), std::move(top));
while (true)
{
if (const auto res = top->make_advanced_call())
{
if (!top->is_disposed())
{
if (s_queue->is_empty())
{
if (const auto d = std::get_if<delay_from_now>(&res->get()))
{
std::this_thread::sleep_for(d->value);
}
else
{
details::sleep_until(top->handle_advanced_call(res.value()));
}
continue;
}

s_queue->emplace(top->handle_advanced_call(res.value()), std::move(top));
}
}
break;
}
}

queue.reset();
s_queue = nullptr;
}

class worker_strategy
Expand All @@ -140,26 +145,20 @@ namespace rpp::schedulers
if (handler.is_disposed())
return;

auto& queue = s_queue;
const bool someone_owns_queue = queue.has_value();
std::optional<time_point> timepoint{};
if (!someone_owns_queue)
if (!s_queue)
{
queue.emplace();
details::schedulables_queue<worker_strategy> queue{};
s_queue = &queue;

timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue.value()}, fn, handler, args...);
const auto timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue}, fn, handler, args...);
if (!timepoint || handler.is_disposed())
return drain_queue(queue);
}
else
{
timepoint = now() + duration;
}
return drain_queue();

queue->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
s_queue->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
return drain_queue();
}

if (!someone_owns_queue)
drain_queue(queue);
s_queue->emplace(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
Expand All @@ -168,7 +167,7 @@ namespace rpp::schedulers
if (handler.is_disposed())
return;

if (s_queue.has_value())
if (s_queue)
{
s_queue->emplace(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
Expand All @@ -183,15 +182,33 @@ namespace rpp::schedulers
static rpp::schedulers::time_point now() { return details::now(); }
};

public:
static rpp::utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
private:
class own_queue_guard
{
const bool someone_owns_queue = s_queue.has_value();
public:
own_queue_guard()
: m_clear_on_destruction{!s_queue}
{
if (m_clear_on_destruction)
s_queue = &m_queue;
}
~own_queue_guard()
{
if (m_clear_on_destruction)
drain_queue();
}
own_queue_guard(const own_queue_guard&) = delete;
own_queue_guard(own_queue_guard&&) = delete;

if (!someone_owns_queue)
s_queue.emplace();
private:
details::schedulables_queue<worker_strategy> m_queue{};
bool m_clear_on_destruction{};
};

return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>};
public:
static own_queue_guard own_queue_and_drain_finally_if_not_owned()
{
return own_queue_guard{};
}

static rpp::schedulers::worker<worker_strategy> create_worker()
Expand Down
109 changes: 84 additions & 25 deletions src/rpp/rpp/schedulers/details/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#include <rpp/utils/tuple.hpp>
#include <rpp/utils/utils.hpp>

#include "rpp/utils/functors.hpp"

#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
#include <variant>

namespace rpp::schedulers::details
{
Expand All @@ -36,8 +39,41 @@ namespace rpp::schedulers::details

virtual ~schedulable_base() noexcept = default;

virtual std::optional<time_point> operator()() noexcept = 0;
virtual bool is_disposed() const noexcept = 0;
virtual std::optional<time_point> operator()() noexcept = 0;

class advanced_call
{
public:
advanced_call(std::variant<delay_from_now, delay_from_this_timepoint, delay_to> data)
: m_data{data}
{
}

const std::variant<delay_from_now, delay_from_this_timepoint, delay_to>& get() const { return m_data; }

auto visit(const auto& fn) const
{
return std::visit(fn, m_data);
}

bool can_run_immediately() const noexcept
{
return visit(rpp::utils::overloaded{[](const delay_to&) {
return false;
},
[](const auto& d) {
return d.value == rpp::schedulers::duration::zero();
}});
}

private:
std::variant<delay_from_now, delay_from_this_timepoint, delay_to> m_data;
};

virtual std::optional<advanced_call> make_advanced_call() noexcept = 0;
virtual time_point handle_advanced_call(const advanced_call&) noexcept = 0;

virtual bool is_disposed() const noexcept = 0;

time_point get_timepoint() const { return m_time_point; }

Expand All @@ -54,6 +90,22 @@ namespace rpp::schedulers::details
m_next = std::move(next);
}

protected:
template<typename NowStrategy>
auto get_advanced_call_handler() const
{
return rpp::utils::overloaded{
[](const delay_from_now& v) {
return NowStrategy::now() + v.value;
},
[this](const delay_from_this_timepoint& v) {
return get_timepoint() + v.value;
},
[](const delay_to& v) {
return v.value;
}};
}

private:
std::shared_ptr<schedulable_base> m_next{};
time_point m_time_point;
Expand All @@ -77,21 +129,7 @@ namespace rpp::schedulers::details
try
{
if (const auto res = m_args.apply(m_fn))
{
if constexpr (constraint::schedulable_delay_from_now_fn<Fn, Handler, Args...>)
{
return NowStrategy::now() + res->value;
}
else if constexpr (constraint::schedulable_delay_to_fn<Fn, Handler, Args...>)
{
return res->value;
}
else
{
static_assert(constraint::schedulable_delay_from_this_timepoint_fn<Fn, Handler, Args...>);
return get_timepoint() + res->value;
}
}
return get_advanced_call_handler<NowStrategy>()(res.value());
}
catch (...)
{
Expand All @@ -100,6 +138,25 @@ namespace rpp::schedulers::details
return std::nullopt;
}

std::optional<advanced_call> make_advanced_call() noexcept override
{
try
{
if (const auto res = m_args.apply(m_fn))
return advanced_call{res.value()};
}
catch (...)
{
m_args.template get<0>().on_error(std::current_exception());
}
return std::nullopt;
}

time_point handle_advanced_call(const advanced_call& v) noexcept override
{
return v.visit(get_advanced_call_handler<NowStrategy>());
}

bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); }

private:
Expand Down Expand Up @@ -151,7 +208,7 @@ namespace rpp::schedulers::details
schedulables_queue& operator=(const schedulables_queue& other) = delete;
schedulables_queue& operator=(schedulables_queue&& other) noexcept = default;

schedulables_queue(std::shared_ptr<shared_queue_data> shared_data)
schedulables_queue(std::weak_ptr<shared_queue_data> shared_data)
: m_shared_data{std::move(shared_data)}
{
}
Expand All @@ -162,8 +219,6 @@ namespace rpp::schedulers::details
using schedulable_type = specific_schedulable<NowStrategy, std::decay_t<Fn>, std::decay_t<Handler>, std::decay_t<Args>...>;

emplace_impl(std::make_shared<schedulable_type>(timepoint, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...));
if (m_shared_data)
m_shared_data->cv.notify_all();
}

void emplace(const time_point& timepoint, std::shared_ptr<schedulable_base>&& schedulable)
Expand All @@ -173,8 +228,6 @@ namespace rpp::schedulers::details

schedulable->set_timepoint(timepoint);
emplace_impl(std::move(schedulable));
if (m_shared_data)
m_shared_data->cv.notify_all();
}

bool is_empty() const { return !m_head; }
Expand All @@ -193,7 +246,13 @@ namespace rpp::schedulers::details
void emplace_impl(std::shared_ptr<schedulable_base>&& schedulable)
{
// needed in case of new_thread and current_thread shares same queue
optional_mutex<std::recursive_mutex> mutex{m_shared_data ? &m_shared_data->mutex : nullptr};
const auto s = m_shared_data.lock();
const rpp::utils::finally_action _{[&] {
if (s)
s->cv.notify_one();
}};

optional_mutex<std::recursive_mutex> mutex{s ? &s->mutex : nullptr};
std::lock_guard lock{mutex};

if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint())
Expand All @@ -215,7 +274,7 @@ namespace rpp::schedulers::details
}

private:
std::shared_ptr<schedulable_base> m_head{};
std::shared_ptr<shared_queue_data> m_shared_data{};
std::shared_ptr<schedulable_base> m_head{};
std::weak_ptr<shared_queue_data> m_shared_data{};
};
} // namespace rpp::schedulers::details
Loading