Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions src/rpp/rpp/sources/interval.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ struct interval_schedulable
}
};

template<typename TScheduler>
template<typename TScheduler, typename TimePointOrDuration>
struct interval_strategy
{
using value_type = size_t;
using value_type = size_t;
using expected_disposable_strategy = std::conditional_t<rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable, rpp::details::observables::bool_disposable_strategy_selector, rpp::details::observables::fixed_disposable_strategy_selector<1>>;

RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
rpp::schedulers::duration initial;
TimePointOrDuration initial;
rpp::schedulers::duration period;

template<rpp::constraint::observer_of_type<value_type> TObs>
Expand All @@ -44,15 +44,23 @@ struct interval_strategy
if (auto d = worker.get_disposable(); !d.is_disposed())
observer.set_upstream(std::move(d));
}
worker.schedule(initial, interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});

if constexpr (std::is_same_v<TimePointOrDuration, rpp::schedulers::time_point>)
{
worker.schedule(initial - worker.now(), interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
}
else
{
worker.schedule(initial, interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
}
}
};
}

namespace rpp
{
template<schedulers::constraint::scheduler TScheduler>
using interval_observable = observable<size_t, details::interval_strategy<TScheduler>>;
template<schedulers::constraint::scheduler TScheduler, typename TimePointOrDuration>
using interval_observable = observable<size_t, details::interval_strategy<TScheduler, TimePointOrDuration>>;
}

namespace rpp::source
Expand All @@ -66,7 +74,7 @@ namespace rpp::source
operator "interval(20s, 10s)": +--1-2-3-5->
}
*
* @param initial time before first emission
* @param initial duration before first emission
* @param period period between emitted values
* @param scheduler the scheduler to use for scheduling the items
*
Expand All @@ -79,7 +87,23 @@ namespace rpp::source
template<schedulers::constraint::scheduler TScheduler>
auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler&& scheduler)
{
return interval_observable<std::decay_t<TScheduler>>{std::forward<TScheduler>(scheduler), initial, period};
return interval_observable<std::decay_t<TScheduler>, rpp::schedulers::duration>{std::forward<TScheduler>(scheduler), initial, period};
}

/**
* @brief Same rpp::source::interval but using a time_point as initial time instead of a duration.
*
* @param initial time_point before first emission
* @param period period between emitted values
* @param scheduler the scheduler to use for scheduling the items
*
* @ingroup creational_operators
* @see https://reactivex.io/documentation/operators/interval.html
*/
template<schedulers::constraint::scheduler TScheduler>
auto interval(rpp::schedulers::time_point initial, rpp::schedulers::duration period, TScheduler&& scheduler)
{
return interval_observable<std::decay_t<TScheduler>, rpp::schedulers::time_point>{std::forward<TScheduler>(scheduler), initial, period};
}

/**
Expand Down
19 changes: 17 additions & 2 deletions src/rpp/rpp/sources/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace rpp::source
operator "timer(1s)": +--0|
}
*
* @param when time point when the value is emitted
* @param when duration from now when the value is emitted
* @param scheduler the scheduler to use for scheduling the items
*
* @ingroup creational_operators
Expand All @@ -24,6 +24,21 @@ namespace rpp::source
template<schedulers::constraint::scheduler TScheduler>
auto timer(rpp::schedulers::duration when, TScheduler&& scheduler)
{
return interval(when, std::forward<TScheduler>(scheduler)) | operators::take(1);
return interval(when, rpp::schedulers::duration::zero(), std::forward<TScheduler>(scheduler)) | operators::take(1);
}

/**
* @brief Same as rpp::source::timer but using a time_point as delay instead of a duration.
*
* @param when time point when the value is emitted
* @param scheduler the scheduler to use for scheduling the items
*
* @ingroup creational_operators
* @see https://reactivex.io/documentation/operators/timer.html
*/
template<schedulers::constraint::scheduler TScheduler>
auto timer(rpp::schedulers::time_point when, TScheduler&& scheduler)
{
return interval(when, rpp::schedulers::duration::zero(), std::forward<TScheduler>(scheduler)) | operators::take(1);
}
}
62 changes: 42 additions & 20 deletions src/tests/rpp/test_interval.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@
#include <rpp/operators/as_blocking.hpp>
#include <rpp/operators/subscribe.hpp>
#include <rpp/operators/take.hpp>
#include <rpp/sources/interval.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/schedulers/immediate.hpp>
#include <rpp/schedulers/new_thread.hpp>
#include <rpp/sources/interval.hpp>

#include "mock_observer.hpp"
#include "test_scheduler.hpp"
#include "snitch_logging.hpp"
#include "test_scheduler.hpp"

#include <chrono>

TEST_CASE("interval emit values with provided interval")
{
auto scheduler = test_scheduler{};
auto mock = mock_observer_strategy<size_t>{};

SECTION("interval observable")
{
auto interval = std::chrono::seconds{1};
Expand All @@ -37,30 +38,34 @@ TEST_CASE("interval emit values with provided interval")
SECTION("subscribe on it via take 3")
{
obs | rpp::ops::take(3) | rpp::ops::subscribe(mock);

SECTION("nothing happens immediately till scheduler advanced")
{
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval});
CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval});
CHECK(scheduler.get_executions().empty());
}

SECTION("move time in advance on interval once")
{
scheduler.time_advance(interval);

SECTION("observer obtains first value")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}

SECTION("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval,
initial_time + 2*interval});
CHECK(scheduler.get_executions() == std::vector{initial_time+interval});
CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval, initial_time + 2 * interval});
CHECK(scheduler.get_executions() == std::vector{initial_time + interval});
}
}

SECTION("move time in advance on interval enough amount of time")
{
for (size_t i = 0; i < 5; ++i)
Expand All @@ -72,20 +77,17 @@ TEST_CASE("interval emit values with provided interval")
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

SECTION("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_executions() == std::vector{ initial_time + interval,
initial_time + 2*interval,
initial_time + 3*interval});
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval,
initial_time + 2*interval,
initial_time + 3*interval});
CHECK(scheduler.get_executions() == std::vector{initial_time + interval, initial_time + 2 * interval, initial_time + 3 * interval});
CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval, initial_time + 2 * interval, initial_time + 3 * interval});
}
}
}
}

SECTION("interval observable with initial delay")
SECTION("interval observable with initial delay duration")
{
auto initial_delay = std::chrono::seconds{2};
auto interval = std::chrono::seconds{1};
Expand All @@ -95,12 +97,13 @@ TEST_CASE("interval emit values with provided interval")
SECTION("subscribe on it via take 3")
{
obs | rpp::ops::take(3) | rpp::ops::subscribe(mock);

SECTION("nothing happens immediately till scheduler advanced")
{
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay});
CHECK(scheduler.get_schedulings() == std::vector{initial_time + initial_delay});
CHECK(scheduler.get_executions().empty());
}

Expand All @@ -109,22 +112,41 @@ TEST_CASE("interval emit values with provided interval")
for (size_t i = 0; i < 5; ++i)
scheduler.time_advance(interval);
scheduler.time_advance(interval);

SECTION("observer obtains sequence of values")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0, 1, 2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

SECTION("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval });
CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval });
CHECK(scheduler.get_schedulings() == std::vector{initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval});
CHECK(scheduler.get_executions() == std::vector{initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval});
}
}
}
}

SECTION("interval observable with initial delay time_point")
{
auto when = std::chrono::seconds{2};
auto initial_delay = scheduler.now() + when;
auto interval = std::chrono::seconds{1};
auto obs = rpp::source::interval(initial_delay, interval, scheduler);

SECTION("subscribe")
{
scheduler.time_advance(when * 2);
obs | rpp::ops::take(1) | rpp::ops::subscribe(mock);

SECTION("expect value as initial_delay time_point is in the past")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}
70 changes: 55 additions & 15 deletions src/tests/rpp/test_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,85 @@

TEST_CASE("timer emit single value at provided duration")
{
auto scheduler = test_scheduler{};
auto mock = mock_observer_strategy<size_t>{};
auto scheduler = test_scheduler{};
auto scheduler2 = test_scheduler{};
auto mock = mock_observer_strategy<size_t>{};
auto mock2 = mock_observer_strategy<size_t>{};

SECTION("timer observable")
{
auto when = std::chrono::seconds{1};
auto obs = rpp::source::timer(when, scheduler);
auto initial_time = test_scheduler::worker_strategy::now();
auto when = std::chrono::seconds{1};
auto time_point = scheduler2.now() + when;
auto obs = rpp::source::timer(when, scheduler);
auto obs2 = rpp::source::timer(time_point, scheduler2);

SECTION("subscribe")
{
obs | rpp::ops::subscribe(mock);
obs2 | rpp::ops::subscribe(mock2);

SECTION("nothing happens immediately till scheduler advanced")
{
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{initial_time + when});
CHECK(scheduler.get_executions().empty());
auto validate = [&](const auto& mock, const auto& scheduler) {
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);

CHECK(scheduler.get_schedulings().size() == 1);
CHECK(scheduler.get_executions().empty());
};
validate(mock, scheduler);
validate(mock2, scheduler2);
}

SECTION("advance time")
{
scheduler.time_advance(when);
scheduler2.time_advance(when);

SECTION("observer obtains value")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
auto validate = [&](const auto& mock) {
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
};
validate(mock);
validate(mock2);
}

SECTION("timer schedules schedulable with provided interval")
{
CHECK(scheduler.get_schedulings() == std::vector{initial_time + when});
CHECK(scheduler.get_executions() == std::vector{initial_time + when});
CHECK(scheduler.get_executions().size() == 1);
CHECK(scheduler2.get_executions().size() == 1);
}
}
}
}
}

TEST_CASE("timer emit single value at provided time_point")
{
auto scheduler = test_scheduler{};
auto mock = mock_observer_strategy<size_t>{};

SECTION("timer observable")
{
auto when = std::chrono::seconds{1};
auto time_point = scheduler.now() + when;
auto obs = rpp::source::timer(time_point, scheduler);

SECTION("subscribe")
{
scheduler.time_advance(when * 2);
obs | rpp::ops::subscribe(mock);

SECTION("expect value as time_point is in the past")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}