diff --git a/.gitignore b/.gitignore index b6bed4d54..1e0b317d3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .idea/ .vs/ .vscode/ +.cache/ build/ _build/ cmake/open-cpp-coverage.cmake diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index 09355df28..57f7c0ae1 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(filter_tag); namespace rpp::details { template Predicate> -struct filter_impl +struct filter_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,20 @@ struct filter_impl } }; +template Predicate> +struct filter_impl +{ + RPP_NO_UNIQUE_ADDRESS filter_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index 1fe4fcdcd..c04d6a380 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -10,10 +10,13 @@ #pragma once +#include "rpp/observables/specific_observable.hpp" +#include "rpp/subscribers/constraints.hpp" #include // RPP_NO_UNIQUE_ADDRESS #include // create_subscriber_with_state #include // own forwarding #include // create observable +#include IMPLEMENTATION_FILE(lift_tag); @@ -38,6 +41,9 @@ struct lift_action_by_callbacks } }; +template OperatorFn> +using subscriber_type_of_list_fn = utils::extract_subscriber_type_t>>; + /** * \brief Functor of "lift" operator for on_subscribe overload function. * \details Each observable has an on_subscribe function and observable is activated (pub-sub channel is established) after on_subscribe is called. The on_subscribe is called when the observable is subscribed by a subscriber @@ -45,11 +51,15 @@ struct lift_action_by_callbacks * \param _this is the current observable. * \param op is the functor that provides the "operator()(subscriber_of_new_type) -> subscriber_of_old_type". */ -template OperatorFn, typename TObs> -struct lift_on_subscribe +template OperatorFn, typename ...ChildLiftArgs> +struct lift_on_subscribe : public lift_on_subscribe, ChildLiftArgs...>> {}; + +template OperatorFn, typename TOnSubscribe> +struct lift_on_subscribe { - RPP_NO_UNIQUE_ADDRESS TObs _this; - RPP_NO_UNIQUE_ADDRESS OperatorFn op; + using T = subscriber_type_of_list_fn; + RPP_NO_UNIQUE_ADDRESS specific_observable _this; + RPP_NO_UNIQUE_ADDRESS OperatorFn op; template TSub> void operator()(TSub&& subscriber) const @@ -58,9 +68,33 @@ struct lift_on_subscribe } }; +template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> +auto lift_impl_internal(OperatorFn&& op, specific_observable>&& _this) +{ + return observable::create, ChildLiftArgs...>>({ std::move(_this), std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename ...ChildLiftArgs> +auto lift_impl_internal(OperatorFn&& op, const specific_observable>& _this) +{ + return observable::create, ChildLiftArgs...>>({ _this, std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename OnSubscribe> +auto lift_impl_internal(OperatorFn&& op, specific_observable&& _this) +{ + return observable::create, OnSubscribe>>({ std::move(_this), std::forward(op) }); +} + +template OperatorFn, typename ObservableValue, typename OnSubscribe> +auto lift_impl_internal(OperatorFn&& op, const specific_observable& _this) +{ + return observable::create, OnSubscribe>>({ _this, std::forward(op) }); +} + template OperatorFn, typename TObs> auto lift_impl(OperatorFn&& op, TObs&& _this) { - return rpp::observable::create, std::decay_t>>({ std::forward(_this), std::forward(op) }); + return lift_impl_internal(std::forward(op), std::forward(_this)); } } // namespace rpp::details diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index f6172cebb..e36f2ab0f 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -24,7 +24,7 @@ IMPLEMENTATION_FILE(map_tag); namespace rpp::details { template Callable> -struct map_impl +struct map_impl_on_next { RPP_NO_UNIQUE_ADDRESS Callable callable; @@ -34,4 +34,21 @@ struct map_impl subscriber.on_next(callable(utils::as_const(std::forward(value)))); } }; + +template Callable> +struct map_impl +{ + RPP_NO_UNIQUE_ADDRESS map_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 815789f9e..3ec29d5b8 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -22,7 +22,7 @@ IMPLEMENTATION_FILE(take_while_tag); namespace rpp::details { template Predicate> -struct take_while_impl +struct take_while_impl_on_next { RPP_NO_UNIQUE_ADDRESS Predicate predicate; @@ -35,4 +35,21 @@ struct take_while_impl subscriber.on_completed(); } }; + +template Predicate> +struct take_while_impl +{ + RPP_NO_UNIQUE_ADDRESS take_while_impl_on_next on_next; + + template + auto operator()(TSub&& subscriber) const + { + auto subscription = subscriber.get_subscription(); + return create_subscriber_with_state(std::move(subscription), + on_next, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber)); + } +}; } // namespace rpp::details