Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@

### Aggregate

- [ ] average
- [x] average
- [x] concat
- [ ] count
- [ ] max
- [ ] min
- [ ] reduce
- [ ] sum
- [x] count
- [x] max
- [x] min
- [x] reduce
- [x] sum

### Backpressure

Expand Down
70 changes: 70 additions & 0 deletions src/examples/rpp/doxygen/reduce.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example reduce.cpp
**/

int main()
{
//! [reduce]
rpp::source::just(1,2,3)
.reduce(0, std::plus<int>{})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 6
//! [reduce]

//! [reduce_vector]
rpp::source::just(1,2,3)
.reduce(std::vector<int>{}, [](std::vector<int>&& seed, int new_value)
{
seed.push_back(new_value);
return std::move(seed);
})
.subscribe([](const std::vector<int>& v)
{
std::cout << "vector: ";
for(int val : v)
std::cout << val << " ";
std::cout << std::endl;
});
// Output: vector: 1 2 3
//! [reduce_vector]

//! [average]
rpp::source::just(1,2,3)
.average()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 2
//! [average]

//! [sum]
rpp::source::just(1,2,3)
.sum()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 6
//! [sum]

//! [count]
rpp::source::just(1,2,3)
.count()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 3
//! [count]

//! [min]
rpp::source::just(5,1,2,3)
.min()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 1
//! [min]

//! [max]
rpp::source::just(5,1,2,3)
.max()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 5
//! [max]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct RPP_EMPTY_BASES interface_observable
, details::member_overload<Type, SpecificObservable, details::observe_on_tag>
, details::member_overload<Type, SpecificObservable, details::on_error_resume_next_tag>
, details::member_overload<Type, SpecificObservable, details::publish_tag>
, details::member_overload<Type, SpecificObservable, details::reduce_tag>
, details::member_overload<Type, SpecificObservable, details::repeat_tag>
, details::member_overload<Type, SpecificObservable, details::sample_tag>
, details::member_overload<Type, SpecificObservable, details::scan_tag>
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
*/

#include <rpp/operators/concat.hpp>
#include <rpp/operators/reduce.hpp>

/**
* \defgroup utility_operators Utility Operators
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <rpp/operators/fwd/observe_on.hpp>
#include <rpp/operators/fwd/on_error_resume_next.hpp>
#include <rpp/operators/fwd/publish.hpp>
#include <rpp/operators/fwd/reduce.hpp>
#include <rpp/operators/fwd/ref_count.hpp>
#include <rpp/operators/fwd/repeat.hpp>
#include <rpp/operators/fwd/sample.hpp>
Expand Down
263 changes: 263 additions & 0 deletions src/rpp/rpp/operators/fwd/reduce.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/observables/details/member_overload.hpp>

namespace rpp::details
{
struct reduce_tag;
}

namespace rpp::details
{
// accepts Result and Type and returns Result
template<typename Fn, typename Result, typename Type>
concept reduce_accumulator = std::is_invocable_r_v<std::decay_t<Result>, Fn, std::decay_t<Result>, std::decay_t<Type>>;

template<typename T>
concept is_can_be_summed = requires(T t)
{
{ t + t } -> std::convertible_to<T>;
};

template<typename T, typename CastBeforeDrop>
concept is_can_be_averaged = is_can_be_summed<T> && requires(CastBeforeDrop nt)
{
{ nt / size_t{} };
};

template<constraint::decayed_type Type, constraint::decayed_type Seed, reduce_accumulator<Seed, Type> AccumulatorFn, std::invocable<Seed&&> ResultSelectorFn>
struct reduce_impl;

template<constraint::decayed_type CastBeforeDivide, constraint::observable TObs>
Comment thread
AlexInLog marked this conversation as resolved.
auto average_impl(TObs&& observable);
Comment thread
AlexInLog marked this conversation as resolved.

template<constraint::observable TObs>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no type named observable in namespace rpp::constraint

auto sum_impl(TObs&& observable);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier observable


template<constraint::observable TObs>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no type named observable in namespace rpp::constraint

auto count_impl(TObs&& observable);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier observable


template<constraint::observable TObs, typename Comparator>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no type named observable in namespace rpp::constraint

auto min_impl(TObs&& observable, Comparator&& comparator);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier observable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
Comparator does not refer to a value

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier comparator; did you mean operator?

Suggested change
auto min_impl(TObs&& observable, Comparator&& comparator);
auto min_impl(TObs&& observable, Comparator&& operator);


template<constraint::observable TObs, typename Comparator>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no type named observable in namespace rpp::constraint

auto max_impl(TObs&& observable, Comparator&& comparator);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier observable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
Comparator does not refer to a value

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
use of undeclared identifier comparator; did you mean operator?

Suggested change
auto max_impl(TObs&& observable, Comparator&& comparator);
auto max_impl(TObs&& observable, Comparator&& operator);


template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, reduce_tag>
{
/**
* \brief Applies accumulator function to each emission from observable and result of accumulator from previous step and emits final value
*
* \marble reduce
{
source observable : +--1-2-3-|
operator "reduce: s=1, (s,x)=>s+x" : +--------7|
}
*
* \param initial_seed initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc.
* \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
*
* \return new specific_observable with the reduce operator as most recent operator.
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp reduce
* \snippet reduce.cpp reduce_vector
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/reduce.html
*/
template<typename Seed, reduce_accumulator<Seed, Type> AccumulatorFn, std::invocable<Seed&&> ResultSelectorFn = std::identity>
auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) const & requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
{
return static_cast<const SpecificObservable*>(this)->template lift<utils::decayed_invoke_result_t<ResultSelectorFn, std::decay_t<Seed>>>(
Comment thread
AlexInLog marked this conversation as resolved.
Comment thread
AlexInLog marked this conversation as resolved.
Comment thread
AlexInLog marked this conversation as resolved.
reduce_impl<Type, std::decay_t<Seed>, std::decay_t<AccumulatorFn>, std::decay_t<ResultSelectorFn>>{
std::forward<Seed>(initial_seed),
std::forward<AccumulatorFn>(accumulator),
std::forward<ResultSelectorFn>(result_selector)});
}

template<typename Seed, reduce_accumulator<Seed, Type> AccumulatorFn, std::invocable<Seed&&> ResultSelectorFn = std::identity>
auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) && requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<utils::decayed_invoke_result_t<ResultSelectorFn, std::decay_t<Seed>>>(
Comment thread
AlexInLog marked this conversation as resolved.
Comment thread
AlexInLog marked this conversation as resolved.
Comment thread
AlexInLog marked this conversation as resolved.
reduce_impl<Type, std::decay_t<Seed>, std::decay_t<AccumulatorFn>, std::decay_t<ResultSelectorFn>>{
std::forward<Seed>(initial_seed),
std::forward<AccumulatorFn>(accumulator),
std::forward<ResultSelectorFn>(result_selector)});
}

/**
* \brief Calculates the average of emissions and emits final value
*
* \marble average
{
source observable : +--1-2-3-|
operator "average" : +--------2|
}
*
* \tparam CastBeforeDivide cast accumulated value to this type before division
* \return new specific_observable with the average operator as most recent operator.
* \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable
*
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp average
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/average.html
*/
template<typename CastBeforeDivide = Type, typename ...Args>
auto average() const & requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
{
return average_impl<CastBeforeDivide>(*static_cast<const SpecificObservable*>(this));
Comment thread
AlexInLog marked this conversation as resolved.
}

template<typename CastBeforeDivide = Type, typename ...Args>
auto average() && requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
{
return average_impl<CastBeforeDivide>(std::move(*static_cast<SpecificObservable*>(this)));
Comment thread
AlexInLog marked this conversation as resolved.
}


/**
* \brief Calculates the sum of emissions and emits final value
*
* \marble sum
{
source observable : +--1-2-3-|
operator "sum" : +--------6|
}
*
* \return new specific_observable with the sum operator as most recent operator.
* \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable
*
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp sum
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/sum.html
*/
template<typename ...Args>
auto sum() const & requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)
{
return sum_impl(*static_cast<const SpecificObservable*>(this));
}

template<typename ...Args>
auto sum() && requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)
{
return sum_impl(std::move(*static_cast<SpecificObservable*>(this)));
}

/**
* \brief Calculates the amount of emitted emissions and emits this count
*
* \marble count
{
source observable : +--1-2-3-|
operator "count" : +--------3|
}
*
* \return new specific_observable with the count operator as most recent operator.
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp count
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/count.html
*/
template<typename ...Args>
auto count() const & requires is_header_included<reduce_tag, Args...>
{
return count_impl(*static_cast<const SpecificObservable*>(this));
}

template<typename ...Args>
auto count() && requires is_header_included<reduce_tag, Args...>
{
return count_impl(std::move(*static_cast<SpecificObservable*>(this)));
}

/**
* \brief Emits the emission which has minimal value from the whole observable
*
* \marble min
{
source observable : +-6-1-2-3-|
operator "min" : +---------1|
}
*
* \param comparator is function to deduce if left value is less than right
* \return new specific_observable with the min operator as most recent operator.
* \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable
*
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp min
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/min.html
*/
template<std::strict_weak_order<Type, Type> Comparator = std::less<Type>, typename ...Args>
auto min(Comparator&& comparator = {}) const & requires is_header_included<reduce_tag, Comparator, Args...>
{
return min_impl(*static_cast<const SpecificObservable*>(this), std::forward<Comparator>(comparator));
}

template<std::strict_weak_order<Type, Type> Comparator = std::less<Type>, typename ...Args>
auto min(Comparator&& comparator = {}) && requires is_header_included<reduce_tag, Comparator, Args...>
{
return min_impl(std::move(*static_cast<SpecificObservable*>(this)), std::forward<Comparator>(comparator));
}

/**
* \brief Emits the emission which has maximal value from the whole observable
*
* \marble max
{
source observable : +-6-1-2-3-|
operator "max" : +---------6|
}
*
* \param comparator is function to deduce if left value is less than right
* \return new specific_observable with the max operator as most recent operator.
* \throws rpp::utils::not_enough_emissions in case of no any emissions from original observable
*
* \warning #include <rpp/operators/reduce.hpp>
*
* \par Example
* \snippet reduce.cpp max
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/max.html
*/
template<std::strict_weak_order<Type, Type> Comparator = std::less<Type>, typename ...Args>
auto max(Comparator&& comparator = {}) const & requires is_header_included<reduce_tag, Comparator, Args...>
{
return max_impl(*static_cast<const SpecificObservable*>(this), std::forward<Comparator>(comparator));
}

template<std::strict_weak_order<Type, Type> Comparator = std::less<Type>, typename ...Args>
auto max(Comparator&& comparator = {}) && requires is_header_included<reduce_tag, Comparator, Args...>
{
return max_impl(std::move(*static_cast<SpecificObservable*>(this)), std::forward<Comparator>(comparator));
}
};
} // namespace rpp::details
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/fwd/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace rpp::details
{
// accepts Result and Type and returns Result
template<typename Fn, typename Result, typename Type>
concept scan_accumulator = std::is_invocable_r_v<std::decay_t<Result>, Fn, std::decay_t<Result>, std::decay_t<Type>>;
concept scan_accumulator = reduce_accumulator<Fn,Result,Type>;
Comment thread
AlexInLog marked this conversation as resolved.

template<constraint::decayed_type Type, constraint::decayed_type Result, scan_accumulator<Result, Type> AccumulatorFn>
Comment thread
AlexInLog marked this conversation as resolved.
struct scan_impl;
Expand Down
Loading