diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 8e1fb0695..335801e9b 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -1083,20 +1083,24 @@ TEST_CASE("on_error_resume_next") { BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) { - const auto obs = rpp::observable::create([](const auto& subscriber) + auto err = std::make_exception_ptr(std::runtime_error{""}); + const auto obs = rpp::observable::create([&err](const auto& subscriber) { - subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + subscriber.on_error(err); }); - auto subscriber = rpp::specific_subscriber{[](const int&) {}}; - meter.measure([&] + std::vector> subs{}; + for (int i = 0; i < meter.runs(); ++i) + subs.push_back(rpp::dynamic_subscriber{}); + + meter.measure([&](int i) { return obs .on_error_resume_next([](auto&&) { - return rpp::observable::just(1); + return rpp::observable::never(); }) - .subscribe(subscriber); + .subscribe(subs[i]); }); }; } diff --git a/src/benchmarks/rxcpp_benchmark.cpp b/src/benchmarks/rxcpp_benchmark.cpp index 6438bcfd6..dc41a80f2 100644 --- a/src/benchmarks/rxcpp_benchmark.cpp +++ b/src/benchmarks/rxcpp_benchmark.cpp @@ -1049,20 +1049,24 @@ TEST_CASE("on_error_resume_next") { BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter) { - const auto obs = rxcpp::sources::create([](const auto& subscriber) + auto err = std::make_exception_ptr(std::runtime_error{""}); + const auto obs = rxcpp::sources::create([&err](const auto& subscriber) { - subscriber.on_error(std::make_exception_ptr(std::runtime_error{""})); + subscriber.on_error(err); }); - auto subscriber = rxcpp::make_subscriber(); - meter.measure([&](int) + std::vector> subs{}; + for (int i = 0; i < meter.runs(); ++i) + subs.push_back(rxcpp::make_subscriber()); + + meter.measure([&](int i) { return obs .on_error_resume_next([](auto&&) { - return rxcpp::observable<>::just(1); + return rxcpp::observable<>::never(); }) - .subscribe(subscriber); + .subscribe(subs[i]); }); }; } diff --git a/src/rpp/rpp/observables/details/member_overload.hpp b/src/rpp/rpp/observables/details/member_overload.hpp index 6cbc6754f..0a61ad651 100644 --- a/src/rpp/rpp/observables/details/member_overload.hpp +++ b/src/rpp/rpp/observables/details/member_overload.hpp @@ -15,6 +15,6 @@ namespace rpp::details { -template +template struct member_overload; } // namespace rpp::details diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index 0208be9e3..a27a9d182 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -10,14 +10,12 @@ #pragma once +#include // RPP_EMPTY_BASES +#include // as_blocking #include // own constraints #include // own forwarding #include // forwarding of member_overaloads -#include // must-have operators -#include // as_blocking - -#include // RPP_EMPTY_BASES -#include +#include // decayed_invoke_result_t #include diff --git a/src/rpp/rpp/observers/fwd.hpp b/src/rpp/rpp/observers/fwd.hpp index 25bba773b..f68672827 100644 --- a/src/rpp/rpp/observers/fwd.hpp +++ b/src/rpp/rpp/observers/fwd.hpp @@ -37,4 +37,7 @@ template class specific_observer; + +template +using specific_observer_with_decayed_args = rpp::specific_observer...>; } // namespace rpp \ No newline at end of file diff --git a/src/rpp/rpp/observers/specific_observer.hpp b/src/rpp/rpp/observers/specific_observer.hpp index 342e2a5b1..3637c095f 100644 --- a/src/rpp/rpp/observers/specific_observer.hpp +++ b/src/rpp/rpp/observers/specific_observer.hpp @@ -67,9 +67,6 @@ specific_observer(OnNext, OnError, Args...) -> specific_observer specific_observer(OnNext, OnCompleted) -> specific_observer, OnNext, utils::rethrow_error_t, OnCompleted>; -template -using specific_observer_with_decayed_args = rpp::specific_observer...>; - /** * \brief Create specific_observer with manually specified Type. In case of type can be deduced from argument of OnNext use direct constructor of rpp::specific_observer * \tparam Type manually specific type of observer diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 8b96de4ef..77d13297e 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -11,14 +11,13 @@ #pragma once -#include - -#include -#include -#include -#include +#include // required due to operator uses lift +#include // create_subscriber_with_dynamic_state +#include // own forwarding +#include // subscriber_of_type +#include // forwarding_on_error -#include // create_subscriber_with_state +#include IMPLEMENTATION_FILE(buffer_tag); diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 4d96c9688..1498963e2 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -11,18 +11,15 @@ #pragma once -#include - -#include -#include -#include -#include -#include -#include -#include - +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift +#include // merge_state #include // create_subscriber_with_state +#include // own forwarding +#include // constraint::subscriber_of_type +#include // spinlock +#include IMPLEMENTATION_FILE(combine_latest_tag); diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 377d85da4..081f3efd1 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -10,21 +10,19 @@ #pragma once -#include -#include -#include -#include - -#include +#include // dynamic_observable +#include // required due to operator uses lift +#include // merge_forwarding_on_next/merge_on_error #include // create_subscriber_with_state - -#include +#include // own forwarding +#include +#include // constraint::subscriber_of_type +#include // composite_subscription #include #include - -#include #include +#include #include diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 8075ec5ef..33ef7d16b 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -11,10 +11,11 @@ #pragma once -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift #include // create_subscriber_with_state -#include -#include +#include // own forwarding +#include // constraint::subscriber_of_type IMPLEMENTATION_FILE(delay_tag); diff --git a/src/rpp/rpp/operators/details/serialized_subscriber.hpp b/src/rpp/rpp/operators/details/serialized_subscriber.hpp index 156e574fe..966d9a488 100644 --- a/src/rpp/rpp/operators/details/serialized_subscriber.hpp +++ b/src/rpp/rpp/operators/details/serialized_subscriber.hpp @@ -10,7 +10,7 @@ #pragma once #include -#include +#include // create_subscriber_with_state #include #include diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index 6ec53f296..c729da3d4 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -10,17 +10,14 @@ #pragma once -#include -#include -#include -#include // create_subscriber_with_state -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift +#include // create_subscriber_with_dynamic_state +#include // own forwarding +#include // constraint::subscriber_of_type +#include // forwarding_on_error/forwarding_on_completed +#include // as_const -#include - -#include - -#include #include diff --git a/src/rpp/rpp/operators/do.hpp b/src/rpp/rpp/operators/do.hpp index 05bc84a66..973a1d6d3 100644 --- a/src/rpp/rpp/operators/do.hpp +++ b/src/rpp/rpp/operators/do.hpp @@ -10,11 +10,11 @@ #pragma once -#include -#include - -#include - +#include // required due to operator uses lift +#include // own forwarding +#include // create_subscriber_with_state +#include // constraint::subscriber_of_type +#include // utils::as_const IMPLEMENTATION_FILE(do_tag); diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index caf29dfba..5a2e69200 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -10,12 +10,11 @@ #pragma once -#include -#include -#include -#include - -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift +#include // own forwarding +#include // constraint::subscriber_of_type +#include // utils::as_const #include diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index 6d339384d..65285e5aa 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -11,14 +11,13 @@ #pragma once -#include -#include -#include -#include -#include -#include - -#include +#include // required due to operator uses lift +#include // take_state +#include // create_subscriber_with_state +#include // own forwarding +#include // constraint::subscriber +#include // not_enough_emissions +#include // forwarding_on_error IMPLEMENTATION_FILE(first_tag); diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp index 0aabeedf3..83737503a 100644 --- a/src/rpp/rpp/operators/flat_map.hpp +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -10,10 +10,9 @@ #pragma once -#include - #include #include +#include IMPLEMENTATION_FILE(flat_map_tag); diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 22f9d49ad..1aa551061 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/fwd/buffer.hpp b/src/rpp/rpp/operators/fwd/buffer.hpp index c7e6e743d..9c1efe6b2 100644 --- a/src/rpp/rpp/operators/fwd/buffer.hpp +++ b/src/rpp/rpp/operators/fwd/buffer.hpp @@ -11,11 +11,10 @@ #pragma once -#include - -#include #include +#include + namespace rpp::details { struct buffer_tag; diff --git a/src/rpp/rpp/operators/fwd/combine_latest.hpp b/src/rpp/rpp/operators/fwd/combine_latest.hpp index aebbb0fa6..9a1183a66 100644 --- a/src/rpp/rpp/operators/fwd/combine_latest.hpp +++ b/src/rpp/rpp/operators/fwd/combine_latest.hpp @@ -10,11 +10,10 @@ #pragma once -#include -#include -#include -#include - +#include // constraint::observable +#include // member_overload +#include // decayed_invoke_result_t +#include // pack_to_tuple #include diff --git a/src/rpp/rpp/operators/fwd/concat.hpp b/src/rpp/rpp/operators/fwd/concat.hpp index 69f7dcd91..32da7f54d 100644 --- a/src/rpp/rpp/operators/fwd/concat.hpp +++ b/src/rpp/rpp/operators/fwd/concat.hpp @@ -10,9 +10,8 @@ #pragma once -#include - -#include +#include // constraint::observable_of_type +#include // member_overload namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/delay.hpp b/src/rpp/rpp/operators/fwd/delay.hpp index 1e8c07beb..546b065f5 100644 --- a/src/rpp/rpp/operators/fwd/delay.hpp +++ b/src/rpp/rpp/operators/fwd/delay.hpp @@ -10,10 +10,8 @@ #pragma once -#include -#include -#include -#include +#include // schedulers::constraint::scheduler +#include // member_overload namespace rpp::details { @@ -22,7 +20,6 @@ struct delay_tag; namespace rpp::details { - template struct delay_impl; diff --git a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp index 5c27f143a..3318e6f56 100644 --- a/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/fwd/distinct_until_changed.hpp @@ -10,7 +10,7 @@ #pragma once -#include +#include // member_overload #include #include diff --git a/src/rpp/rpp/operators/fwd/do.hpp b/src/rpp/rpp/operators/fwd/do.hpp index 45a7c3dcc..b64a8b080 100644 --- a/src/rpp/rpp/operators/fwd/do.hpp +++ b/src/rpp/rpp/operators/fwd/do.hpp @@ -10,9 +10,8 @@ #pragma once -#include -#include -#include +#include // member_overload +#include // constraint::observer_of_type namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/first.hpp b/src/rpp/rpp/operators/fwd/first.hpp index 12a0e51ed..0797bdcc7 100644 --- a/src/rpp/rpp/operators/fwd/first.hpp +++ b/src/rpp/rpp/operators/fwd/first.hpp @@ -11,11 +11,7 @@ #pragma once -#include #include -#include - -#include namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/flat_map.hpp b/src/rpp/rpp/operators/fwd/flat_map.hpp index 897c5b9a4..cc9bc415b 100644 --- a/src/rpp/rpp/operators/fwd/flat_map.hpp +++ b/src/rpp/rpp/operators/fwd/flat_map.hpp @@ -10,10 +10,9 @@ #pragma once -#include -#include - -#include +#include // constraint::observable +#include // member_overload +#include // decayed_invoke_result_t namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/last.hpp b/src/rpp/rpp/operators/fwd/last.hpp index c6e7c5ea1..55a52508f 100644 --- a/src/rpp/rpp/operators/fwd/last.hpp +++ b/src/rpp/rpp/operators/fwd/last.hpp @@ -11,7 +11,6 @@ #pragma once -#include #include namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/lift.hpp b/src/rpp/rpp/operators/fwd/lift.hpp new file mode 100644 index 000000000..33ddaea06 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/lift.hpp @@ -0,0 +1,161 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // member_overload +#include // constraint::subscriber +#include // extract_subscriber_type_t +#include // forwarding_on_error + +namespace rpp::details +{ +struct lift_tag; +} + +namespace rpp::details +{ +template +concept lift_fn = constraint::subscriber>>; + +template +struct lift_action_by_callbacks; + +template +using decayed_lift_action_by_callbacks = lift_action_by_callbacks...>; + +template OperatorFn, typename TObs> +auto lift_impl(OperatorFn&& op, TObs&& _this); + +template +struct member_overload +{ + /** + * \brief The lift operator provides ability to create your own operator and apply it to observable + * \tparam NewType manually specified new type of observable after applying of fn + * \param op represents operator logic in the form: accepts NEW subscriber and returns OLD subscriber + * \return new specific_observable of NewType + */ + template + auto lift(lift_fn auto&& op) const& requires is_header_included + { + return details::lift_impl(std::forward(op), CastThis()); + } + + template + auto lift(lift_fn auto&& op)&& requires is_header_included + { + return details::lift_impl(std::forward(op), MoveThis()); + } + + // ********************************* LIFT OPERATOR: SUBSCRIBER -> SUBSCRIBER ******************// + /** + * \brief The lift operator provides ability to create your own operator and apply it to observable + * \tparam OperatorFn type of your custom functor + * \tparam NewType auto-deduced type of observable after applying of fn + * \param op represents operator logic in the form: accepts NEW subscriber and returns OLD subscriber + * \return new specific_observable of NewType + */ + template>> + auto lift(OperatorFn&& op) const& requires (details::lift_fn && is_header_included) + { + return details::lift_impl(std::forward(op), CastThis()); + } + template>> + auto lift(OperatorFn&& op) && requires (details::lift_fn && is_header_included) + { + return details::lift_impl(std::forward(op), MoveThis()); + } + + // ********************************* LIFT Direct type + OnNext, Onerror, OnCompleted ******************// + + /** + * \brief The lift operator provides ability to create your own operator and apply it to observable. + * \details This overload provides this ability via providing on_next, on_eror and on_completed with 2 params: old type of value + new subscriber + * \tparam NewType manually specified new type of observable after lift + * \tparam OnNext on_next of new subscriber accepting old value + new subscriber (logic how to transfer old value to new subscriber) + * \tparam OnError on_error of new subscriber accepting exception + new subscriber + * \tparam OnCompleted on_completed of new subscriber accepting new subscriber + * \return new specific_observable of NewType + */ + template> OnNext, + std::invocable> OnError = utils::forwarding_on_error, + std::invocable> OnCompleted = utils::forwarding_on_completed> + auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const& requires is_header_included + { + return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}, + CastThis()); + } + + template> OnNext, + std::invocable> OnError = utils::forwarding_on_error, + std::invocable> OnCompleted = utils::forwarding_on_completed> + auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {})&& requires is_header_included + { + return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}, + MoveThis()); + } + + // ********************************* LIFT OnNext, Onerror, OnCompleted ******************// + + /** + * \brief The lift operator provides ability to create your own operator and apply it to observable. + * \details This overload provides this ability via providing on_next, on_eror and on_completed with 2 params: old type of value + new subscriber + * \tparam OnNext on_next of new subscriber accepting old value + new subscriber + * \tparam OnError on_error of new subscriber accepting exception + new subscriber + * \tparam OnCompleted on_completed of new subscriber accepting new subscriber + * \return new specific_observable of NewType + */ + template>>, + std::invocable> OnError = utils::forwarding_on_error, + std::invocable> OnCompleted = utils::forwarding_on_completed> + requires std::invocable> + auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const& requires is_header_included + { + return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}, + CastThis()); + } + + template>>, + std::invocable> OnError = utils::forwarding_on_error, + std::invocable> OnCompleted = utils::forwarding_on_completed> + requires std::invocable> + auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {})&& requires is_header_included + { + return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}, + MoveThis()); + } + +private: + const SpecificObservable& CastThis() const + { + return *static_cast(this); + } + + SpecificObservable&& MoveThis() + { + return std::move(*static_cast(this)); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/map.hpp b/src/rpp/rpp/operators/fwd/map.hpp index fc6915f23..340ec87f2 100644 --- a/src/rpp/rpp/operators/fwd/map.hpp +++ b/src/rpp/rpp/operators/fwd/map.hpp @@ -11,7 +11,6 @@ #pragma once #include - #include namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/multicast.hpp b/src/rpp/rpp/operators/fwd/multicast.hpp index 72c7cd4d3..ec089dbd4 100644 --- a/src/rpp/rpp/operators/fwd/multicast.hpp +++ b/src/rpp/rpp/operators/fwd/multicast.hpp @@ -10,8 +10,8 @@ #pragma once -#include #include +#include #include namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp index 20565e4ea..6de4bb40c 100644 --- a/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/fwd/on_error_resume_next.hpp @@ -14,7 +14,6 @@ #include #include #include -#include #include diff --git a/src/rpp/rpp/operators/fwd/publish.hpp b/src/rpp/rpp/operators/fwd/publish.hpp index 3b9eefcba..17561b740 100644 --- a/src/rpp/rpp/operators/fwd/publish.hpp +++ b/src/rpp/rpp/operators/fwd/publish.hpp @@ -10,8 +10,8 @@ #pragma once -#include #include +#include namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/ref_count.hpp b/src/rpp/rpp/operators/fwd/ref_count.hpp index ec2309e2a..fb410ef45 100644 --- a/src/rpp/rpp/operators/fwd/ref_count.hpp +++ b/src/rpp/rpp/operators/fwd/ref_count.hpp @@ -10,8 +10,8 @@ #pragma once -#include #include +#include namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/repeat.hpp b/src/rpp/rpp/operators/fwd/repeat.hpp index 7f14c18f5..3f1363c76 100644 --- a/src/rpp/rpp/operators/fwd/repeat.hpp +++ b/src/rpp/rpp/operators/fwd/repeat.hpp @@ -10,8 +10,8 @@ #pragma once -#include #include +#include namespace rpp::details { diff --git a/src/rpp/rpp/operators/fwd/start_with.hpp b/src/rpp/rpp/operators/fwd/start_with.hpp index 31a633e13..3ea065499 100644 --- a/src/rpp/rpp/operators/fwd/start_with.hpp +++ b/src/rpp/rpp/operators/fwd/start_with.hpp @@ -10,11 +10,10 @@ #pragma once +#include #include #include -#include - namespace rpp::details { struct start_with_tag; diff --git a/src/rpp/rpp/operators/fwd/subscribe_on.hpp b/src/rpp/rpp/operators/fwd/subscribe_on.hpp index 72385f798..fc9ba2174 100644 --- a/src/rpp/rpp/operators/fwd/subscribe_on.hpp +++ b/src/rpp/rpp/operators/fwd/subscribe_on.hpp @@ -10,11 +10,10 @@ #pragma once -#include #include +#include #include - namespace rpp::details { struct subscribe_on_tag; diff --git a/src/rpp/rpp/operators/fwd/take_until.hpp b/src/rpp/rpp/operators/fwd/take_until.hpp index a0ee144c8..34fab575a 100644 --- a/src/rpp/rpp/operators/fwd/take_until.hpp +++ b/src/rpp/rpp/operators/fwd/take_until.hpp @@ -12,8 +12,6 @@ #include #include -#include -#include namespace rpp::details { diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index e1b6c2cf4..6a140ac11 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -1,12 +1,15 @@ #pragma once -#include -#include -#include -#include -#include -#include - +#include // RPP_NO_UNIQUE_ADDRESS +#include // grouped_observable +#include // required due to operator uses lift +#include // create_subscriber_with_state +#include // own forwarding +#include // publish_subject +#include // constraint::subscriber +#include // utils::as_const + +#include #include IMPLEMENTATION_FILE(group_by_tag); diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index f48886381..0b1f20534 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -11,18 +11,18 @@ #pragma once -#include -#include -#include -#include -#include -#include +#include // required due to operator uses lift +#include // take_last +#include // own forwarding +#include // create_subscriber_with_state +#include // constraint::subscriber +#include // not_enough_emissions +#include // forwarding_on_error IMPLEMENTATION_FILE(last_tag); namespace rpp::details { - template struct last_state : public take_last_state { diff --git a/src/rpp/rpp/operators/lift.hpp b/src/rpp/rpp/operators/lift.hpp index a1710358d..1fe4fcdcd 100644 --- a/src/rpp/rpp/operators/lift.hpp +++ b/src/rpp/rpp/operators/lift.hpp @@ -10,25 +10,15 @@ #pragma once -#include // override this +#include // RPP_NO_UNIQUE_ADDRESS #include // create_subscriber_with_state -#include // concept for lift_impl -#include // extract subscriber type - -#include - -namespace rpp::details -{ -struct lift_tag; -} +#include // own forwarding +#include // create observable IMPLEMENTATION_FILE(lift_tag); namespace rpp::details { -template -concept lift_fn = constraint::subscriber>>; - template struct lift_action_by_callbacks { @@ -48,9 +38,6 @@ struct lift_action_by_callbacks } }; -template -using decayed_lift_action_by_callbacks = lift_action_by_callbacks...>; - /** * \brief Functor of "lift" operator for on_subscribe overload function. * \details Each observable has an on_subscribe function and observable is activated (pub-sub channel is established) after on_subscribe is called. The on_subscribe is called when the observable is subscribed by a subscriber @@ -74,132 +61,6 @@ struct lift_on_subscribe template OperatorFn, typename TObs> auto lift_impl(OperatorFn&& op, TObs&& _this) { - using LiftedOnSubscribeFn = lift_on_subscribe, std::decay_t>; - return specific_observable{LiftedOnSubscribeFn{ std::forward(_this), std::forward(op) }}; + return rpp::observable::create, std::decay_t>>({ std::forward(_this), std::forward(op) }); } - -template -struct member_overload -{ - /** - * \brief The lift operator provides ability to create your own operator and apply it to observable - * \tparam NewType manually specified new type of observable after applying of fn - * \param op represents operator logic in the form: accepts NEW subscriber and returns OLD subscriber - * \return new specific_observable of NewType - */ - template - auto lift(lift_fn auto&& op) const& - { - return details::lift_impl(std::forward(op), CastThis()); - } - - template - auto lift(lift_fn auto&& op)&& - { - return details::lift_impl(std::forward(op), MoveThis()); - } - - // ********************************* LIFT OPERATOR: SUBSCRIBER -> SUBSCRIBER ******************// - /** - * \brief The lift operator provides ability to create your own operator and apply it to observable - * \tparam OperatorFn type of your custom functor - * \tparam NewType auto-deduced type of observable after applying of fn - * \param op represents operator logic in the form: accepts NEW subscriber and returns OLD subscriber - * \return new specific_observable of NewType - */ - template>> - auto lift(OperatorFn&& op) const& requires details::lift_fn - { - return details::lift_impl(std::forward(op), CastThis()); - } - template>> - auto lift(OperatorFn&& op) && requires details::lift_fn - { - return details::lift_impl(std::forward(op), MoveThis()); - } - - // ********************************* LIFT Direct type + OnNext, Onerror, OnCompleted ******************// - - /** - * \brief The lift operator provides ability to create your own operator and apply it to observable. - * \details This overload provides this ability via providing on_next, on_eror and on_completed with 2 params: old type of value + new subscriber - * \tparam NewType manually specified new type of observable after lift - * \tparam OnNext on_next of new subscriber accepting old value + new subscriber (logic how to transfer old value to new subscriber) - * \tparam OnError on_error of new subscriber accepting exception + new subscriber - * \tparam OnCompleted on_completed of new subscriber accepting new subscriber - * \return new specific_observable of NewType - */ - template> OnNext, - std::invocable> OnError = utils::forwarding_on_error, - std::invocable> OnCompleted = utils::forwarding_on_completed> - auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const& - { - return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}, - CastThis()); - } - - template> OnNext, - std::invocable> OnError = utils::forwarding_on_error, - std::invocable> OnCompleted = utils::forwarding_on_completed> - auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {})&& - { - return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}, - MoveThis()); - } - - // ********************************* LIFT OnNext, Onerror, OnCompleted ******************// - - /** - * \brief The lift operator provides ability to create your own operator and apply it to observable. - * \details This overload provides this ability via providing on_next, on_eror and on_completed with 2 params: old type of value + new subscriber - * \tparam OnNext on_next of new subscriber accepting old value + new subscriber - * \tparam OnError on_error of new subscriber accepting exception + new subscriber - * \tparam OnCompleted on_completed of new subscriber accepting new subscriber - * \return new specific_observable of NewType - */ - template>>, - std::invocable> OnError = utils::forwarding_on_error, - std::invocable> OnCompleted = utils::forwarding_on_completed> - requires std::invocable> - auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const& - { - return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}, - CastThis()); - } - - template>>, - std::invocable> OnError = utils::forwarding_on_error, - std::invocable> OnCompleted = utils::forwarding_on_completed> - requires std::invocable> - auto lift(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {})&& - { - return details::lift_impl(details::decayed_lift_action_by_callbacks{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}, - MoveThis()); - } - -private: - const SpecificObservable& CastThis() const - { - return *static_cast(this); - } - - SpecificObservable&& MoveThis() - { - return std::move(*static_cast(this)); - } -}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index 7182c5c61..f6172cebb 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -10,13 +10,12 @@ #pragma once -#include -#include -#include -#include -#include - -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift +#include // own forwarding +#include // constraint::subscriber_of_type +#include // decayed_invoke_result_t +#include // as_counst #include diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 36a3493c9..3742a68bc 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -10,14 +10,14 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include +#include // required due to operator uses lift +#include // early_unsubscribe +#include // make_serialized_subscriber +#include // create_subscriber_with_state +#include // own forwarding +#include // just +#include // constraint::subscriber +#include // forwarding_on_next #include #include diff --git a/src/rpp/rpp/operators/multicast.hpp b/src/rpp/rpp/operators/multicast.hpp index c56c6bde1..92bc280ed 100644 --- a/src/rpp/rpp/operators/multicast.hpp +++ b/src/rpp/rpp/operators/multicast.hpp @@ -10,9 +10,9 @@ #pragma once +#include #include #include -#include IMPLEMENTATION_FILE(multicast_tag); diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 90df7ae98..fee92f790 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -11,16 +11,17 @@ #pragma once -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift #include // create_subscriber_with_state -#include -#include +#include // own forwarduing +#include // constraint::subscriber_of_type +#include IMPLEMENTATION_FILE(on_error_resume_next_tag); namespace rpp::details { - /** * Functor (type-erasure) of "on_error_resume_next" for on_error operator. */ @@ -31,14 +32,8 @@ struct on_error_resume_next_on_error const auto& subscriber, const ResumeCallable& resume_callable) const { - using Type = rpp::utils::extract_subscriber_type_t; - // Subscribe to next_observable - resume_callable(err).subscribe(create_subscriber_with_state(subscriber.get_subscription(), - rpp::utils::forwarding_on_next{}, - rpp::utils::forwarding_on_error{}, - rpp::utils::forwarding_on_completed{}, - subscriber)); + resume_callable(err).subscribe(subscriber); } }; @@ -48,7 +43,7 @@ struct on_error_resume_next_on_error template struct on_error_resume_next_impl { - RPP_NO_UNIQUE_ADDRESS ResumeCallable m_resume_callable; + RPP_NO_UNIQUE_ADDRESS ResumeCallable resume_callable; template TSub> auto operator()(TSub&& downstream_subscriber) const @@ -61,7 +56,7 @@ struct on_error_resume_next_impl on_error_resume_next_on_error{}, rpp::utils::forwarding_on_completed{}, std::forward(downstream_subscriber), - m_resume_callable); + resume_callable); } }; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/publish.hpp b/src/rpp/rpp/operators/publish.hpp index 893a02be0..7abc86eaf 100644 --- a/src/rpp/rpp/operators/publish.hpp +++ b/src/rpp/rpp/operators/publish.hpp @@ -11,8 +11,8 @@ #pragma once #include -#include #include +#include IMPLEMENTATION_FILE(publish_tag); diff --git a/src/rpp/rpp/operators/ref_count.hpp b/src/rpp/rpp/operators/ref_count.hpp index c2348254e..b5908a97e 100644 --- a/src/rpp/rpp/operators/ref_count.hpp +++ b/src/rpp/rpp/operators/ref_count.hpp @@ -10,13 +10,11 @@ #pragma once -#include -#include -#include -#include - +#include // required due to operator uses lift #include // create_subscriber_with_state - +#include // own forwarding +#include // create observable +#include // constraint::subscriber_of_type IMPLEMENTATION_FILE(ref_count_tag); @@ -82,6 +80,6 @@ struct ref_count_on_subscribe template TObs> auto ref_count_impl(TObs&& observable) { - return rpp::source::create(ref_count_on_subscribe>{std::forward(observable)}); + return source::create(ref_count_on_subscribe>{std::forward(observable)}); } } // namespace rpp::details diff --git a/src/rpp/rpp/operators/repeat.hpp b/src/rpp/rpp/operators/repeat.hpp index 2ec85d0ea..4c4c99e43 100644 --- a/src/rpp/rpp/operators/repeat.hpp +++ b/src/rpp/rpp/operators/repeat.hpp @@ -10,19 +10,13 @@ #pragma once -#include - -#include -#include -#include -#include -#include - -#include - - +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift #include // create_subscriber_with_state - +#include // own forwarding +#include // create observable +#include // constraint::subscriber +#include // forwarding_on_next IMPLEMENTATION_FILE(repeat_tag); diff --git a/src/rpp/rpp/operators/sample.hpp b/src/rpp/rpp/operators/sample.hpp index e6888986a..82f8d898e 100644 --- a/src/rpp/rpp/operators/sample.hpp +++ b/src/rpp/rpp/operators/sample.hpp @@ -10,6 +10,7 @@ #pragma once +#include // required due to operator uses lift #include #include #include diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 5cb9b7157..a9d8e25e2 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -10,17 +10,16 @@ #pragma once -#include -#include +#include // RPP_NO_UNIQUE_ADDRESS +#include // required due to operator uses lift #include // create_subscriber_with_state -#include +#include // own forwarding +#include // constraint::subscriber +#include // forwarding_on_error +#include // utils::as_const -#include -#include - - -IMPLEMENTATION_FILE (scan_tag); +IMPLEMENTATION_FILE(scan_tag); namespace rpp::details { @@ -35,7 +34,7 @@ struct scan_on_next { template void operator()(auto&& value, - const rpp::constraint::subscriber auto& sub, + const constraint::subscriber auto& sub, const scan_state& state) const { state.seed = state.accumulator(std::move(state.seed), std::forward(value)); diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index 54779d329..0fc9e09be 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -10,11 +10,11 @@ #pragma once +#include // required due to operator uses lift #include // create_subscriber_with_state -#include -#include -#include -#include +#include // own forwarding +#include // constraint::subscriber +#include // forwarding_on_error IMPLEMENTATION_FILE(skip_tag); diff --git a/src/rpp/rpp/operators/start_with.hpp b/src/rpp/rpp/operators/start_with.hpp index bc2d609ff..2aa496ee7 100644 --- a/src/rpp/rpp/operators/start_with.hpp +++ b/src/rpp/rpp/operators/start_with.hpp @@ -10,13 +10,10 @@ #pragma once -#include -#include - #include +#include #include - IMPLEMENTATION_FILE (start_with_tag); namespace rpp::details diff --git a/src/rpp/rpp/operators/subscribe_on.hpp b/src/rpp/rpp/operators/subscribe_on.hpp index 7afb82892..651748f2c 100644 --- a/src/rpp/rpp/operators/subscribe_on.hpp +++ b/src/rpp/rpp/operators/subscribe_on.hpp @@ -11,11 +11,9 @@ #pragma once -#include #include -#include #include -#include +#include IMPLEMENTATION_FILE (subscribe_on_tag); diff --git a/src/rpp/rpp/operators/switch_map.hpp b/src/rpp/rpp/operators/switch_map.hpp index 5bc7d327d..16ef00468 100644 --- a/src/rpp/rpp/operators/switch_map.hpp +++ b/src/rpp/rpp/operators/switch_map.hpp @@ -11,10 +11,9 @@ #pragma once -#include - #include #include +#include IMPLEMENTATION_FILE(switch_map_tag); diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 2c9799234..e6683918e 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -10,10 +10,9 @@ #pragma once -#include -#include -#include +#include // required due to operator uses lift #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index 3bdf4856e..de04e5aee 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -10,9 +10,10 @@ #pragma once +#include // required due to operator uses lift +#include // create_subscriber_with_state #include #include -#include // create_subscriber_with_state #include IMPLEMENTATION_FILE(take_tag); diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index e55c0efa8..54665f51e 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -10,10 +10,10 @@ #pragma once -#include -#include - +#include // required due to operator uses lift #include // create_subscriber_with_state +#include +#include #include #include diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index aff5050da..e7b89d0a6 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -11,17 +11,14 @@ #pragma once -#include +#include // required due to operator uses lift #include +#include // create_subscriber_with_state #include #include #include #include -#include // create_subscriber_with_state - -#include - IMPLEMENTATION_FILE(take_until_tag); namespace rpp::details diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index b818df062..0778c8a7f 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -10,12 +10,12 @@ #pragma once +#include +#include // required due to operator uses lift #include #include #include -#include - IMPLEMENTATION_FILE(take_while_tag); diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index f97acb537..8cf90ec79 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -1,9 +1,20 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + #pragma once #include +#include #include +#include // create_subscriber_with_state -#include IMPLEMENTATION_FILE(window_tag); diff --git a/src/rpp/rpp/sources/create.hpp b/src/rpp/rpp/sources/create.hpp index 975df80ea..6217483fc 100644 --- a/src/rpp/rpp/sources/create.hpp +++ b/src/rpp/rpp/sources/create.hpp @@ -10,8 +10,9 @@ #pragma once -#include #include +#include + #include IMPLEMENTATION_FILE(create_tag); diff --git a/src/tests/test_lift.cpp b/src/tests/test_lift.cpp index e06f56385..d3b04f8c2 100644 --- a/src/tests/test_lift.cpp +++ b/src/tests/test_lift.cpp @@ -12,6 +12,7 @@ #include "mock_observer.hpp" #include +#include #include #include #include diff --git a/src/tests/test_subscriber.cpp b/src/tests/test_subscriber.cpp index c5b7a8adf..c0bfcd9e4 100644 --- a/src/tests/test_subscriber.cpp +++ b/src/tests/test_subscriber.cpp @@ -12,6 +12,7 @@ #include +#include #include #include