Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
- [x] do_on_error
- [x] do_on_completed
- [x] timeout
- [x] timeout
- [x] timeout with fallback observable

### Connectable

Expand Down
81 changes: 53 additions & 28 deletions src/examples/rpp/doxygen/timeout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,61 @@
**/
int main()
{
//! [timeout]
rpp::subjects::publish_subject<int> subj{};
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[](std::exception_ptr err)
{
try
{
std::rethrow_exception(err);
}
catch (const std::exception& exc)
{
//! [timeout]
rpp::subjects::publish_subject<int> subj{};
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[](std::exception_ptr err)
{
std::cout << "ERR: " << exc.what() << std::endl;
}
},
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
try
{
std::rethrow_exception(err);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-unnecessary-value-param ⚠️
parameter err is passed by value and only copied once; consider moving it to avoid unnecessary copies

Suggested change
std::rethrow_exception(err);
std::rethrow_exception(std::move(err);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-unnecessary-value-param ⚠️
parameter err is passed by value and only copied once; consider moving it to avoid unnecessary copies

Suggested change
std::rethrow_exception(err);
std::rethrow_exception(err));

}
catch (const std::exception& exc)
{
std::cout << "ERR: " << exc.what() << std::endl;
}
},
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}

// Output:
// new value 0
// new value 1
// new value 2
// new value 3
// new value 4
// ERR : Timeout reached
//! [timeout]
}
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
//! [timeout_fallback_obs]
rpp::subjects::publish_subject<int> subj{};
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::source::just(100), rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}

// Output:
//new value 0
//new value 1
//new value 2
//new value 3
//new value 4
//new value 100
//completed
//! [timeout_fallback_obs]
}

// Output:
// new value 0
// new value 1
// new value 2
// new value 3
// new value 4
// ERR : Timeout reached
//! [timeout]
return 0;
}
43 changes: 39 additions & 4 deletions src/rpp/rpp/operators/fwd/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
#pragma once

#include <rpp/schedulers/constraints.hpp>

#include <rpp/sources/fwd.hpp>
#include <rpp/observables/constraints.hpp>
#include <rpp/observables/details/member_overload.hpp>

namespace rpp::details
Expand All @@ -21,12 +22,44 @@ struct timeout_tag;

namespace rpp::details
{
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
template<constraint::decayed_type Type, constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no template named observable_of_type in namespace rpp::constraint; did you mean observer_of_type?

Suggested change
template<constraint::decayed_type Type, constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>
template<constraint::decayed_type Type, constraint::observer_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>

struct timeout_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, timeout_tag>
{
/**
* \brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission)
*
* \marble timeout_fallback_obs
{
source observable : +--1-2-3-4----- ---5-|
operator "timeout(4, -10-|)" : +--1-2-3-4----10-|
}
* \param period is maximum duration between emitted items before a timeout occurs
* \param fallback_obs is observable to subscribe on when timeout reached
* \param scheduler is scheduler used to run timer for timeout
* \return new specific_observable with the timeout operator as most recent operator.
* \warning #include <rpp/operators/timeout.hpp>
*
* \par Example
* \snippet timeout.cpp timeout_fallback_obs
*
* \ingroup utility_operators
* \see https://reactivex.io/documentation/operators/timeout.html
*/
template<constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no template named observable_of_type in namespace rpp::constraint; did you mean observer_of_type?

Suggested change
template<constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>
template<constraint::observer_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>

auto timeout(schedulers::duration period, FallbackObs&& fallback_obs, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<timeout_tag, FallbackObs, TScheduler>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(timeout_impl<Type, std::decay_t<FallbackObs>, TScheduler>{period, std::forward<FallbackObs>(fallback_obs), scheduler});
}

template<constraint::observable_of_type<Type> FallbackObs,schedulers::constraint::scheduler TScheduler>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no template named observable_of_type in namespace rpp::constraint; did you mean observer_of_type?

Suggested change
template<constraint::observable_of_type<Type> FallbackObs,schedulers::constraint::scheduler TScheduler>
template<constraint::observer_of_type<Type> FallbackObs,schedulers::constraint::scheduler TScheduler>

auto timeout(schedulers::duration period, FallbackObs&& fallback_obs, const TScheduler& scheduler = TScheduler{}) && requires is_header_included<timeout_tag, FallbackObs, TScheduler>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(timeout_impl<Type, std::decay_t<FallbackObs>, TScheduler>{period, std::forward<FallbackObs>(fallback_obs), scheduler});
}

/**
* \brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission)
*
Expand All @@ -49,13 +82,15 @@ struct member_overload<Type, SpecificObservable, timeout_tag>
template<schedulers::constraint::scheduler TScheduler>
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<timeout_tag, TScheduler>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler});
return timeout(period, rpp::source::error<Type>(std::make_exception_ptr(utils::timeout{"Timeout reached"})), scheduler);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no member named timeout in namespace rpp::utils

}

template<schedulers::constraint::scheduler TScheduler>
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included<timeout_tag, TScheduler>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler});
return std::move(*static_cast<SpecificObservable*>(this)).timeout(period, rpp::source::error<Type>(std::make_exception_ptr(utils::timeout{"Timeout reached"})), scheduler);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no member named timeout in namespace rpp::utils

}


};
} // namespace rpp::details
29 changes: 18 additions & 11 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <rpp/operators/fwd/timeout.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/exceptions.hpp>
#include <rpp/sources/error.hpp>

#include <rpp/utils/spinlock.hpp>

Expand All @@ -26,47 +27,53 @@ IMPLEMENTATION_FILE(timeout_tag);

namespace rpp::details
{
template<constraint::observable FallbackObs>
struct timeout_state : early_unsubscribe_state
{
using early_unsubscribe_state::early_unsubscribe_state;
timeout_state(const FallbackObs& fallback_obs, const composite_subscription& subscription_of_subscriber)
: early_unsubscribe_state(subscription_of_subscriber)
, fallback_obs{fallback_obs} {}

FallbackObs fallback_obs;
std::atomic<schedulers::time_point> last_emission_time{};

static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min();
};

template<typename Worker>
template<constraint::observable FallbackObs, typename Worker>
struct timeout_on_next
{
template<typename Value>
void operator()(Value&& v, const auto& subscriber, const std::shared_ptr<timeout_state>& state) const
void operator()(Value&& v, const auto& subscriber, const std::shared_ptr<timeout_state<FallbackObs>>& state) const
{
if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached)
if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state<FallbackObs>::s_timeout_reached)
subscriber.on_next(std::forward<Value>(v));
}
};

using timeout_on_error = early_unsubscribe_on_error;
using timeout_on_completed = early_unsubscribe_on_completed;

struct timeout_state_with_serialized_spinlock : timeout_state
template<constraint::observable FallbackObs>
struct timeout_state_with_serialized_spinlock : timeout_state<FallbackObs>
{
using timeout_state::timeout_state;
using timeout_state<FallbackObs>::timeout_state;

// spinlock because most part of time there is only one thread would be active
utils::spinlock spinlock{};
};

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
template<constraint::decayed_type Type, constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>
struct timeout_impl
{
schedulers::duration period;
FallbackObs fallback_obs;
TScheduler scheduler;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<timeout_state_with_serialized_spinlock>(in_subscriber.get_subscription());
auto state = std::make_shared<timeout_state_with_serialized_spinlock<FallbackObs>>(fallback_obs, in_subscriber.get_subscription());
// change subscriber to serialized to avoid manual using of mutex
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
std::shared_ptr<utils::spinlock>{state, &state->spinlock});
Expand All @@ -83,7 +90,7 @@ struct timeout_impl
// last emission time still same value -> timeout reached, else -> prev_emission_time
// would be update to actual emission time
if (state->last_emission_time.compare_exchange_strong(prev_emission_time,
timeout_state::s_timeout_reached,
timeout_state<FallbackObs>::s_timeout_reached,
std::memory_order_acq_rel))
return time_is_out(state, subscriber);

Expand All @@ -101,7 +108,7 @@ struct timeout_impl
});

return create_subscriber_with_state<Type>(state->children_subscriptions,
timeout_on_next<decltype(worker)>{},
timeout_on_next<FallbackObs, decltype(worker)>{},
timeout_on_error{},
timeout_on_completed{},
std::move(subscriber),
Expand All @@ -112,7 +119,7 @@ struct timeout_impl
static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber)
{
state->children_subscriptions.unsubscribe();
subscriber.on_error(std::make_exception_ptr(utils::timeout{"Timeout reached"}));
state->fallback_obs.subscribe(subscriber);
return std::nullopt;
}
};
Expand Down
20 changes: 20 additions & 0 deletions src/tests/rpp/test_timeout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <rpp/sources/interval.hpp>
#include <rpp/sources/never.hpp>
#include <rpp/sources/just.hpp>
#include <rpp/operators/timeout.hpp>
#include <rpp/operators/concat.hpp>
#include <rpp/operators/take.hpp>
Expand Down Expand Up @@ -112,3 +113,22 @@ SCENARIO("timeout sends error only on timeout", "[operators][timeout]")
}
}
}

SCENARIO("timeout subscribes on provided observable on timeout", "[operators][timeout]")
{
auto mock = mock_observer<int>{};
GIVEN("never observable")
{
auto obs = rpp::source::never<int>();
WHEN("subscribe on it via timeout with fallback obs")
{
obs.timeout(std::chrono::microseconds{10}, rpp::source::just(100), rpp::schedulers::immediate{}).subscribe(mock);
THEN("subuscribe sees vales from fallback observabble")
{
CHECK(mock.get_received_values() == std::vector<int>{100});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}