Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/extensions/rppqt/rppqt/schedulers/main_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ namespace rppqt::schedulers
});
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }

private:
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ namespace rpp::operators::details
, m_worker{std::move(in_worker)}
, m_period{period}
{
if constexpr (!Worker::is_none_disposable)
{
if (auto d = m_worker.get_disposable(); !d.is_disposed())
rpp::composite_disposable_impl<Container>::add(std::move(d));
}
}

template<typename TT>
Expand Down Expand Up @@ -172,7 +167,7 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto disposable = disposable_wrapper_impl<debounce_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = disposable.lock();
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ namespace rpp::operators::details
, worker{std::move(in_worker)}
, delay{delay}
{
if constexpr (!Worker::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
rpp::composite_disposable_impl<Container>::add(std::move(d));
}
}

RPP_NO_UNIQUE_ADDRESS Observer observer;
Expand Down Expand Up @@ -186,7 +181,7 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto disposable = disposable_wrapper_impl<delay_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = disposable.lock();
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/subscribe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,14 @@ namespace rpp::operators::details
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = typename Prev::template add<rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable ? 0 : 1>;
using updated_disposable_strategy = Prev;

RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const rpp::details::observables::chain<Strategies...>& observable_strategy) const
{
const auto worker = scheduler.create_worker();
if constexpr (!rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
observer.set_upstream(std::move(d));
}
worker.schedule(subscribe_on_schedulable<rpp::details::observables::chain<Strategies...>>{observable_strategy}, std::forward<Observer>(observer));
}
};
Expand Down
12 changes: 3 additions & 9 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,16 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<TScheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + period;
const auto timeout = worker_t::now() + period;

const auto disposable = disposable_wrapper_impl<timeout_disposable<std::decay_t<Observer>, TFallbackObservable, container>>::make(std::forward<Observer>(observer), period, fallback, timeout);
auto ptr = disposable.lock();
ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak());

const auto worker = scheduler.create_worker();
if constexpr (!rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
disposable.add(std::move(d));
}

using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
worker.schedule(
timeout,
[](wrapper& handler) -> rpp::schedulers::optional_delay_to {
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ namespace rpp::schedulers
}
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return details::now(); }
};

Expand Down
10 changes: 0 additions & 10 deletions src/rpp/rpp/schedulers/details/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,8 @@ namespace rpp::schedulers
schedule(tp - now(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const
{
if constexpr (is_none_disposable)
return disposable_wrapper::empty();
else
return m_strategy.get_disposable();
}

static rpp::schedulers::time_point now() { return Strategy::now(); }

static constexpr bool is_none_disposable = std::same_as<decltype(std::declval<Strategy>().get_disposable()), rpp::schedulers::details::none_disposable>;

private:
RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
};
Expand Down
11 changes: 3 additions & 8 deletions src/rpp/rpp/schedulers/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ namespace rpp::schedulers::details

static void on_error(const std::exception_ptr&) {}
};

struct none_disposable
{
};
} // namespace rpp::schedulers::details

namespace rpp::schedulers::constraint
Expand Down Expand Up @@ -148,10 +144,9 @@ namespace rpp::schedulers::constraint
};

template<typename S>
concept strategy = (defer_for_strategy<S> || defer_to_strategy<S>) && requires(const S& s, const details::fake_schedulable_handler& handler) {
{
s.get_disposable()
} -> rpp::constraint::any_of<rpp::disposable_wrapper, details::none_disposable>;
concept strategy = (defer_for_strategy<S> || defer_to_strategy<S>)&&
requires ()
{
{
S::now()
} -> std::same_as<rpp::schedulers::time_point>;
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/immediate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ namespace rpp::schedulers
details::immediate_scheduling_while_condition<worker_strategy>(duration, rpp::utils::return_true{}, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }
};

Expand Down
42 changes: 20 additions & 22 deletions src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,19 @@ namespace rpp::schedulers
*/
class new_thread
{
class disposable final : public rpp::details::base_disposable
class state_t final
{
public:
disposable() = default;
state_t() = default;

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->has_fresh_data.store(true);
}

private:
void base_dispose_impl(interface_disposable::Mode) noexcept override
~state_t() noexcept
{
if (!m_thread.joinable())
return;

{
std::lock_guard lock{m_state->mutex};
m_state->is_disposed = true;
m_state->is_stoping = true;
}
m_state->cv.notify_all();

Expand All @@ -59,24 +51,32 @@ namespace rpp::schedulers
m_thread.detach();
}

struct state_t : public details::shared_queue_data
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->has_fresh_data.store(true);
}

private:
struct queue_data : public details::shared_queue_data
{
details::schedulables_queue<current_thread::worker_strategy> queue{};
bool is_disposed{};
bool is_stoping{};
std::atomic_bool has_fresh_data{false};
};

static void data_thread(std::shared_ptr<state_t> state)
static void data_thread(std::shared_ptr<queue_data> state)
{
current_thread::get_queue() = &state->queue;

while (true)
{
std::unique_lock lock{state->mutex};
if (state->queue.is_empty() && state->is_disposed)
if (state->queue.is_empty() && state->is_stoping)
break;

state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_disposed; });
state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_stoping; });

if (state->queue.is_empty())
break;
Expand Down Expand Up @@ -120,7 +120,7 @@ namespace rpp::schedulers
}

private:
std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
std::shared_ptr<queue_data> m_state = std::make_shared<queue_data>();

RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue<current_thread::worker_strategy>(m_state));

Expand All @@ -136,15 +136,13 @@ namespace rpp::schedulers
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
{
m_state.lock()->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const { return m_state; }

static rpp::schedulers::time_point now() { return details::now(); }

private:
disposable_wrapper_impl<disposable> m_state = disposable_wrapper_impl<disposable>::make();
std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
};

static rpp::schedulers::worker<worker_strategy> create_worker()
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ namespace rpp::schedulers
shared->emplace_and_notify(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return details::now(); }

private:
Expand Down
41 changes: 15 additions & 26 deletions src/rpp/rpp/schedulers/test_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#pragma once

#include <rpp/disposables.hpp>
#include <rpp/schedulers.hpp>

namespace rpp::schedulers
Expand All @@ -23,16 +22,13 @@ namespace rpp::schedulers

class worker_strategy;

struct state : public rpp::details::base_disposable
struct state
{
state() = default;

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
if (is_disposed())
return;

schedulings.push_back(time_point);
queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
Expand All @@ -53,17 +49,15 @@ namespace rpp::schedulers
executions.push_back(s_current_time);
if (auto new_timepoint = (*fn)())
{
if (!is_disposed())
{
schedulings.push_back(std::max(s_current_time, new_timepoint.value()));
queue.emplace(schedulings.back(), std::move(fn));
}
if (fn->is_disposed())
continue;

schedulings.push_back(std::max(s_current_time, new_timepoint.value()));
queue.emplace(schedulings.back(), std::move(fn));
}
}
}

void base_dispose_impl(interface_disposable::Mode) noexcept override {}

std::vector<rpp::schedulers::time_point> schedulings{};
std::vector<rpp::schedulers::time_point> executions{};
rpp::schedulers::details::schedulables_queue<worker_strategy> queue{};
Expand All @@ -72,7 +66,7 @@ namespace rpp::schedulers
class worker_strategy
{
public:
worker_strategy(rpp::disposable_wrapper_impl<state> state)
worker_strategy(std::weak_ptr<state> state)
: m_state{std::move(state)}
{
}
Expand All @@ -82,41 +76,36 @@ namespace rpp::schedulers
{
if (auto locked = m_state.lock())
{
if (!locked->is_disposed())
{
locked->schedule(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
locked->drain();
}
locked->schedule(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
locked->drain();
}
}

static rpp::schedulers::time_point now() { return s_current_time; }

rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); }

private:
rpp::disposable_wrapper_impl<state> m_state;
std::weak_ptr<state> m_state;
};

test_scheduler() = default;

rpp::schedulers::worker<worker_strategy> create_worker() const
{
return rpp::schedulers::worker<worker_strategy>{m_state.as_weak()};
return rpp::schedulers::worker<worker_strategy>{m_state};
}

const auto& get_schedulings() const { return m_state.lock()->schedulings; }
const auto& get_executions() const { return m_state.lock()->executions; }
const auto& get_schedulings() const { return m_state->schedulings; }
const auto& get_executions() const { return m_state->executions; }

static rpp::schedulers::time_point now() { return s_current_time; }

void time_advance(rpp::schedulers::duration dur) const
{
s_current_time += dur;
m_state.lock()->drain();
m_state->drain();
}

private:
rpp::disposable_wrapper_impl<state> m_state = rpp::disposable_wrapper_impl<state>::make();
std::shared_ptr<state> m_state = std::make_shared<state>();
};
} // namespace rpp::schedulers
3 changes: 1 addition & 2 deletions src/rpp/rpp/schedulers/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ namespace rpp::schedulers
m_original_worker.schedule(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }
static rpp::schedulers::time_point now() { return original_worker::now(); }
static rpp::schedulers::time_point now() { return original_worker::now(); }

private:
original_worker m_original_worker;
Expand Down
Loading