From 5549ba8676a07b4a142a42c425f7f7ee53d2b60b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 2 Dec 2022 23:14:43 +0300 Subject: [PATCH 1/2] add timeout with fallback observable --- docs/Implementation Status.md | 2 + src/examples/rpp/doxygen/timeout.cpp | 81 ++++++++++++++++++--------- src/rpp/rpp/operators/fwd/timeout.hpp | 42 ++++++++++++-- src/rpp/rpp/operators/timeout.hpp | 29 ++++++---- src/tests/rpp/test_timeout.cpp | 20 +++++++ 5 files changed, 131 insertions(+), 43 deletions(-) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 8f1849e3f..bbbac7ebb 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -136,6 +136,8 @@ - [x] do_on_error - [x] do_on_completed - [x] timeout + - [x] timeout + - [x] timeout with fallback observable ### Connectable diff --git a/src/examples/rpp/doxygen/timeout.cpp b/src/examples/rpp/doxygen/timeout.cpp index e7ef71281..a91f92235 100644 --- a/src/examples/rpp/doxygen/timeout.cpp +++ b/src/examples/rpp/doxygen/timeout.cpp @@ -7,36 +7,61 @@ **/ int main() { - //! [timeout] - rpp::subjects::publish_subject subj{}; - subj.get_observable() - .timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{}) - .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, - [](std::exception_ptr err) - { - try - { - std::rethrow_exception(err); - } - catch (const std::exception& exc) + { + //! [timeout] + rpp::subjects::publish_subject subj{}; + subj.get_observable() + .timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{}) + .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, + [](std::exception_ptr err) { - std::cout << "ERR: " << exc.what() << std::endl; - } - }, - []() { std::cout << "completed" << std::endl; }); - for (int i = 0; i < 10; ++i) + try + { + std::rethrow_exception(err); + } + catch (const std::exception& exc) + { + std::cout << "ERR: " << exc.what() << std::endl; + } + }, + []() { std::cout << "completed" << std::endl; }); + for (int i = 0; i < 10; ++i) + { + std::this_thread::sleep_for(std::chrono::milliseconds{i * 100}); + subj.get_subscriber().on_next(i); + } + + // Output: + // new value 0 + // new value 1 + // new value 2 + // new value 3 + // new value 4 + // ERR : Timeout reached + //! [timeout] + } { - std::this_thread::sleep_for(std::chrono::milliseconds{i * 100}); - subj.get_subscriber().on_next(i); + //! [timeout_fallback_obs] + rpp::subjects::publish_subject subj{}; + subj.get_observable() + .timeout(std::chrono::milliseconds{450}, rpp::source::just(100), rpp::schedulers::new_thread{}) + .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + for (int i = 0; i < 10; ++i) + { + std::this_thread::sleep_for(std::chrono::milliseconds{i * 100}); + subj.get_subscriber().on_next(i); + } + + // Output: + //new value 0 + //new value 1 + //new value 2 + //new value 3 + //new value 4 + //new value 100 + //completed + //! [timeout_fallback_obs] } - - // Output: - // new value 0 - // new value 1 - // new value 2 - // new value 3 - // new value 4 - // ERR : Timeout reached - //! [timeout] return 0; } diff --git a/src/rpp/rpp/operators/fwd/timeout.hpp b/src/rpp/rpp/operators/fwd/timeout.hpp index 07dd51214..29ad846f3 100644 --- a/src/rpp/rpp/operators/fwd/timeout.hpp +++ b/src/rpp/rpp/operators/fwd/timeout.hpp @@ -11,7 +11,7 @@ #pragma once #include - +#include #include namespace rpp::details @@ -21,12 +21,44 @@ struct timeout_tag; namespace rpp::details { -template +template FallbackObs, schedulers::constraint::scheduler TScheduler> struct timeout_impl; template struct member_overload { + /** + * \brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) + * + * \marble timeout_fallback_obs + { + source observable : +--1-2-3-4----- ---5-| + operator "timeout(4, -10-|)" : +--1-2-3-4----10-| + } + * \param period is maximum duration between emitted items before a timeout occurs + * \param fallback_obs is observable to subscribe on when timeout reached + * \param scheduler is scheduler used to run timer for timeout + * \return new specific_observable with the timeout operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet timeout.cpp timeout_fallback_obs + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/timeout.html + */ + template FallbackObs, schedulers::constraint::scheduler TScheduler> + auto timeout(schedulers::duration period, FallbackObs&& fallback_obs, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + { + return static_cast(this)->template lift(timeout_impl, TScheduler>{period, std::forward(fallback_obs), scheduler}); + } + + template FallbackObs,schedulers::constraint::scheduler TScheduler> + auto timeout(schedulers::duration period, FallbackObs&& fallback_obs, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(timeout_impl, TScheduler>{period, std::forward(fallback_obs), scheduler}); + } + /** * \brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) * @@ -49,13 +81,15 @@ struct member_overload template auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included { - return static_cast(this)->template lift(timeout_impl{period, scheduler}); + return timeout(period, rpp::source::error(std::make_exception_ptr(utils::timeout{"Timeout reached"})), scheduler); } template auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included { - return std::move(*static_cast(this)).template lift(timeout_impl{period, scheduler}); + return std::move(*static_cast(this)).timeout(period, rpp::source::error(std::make_exception_ptr(utils::timeout{"Timeout reached"})), scheduler); } + + }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 70815a834..ecccdea52 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -26,22 +27,26 @@ IMPLEMENTATION_FILE(timeout_tag); namespace rpp::details { +template struct timeout_state : early_unsubscribe_state { - using early_unsubscribe_state::early_unsubscribe_state; + timeout_state(const FallbackObs& fallback_obs, const composite_subscription& subscription_of_subscriber) + : early_unsubscribe_state(subscription_of_subscriber) + , fallback_obs{fallback_obs} {} + FallbackObs fallback_obs; std::atomic last_emission_time{}; static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min(); }; -template +template struct timeout_on_next { template - void operator()(Value&& v, const auto& subscriber, const std::shared_ptr& state) const + void operator()(Value&& v, const auto& subscriber, const std::shared_ptr>& state) const { - if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached) + if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached) subscriber.on_next(std::forward(v)); } }; @@ -49,24 +54,26 @@ struct timeout_on_next using timeout_on_error = early_unsubscribe_on_error; using timeout_on_completed = early_unsubscribe_on_completed; -struct timeout_state_with_serialized_spinlock : timeout_state +template +struct timeout_state_with_serialized_spinlock : timeout_state { - using timeout_state::timeout_state; + using timeout_state::timeout_state; // spinlock because most part of time there is only one thread would be active utils::spinlock spinlock{}; }; -template +template FallbackObs, schedulers::constraint::scheduler TScheduler> struct timeout_impl { schedulers::duration period; + FallbackObs fallback_obs; TScheduler scheduler; template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared(in_subscriber.get_subscription()); + auto state = std::make_shared>(fallback_obs, in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); @@ -83,7 +90,7 @@ struct timeout_impl // last emission time still same value -> timeout reached, else -> prev_emission_time // would be update to actual emission time if (state->last_emission_time.compare_exchange_strong(prev_emission_time, - timeout_state::s_timeout_reached, + timeout_state::s_timeout_reached, std::memory_order_acq_rel)) return time_is_out(state, subscriber); @@ -101,7 +108,7 @@ struct timeout_impl }); return create_subscriber_with_state(state->children_subscriptions, - timeout_on_next{}, + timeout_on_next{}, timeout_on_error{}, timeout_on_completed{}, std::move(subscriber), @@ -112,7 +119,7 @@ struct timeout_impl static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber) { state->children_subscriptions.unsubscribe(); - subscriber.on_error(std::make_exception_ptr(utils::timeout{"Timeout reached"})); + state->fallback_obs.subscribe(subscriber); return std::nullopt; } }; diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 95ffee2cd..ffc1621e2 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -112,3 +113,22 @@ SCENARIO("timeout sends error only on timeout", "[operators][timeout]") } } } + +SCENARIO("timeout subscribes on provided observable on timeout", "[operators][timeout]") +{ + auto mock = mock_observer{}; + GIVEN("never observable") + { + auto obs = rpp::source::never(); + WHEN("subscribe on it via timeout with fallback obs") + { + obs.timeout(std::chrono::microseconds{10}, rpp::source::just(100), rpp::schedulers::immediate{}).subscribe(mock); + THEN("subuscribe sees vales from fallback observabble") + { + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} From 9bc7aed98ef81c1391ce43483e59e88714d12d89 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 2 Dec 2022 23:27:44 +0300 Subject: [PATCH 2/2] minor fix --- src/rpp/rpp/operators/fwd/timeout.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/operators/fwd/timeout.hpp b/src/rpp/rpp/operators/fwd/timeout.hpp index 29ad846f3..b652b4bfd 100644 --- a/src/rpp/rpp/operators/fwd/timeout.hpp +++ b/src/rpp/rpp/operators/fwd/timeout.hpp @@ -12,6 +12,7 @@ #include #include +#include #include namespace rpp::details