diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 16ce51216..a938dbeca 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -71,6 +71,8 @@ - [ ] ignore_elements - [x] last - [ ] sample + - [ ] sample (observable) + - [x] sample_with_time - [x] skip - [ ] skip_last - [x] take_last diff --git a/src/examples/doxygen/sample.cpp b/src/examples/doxygen/sample.cpp new file mode 100644 index 000000000..289756087 --- /dev/null +++ b/src/examples/doxygen/sample.cpp @@ -0,0 +1,17 @@ +#include +#include + +/** + * \example sample.cpp + **/ +int main() +{ + //! [sample_with_time] + rpp::source::interval(std::chrono::milliseconds{100}, rpp::schedulers::trampoline{}) + .sample_with_time(std::chrono::milliseconds{300}, rpp::schedulers::trampoline{}) + .take(5) + .subscribe([](int v) { std::cout << v << " "; }); + // Output: 1 4 7 10 13 + //! [sample_with_time] + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index cfd4614f3..e7bcfbfef 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -55,6 +55,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index b74fa237e..1ddc4f268 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index ebf2b8d5f..8075ec5ef 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -84,16 +84,14 @@ struct delay_impl template TSub> auto operator()(TSub&& subscriber) const { - // convert it to dynamic due to expected amount of copies == amount of items - auto dynamic_subscriber = std::forward(subscriber).as_dynamic(); - auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription()); + auto worker = scheduler.create_worker(subscriber.get_subscription()); - auto subscription = dynamic_subscriber.get_subscription().make_child(); + auto subscription = subscriber.get_subscription().make_child(); return create_subscriber_with_state(std::move(subscription), delay_on_next{delay}, delay_on_error{}, delay_on_completed{delay}, - std::move(dynamic_subscriber), + std::forward(subscriber), std::move(worker)); } }; diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 32d637338..4dd8e1c3f 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/sample.hpp b/src/rpp/rpp/operators/fwd/sample.hpp new file mode 100644 index 000000000..48bdf94c0 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/sample.hpp @@ -0,0 +1,63 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include + +namespace rpp::details +{ +struct sample_tag; +} + +namespace rpp::details +{ +template +struct sample_with_time_impl; + +template +struct member_overload +{ + /** + * \brief Emit most recent emitted from original observable emission obtained during last period of time. + * \details Emit item immediately in case of completion of the original observable + * + * \marble sample_with_time + { + source observable : +--1---2-3-4---5-6-7-| + operator "sample_with_time(2)" : +--1---2---4---5---7-| + } + * + * \param period sampling period + * \scheduler scheduler to use to schedule emissions with provided sampling period + * \return new specific_observable with the sample_with_time operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet sample.cpp sample_with_time + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html + */ + template + auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) const & requires is_header_included + { + return static_cast(this)->template lift(sample_with_time_impl{period, scheduler}); + } + + template + auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(sample_with_time_impl{period, scheduler}); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp new file mode 100644 index 000000000..e6888986a --- /dev/null +++ b/src/rpp/rpp/operators/sample.hpp @@ -0,0 +1,106 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +IMPLEMENTATION_FILE(sample_tag); + +namespace rpp::details +{ +template +struct sample_state : early_unsubscribe_state +{ + using early_unsubscribe_state::early_unsubscribe_state; + + std::mutex value_mutex{}; + std::optional value{}; +}; + +template +struct sample_state_with_serialized_spinlock : sample_state +{ + using sample_state::sample_state; + + utils::spinlock spinlock{}; +}; + +struct sample_on_next +{ + template + void operator()(Value&& value, const auto&, const std::shared_ptr>>& state) const + { + std::lock_guard lock{state->value_mutex}; + state->value.emplace(std::forward(value)); + } +}; + +using sample_on_error = early_unsubscribe_on_error; + +struct sample_on_completed +{ + void operator()(const auto& subscriber, const auto& state) const + { + state->children_subscriptions.unsubscribe(); + + { + std::lock_guard lock{state->value_mutex}; + if (state->value.has_value()) + subscriber.on_next(std::move(state->value.value())); + } + subscriber.on_completed(); + } +}; + +template +struct sample_with_time_impl +{ + schedulers::duration period; + TScheduler scheduler; + + template TSub> + auto operator()(TSub&& in_subscriber) const + { + auto state = std::make_shared>(in_subscriber.get_subscription()); + // change subscriber to serialized to avoid manual using of mutex + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), + std::shared_ptr{state, &state->spinlock}); + + scheduler.create_worker(state->children_subscriptions) + .schedule(period, + [period = period, subscriber = subscriber, state]() -> rpp::schedulers::optional_duration + { + std::optional extracted{}; + { + std::lock_guard lock{state->value_mutex}; + std::swap(extracted, state->value); + } + if (extracted.has_value()) + subscriber.on_next(std::move(extracted.value())); + return period; + }); + + return create_subscriber_with_state(state->children_subscriptions, + sample_on_next{}, + sample_on_error{}, + sample_on_completed{}, + std::move(subscriber), + std::move(state)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/utils/spinlock.hpp b/src/rpp/rpp/utils/spinlock.hpp index f94876e8c..298ba533e 100644 --- a/src/rpp/rpp/utils/spinlock.hpp +++ b/src/rpp/rpp/utils/spinlock.hpp @@ -8,7 +8,9 @@ // Project home: https://github.com/victimsnino/ReactivePlusPlus #pragma once + #include +#include namespace rpp::utils { @@ -19,9 +21,16 @@ class spinlock void lock() { - while(m_lock_flag.exchange(true, std::memory_order_acquire)) + while (m_lock_flag.exchange(true, std::memory_order_acquire)) { - while(m_lock_flag.load(std::memory_order_relaxed)){}; + for (uint8_t i = 0; m_lock_flag.load(std::memory_order_relaxed); ++i) + { + if (i == 30u) + { + std::this_thread::yield(); + i = 0; + } + } } } diff --git a/src/tests/test_interval.cpp b/src/tests/test_interval.cpp index 32976681f..53c5b3e04 100644 --- a/src/tests/test_interval.cpp +++ b/src/tests/test_interval.cpp @@ -72,8 +72,7 @@ SCENARIO("interval emit values with provided interval", "[interval]") initial_time + 3*interval}); CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval, initial_time + 2*interval, - initial_time + 3*interval, - initial_time + 4*interval}); + initial_time + 3*interval}); } } } @@ -113,8 +112,7 @@ SCENARIO("interval emit values with provided interval", "[interval]") { CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay, initial_time + initial_delay + interval, - initial_time + initial_delay + 2 * interval, - initial_time + initial_delay + 3 * interval }); + initial_time + initial_delay + 2 * interval }); CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval }); diff --git a/src/tests/test_sample.cpp b/src/tests/test_sample.cpp new file mode 100644 index 000000000..8221ff5eb --- /dev/null +++ b/src/tests/test_sample.cpp @@ -0,0 +1,212 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "mock_observer.hpp" +#include "test_scheduler.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +SCENARIO("sample throttles emissions", "[operators][sample]") +{ + auto mock = mock_observer{}; + auto interval_scheduler = test_scheduler{}; + auto sample_scheduler = test_scheduler{}; + auto start_time = test_scheduler::worker_strategy::now(); + GIVEN("interval observable") + { + constexpr rpp::schedulers::duration interval_duration = std::chrono::milliseconds{1}; + auto obs = rpp::source::interval(interval_duration, interval_scheduler); + WHEN("subscribe on it via sample with period == interval period") + { + auto sample_period =interval_duration; + auto sub = obs.take(3).sample_with_time(sample_period, sample_scheduler).subscribe(mock); + while(sub.is_subscribed()) + { + interval_scheduler.time_advance(sample_period); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{0, 1, 2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this scheduler does any action + CHECK(sample_scheduler.get_executions() == std::vector{start_time + sample_period, start_time + 2 * sample_period}); + } + } + WHEN("subscribe on it via sample with period <= interval period") + { + auto sample_period = rpp::schedulers::duration{interval_duration.count() / 2}; + std::vector item_sent{}; + auto sub = obs.take(3).sample_with_time(sample_period, sample_scheduler).tap([&](const auto&) + { + item_sent.push_back(test_scheduler::worker_strategy::now()); + }).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(sample_period); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{0, 1, 2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period, + start_time + 4 * sample_period, + start_time + 5 * sample_period, + start_time + 6 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this + // scheduler does any action + CHECK(sample_scheduler.get_executions() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period, + start_time + 4 * sample_period, + start_time + 5 * sample_period}); + CHECK(item_sent == + std::vector{start_time + 2 * sample_period, + start_time + 4 * sample_period, + start_time + 6 * sample_period}); + } + } + WHEN("subscribe on it via sample with period >= interval period") + { + auto sample_period = rpp::schedulers::duration{interval_duration.count() * 2}; + std::vector item_sent{}; + auto sub = obs.take(4).sample_with_time(sample_period, sample_scheduler).tap([&](const auto&) + { + item_sent.push_back(test_scheduler::worker_strategy::now()); + }).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(interval_duration); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{1, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this + // scheduler does any action + CHECK(sample_scheduler.get_executions() == + std::vector{start_time + 1 * sample_period}); + CHECK(item_sent == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period}); + } + } + } +} + + +SCENARIO("sample sends on_completed immediately", "[operators][sample]") +{ + auto mock = mock_observer{}; + GIVEN("observable of items") + { + auto obs = rpp::source::just(1,2,3); + WHEN("subscribe on it via sample with infinite like duration") + { + auto duration = std::chrono::days{10}; + auto begin = rpp::schedulers::clock_type::now(); + auto sub = obs.sample_with_time(duration, test_scheduler{}).subscribe(mock); + auto end = rpp::schedulers::clock_type::now(); + THEN("subscriber obtains only item from completed") + { + CHECK(end-begin < duration); + CHECK(!sub.is_subscribed()); + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + +SCENARIO("sample sends on_error immediately", "[operators][sample]") +{ + auto mock = mock_observer{}; + GIVEN("observable of items") + { + auto obs = rpp::source::just(rpp::source::just(1).as_dynamic(), + rpp::source::just(1).as_dynamic(), + rpp::source::error(std::exception_ptr{}).as_dynamic()).concat(); + WHEN("subscribe on it via sample with infinite like duration") + { + auto duration = std::chrono::days{10}; + auto begin = rpp::schedulers::clock_type::now(); + auto sub = obs.sample_with_time(duration, test_scheduler{}).subscribe(mock); + auto end = rpp::schedulers::clock_type::now(); + THEN("subscriber obtains only on_error") + { + CHECK(end-begin < duration); + CHECK(!sub.is_subscribed()); + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} \ No newline at end of file diff --git a/src/tests/test_scheduler.hpp b/src/tests/test_scheduler.hpp index 6adbaec86..8909744e8 100644 --- a/src/tests/test_scheduler.hpp +++ b/src/tests/test_scheduler.hpp @@ -23,6 +23,9 @@ class test_scheduler final : public rpp::schedulers::details::scheduler_tag void schedule(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) { + if (!sub.is_subscribed()) + return; + schedulings.push_back(time_point); queue.emplace(time_point, static_cast(rpp::schedulers::clock_type::now().time_since_epoch().count()),