Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions src/rpp/rpp/operators/first.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ using first_on_next = take_on_next;

struct first_on_completed
{
void operator()(const constraint::subscriber auto& subscriber, const first_state& state) const
void operator()(const constraint::subscriber auto& subscriber, const first_state&) const
{
if (state.count != 0)
subscriber.on_error(std::make_exception_ptr(utils::not_enough_emissions{"first() operator expects at least one emission from observable before completion"}));
subscriber.on_error(std::make_exception_ptr(utils::not_enough_emissions{"first() operator expects at least one emission from observable before completion"}));
}
};

Expand All @@ -50,12 +49,12 @@ struct first_impl
auto subscription = subscriber.get_subscription();

// dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state
return create_subscriber_with_state<Type>(std::move(subscription),
first_on_next{},
utils::forwarding_on_error{},
first_on_completed{},
std::forward<TSub>(subscriber),
first_state{});
return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
first_on_next{},
utils::forwarding_on_error{},
first_on_completed{},
std::forward<TSub>(subscriber),
first_state{});
}
};
} // namespace rpp::details
14 changes: 12 additions & 2 deletions src/rpp/rpp/operators/fwd/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, buffer_tag>
{
/**
* \brief periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting
* \brief Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting
* the items one at a time
*
* \marble buffer
Expand All @@ -41,14 +41,24 @@ struct member_overload<Type, SpecificObservable, buffer_tag>
operator "buffer(2)" : +---{1,2}-{3}-|
}
*
* \details the resulting bundle is std::vector. Actually it is similar to `window` but it emits vectors instead of observables.
* \details The resulting bundle is `std::vector<Type>` of requested size. Actually it is similar to `window()` operator, but it emits vectors instead of observables.
*
* \param count number of items being bundled.
* \return new specific_observable with the buffer operator as most recent operator.
* \warning #include <rpp/operators/buffer.hpp>
*
* \par Example:
* \snippet buffer.cpp buffer
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store `std::vector<Type>` of requested size.
* - <b>OnNext</b>
* - Accumulates emissions inside current bundle and emits this bundle when requested cound reached and starts new bundle.
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Emits current active bundle (if any) and just forwards on_completed
*
* \ingroup transforming_operators
* \see https://reactivex.io/documentation/operators/buffer.html
Expand Down
111 changes: 69 additions & 42 deletions src/rpp/rpp/operators/fwd/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,41 @@ template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, combine_latest_tag>
{

/**
* \brief Combines latest emissions from current observable and other observables when any of them emits.
* \warning According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement
*
* \marble combine_latest_custom_combiner
{
source observable : +---1 -- -- -2 -- -3 -|
source other_observable : +-5-- -6 -7 -- -8 - -|
operator "combine_latest: x,y =>std::pair{x,y}" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-|
}
*
* \param combiner combines emissions from all the observables using custom composition.
* \param observables are observables whose emissions would be combined with the current observable's emissions
* \return new specific_observable with the combine_latest operator as most recent operator.
* \warning #include <rpp/operators/combine_latest.hpp>
*
* \par Examples
* \snippet combine_latest.cpp combine_latest custom combiner
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/combinelatest.html
*/
/**
* \brief Combines latest emissions from original observable and other observables when any of them emits.
* \warning According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses `std::mutex` to satisfy this requirement
*
* \marble combine_latest_custom_combiner
{
source observable : +---1 -- -- -2 -- -3 -|
source other_observable : +-5-- -6 -7 -- -8 - -|
operator "combine_latest: x,y =>std::pair{x,y}" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-|
}
* \details Actually this operator subscribes on all of theses observables and emits new combined value when any of them emits new emission (and each observable emit values at least one to be able to provide combined value)
*
* \param combiner combines emissions from all the observables using custom composition.
* \param observables are observables whose emissions would be combined with the current observable's emissions
* \return new specific_observable with the combine_latest operator as most recent operator.
* \warning #include <rpp/operators/combine_latest.hpp>
*
* \par Examples
* \snippet combine_latest.cpp combine_latest custom combiner
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store last emissions.
* - Wraps subscriber with serialization logic to be sure callbacks called serialized
* - <b>OnNext</b>
* - Keeps last emission from each observable
* - Applies combiner function and emits result if there is last emissions for each observable
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/combinelatest.html
*/
template<constraint::observable ...TOtherObservable, std::invocable<Type, utils::extract_observable_type_t<TOtherObservable>...> TCombiner>
auto combine_latest(TCombiner&& combiner, TOtherObservable&&...observables) const& requires is_header_included<combine_latest_tag, TOtherObservable...>
{
Expand All @@ -78,26 +91,40 @@ struct member_overload<Type, SpecificObservable, combine_latest_tag>
});
}

/**
* \brief Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>.
*
* \marble combine_latest
{
source observable : +---1 -- -- -2 -- -3 -|
source other_observable : +-5-- -6 -7 -- -8 - -|
operator "combine_latest:tuple" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-|
}
*
* \param observables are observables whose emissions would be combined with the current observable's emissions
* \return new specific_observable with the combine_latest operator as most recent operator.
* \warning #include <rpp/operators/combine_latest.hpp>
*
* \par Examples
* \snippet combine_latest.cpp combine_latest custom combiner
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/combinelatest.html
*/
/**
* \brief Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>.
*
* \marble combine_latest
{
source observable : +---1 -- -- -2 -- -3 -|
source other_observable : +-5-- -6 -7 -- -8 - -|
operator "combine_latest:tuple" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-|
}
*
* \details Actually this operator subscribes on all of theses observables and emits `std::tuple` of last emissions when any of them emits new emission (and each observable emit values at least one to be able to provide combined value)
*
* \param observables are observables whose emissions would be combined with the current observable's emissions
* \return new specific_observable with the combine_latest operator as most recent operator.
* \warning #include <rpp/operators/combine_latest.hpp>
*
* \par Examples
* \snippet combine_latest.cpp combine_latest custom combiner
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store last emissions.
* - Wraps subscriber with serialization logic to be sure callbacks called serialized
* - <b>OnNext</b>
* - Keeps last emission from each observable
* - Emits `std::tuple` of last emissions if there is last emissions for each observable
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
*
* \ingroup combining_operators
* \see https://reactivex.io/documentation/operators/combinelatest.html
*/
template<constraint::observable ...TOtherObservable>
auto combine_latest(TOtherObservable&&...observables) const& requires is_header_included<combine_latest_tag, TOtherObservable...>
{
Expand Down
76 changes: 53 additions & 23 deletions src/rpp/rpp/operators/fwd/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,43 @@ auto concat_with_impl(TObservables&&... observables);
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, concat_tag>
{
/**
* \brief Converts observable of observables of items into observable of items via merging emissions but without overlapping (current observable completes THEN next started to emit its values)
*
* \marble concat
{
source observable :
{
+--1-2-3-|
.....+4--6-|
}
operator "concat" : +--1-2-3-4--6-|
/**
* \brief Converts observable of observables of items into observable of items via merging emissions but without overlapping (current observable completes THEN next started to emit its values)
*
* \marble concat
{
source observable :
{
+--1-2-3-|
.....+4--6-|
}
*
* \return new specific_observable with the concat operator as most recent operator.
* \warning #include <rpp/operators/concat.hpp>
*
* \par Example
* \snippet concat.cpp concat
*
* \ingroup aggregate_operators
* \see https://reactivex.io/documentation/operators/concat.html
*/
operator "concat" : +--1-2-3-4--6-|
}
*
* \details Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc...
*
* \return new specific_observable with the concat operator as most recent operator.
* \warning #include <rpp/operators/concat.hpp>
*
* \par Example
* \snippet concat.cpp concat
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables
* - Wraps subscriber with serialization logic to be sure callbacks called serialized
* - <b>OnNext for original observable</b>
* - If no any active observable, then subscribes on new obtained observable, else place it in queue
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted from original observable</b>
* - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber)
* - <b>OnCompleted from inner observable</b>
* - Subscribe on next observable from queue (if any)
*
* \ingroup aggregate_operators
* \see https://reactivex.io/documentation/operators/concat.html
*/
template<typename ...Args>
auto concat() const& requires (is_header_included<concat_tag, Args...>&& rpp::constraint::observable<Type>)
{
Expand All @@ -70,16 +85,31 @@ struct member_overload<Type, SpecificObservable, concat_tag>
* \marble concat_with
{
source original_observable: +--1-2-3-|
source second: +-----4--6-|
operator "concat_with" : +--1-2-3------4--6-|
source second: +-4--6-|
operator "concat_with" : +--1-2-3--4--6-|
}
*
* \details Actually this operator subscribes on original observable. When original observable completes, then it subscribes on first observable from arguments and etc...
*
* \return new specific_observable with the concat operator as most recent operator.
* \warning #include <rpp/operators/concat.hpp>
*
* \par Example
* \snippet concat.cpp concat_with
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables
* - Wraps subscriber with serialization logic to be sure callbacks called serialized
* - <b>OnNext</b>
* - Just forwards on_next
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted from original observable</b>
* - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber)
* - <b>OnCompleted from inner observable</b>
* - Subscribe on next observable from queue (if any)
*
* \ingroup aggregate_operators
* \see https://reactivex.io/documentation/operators/concat.html
*/
Expand Down
55 changes: 36 additions & 19 deletions src/rpp/rpp/operators/fwd/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,42 @@ struct debounce_impl;
template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, debounce_tag>
{
/**
* \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset.
*
* \marble debounce
{
source observable : +--1-2-----3---|
operator "debounce(4)" : +--------2-----3|
}
* \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission.
* \param scheduler is scheduler used to run timer for debounce
* \return new specific_observable with the debounce operator as most recent operator.
* \warning #include <rpp/operators/debounce.hpp>
*
* \par Example
* \snippet debounce.cpp debounce
*
* \ingroup utility_operators
* \see https://reactivex.io/documentation/operators/debounce.html
*/
/**
* \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset.
*
* \marble debounce
{
source observable : +--1-2-----3---|
operator "debounce(4)" : +--------2-----3|
}
*
* \details Actually this operator resets time of last emission, schedules action to send this emission after specified period if no any new emissions till this moment.
*
* \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission.
* \param scheduler is scheduler used to run timer for debounce
* \return new specific_observable with the debounce operator as most recent operator.
* \warning #include <rpp/operators/debounce.hpp>
*
* \par Example
* \snippet debounce.cpp debounce
*
* \par Implementation details:
* - <b>On subscribe</b>
* - Allocates one `shared_ptr` to store last emission and time.
* - Wraps subscriber with serialization logic to prevent race-conditions
* - <b>OnNext</b>
* - Saves time when emission happened
* - Saves emission
* - Schedule action to send this emission with check if no any new emissions
* - <b>OnError</b>
* - Just forwards original on_error
* - <b>OnCompleted</b>
* - Just forwards original on_completed
* - Immediately send current active emission if any
*
* \ingroup utility_operators
* \see https://reactivex.io/documentation/operators/debounce.html
*/
template<schedulers::constraint::scheduler TScheduler>
auto debounce(schedulers::duration period,const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<debounce_tag, TScheduler>
{
Expand Down
Loading