Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 123 additions & 49 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,138 @@
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/delay.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/overloaded.hpp>

#include <variant>

IMPLEMENTATION_FILE(delay_tag);

namespace rpp::details
{
/**
* Functor (type-erasure) of "delay" for on_next operator.
*/
struct delay_on_next
struct completion {};

template<typename T, typename Subscriber, typename Worker>
class queue_based_worker final : public std::enable_shared_from_this<queue_based_worker<T, Subscriber, Worker>>
{
schedulers::duration delay;
public:
queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber)
: m_delay{delay}
, m_worker{std::move(worker)}
, m_subscriber{subscriber} {}

void operator()(auto&& value, const auto& subscriber, const auto& worker) const
queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber)
: m_delay{delay}
, m_worker{std::move(worker)}
, m_subscriber{std::move(subscriber)} {}

struct on_next
{
worker.schedule(delay,
[value = std::forward<decltype(value)>(value), subscriber]()
{
subscriber.on_next(std::move(value));
return schedulers::optional_duration{};
});
}
};
void operator()(auto&& value, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(std::forward<decltype(value)>(value));
}
};

/**
* Functor (type-erasure) of "delay" for on_error operator.
*/
struct delay_on_error
{
void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const
struct on_error
{
// on-error must be delivered as soon as possible
worker.schedule([err, subscriber]()
{
subscriber.on_error(err);
return schedulers::optional_duration{};
});
void operator()(const std::exception_ptr& err, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(err);
}
};

struct on_completed
{
void operator()(const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(completion{});
}
};

private:
template<typename TT>
void emplace(TT&& item)
{
if (const auto timepoint = emplace_safe(std::forward<TT>(item)))
{
m_worker.schedule(timepoint.value(),
[state = this->shared_from_this()]()-> schedulers::optional_duration
{
return state->drain_queue();
});
}
}
};

/**
* Functor (type-erasure) of "delay" for on_completed operator.
*/
struct delay_on_completed
{
schedulers::duration delay;
template<typename TT>
std::optional<schedulers::time_point> emplace_safe(TT&& item)
{
std::lock_guard lock{m_mutex};
const auto delay = std::is_same_v<std::exception_ptr, std::decay_t<TT>> ? schedulers::duration{0} : m_delay;
m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward<TT>(item));
if (!m_active && m_queue.size() == 1)
{
m_active = true;
return m_queue.top().time;
}
return {};
}

void operator()(const auto& subscriber, const auto& worker) const
schedulers::optional_duration drain_queue()
{
worker.schedule(delay,
[subscriber]()
{
subscriber.on_completed();
return schedulers::optional_duration{};
});
while (true)
{
std::unique_lock lock{m_mutex};
if (m_queue.empty())
{
m_active = false;
return {};
}

auto& top = m_queue.top();
const auto now = m_worker.now();
if (top.time > now)
return top.time - now;

auto item = std::move(top.item);
m_queue.pop();
lock.unlock();

std::visit(utils::overloaded
{
[&](T&& v) { m_subscriber.on_next(std::move(v)); },
[&](const std::exception_ptr& err) { m_subscriber.on_error(err); },
[&](completion) { m_subscriber.on_completed(); }
},
std::move(item));
}
}

private:
struct emission
{
template<typename TT>
emission(size_t id, schedulers::time_point time, TT&& item)
: id{id}
, time{std::move(time)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-move-const-arg ⚠️
std::move of the variable time of the trivially-copyable type schedulers::time_point (aka time_point<std::chrono::steady_clock, duration<long, ratio<1, 1000000000>>>) has no effect; remove std::move()

Suggested change
, time{std::move(time)}
, time{time)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-move-const-arg ⚠️
std::move of the variable time of the trivially-copyable type schedulers::time_point (aka time_point<std::chrono::steady_clock, duration<long, ratio<1, 1000000000>>>) has no effect; remove std::move()

Suggested change
, time{std::move(time)}
, time{std::move(time}

, item{std::forward<TT>(item)} {}

size_t id{};
schedulers::time_point time{};
std::variant<T, std::exception_ptr, completion> item{};

bool operator<(const emission& other) const { return std::tie(time, id) >= std::tie(other.time, other.id); }
};

schedulers::duration m_delay;
Worker m_worker;
Subscriber m_subscriber;
std::mutex m_mutex{};
size_t m_current_id{};
std::priority_queue<emission> m_queue{};
bool m_active{};
};

/**
* \brief Functor of OperatorFn for "combine_latest" operator (used by "lift").
*/

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct delay_impl
{
Expand All @@ -86,14 +158,16 @@ struct delay_impl
auto operator()(TSub&& subscriber) const
{
auto worker = scheduler.create_worker(subscriber.get_subscription());

auto subscription = subscriber.get_subscription().make_child();

using state_t = queue_based_worker<Type, std::decay_t<TSub>, std::decay_t<decltype(worker)>>;
auto state = std::make_shared<state_t>(delay, std::move(worker), std::forward<TSub>(subscriber));

return create_subscriber_with_state<Type>(std::move(subscription),
delay_on_next{delay},
delay_on_error{},
delay_on_completed{delay},
std::forward<TSub>(subscriber),
std::move(worker));
typename state_t::on_next{},
typename state_t::on_error{},
typename state_t::on_completed{},
std::move(state));
}
};
} // namespace rpp::details
44 changes: 41 additions & 3 deletions src/tests/rpp/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
#include <catch2/catch_test_macros.hpp>
#include <rpp/operators/delay.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/sources/just.hpp>

SCENARIO("delay mirrors both source observable and trigger observable", "[delay]")
{
auto mock = mock_observer<int>{};
std::chrono::milliseconds delay_duration{300};

GIVEN("observable of -1-|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::just(1)
Expand Down Expand Up @@ -60,7 +61,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -x")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""}))
Expand All @@ -87,7 +87,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::empty<int>()
Expand All @@ -111,4 +110,43 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]
CHECK(mock.get_on_error_count() == 0);
}
}

GIVEN("subject with items")
{
auto subj = rpp::subjects::publish_subject<int>{};

WHEN("subscribe on subject via delay and doing recursive submit from another thread")
{
THEN("all values obtained in the same thread")
{
auto current_thread = std::this_thread::get_id();

auto sub = subj.get_observable()
.delay(delay_duration, rpp::schedulers::trampoline{})
.subscribe([&](int v)
{
CHECK(std::this_thread::get_id() == current_thread);

mock.on_next(v);

if (v == 1)
{
std::thread{[&]{subj.get_subscriber().on_next(2);}}.join();

THEN("no recursive on_next calls")
{
CHECK(mock.get_received_values() == std::vector{1});
}
}
});

subj.get_subscriber().on_next(1);

AND_THEN("all values obtained")
{
CHECK(mock.get_received_values() == std::vector{ 1, 2 });
}
}
}
}
}
Loading