From b8a03d7f63c8ccfe9a81447d540fb6645181d6b2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Dec 2022 22:53:39 +0300 Subject: [PATCH 1/8] Basse implementation for reduce --- src/examples/rpp/doxygen/reduce.cpp | 0 .../rpp/observables/interface_observable.hpp | 1 + src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 1 + src/rpp/rpp/operators/fwd/reduce.hpp | 65 ++++++++ src/rpp/rpp/operators/reduce.hpp | 75 ++++++++++ src/rpp/rpp/operators/scan.hpp | 16 +- src/tests/rpp/test_reduce.cpp | 139 ++++++++++++++++++ 8 files changed, 287 insertions(+), 11 deletions(-) create mode 100644 src/examples/rpp/doxygen/reduce.cpp create mode 100644 src/rpp/rpp/operators/fwd/reduce.hpp create mode 100644 src/rpp/rpp/operators/reduce.hpp create mode 100644 src/tests/rpp/test_reduce.cpp diff --git a/src/examples/rpp/doxygen/reduce.cpp b/src/examples/rpp/doxygen/reduce.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index 1cebbeb72..bd5ec0dff 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -54,6 +54,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index da700d740..1479add18 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -80,6 +80,7 @@ */ #include +#include /** * \defgroup utility_operators Utility Operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 0c72e24bb..556bd6d55 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp new file mode 100644 index 000000000..7c506e4ab --- /dev/null +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -0,0 +1,65 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +namespace rpp::details +{ +struct reduce_tag; +} + +namespace rpp::details +{ +// accepts Result and Type and returns Result +template +concept reduce_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; + +template AccumulatorFn> +struct reduce_impl; + +template +struct member_overload +{ + /** + * \brief Apply accumulator function to each emission from observable and result of accumulator from previous step and emit final value + * + * \marble reduce + { + source observable : +--1-2-3-| + operator "reduce: s=1, (s,x)=>s+x" : +--------7| + } + * + * \param initial_value initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. + * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. + * + * \return new specific_observable with the reduce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp reduce + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/reduce.html + */ + template AccumulatorFn> + auto reduce(Result&& initial_value, AccumulatorFn&& accumulator) const & requires is_header_included + { + return static_cast(this)->template lift>(reduce_impl, std::decay_t>{std::forward(initial_value), std::forward(accumulator)}); + } + + template AccumulatorFn> + auto reduce(Result&& initial_value, AccumulatorFn&& accumulator) && requires is_header_included + { + return std::move(*static_cast(this)).template lift>(reduce_impl, std::decay_t>{std::forward(initial_value), std::forward(accumulator)}); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp new file mode 100644 index 000000000..8c49ea723 --- /dev/null +++ b/src/rpp/rpp/operators/reduce.hpp @@ -0,0 +1,75 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift +#include // create_subscriber_with_state +#include // own forwarding +#include // constraint::subscriber +#include // forwarding_on_error +#include // utils::as_const + + +IMPLEMENTATION_FILE(reduce_tag); + +namespace rpp::details +{ +template +struct reduce_state +{ + mutable Result seed; + RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; +}; + +struct reduce_on_next +{ + template + void operator()(auto&& value, + const constraint::subscriber auto&, + const reduce_state& state) const + { + state.seed = state.accumulator(std::move(state.seed), std::forward(value)); + } +}; + +struct reduce_on_completed +{ + template + void operator()(const constraint::subscriber auto& sub, + const reduce_state& state) const + { + sub.on_next(std::move(state.seed)); + sub.on_completed(); + } +}; + +template AccumulatorFn> +struct reduce_impl +{ + Result initial_value; + RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + + template TSub> + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state + return create_subscriber_with_dynamic_state(std::move(subscription), + reduce_on_next{}, + utils::forwarding_on_error{}, + reduce_on_completed{}, + std::forward(subscriber), + reduce_state{initial_value, accumulator}); + } +}; +} // namespace rpp::details + diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index a9d8e25e2..e24d6b325 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -14,6 +14,7 @@ #include // required due to operator uses lift #include // create_subscriber_with_state #include // own forwarding +#include // reduce to re-use #include // constraint::subscriber #include // forwarding_on_error #include // utils::as_const @@ -23,21 +24,14 @@ IMPLEMENTATION_FILE(scan_tag); namespace rpp::details { -template -struct scan_state -{ - mutable Result seed; - RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; -}; - -struct scan_on_next +struct scan_on_next : private reduce_on_next { template void operator()(auto&& value, const constraint::subscriber auto& sub, - const scan_state& state) const + const reduce_state& state) const { - state.seed = state.accumulator(std::move(state.seed), std::forward(value)); + reduce_on_next::operator()(std::forward(value), sub, state); sub.on_next(utils::as_const(state.seed)); } }; @@ -58,7 +52,7 @@ struct scan_impl utils::forwarding_on_error{}, utils::forwarding_on_completed{}, std::forward(subscriber), - scan_state{initial_value, accumulator}); + reduce_state{initial_value, accumulator}); } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp new file mode 100644 index 000000000..cbfa124fc --- /dev/null +++ b/src/tests/rpp/test_reduce.cpp @@ -0,0 +1,139 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "copy_count_tracker.hpp" +#include "mock_observer.hpp" + +#include + +#include + +#include + +SCENARIO("reduce reduces values and store state", "[reduce]") +{ + GIVEN("observable") + { + auto obs = rpp::observable::just(1, 2, 3); + WHEN("subscribe on it via reduce with plus") + { + auto mock = mock_observer{}; + + obs.reduce(0, std::plus{}).subscribe(mock); + THEN("observer obtains final sum") + { + CHECK(mock.get_received_values() == std::vector{6}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via reduce with aggregating in vector") + { + auto mock = mock_observer>{}; + + obs.reduce(std::vector{}, + [](std::vector&& seed, int new_val) + { + seed.push_back(new_val); + return std::move(seed); + }).subscribe(mock); + + THEN("observer obtains final vector") + { + CHECK(mock.get_received_values() == std::vector{{std::vector{1,2,3}}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via reduce with exception") + { + auto mock = mock_observer{}; + + volatile bool none{}; + obs.reduce(0, + [&](int, int)-> int + { + if (none) + return 0; + throw std::runtime_error{""}; + }).subscribe(mock); + + THEN("observer obtains only on_error") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + GIVEN("observable 1-2->") + { + auto obs = rpp::observable::create([](const auto& sub) + { + sub.on_next(1); + sub.on_next(2); + }); + WHEN("subscribe on it via reduce with plus") + { + auto mock = mock_observer{}; + + obs.reduce(0, std::plus{}).subscribe(mock); + THEN("no any values") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("reduce keeps state for copies", "[reduce]") +{ + auto mock = mock_observer{}; + GIVEN("observable which sends values via copy") + { + auto obs = rpp::source::create([](const auto& sub) + { + for (size_t i = 0; i < 10; ++i) + { + auto copy = sub; + copy.on_next(1); + } + sub.on_completed(); + }); + WHEN("subscribe on it via reduce") + { + obs.reduce(int{}, [](int seed, int new_v) { return seed + new_v; }).subscribe(mock); + THEN("observer obtains only last value") + { + CHECK(mock.get_received_values() == std::vector{ 10 }); + } + } + } +} + +SCENARIO("reduce doesn't produce extra copies", "[reduce][track_copy]") +{ + GIVEN("observable and subscriber") + { + copy_count_tracker verifier{}; + auto obs = rpp::source::just(1).reduce(verifier, [](copy_count_tracker&& seed, int) { return std::move(seed); }); + WHEN("subscribe") + { + obs.subscribe([](const auto&) {}); + THEN("no extra copies") + { + REQUIRE(verifier.get_copy_count() == 2); // 1 copy to reduce state + 1 copy for provided subscriber to shared_state + REQUIRE(verifier.get_move_count() == 5); + // 1 move to observable state + 1 move to subscriber + 1 move from lambda + 1 move to new_state + 1 move to final lambda + } + } + } +} From 27b40ce43d0ee337f187198b40179b513fb647a9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Dec 2022 22:54:31 +0300 Subject: [PATCH 2/8] Examples --- src/examples/rpp/doxygen/reduce.cpp | 35 ++++++++++++++++++++++++++++ src/rpp/rpp/operators/fwd/reduce.hpp | 1 + 2 files changed, 36 insertions(+) diff --git a/src/examples/rpp/doxygen/reduce.cpp b/src/examples/rpp/doxygen/reduce.cpp index e69de29bb..b5933a59d 100644 --- a/src/examples/rpp/doxygen/reduce.cpp +++ b/src/examples/rpp/doxygen/reduce.cpp @@ -0,0 +1,35 @@ +#include + +#include + +/** + * \example reduce.cpp + **/ + +int main() +{ + //! [reduce] + rpp::source::just(1,2,3) + .reduce(0, std::plus{}) + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 6 + //! [reduce] + + //! [reduce_vector] + rpp::source::just(1,2,3) + .reduce(std::vector{}, [](std::vector&& seed, int new_value) + { + seed.push_back(new_value); + return std::move(seed); + }) + .subscribe([](const std::vector& v) + { + std::cout << "vector: "; + for(int val : v) + std::cout << val << " "; + std::cout << std::endl; + }); + // Output: vector: 1 2 3 + //! [reduce_vector] + return 0; +} diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp index 7c506e4ab..5511e793b 100644 --- a/src/rpp/rpp/operators/fwd/reduce.hpp +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -46,6 +46,7 @@ struct member_overload * * \par Example * \snippet reduce.cpp reduce + * \snippet reduce.cpp reduce_vector * * \ingroup transforming_operators * \see https://reactivex.io/documentation/operators/reduce.html From a4a77dfaa7bc2592b343bf22de9dc7b580ce6fdd Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Dec 2022 22:54:49 +0300 Subject: [PATCH 3/8] DOcs --- docs/Implementation Status.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index d96a6f08f..5cf04894e 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -111,7 +111,7 @@ - [ ] count - [ ] max - [ ] min -- [ ] reduce +- [x] reduce - [ ] sum ### Backpressure From 4f1f9e9a980bc814858200e56eaa8f1abe6e7f2b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 19 Dec 2022 23:46:41 +0300 Subject: [PATCH 4/8] add averag --- docs/Implementation Status.md | 2 +- src/examples/rpp/doxygen/reduce.cpp | 7 +++ src/rpp/rpp/operators/fwd/reduce.hpp | 64 ++++++++++++++++++++++++---- src/rpp/rpp/operators/fwd/scan.hpp | 2 +- src/rpp/rpp/operators/reduce.hpp | 43 +++++++++++++------ src/tests/rpp/test_reduce.cpp | 32 ++++++++++++++ 6 files changed, 128 insertions(+), 22 deletions(-) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 5cf04894e..457735ded 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -106,7 +106,7 @@ ### Aggregate -- [ ] average +- [x] average - [x] concat - [ ] count - [ ] max diff --git a/src/examples/rpp/doxygen/reduce.cpp b/src/examples/rpp/doxygen/reduce.cpp index b5933a59d..54bf015c2 100644 --- a/src/examples/rpp/doxygen/reduce.cpp +++ b/src/examples/rpp/doxygen/reduce.cpp @@ -31,5 +31,12 @@ int main() }); // Output: vector: 1 2 3 //! [reduce_vector] + + //! [average] + rpp::source::just(1,2,3) + .average() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 2 + //! [average] return 0; } diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp index 5511e793b..086c5e103 100644 --- a/src/rpp/rpp/operators/fwd/reduce.hpp +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -23,9 +23,19 @@ namespace rpp::details template concept reduce_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; -template AccumulatorFn> +template +concept is_can_be_averaged = requires(T t, CastBeforeDrop nt) +{ + { t + t } -> std::convertible_to; + { nt / size_t{} }; +}; + +template AccumulatorFn, std::invocable ResultSelectorFn> struct reduce_impl; +template +auto average_impl(TObs&& observable); + template struct member_overload { @@ -38,7 +48,7 @@ struct member_overload operator "reduce: s=1, (s,x)=>s+x" : +--------7| } * - * \param initial_value initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. + * \param initial_seed initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. * * \return new specific_observable with the reduce operator as most recent operator. @@ -51,16 +61,54 @@ struct member_overload * \ingroup transforming_operators * \see https://reactivex.io/documentation/operators/reduce.html */ - template AccumulatorFn> - auto reduce(Result&& initial_value, AccumulatorFn&& accumulator) const & requires is_header_included + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) const & requires is_header_included + { + return static_cast(this)->template lift>>( + reduce_impl, std::decay_t, std::decay_t>{ + std::forward(initial_seed), + std::forward(accumulator), + std::forward(result_selector)}); + } + + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift>>( + reduce_impl, std::decay_t, std::decay_t>{ + std::forward(initial_seed), + std::forward(accumulator), + std::forward(result_selector)}); + } + + /** + * \brief Calculated the average of emissions and emit final value + * + * \marble reduce + { + source observable : +--1-2-3-| + operator "average" : +--------2| + } + * + * \return new specific_observable with the average operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp average + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/average.html + */ + template + auto average() const & requires (is_header_included && is_can_be_averaged) { - return static_cast(this)->template lift>(reduce_impl, std::decay_t>{std::forward(initial_value), std::forward(accumulator)}); + return average_impl(*static_cast(this)); } - template AccumulatorFn> - auto reduce(Result&& initial_value, AccumulatorFn&& accumulator) && requires is_header_included + template + auto average() && requires (is_header_included && is_can_be_averaged) { - return std::move(*static_cast(this)).template lift>(reduce_impl, std::decay_t>{std::forward(initial_value), std::forward(accumulator)}); + return average_impl(std::move(*static_cast(this))); } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/scan.hpp b/src/rpp/rpp/operators/fwd/scan.hpp index 270b38600..c6fff707e 100644 --- a/src/rpp/rpp/operators/fwd/scan.hpp +++ b/src/rpp/rpp/operators/fwd/scan.hpp @@ -21,7 +21,7 @@ namespace rpp::details { // accepts Result and Type and returns Result template -concept scan_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; +concept scan_accumulator = reduce_accumulator; template AccumulatorFn> struct scan_impl; diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 8c49ea723..c127d5b09 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -23,19 +23,20 @@ IMPLEMENTATION_FILE(reduce_tag); namespace rpp::details { -template +template SelectorFn = std::identity> struct reduce_state { - mutable Result seed; + mutable Seed seed; RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + RPP_NO_UNIQUE_ADDRESS SelectorFn selector; }; struct reduce_on_next { - template + template void operator()(auto&& value, const constraint::subscriber auto&, - const reduce_state& state) const + const reduce_state& state) const { state.seed = state.accumulator(std::move(state.seed), std::forward(value)); } @@ -43,22 +44,23 @@ struct reduce_on_next struct reduce_on_completed { - template + template void operator()(const constraint::subscriber auto& sub, - const reduce_state& state) const + const reduce_state& state) const { - sub.on_next(std::move(state.seed)); + sub.on_next(state.selector(std::move(state.seed))); sub.on_completed(); } }; -template AccumulatorFn> +template AccumulatorFn, std::invocable ResultSelectorFn> struct reduce_impl { - Result initial_value; - RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + Seed initial_value; + RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + RPP_NO_UNIQUE_ADDRESS ResultSelectorFn selector; - template TSub> + template> TSub> auto operator()(TSub&& subscriber) const { auto subscription = subscriber.get_subscription(); @@ -68,8 +70,25 @@ struct reduce_impl utils::forwarding_on_error{}, reduce_on_completed{}, std::forward(subscriber), - reduce_state{initial_value, accumulator}); + reduce_state{initial_value, accumulator, selector}); } }; + +template +auto average_impl(TObs&& observable) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::pair{}, + [](std::pair&& seed, auto&& val) + { + seed.first += std::forward(val); + ++seed.second; + return std::move(seed); + }, + [](std::pair seed) + { + return static_cast(seed.first) / seed.second; + }); +} } // namespace rpp::details diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp index cbfa124fc..d3f4b530c 100644 --- a/src/tests/rpp/test_reduce.cpp +++ b/src/tests/rpp/test_reduce.cpp @@ -94,6 +94,38 @@ SCENARIO("reduce reduces values and store state", "[reduce]") } } +SCENARIO("average calculates average", "[reduce]") +{ + GIVEN("observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + auto r= obs.average().subscribe(mock); + + THEN("observer obtained value as average int") + { + CHECK(mock.get_received_values() == std::vector{static_cast(1+2)/2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + obs.average().subscribe(mock); + + THEN("observer obtained value as average double") + { + CHECK(mock.get_received_values() == std::vector{1.5}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + SCENARIO("reduce keeps state for copies", "[reduce]") { auto mock = mock_observer{}; From 1e448618912fa279b69318b98958595050d6bacc Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Dec 2022 12:03:42 +0300 Subject: [PATCH 5/8] Update reduce.hpp --- src/rpp/rpp/operators/reduce.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index c127d5b09..c99c3ae5f 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -28,7 +28,7 @@ struct reduce_state { mutable Seed seed; RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; - RPP_NO_UNIQUE_ADDRESS SelectorFn selector; + RPP_NO_UNIQUE_ADDRESS SelectorFn selector{}; }; struct reduce_on_next From 493b02452ad91ebe944478298a316250b03d3f44 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Dec 2022 23:04:19 +0300 Subject: [PATCH 6/8] Fix average --- src/rpp/rpp/operators/fwd/reduce.hpp | 9 ++++---- src/rpp/rpp/operators/reduce.hpp | 33 ++++++++++++++++++++-------- src/tests/rpp/test_reduce.cpp | 20 ++++++++++++++++- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp index 086c5e103..318bc8d6d 100644 --- a/src/rpp/rpp/operators/fwd/reduce.hpp +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -30,7 +30,7 @@ concept is_can_be_averaged = requires(T t, CastBeforeDrop nt) { nt / size_t{} }; }; -template AccumulatorFn, std::invocable ResultSelectorFn> +template AccumulatorFn, std::invocable ResultSelectorFn> struct reduce_impl; template @@ -61,7 +61,7 @@ struct member_overload * \ingroup transforming_operators * \see https://reactivex.io/documentation/operators/reduce.html */ - template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) const & requires is_header_included { return static_cast(this)->template lift>>( @@ -71,7 +71,7 @@ struct member_overload std::forward(result_selector)}); } - template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) && requires is_header_included { return std::move(*static_cast(this)).template lift>>( @@ -84,7 +84,7 @@ struct member_overload /** * \brief Calculated the average of emissions and emit final value * - * \marble reduce + * \marble average { source observable : +--1-2-3-| operator "average" : +--------2| @@ -92,6 +92,7 @@ struct member_overload * * \return new specific_observable with the average operator as most recent operator. * \warning #include + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable * * \par Example * \snippet reduce.cpp average diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index c99c3ae5f..5d53142ff 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(reduce_tag); namespace rpp::details { -template SelectorFn = std::identity> +template SelectorFn = std::identity> struct reduce_state { mutable Seed seed; @@ -45,15 +45,23 @@ struct reduce_on_next struct reduce_on_completed { template - void operator()(const constraint::subscriber auto& sub, + void operator()(const constraint::subscriber auto& sub, const reduce_state& state) const { - sub.on_next(state.selector(std::move(state.seed))); + try + { + sub.on_next(state.selector(std::move(state.seed))); + } + catch (...) + { + sub.on_error(std::current_exception()); + return; + } sub.on_completed(); } }; -template AccumulatorFn, std::invocable ResultSelectorFn> +template AccumulatorFn, std::invocable ResultSelectorFn> struct reduce_impl { Seed initial_value; @@ -78,16 +86,23 @@ template auto average_impl(TObs&& observable) { using Type = utils::extract_observable_type_t>; - return std::forward(observable).reduce(std::pair{}, - [](std::pair&& seed, auto&& val) + using Pair = std::pair, int32_t>; + return std::forward(observable).reduce(Pair{}, + [](Pair&& seed, auto&& val) { - seed.first += std::forward(val); + if (seed.first) + seed.first.value() += std::forward(val); + else + seed.first = std::forward(val); ++seed.second; return std::move(seed); }, - [](std::pair seed) + [](Pair&& seed) { - return static_cast(seed.first) / seed.second; + if (!seed.first) + throw utils::not_enough_emissions{"`average` operator requires at least one emission to calculate average"}; + + return static_cast(std::move(seed.first).value()) / seed.second; }); } } // namespace rpp::details diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp index d3f4b530c..5bafea1a2 100644 --- a/src/tests/rpp/test_reduce.cpp +++ b/src/tests/rpp/test_reduce.cpp @@ -15,6 +15,7 @@ #include #include +#include SCENARIO("reduce reduces values and store state", "[reduce]") { @@ -96,7 +97,7 @@ SCENARIO("reduce reduces values and store state", "[reduce]") SCENARIO("average calculates average", "[reduce]") { - GIVEN("observable") + GIVEN("-1-2| observable") { auto obs = rpp::source::just(1,2); WHEN("subscribe on it via average") @@ -124,6 +125,23 @@ SCENARIO("average calculates average", "[reduce]") } } } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + auto r= obs.average().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } } SCENARIO("reduce keeps state for copies", "[reduce]") From 0f37b85e357c07f89530834e842e2214c28b5e7d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Dec 2022 23:35:19 +0300 Subject: [PATCH 7/8] add min/max/count/sum --- docs/Implementation Status.md | 8 +- src/examples/rpp/doxygen/reduce.cpp | 28 +++++ src/rpp/rpp/operators/fwd/reduce.hpp | 162 +++++++++++++++++++++++++-- src/rpp/rpp/operators/reduce.hpp | 68 +++++++++++ src/tests/rpp/test_reduce.cpp | 148 ++++++++++++++++++++++++ 5 files changed, 403 insertions(+), 11 deletions(-) diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 457735ded..af1e3f46d 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -108,11 +108,11 @@ - [x] average - [x] concat -- [ ] count -- [ ] max -- [ ] min +- [x] count +- [x] max +- [x] min - [x] reduce -- [ ] sum +- [x] sum ### Backpressure diff --git a/src/examples/rpp/doxygen/reduce.cpp b/src/examples/rpp/doxygen/reduce.cpp index 54bf015c2..db3e9dcce 100644 --- a/src/examples/rpp/doxygen/reduce.cpp +++ b/src/examples/rpp/doxygen/reduce.cpp @@ -38,5 +38,33 @@ int main() .subscribe([](int v) { std::cout << v << std::endl; }); // Output: 2 //! [average] + + //! [sum] + rpp::source::just(1,2,3) + .sum() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 6 + //! [sum] + + //! [count] + rpp::source::just(1,2,3) + .count() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 3 + //! [count] + + //! [min] + rpp::source::just(5,1,2,3) + .min() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 1 + //! [min] + + //! [max] + rpp::source::just(5,1,2,3) + .max() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 5 + //! [max] return 0; } diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp index 318bc8d6d..fdbd0dc60 100644 --- a/src/rpp/rpp/operators/fwd/reduce.hpp +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -23,10 +23,15 @@ namespace rpp::details template concept reduce_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; -template -concept is_can_be_averaged = requires(T t, CastBeforeDrop nt) +template +concept is_can_be_summed = requires(T t) { { t + t } -> std::convertible_to; +}; + +template +concept is_can_be_averaged = is_can_be_summed && requires(CastBeforeDrop nt) +{ { nt / size_t{} }; }; @@ -36,11 +41,23 @@ struct reduce_impl; template auto average_impl(TObs&& observable); +template +auto sum_impl(TObs&& observable); + +template +auto count_impl(TObs&& observable); + +template +auto min_impl(TObs&& observable, Comparator&& comparator); + +template +auto max_impl(TObs&& observable, Comparator&& comparator); + template struct member_overload { /** - * \brief Apply accumulator function to each emission from observable and result of accumulator from previous step and emit final value + * \brief Applies accumulator function to each emission from observable and result of accumulator from previous step and emits final value * * \marble reduce { @@ -71,7 +88,7 @@ struct member_overload std::forward(result_selector)}); } - template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) && requires is_header_included { return std::move(*static_cast(this)).template lift>>( @@ -82,7 +99,7 @@ struct member_overload } /** - * \brief Calculated the average of emissions and emit final value + * \brief Calculates the average of emissions and emits final value * * \marble average { @@ -90,9 +107,11 @@ struct member_overload operator "average" : +--------2| } * + * \tparam CastBeforeDivide cast accumulated value to this type before division * \return new specific_observable with the average operator as most recent operator. - * \warning #include * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include * * \par Example * \snippet reduce.cpp average @@ -106,10 +125,139 @@ struct member_overload return average_impl(*static_cast(this)); } - template + template auto average() && requires (is_header_included && is_can_be_averaged) { return average_impl(std::move(*static_cast(this))); } + + + /** + * \brief Calculates the sum of emissions and emits final value + * + * \marble sum + { + source observable : +--1-2-3-| + operator "sum" : +--------6| + } + * + * \return new specific_observable with the sum operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp sum + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/sum.html + */ + template + auto sum() const & requires (is_header_included && is_can_be_summed) + { + return sum_impl(*static_cast(this)); + } + + template + auto sum() && requires (is_header_included && is_can_be_summed) + { + return sum_impl(std::move(*static_cast(this))); + } + + /** + * \brief Calculates the amount of emitted emissions and emits this count + * + * \marble count + { + source observable : +--1-2-3-| + operator "count" : +--------3| + } + * + * \return new specific_observable with the count operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp count + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/count.html + */ + template + auto count() const & requires is_header_included + { + return count_impl(*static_cast(this)); + } + + template + auto count() && requires is_header_included + { + return count_impl(std::move(*static_cast(this))); + } + + /** + * \brief Emits the emission which has minimal value from the whole observable + * + * \marble min + { + source observable : +-6-1-2-3-| + operator "min" : +---------1| + } + * + * \param comparator is function to deduce if left value is less than right + * \return new specific_observable with the min operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp min + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/min.html + */ + template Comparator = std::less, typename ...Args> + auto min(Comparator&& comparator = {}) const & requires is_header_included + { + return min_impl(*static_cast(this), std::forward(comparator)); + } + + template Comparator = std::less, typename ...Args> + auto min(Comparator&& comparator = {}) && requires is_header_included + { + return min_impl(std::move(*static_cast(this)), std::forward(comparator)); + } + + /** + * \brief Emits the emission which has maximal value from the whole observable + * + * \marble max + { + source observable : +-6-1-2-3-| + operator "max" : +---------6| + } + * + * \param comparator is function to deduce if left value is less than right + * \return new specific_observable with the max operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp max + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/max.html + */ + template Comparator = std::less, typename ...Args> + auto max(Comparator&& comparator = {}) const & requires is_header_included + { + return max_impl(*static_cast(this), std::forward(comparator)); + } + + template Comparator = std::less, typename ...Args> + auto max(Comparator&& comparator = {}) && requires is_header_included + { + return max_impl(std::move(*static_cast(this)), std::forward(comparator)); + } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 5d53142ff..b492a926a 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -105,5 +105,73 @@ auto average_impl(TObs&& observable) return static_cast(std::move(seed.first).value()) / seed.second; }); } + +template +auto sum_impl(TObs&& observable) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [](std::optional&& seed, auto&& val) + { + if (!seed) + seed = std::forward(val); + else + seed.value() += std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`sum` operator requires at least one emission to calculate sum"}; + + return std::move(seed.value()); + }); +} + +template +auto count_impl(TObs&& observable) +{ + return std::forward(observable).reduce(size_t{}, [](size_t seed, auto&&) { return ++seed; }); +} + +template +auto min_impl(TObs&& observable, Comparator&& comparator) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [comparator](std::optional&& seed, auto&& val) + { + if (!seed || comparator(utils::as_const(val), seed.value())) + seed = std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`min` operator requires at least one emission to calculate min"}; + + return std::move(seed.value()); + }); +} + +template +auto max_impl(TObs&& observable, Comparator&& comparator) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [comparator](std::optional&& seed, auto&& val) + { + if (!seed || comparator(seed.value(), utils::as_const(val))) + seed = std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`max` operator requires at least one emission to calculate min"}; + + return std::move(seed.value()); + }); +} } // namespace rpp::details diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp index 5bafea1a2..7e357ff54 100644 --- a/src/tests/rpp/test_reduce.cpp +++ b/src/tests/rpp/test_reduce.cpp @@ -144,6 +144,154 @@ SCENARIO("average calculates average", "[reduce]") } } +SCENARIO("sum calculates sum", "[reduce]") +{ + GIVEN("-1-2| observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via sum") + { + auto mock = mock_observer{}; + auto r= obs.sum().subscribe(mock); + + THEN("observer obtained value as sum int") + { + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via sum") + { + auto mock = mock_observer{}; + auto r= obs.sum().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("count calculates count", "[reduce]") +{ + GIVEN("-1-2| observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via count") + { + auto mock = mock_observer{}; + auto r= obs.count().subscribe(mock); + + THEN("observer obtained value as count size_t") + { + CHECK(mock.get_received_values() == std::vector{size_t{2}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via count") + { + auto mock = mock_observer{}; + auto r= obs.count().subscribe(mock); + + THEN("observer obtained zero") + { + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + +SCENARIO("min calculates min", "[reduce]") +{ + GIVEN("-3-1-2| observable") + { + auto obs = rpp::source::just(3,1,2); + WHEN("subscribe on it via min") + { + auto mock = mock_observer{}; + auto r= obs.min().subscribe(mock); + + THEN("observer obtained min value") + { + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via min") + { + auto mock = mock_observer{}; + auto r= obs.min().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("max calculates min", "[reduce]") +{ + GIVEN("-3-1-2| observable") + { + auto obs = rpp::source::just(3,1,2); + WHEN("subscribe on it via max") + { + auto mock = mock_observer{}; + auto r= obs.max().subscribe(mock); + + THEN("observer obtained max value") + { + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via max") + { + auto mock = mock_observer{}; + auto r= obs.max().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + SCENARIO("reduce keeps state for copies", "[reduce]") { auto mock = mock_observer{}; From 6c5735733ba54710630e79a1f253c7ff5cabf3d5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 20 Dec 2022 23:39:21 +0300 Subject: [PATCH 8/8] fixx --- src/rpp/rpp/operators/reduce.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index b492a926a..8a636315d 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -11,6 +11,7 @@ #pragma once #include // RPP_NO_UNIQUE_ADDRESS +#include #include // required due to operator uses lift #include // create_subscriber_with_state #include // own forwarding