diff --git a/src/examples/rpp/doxygen/timeout.cpp b/src/examples/rpp/doxygen/timeout.cpp new file mode 100644 index 000000000..ef83bf0cf --- /dev/null +++ b/src/examples/rpp/doxygen/timeout.cpp @@ -0,0 +1,46 @@ +#include + +#include +#include +#include + +/** + * @example timeout.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + { + //! [fallback_observable] + auto start = rpp::schedulers::clock_type::now(); + + rpp::source::just(10, 30, 90, 110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::source::just(rpp::schedulers::immediate{}, 0), rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; }, + [start](const std::exception_ptr&) { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [fallback_observable] + } + + { + //! [default] + auto start = rpp::schedulers::clock_type::now(); + + rpp::source::just(10, 30, 90, 110) + | rpp::operators::flat_map([](int v) { + return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{}); + }) + | rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{}) + | rpp::operators::as_blocking() + | rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; }, + [start](const std::exception_ptr&) { + std::cout << "received error at " << std::chrono::duration_cast(rpp::schedulers::clock_type::now() - start).count() << std::endl; + }); + //! [default] + } +} \ No newline at end of file diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 1c9c1dfb0..0b2473a25 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -97,6 +97,7 @@ #include #include #include +#include /** * @defgroup connectable_operators Connectable Operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index fb5e40b74..efc52f2e6 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -167,6 +167,12 @@ namespace rpp::operators OnError&& on_error = {}, OnCompleted&& on_completed = {}); + template + auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler); + + template + auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler); + template auto throttle(rpp::schedulers::duration period); diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index f6449d6c9..c7719df7e 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -28,9 +28,7 @@ namespace rpp::operators::details mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); RPP_CALL_DURING_CONSTRUCTION( - { - observer.set_upstream(disposable); - }); + observer.set_upstream(disposable);); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index cc81dc571..814fd8a0f 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -30,9 +30,7 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS Fn fn; RPP_CALL_DURING_CONSTRUCTION( - { - observer.on_next(utils::as_const(seed)); - }); + observer.on_next(utils::as_const(seed));); template void on_next(T&& v) const diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp new file mode 100644 index 000000000..93e54b2bd --- /dev/null +++ b/src/rpp/rpp/operators/timeout.hpp @@ -0,0 +1,238 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include + +namespace rpp::operators::details +{ + template + class timeout_disposable final : public rpp::composite_disposable_impl + { + public: + struct observer_with_timeout + { + TObserver observer; + rpp::schedulers::time_point timeout; + }; + + timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout) + : m_observer_with_timeout{observer_with_timeout{std::move(observer), timeout}} + , m_period{period} + , m_fallback{fallback} + { + } + pointer_under_lock get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; } + + const TFallbackObservable& get_fallback() const { return m_fallback; } + + rpp::schedulers::duration get_period() const { return m_period; } + + private: + value_with_mutex m_observer_with_timeout; + + const rpp::schedulers::duration m_period; + const TFallbackObservable m_fallback; + }; + + template + struct timeout_disposable_wrapper + { + std::shared_ptr> disposable; + + bool is_disposed() const { return disposable->is_disposed(); } + + void on_error(const std::exception_ptr& err) const + { + disposable->dispose(); + disposable->get_observer_with_timeout_under_lock()->observer.on_error(err); + } + }; + + template + struct timeout_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> disposable; + + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d); + } + + bool is_disposed() const + { + return disposable->is_disposed(); + } + + template + void on_next(T&& v) const + { + auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + obs_with_timeout->observer.on_next(std::forward(v)); + obs_with_timeout->timeout = rpp::schedulers::utils::get_worker_t::now() + disposable->get_period(); + } + + void on_error(const std::exception_ptr& err) const noexcept + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + + disposable->dispose(); + obs_with_timeout->observer.on_error(err); + } + + void on_completed() const noexcept + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + + disposable->dispose(); + obs_with_timeout->observer.on_completed(); + } + }; + + template + struct timeout_t + { + template + struct operator_traits + { + static_assert(rpp::constraint::observable_of_type, "TFallbackObservable should be the same type as T"); + + using result_type = T; + }; + + rpp::schedulers::duration period; + RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback; + RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; + + template + auto lift_with_disposable_strategy(Observer&& observer) const + { + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::template add::disposable_container; + + const auto timeout = rpp::schedulers::utils::get_worker_t::now() + period; + + const auto disposable = disposable_wrapper_impl, TFallbackObservable, container>>::make(std::forward(observer), period, fallback, timeout); + auto ptr = disposable.lock(); + ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak()); + + const auto worker = scheduler.create_worker(); + if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) + { + if (auto d = worker.get_disposable(); !d.is_disposed()) + disposable.add(std::move(d)); + } + + using wrapper = timeout_disposable_wrapper, TFallbackObservable, container>; + worker.schedule( + timeout, + [](wrapper& handler) -> rpp::schedulers::optional_delay_to { + auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock(); + if (rpp::schedulers::utils::get_worker_t::now() < locked_obs_with_timeout->timeout) + return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout); + + if (!handler.disposable->is_disposed()) + { + handler.disposable->dispose(); + handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer)); + } + return std::nullopt; + }, + wrapper{ptr}); + + return rpp::observer, TFallbackObservable, container, TScheduler>>{std::move(ptr)}; + } + }; + + template + struct timeout_with_error_t + { + template + struct operator_traits + { + using result_type = T; + }; + + rpp::schedulers::duration period; + RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; + + template + auto lift_with_disposable_strategy(Observer&& observer) const + { + return timeout_t, TScheduler>{period, rpp::source::error>(std::make_exception_ptr(rpp::utils::timeout_reached{"Timeout reached"})), scheduler} + .template lift_with_disposable_strategy(std::forward(observer)); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) + * + * @marble timeout_fallback_obs + { + source observable : +--1-2-3-4--------5-| + operator "timeout(4, -10-|)" : +--1-2-3-4----10-| + } + * + * @param period is maximum duration between emitted items before a timeout occurs + * @param fallback_observable is observable to subscribe on when timeout reached + * @param scheduler is scheduler used to run timer for timeout + * @warning #include + * + * @par Example + * @snippet timeout.cpp fallback_observable + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/timeout.html + */ + template + auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler) + { + return details::timeout_t, TScheduler>{period, std::forward(fallback_observable), scheduler}; + } + + /** + * @brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) + * + * @marble timeout_default + { + source observable : +--1-2-3-4------5-| + operator "timeout(4)" : +--1-2-3-4----# + } + * @param period is maximum duration between emitted items before a timeout occurs + * @param scheduler is scheduler used to run timer for timeout + * @warning #include + * + * @par Example + * @snippet timeout.cpp default + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/timeout.html + */ + template + auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler) + { + return details::timeout_with_error_t{period, scheduler}; + } +} // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/utils/exceptions.hpp b/src/rpp/rpp/utils/exceptions.hpp index 4b1741ebe..3b4c06f54 100644 --- a/src/rpp/rpp/utils/exceptions.hpp +++ b/src/rpp/rpp/utils/exceptions.hpp @@ -23,4 +23,9 @@ namespace rpp::utils { using std::runtime_error::runtime_error; }; + + struct timeout_reached : public std::runtime_error + { + using std::runtime_error::runtime_error; + }; } // namespace rpp::utils \ No newline at end of file diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp new file mode 100644 index 000000000..063563018 --- /dev/null +++ b/src/tests/rpp/test_timeout.cpp @@ -0,0 +1,147 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include +#include + +#include "disposable_observable.hpp" +#include "mock_observer.hpp" +#include "snitch_logging.hpp" +#include "test_scheduler.hpp" + + +TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") +{ + auto scheduler = test_scheduler{}; + auto mock = mock_observer_strategy{}; + const auto now = scheduler.now(); + + SECTION("timeout not reached") + { + rpp::source::just(scheduler, 1, 2, 3) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + for (size_t i = 0; i < 3; ++i) + scheduler.time_advance(std::chrono::seconds{0}); + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now, now, now}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now, now}); + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout reached") + { + rpp::source::just(scheduler, 1) + | rpp::ops::delay(std::chrono::seconds{5}, scheduler) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{0}); + + for (size_t i = 0; i < 10; ++i) + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now, now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}, now, now + std::chrono::seconds{5}}); + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout with never") + { + rpp::source::never() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout with empty") + { + rpp::source::empty() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("timeout with error") + { + rpp::source::error({}) + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::just(100), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + + SECTION("timeout with default args") + { + rpp::source::never() + | rpp::ops::timeout(std::chrono::seconds{1}, scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + + SECTION("never timeout with never") + { + rpp::source::never() + | rpp::ops::timeout(std::chrono::seconds{1}, rpp::source::never(), scheduler) + | rpp::ops::subscribe(mock); + + scheduler.time_advance(std::chrono::seconds{1}); + + CHECK(scheduler.get_executions() == std::vector{now + std::chrono::seconds{1}}); + CHECK(scheduler.get_schedulings() == std::vector{now + std::chrono::seconds{1}}); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } +} + +TEST_CASE("timeout satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, test_scheduler{})); +} \ No newline at end of file diff --git a/src/tests/utils/test_scheduler.hpp b/src/tests/utils/test_scheduler.hpp index e87966060..3bbaca6c5 100644 --- a/src/tests/utils/test_scheduler.hpp +++ b/src/tests/utils/test_scheduler.hpp @@ -35,7 +35,7 @@ class test_scheduler final void drain() { - while (!queue.is_empty() && !is_disposed()) + while (!queue.is_empty()) { auto time_point = queue.top()->get_timepoint(); if (time_point > s_current_time) @@ -44,6 +44,9 @@ class test_scheduler final auto fn = queue.top(); queue.pop(); + if (fn->is_disposed()) + continue; + executions.push_back(s_current_time); if (auto new_timepoint = (*fn)()) {