From c582238b3f56bfcfa07850be50e9db5f327ec382 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 16 Dec 2022 16:40:35 +0300 Subject: [PATCH 1/4] Start to simplify lift a bit --- .gitignore | 1 + src/rpp/rpp/operators/lift.hpp | 49 +++++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index b6bed4d54..1e0b317d3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .idea/ .vs/ .vscode/ +.cache/ build/ _build/ cmake/open-cpp-coverage.cmake diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index 1fe4fcdcd..acbb69ddf 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -10,10 +10,13 @@ #pragma once +#include "rpp/observables/specific_observable.hpp" +#include "rpp/subscribers/constraints.hpp" #include // RPP_NO_UNIQUE_ADDRESS #include // create_subscriber_with_state #include // own forwarding #include // create observable +#include IMPLEMENTATION_FILE(lift_tag); @@ -45,10 +48,26 @@ struct lift_action_by_callbacks * \param _this is the current observable. * \param op is the functor that provides the "operator()(subscriber_of_new_type) -> subscriber_of_old_type". */ -template OperatorFn, typename TObs> + +template OperatorFn, typename ...ChildLiftArgs> struct lift_on_subscribe { - RPP_NO_UNIQUE_ADDRESS TObs _this; + using T = utils::extract_subscriber_type_t>>; + RPP_NO_UNIQUE_ADDRESS rpp::specific_observable> _this; + RPP_NO_UNIQUE_ADDRESS OperatorFn op; + + template TSub> + void operator()(TSub&& subscriber) const + { + _this.subscribe(op(std::forward(subscriber))); + } +}; + +template OperatorFn, typename TOnSubscribe> +struct lift_on_subscribe +{ + using T = utils::extract_subscriber_type_t>>; + RPP_NO_UNIQUE_ADDRESS rpp::specific_observable _this; RPP_NO_UNIQUE_ADDRESS OperatorFn op; template TSub> @@ -58,9 +77,33 @@ struct lift_on_subscribe } }; +template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> +auto lift_impl_internal(OperatorFn&& op, rpp::specific_observable>&& _this) +{ + return rpp::observable::create, ChildLiftArgs...>>({ std::move(_this), std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> +auto lift_impl_internal(OperatorFn&& op, const rpp::specific_observable>& _this) +{ + return rpp::observable::create, ChildLiftArgs...>>({ _this, std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename OnSubscribe> +auto lift_impl_internal(OperatorFn&& op, rpp::specific_observable&& _this) +{ + return rpp::observable::create, OnSubscribe>>({ std::move(_this), std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename OnSubscribe> +auto lift_impl_internal(OperatorFn&& op, const rpp::specific_observable& _this) +{ + return rpp::observable::create, OnSubscribe>>({ _this, std::forward(op) }); +} + template OperatorFn, typename TObs> auto lift_impl(OperatorFn&& op, TObs&& _this) { - return rpp::observable::create, std::decay_t>>({ std::forward(_this), std::forward(op) }); + return lift_impl_internal(std::forward(op), std::forward(_this)); } } // namespace rpp::details From df0f9102f58b9ebddee174fabcde87e4b133f895 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 16 Dec 2022 22:49:17 +0300 Subject: [PATCH 2/4] minor update --- src/rpp/rpp/operators/lift.hpp | 39 +++++++++++++--------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index acbb69ddf..c04d6a380 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -41,6 +41,9 @@ struct lift_action_by_callbacks } }; +template OperatorFn> +using subscriber_type_of_list_fn = utils::extract_subscriber_type_t>>; + /** * \brief Functor of "lift" operator for on_subscribe overload function. * \details Each observable has an on_subscribe function and observable is activated (pub-sub channel is established) after on_subscribe is called. The on_subscribe is called when the observable is subscribed by a subscriber @@ -48,27 +51,15 @@ struct lift_action_by_callbacks * \param _this is the current observable. * \param op is the functor that provides the "operator()(subscriber_of_new_type) -> subscriber_of_old_type". */ - template OperatorFn, typename ...ChildLiftArgs> -struct lift_on_subscribe -{ - using T = utils::extract_subscriber_type_t>>; - RPP_NO_UNIQUE_ADDRESS rpp::specific_observable> _this; - RPP_NO_UNIQUE_ADDRESS OperatorFn op; - - template TSub> - void operator()(TSub&& subscriber) const - { - _this.subscribe(op(std::forward(subscriber))); - } -}; +struct lift_on_subscribe : public lift_on_subscribe, ChildLiftArgs...>> {}; template OperatorFn, typename TOnSubscribe> struct lift_on_subscribe { - using T = utils::extract_subscriber_type_t>>; - RPP_NO_UNIQUE_ADDRESS rpp::specific_observable _this; - RPP_NO_UNIQUE_ADDRESS OperatorFn op; + using T = subscriber_type_of_list_fn; + RPP_NO_UNIQUE_ADDRESS specific_observable _this; + RPP_NO_UNIQUE_ADDRESS OperatorFn op; template TSub> void operator()(TSub&& subscriber) const @@ -78,27 +69,27 @@ struct lift_on_subscribe }; template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> -auto lift_impl_internal(OperatorFn&& op, rpp::specific_observable>&& _this) +auto lift_impl_internal(OperatorFn&& op, specific_observable>&& _this) { - return rpp::observable::create, ChildLiftArgs...>>({ std::move(_this), std::forward(op) }); + return observable::create, ChildLiftArgs...>>({ std::move(_this), std::forward(op) }); } template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> -auto lift_impl_internal(OperatorFn&& op, const rpp::specific_observable>& _this) +auto lift_impl_internal(OperatorFn&& op, const specific_observable>& _this) { - return rpp::observable::create, ChildLiftArgs...>>({ _this, std::forward(op) }); + return observable::create, ChildLiftArgs...>>({ _this, std::forward(op) }); } template OperatorFn, typename ObservableValue, typename OnSubscribe> -auto lift_impl_internal(OperatorFn&& op, rpp::specific_observable&& _this) +auto lift_impl_internal(OperatorFn&& op, specific_observable&& _this) { - return rpp::observable::create, OnSubscribe>>({ std::move(_this), std::forward(op) }); + return observable::create, OnSubscribe>>({ std::move(_this), std::forward(op) }); } template OperatorFn, typename ObservableValue, typename OnSubscribe> -auto lift_impl_internal(OperatorFn&& op, const rpp::specific_observable& _this) +auto lift_impl_internal(OperatorFn&& op, const specific_observable& _this) { - return rpp::observable::create, OnSubscribe>>({ _this, std::forward(op) }); + return observable::create, OnSubscribe>>({ _this, std::forward(op) }); } template OperatorFn, typename TObs> From 935ceb54032d94107b412062940804f14da9614e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 16 Dec 2022 23:06:15 +0300 Subject: [PATCH 3/4] simplify map --- src/rpp/rpp/operators/lift.hpp | 24 ++++++++++++++++++------ src/rpp/rpp/operators/map.hpp | 14 +++++++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index c04d6a380..022eb15fa 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -22,6 +22,23 @@ IMPLEMENTATION_FILE(lift_tag); namespace rpp::details { +template OnNext, + std::invocable OnError = utils::forwarding_on_error, + std::invocable OnCompleted = utils::forwarding_on_completed> +auto create_subscriber_in_lift(TSub&& subscriber, + OnNext&& on_next = {}, + OnError&& on_error = {}, + OnCompleted&& on_completed = {}) +{ + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed), + std::forward(subscriber)); +} template struct lift_action_by_callbacks { @@ -32,12 +49,7 @@ struct lift_action_by_callbacks template auto operator()(TSub&& subscriber) const { - auto subscription = subscriber.get_subscription(); - return create_subscriber_with_state(std::move(subscription), - on_next, - on_error, - on_completed, - std::forward(subscriber)); + return create_subscriber_in_lift(std::forward(subscriber), on_next, on_error, on_completed); } }; diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index f6172cebb..bd19dbd70 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -24,7 +24,7 @@ IMPLEMENTATION_FILE(map_tag); namespace rpp::details { template Callable> -struct map_impl +struct map_impl_on_next { RPP_NO_UNIQUE_ADDRESS Callable callable; @@ -34,4 +34,16 @@ struct map_impl subscriber.on_next(callable(utils::as_const(std::forward(value)))); } }; + +template Callable> +struct map_impl +{ + RPP_NO_UNIQUE_ADDRESS map_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + return create_subscriber_in_lift(std::forward(subscriber), on_next); + } +}; } // namespace rpp::details From 81cb50555d7c2454168f7573134ba1a3cf7a0bca Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 16 Dec 2022 23:13:32 +0300 Subject: [PATCH 4/4] simplify --- src/rpp/rpp/operators/filter.hpp | 18 +++++++++++++++++- src/rpp/rpp/operators/lift.hpp | 24 ++++++------------------ src/rpp/rpp/operators/map.hpp | 7 ++++++- src/rpp/rpp/operators/take_while.hpp | 19 ++++++++++++++++++- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index 09355df28..57f7c0ae1 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(filter_tag); namespace rpp::details { template Predicate> -struct filter_impl +struct filter_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,20 @@ struct filter_impl } }; +template Predicate> +struct filter_impl +{ + RPP_NO_UNIQUE_ADDRESS filter_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index 022eb15fa..c04d6a380 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -22,23 +22,6 @@ IMPLEMENTATION_FILE(lift_tag); namespace rpp::details { -template OnNext, - std::invocable OnError = utils::forwarding_on_error, - std::invocable OnCompleted = utils::forwarding_on_completed> -auto create_subscriber_in_lift(TSub&& subscriber, - OnNext&& on_next = {}, - OnError&& on_error = {}, - OnCompleted&& on_completed = {}) -{ - auto subscription = subscriber.get_subscription(); - return create_subscriber_with_state(std::move(subscription), - std::forward(on_next), - std::forward(on_error), - std::forward(on_completed), - std::forward(subscriber)); -} template struct lift_action_by_callbacks { @@ -49,7 +32,12 @@ struct lift_action_by_callbacks template auto operator()(TSub&& subscriber) const { - return create_subscriber_in_lift(std::forward(subscriber), on_next, on_error, on_completed); + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + on_error, + on_completed, + std::forward(subscriber)); } }; diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index bd19dbd70..e36f2ab0f 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -43,7 +43,12 @@ struct map_impl template auto operator()(TSub&& subscriber) const { - return create_subscriber_in_lift(std::forward(subscriber), on_next); + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 815789f9e..3ec29d5b8 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -22,7 +22,7 @@ IMPLEMENTATION_FILE(take_while_tag); namespace rpp::details { template Predicate> -struct take_while_impl +struct take_while_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,21 @@ struct take_while_impl subscriber.on_completed(); } }; + +template Predicate> +struct take_while_impl +{ + RPP_NO_UNIQUE_ADDRESS take_while_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details