diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index d96a6f08f..af1e3f46d 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -106,13 +106,13 @@ ### Aggregate -- [ ] average +- [x] average - [x] concat -- [ ] count -- [ ] max -- [ ] min -- [ ] reduce -- [ ] sum +- [x] count +- [x] max +- [x] min +- [x] reduce +- [x] sum ### Backpressure diff --git a/src/examples/rpp/doxygen/reduce.cpp b/src/examples/rpp/doxygen/reduce.cpp new file mode 100644 index 000000000..db3e9dcce --- /dev/null +++ b/src/examples/rpp/doxygen/reduce.cpp @@ -0,0 +1,70 @@ +#include + +#include + +/** + * \example reduce.cpp + **/ + +int main() +{ + //! [reduce] + rpp::source::just(1,2,3) + .reduce(0, std::plus{}) + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 6 + //! [reduce] + + //! [reduce_vector] + rpp::source::just(1,2,3) + .reduce(std::vector{}, [](std::vector&& seed, int new_value) + { + seed.push_back(new_value); + return std::move(seed); + }) + .subscribe([](const std::vector& v) + { + std::cout << "vector: "; + for(int val : v) + std::cout << val << " "; + std::cout << std::endl; + }); + // Output: vector: 1 2 3 + //! [reduce_vector] + + //! [average] + rpp::source::just(1,2,3) + .average() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 2 + //! [average] + + //! [sum] + rpp::source::just(1,2,3) + .sum() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 6 + //! [sum] + + //! [count] + rpp::source::just(1,2,3) + .count() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 3 + //! [count] + + //! [min] + rpp::source::just(5,1,2,3) + .min() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 1 + //! [min] + + //! [max] + rpp::source::just(5,1,2,3) + .max() + .subscribe([](int v) { std::cout << v << std::endl; }); + // Output: 5 + //! [max] + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index 1cebbeb72..bd5ec0dff 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -54,6 +54,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload , details::member_overload diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index da700d740..1479add18 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -80,6 +80,7 @@ */ #include +#include /** * \defgroup utility_operators Utility Operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 0c72e24bb..556bd6d55 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp new file mode 100644 index 000000000..fdbd0dc60 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -0,0 +1,263 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +namespace rpp::details +{ +struct reduce_tag; +} + +namespace rpp::details +{ +// accepts Result and Type and returns Result +template +concept reduce_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; + +template +concept is_can_be_summed = requires(T t) +{ + { t + t } -> std::convertible_to; +}; + +template +concept is_can_be_averaged = is_can_be_summed && requires(CastBeforeDrop nt) +{ + { nt / size_t{} }; +}; + +template AccumulatorFn, std::invocable ResultSelectorFn> +struct reduce_impl; + +template +auto average_impl(TObs&& observable); + +template +auto sum_impl(TObs&& observable); + +template +auto count_impl(TObs&& observable); + +template +auto min_impl(TObs&& observable, Comparator&& comparator); + +template +auto max_impl(TObs&& observable, Comparator&& comparator); + +template +struct member_overload +{ + /** + * \brief Applies accumulator function to each emission from observable and result of accumulator from previous step and emits final value + * + * \marble reduce + { + source observable : +--1-2-3-| + operator "reduce: s=1, (s,x)=>s+x" : +--------7| + } + * + * \param initial_seed initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. + * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. + * + * \return new specific_observable with the reduce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp reduce + * \snippet reduce.cpp reduce_vector + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/reduce.html + */ + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) const & requires is_header_included + { + return static_cast(this)->template lift>>( + reduce_impl, std::decay_t, std::decay_t>{ + std::forward(initial_seed), + std::forward(accumulator), + std::forward(result_selector)}); + } + + template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> + auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift>>( + reduce_impl, std::decay_t, std::decay_t>{ + std::forward(initial_seed), + std::forward(accumulator), + std::forward(result_selector)}); + } + + /** + * \brief Calculates the average of emissions and emits final value + * + * \marble average + { + source observable : +--1-2-3-| + operator "average" : +--------2| + } + * + * \tparam CastBeforeDivide cast accumulated value to this type before division + * \return new specific_observable with the average operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp average + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/average.html + */ + template + auto average() const & requires (is_header_included && is_can_be_averaged) + { + return average_impl(*static_cast(this)); + } + + template + auto average() && requires (is_header_included && is_can_be_averaged) + { + return average_impl(std::move(*static_cast(this))); + } + + + /** + * \brief Calculates the sum of emissions and emits final value + * + * \marble sum + { + source observable : +--1-2-3-| + operator "sum" : +--------6| + } + * + * \return new specific_observable with the sum operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp sum + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/sum.html + */ + template + auto sum() const & requires (is_header_included && is_can_be_summed) + { + return sum_impl(*static_cast(this)); + } + + template + auto sum() && requires (is_header_included && is_can_be_summed) + { + return sum_impl(std::move(*static_cast(this))); + } + + /** + * \brief Calculates the amount of emitted emissions and emits this count + * + * \marble count + { + source observable : +--1-2-3-| + operator "count" : +--------3| + } + * + * \return new specific_observable with the count operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp count + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/count.html + */ + template + auto count() const & requires is_header_included + { + return count_impl(*static_cast(this)); + } + + template + auto count() && requires is_header_included + { + return count_impl(std::move(*static_cast(this))); + } + + /** + * \brief Emits the emission which has minimal value from the whole observable + * + * \marble min + { + source observable : +-6-1-2-3-| + operator "min" : +---------1| + } + * + * \param comparator is function to deduce if left value is less than right + * \return new specific_observable with the min operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp min + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/min.html + */ + template Comparator = std::less, typename ...Args> + auto min(Comparator&& comparator = {}) const & requires is_header_included + { + return min_impl(*static_cast(this), std::forward(comparator)); + } + + template Comparator = std::less, typename ...Args> + auto min(Comparator&& comparator = {}) && requires is_header_included + { + return min_impl(std::move(*static_cast(this)), std::forward(comparator)); + } + + /** + * \brief Emits the emission which has maximal value from the whole observable + * + * \marble max + { + source observable : +-6-1-2-3-| + operator "max" : +---------6| + } + * + * \param comparator is function to deduce if left value is less than right + * \return new specific_observable with the max operator as most recent operator. + * \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable + * + * \warning #include + * + * \par Example + * \snippet reduce.cpp max + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/max.html + */ + template Comparator = std::less, typename ...Args> + auto max(Comparator&& comparator = {}) const & requires is_header_included + { + return max_impl(*static_cast(this), std::forward(comparator)); + } + + template Comparator = std::less, typename ...Args> + auto max(Comparator&& comparator = {}) && requires is_header_included + { + return max_impl(std::move(*static_cast(this)), std::forward(comparator)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/scan.hpp b/src/rpp/rpp/operators/fwd/scan.hpp index 270b38600..c6fff707e 100644 --- a/src/rpp/rpp/operators/fwd/scan.hpp +++ b/src/rpp/rpp/operators/fwd/scan.hpp @@ -21,7 +21,7 @@ namespace rpp::details { // accepts Result and Type and returns Result template -concept scan_accumulator = std::is_invocable_r_v, Fn, std::decay_t, std::decay_t>; +concept scan_accumulator = reduce_accumulator; template AccumulatorFn> struct scan_impl; diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp new file mode 100644 index 000000000..8a636315d --- /dev/null +++ b/src/rpp/rpp/operators/reduce.hpp @@ -0,0 +1,178 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // RPP_NO_UNIQUE_ADDRESS +#include +#include // required due to operator uses lift +#include // create_subscriber_with_state +#include // own forwarding +#include // constraint::subscriber +#include // forwarding_on_error +#include // utils::as_const + + +IMPLEMENTATION_FILE(reduce_tag); + +namespace rpp::details +{ +template SelectorFn = std::identity> +struct reduce_state +{ + mutable Seed seed; + RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + RPP_NO_UNIQUE_ADDRESS SelectorFn selector{}; +}; + +struct reduce_on_next +{ + template + void operator()(auto&& value, + const constraint::subscriber auto&, + const reduce_state& state) const + { + state.seed = state.accumulator(std::move(state.seed), std::forward(value)); + } +}; + +struct reduce_on_completed +{ + template + void operator()(const constraint::subscriber auto& sub, + const reduce_state& state) const + { + try + { + sub.on_next(state.selector(std::move(state.seed))); + } + catch (...) + { + sub.on_error(std::current_exception()); + return; + } + sub.on_completed(); + } +}; + +template AccumulatorFn, std::invocable ResultSelectorFn> +struct reduce_impl +{ + Seed initial_value; + RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; + RPP_NO_UNIQUE_ADDRESS ResultSelectorFn selector; + + template> TSub> + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state + return create_subscriber_with_dynamic_state(std::move(subscription), + reduce_on_next{}, + utils::forwarding_on_error{}, + reduce_on_completed{}, + std::forward(subscriber), + reduce_state{initial_value, accumulator, selector}); + } +}; + +template +auto average_impl(TObs&& observable) +{ + using Type = utils::extract_observable_type_t>; + using Pair = std::pair, int32_t>; + return std::forward(observable).reduce(Pair{}, + [](Pair&& seed, auto&& val) + { + if (seed.first) + seed.first.value() += std::forward(val); + else + seed.first = std::forward(val); + ++seed.second; + return std::move(seed); + }, + [](Pair&& seed) + { + if (!seed.first) + throw utils::not_enough_emissions{"`average` operator requires at least one emission to calculate average"}; + + return static_cast(std::move(seed.first).value()) / seed.second; + }); +} + +template +auto sum_impl(TObs&& observable) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [](std::optional&& seed, auto&& val) + { + if (!seed) + seed = std::forward(val); + else + seed.value() += std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`sum` operator requires at least one emission to calculate sum"}; + + return std::move(seed.value()); + }); +} + +template +auto count_impl(TObs&& observable) +{ + return std::forward(observable).reduce(size_t{}, [](size_t seed, auto&&) { return ++seed; }); +} + +template +auto min_impl(TObs&& observable, Comparator&& comparator) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [comparator](std::optional&& seed, auto&& val) + { + if (!seed || comparator(utils::as_const(val), seed.value())) + seed = std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`min` operator requires at least one emission to calculate min"}; + + return std::move(seed.value()); + }); +} + +template +auto max_impl(TObs&& observable, Comparator&& comparator) +{ + using Type = utils::extract_observable_type_t>; + return std::forward(observable).reduce(std::optional{}, + [comparator](std::optional&& seed, auto&& val) + { + if (!seed || comparator(seed.value(), utils::as_const(val))) + seed = std::forward(val); + return std::move(seed); + }, + [](std::optional&& seed) + { + if (!seed) + throw utils::not_enough_emissions{"`max` operator requires at least one emission to calculate min"}; + + return std::move(seed.value()); + }); +} +} // namespace rpp::details + diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index a9d8e25e2..e24d6b325 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -14,6 +14,7 @@ #include // required due to operator uses lift #include // create_subscriber_with_state #include // own forwarding +#include // reduce to re-use #include // constraint::subscriber #include // forwarding_on_error #include // utils::as_const @@ -23,21 +24,14 @@ IMPLEMENTATION_FILE(scan_tag); namespace rpp::details { -template -struct scan_state -{ - mutable Result seed; - RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator; -}; - -struct scan_on_next +struct scan_on_next : private reduce_on_next { template void operator()(auto&& value, const constraint::subscriber auto& sub, - const scan_state& state) const + const reduce_state& state) const { - state.seed = state.accumulator(std::move(state.seed), std::forward(value)); + reduce_on_next::operator()(std::forward(value), sub, state); sub.on_next(utils::as_const(state.seed)); } }; @@ -58,7 +52,7 @@ struct scan_impl utils::forwarding_on_error{}, utils::forwarding_on_completed{}, std::forward(subscriber), - scan_state{initial_value, accumulator}); + reduce_state{initial_value, accumulator}); } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp new file mode 100644 index 000000000..7e357ff54 --- /dev/null +++ b/src/tests/rpp/test_reduce.cpp @@ -0,0 +1,337 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "copy_count_tracker.hpp" +#include "mock_observer.hpp" + +#include + +#include + +#include +#include + +SCENARIO("reduce reduces values and store state", "[reduce]") +{ + GIVEN("observable") + { + auto obs = rpp::observable::just(1, 2, 3); + WHEN("subscribe on it via reduce with plus") + { + auto mock = mock_observer{}; + + obs.reduce(0, std::plus{}).subscribe(mock); + THEN("observer obtains final sum") + { + CHECK(mock.get_received_values() == std::vector{6}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via reduce with aggregating in vector") + { + auto mock = mock_observer>{}; + + obs.reduce(std::vector{}, + [](std::vector&& seed, int new_val) + { + seed.push_back(new_val); + return std::move(seed); + }).subscribe(mock); + + THEN("observer obtains final vector") + { + CHECK(mock.get_received_values() == std::vector{{std::vector{1,2,3}}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via reduce with exception") + { + auto mock = mock_observer{}; + + volatile bool none{}; + obs.reduce(0, + [&](int, int)-> int + { + if (none) + return 0; + throw std::runtime_error{""}; + }).subscribe(mock); + + THEN("observer obtains only on_error") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + GIVEN("observable 1-2->") + { + auto obs = rpp::observable::create([](const auto& sub) + { + sub.on_next(1); + sub.on_next(2); + }); + WHEN("subscribe on it via reduce with plus") + { + auto mock = mock_observer{}; + + obs.reduce(0, std::plus{}).subscribe(mock); + THEN("no any values") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("average calculates average", "[reduce]") +{ + GIVEN("-1-2| observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + auto r= obs.average().subscribe(mock); + + THEN("observer obtained value as average int") + { + CHECK(mock.get_received_values() == std::vector{static_cast(1+2)/2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + obs.average().subscribe(mock); + + THEN("observer obtained value as average double") + { + CHECK(mock.get_received_values() == std::vector{1.5}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via average") + { + auto mock = mock_observer{}; + auto r= obs.average().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("sum calculates sum", "[reduce]") +{ + GIVEN("-1-2| observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via sum") + { + auto mock = mock_observer{}; + auto r= obs.sum().subscribe(mock); + + THEN("observer obtained value as sum int") + { + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via sum") + { + auto mock = mock_observer{}; + auto r= obs.sum().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("count calculates count", "[reduce]") +{ + GIVEN("-1-2| observable") + { + auto obs = rpp::source::just(1,2); + WHEN("subscribe on it via count") + { + auto mock = mock_observer{}; + auto r= obs.count().subscribe(mock); + + THEN("observer obtained value as count size_t") + { + CHECK(mock.get_received_values() == std::vector{size_t{2}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via count") + { + auto mock = mock_observer{}; + auto r= obs.count().subscribe(mock); + + THEN("observer obtained zero") + { + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + +SCENARIO("min calculates min", "[reduce]") +{ + GIVEN("-3-1-2| observable") + { + auto obs = rpp::source::just(3,1,2); + WHEN("subscribe on it via min") + { + auto mock = mock_observer{}; + auto r= obs.min().subscribe(mock); + + THEN("observer obtained min value") + { + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via min") + { + auto mock = mock_observer{}; + auto r= obs.min().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("max calculates min", "[reduce]") +{ + GIVEN("-3-1-2| observable") + { + auto obs = rpp::source::just(3,1,2); + WHEN("subscribe on it via max") + { + auto mock = mock_observer{}; + auto r= obs.max().subscribe(mock); + + THEN("observer obtained max value") + { + CHECK(mock.get_received_values() == std::vector{3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + GIVEN("-| observable") + { + auto obs = rpp::source::empty(); + WHEN("subscribe on it via max") + { + auto mock = mock_observer{}; + auto r= obs.max().subscribe(mock); + + THEN("observer obtained on_error") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +SCENARIO("reduce keeps state for copies", "[reduce]") +{ + auto mock = mock_observer{}; + GIVEN("observable which sends values via copy") + { + auto obs = rpp::source::create([](const auto& sub) + { + for (size_t i = 0; i < 10; ++i) + { + auto copy = sub; + copy.on_next(1); + } + sub.on_completed(); + }); + WHEN("subscribe on it via reduce") + { + obs.reduce(int{}, [](int seed, int new_v) { return seed + new_v; }).subscribe(mock); + THEN("observer obtains only last value") + { + CHECK(mock.get_received_values() == std::vector{ 10 }); + } + } + } +} + +SCENARIO("reduce doesn't produce extra copies", "[reduce][track_copy]") +{ + GIVEN("observable and subscriber") + { + copy_count_tracker verifier{}; + auto obs = rpp::source::just(1).reduce(verifier, [](copy_count_tracker&& seed, int) { return std::move(seed); }); + WHEN("subscribe") + { + obs.subscribe([](const auto&) {}); + THEN("no extra copies") + { + REQUIRE(verifier.get_copy_count() == 2); // 1 copy to reduce state + 1 copy for provided subscriber to shared_state + REQUIRE(verifier.get_move_count() == 5); + // 1 move to observable state + 1 move to subscriber + 1 move from lambda + 1 move to new_state + 1 move to final lambda + } + } + } +}