From 9d83e53d2958f46b4a200344d5384dda47d69ac8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Feb 2024 00:14:25 +0300 Subject: [PATCH 1/8] init timeout --- src/rpp/rpp/operators/timeout.hpp | 48 +++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/rpp/rpp/operators/timeout.hpp diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp new file mode 100644 index 000000000..f9972f8f4 --- /dev/null +++ b/src/rpp/rpp/operators/timeout.hpp @@ -0,0 +1,48 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +namespace rpp::operators::details +{ + template + struct timeout_observer_strategy + { + }; + + template + struct timeout_t : public lift_operator, rpp::schedulers::duration, TFallbackObservable, TScheduler> + { + template + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = timeout_observer_strategy; + }; + + template + using updated_disposable_strategy = typename Prev::template add::is_none_disposable ? 0 : 1>; + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + template + auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler /* = {}*/) + { + return details::timeout_t, TScheduler>{period, std::forward(fallback_observable), scheduler}; + } +} // namespace rpp::operators \ No newline at end of file From 1c099cfe410ea2e1233c6e321649192ae8821e27 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 25 Feb 2024 13:26:29 +0300 Subject: [PATCH 2/8] update implementation --- .../rpp/operators/on_error_resume_next.hpp | 5 +- src/rpp/rpp/operators/scan.hpp | 5 +- src/rpp/rpp/operators/timeout.hpp | 129 +++++++++++++++++- 3 files changed, 126 insertions(+), 13 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index f6449d6c9..4dd907e0d 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -28,9 +28,8 @@ namespace rpp::operators::details mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); RPP_CALL_DURING_CONSTRUCTION( - { - observer.set_upstream(disposable); - }); + observer.set_upstream(disposable); + ); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index cc81dc571..ce54be5bb 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -30,9 +30,8 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS Fn fn; RPP_CALL_DURING_CONSTRUCTION( - { - observer.on_next(utils::as_const(seed)); - }); + observer.on_next(utils::as_const(seed)); + ); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index f9972f8f4..2e5358db6 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -13,28 +13,143 @@ #include #include +#include namespace rpp::operators::details { - template + template + class timeout_disposable final : public rpp::composite_disposable_impl + { + public: + struct observer_with_timeout + { + TObserver observer; + rpp::schedulers::time_point timeout; + }; + + timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout) + : m_observer_with_timeout{std::move(observer), timeout} + , m_period{period} + , m_fallback{fallback} + { + } + pointer_under_lock get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; } + + const TFallbackObservable& get_fallback() const { return m_fallback; } + + private: + value_with_mutex m_observer_with_timeout; + + const rpp::schedulers::duration m_period; + const TFallbackObservable m_fallback; + }; + + template + struct timeout_disposable_wrapper + { + std::shared_ptr> disposable; + + bool is_disposed() const { return disposable->is_disposed(); } + + void on_error(const std::exception_ptr& err) const + { + disposable->dispose(); + disposable->get_observer_with_timeout_under_lock()->observer.on_error(err); + } + }; + + template struct timeout_observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> disposable; + + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d); + } + + bool is_disposed() const + { + return disposable->is_disposed(); + } + + template + void on_next(T&& v) const + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + obs_with_timeout->observer.on_next(std::forward(v)); + obs_with_timeout->timeout = disposable->scheduler.now() + disposable->m_period; + } + + void on_error(const std::exception_ptr& err) const noexcept + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + + disposable->dispose(); + obs_with_timeout->observer.on_error(err); + } + + void on_completed() const noexcept + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + + disposable->dispose(); + obs_with_timeout->observer.on_completed(); + } }; template - struct timeout_t : public lift_operator, rpp::schedulers::duration, TFallbackObservable, TScheduler> + struct timeout_t { template struct operator_traits { using result_type = T; - - template TObserver> - using observer_strategy = timeout_observer_strategy; }; - template - using updated_disposable_strategy = typename Prev::template add::is_none_disposable ? 0 : 1>; + rpp::schedulers::duration period; + RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback; + RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; + + template + auto lift_with_disposable_strategy(Observer&& observer) const + { + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::template add::disposable_container; + + const auto timeout = TScheduler::now() + period; + + const auto disposable = disposable_wrapper_impl, TFallbackObservable, container>>::make(std::forward(observer), period, fallback, timeout); + auto ptr = disposable.lock(); + ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak()); + + const auto worker = scheduler.create_worker(); + if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) + { + if (auto d = worker.get_disposable(); !d.is_disposed()) + disposable.add(std::move(d)); + } + + worker.schedule( + timeout, + [](timeout_disposable_wrapper, TFallbackObservable, container>& handler) -> rpp::schedulers::optional_delay_to { + auto locked_obs_with_timeout = handler.disposable.get_observer_with_timeout_under_lock(); + if (TScheduler::now() < locked_obs_with_timeout->timeout) + return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout); + + handler.disposable->dispose(); + handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer)); + return std::nullopt; + }, ptr); + + return rpp::observer, TFallbackObservable, container, TScheduler>>{std::move(ptr)}; + } }; } // namespace rpp::operators::details From a97cbce896ce90a39df0b4f0ceb1a70735a5af46 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 27 Feb 2024 23:34:27 +0300 Subject: [PATCH 3/8] extend tests --- src/rpp/rpp/operators/timeout.hpp | 26 +++++++----- src/tests/rpp/test_timeout.cpp | 63 ++++++++++++++++++++++++++++++ src/tests/utils/test_scheduler.hpp | 5 ++- 3 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 src/tests/rpp/test_timeout.cpp diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 2e5358db6..02d241407 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -28,7 +28,7 @@ namespace rpp::operators::details }; timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout) - : m_observer_with_timeout{std::move(observer), timeout} + : m_observer_with_timeout{observer_with_timeout{std::move(observer), timeout}} , m_period{period} , m_fallback{fallback} { @@ -37,6 +37,8 @@ namespace rpp::operators::details const TFallbackObservable& get_fallback() const { return m_fallback; } + rpp::schedulers::duration get_period() const { return m_period; } + private: value_with_mutex m_observer_with_timeout; @@ -78,9 +80,9 @@ namespace rpp::operators::details template void on_next(T&& v) const { - const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); obs_with_timeout->observer.on_next(std::forward(v)); - obs_with_timeout->timeout = disposable->scheduler.now() + disposable->m_period; + obs_with_timeout->timeout = TScheduler::now() + disposable->get_period(); } void on_error(const std::exception_ptr& err) const noexcept @@ -110,6 +112,8 @@ namespace rpp::operators::details template struct operator_traits { + static_assert(rpp::constraint::observable_of_type, "TFallbackObservable should be the same type as T"); + using result_type = T; }; @@ -136,17 +140,21 @@ namespace rpp::operators::details disposable.add(std::move(d)); } + using wrapper = timeout_disposable_wrapper, TFallbackObservable, container>; worker.schedule( timeout, - [](timeout_disposable_wrapper, TFallbackObservable, container>& handler) -> rpp::schedulers::optional_delay_to { - auto locked_obs_with_timeout = handler.disposable.get_observer_with_timeout_under_lock(); + [](wrapper& handler) -> rpp::schedulers::optional_delay_to { + auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock(); if (TScheduler::now() < locked_obs_with_timeout->timeout) return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout); - - handler.disposable->dispose(); - handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer)); + + if (!handler.disposable->is_disposed()) + { + handler.disposable->dispose(); + handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer)); + } return std::nullopt; - }, ptr); + }, wrapper{ptr}); return rpp::observer, TFallbackObservable, container, TScheduler>>{std::move(ptr)}; } diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp new file mode 100644 index 000000000..5dfdf2407 --- /dev/null +++ b/src/tests/rpp/test_timeout.cpp @@ -0,0 +1,63 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include + +#include "snitch_logging.hpp" + +#include "mock_observer.hpp" +#include "test_scheduler.hpp" + +TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") +{ + auto scheduler = test_scheduler{}; + auto mock = mock_observer_strategy{}; + const auto now = scheduler.now(); + + SECTION("timeout not reached") + { + rpp::source::just(scheduler, 1) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(2), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{0}); + scheduler.time_advance(std::chrono::seconds{1}); + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now}); + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout reached") + { + rpp::source::just(scheduler, 1) + | rpp::ops::delay(std::chrono::seconds{5}, scheduler) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(2), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{0}); + + for (size_t i =0; i < 10; ++i) + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now, now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now + std::chrono::seconds{5}}); + CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } +} \ No newline at end of file diff --git a/src/tests/utils/test_scheduler.hpp b/src/tests/utils/test_scheduler.hpp index e87966060..3bbaca6c5 100644 --- a/src/tests/utils/test_scheduler.hpp +++ b/src/tests/utils/test_scheduler.hpp @@ -35,7 +35,7 @@ class test_scheduler final void drain() { - while (!queue.is_empty() && !is_disposed()) + while (!queue.is_empty()) { auto time_point = queue.top()->get_timepoint(); if (time_point > s_current_time) @@ -44,6 +44,9 @@ class test_scheduler final auto fn = queue.top(); queue.pop(); + if (fn->is_disposed()) + continue; + executions.push_back(s_current_time); if (auto new_timepoint = (*fn)()) { From f6b1e5da5d24a226e5a7d7e07fabd6eb4254c439 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 27 Feb 2024 21:16:49 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/on_error_resume_next.hpp | 3 +-- src/rpp/rpp/operators/scan.hpp | 3 +-- src/rpp/rpp/operators/timeout.hpp | 11 ++++++----- src/tests/rpp/test_timeout.cpp | 11 +++++------ 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 4dd907e0d..c7719df7e 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -28,8 +28,7 @@ namespace rpp::operators::details mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); RPP_CALL_DURING_CONSTRUCTION( - observer.set_upstream(disposable); - ); + observer.set_upstream(disposable);); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index ce54be5bb..814fd8a0f 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -30,8 +30,7 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS Fn fn; RPP_CALL_DURING_CONSTRUCTION( - observer.on_next(utils::as_const(seed)); - ); + observer.on_next(utils::as_const(seed));); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 02d241407..e6817ed69 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details public: struct observer_with_timeout { - TObserver observer; + TObserver observer; rpp::schedulers::time_point timeout; }; @@ -40,7 +40,7 @@ namespace rpp::operators::details rpp::schedulers::duration get_period() const { return m_period; } private: - value_with_mutex m_observer_with_timeout; + value_with_mutex m_observer_with_timeout; const rpp::schedulers::duration m_period; const TFallbackObservable m_fallback; @@ -147,14 +147,15 @@ namespace rpp::operators::details auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock(); if (TScheduler::now() < locked_obs_with_timeout->timeout) return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout); - - if (!handler.disposable->is_disposed()) + + if (!handler.disposable->is_disposed()) { handler.disposable->dispose(); handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer)); } return std::nullopt; - }, wrapper{ptr}); + }, + wrapper{ptr}); return rpp::observer, TFallbackObservable, container, TScheduler>>{std::move(ptr)}; } diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 5dfdf2407..6b3906074 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -10,13 +10,12 @@ #include -#include #include +#include #include -#include "snitch_logging.hpp" - #include "mock_observer.hpp" +#include "snitch_logging.hpp" #include "test_scheduler.hpp" TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") @@ -48,10 +47,10 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") | rpp::ops::delay(std::chrono::seconds{5}, scheduler) | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(2), scheduler) | rpp::ops::subscribe(mock); - + scheduler.time_advance(std::chrono::seconds{0}); - - for (size_t i =0; i < 10; ++i) + + for (size_t i = 0; i < 10; ++i) scheduler.time_advance(std::chrono::seconds{1}); CHECK(scheduler.get_executions() == std::vector{now, now + std::chrono::seconds{1}}); From 8ec3c80c6d85a29102f4eff85853a34f57582438 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Mar 2024 12:43:06 +0300 Subject: [PATCH 5/8] extend tests and examples --- src/examples/rpp/doxygen/timeout.cpp | 52 ++++++++++++++++++ src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 6 +++ src/rpp/rpp/operators/timeout.hpp | 77 ++++++++++++++++++++++++-- src/rpp/rpp/utils/exceptions.hpp | 5 ++ src/tests/rpp/test_timeout.cpp | 81 ++++++++++++++++++++++++---- 6 files changed, 208 insertions(+), 14 deletions(-) create mode 100644 src/examples/rpp/doxygen/timeout.cpp diff --git a/src/examples/rpp/doxygen/timeout.cpp b/src/examples/rpp/doxygen/timeout.cpp new file mode 100644 index 000000000..727d0c190 --- /dev/null +++ b/src/examples/rpp/doxygen/timeout.cpp @@ -0,0 +1,52 @@ +#include + +#include +#include +#include + +/** + * @example timeout.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + { + //! [fallback_observable] + auto start = rpp::schedulers::clock_type::now(); + + rpp::source::just(10,30,90,110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::source::just(rpp::schedulers::immediate{}, 0), rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { + std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }, + [start](const std::exception_ptr&) + { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [fallback_observable] + } + + { + //! [default] + auto start = rpp::schedulers::clock_type::now(); + + rpp::source::just(10,30,90,110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { + std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }, + [start](const std::exception_ptr&) + { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [default] + } +} \ No newline at end of file diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1c9c1dfb0..0b2473a25 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -97,6 +97,7 @@ #include #include #include +#include /** * @defgroup connectable_operators Connectable Operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index fb5e40b74..efc52f2e6 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -167,6 +167,12 @@ namespace rpp::operators OnError&& on_error = {}, OnCompleted&& on_completed = {}); + template + auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler); + + template + auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler); + template auto throttle(rpp::schedulers::duration period); diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index e6817ed69..c8e2e8825 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -14,6 +14,9 @@ #include #include +#include + +#include namespace rpp::operators::details { @@ -82,7 +85,7 @@ namespace rpp::operators::details { auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); obs_with_timeout->observer.on_next(std::forward(v)); - obs_with_timeout->timeout = TScheduler::now() + disposable->get_period(); + obs_with_timeout->timeout = rpp::schedulers::utils::get_worker_t::now() + disposable->get_period(); } void on_error(const std::exception_ptr& err) const noexcept @@ -127,7 +130,7 @@ namespace rpp::operators::details using worker_t = rpp::schedulers::utils::get_worker_t; using container = typename DisposableStrategy::template add::disposable_container; - const auto timeout = TScheduler::now() + period; + const auto timeout = rpp::schedulers::utils::get_worker_t::now() + period; const auto disposable = disposable_wrapper_impl, TFallbackObservable, container>>::make(std::forward(observer), period, fallback, timeout); auto ptr = disposable.lock(); @@ -145,7 +148,7 @@ namespace rpp::operators::details timeout, [](wrapper& handler) -> rpp::schedulers::optional_delay_to { auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock(); - if (TScheduler::now() < locked_obs_with_timeout->timeout) + if (rpp::schedulers::utils::get_worker_t::now() < locked_obs_with_timeout->timeout) return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout); if (!handler.disposable->is_disposed()) @@ -160,13 +163,77 @@ namespace rpp::operators::details return rpp::observer, TFallbackObservable, container, TScheduler>>{std::move(ptr)}; } }; + + template + struct timeout_with_error_t + { + template + struct operator_traits + { + using result_type = T; + }; + + rpp::schedulers::duration period; + RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; + + template + auto lift_with_disposable_strategy(Observer&& observer) const + { + return timeout_t, TScheduler>{period, rpp::source::error>(std::make_exception_ptr(rpp::utils::timeout_reached{"Timeout reached"})), scheduler} + .template lift_with_disposable_strategy(std::forward(observer)); + } + }; } // namespace rpp::operators::details namespace rpp::operators { - template - auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler /* = {}*/) + /** + * @brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) + * + * @marble timeout_fallback_obs + { + source observable : +--1-2-3-4--------5-| + operator "timeout(4, -10-|)" : +--1-2-3-4----10-| + } + * + * @param period is maximum duration between emitted items before a timeout occurs + * @param fallback_observable is observable to subscribe on when timeout reached + * @param scheduler is scheduler used to run timer for timeout + * @warning #include + * + * @par Example + * @snippet timeout.cpp fallback_observable + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/timeout.html + */ + template + auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler) { return details::timeout_t, TScheduler>{period, std::forward(fallback_observable), scheduler}; } + + /** + * @brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) + * + * @marble timeout_default + { + source observable : +--1-2-3-4------5-| + operator "timeout(4)" : +--1-2-3-4----# + } + * @param period is maximum duration between emitted items before a timeout occurs + * @param scheduler is scheduler used to run timer for timeout + * @warning #include + * + * @par Example + * @snippet timeout.cpp default + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/timeout.html + */ + template + auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler) + { + return details::timeout_with_error_t{period, scheduler}; + } } // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/utils/exceptions.hpp b/src/rpp/rpp/utils/exceptions.hpp index 4b1741ebe..3b4c06f54 100644 --- a/src/rpp/rpp/utils/exceptions.hpp +++ b/src/rpp/rpp/utils/exceptions.hpp @@ -23,4 +23,9 @@ namespace rpp::utils { using std::runtime_error::runtime_error; }; + + struct timeout_reached : public std::runtime_error + { + using std::runtime_error::runtime_error; + }; } // namespace rpp::utils \ No newline at end of file diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 6b3906074..4c9bcabef 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -13,6 +13,9 @@ #include #include #include +#include +#include +#include #include "mock_observer.hpp" #include "snitch_logging.hpp" @@ -26,17 +29,17 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") SECTION("timeout not reached") { - rpp::source::just(scheduler, 1) - | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(2), scheduler) + rpp::source::just(scheduler, 1,2,3) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) | rpp::ops::subscribe(mock); - scheduler.time_advance(std::chrono::seconds{0}); - scheduler.time_advance(std::chrono::seconds{1}); + for (size_t i =0; i < 3; ++i) + scheduler.time_advance(std::chrono::seconds{0}); scheduler.time_advance(std::chrono::seconds{1}); - CHECK(scheduler.get_executions() == std::vector{now}); - CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now}); - CHECK(mock.get_received_values() == std::vector{1}); + CHECK(scheduler.get_executions() == std::vector{now, now, now}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now, now}); + CHECK(mock.get_received_values() == std::vector{1,2,3}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } @@ -45,7 +48,7 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") { rpp::source::just(scheduler, 1) | rpp::ops::delay(std::chrono::seconds{5}, scheduler) - | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(2), scheduler) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) | rpp::ops::subscribe(mock); scheduler.time_advance(std::chrono::seconds{0}); @@ -55,8 +58,68 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") CHECK(scheduler.get_executions() == std::vector{now, now + std::chrono::seconds{1}}); CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now + std::chrono::seconds{5}}); - CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout with never") + { + rpp::source::never() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{100}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } + + SECTION("timeout with empty") + { + rpp::source::empty() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout with error") + { + rpp::source::error({}) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + + SECTION("timeout with default args") + { + rpp::source::never() + | rpp::ops::timeout(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } } \ No newline at end of file From 3f8387294a9258e9366b08f81b5c0bd5629f82a4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Mar 2024 12:44:22 +0300 Subject: [PATCH 6/8] extend tests --- src/tests/rpp/test_timeout.cpp | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 4c9bcabef..5b9d3f124 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -20,6 +20,8 @@ #include "mock_observer.hpp" #include "snitch_logging.hpp" #include "test_scheduler.hpp" +#include "disposable_observable.hpp" + TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") { @@ -122,4 +124,24 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); } + + SECTION("never timeout with empty") + { + rpp::source::empty() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::never(), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } +} + +TEST_CASE("timeout satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{0}, test_scheduler{})); } \ No newline at end of file From 9a3b21493cb88c13e3d4510fcdf3deac3e59e4c4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 3 Mar 2024 12:46:14 +0300 Subject: [PATCH 7/8] extend --- src/tests/rpp/test_timeout.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 5b9d3f124..207a0cb07 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -125,15 +125,15 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") CHECK(mock.get_on_completed_count() == 0); } - SECTION("never timeout with empty") + SECTION("never timeout with never") { - rpp::source::empty() + rpp::source::never() | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::never(), scheduler) | rpp::ops::subscribe(mock); scheduler.time_advance(std::chrono::seconds{1}); - CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); CHECK(mock.get_received_values() == std::vector{}); CHECK(mock.get_on_error_count() == 0); @@ -143,5 +143,5 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") TEST_CASE("timeout satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{0}, test_scheduler{})); + test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, test_scheduler{})); } \ No newline at end of file From 53827ee6ec3d519b0581c79f10de5f5b21e87050 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 3 Mar 2024 09:46:21 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/examples/rpp/doxygen/timeout.cpp | 58 +++++++++++++--------------- src/rpp/rpp/operators/timeout.hpp | 13 +++---- src/tests/rpp/test_timeout.cpp | 12 +++--- 3 files changed, 38 insertions(+), 45 deletions(-) diff --git a/src/examples/rpp/doxygen/timeout.cpp b/src/examples/rpp/doxygen/timeout.cpp index 727d0c190..ef83bf0cf 100644 --- a/src/examples/rpp/doxygen/timeout.cpp +++ b/src/examples/rpp/doxygen/timeout.cpp @@ -11,42 +11,36 @@ int main() // NOLINT(bugprone-exception-escape) { { - //! [fallback_observable] - auto start = rpp::schedulers::clock_type::now(); + //! [fallback_observable] + auto start = rpp::schedulers::clock_type::now(); - rpp::source::just(10,30,90,110) - | rpp::operators::flat_map([](int v) { - return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); - }) - | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::source::just(rpp::schedulers::immediate{}, 0), rpp::schedulers::new_thread{}) - | rpp::operators::as_blocking() - | rpp::operators::subscribe([start](int v) { - std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; - }, - [start](const std::exception_ptr&) - { - std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; - }); - //! [fallback_observable] + rpp::source::just(10, 30, 90, 110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::source::just(rpp::schedulers::immediate{}, 0), rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; }, + [start](const std::exception_ptr&) { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [fallback_observable] } { - //! [default] - auto start = rpp::schedulers::clock_type::now(); + //! [default] + auto start = rpp::schedulers::clock_type::now(); - rpp::source::just(10,30,90,110) - | rpp::operators::flat_map([](int v) { - return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); - }) - | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{}) - | rpp::operators::as_blocking() - | rpp::operators::subscribe([start](int v) { - std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; - }, - [start](const std::exception_ptr&) - { - std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; - }); - //! [default] + rpp::source::just(10, 30, 90, 110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; }, + [start](const std::exception_ptr&) { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [default] } } \ No newline at end of file diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index c8e2e8825..93e54b2bd 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -15,7 +15,6 @@ #include #include #include - #include namespace rpp::operators::details @@ -189,18 +188,18 @@ namespace rpp::operators { /** * @brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) - * + * * @marble timeout_fallback_obs { source observable : +--1-2-3-4--------5-| operator "timeout(4, -10-|)" : +--1-2-3-4----10-| } - * + * * @param period is maximum duration between emitted items before a timeout occurs * @param fallback_observable is observable to subscribe on when timeout reached * @param scheduler is scheduler used to run timer for timeout * @warning #include - * + * * @par Example * @snippet timeout.cpp fallback_observable * @@ -215,7 +214,7 @@ namespace rpp::operators /** * @brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) - * + * * @marble timeout_default { source observable : +--1-2-3-4------5-| @@ -224,10 +223,10 @@ namespace rpp::operators * @param period is maximum duration between emitted items before a timeout occurs * @param scheduler is scheduler used to run timer for timeout * @warning #include - * + * * @par Example * @snippet timeout.cpp default - * + * * @ingroup utility_operators * @see https://reactivex.io/documentation/operators/timeout.html */ diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 207a0cb07..063563018 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -12,15 +12,15 @@ #include #include -#include #include -#include #include +#include +#include +#include "disposable_observable.hpp" #include "mock_observer.hpp" #include "snitch_logging.hpp" #include "test_scheduler.hpp" -#include "disposable_observable.hpp" TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") @@ -31,17 +31,17 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") SECTION("timeout not reached") { - rpp::source::just(scheduler, 1,2,3) + rpp::source::just(scheduler, 1, 2, 3) | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) | rpp::ops::subscribe(mock); - for (size_t i =0; i < 3; ++i) + for (size_t i = 0; i < 3; ++i) scheduler.time_advance(std::chrono::seconds{0}); scheduler.time_advance(std::chrono::seconds{1}); CHECK(scheduler.get_executions() == std::vector{now, now, now}); CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now, now}); - CHECK(mock.get_received_values() == std::vector{1,2,3}); + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); }