From d224db37b909bed6d2252988e41feb68edc454f5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 25 Sep 2022 21:57:38 +0300 Subject: [PATCH 1/8] Sample prototype --- src/examples/doxygen/sample.cpp | 0 src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 1 + src/rpp/rpp/operators/fwd/sample.hpp | 58 ++++++++++++++ src/rpp/rpp/operators/sample.hpp | 109 +++++++++++++++++++++++++++ src/rpp/rpp/utils/spinlock.hpp | 11 ++- src/tests/test_sample.cpp | 0 7 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 src/examples/doxygen/sample.cpp create mode 100644 src/rpp/rpp/operators/fwd/sample.hpp create mode 100644 src/rpp/rpp/operators/sample.hpp create mode 100644 src/tests/test_sample.cpp diff --git a/src/examples/doxygen/sample.cpp b/src/examples/doxygen/sample.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index b74fa237e..1ddc4f268 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 32d637338..4dd8e1c3f 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/sample.hpp b/src/rpp/rpp/operators/fwd/sample.hpp new file mode 100644 index 000000000..e975c7c92 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/sample.hpp @@ -0,0 +1,58 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +namespace rpp::details +{ +struct sample_tag; +} + +namespace rpp::details +{ +template +struct sample_impl; + +template +struct member_overload +{ + /** + * \brief + * + * \marble sample + { + + } + * + * \return new specific_observable with the sample operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet sample.cpp sample + * + * \ingroup + * \see + */ + template + auto sample(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + { + return static_cast(this)->template lift(sample_impl{period, scheduler}); + } + + template + auto sample(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(sample_impl{period, scheduler}); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp new file mode 100644 index 000000000..a3cb96e91 --- /dev/null +++ b/src/rpp/rpp/operators/sample.hpp @@ -0,0 +1,109 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include + +#include +#include + +#include +#include + +IMPLEMENTATION_FILE(sample_tag); + +namespace rpp::details +{ +template +struct sample_state : early_unsubscribe_state +{ + using early_unsubscribe_state::early_unsubscribe_state; + + std::mutex value_mutex{}; + std::optional value{}; +}; + +template +struct sample_state_with_serialized_spinlock : sample_state +{ + using sample_state::sample_state; + + utils::spinlock spinlock{}; +}; + +struct sample_on_next +{ + template + void operator()(Value&& value, + const auto& , + const std::shared_ptr>>& state) const + { + std::lock_guard lock{state->value_mutex}; + state->value.emplace(std::forward(value)); + } +}; + +using sample_on_error = early_unsubscribe_on_error; + +struct sample_on_completed +{ + template + void operator()(const auto& subscriber, const std::shared_ptr>& state) const + { + state->children_subscriptions.unsubscribe(); + + std::lock_guard lock{state->value_mutex}; + if (state->value.has_value()) + subscriber.on_next(std::move(state->value.value())); + } +}; + +template +struct sample_impl +{ + schedulers::duration period; + TScheduler scheduler; + + template TSub> + auto operator()(TSub&& in_subscriber) const + { + auto state = std::make_shared(in_subscriber.get_subscription()); + // change subscriber to serialized to avoid manual using of mutex + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), + std::shared_ptr{state, &state->spinlock}); + + scheduler.create_worker(state->children_subscriptions).schedule(period, + [period=period, subscriber=subscriber, state]() + { + std::optional extracted{}; + { + std::lock_guard lock + {state->value_mutex}; + std::swap(extracted, state->value); + } + if (extracted.has_value()) + subscriber. + on_next(std::move(extracted. + value())); + return period; + }); + + return create_subscriber_with_state(state->children_subscriptions, + sample_on_next{}, + sample_on_error{}, + sample_on_completed{}, + std::move(subscriber), + std::move(state)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/utils/spinlock.hpp b/src/rpp/rpp/utils/spinlock.hpp index f94876e8c..bc8022361 100644 --- a/src/rpp/rpp/utils/spinlock.hpp +++ b/src/rpp/rpp/utils/spinlock.hpp @@ -19,9 +19,16 @@ class spinlock void lock() { - while(m_lock_flag.exchange(true, std::memory_order_acquire)) + while (m_lock_flag.exchange(true, std::memory_order_acquire)) { - while(m_lock_flag.load(std::memory_order_relaxed)){}; + for (uint8_t i = 0; m_lock_flag.load(std::memory_order_relaxed); ++i) + { + if (i == 30u) + { + std::this_thread::yield(); + i = 0; + } + } } } diff --git a/src/tests/test_sample.cpp b/src/tests/test_sample.cpp new file mode 100644 index 000000000..e69de29bb From 4725698e3b6da54a59331cb6b39b2dbb15af49e1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 26 Sep 2022 22:23:20 +0300 Subject: [PATCH 2/8] simplify delay --- src/rpp/rpp/operators/delay.hpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index ebf2b8d5f..8075ec5ef 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -84,16 +84,14 @@ struct delay_impl template TSub> auto operator()(TSub&& subscriber) const { - // convert it to dynamic due to expected amount of copies == amount of items - auto dynamic_subscriber = std::forward(subscriber).as_dynamic(); - auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription()); + auto worker = scheduler.create_worker(subscriber.get_subscription()); - auto subscription = dynamic_subscriber.get_subscription().make_child(); + auto subscription = subscriber.get_subscription().make_child(); return create_subscriber_with_state(std::move(subscription), delay_on_next{delay}, delay_on_error{}, delay_on_completed{delay}, - std::move(dynamic_subscriber), + std::forward(subscriber), std::move(worker)); } }; From d89324d18087f25355670cc709c2bdef6017f40e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 26 Sep 2022 22:23:29 +0300 Subject: [PATCH 3/8] start tests for sample --- src/rpp/rpp/operators/sample.hpp | 38 +++++++++++++++----------------- src/tests/test_sample.cpp | 24 ++++++++++++++++++++ 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp index a3cb96e91..cc8310a42 100644 --- a/src/rpp/rpp/operators/sample.hpp +++ b/src/rpp/rpp/operators/sample.hpp @@ -10,12 +10,11 @@ #pragma once -#include -#include -#include - #include #include +#include +#include +#include #include #include @@ -77,26 +76,25 @@ struct sample_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared(in_subscriber.get_subscription()); + auto state = + std::make_shared(in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); - scheduler.create_worker(state->children_subscriptions).schedule(period, - [period=period, subscriber=subscriber, state]() - { - std::optional extracted{}; - { - std::lock_guard lock - {state->value_mutex}; - std::swap(extracted, state->value); - } - if (extracted.has_value()) - subscriber. - on_next(std::move(extracted. - value())); - return period; - }); + scheduler.create_worker(state->children_subscriptions) + .schedule(period, + [period = period, subscriber = subscriber, state]() + { + std::optional extracted{}; + { + std::lock_guard lock{state->value_mutex}; + std::swap(extracted, state->value); + } + if (extracted.has_value()) + subscriber.on_next(std::move(extracted.value())); + return period; + }); return create_subscriber_with_state(state->children_subscriptions, sample_on_next{}, diff --git a/src/tests/test_sample.cpp b/src/tests/test_sample.cpp index e69de29bb..74eb2a348 100644 --- a/src/tests/test_sample.cpp +++ b/src/tests/test_sample.cpp @@ -0,0 +1,24 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "mock_observer.hpp" +#include "test_scheduler.hpp" + +#include + +#include + +SCENARIO("sample throttles emissions", "[operators][sample]") +{ + auto mock = mock_observer{}; + GIVEN("observable") + { + auto obs = rpp::source::interval(std::chrono::milliseconds{1}, test_scheduler{}); + } +} \ No newline at end of file From fdb5793c4571719dd7a03828826a3f199dca946c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 10 Oct 2022 22:33:44 +0300 Subject: [PATCH 4/8] tests + fixes --- .../rpp/observables/interface_observable.hpp | 1 + src/rpp/rpp/operators/sample.hpp | 21 +- src/tests/test_sample.cpp | 192 +++++++++++++++++- src/tests/test_scheduler.hpp | 3 + 4 files changed, 204 insertions(+), 13 deletions(-) diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index cfd4614f3..e7bcfbfef 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -55,6 +55,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp index cc8310a42..9a6176e22 100644 --- a/src/rpp/rpp/operators/sample.hpp +++ b/src/rpp/rpp/operators/sample.hpp @@ -43,9 +43,7 @@ struct sample_state_with_serialized_spinlock : sample_state struct sample_on_next { template - void operator()(Value&& value, - const auto& , - const std::shared_ptr>>& state) const + void operator()(Value&& value, const auto&, const std::shared_ptr>>& state) const { std::lock_guard lock{state->value_mutex}; state->value.emplace(std::forward(value)); @@ -56,14 +54,16 @@ using sample_on_error = early_unsubscribe_on_error; struct sample_on_completed { - template - void operator()(const auto& subscriber, const std::shared_ptr>& state) const + void operator()(const auto& subscriber, const auto& state) const { state->children_subscriptions.unsubscribe(); - std::lock_guard lock{state->value_mutex}; - if (state->value.has_value()) - subscriber.on_next(std::move(state->value.value())); + { + std::lock_guard lock{state->value_mutex}; + if (state->value.has_value()) + subscriber.on_next(std::move(state->value.value())); + } + subscriber.on_completed(); } }; @@ -76,15 +76,14 @@ struct sample_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = - std::make_shared(in_subscriber.get_subscription()); + auto state = std::make_shared>(in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); scheduler.create_worker(state->children_subscriptions) .schedule(period, - [period = period, subscriber = subscriber, state]() + [period = period, subscriber = subscriber, state]() -> rpp::schedulers::optional_duration { std::optional extracted{}; { diff --git a/src/tests/test_sample.cpp b/src/tests/test_sample.cpp index 74eb2a348..89540beff 100644 --- a/src/tests/test_sample.cpp +++ b/src/tests/test_sample.cpp @@ -11,14 +11,202 @@ #include "test_scheduler.hpp" #include +#include +#include +#include +#include +#include +#include +#include #include SCENARIO("sample throttles emissions", "[operators][sample]") +{ + auto mock = mock_observer{}; + auto interval_scheduler = test_scheduler{}; + auto sample_scheduler = test_scheduler{}; + auto start_time = test_scheduler::worker_strategy::now(); + GIVEN("interval observable") + { + constexpr rpp::schedulers::duration interval_duration = std::chrono::milliseconds{1}; + auto obs = rpp::source::interval(interval_duration, interval_scheduler); + WHEN("subscribe on it via sample with period == interval period") + { + auto sample_period =interval_duration; + auto sub = obs.take(3).sample(sample_period, sample_scheduler).subscribe(mock); + while(sub.is_subscribed()) + { + interval_scheduler.time_advance(sample_period); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{0, 1, 2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this scheduler does any action + CHECK(sample_scheduler.get_executions() == std::vector{start_time + sample_period, start_time + 2 * sample_period}); + } + } + WHEN("subscribe on it via sample with period <= interval period") + { + auto sample_period = rpp::schedulers::duration{interval_duration.count() / 2}; + std::vector item_sent{}; + auto sub = obs.take(3).sample(sample_period, sample_scheduler).tap([&](const auto&) + { + item_sent.push_back(test_scheduler::worker_strategy::now()); + }).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(sample_period); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{0, 1, 2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period, + start_time + 4 * sample_period, + start_time + 5 * sample_period, + start_time + 6 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this + // scheduler does any action + CHECK(sample_scheduler.get_executions() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period, + start_time + 3 * sample_period, + start_time + 4 * sample_period, + start_time + 5 * sample_period}); + CHECK(item_sent == + std::vector{start_time + 2 * sample_period, + start_time + 4 * sample_period, + start_time + 6 * sample_period}); + } + } + WHEN("subscribe on it via sample with period >= interval period") + { + auto sample_period = rpp::schedulers::duration{interval_duration.count() * 2}; + std::vector item_sent{}; + auto sub = obs.take(4).sample(sample_period, sample_scheduler).tap([&](const auto&) + { + item_sent.push_back(test_scheduler::worker_strategy::now()); + }).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(interval_duration); + sample_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_schedulers use same global time, but each of them drains different queues + } + + THEN("emissions happens as soon as item appears") + { + CHECK(mock.get_received_values() == std::vector{1, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(interval_scheduler.get_schedulings() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration}); + CHECK(sample_scheduler.get_schedulings() == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period}); + + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + 1 * interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration}); + // last item arrived immediately with on_completed BEFORE actually this + // scheduler does any action + CHECK(sample_scheduler.get_executions() == + std::vector{start_time + 1 * sample_period}); + CHECK(item_sent == + std::vector{start_time + 1 * sample_period, + start_time + 2 * sample_period}); + } + } + } +} + + +SCENARIO("sample sends on_completed immediately", "[operators][sample]") +{ + auto mock = mock_observer{}; + GIVEN("observable of items") + { + auto obs = rpp::source::just(1,2,3); + WHEN("subscribe on it via sample with infinite like duration") + { + auto duration = std::chrono::days{10}; + auto begin = rpp::schedulers::clock_type::now(); + auto sub = obs.sample(duration, test_scheduler{}).subscribe(mock); + auto end = rpp::schedulers::clock_type::now(); + THEN("subscriber obtains only item from completed") + { + CHECK(end-begin < duration); + CHECK(!sub.is_subscribed()); + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + +SCENARIO("sample sends on_error immediately", "[operators][sample]") { auto mock = mock_observer{}; - GIVEN("observable") + GIVEN("observable of items") { - auto obs = rpp::source::interval(std::chrono::milliseconds{1}, test_scheduler{}); + auto obs = rpp::source::just(rpp::source::just(1).as_dynamic(), + rpp::source::just(1).as_dynamic(), + rpp::source::error(std::exception_ptr{}).as_dynamic()).concat(); + WHEN("subscribe on it via sample with infinite like duration") + { + auto duration = std::chrono::days{10}; + auto begin = rpp::schedulers::clock_type::now(); + auto sub = obs.sample(duration, test_scheduler{}).subscribe(mock); + auto end = rpp::schedulers::clock_type::now(); + THEN("subscriber obtains only on_error") + { + CHECK(end-begin < duration); + CHECK(!sub.is_subscribed()); + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } } } \ No newline at end of file diff --git a/src/tests/test_scheduler.hpp b/src/tests/test_scheduler.hpp index 6adbaec86..8909744e8 100644 --- a/src/tests/test_scheduler.hpp +++ b/src/tests/test_scheduler.hpp @@ -23,6 +23,9 @@ class test_scheduler final : public rpp::schedulers::details::scheduler_tag void schedule(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) { + if (!sub.is_subscribed()) + return; + schedulings.push_back(time_point); queue.emplace(time_point, static_cast(rpp::schedulers::clock_type::now().time_since_epoch().count()), From 35185c5b838001d84a5e5466afea970ba0b99df2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 11 Oct 2022 22:38:11 +0300 Subject: [PATCH 5/8] Add docs --- src/examples/doxygen/sample.cpp | 17 ++++++++++++++ src/rpp/rpp/operators/fwd/sample.hpp | 33 ++++++++++++++++------------ src/rpp/rpp/operators/sample.hpp | 2 +- src/tests/test_sample.cpp | 10 ++++----- 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/examples/doxygen/sample.cpp b/src/examples/doxygen/sample.cpp index e69de29bb..289756087 100644 --- a/src/examples/doxygen/sample.cpp +++ b/src/examples/doxygen/sample.cpp @@ -0,0 +1,17 @@ +#include +#include + +/** + * \example sample.cpp + **/ +int main() +{ + //! [sample_with_time] + rpp::source::interval(std::chrono::milliseconds{100}, rpp::schedulers::trampoline{}) + .sample_with_time(std::chrono::milliseconds{300}, rpp::schedulers::trampoline{}) + .take(5) + .subscribe([](int v) { std::cout << v << " "; }); + // Output: 1 4 7 10 13 + //! [sample_with_time] + return 0; +} diff --git a/src/rpp/rpp/operators/fwd/sample.hpp b/src/rpp/rpp/operators/fwd/sample.hpp index e975c7c92..48bdf94c0 100644 --- a/src/rpp/rpp/operators/fwd/sample.hpp +++ b/src/rpp/rpp/operators/fwd/sample.hpp @@ -12,6 +12,7 @@ #include #include +#include namespace rpp::details { @@ -21,38 +22,42 @@ struct sample_tag; namespace rpp::details { template -struct sample_impl; +struct sample_with_time_impl; template struct member_overload { /** - * \brief + * \brief Emit most recent emitted from original observable emission obtained during last period of time. + * \details Emit item immediately in case of completion of the original observable * - * \marble sample + * \marble sample_with_time { - + source observable : +--1---2-3-4---5-6-7-| + operator "sample_with_time(2)" : +--1---2---4---5---7-| } * - * \return new specific_observable with the sample operator as most recent operator. + * \param period sampling period + * \scheduler scheduler to use to schedule emissions with provided sampling period + * \return new specific_observable with the sample_with_time operator as most recent operator. * \warning #include * * \par Example - * \snippet sample.cpp sample + * \snippet sample.cpp sample_with_time * - * \ingroup - * \see + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html */ - template - auto sample(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + template + auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) const & requires is_header_included { - return static_cast(this)->template lift(sample_impl{period, scheduler}); + return static_cast(this)->template lift(sample_with_time_impl{period, scheduler}); } - template - auto sample(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + template + auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) && requires is_header_included { - return std::move(*static_cast(this)).template lift(sample_impl{period, scheduler}); + return std::move(*static_cast(this)).template lift(sample_with_time_impl{period, scheduler}); } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp index 9a6176e22..e6888986a 100644 --- a/src/rpp/rpp/operators/sample.hpp +++ b/src/rpp/rpp/operators/sample.hpp @@ -68,7 +68,7 @@ struct sample_on_completed }; template -struct sample_impl +struct sample_with_time_impl { schedulers::duration period; TScheduler scheduler; diff --git a/src/tests/test_sample.cpp b/src/tests/test_sample.cpp index 89540beff..8221ff5eb 100644 --- a/src/tests/test_sample.cpp +++ b/src/tests/test_sample.cpp @@ -34,7 +34,7 @@ SCENARIO("sample throttles emissions", "[operators][sample]") WHEN("subscribe on it via sample with period == interval period") { auto sample_period =interval_duration; - auto sub = obs.take(3).sample(sample_period, sample_scheduler).subscribe(mock); + auto sub = obs.take(3).sample_with_time(sample_period, sample_scheduler).subscribe(mock); while(sub.is_subscribed()) { interval_scheduler.time_advance(sample_period); @@ -68,7 +68,7 @@ SCENARIO("sample throttles emissions", "[operators][sample]") { auto sample_period = rpp::schedulers::duration{interval_duration.count() / 2}; std::vector item_sent{}; - auto sub = obs.take(3).sample(sample_period, sample_scheduler).tap([&](const auto&) + auto sub = obs.take(3).sample_with_time(sample_period, sample_scheduler).tap([&](const auto&) { item_sent.push_back(test_scheduler::worker_strategy::now()); }).subscribe(mock); @@ -118,7 +118,7 @@ SCENARIO("sample throttles emissions", "[operators][sample]") { auto sample_period = rpp::schedulers::duration{interval_duration.count() * 2}; std::vector item_sent{}; - auto sub = obs.take(4).sample(sample_period, sample_scheduler).tap([&](const auto&) + auto sub = obs.take(4).sample_with_time(sample_period, sample_scheduler).tap([&](const auto&) { item_sent.push_back(test_scheduler::worker_strategy::now()); }).subscribe(mock); @@ -171,7 +171,7 @@ SCENARIO("sample sends on_completed immediately", "[operators][sample]") { auto duration = std::chrono::days{10}; auto begin = rpp::schedulers::clock_type::now(); - auto sub = obs.sample(duration, test_scheduler{}).subscribe(mock); + auto sub = obs.sample_with_time(duration, test_scheduler{}).subscribe(mock); auto end = rpp::schedulers::clock_type::now(); THEN("subscriber obtains only item from completed") { @@ -197,7 +197,7 @@ SCENARIO("sample sends on_error immediately", "[operators][sample]") { auto duration = std::chrono::days{10}; auto begin = rpp::schedulers::clock_type::now(); - auto sub = obs.sample(duration, test_scheduler{}).subscribe(mock); + auto sub = obs.sample_with_time(duration, test_scheduler{}).subscribe(mock); auto end = rpp::schedulers::clock_type::now(); THEN("subscriber obtains only on_error") { From 1d25553762d40a3dde1c0523b89c9639f7d76b8c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 11 Oct 2022 22:38:40 +0300 Subject: [PATCH 6/8] status --- docs/Implementation Status.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 16ce51216..a938dbeca 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -71,6 +71,8 @@ - [ ] ignore_elements - [x] last - [ ] sample + - [ ] sample (observable) + - [x] sample_with_time - [x] skip - [ ] skip_last - [x] take_last From e9c503f257bb65e9a79b9fb290a5de8a430e79af Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 11 Oct 2022 23:46:25 +0300 Subject: [PATCH 7/8] fix interval tests --- src/tests/test_interval.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/tests/test_interval.cpp b/src/tests/test_interval.cpp index 32976681f..53c5b3e04 100644 --- a/src/tests/test_interval.cpp +++ b/src/tests/test_interval.cpp @@ -72,8 +72,7 @@ SCENARIO("interval emit values with provided interval", "[interval]") initial_time + 3*interval}); CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval, initial_time + 2*interval, - initial_time + 3*interval, - initial_time + 4*interval}); + initial_time + 3*interval}); } } } @@ -113,8 +112,7 @@ SCENARIO("interval emit values with provided interval", "[interval]") { CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay, initial_time + initial_delay + interval, - initial_time + initial_delay + 2 * interval, - initial_time + initial_delay + 3 * interval }); + initial_time + initial_delay + 2 * interval }); CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay, initial_time + initial_delay + interval, initial_time + initial_delay + 2 * interval }); From 9f9b3e5661ab1776a113542e151f31f6ccda2d80 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 12 Oct 2022 00:00:51 +0300 Subject: [PATCH 8/8] Update spinlock.hpp --- src/rpp/rpp/utils/spinlock.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rpp/rpp/utils/spinlock.hpp b/src/rpp/rpp/utils/spinlock.hpp index bc8022361..298ba533e 100644 --- a/src/rpp/rpp/utils/spinlock.hpp +++ b/src/rpp/rpp/utils/spinlock.hpp @@ -8,7 +8,9 @@ // Project home: https://github.com/victimsnino/ReactivePlusPlus #pragma once + #include +#include namespace rpp::utils {