Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions old_build/_deps/snitch-src
Submodule snitch-src added at 2f6230
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace rpp::operators::details
{
std::lock_guard lock{m_mutex};
m_value_to_be_emitted.emplace(std::forward<TT>(v));
const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value();
const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value();
m_time_when_value_should_be_emitted = m_worker.now() + m_period;
if (need_to_scheduled)
{
Expand All @@ -75,7 +75,7 @@ namespace rpp::operators::details
void schedule()
{
m_worker.schedule(
m_period,
m_time_when_value_should_be_emitted.value(),
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to {
auto value_or_duration = handler.disposable->extract_value_or_time();
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration))
Expand Down Expand Up @@ -211,4 +211,4 @@ namespace rpp::operators
{
return details::debounce_t<std::decay_t<Scheduler>>{period, std::forward<Scheduler>(scheduler)};
}
} // namespace rpp::operators
} // namespace rpp::operators
11 changes: 6 additions & 5 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ namespace rpp::operators::details
template<typename TT>
void emplace(TT&& value) const
{
if (const auto delay = emplace_safe(std::forward<TT>(value)))
if (const auto tp = emplace_safe(std::forward<TT>(value)))
{
disposable->worker.schedule(
delay.value(),
tp.value(),
[](const delay_disposable_wrapper<Observer, Worker, Container>& wrapper) { return drain_queue(wrapper.disposable); },
delay_disposable_wrapper<Observer, Worker, Container>{disposable});
}
}

template<typename TT>
std::optional<rpp::schedulers::duration> emplace_safe(TT&& item) const
std::optional<rpp::schedulers::time_point> emplace_safe(TT&& item) const
{
std::lock_guard lock{disposable->mutex};
if constexpr (ClearOnError && rpp::constraint::decayed_same_as<std::exception_ptr, TT>)
Expand All @@ -127,11 +127,12 @@ namespace rpp::operators::details
}
else
{
disposable->queue.emplace(std::forward<TT>(item), disposable->worker.now() + disposable->delay);
const auto tp = disposable->worker.now() + disposable->delay;
disposable->queue.emplace(std::forward<TT>(item), tp);
if (!disposable->is_active)
{
disposable->is_active = true;
return disposable->delay;
return tp;
}
return std::nullopt;
}
Expand Down
16 changes: 16 additions & 0 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ namespace rpp::schedulers
drain_queue(queue);
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
static void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
{
if (handler.is_disposed())
return;

if (s_queue.has_value())
{
s_queue->emplace(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
else
{
defer_for(tp - now(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return details::now(); }
Expand Down
14 changes: 13 additions & 1 deletion src/rpp/rpp/schedulers/details/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ namespace rpp::schedulers
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void schedule(const duration delay, Fn&& fn, Handler&& handler, Args&&... args) const
{
m_strategy.defer_for(delay, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
if constexpr (constraint::defer_for_strategy<Strategy>)
m_strategy.defer_for(delay, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
else
schedule(now() + delay, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void schedule(const time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
{
if constexpr (constraint::defer_to_strategy<Strategy>)
m_strategy.defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
else
schedule(tp - now(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const
Expand Down
19 changes: 18 additions & 1 deletion src/rpp/rpp/schedulers/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ namespace rpp::schedulers::constraint
};

template<typename S>
concept strategy = requires(const S& s, const details::fake_schedulable_handler& handler) {
concept defer_for_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) {
{
s.defer_for(duration{}, std::declval<optional_delay_from_now (*)(const details::fake_schedulable_handler&)>(), handler)
} -> std::same_as<void>;
Expand All @@ -132,6 +132,23 @@ namespace rpp::schedulers::constraint
{
s.defer_for(duration{}, std::declval<optional_delay_to (*)(const details::fake_schedulable_handler&)>(), handler)
} -> std::same_as<void>;
};

template<typename S>
concept defer_to_strategy = requires(const S& s, const details::fake_schedulable_handler& handler) {
{
s.defer_to(time_point{}, std::declval<optional_delay_from_now (*)(const details::fake_schedulable_handler&)>(), handler)
} -> std::same_as<void>;
{
s.defer_to(time_point{}, std::declval<optional_delay_from_this_timepoint (*)(const details::fake_schedulable_handler&)>(), handler)
} -> std::same_as<void>;
{
s.defer_to(time_point{}, std::declval<optional_delay_to (*)(const details::fake_schedulable_handler&)>(), handler)
} -> std::same_as<void>;
};

template<typename S>
concept strategy = (defer_for_strategy<S> || defer_to_strategy<S>)&&requires(const S& s, const details::fake_schedulable_handler& handler) {
{
s.get_disposable()
} -> rpp::constraint::any_of<rpp::disposable_wrapper, details::none_disposable>;
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace rpp::schedulers
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_at(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
if (is_disposed())
return;
Expand Down Expand Up @@ -142,9 +142,9 @@ namespace rpp::schedulers
worker_strategy() = default;

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const
void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
{
m_state.lock()->defer_at(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state.lock()->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const { return m_state; }
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/schedulers/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ namespace rpp::schedulers
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const
void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
{
if (const auto shared = m_state.lock())
shared->emplace_and_notify(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
shared->emplace_and_notify(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }
Expand Down
9 changes: 1 addition & 8 deletions src/rpp/rpp/sources/interval.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,7 @@ namespace rpp::details
observer.set_upstream(std::move(d));
}

if constexpr (std::is_same_v<TimePointOrDuration, rpp::schedulers::time_point>)
{
worker.schedule(initial - worker.now(), interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
}
else
{
worker.schedule(initial, interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
}
worker.schedule(initial, interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
}
};
} // namespace rpp::details
Expand Down