Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/fwd/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct member_overload<Type, SpecificObservable, filter_tag>
* - <b>On subscribe</b>
* - None
* - <b>OnNext</b>
* - Just forwards emission of predicate returns true
* - Just forwards emission when predicate returns true
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
Expand Down
58 changes: 36 additions & 22 deletions src/rpp/rpp/operators/fwd/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,42 @@ struct scan_impl;
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, scan_tag>
{
/**
* \brief Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value
*
* \marble scan
{
source observable : +--1-2-3-|
operator "scan: s=1, (s,x)=>s+x" : +--2-4-7-|
}
*
* \param initial_value initial value for seed which will be applied for first value from observable (instead of emitting this as first value). Then it will be replaced with result and etc.
* \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
*
* \return new specific_observable with the scan operator as most recent operator.
* \warning #include <rpp/operators/scan.hpp>
*
* \par Example
* \snippet scan.cpp scan
* \snippet scan.cpp scan_vector
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/scan.html
*/
/**
* \brief Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value
*
* \marble scan
{
source observable : +--1-2-3-|
operator "scan: s=1, (s,x)=>s+x" : +--2-4-7-|
}
*
* \details Acttually this operator applies provided accumulator function to seed and new emission, emits resulting value and updates seed value for next emission
*
* \param initial_value initial value for seed which will be applied for first value from observable (instead of emitting this as first value). Then it will be replaced with result and etc.
* \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
*
* \return new specific_observable with the scan operator as most recent operator.
* \warning #include <rpp/operators/scan.hpp>
*
* \par Example
* \snippet scan.cpp scan
* \snippet scan.cpp scan_vector
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store seed
* - <b>OnNext</b>
* - Applies accumulator to each emission
* - Updates seed value
* - Emits new seed value
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/scan.html
*/
template<typename Result, scan_accumulator<Result, Type> AccumulatorFn>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no template named scan_accumulator

auto scan(Result&& initial_value, AccumulatorFn&& accumulator) const & requires is_header_included<scan_tag, Result, AccumulatorFn>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
unknown type name AccumulatorFn

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
template argument for template type parameter must be a type

{
Expand Down
80 changes: 47 additions & 33 deletions src/rpp/rpp/operators/fwd/skip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,54 @@ namespace rpp::details

namespace rpp::details
{
template<constraint::decayed_type Type>
struct skip_impl;
template<constraint::decayed_type Type>
struct skip_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, skip_tag>
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, skip_tag>
{
/**
* \brief Skip first `count` items provided by observable then send rest items as expected
*
* \marble skip
{
source observable : +--1-2-3-4-5-6-|
operator "skip(3)" : +--------4-5-6-|
}
*
* \details Actually this operator just decrements counter and starts to forward emissions when counter reaches zero.
*
* \param count amount of items to be skipped
* \return new specific_observable with the skip operator as most recent operator.
* \warning #include <rpp/operators/skip.hpp>
*
* \par Example:
* \snippet skip.cpp skip
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store counter
* - <b>OnNext</b>
* - Forwards emission if counter is zero
* - Decrements counter if not zero
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/skip.html
*/
template<typename...Args>
auto skip(size_t count) const& requires is_header_included<skip_tag, Args...>
{
/**
* \brief Skip first `count` items provided by observable then send rest items as expected
*
* \marble skip
{
source observable : +--1-2-3-4-5-6-|
operator "skip(3)" : +--------4-5-6-|
}
* \param count amount of items to be skipped
* \return new specific_observable with the skip operator as most recent operator.
* \warning #include <rpp/operators/skip.hpp>
*
* \par Example:
* \snippet skip.cpp skip
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/skip.html
*/
template<typename...Args>
auto skip(size_t count) const& requires is_header_included<skip_tag, Args...>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(skip_impl<Type>{count});
}
return static_cast<const SpecificObservable*>(this)->template lift<Type>(skip_impl<Type>{count});
}

template<typename...Args>
auto skip(size_t count) && requires is_header_included<skip_tag, Args...>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(skip_impl<Type>{count});
}
};
template<typename...Args>
auto skip(size_t count) && requires is_header_included<skip_tag, Args...>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(skip_impl<Type>{count});
}
};
} // namespace rpp::details
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/fwd/start_with.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct member_overload<Type, SpecificObservable, start_with_tag>
operator "start_with(1,2,3)" : +-1-2-3--4--6-|
}
*
* \details Actually it makes concat but arguments passed before current observable
* \details Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable) so observables from argument subscribed before current observable
*
* \tparam memory_model memory_model strategy used to store provided values
* \param vals list of values which should be emitted before current observable
Expand Down Expand Up @@ -74,7 +74,7 @@ struct member_overload<Type, SpecificObservable, start_with_tag>
operator "start_with(-1-2-3-|)" : +--1-2-3--4--6-|
}
*
* \details Actually it makes concat but arguments passed before current observable
* \details Actually it makes concat(observables_to_start_with..., current_observable) so observables from argument subscribed before current observable
*
* \param observables list of observables which should be used before current observable
*
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/fwd/subscribe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct member_overload<Type, SpecificObservable, subscribe_on_tag>
{
/**
* \brief OnSubscribe function for this observable will be scheduled via provided scheduler
*
* \details Actually this operator just schedules subscription on original observable to provided scheduler
*
* \param scheduler is scheduler used for scheduling of OnSubscribe
* \return new specific_observable with the subscribe_on operator as most recent operator.
Expand Down
87 changes: 50 additions & 37 deletions src/rpp/rpp/operators/fwd/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,58 @@ namespace rpp::details

namespace rpp::details
{
template<constraint::decayed_type Type>
struct switch_on_next_impl;
template<constraint::decayed_type Type>
struct switch_on_next_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, switch_on_next_tag>
{
/**
* \brief Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained
*
* \marble switch_on_next
{
source observable :
{
+--1-2-3-5--|
.....+4--6-9|
.......+7-8-|
}
operator "switch_on_next" : +--1-24-7-8|
}
*
* \return new specific_observable with the switch_on_next operator as most recent operator.
* \warning #include <rpp/operators/switch_on_next.hpp>
*
* \par Example:
* \snippet switch_on_next.cpp switch_on_next
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/switch.html
*/
template<typename ...Args>
auto switch_on_next() const& requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, switch_on_next_tag>
{
/**
* \brief Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained
*
* \marble switch_on_next
{
return static_cast<const SpecificObservable*>(this)->template lift<utils::extract_observable_type_t<Type>>(switch_on_next_impl<Type>());
source observable :
{
+--1-2-3-5--|
.....+4--6-9|
.......+7-8-|
}
operator "switch_on_next" : +--1-24-7-8|
}
*
* \details Actually this operator just unsubscribes from previous observable and subscribes on new observable when obtained in `on_next`
*
* \return new specific_observable with the switch_on_next operator as most recent operator.
* \warning #include <rpp/operators/switch_on_next.hpp>
*
* \par Example:
* \snippet switch_on_next.cpp switch_on_next
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store internal state
* - <b>OnNext</b>
* - Unsubscribed from previous observable (if any)
* - Subscribed on new emitted observable
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed if no any active inner observable or original observable yet
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/switch.html
*/
template<typename ...Args>
auto switch_on_next() const& requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
{
return static_cast<const SpecificObservable*>(this)->template lift<utils::extract_observable_type_t<Type>>(switch_on_next_impl<Type>());
}

template<typename ...Args>
auto switch_on_next() && requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<utils::extract_observable_type_t<Type>>(switch_on_next_impl<Type>());
}
};
template<typename ...Args>
auto switch_on_next() && requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<utils::extract_observable_type_t<Type>>(switch_on_next_impl<Type>());
}
};
} // namespace rpp::details
50 changes: 32 additions & 18 deletions src/rpp/rpp/operators/fwd/take.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,38 @@ struct take_impl;
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, take_tag>
{
/**
* \brief Emit only first `count` items provided by observable, then send `on_completed`
*
* \marble take
{
source observable : +--1-2-3-4-5-6-|
operator "take(3)" : +--1-2-3|
}
* \param count amount of items to be emitted. 0 - instant complete
* \return new specific_observable with the Take operator as most recent operator.
* \warning #include <rpp/operators/take.hpp>
*
* \par Example:
* \snippet take.cpp take
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/take.html
*/
/**
* \brief Emit only first `count` items provided by observable, then send `on_completed`
*
* \marble take
{
source observable : +--1-2-3-4-5-6-|
operator "take(3)" : +--1-2-3|
}
* \details Actually this operator just emits emissions while counter is not zero and decrements counter on each emission
*
* \param count amount of items to be emitted. 0 - instant complete
* \return new specific_observable with the Take operator as most recent operator.
* \warning #include <rpp/operators/take.hpp>
*
* \par Example:
* \snippet take.cpp take
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocate one `shared_ptr` to store counter
* - <b>OnNext</b>
* - Just forwards emission if counter is not zero
* - Decrements counter if not zero
* - If counter reached zero, then emits OnCompleted
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/take.html
*/
template<typename...Args>
auto take(size_t count) const & requires is_header_included<take_tag, Args...>
{
Expand Down
51 changes: 32 additions & 19 deletions src/rpp/rpp/operators/fwd/take_last.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,38 @@ struct take_last_impl;
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, take_last_tag>
{
/**
* \brief Emit only last `count` items provided by observable, then send `on_completed`
*
* \marble take_last
{
source observable : +--1-2-3-4-5-6-|
operator "take_last(3)" : +--------------456|
}
*
* \param count amount of last items to be emitted
* \return new specific_observable with the take_last operator as most recent operator.
* \warning #include <rpp/operators/take_last.hpp>
*
* \par Example
* \snippet take_last.cpp take_last
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/takelast.html
*/
/**
* \brief Emit only last `count` items provided by observable, then send `on_completed`
*
* \marble take_last
{
source observable : +--1-2-3-4-5-6-|
operator "take_last(3)" : +--------------456|
}
*
* \details Actually this operator has buffer of requested size inside, keeps last `count` values and emit stored values on `on_completed`
*
* \param count amount of last items to be emitted
* \return new specific_observable with the take_last operator as most recent operator.
* \warning #include <rpp/operators/take_last.hpp>
*
* \par Example
* \snippet take_last.cpp take_last
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store internal buffer
* - <b>OnNext</b>
* - Place obtained value into queue
* - If queue contains more values than expected - remove oldest one
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Emits values stored in queue
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/takelast.html
*/
template<typename ...Args>
auto take_last(size_t count) const & requires is_header_included<take_last_tag, Args...>
{
Expand Down
Loading