diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index 27ed2016e..ff8a0c9b1 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -25,14 +25,14 @@ struct interval_schedulable } }; -template +template struct interval_strategy { - using value_type = size_t; + using value_type = size_t; using expected_disposable_strategy = std::conditional_t::is_none_disposable, rpp::details::observables::bool_disposable_strategy_selector, rpp::details::observables::fixed_disposable_strategy_selector<1>>; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; - rpp::schedulers::duration initial; + TimePointOrDuration initial; rpp::schedulers::duration period; template TObs> @@ -44,15 +44,23 @@ struct interval_strategy if (auto d = worker.get_disposable(); !d.is_disposed()) observer.set_upstream(std::move(d)); } - worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); + + if constexpr (std::is_same_v) + { + worker.schedule(initial - worker.now(), interval_schedulable{}, std::forward(observer), period, size_t{}); + } + else + { + worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); + } } }; } namespace rpp { -template -using interval_observable = observable>; +template +using interval_observable = observable>; } namespace rpp::source @@ -66,7 +74,7 @@ namespace rpp::source operator "interval(20s, 10s)": +--1-2-3-5-> } * - * @param initial time before first emission + * @param initial duration before first emission * @param period period between emitted values * @param scheduler the scheduler to use for scheduling the items * @@ -79,7 +87,23 @@ namespace rpp::source template auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler&& scheduler) { - return interval_observable>{std::forward(scheduler), initial, period}; + return interval_observable, rpp::schedulers::duration>{std::forward(scheduler), initial, period}; +} + +/** + * @brief Same rpp::source::interval but using a time_point as initial time instead of a duration. + * + * @param initial time_point before first emission + * @param period period between emitted values + * @param scheduler the scheduler to use for scheduling the items + * + * @ingroup creational_operators + * @see https://reactivex.io/documentation/operators/interval.html + */ +template +auto interval(rpp::schedulers::time_point initial, rpp::schedulers::duration period, TScheduler&& scheduler) +{ + return interval_observable, rpp::schedulers::time_point>{std::forward(scheduler), initial, period}; } /** diff --git a/src/rpp/rpp/sources/timer.hpp b/src/rpp/rpp/sources/timer.hpp index 6d2ab3a50..a091f34bb 100644 --- a/src/rpp/rpp/sources/timer.hpp +++ b/src/rpp/rpp/sources/timer.hpp @@ -15,7 +15,7 @@ namespace rpp::source operator "timer(1s)": +--0| } * - * @param when time point when the value is emitted + * @param when duration from now when the value is emitted * @param scheduler the scheduler to use for scheduling the items * * @ingroup creational_operators @@ -24,6 +24,21 @@ namespace rpp::source template auto timer(rpp::schedulers::duration when, TScheduler&& scheduler) { - return interval(when, std::forward(scheduler)) | operators::take(1); + return interval(when, rpp::schedulers::duration::zero(), std::forward(scheduler)) | operators::take(1); +} + +/** + * @brief Same as rpp::source::timer but using a time_point as delay instead of a duration. + * + * @param when time point when the value is emitted + * @param scheduler the scheduler to use for scheduling the items + * + * @ingroup creational_operators + * @see https://reactivex.io/documentation/operators/timer.html + */ +template +auto timer(rpp::schedulers::time_point when, TScheduler&& scheduler) +{ + return interval(when, rpp::schedulers::duration::zero(), std::forward(scheduler)) | operators::take(1); } } \ No newline at end of file diff --git a/src/tests/rpp/test_interval.cpp b/src/tests/rpp/test_interval.cpp index 62b23e58b..bcf4a505d 100644 --- a/src/tests/rpp/test_interval.cpp +++ b/src/tests/rpp/test_interval.cpp @@ -13,14 +13,14 @@ #include #include #include -#include #include #include #include +#include #include "mock_observer.hpp" -#include "test_scheduler.hpp" #include "snitch_logging.hpp" +#include "test_scheduler.hpp" #include @@ -28,6 +28,7 @@ TEST_CASE("interval emit values with provided interval") { auto scheduler = test_scheduler{}; auto mock = mock_observer_strategy{}; + SECTION("interval observable") { auto interval = std::chrono::seconds{1}; @@ -37,30 +38,34 @@ TEST_CASE("interval emit values with provided interval") SECTION("subscribe on it via take 3") { obs | rpp::ops::take(3) | rpp::ops::subscribe(mock); + SECTION("nothing happens immediately till scheduler advanced") { CHECK(mock.get_received_values() == std::vector{}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval}); + CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval}); CHECK(scheduler.get_executions().empty()); } + SECTION("move time in advance on interval once") { scheduler.time_advance(interval); + SECTION("observer obtains first value") { CHECK(mock.get_received_values() == std::vector{0}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); } + SECTION("interval schedules schedulable with provided interval") { - CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval, - initial_time + 2*interval}); - CHECK(scheduler.get_executions() == std::vector{initial_time+interval}); + CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval, initial_time + 2 * interval}); + CHECK(scheduler.get_executions() == std::vector{initial_time + interval}); } } + SECTION("move time in advance on interval enough amount of time") { for (size_t i = 0; i < 5; ++i) @@ -72,20 +77,17 @@ TEST_CASE("interval emit values with provided interval") CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } + SECTION("interval schedules schedulable with provided interval") { - CHECK(scheduler.get_executions() == std::vector{ initial_time + interval, - initial_time + 2*interval, - initial_time + 3*interval}); - CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval, - initial_time + 2*interval, - initial_time + 3*interval}); + CHECK(scheduler.get_executions() == std::vector{initial_time + interval, initial_time + 2 * interval, initial_time + 3 * interval}); + CHECK(scheduler.get_schedulings() == std::vector{initial_time + interval, initial_time + 2 * interval, initial_time + 3 * interval}); } } } } - SECTION("interval observable with initial delay") + SECTION("interval observable with initial delay duration") { auto initial_delay = std::chrono::seconds{2}; auto interval = std::chrono::seconds{1}; @@ -95,12 +97,13 @@ TEST_CASE("interval emit values with provided interval") SECTION("subscribe on it via take 3") { obs | rpp::ops::take(3) | rpp::ops::subscribe(mock); + SECTION("nothing happens immediately till scheduler advanced") { CHECK(mock.get_received_values() == std::vector{}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); - CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay}); + CHECK(scheduler.get_schedulings() == std::vector{initial_time + initial_delay}); CHECK(scheduler.get_executions().empty()); } @@ -109,22 +112,41 @@ TEST_CASE("interval emit values with provided interval") for (size_t i = 0; i < 5; ++i) scheduler.time_advance(interval); scheduler.time_advance(interval); + SECTION("observer obtains sequence of values") { CHECK(mock.get_received_values() == std::vector{0, 1, 2}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } + SECTION("interval schedules schedulable with provided interval") { - CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay, - initial_time + initial_delay + interval, - initial_time + initial_delay + 2 * interval }); - CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay, - initial_time + initial_delay + interval, - initial_time + initial_delay + 2 * interval }); + CHECK(scheduler.get_schedulings() == std::vector{initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval}); + CHECK(scheduler.get_executions() == std::vector{initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval}); } } } } + + SECTION("interval observable with initial delay time_point") + { + auto when = std::chrono::seconds{2}; + auto initial_delay = scheduler.now() + when; + auto interval = std::chrono::seconds{1}; + auto obs = rpp::source::interval(initial_delay, interval, scheduler); + + SECTION("subscribe") + { + scheduler.time_advance(when * 2); + obs | rpp::ops::take(1) | rpp::ops::subscribe(mock); + + SECTION("expect value as initial_delay time_point is in the past") + { + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } } diff --git a/src/tests/rpp/test_timer.cpp b/src/tests/rpp/test_timer.cpp index 9de9ede82..a292768d8 100644 --- a/src/tests/rpp/test_timer.cpp +++ b/src/tests/rpp/test_timer.cpp @@ -19,45 +19,85 @@ TEST_CASE("timer emit single value at provided duration") { - auto scheduler = test_scheduler{}; - auto mock = mock_observer_strategy{}; + auto scheduler = test_scheduler{}; + auto scheduler2 = test_scheduler{}; + auto mock = mock_observer_strategy{}; + auto mock2 = mock_observer_strategy{}; SECTION("timer observable") { - auto when = std::chrono::seconds{1}; - auto obs = rpp::source::timer(when, scheduler); - auto initial_time = test_scheduler::worker_strategy::now(); + auto when = std::chrono::seconds{1}; + auto time_point = scheduler2.now() + when; + auto obs = rpp::source::timer(when, scheduler); + auto obs2 = rpp::source::timer(time_point, scheduler2); SECTION("subscribe") { obs | rpp::ops::subscribe(mock); + obs2 | rpp::ops::subscribe(mock2); SECTION("nothing happens immediately till scheduler advanced") { - CHECK(mock.get_received_values() == std::vector{}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 0); - CHECK(scheduler.get_schedulings() == std::vector{initial_time + when}); - CHECK(scheduler.get_executions().empty()); + auto validate = [&](const auto& mock, const auto& scheduler) { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + + CHECK(scheduler.get_schedulings().size() == 1); + CHECK(scheduler.get_executions().empty()); + }; + validate(mock, scheduler); + validate(mock2, scheduler2); } SECTION("advance time") { scheduler.time_advance(when); + scheduler2.time_advance(when); SECTION("observer obtains value") { - CHECK(mock.get_received_values() == std::vector{0}); - CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 1); + auto validate = [&](const auto& mock) { + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + }; + validate(mock); + validate(mock2); } SECTION("timer schedules schedulable with provided interval") { - CHECK(scheduler.get_schedulings() == std::vector{initial_time + when}); - CHECK(scheduler.get_executions() == std::vector{initial_time + when}); + CHECK(scheduler.get_executions().size() == 1); + CHECK(scheduler2.get_executions().size() == 1); } } } } +} + +TEST_CASE("timer emit single value at provided time_point") +{ + auto scheduler = test_scheduler{}; + auto mock = mock_observer_strategy{}; + + SECTION("timer observable") + { + auto when = std::chrono::seconds{1}; + auto time_point = scheduler.now() + when; + auto obs = rpp::source::timer(time_point, scheduler); + + SECTION("subscribe") + { + scheduler.time_advance(when * 2); + obs | rpp::ops::subscribe(mock); + + SECTION("expect value as time_point is in the past") + { + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } } \ No newline at end of file