Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/examples/rpp/doxygen/timeout.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include <rpp/rpp.hpp>

#include <chrono>
#include <ctime>
#include <iostream>

/**
* @example timeout.cpp
**/

int main() // NOLINT(bugprone-exception-escape)
{
{
//! [fallback_observable]
auto start = rpp::schedulers::clock_type::now();

rpp::source::just(10, 30, 90, 110)
| rpp::operators::flat_map([](int v) {
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::source::just(rpp::schedulers::immediate{}, 0), rpp::schedulers::new_thread{})
| rpp::operators::as_blocking()
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
//! [fallback_observable]
}

{
//! [default]
auto start = rpp::schedulers::clock_type::now();

rpp::source::just(10, 30, 90, 110)
| rpp::operators::flat_map([](int v) {
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{})
| rpp::operators::as_blocking()
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
//! [default]
}
}
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
#include <rpp/operators/repeat.hpp>
#include <rpp/operators/subscribe_on.hpp>
#include <rpp/operators/tap.hpp>
#include <rpp/operators/timeout.hpp>

/**
* @defgroup connectable_operators Connectable Operators
Expand Down
6 changes: 6 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ namespace rpp::operators
OnError&& on_error = {},
OnCompleted&& on_completed = {});

template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler);

template<rpp::schedulers::constraint::scheduler TScheduler>
auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler);

template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto throttle(rpp::schedulers::duration period);

Expand Down
4 changes: 1 addition & 3 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ namespace rpp::operators::details
mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make();

RPP_CALL_DURING_CONSTRUCTION(
{
observer.set_upstream(disposable);
});
observer.set_upstream(disposable););

template<typename T>
void on_next(T&& v) const
Expand Down
4 changes: 1 addition & 3 deletions src/rpp/rpp/operators/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ namespace rpp::operators::details
RPP_NO_UNIQUE_ADDRESS Fn fn;

RPP_CALL_DURING_CONSTRUCTION(
{
observer.on_next(utils::as_const(seed));
});
observer.on_next(utils::as_const(seed)););

template<typename T>
void on_next(T&& v) const
Expand Down
238 changes: 238 additions & 0 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/utils/exceptions.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposable_container Container>
class timeout_disposable final : public rpp::composite_disposable_impl<Container>
{
public:
struct observer_with_timeout
{
TObserver observer;
rpp::schedulers::time_point timeout;
};

timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout)
: m_observer_with_timeout{observer_with_timeout{std::move(observer), timeout}}
, m_period{period}
, m_fallback{fallback}
{
}
pointer_under_lock<observer_with_timeout> get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; }

const TFallbackObservable& get_fallback() const { return m_fallback; }

rpp::schedulers::duration get_period() const { return m_period; }

private:
value_with_mutex<observer_with_timeout> m_observer_with_timeout;

const rpp::schedulers::duration m_period;
const TFallbackObservable m_fallback;
};

template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposable_container Container>
struct timeout_disposable_wrapper
{
std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;

bool is_disposed() const { return disposable->is_disposed(); }

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_with_timeout_under_lock()->observer.on_error(err);
}
};

template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposable_container Container, rpp::schedulers::constraint::scheduler TScheduler>
struct timeout_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;

void set_upstream(const rpp::disposable_wrapper& d) const
{
disposable->add(d);
}

bool is_disposed() const
{
return disposable->is_disposed();
}

template<typename T>
void on_next(T&& v) const
{
auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
obs_with_timeout->observer.on_next(std::forward<T>(v));
obs_with_timeout->timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + disposable->get_period();
}

void on_error(const std::exception_ptr& err) const noexcept
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;

disposable->dispose();
obs_with_timeout->observer.on_error(err);
}

void on_completed() const noexcept
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;

disposable->dispose();
obs_with_timeout->observer.on_completed();
}
};

template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
struct timeout_t
{
template<rpp::constraint::decayed_type T>
struct operator_traits
{
static_assert(rpp::constraint::observable_of_type<TFallbackObservable, T>, "TFallbackObservable should be the same type as T");

using result_type = T;
};

rpp::schedulers::duration period;
RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback;
RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;

template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposable_strategy DisposableStrategy, rpp::constraint::observer Observer>
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<TScheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;

const auto timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + period;

const auto disposable = disposable_wrapper_impl<timeout_disposable<std::decay_t<Observer>, TFallbackObservable, container>>::make(std::forward<Observer>(observer), period, fallback, timeout);
auto ptr = disposable.lock();
ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak());

const auto worker = scheduler.create_worker();
if constexpr (!rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
disposable.add(std::move(d));
}

using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
worker.schedule(
timeout,
[](wrapper& handler) -> rpp::schedulers::optional_delay_to {
auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock();
if (rpp::schedulers::utils::get_worker_t<TScheduler>::now() < locked_obs_with_timeout->timeout)
return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout);

if (!handler.disposable->is_disposed())
{
handler.disposable->dispose();
handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer));
}
return std::nullopt;
},
wrapper{ptr});

return rpp::observer<Type, timeout_observer_strategy<std::decay_t<Observer>, TFallbackObservable, container, TScheduler>>{std::move(ptr)};
}
};

template<rpp::schedulers::constraint::scheduler TScheduler>
struct timeout_with_error_t
{
template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;
};

rpp::schedulers::duration period;
RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;

template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposable_strategy DisposableStrategy, rpp::constraint::observer Observer>
auto lift_with_disposable_strategy(Observer&& observer) const
{
return timeout_t<rpp::error_observable<int>, TScheduler>{period, rpp::source::error<rpp::utils::extract_observer_type_t<Observer>>(std::make_exception_ptr(rpp::utils::timeout_reached{"Timeout reached"})), scheduler}
.template lift_with_disposable_strategy<Type, DisposableStrategy>(std::forward<Observer>(observer));
}
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission)
*
* @marble timeout_fallback_obs
{
source observable : +--1-2-3-4--------5-|
operator "timeout(4, -10-|)" : +--1-2-3-4----10-|
}
*
* @param period is maximum duration between emitted items before a timeout occurs
* @param fallback_observable is observable to subscribe on when timeout reached
* @param scheduler is scheduler used to run timer for timeout
* @warning #include <rpp/operators/timeout.hpp>
*
* @par Example
* @snippet timeout.cpp fallback_observable
*
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/timeout.html
*/
template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler)
{
return details::timeout_t<std::decay_t<TFallbackObservable>, TScheduler>{period, std::forward<TFallbackObservable>(fallback_observable), scheduler};
}

/**
* @brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission)
*
* @marble timeout_default
{
source observable : +--1-2-3-4------5-|
operator "timeout(4)" : +--1-2-3-4----#
}
* @param period is maximum duration between emitted items before a timeout occurs
* @param scheduler is scheduler used to run timer for timeout
* @warning #include <rpp/operators/timeout.hpp>
*
* @par Example
* @snippet timeout.cpp default
*
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/timeout.html
*/
template<rpp::schedulers::constraint::scheduler TScheduler>
auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler)
{
return details::timeout_with_error_t<TScheduler>{period, scheduler};
}
} // namespace rpp::operators
5 changes: 5 additions & 0 deletions src/rpp/rpp/utils/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ namespace rpp::utils
{
using std::runtime_error::runtime_error;
};

struct timeout_reached : public std::runtime_error
{
using std::runtime_error::runtime_error;
};
} // namespace rpp::utils
Loading