Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace rpp::operators
template<rpp::schedulers::constraint::scheduler Scheduler>
auto delay(rpp::schedulers::duration delay_duration, Scheduler&& scheduler);

auto distinct();

template<typename EqualityFn = rpp::utils::equal_to>
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto distinct_until_changed(EqualityFn&& equality_fn = {});
Expand All @@ -49,6 +51,10 @@ namespace rpp::operators
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto filter(Fn&& predicate);

template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto flat_map(Fn&& callable);

template<typename KeySelector,
typename ValueSelector = std::identity,
typename KeyComparator = rpp::utils::less>
Expand Down Expand Up @@ -82,6 +88,13 @@ namespace rpp::operators

auto publish();

template<typename Seed, typename Accumulator>
requires (!utils::is_not_template_callable<Accumulator> || std::same_as<std::decay_t<Seed>, std::invoke_result_t<Accumulator, std::decay_t<Seed> &&, rpp::utils::convertible_to_any>>)
auto reduce(Seed&& seed, Accumulator&& accumulator);

template<typename Accumulator>
auto reduce(Accumulator&& accumulator);

auto ref_count();

auto repeat(size_t count);
Expand Down Expand Up @@ -133,9 +146,31 @@ namespace rpp::operators
template<rpp::constraint::observable TObservable>
auto take_until(TObservable&& until_observable);

template<std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>>
auto tap(OnError&& on_error);

template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto tap(OnCompleted&& on_completed);

template<typename OnNext,
std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto tap(OnNext&& on_next,
OnCompleted&& on_completed);

template<typename OnNext = rpp::utils::empty_function_any_t,
std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>,
std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto tap(OnNext&& on_next = {},
OnError&& on_error = {},
OnCompleted&& on_completed = {});

template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto throttle(rpp::schedulers::duration period);

template<typename Selector>
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
auto on_error_resume_next(Selector&& selector);

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
Expand All @@ -148,6 +183,13 @@ namespace rpp::operators
template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector);

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables);

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto zip(TObservable&& observable, TObservables&&... observables);
} // namespace rpp::operators

namespace rpp
Expand Down
18 changes: 9 additions & 9 deletions src/rpp/rpp/operators/tap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ namespace rpp::operators
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/do.html
*/
template<std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>>
template<std::invocable<const std::exception_ptr&> OnError /* = rpp::utils::empty_function_t<std::exception_ptr> */>
auto tap(OnError&& on_error)
{
using OnNext = rpp::utils::empty_function_any_t;
Expand All @@ -109,7 +109,7 @@ namespace rpp::operators
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/do.html
*/
template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
template<std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
auto tap(OnCompleted&& on_completed)
{
using OnNext = rpp::utils::empty_function_any_t;
Expand All @@ -131,7 +131,7 @@ namespace rpp::operators
* @see https://reactivex.io/documentation/operators/do.html
*/
template<typename OnNext,
std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
auto tap(OnNext&& on_next,
OnCompleted&& on_completed)
{
Expand All @@ -153,12 +153,12 @@ namespace rpp::operators
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/do.html
*/
template<typename OnNext = rpp::utils::empty_function_any_t,
std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>,
std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto tap(OnNext&& on_next = {},
OnError&& on_error = {},
OnCompleted&& on_completed = {})
template<typename OnNext /* = rpp::utils::empty_function_any_t */,
std::invocable<const std::exception_ptr&> OnError /* = rpp::utils::empty_function_t<std::exception_ptr> */,
std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
auto tap(OnNext&& on_next /* = {} */,
OnError&& on_error /* = {} */,
OnCompleted&& on_completed /* = {} */)
{
return details::tap_t<std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>{
std::forward<OnNext>(on_next),
Expand Down
23 changes: 21 additions & 2 deletions src/rpp/rpp/sources/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,31 @@ namespace rpp::source
requires constraint::observable<utils::iterable_value_t<Iterable>>
auto concat(Iterable&& iterable);

template<constraint::decayed_type Type>
auto never();
template<std::invocable Factory>
requires rpp::constraint::observable<std::invoke_result_t<Factory>>
auto defer(Factory&& observable_factory);

template<constraint::decayed_type Type>
auto error(std::exception_ptr err);

template<constraint::decayed_type Type>
auto empty();

template<schedulers::constraint::scheduler TScheduler>
auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler&& scheduler);

template<schedulers::constraint::scheduler TScheduler>
auto interval(rpp::schedulers::time_point initial, rpp::schedulers::duration period, TScheduler&& scheduler);

template<schedulers::constraint::scheduler TScheduler>
auto interval(rpp::schedulers::duration period, TScheduler&& scheduler);

template<constraint::decayed_type Type>
auto never();

template<schedulers::constraint::scheduler TScheduler>
auto timer(rpp::schedulers::duration when, TScheduler&& scheduler);

template<schedulers::constraint::scheduler TScheduler>
auto timer(rpp::schedulers::time_point when, TScheduler&& scheduler);
} // namespace rpp::source