From 77badb11b26be2d26e9a3545becf286fe1320f72 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 21 Dec 2022 23:54:45 +0300 Subject: [PATCH 1/9] buffer --- src/rpp/rpp/operators/fwd/buffer.hpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/buffer.hpp b/src/rpp/rpp/operators/fwd/buffer.hpp index 4e3354f04..78b7b265b 100644 --- a/src/rpp/rpp/operators/fwd/buffer.hpp +++ b/src/rpp/rpp/operators/fwd/buffer.hpp @@ -32,7 +32,7 @@ template struct member_overload { /** - * \brief periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting + * \brief Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting * the items one at a time * * \marble buffer @@ -41,7 +41,7 @@ struct member_overload operator "buffer(2)" : +---{1,2}-{3}-| } * - * \details the resulting bundle is std::vector. Actually it is similar to `window` but it emits vectors instead of observables. + * \details The resulting bundle is `std::vector` of requested size. Actually it is similar to `window()` operator, but it emits vectors instead of observables. * * \param count number of items being bundled. * \return new specific_observable with the buffer operator as most recent operator. @@ -49,6 +49,16 @@ struct member_overload * * \par Example: * \snippet buffer.cpp buffer + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store `std::vector` of requested size. + * - OnNext + * - Accumulates emissions inside current bundle and emits this bundle when requested cound reached and starts new bundle. + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Emits current active bundle (if any) and just forwards on_completed * * \ingroup transforming_operators * \see https://reactivex.io/documentation/operators/buffer.html From 8f69f86699ace7bf408b3ee4082e386cbd62d95d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 22 Dec 2022 00:02:36 +0300 Subject: [PATCH 2/9] combine latest --- src/rpp/rpp/operators/fwd/combine_latest.hpp | 110 ++++++++++++------- 1 file changed, 68 insertions(+), 42 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/combine_latest.hpp b/src/rpp/rpp/operators/fwd/combine_latest.hpp index 9a1183a66..ae3b1eba9 100644 --- a/src/rpp/rpp/operators/fwd/combine_latest.hpp +++ b/src/rpp/rpp/operators/fwd/combine_latest.hpp @@ -32,28 +32,41 @@ template struct member_overload { - /** - * \brief Combines latest emissions from current observable and other observables when any of them emits. - * \warning According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement - * - * \marble combine_latest_custom_combiner - { - source observable : +---1 -- -- -2 -- -3 -| - source other_observable : +-5-- -6 -7 -- -8 - -| - operator "combine_latest: x,y =>std::pair{x,y}" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-| - } - * - * \param combiner combines emissions from all the observables using custom composition. - * \param observables are observables whose emissions would be combined with the current observable's emissions - * \return new specific_observable with the combine_latest operator as most recent operator. - * \warning #include - * - * \par Examples - * \snippet combine_latest.cpp combine_latest custom combiner - * - * \ingroup combining_operators - * \see https://reactivex.io/documentation/operators/combinelatest.html - */ + /** + * \brief Combines latest emissions from original observable and other observables when any of them emits. + * \warning According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses `std::mutex` to satisfy this requirement + * + * \marble combine_latest_custom_combiner + { + source observable : +---1 -- -- -2 -- -3 -| + source other_observable : +-5-- -6 -7 -- -8 - -| + operator "combine_latest: x,y =>std::pair{x,y}" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-| + } + * \details Actually this operator subscribes on all of theses observables and emits new combined value when any of them emits new emission (and each observable emit values at least one to be able to provide combined value) + * + * \param combiner combines emissions from all the observables using custom composition. + * \param observables are observables whose emissions would be combined with the current observable's emissions + * \return new specific_observable with the combine_latest operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet combine_latest.cpp combine_latest custom combiner + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store last emissions. + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - Keeps last emission from each observable + * - Applies combiner function and emits result if there is last emissions for each observable + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup combining_operators + * \see https://reactivex.io/documentation/operators/combinelatest.html + */ template...> TCombiner> auto combine_latest(TCombiner&& combiner, TOtherObservable&&...observables) const& requires is_header_included { @@ -78,26 +91,39 @@ struct member_overload }); } - /** - * \brief Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>. - * - * \marble combine_latest - { - source observable : +---1 -- -- -2 -- -3 -| - source other_observable : +-5-- -6 -7 -- -8 - -| - operator "combine_latest:tuple" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-| - } - * - * \param observables are observables whose emissions would be combined with the current observable's emissions - * \return new specific_observable with the combine_latest operator as most recent operator. - * \warning #include - * - * \par Examples - * \snippet combine_latest.cpp combine_latest custom combiner - * - * \ingroup combining_operators - * \see https://reactivex.io/documentation/operators/combinelatest.html - */ + /** + * \brief Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>. + * + * \marble combine_latest + { + source observable : +---1 -- -- -2 -- -3 -| + source other_observable : +-5-- -6 -7 -- -8 - -| + operator "combine_latest:tuple" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-| + } + * \details Actually this operator subscribes on all of theses observables and emits `std::tuple` of last emissions when any of them emits new emission (and each observable emit values at least one to be able to provide combined value) + * + * \param observables are observables whose emissions would be combined with the current observable's emissions + * \return new specific_observable with the combine_latest operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet combine_latest.cpp combine_latest custom combiner + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store last emissions. + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - Keeps last emission from each observable + * - Emits `std::tuple` of last emissions if there is last emissions for each observable + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup combining_operators + * \see https://reactivex.io/documentation/operators/combinelatest.html + */ template auto combine_latest(TOtherObservable&&...observables) const& requires is_header_included { From 7bf8b00a467b422763a6569ca05410b603c0a016 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 22 Dec 2022 00:13:27 +0300 Subject: [PATCH 3/9] concat --- src/rpp/rpp/operators/fwd/concat.hpp | 72 +++++++++++++++++++--------- 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/concat.hpp b/src/rpp/rpp/operators/fwd/concat.hpp index 32da7f54d..cfb1dfdb5 100644 --- a/src/rpp/rpp/operators/fwd/concat.hpp +++ b/src/rpp/rpp/operators/fwd/concat.hpp @@ -30,28 +30,41 @@ auto concat_with_impl(TObservables&&... observables); template struct member_overload { - /** - * \brief Converts observable of observables of items into observable of items via merging emissions but without overlapping (current observable completes THEN next started to emit its values) - * - * \marble concat - { - source observable : - { - +--1-2-3-| - .....+4--6-| - } - operator "concat" : +--1-2-3-4--6-| + /** + * \brief Converts observable of observables of items into observable of items via merging emissions but without overlapping (current observable completes THEN next started to emit its values) + * + * \marble concat + { + source observable : + { + +--1-2-3-| + .....+4--6-| } - * - * \return new specific_observable with the concat operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet concat.cpp concat - * - * \ingroup aggregate_operators - * \see https://reactivex.io/documentation/operators/concat.html - */ + operator "concat" : +--1-2-3-4--6-| + } + * \details Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc... + * + * \return new specific_observable with the concat operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet concat.cpp concat + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - If no any active observable, then subscribes on new obtained observable, else place it in queue + * - When active observable completes, then subscribe on next observable from queue (if any) + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber) + * + * \ingroup aggregate_operators + * \see https://reactivex.io/documentation/operators/concat.html + */ template auto concat() const& requires (is_header_included&& rpp::constraint::observable) { @@ -70,9 +83,10 @@ struct member_overload * \marble concat_with { source original_observable: +--1-2-3-| - source second: +-----4--6-| - operator "concat_with" : +--1-2-3------4--6-| + source second: +-4--6-| + operator "concat_with" : +--1-2-3--4--6-| } + * \details Actually it subscribes on original observable. When original observable completes, then it subscribes on first observable from arguments and etc... * * \return new specific_observable with the concat operator as most recent operator. * \warning #include @@ -80,6 +94,18 @@ struct member_overload * \par Example * \snippet concat.cpp concat_with * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - If no any active observable, then subscribes on new obtained observable, else place it in queue + * - When active observable completes, then subscribe on next observable from queue (if any) + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber) + * * \ingroup aggregate_operators * \see https://reactivex.io/documentation/operators/concat.html */ From 96fdcedb268077ddb3fc8a7a3a74759a08d268d5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 23 Dec 2022 23:26:20 +0300 Subject: [PATCH 4/9] add debounce --- src/rpp/rpp/operators/fwd/concat.hpp | 9 +++-- src/rpp/rpp/operators/fwd/debounce.hpp | 54 +++++++++++++++++--------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/concat.hpp b/src/rpp/rpp/operators/fwd/concat.hpp index cfb1dfdb5..b3424f6ed 100644 --- a/src/rpp/rpp/operators/fwd/concat.hpp +++ b/src/rpp/rpp/operators/fwd/concat.hpp @@ -86,7 +86,7 @@ struct member_overload source second: +-4--6-| operator "concat_with" : +--1-2-3--4--6-| } - * \details Actually it subscribes on original observable. When original observable completes, then it subscribes on first observable from arguments and etc... + * \details Actually this operator subscribes on original observable. When original observable completes, then it subscribes on first observable from arguments and etc... * * \return new specific_observable with the concat operator as most recent operator. * \warning #include @@ -98,13 +98,14 @@ struct member_overload * - On subscribe * - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables * - Wraps subscriber with serialization logic to be sure callbacks called serialized - * - OnNext + * - OnNext for original observable * - If no any active observable, then subscribes on new obtained observable, else place it in queue - * - When active observable completes, then subscribe on next observable from queue (if any) * - OnError * - Just forwards original on_error - * - OnCompleted + * - OnCompleted from original observable * - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber) + * - OnCompleted from inner observable + * - Subscribe on next observable from queue (if any) * * \ingroup aggregate_operators * \see https://reactivex.io/documentation/operators/concat.html diff --git a/src/rpp/rpp/operators/fwd/debounce.hpp b/src/rpp/rpp/operators/fwd/debounce.hpp index 0b86f6247..98894558e 100644 --- a/src/rpp/rpp/operators/fwd/debounce.hpp +++ b/src/rpp/rpp/operators/fwd/debounce.hpp @@ -26,25 +26,41 @@ struct debounce_impl; template struct member_overload { - /** - * \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset. - * - * \marble debounce - { - source observable : +--1-2-----3---| - operator "debounce(4)" : +--------2-----3| - } - * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. - * \param scheduler is scheduler used to run timer for debounce - * \return new specific_observable with the debounce operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet debounce.cpp debounce - * - * \ingroup utility_operators - * \see https://reactivex.io/documentation/operators/debounce.html - */ + /** + * \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset. + * + * \marble debounce + { + source observable : +--1-2-----3---| + operator "debounce(4)" : +--------2-----3| + } + * \details Actually this operator resets time of last emission, schedules action to send this emission after specified period if no any new emissions till this moment. + * + * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. + * \param scheduler is scheduler used to run timer for debounce + * \return new specific_observable with the debounce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet debounce.cpp debounce + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store last emission and time. + * - Wraps subscriber with serialization logic to prevent race-conditions + * - OnNext + * - Saves time when emission happened + * - Saves emission + * - Schedule action to send this emission with check if no any new emissions + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * - Immediately send current active emission if any + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/debounce.html + */ template auto debounce(schedulers::duration period,const TScheduler& scheduler = TScheduler{}) const & requires is_header_included { From 971672e297ef73b40332d08076eeb9ac498df2a7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 23 Dec 2022 23:31:00 +0300 Subject: [PATCH 5/9] Delay/distinct_until_changed --- src/rpp/rpp/operators/fwd/delay.hpp | 43 +++++++-------- .../operators/fwd/distinct_until_changed.hpp | 52 ++++++++++++------- 2 files changed, 54 insertions(+), 41 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/delay.hpp b/src/rpp/rpp/operators/fwd/delay.hpp index 546b065f5..6deda3790 100644 --- a/src/rpp/rpp/operators/fwd/delay.hpp +++ b/src/rpp/rpp/operators/fwd/delay.hpp @@ -27,27 +27,28 @@ template struct member_overload { - /** - * \brief Shift the emissions from an Observable forward in time by a particular amount. - * \details The delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment. - * - * \marble delay - { - source observable : +-1-2-3-| - operator "delay: --" : +---1-2-3-| - } - * - * \param delay_duration is the delay duration for emitting items. Delay duration should be able to cast to rpp::schedulers::duration. - * \param scheduler provides the threading model for delay. e.g. With a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription. - * \return new specific_observable with the delay operator as most recent operator. - * \warning #include - * - * \par Examples - * \snippet delay.cpp delay - * - * \ingroup utility_operators - * \see https://reactivex.io/documentation/operators/delay.html - */ + /** + * \brief Shift the emissions from an Observable forward in time by a particular amount. + * \details The delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment. + * + * \marble delay + { + source observable : +-1-2-3-| + operator "delay: --" : +---1-2-3-| + } + * \details Actually this operator just schedules emissions via provided scheduler with provided delay_duration. + * + * \param delay_duration is the delay duration for emitting items. Delay duration should be able to cast to rpp::schedulers::duration. + * \param scheduler provides the threading model for delay. e.g. With a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription. + * \return new specific_observable with the delay operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet delay.cpp delay + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/delay.html + */ template auto delay(auto&& delay_duration, TScheduler&& scheduler) const& requires is_header_included diff --git a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp index 3318e6f56..5e2bc9f05 100644 --- a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp @@ -28,26 +28,38 @@ struct distinct_until_changed_impl; template struct member_overload { - /** - * \brief Suppress consecutive duplicates of emissions from original observable - * - * \marble distinct_until_changed - { - source observable : +--1-1-2-2-3-2-1-| - operator "distinct_until_changed" : +--1---2---3-2-1-| - } - * - * \param equality_fn optional equality comparator function - * \return new specific_observable with the distinct_until_changed operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet distinct_until_changed.cpp distinct_until_changed - * \snippet distinct_until_changed.cpp distinct_until_changed_with_comparator - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/distinct.html - */ + /** + * \brief Suppress consecutive duplicates of emissions from original observable + * + * \marble distinct_until_changed + { + source observable : +--1-1-2-2-3-2-1-| + operator "distinct_until_changed" : +--1---2---3-2-1-| + } + * \details Actually this operator has `std::optional` with last item and checks everytime where new emission is same or not. + * + * \param equality_fn optional equality comparator function + * \return new specific_observable with the distinct_until_changed operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet distinct_until_changed.cpp distinct_until_changed + * \snippet distinct_until_changed.cpp distinct_until_changed_with_comparator + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store last emission + * - OnNext + * - Checks if value in state same as new emission + * - If new emission is not same, then updates state and emit this emission + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/distinct.html + */ template EqualityFn = std::equal_to> auto distinct_until_changed(EqualityFn&& equality_fn = EqualityFn{}) const & requires is_header_included { From 0e988bc721fa94b17cb653c5ade44781d807c529 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 24 Dec 2022 00:01:49 +0300 Subject: [PATCH 6/9] fix minor issue in first --- src/rpp/rpp/operators/first.hpp | 17 ++++++++--------- src/tests/rpp/test_first.cpp | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index 65285e5aa..70a6cf293 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -33,10 +33,9 @@ using first_on_next = take_on_next; struct first_on_completed { - void operator()(const constraint::subscriber auto& subscriber, const first_state& state) const + void operator()(const constraint::subscriber auto& subscriber, const first_state&) const { - if (state.count != 0) - subscriber.on_error(std::make_exception_ptr(utils::not_enough_emissions{"first() operator expects at least one emission from observable before completion"})); + subscriber.on_error(std::make_exception_ptr(utils::not_enough_emissions{"first() operator expects at least one emission from observable before completion"})); } }; @@ -50,12 +49,12 @@ struct first_impl auto subscription = subscriber.get_subscription(); // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state - return create_subscriber_with_state(std::move(subscription), - first_on_next{}, - utils::forwarding_on_error{}, - first_on_completed{}, - std::forward(subscriber), - first_state{}); + return create_subscriber_with_dynamic_state(std::move(subscription), + first_on_next{}, + utils::forwarding_on_error{}, + first_on_completed{}, + std::forward(subscriber), + first_state{}); } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_first.cpp b/src/tests/rpp/test_first.cpp index ca2f7c7bd..98381e4a4 100644 --- a/src/tests/rpp/test_first.cpp +++ b/src/tests/rpp/test_first.cpp @@ -82,6 +82,32 @@ SCENARIO("first forwards error", "[first]") } } +SCENARIO("first keeps state for copies", "[first]") +{ + auto mock = mock_observer{}; + GIVEN("observable which sends values via copy") + { + auto obs = rpp::source::create([](const auto& sub) + { + for(size_t i = 0; i < 10; ++i) + { + auto copy = sub; + copy.on_next(1); + } + }); + WHEN("subscribe on it via first") + { + obs.first().subscribe(mock); + THEN("observer obtains one value as expected") + { + CHECK(mock.get_received_values() == std::vector{ 1 }); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + SCENARIO("first raises error for empty", "[first]") { GIVEN("observable of ---|") From ae11210c6fe4696c66ae50ee5e861d969dc52dfc Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 24 Dec 2022 00:01:56 +0300 Subject: [PATCH 7/9] Extend docs --- src/rpp/rpp/operators/fwd/debounce.hpp | 1 + .../operators/fwd/distinct_until_changed.hpp | 1 + src/rpp/rpp/operators/fwd/filter.hpp | 50 ++++++++++++------- src/rpp/rpp/operators/fwd/merge.hpp | 13 +++++ 4 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/debounce.hpp b/src/rpp/rpp/operators/fwd/debounce.hpp index 98894558e..26c1b5005 100644 --- a/src/rpp/rpp/operators/fwd/debounce.hpp +++ b/src/rpp/rpp/operators/fwd/debounce.hpp @@ -34,6 +34,7 @@ struct member_overload source observable : +--1-2-----3---| operator "debounce(4)" : +--------2-----3| } + * * \details Actually this operator resets time of last emission, schedules action to send this emission after specified period if no any new emissions till this moment. * * \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission. diff --git a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp index 5e2bc9f05..4a5376494 100644 --- a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp @@ -36,6 +36,7 @@ struct member_overload source observable : +--1-1-2-2-3-2-1-| operator "distinct_until_changed" : +--1---2---3-2-1-| } + * * \details Actually this operator has `std::optional` with last item and checks everytime where new emission is same or not. * * \param equality_fn optional equality comparator function diff --git a/src/rpp/rpp/operators/fwd/filter.hpp b/src/rpp/rpp/operators/fwd/filter.hpp index d612e2fbf..5519cb5de 100644 --- a/src/rpp/rpp/operators/fwd/filter.hpp +++ b/src/rpp/rpp/operators/fwd/filter.hpp @@ -25,25 +25,37 @@ struct filter_impl; template struct member_overload { - /** - * \brief Emit only those items from an Observable that satisfies a provided predicate - * - * \marble filter - { - source observable : +--1-2-3-4-| - operator "filter: x=>x%2==0" : +----2---4-| - } - * - * \param predicate is predicate used to check emitted items. true -> items satisfies condition, false -> not - * \return new specific_observable with the Filter operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet filter.cpp Filter - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/filter.html - */ + /** + * \brief Emit only those items from an Observable that satisfies a provided predicate + * + * \marble filter + { + source observable : +--1-2-3-4-| + operator "filter: x=>x%2==0" : +----2---4-| + } + * + * \details Actually this operator just checks if predicate returns true, then forwards emission + * + * \param predicate is predicate used to check emitted items. true -> items satisfies condition, false -> not + * \return new specific_observable with the Filter operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet filter.cpp Filter + * + * \par Implementation details: + * - On subscribe + * - None + * - OnNext + * - Just forwards emission of predicate returns true + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/filter.html + */ template Predicate> auto filter(Predicate&& predicate) const& requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/merge.hpp b/src/rpp/rpp/operators/fwd/merge.hpp index 7e4a54b6c..38ea5b9d1 100644 --- a/src/rpp/rpp/operators/fwd/merge.hpp +++ b/src/rpp/rpp/operators/fwd/merge.hpp @@ -44,11 +44,24 @@ struct member_overload operator "merge" : +--1-243-6-| } * + * \details Actually it subscribes on each observable from emissions. Resulting observables completes when ALL observables completes + * * \return new specific_observable with the merge operator as most recent operator. * \warning #include * * \par Example: * \snippet merge.cpp merge + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store interal state + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - Subscribes on obtained observable + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed when all observables emit on_completed * * \ingroup combining_operators * \see https://reactivex.io/documentation/operators/merge.html From a2ab1720b248a1b8c6739a4b2aa8fde0c048d228 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 24 Dec 2022 00:04:49 +0300 Subject: [PATCH 8/9] Improve docs --- src/rpp/rpp/operators/fwd/combine_latest.hpp | 1 + src/rpp/rpp/operators/fwd/concat.hpp | 13 +++-- src/rpp/rpp/operators/fwd/delay.hpp | 11 ++++ src/rpp/rpp/operators/fwd/first.hpp | 51 +++++++++++------- src/rpp/rpp/operators/fwd/flat_map.hpp | 2 +- src/rpp/rpp/operators/fwd/group_by.hpp | 14 ++++- src/rpp/rpp/operators/fwd/last.hpp | 51 +++++++++++------- src/rpp/rpp/operators/fwd/map.hpp | 57 ++++++++++++-------- src/rpp/rpp/operators/fwd/merge.hpp | 17 +++++- 9 files changed, 149 insertions(+), 68 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/combine_latest.hpp b/src/rpp/rpp/operators/fwd/combine_latest.hpp index ae3b1eba9..96f7a2857 100644 --- a/src/rpp/rpp/operators/fwd/combine_latest.hpp +++ b/src/rpp/rpp/operators/fwd/combine_latest.hpp @@ -100,6 +100,7 @@ struct member_overload source other_observable : +-5-- -6 -7 -- -8 - -| operator "combine_latest:tuple" : +---{1,5}-{1,6}-{1,7}-{2,7}-{2,8}-{3,8}-| } + * * \details Actually this operator subscribes on all of theses observables and emits `std::tuple` of last emissions when any of them emits new emission (and each observable emit values at least one to be able to provide combined value) * * \param observables are observables whose emissions would be combined with the current observable's emissions diff --git a/src/rpp/rpp/operators/fwd/concat.hpp b/src/rpp/rpp/operators/fwd/concat.hpp index b3424f6ed..eea96ec26 100644 --- a/src/rpp/rpp/operators/fwd/concat.hpp +++ b/src/rpp/rpp/operators/fwd/concat.hpp @@ -42,6 +42,7 @@ struct member_overload } operator "concat" : +--1-2-3-4--6-| } + * * \details Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc... * * \return new specific_observable with the concat operator as most recent operator. @@ -54,13 +55,14 @@ struct member_overload * - On subscribe * - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables * - Wraps subscriber with serialization logic to be sure callbacks called serialized - * - OnNext + * - OnNext for original observable * - If no any active observable, then subscribes on new obtained observable, else place it in queue - * - When active observable completes, then subscribe on next observable from queue (if any) * - OnError * - Just forwards original on_error - * - OnCompleted + * - OnCompleted from original observable * - Just forwards original on_completed if no any active observable (else we need to processa all observables from queue and they would emit on_completed for subscriber) + * - OnCompleted from inner observable + * - Subscribe on next observable from queue (if any) * * \ingroup aggregate_operators * \see https://reactivex.io/documentation/operators/concat.html @@ -86,6 +88,7 @@ struct member_overload source second: +-4--6-| operator "concat_with" : +--1-2-3--4--6-| } + * * \details Actually this operator subscribes on original observable. When original observable completes, then it subscribes on first observable from arguments and etc... * * \return new specific_observable with the concat operator as most recent operator. @@ -98,8 +101,8 @@ struct member_overload * - On subscribe * - Allocates one `shared_ptr` to store observables (== emissions) and some internal variables * - Wraps subscriber with serialization logic to be sure callbacks called serialized - * - OnNext for original observable - * - If no any active observable, then subscribes on new obtained observable, else place it in queue + * - OnNext + * - Just forwards on_next * - OnError * - Just forwards original on_error * - OnCompleted from original observable diff --git a/src/rpp/rpp/operators/fwd/delay.hpp b/src/rpp/rpp/operators/fwd/delay.hpp index 6deda3790..3bcba34d8 100644 --- a/src/rpp/rpp/operators/fwd/delay.hpp +++ b/src/rpp/rpp/operators/fwd/delay.hpp @@ -36,6 +36,7 @@ struct member_overload source observable : +-1-2-3-| operator "delay: --" : +---1-2-3-| } + * * \details Actually this operator just schedules emissions via provided scheduler with provided delay_duration. * * \param delay_duration is the delay duration for emitting items. Delay duration should be able to cast to rpp::schedulers::duration. @@ -46,6 +47,16 @@ struct member_overload * \par Examples * \snippet delay.cpp delay * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal state + * - OnNext + * - Move emission to queue and schedule action to drain queue (if not yet) + * - OnError + * - Just forwards original on_error via scheduling + * - OnCompleted + * - Just forwards original on_completed via scheduling + * * \ingroup utility_operators * \see https://reactivex.io/documentation/operators/delay.html */ diff --git a/src/rpp/rpp/operators/fwd/first.hpp b/src/rpp/rpp/operators/fwd/first.hpp index 0797bdcc7..2207b2dec 100644 --- a/src/rpp/rpp/operators/fwd/first.hpp +++ b/src/rpp/rpp/operators/fwd/first.hpp @@ -27,25 +27,38 @@ struct first_impl; template struct member_overload { - /** - * \brief emit only the first item. - * - * \marble first - { - source observable : +--1--2--3--| - operator "first" : +--1| - } - * - * \return new specific_observable with the first operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet first.cpp first - * \snippet first.cpp first_empty - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/first.html - */ + /** + * \brief emit only the first item. + * + * \marble first + { + source observable : +--1--2--3--| + operator "first" : +--1| + } + * + * \details Actually this operator is `take(1)` with exception during `on_completed` if no any emision happens. So, it just forwards first obtained emission and emits on_completed immediately + * \throws rpp::utils::not_enough_emissions in case of on_completed obtained without any emissions + * + * \return new specific_observable with the first operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet first.cpp first + * \snippet first.cpp first_empty + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to keep internal state + * - OnNext + * - Just forwards 1 emission and emit on_completed + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - throws exception if no any emission before + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/first.html + */ auto first() const & requires is_header_included { return cast_this()->template lift(first_impl{}); diff --git a/src/rpp/rpp/operators/fwd/flat_map.hpp b/src/rpp/rpp/operators/fwd/flat_map.hpp index cc9bc415b..5e61c400e 100644 --- a/src/rpp/rpp/operators/fwd/flat_map.hpp +++ b/src/rpp/rpp/operators/fwd/flat_map.hpp @@ -41,7 +41,7 @@ struct member_overload operator "flat_map: x=>just(x,x+1)" : +--12-23-34-| } * - * \details Actually it makes `map` and then `merge`. + * \details Actually it makes `map(callable)` and then `merge`. * * \param callable Function to transform item to observable * \return new specific_observable with the flat_map operator as most recent operator. diff --git a/src/rpp/rpp/operators/fwd/group_by.hpp b/src/rpp/rpp/operators/fwd/group_by.hpp index 52b0a0e2f..981a6afc4 100644 --- a/src/rpp/rpp/operators/fwd/group_by.hpp +++ b/src/rpp/rpp/operators/fwd/group_by.hpp @@ -38,7 +38,7 @@ struct member_overload } * * - * \details Original observable applies `key_selector` to obtain key and split values to sub-groups based on these keys and then send rpp::grouped_observable with this key. Such an grouped observables emit values which has same value of key. + * \details Actually this operator applies `key_selector` to emission to obtain key, place rpp::grouped_observable to map with corresponding map and then send observable with this key (if not yet). Original values emitted via this grouped_observables * * \param key_selector Function which determines key for provided item * \param value_selector Function which determines value to be emitted to grouped observable @@ -51,6 +51,18 @@ struct member_overload * \snippet group_by.cpp group_by * \snippet group_by.cpp group_by selector * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to keep map + * - OnNext + * - Applies key_selector to obtained emission + * - For calculated key create new entry in map (if not yet) + * - Emit value via grouped_observable from map for corresponding key + * - OnError + * - Just forwards original on_error to both subscribers of observable of grouped observables and grouped observables + * - OnCompleted + * - Just forwards original on_completed to both subscribers of observable of grouped observables and grouped observables + * * \ingroup transforming_operators * \see https://reactivex.io/documentation/operators/groupby.html */ diff --git a/src/rpp/rpp/operators/fwd/last.hpp b/src/rpp/rpp/operators/fwd/last.hpp index 55a52508f..0fe26f463 100644 --- a/src/rpp/rpp/operators/fwd/last.hpp +++ b/src/rpp/rpp/operators/fwd/last.hpp @@ -26,25 +26,38 @@ struct last_impl; template struct member_overload { - /** - * \brief Emit only the last item provided before on_completed. - * - * \marble last - { - source observable : +--1--2--3--| - operator "last" : +--3-| - } - * - * \return new specific_observable with the last operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet last.cpp last - * \snippet last.cpp last empty - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/last.html - */ + /** + * \brief Emit only the last item provided before on_completed. + * + * \marble last + { + source observable : +--1--2--3--| + operator "last" : +--3-| + } + * + * \details Actually this operator just updates `std::optional` on every new emission and emits this value on_completed + * \throws rpp::utils::not_enough_emissions in case of on_completed obtained without any emissions + * + * \return new specific_observable with the last operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet last.cpp last + * \snippet last.cpp last empty + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to keep `std::optional` + * - OnNext + * - Just saves emission to `std::optional` + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Emits saved emission or throws exception if no any emissions + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/last.html + */ auto last() const & requires is_header_included { return cast_this()->template lift(last_impl{}); diff --git a/src/rpp/rpp/operators/fwd/map.hpp b/src/rpp/rpp/operators/fwd/map.hpp index 340ec87f2..df5ac462c 100644 --- a/src/rpp/rpp/operators/fwd/map.hpp +++ b/src/rpp/rpp/operators/fwd/map.hpp @@ -26,28 +26,41 @@ struct map_impl; template struct member_overload { - /** - * \brief Transform the items emitted by an Observable via applying a function to each item and emitting result - * \note The Map operator can keep same type of value or change it to some another type. - * - * \marble map - { - source observable : +--1 -2 --3 -| - operator "map: x=>x+10" : +--(11)-(12)--(13)-| - } - * - * \param callable is callable used to provide this transformation. Should accept Type of original observable and return type for new observable - * \return new specific_observable with the Map operator as most recent operator. - * \warning #include - * - * \par Example with same type: - * \snippet map.cpp Same type - * - * \par Example with changed type: - * \snippet map.cpp Changed type - * \ingroup transforming_operators - * \see https://reactivex.io/documentation/operators/map.html - */ + /** + * \brief Transform the items emitted by an Observable via applying a function to each item and emitting result + * \note The Map operator can keep same type of value or change it to some another type. + * + * \marble map + { + source observable : +--1 -2 --3 -| + operator "map: x=>x+10" : +--(11)-(12)--(13)-| + } + * + * \details Actually this operator just applies callable to each obtained emission and emit resulting value + * + * \param callable is callable used to provide this transformation. Should accept Type of original observable and return type for new observable + * \return new specific_observable with the Map operator as most recent operator. + * \warning #include + * + * \par Example with same type: + * \snippet map.cpp Same type + * + * \par Example with changed type: + * \snippet map.cpp Changed type + * + * \par Implementation details: + * - On subscribe + * - None + * - OnNext + * - Just forwards result of applying callable to emissions + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/map.html + */ template Callable> auto map(Callable&& callable) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/merge.hpp b/src/rpp/rpp/operators/fwd/merge.hpp index 38ea5b9d1..c956e02b0 100644 --- a/src/rpp/rpp/operators/fwd/merge.hpp +++ b/src/rpp/rpp/operators/fwd/merge.hpp @@ -56,8 +56,10 @@ struct member_overload * - On subscribe * - Allocates one `shared_ptr` to store interal state * - Wraps subscriber with serialization logic to be sure callbacks called serialized - * - OnNext + * - OnNext for original observable * - Subscribes on obtained observable + * - OnNext for inner observable + * - Just forwards original on_next * - OnError * - Just forwards original on_error * - OnCompleted @@ -89,6 +91,8 @@ struct member_overload source second: +-----4--6-| operator "merge_with" : +--1-243-6-| } + * + * \details Actually it subscribes on each observable. Resulting observables completes when ALL observables completes * * \param observables are observables whose emissions would be merged with current observable * \return new specific_observable with the merge operator as most recent operator. @@ -97,6 +101,17 @@ struct member_overload * \par Example: * \snippet merge.cpp merge_with * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store interal state + * - Wraps subscriber with serialization logic to be sure callbacks called serialized + * - OnNext + * - Just forwards original on_next + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed when all observables emit on_completed + * * \ingroup combining_operators * \see https://reactivex.io/documentation/operators/merge.html */ From 4a63942337b260228aa237864b8942a898ee06ea Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 25 Dec 2022 00:02:30 +0300 Subject: [PATCH 9/9] Add comments --- src/rpp/rpp/operators/fwd/observe_on.hpp | 12 ++++ .../operators/fwd/on_error_resume_next.hpp | 56 ++++++++++++------- src/rpp/rpp/operators/fwd/reduce.hpp | 56 +++++++++++-------- src/rpp/rpp/operators/fwd/repeat.hpp | 15 ++++- src/rpp/rpp/operators/fwd/sample.hpp | 56 ++++++++++++------- src/rpp/rpp/operators/last.hpp | 12 ++-- 6 files changed, 136 insertions(+), 71 deletions(-) diff --git a/src/rpp/rpp/operators/fwd/observe_on.hpp b/src/rpp/rpp/operators/fwd/observe_on.hpp index 808f97d8b..ec7f0c108 100644 --- a/src/rpp/rpp/operators/fwd/observe_on.hpp +++ b/src/rpp/rpp/operators/fwd/observe_on.hpp @@ -29,6 +29,8 @@ struct member_overload /** * \brief Emit emissions of observable starting from this point via provided scheduler * + * \details Actually this operator just schedules emissions via provided scheduler. So, actually it is delay(0) operator + * * \param scheduler is scheduler used for scheduling of OnNext * \return new specific_observable with the observe_on operator as most recent operator. * \warning #include @@ -36,6 +38,16 @@ struct member_overload * \par Example: * \snippet observe_on.cpp observe_on * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal state + * - OnNext + * - Move emission to queue and schedule action to drain queue (if not yet) + * - OnError + * - Just forwards original on_error via scheduling + * - OnCompleted + * - Just forwards original on_completed via scheduling + * * \ingroup utility_operators * \see https://reactivex.io/documentation/operators/observeon.html */ diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp index 14cc910a9..9e1dc2c2a 100644 --- a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -35,27 +35,41 @@ template struct member_overload { - /** - * \brief Recover from an on_error notification by continuing the sequence without error. - * \details The operator intercepts an on_error notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items. - * \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all. - * - * \marble on_error_resume_next - { - source observable : +-1-# - operator "on_error_resume_next: -9-9-|" : +-1-9-9-| - } - * - * \param resume_callable A callable that is given an error pointer and shall return an Observable. - * \return new specific_observable with the on_error_resume_next operator as most recent operator. - * \warning #include - * - * \par Examples - * \snippet on_error_resume_next.cpp on_error_resume_next - * - * \ingroup error_handling_operators - * \see https://reactivex.io/documentation/operators/on_error_resume_next.html - */ + /** + * \brief Recover from an on_error notification by continuing the sequence without error. + * \details The operator intercepts an `on_error` notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items. + * \warning This operator potentially allows the resulting Observable to terminate normally or not to terminate at all. + * + * \marble on_error_resume_next + { + source observable : +-1-# + operator "on_error_resume_next: -9-9-|" : +-1-9-9-| + } + * + * \details Actually this operator just subscribes on observable calculated from callback when `on_error` from original observable obtained. + * + * \param resume_callable A callable that is given an error pointer and shall return an Observable. + * \return new specific_observable with the on_error_resume_next operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet on_error_resume_next.cpp on_error_resume_next + * + * \par Implementation details: + * - On subscribe + * - None + * - OnNext + * - Just forwards original on_next + * - OnError from original observable + * - Subscribes subscriber on observable obtained from callable + * - OnError from calculated observable + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup error_handling_operators + * \see https://reactivex.io/documentation/operators/on_error_resume_next.html + */ template auto on_error_resume_next(ResumeCallable&& resume_callable) const& requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/reduce.hpp b/src/rpp/rpp/operators/fwd/reduce.hpp index 1dfcc8b6a..1ee35cf63 100644 --- a/src/rpp/rpp/operators/fwd/reduce.hpp +++ b/src/rpp/rpp/operators/fwd/reduce.hpp @@ -56,28 +56,40 @@ auto max_impl(TObs&& observable, Comparator&& comparator); template struct member_overload { - /** - * \brief Applies accumulator function to each emission from observable and result of accumulator from previous step and emits final value - * - * \marble reduce - { - source observable : +--1-2-3-| - operator "reduce: s=1, (s,x)=>s+x" : +--------7| - } - * - * \param initial_seed initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. - * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. - * - * \return new specific_observable with the reduce operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet reduce.cpp reduce - * \snippet reduce.cpp reduce_vector - * - * \ingroup aggregate_operators - * \see https://reactivex.io/documentation/operators/reduce.html - */ + /** + * \brief Applies accumulator function to each emission from observable and result of accumulator from previous step and emits final value + * + * \marble reduce + { + source observable : +--1-2-3-| + operator "reduce: s=1, (s,x)=>s+x" : +--------7| + } + * + * \details Actually this operator behaves like `scan()` + `take_last(1)`, so, it just accumulates `seed` and emits it `on_completed` + * + * \param initial_seed initial value for seed which will be applied for first value from observable. Then it will be replaced with result and etc. + * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. + * + * \return new specific_observable with the reduce operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet reduce.cpp reduce + * \snippet reduce.cpp reduce_vector + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal state + * - OnNext + * - Applies accumulator to each emission + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Emits accumulated seed via applyting result_selector + * + * \ingroup aggregate_operators + * \see https://reactivex.io/documentation/operators/reduce.html + */ template AccumulatorFn, std::invocable ResultSelectorFn = std::identity> auto reduce(Seed&& initial_seed, AccumulatorFn&& accumulator, ResultSelectorFn&& result_selector = {}) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/repeat.hpp b/src/rpp/rpp/operators/fwd/repeat.hpp index 3f1363c76..64bef9f20 100644 --- a/src/rpp/rpp/operators/fwd/repeat.hpp +++ b/src/rpp/rpp/operators/fwd/repeat.hpp @@ -30,7 +30,7 @@ template struct member_overload { /** - * \brief Re-subscribes on current observable during `on_completed` provided amount of times + * \brief Re-subscribes on current observable provided amount of times when `on_completed` obtained * * \marble repeat { @@ -38,6 +38,8 @@ struct member_overload operator "repeat(2)" : +-1-2-3-1-2-3-| } * + * \details Actually this operator re-subscribes on same observable when `on_completed` obtained while counter not reached zero + * * \param count total amount of times subscription happens. For example: * - `count(0)` - means no any subscription at all * - `count(1)` - behave like ordinal observable @@ -48,6 +50,17 @@ struct member_overload * \par Examples: * \snippet repeat.cpp repeat * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store counter + * - OnNext + * - Just forwards original on_next + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Decrements counter + * - If counter not zero, then re-subscribes on the same observable + * * \ingroup utility_operators * \see https://reactivex.io/documentation/operators/repeat.html */ diff --git a/src/rpp/rpp/operators/fwd/sample.hpp b/src/rpp/rpp/operators/fwd/sample.hpp index 48bdf94c0..35337289a 100644 --- a/src/rpp/rpp/operators/fwd/sample.hpp +++ b/src/rpp/rpp/operators/fwd/sample.hpp @@ -27,27 +27,41 @@ struct sample_with_time_impl; template struct member_overload { - /** - * \brief Emit most recent emitted from original observable emission obtained during last period of time. - * \details Emit item immediately in case of completion of the original observable - * - * \marble sample_with_time - { - source observable : +--1---2-3-4---5-6-7-| - operator "sample_with_time(2)" : +--1---2---4---5---7-| - } - * - * \param period sampling period - * \scheduler scheduler to use to schedule emissions with provided sampling period - * \return new specific_observable with the sample_with_time operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet sample.cpp sample_with_time - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html - */ + /** + * \brief Emit most recent emitted from original observable emission obtained during last period of time. + * \details Emit item immediately in case of completion of the original observable + * + * \marble sample_with_time + { + source observable : +--1---2-3-4---5-6-7-| + operator "sample_with_time(2)" : +--1---2---4---5---7-| + } + * + * \details Actually operator just schedules periodical action and on each schedulable execution just emits last emitted emission (if any) + * + * \param period sampling period + * \scheduler scheduler to use to schedule emissions with provided sampling period + * \return new specific_observable with the sample_with_time operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet sample.cpp sample_with_time + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store last emitted value + * - Schedules periodical action to emit stored value (if any) + * - OnNext + * - Updates stored value + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * - Emit last emitted value (if any) + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html + */ template auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index 0b1f20534..9ac2baf72 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -64,12 +64,12 @@ struct last_impl { auto subscription = subscriber.get_subscription(); - return create_subscriber_with_state(std::move(subscription), - last_on_next{}, - utils::forwarding_on_error{}, - last_on_completed{}, - std::forward(subscriber), - last_state{}); + return create_subscriber_with_dynamic_state(std::move(subscription), + last_on_next{}, + utils::forwarding_on_error{}, + last_on_completed{}, + std::forward(subscriber), + last_state{}); } }; } // namespace rpp::details