diff --git a/src/examples/doxygen/delay.cpp b/src/examples/doxygen/delay.cpp index c347dbb47..5a6a6bc8d 100644 --- a/src/examples/doxygen/delay.cpp +++ b/src/examples/doxygen/delay.cpp @@ -11,26 +11,28 @@ int main() { //! [delay] + auto start = rpp::schedulers::clock_type::now(); + rpp::source::just(1, 2, 3) - .do_on_next([](auto&& v) + .do_on_next([&](auto&& v) { auto emitting_time = rpp::schedulers::clock_type::now(); - std::cout << "emit " << v << " in thread{" << std::this_thread::get_id() << "} at epoch time " << emitting_time.time_since_epoch().count() << std::endl; + std::cout << "emit " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast(emitting_time - start).count() << "s"<< std::endl; }) .delay(std::chrono::seconds{3}, rpp::schedulers::new_thread{}) .as_blocking() - .subscribe([](int v) + .subscribe([&](int v) { auto observing_time = rpp::schedulers::clock_type::now(); - std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} at epoch time " << observing_time.time_since_epoch().count() << std::endl; + std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast(observing_time - start).count() <<"s" << std::endl; }); // Template for output: - // emit 1 in thread{281472967355984} at epoch time 9302615113068 - // emit 2 in thread{281472967355984} at epoch time 9302615155151 - // emit 3 in thread{281472967355984} at epoch time 9302615157860 - // observe 1 in thread{281472962380144} at epoch time 9305618428153 - // observe 2 in thread{281472962380144} at epoch time 9305618551778 - // observe 3 in thread{281472962380144} at epoch time 9305618558236 + // emit 1 in thread{11772} duration since start 0s + // emit 2 in thread{11772} duration since start 0s + // emit 3 in thread{11772} duration since start 0s + // observe 1 in thread{15516} duration since start 3s + // observe 2 in thread{15516} duration since start 3s + // observe 3 in thread{15516} duration since start 3s //! [delay] return 0; } diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index fd849136d..ebf2b8d5f 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -12,32 +12,28 @@ #pragma once #include +#include // create_subscriber_with_state #include #include -#include - -#include // create_subscriber_with_state IMPLEMENTATION_FILE(delay_tag); namespace rpp::details { - /** * Functor (type-erasure) of "delay" for on_next operator. */ struct delay_on_next { - rpp::schedulers::duration delay; + schedulers::duration delay; - void operator()(auto &&value, - const auto &subscriber, - const auto &worker) const { + void operator()(auto&& value, const auto& subscriber, const auto& worker) const + { worker.schedule(delay, - [value = std::forward(value), subscriber]() -> rpp::schedulers::optional_duration + [value = std::forward(value), subscriber]() { subscriber.on_next(std::move(value)); - return std::nullopt; + return schedulers::optional_duration{}; }); } }; @@ -47,17 +43,13 @@ struct delay_on_next */ struct delay_on_error { - rpp::schedulers::duration delay; - - void operator()(const std::exception_ptr &err, - const auto &subscriber, - const auto& worker) const + void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const { - worker.schedule(delay, - [err, subscriber]() -> rpp::schedulers::optional_duration + // on-error must be delivered as soon as possible + worker.schedule([err, subscriber]() { subscriber.on_error(err); - return std::nullopt; + return schedulers::optional_duration{}; }); } }; @@ -67,13 +59,12 @@ struct delay_on_error */ struct delay_on_completed { - rpp::schedulers::duration delay; + schedulers::duration delay; - void operator()(const auto& subscriber, - const auto& worker) const + void operator()(const auto& subscriber, const auto& worker) const { worker.schedule(delay, - [subscriber]() -> rpp::schedulers::optional_duration + [subscriber]() { subscriber.on_completed(); return schedulers::optional_duration{}; @@ -88,24 +79,22 @@ template TSub> auto operator()(TSub&& subscriber) const { // convert it to dynamic due to expected amount of copies == amount of items auto dynamic_subscriber = std::forward(subscriber).as_dynamic(); - auto subscription = dynamic_subscriber.get_subscription().make_child(); - auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription()); + auto subscription = dynamic_subscriber.get_subscription().make_child(); return create_subscriber_with_state(std::move(subscription), delay_on_next{delay}, - delay_on_error{delay}, + delay_on_error{}, delay_on_completed{delay}, std::move(dynamic_subscriber), std::move(worker)); } }; - } // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/observe_on.hpp b/src/rpp/rpp/operators/fwd/observe_on.hpp index db438baa1..808f97d8b 100644 --- a/src/rpp/rpp/operators/fwd/observe_on.hpp +++ b/src/rpp/rpp/operators/fwd/observe_on.hpp @@ -13,7 +13,6 @@ #include #include - namespace rpp::details { struct observe_on_tag; @@ -43,13 +42,13 @@ struct member_overload template auto observe_on(TScheduler&& scheduler) const& requires is_header_included { - return cast_this()->template lift(observe_on_impl>{std::forward(scheduler)}); + return cast_this()->delay(schedulers::duration{0}, std::forward(scheduler)); } template auto observe_on(TScheduler&& scheduler) && requires is_header_included { - return move_this().template lift(observe_on_impl>{std::forward(scheduler)}); + return move_this().delay(schedulers::duration{0}, std::forward(scheduler)); } private: diff --git a/src/rpp/rpp/operators/observe_on.hpp b/src/rpp/rpp/operators/observe_on.hpp index 88bf92f32..85f381a0d 100644 --- a/src/rpp/rpp/operators/observe_on.hpp +++ b/src/rpp/rpp/operators/observe_on.hpp @@ -10,40 +10,9 @@ #pragma once -#include -#include #include -#include - -#include // create_subscriber_with_state IMPLEMENTATION_FILE(observe_on_tag); -namespace rpp::details -{ -using observe_on_on_next = delay_on_next; -using observe_on_on_error = delay_on_error; -using observe_on_on_completed = delay_on_completed; - -template -struct observe_on_impl -{ - RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; - - template TSub> - auto operator()(TSub&& subscriber) const - { - // convert it to dynamic due to expected amount of copies == amount of items - auto dynamic_subscriber = std::forward(subscriber).as_dynamic(); - - auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription()); - - return create_subscriber_with_state(dynamic_subscriber.get_subscription().make_child(), - observe_on_on_next{rpp::schedulers::duration{0}}, - observe_on_on_error{rpp::schedulers::duration{0}}, - observe_on_on_completed{rpp::schedulers::duration{0}}, - std::move(dynamic_subscriber), - std::move(worker)); - } -}; -} // namespace rpp::details +// we just need delay to completed this one +#include diff --git a/src/tests/test_delay.cpp b/src/tests/test_delay.cpp index d62b88357..dfafa9ae7 100644 --- a/src/tests/test_delay.cpp +++ b/src/tests/test_delay.cpp @@ -8,47 +8,47 @@ // // Project home: https://github.com/victimsnino/ReactivePlusPlus -#include +#include "mock_observer.hpp" +#include #include #include #include #include #include -#include "mock_observer.hpp" - SCENARIO("delay mirrors both source observable and trigger observable", "[delay]") { std::chrono::milliseconds delay_duration{300}; GIVEN("observable of -1-|") { - auto mock = mock_observer{}; - const auto now = rpp::schedulers::clock_type::now(); + auto mock = mock_observer{}; + const auto now = rpp::schedulers::clock_type::now(); rpp::source::just(1) - .delay(delay_duration, rpp::schedulers::trampoline{}) - .as_blocking() - .subscribe([&](auto&& v) - { - THEN("should see event after the delay") + .delay(delay_duration, rpp::schedulers::trampoline{}) + .as_blocking() + .subscribe( + [&](auto&& v) { - CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast(delay_duration)); - } + THEN("should see event after the delay") + { + CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration); + } - mock.on_next(v); - }, - [&](const std::exception_ptr& err) { mock.on_error(err); }, - [&]() - { - THEN("should see event after the delay") + mock.on_next(v); + }, + [&](const std::exception_ptr& err) { mock.on_error(err); }, + [&]() { - CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast(delay_duration)); - } + THEN("should see event after the delay") + { + CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration); + } - mock.on_completed(); - }); + mock.on_completed(); + }); THEN("should see -1-|") { @@ -60,22 +60,22 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -x") { - auto mock = mock_observer{}; - const auto now = rpp::schedulers::clock_type::now(); + auto mock = mock_observer{}; + const auto now = rpp::schedulers::clock_type::now(); rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) - .delay(delay_duration, rpp::schedulers::trampoline{}) - .as_blocking() - .subscribe([&](auto&& v) { mock.on_next(v); }, - [&](const std::exception_ptr& err) - { - THEN("should see event after the delay") + .delay(delay_duration, rpp::schedulers::trampoline{}) + .as_blocking() + .subscribe([&](auto&& v) { mock.on_next(v); }, + [&](const std::exception_ptr& err) { - CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast(delay_duration)); - } - mock.on_error(err); - }, - [&]() { mock.on_completed(); }); + THEN("should see event immediately") + { + CHECK(rpp::schedulers::clock_type::now() < now + delay_duration); + } + mock.on_error(err); + }, + [&]() { mock.on_completed(); }); THEN("should see -x after the delay") { @@ -87,22 +87,22 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -|") { - auto mock = mock_observer{}; - const auto now = rpp::schedulers::clock_type::now(); + auto mock = mock_observer{}; + const auto now = rpp::schedulers::clock_type::now(); rpp::source::empty() - .delay(delay_duration, rpp::schedulers::trampoline{}) - .as_blocking() - .subscribe([&](auto&& v) { mock.on_next(v); }, - [&](const std::exception_ptr& err) { mock.on_error(err); }, - [&]() - { - THEN("should see event after delay") + .delay(delay_duration, rpp::schedulers::trampoline{}) + .as_blocking() + .subscribe([&](auto&& v) { mock.on_next(v); }, + [&](const std::exception_ptr& err) { mock.on_error(err); }, + [&]() { - CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast(delay_duration)); - } - mock.on_completed(); - }); + THEN("should see event after delay") + { + CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration); + } + mock.on_completed(); + }); THEN("should see -|") {