From 676c98c32c1de660173a73f3060d18a901110295 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sun, 11 Feb 2024 23:54:42 +0100 Subject: [PATCH 1/8] Add zip operator --- src/benchmarks/benchmarks.cpp | 15 ++ src/rpp/rpp/operators.hpp | 5 +- src/rpp/rpp/operators/combine_latest.hpp | 98 +-------- .../operators/details/combining_strategy.hpp | 125 ++++++++++++ src/rpp/rpp/operators/zip.hpp | 129 ++++++++++++ src/tests/rpp/test_zip.cpp | 188 ++++++++++++++++++ 6 files changed, 469 insertions(+), 91 deletions(-) create mode 100644 src/rpp/rpp/operators/details/combining_strategy.hpp create mode 100644 src/rpp/rpp/operators/zip.hpp create mode 100644 src/tests/rpp/test_zip.cpp diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 2564deeec..a836ae842 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -310,6 +310,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + + SECTION("immediate_just(1) + zip(immediate_just(2)) + subscribe") + { + TEST_RPP([&]() { + rpp::immediate_just(1) + | rpp::operators::zip(rpp::immediate_just(2)) + | rpp::operators::subscribe([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + rxcpp::immediate_just(1) + | rxcpp::operators::zip(rxcpp::immediate_just(2)) + | rxcpp::operators::subscribe>([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } } BENCHMARK("Conditional Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index d91434379..3ec4ec6ba 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -65,8 +65,8 @@ * @ingroup operators */ -#include #include +#include /** * @defgroup combining_operators Combining Operators @@ -77,9 +77,10 @@ #include #include -#include #include #include +#include +#include /** * @defgroup utility_operators Utility Operators diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 17d62352d..08842b8fa 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -13,59 +13,32 @@ #include #include -#include +#include #include -#include #include -#include - namespace rpp::operators::details { template -class combine_latest_disposable final : public composite_disposable +class combine_latest_disposable final : public combining_disposable { public: explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : observer_with_mutex{std::move(observer)} - , selector{selector} + : combining_disposable(std::forward(observer), selector) { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; } - - rpp::utils::tuple...>& get_values() { return values; } - - const TSelector& get_selector() const { return selector; } - - bool decrement_on_completed() - { - // just need atomicity, not guarding anything - return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; - } + auto& get_values() { return m_values; } private: - value_with_mutex observer_with_mutex{}; - rpp::utils::tuple...> values{}; - RPP_NO_UNIQUE_ADDRESS TSelector selector; - - std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + rpp::utils::tuple...> m_values{}; }; template -struct combine_latest_observer_strategy +struct combine_latest_observer_strategy final + : public combining_observer_strategy> { - std::shared_ptr> disposable{}; - - void set_upstream(const rpp::disposable_wrapper& d) const - { - disposable->add(d); - } - - bool is_disposed() const - { - return disposable->is_disposed(); - } + using combining_observer_strategy>::disposable; template void on_next(T&& v) const @@ -79,64 +52,11 @@ struct combine_latest_observer_strategy observer->on_next(disposable->get_selector()(vals.value()...)); }); } - - void on_error(const std::exception_ptr& err) const - { - disposable->dispose(); - disposable->get_observer_under_lock()->on_error(err); - } - - void on_completed() const - { - if (disposable->decrement_on_completed()) - { - disposable->dispose(); - disposable->get_observer_under_lock()->on_completed(); - } - } }; template -struct combine_latest_t +struct combine_latest_t final : public combining_operator_t { - RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; - RPP_NO_UNIQUE_ADDRESS TSelector selector; - - template - requires std::invocable...> - using result_value = std::invoke_result_t...>; - - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const - { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); - } - -private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) - { - using ExpectedValue = typename observable_chain_strategy::value_type; - using Disposable = combine_latest_disposable...>; - - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto locked = disposable.lock(); - locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); - - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); - } - - template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) - { - (..., observables.subscribe(rpp::observer, combine_latest_observer_strategy...>>{disposable})); - } }; } diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp new file mode 100644 index 000000000..96f225dfa --- /dev/null +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -0,0 +1,125 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include + +namespace rpp::operators::details +{ +template +class combining_disposable : public composite_disposable +{ +public: + explicit combining_disposable(Observer&& observer, const TSelector& selector) + : m_observer_with_mutex{std::move(observer)} + , m_selector{selector} + { + } + + pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } + + const TSelector& get_selector() const { return m_selector; } + + bool decrement_on_completed() + { + // just need atomicity, not guarding anything + return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; + } + +private: + value_with_mutex m_observer_with_mutex{}; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + + std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; +}; + +template +struct combining_observer_strategy +{ + std::shared_ptr disposable{}; + + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d); + } + + bool is_disposed() const + { + return disposable->is_disposed(); + } + + void on_error(const std::exception_ptr& err) const + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_error(err); + } + + void on_completed() const + { + if (disposable->decrement_on_completed()) + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_completed(); + } + } +}; + +template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> +struct combining_operator_t +{ + RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; + RPP_NO_UNIQUE_ADDRESS TSelector selector; + + template + requires std::invocable...> + using result_value = std::invoke_result_t...>; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also using it + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + } + +private: + template + static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + { + using ExpectedValue = typename observable_chain_strategy::value_type; + using Disposable = TDisposable...>; + + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); + subscribe>(locked, std::index_sequence_for{}, observables...); + + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); + } + + template + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + { + (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); + } +}; +} \ No newline at end of file diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp new file mode 100644 index 000000000..12c880abc --- /dev/null +++ b/src/rpp/rpp/operators/zip.hpp @@ -0,0 +1,129 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include + +#include + +namespace rpp::operators::details +{ +template +class zip_disposable final : public combining_disposable +{ +public: + explicit zip_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::forward(observer), selector) + { + } + + auto& get_pendings() { return m_pendings; } + +private: + utils::tuple...> m_pendings{}; +}; + +template +struct zip_observer_strategy final + : public combining_observer_strategy> +{ + using combining_observer_strategy>::disposable; + + template + void on_next(T&& v) const + { + const auto observer = disposable->get_observer_under_lock(); + disposable->get_pendings().template get().push_back(std::forward(v)); + + disposable->get_pendings().apply([this, &observer](auto&... values) { + if ((!values.empty() && ...)) + { + observer->on_next(disposable->get_selector()(std::move(values.front())...)); + (..., values.pop_front()); + } + }); + } +}; + +template +struct zip_t final : public combining_operator_t +{ +}; +} + +namespace rpp::operators +{ +/** + * @brief combines emissions from observables and emit single items for each combination based on the results of provided selector + * + * @marble zip_custom_selector + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: x,y =>std::pair{x,y}" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param selector is applied to current emission of current observable and latests emissions from observables + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ +template + requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) +auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables) +{ + return details::zip_t, std::decay_t, std::decay_t...>{ + rpp::utils::tuple{std::forward(observable), std::forward(observables)...}, + std::forward(selector) + }; +} + +/** + * @brief combines emissions from observables and emit tuple of items for each combination + * + * @marble zip_custom_selector + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: make_tuple" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ +template +auto zip(TObservable&& observable, TObservables&&... observables) +{ + return zip(rpp::utils::pack_to_tuple{}, std::forward(observable), std::forward(observables)...); +} +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp new file mode 100644 index 000000000..f4d9570f3 --- /dev/null +++ b/src/tests/rpp/test_zip.cpp @@ -0,0 +1,188 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEST_CASE("zip zips items") +{ + SECTION("observable of -1-2-3-| zip with -4-5-6-| on immediate scheduler") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3) + | rpp::ops::zip(rpp::source::just(rpp::schedulers::immediate{}, 4, 5, 6)) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4), + std::make_tuple(2, 5), + std::make_tuple(3, 6), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + + SECTION("observable of -1-2-3-| zip with -4-5-6-| on current_thread") + { + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) // source 1 + | rpp::ops::zip(rpp::source::just(rpp::schedulers::current_thread{}, 4, 5, 6)) // source 2 + | rpp::ops::subscribe(mock); + + // Above stream should output in such sequence + // source 1: -1---2---3-| + // source 2: -4---5---6-| + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4), + std::make_tuple(2, 5), + std::make_tuple(3, 6), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + + SECTION("observable of -1-2-3-| zip with two other sources on current_thread") + { + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) // source 1 + | rpp::ops::zip( + rpp::source::just(rpp::schedulers::current_thread{}, 4, 5, 6), // source 2 + rpp::source::just(rpp::schedulers::current_thread{}, 7, 8, 9)) // source 3 + | rpp::ops::subscribe(mock); + + // Above stream should output in such sequence + // source 1: --1---2---3-| + // source 2: -4---5---6-| + // source 3: --7---8---9-| + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4, 7), + std::make_tuple(2, 5, 8), + std::make_tuple(3, 6, 9), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } +} + +TEST_CASE("zip waits for all emissions") +{ + SECTION("observable of -1-2-3-| and never") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(1, 2, 3) + | rpp::ops::zip(rpp::source::never()) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 0); + } +} + +TEST_CASE("zip forwards errors") +{ + SECTION("observable of -1-2-3-| combines with error") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(1, 2, 3) + | rpp::ops::zip(rpp::source::error(std::make_exception_ptr(std::runtime_error{""}))) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } +} + +TEST_CASE("zip handles race conditions") +{ + SECTION("source observable in current thread pairs with error in other thread") + { + std::atomic_bool on_error_called{false}; + auto subject = rpp::subjects::publish_subject{}; + + SECTION("subscribe on it") + { + SECTION("on_error can't interleave with on_next") + { + rpp::source::just(1, 1, 1) + | rpp::ops::zip(rpp::source::concat(rpp::source::just(2), subject.get_observable())) + | rpp::ops::as_blocking() + | rpp::ops::subscribe([&](auto&&) { + CHECK(!on_error_called); + std::thread{[&] + { + subject.get_observer().on_error(std::exception_ptr{}); + }}.detach(); + std::this_thread::sleep_for(std::chrono::seconds{1}); + CHECK(!on_error_called); }, + [&](auto) { on_error_called = true; }); + + CHECK(on_error_called); + } + } + } +} + +TEST_CASE("zip doesn't produce extra copies") +{ + SECTION("send value by copy") + { + copy_count_tracker verifier{}; + auto obs = verifier.get_observable() + | rpp::ops::zip( + [](copy_count_tracker&& verifier, auto&&) { return verifier; }, + rpp::source::just(1)); + obs.subscribe([](copy_count_tracker) {}); + REQUIRE(verifier.get_copy_count() == 1); // 1 copy to internal state + REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber + } + + SECTION("send value by move") + { + copy_count_tracker verifier{}; + auto obs = verifier.get_observable_for_move() + | rpp::ops::zip( + [](auto&& verifier, auto&&) { return verifier; }, + rpp::source::just(1)); + obs.subscribe([](copy_count_tracker) {}); + REQUIRE(verifier.get_copy_count() == 0); + REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber + } +} + +TEST_CASE("zip satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + + test_operator_with_disposable(rpp::ops::zip(observable)); + } + + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); +} \ No newline at end of file From 9f3b9507426d8bfdbe5fdf45a57b55af415ccb85 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Mon, 12 Feb 2024 21:28:23 +0100 Subject: [PATCH 2/8] Address comments --- src/rpp/rpp/operators/combine_latest.hpp | 23 ++++++++---- .../operators/details/combining_strategy.hpp | 10 ++---- src/rpp/rpp/operators/zip.hpp | 35 ++++++++++++------- src/tests/rpp/test_zip.cpp | 2 +- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 08842b8fa..9ba0960c9 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -15,22 +15,26 @@ #include #include #include -#include namespace rpp::operators::details { template -class combine_latest_disposable final : public combining_disposable +class combine_latest_disposable final : public combining_disposable { public: explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::forward(observer), selector) + : combining_disposable(std::forward(observer)) + , m_selector(selector) { } + const auto& get_selector() const { return m_selector; } + auto& get_values() { return m_values; } private: + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + rpp::utils::tuple...> m_values{}; }; @@ -47,10 +51,15 @@ struct combine_latest_observer_strategy final const auto observer = disposable->get_observer_under_lock(); disposable->get_values().template get().emplace(std::forward(v)); - disposable->get_values().apply([this, &observer](const std::optional&... vals) { - if ((vals.has_value() && ...)) - observer->on_next(disposable->get_selector()(vals.value()...)); - }); + disposable->get_values().apply(&apply_impl, disposable, observer); + } + +private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) + { + if ((vals.has_value() && ...)) + observer->on_next(disposable->get_selector()(vals.value()...)); } }; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 96f225dfa..74e620583 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,20 +22,17 @@ namespace rpp::operators::details { -template +template class combining_disposable : public composite_disposable { public: - explicit combining_disposable(Observer&& observer, const TSelector& selector) + explicit combining_disposable(Observer&& observer) : m_observer_with_mutex{std::move(observer)} - , m_selector{selector} { } pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } - const TSelector& get_selector() const { return m_selector; } - bool decrement_on_completed() { // just need atomicity, not guarding anything @@ -43,8 +40,7 @@ class combining_disposable : public composite_disposable } private: - value_with_mutex m_observer_with_mutex{}; - RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + value_with_mutex m_observer_with_mutex{}; std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; }; diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index 12c880abc..28be84cad 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -15,25 +15,29 @@ #include #include #include -#include -#include +#include namespace rpp::operators::details { template -class zip_disposable final : public combining_disposable +class zip_disposable final : public combining_disposable { public: explicit zip_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::forward(observer), selector) + : combining_disposable(std::forward(observer)) + , m_selector(selector) { } + const auto& get_selector() const { return m_selector; } + auto& get_pendings() { return m_pendings; } private: - utils::tuple...> m_pendings{}; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + + utils::tuple...> m_pendings{}; }; template @@ -48,13 +52,18 @@ struct zip_observer_strategy final const auto observer = disposable->get_observer_under_lock(); disposable->get_pendings().template get().push_back(std::forward(v)); - disposable->get_pendings().apply([this, &observer](auto&... values) { - if ((!values.empty() && ...)) - { - observer->on_next(disposable->get_selector()(std::move(values.front())...)); - (..., values.pop_front()); - } - }); + disposable->get_pendings().apply(&apply_impl, disposable, observer); + } + +private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) + { + if ((!values.empty() && ...)) + { + observer->on_next(disposable->get_selector()(std::move(values.front())...)); + (values.pop_front(), ...); + } } }; @@ -102,7 +111,7 @@ auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... obser /** * @brief combines emissions from observables and emit tuple of items for each combination * - * @marble zip_custom_selector + * @marble zip { source observable : +------1 -2 -3-- ------| source other_observable : +-5-6-- - ---7 --8---| diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index f4d9570f3..235aa7afe 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -171,7 +171,7 @@ TEST_CASE("zip doesn't produce extra copies") rpp::source::just(1)); obs.subscribe([](copy_count_tracker) {}); REQUIRE(verifier.get_copy_count() == 0); - REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber + REQUIRE(verifier.get_move_count() == 3); // 1 move to interal state + 1 move to selector + 1 move to final subscriber } } From ab0ea35ec598420e895db2483331f5ab2910702f Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Tue, 13 Feb 2024 22:11:39 +0100 Subject: [PATCH 3/8] Address comment --- src/rpp/rpp/operators/combine_latest.hpp | 4 ++-- src/rpp/rpp/operators/zip.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 9ba0960c9..d5afc4b34 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -33,9 +33,9 @@ class combine_latest_disposable final : public combining_disposable...> m_values{}; + + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index 28be84cad..dc3370b3d 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -35,9 +35,9 @@ class zip_disposable final : public combining_disposable auto& get_pendings() { return m_pendings; } private: - RPP_NO_UNIQUE_ADDRESS TSelector m_selector; - utils::tuple...> m_pendings{}; + + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template From 4196c6d8c077271c96fae4fbe6f581b33ca61198 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 14 Feb 2024 21:13:39 +0100 Subject: [PATCH 4/8] Add nolint notations --- src/tests/rpp/test_zip.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index 235aa7afe..1f4667e1f 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -157,9 +157,9 @@ TEST_CASE("zip doesn't produce extra copies") | rpp::ops::zip( [](copy_count_tracker&& verifier, auto&&) { return verifier; }, rpp::source::just(1)); - obs.subscribe([](copy_count_tracker) {}); - REQUIRE(verifier.get_copy_count() == 1); // 1 copy to internal state - REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber + obs.subscribe([](copy_count_tracker) {}); // NOLINT + REQUIRE(verifier.get_copy_count() == 1); // 1 copy to internal state + REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber } SECTION("send value by move") @@ -169,7 +169,7 @@ TEST_CASE("zip doesn't produce extra copies") | rpp::ops::zip( [](auto&& verifier, auto&&) { return verifier; }, rpp::source::just(1)); - obs.subscribe([](copy_count_tracker) {}); + obs.subscribe([](copy_count_tracker) {}); // NOLINT REQUIRE(verifier.get_copy_count() == 0); REQUIRE(verifier.get_move_count() == 3); // 1 move to interal state + 1 move to selector + 1 move to final subscriber } From 42c9e770ad85d84a5ef3cabec43c728188e6cb8f Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 14 Feb 2024 22:21:54 +0100 Subject: [PATCH 5/8] Use emplace_back instead of push_back --- src/rpp/rpp/operators/zip.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index dc3370b3d..fa1029ebf 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -50,7 +50,7 @@ struct zip_observer_strategy final void on_next(T&& v) const { const auto observer = disposable->get_observer_under_lock(); - disposable->get_pendings().template get().push_back(std::forward(v)); + disposable->get_pendings().template get().emplace_back(std::forward(v)); disposable->get_pendings().apply(&apply_impl, disposable, observer); } From e7b414a9335de218bc71628eb3d6d5c1b333f720 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 15 Feb 2024 22:17:57 +0100 Subject: [PATCH 6/8] Fix gcc10 test --- src/tests/rpp/test_zip.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp index 1f4667e1f..a3e3ec27c 100644 --- a/src/tests/rpp/test_zip.cpp +++ b/src/tests/rpp/test_zip.cpp @@ -155,7 +155,7 @@ TEST_CASE("zip doesn't produce extra copies") copy_count_tracker verifier{}; auto obs = verifier.get_observable() | rpp::ops::zip( - [](copy_count_tracker&& verifier, auto&&) { return verifier; }, + [](copy_count_tracker&& verifier, auto&&) { return std::move(verifier); }, rpp::source::just(1)); obs.subscribe([](copy_count_tracker) {}); // NOLINT REQUIRE(verifier.get_copy_count() == 1); // 1 copy to internal state @@ -167,7 +167,7 @@ TEST_CASE("zip doesn't produce extra copies") copy_count_tracker verifier{}; auto obs = verifier.get_observable_for_move() | rpp::ops::zip( - [](auto&& verifier, auto&&) { return verifier; }, + [](copy_count_tracker&& verifier, auto&&) { return std::move(verifier); }, rpp::source::just(1)); obs.subscribe([](copy_count_tracker) {}); // NOLINT REQUIRE(verifier.get_copy_count() == 0); From ef57bbbd17399d4f5a975d18976927f9edcf636d Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Fri, 16 Feb 2024 18:10:02 +0100 Subject: [PATCH 7/8] Fix code smells --- src/rpp/rpp/operators/combine_latest.hpp | 4 ++-- src/rpp/rpp/operators/zip.hpp | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index d5afc4b34..c02e57d85 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -23,7 +23,7 @@ class combine_latest_disposable final : public combining_disposable(std::forward(observer)) + : combining_disposable(std::move(observer)) , m_selector(selector) { } @@ -64,7 +64,7 @@ struct combine_latest_observer_strategy final }; template -struct combine_latest_t final : public combining_operator_t +struct combine_latest_t : public combining_operator_t { }; } diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index fa1029ebf..7915e2232 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -25,7 +25,7 @@ class zip_disposable final : public combining_disposable { public: explicit zip_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::forward(observer)) + : combining_disposable(std::move(observer)) , m_selector(selector) { } @@ -50,7 +50,7 @@ struct zip_observer_strategy final void on_next(T&& v) const { const auto observer = disposable->get_observer_under_lock(); - disposable->get_pendings().template get().emplace_back(std::forward(v)); + disposable->get_pendings().template get().push_back(std::forward(v)); disposable->get_pendings().apply(&apply_impl, disposable, observer); } @@ -68,7 +68,7 @@ struct zip_observer_strategy final }; template -struct zip_t final : public combining_operator_t +struct zip_t : public combining_operator_t { }; } From 2548ba0783ca94a27c4918d3b6874ee99b58ba25 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Feb 2024 17:12:23 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/benchmarks/benchmarks.cpp | 2 +- src/rpp/rpp/operators/combine_latest.hpp | 80 +++---- .../operators/details/combining_strategy.hpp | 158 +++++++------- src/rpp/rpp/operators/zip.hpp | 199 +++++++++--------- 4 files changed, 219 insertions(+), 220 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 29cb2a0ce..af3ca7490 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -325,7 +325,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe>([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } - } + } // BENCHMARK("Combining Operators") BENCHMARK("Conditional Operators") { diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index bf4099251..e712f6628 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -18,56 +18,56 @@ namespace rpp::operators::details { -template -class combine_latest_disposable final : public combining_disposable -{ -public: - explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) - , m_selector(selector) + template + class combine_latest_disposable final : public combining_disposable { - } + public: + explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer)) + , m_selector(selector) + { + } - const auto& get_selector() const { return m_selector; } + const auto& get_selector() const { return m_selector; } - auto& get_values() { return m_values; } + auto& get_values() { return m_values; } -private: - rpp::utils::tuple...> m_values{}; + private: + rpp::utils::tuple...> m_values{}; - RPP_NO_UNIQUE_ADDRESS TSelector m_selector; -}; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + }; -template -struct combine_latest_observer_strategy final - : public combining_observer_strategy> -{ - using combining_observer_strategy>::disposable; - - template - void on_next(T&& v) const + template + struct combine_latest_observer_strategy final + : public combining_observer_strategy> { - // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one - const auto observer = disposable->get_observer_under_lock(); - disposable->get_values().template get().emplace(std::forward(v)); + using combining_observer_strategy>::disposable; - disposable->get_values().apply(&apply_impl, disposable, observer); - } + template + void on_next(T&& v) const + { + // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one + const auto observer = disposable->get_observer_under_lock(); + disposable->get_values().template get().emplace(std::forward(v)); -private: - template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) - { - if ((vals.has_value() && ...)) - observer->on_next(disposable->get_selector()(vals.value()...)); - } -}; + disposable->get_values().apply(&apply_impl, disposable, observer); + } -template -struct combine_latest_t : public combining_operator_t -{ -}; -} + private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) + { + if ((vals.has_value() && ...)) + observer->on_next(disposable->get_selector()(vals.value()...)); + } + }; + + template + struct combine_latest_t : public combining_operator_t + { + }; +} // namespace rpp::operators::details namespace rpp::operators { diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 46ff39f5c..c08a106f7 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -22,104 +22,104 @@ namespace rpp::operators::details { -template -class combining_disposable : public composite_disposable -{ -public: - explicit combining_disposable(Observer&& observer) - : m_observer_with_mutex{std::move(observer)} + template + class combining_disposable : public composite_disposable { - } - - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } + public: + explicit combining_disposable(Observer&& observer) + : m_observer_with_mutex{std::move(observer)} + { + } - bool decrement_on_completed() - { - // just need atomicity, not guarding anything - return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; - } + pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } -private: - value_with_mutex m_observer_with_mutex{}; + bool decrement_on_completed() + { + // just need atomicity, not guarding anything + return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; + } - std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; -}; + private: + value_with_mutex m_observer_with_mutex{}; -template -struct combining_observer_strategy -{ - std::shared_ptr disposable{}; + std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + }; - void set_upstream(const rpp::disposable_wrapper& d) const + template + struct combining_observer_strategy { - disposable->add(d); - } + std::shared_ptr disposable{}; - bool is_disposed() const - { - return disposable->is_disposed(); - } + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d); + } - void on_error(const std::exception_ptr& err) const - { - disposable->dispose(); - disposable->get_observer_under_lock()->on_error(err); - } + bool is_disposed() const + { + return disposable->is_disposed(); + } - void on_completed() const - { - if (disposable->decrement_on_completed()) + void on_error(const std::exception_ptr& err) const { disposable->dispose(); - disposable->get_observer_under_lock()->on_completed(); + disposable->get_observer_under_lock()->on_error(err); } - } -}; -template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> -struct combining_operator_t -{ - RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; - RPP_NO_UNIQUE_ADDRESS TSelector selector; + void on_completed() const + { + if (disposable->decrement_on_completed()) + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_completed(); + } + } + }; - template - struct operator_traits + template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + struct combining_operator_t { - static_assert(std::invocable...>, "Selector is not callable with passed T type"); + RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; + RPP_NO_UNIQUE_ADDRESS TSelector selector; - using result_type = std::invoke_result_t...>; - }; + template + struct operator_traits + { + static_assert(std::invocable...>, "Selector is not callable with passed T type"); - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using result_type = std::invoke_result_t...>; + }; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const - { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); - } - -private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) - { - using ExpectedValue = typename observable_chain_strategy::value_type; - using Disposable = TDisposable...>; + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto locked = disposable.lock(); - locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also using it + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + } - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); - } + private: + template + static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + { + using ExpectedValue = typename observable_chain_strategy::value_type; + using Disposable = TDisposable...>; - template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) - { - (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); - } -}; -} \ No newline at end of file + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); + subscribe>(locked, std::index_sequence_for{}, observables...); + + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); + } + + template + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + { + (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); + } + }; +} // namespace rpp::operators::details \ No newline at end of file diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index 7915e2232..10664c5ac 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -20,119 +20,118 @@ namespace rpp::operators::details { -template -class zip_disposable final : public combining_disposable -{ -public: - explicit zip_disposable(Observer&& observer, const TSelector& selector) - : combining_disposable(std::move(observer)) - , m_selector(selector) + template + class zip_disposable final : public combining_disposable { - } - - const auto& get_selector() const { return m_selector; } + public: + explicit zip_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer)) + , m_selector(selector) + { + } - auto& get_pendings() { return m_pendings; } + const auto& get_selector() const { return m_selector; } -private: - utils::tuple...> m_pendings{}; + auto& get_pendings() { return m_pendings; } - RPP_NO_UNIQUE_ADDRESS TSelector m_selector; -}; + private: + utils::tuple...> m_pendings{}; -template -struct zip_observer_strategy final - : public combining_observer_strategy> -{ - using combining_observer_strategy>::disposable; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + }; - template - void on_next(T&& v) const + template + struct zip_observer_strategy final + : public combining_observer_strategy> { - const auto observer = disposable->get_observer_under_lock(); - disposable->get_pendings().template get().push_back(std::forward(v)); + using combining_observer_strategy>::disposable; - disposable->get_pendings().apply(&apply_impl, disposable, observer); - } + template + void on_next(T&& v) const + { + const auto observer = disposable->get_observer_under_lock(); + disposable->get_pendings().template get().push_back(std::forward(v)); -private: - template - static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) - { - if ((!values.empty() && ...)) + disposable->get_pendings().apply(&apply_impl, disposable, observer); + } + + private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) { - observer->on_next(disposable->get_selector()(std::move(values.front())...)); - (values.pop_front(), ...); + if ((!values.empty() && ...)) + { + observer->on_next(disposable->get_selector()(std::move(values.front())...)); + (values.pop_front(), ...); + } } - } -}; + }; -template -struct zip_t : public combining_operator_t -{ -}; -} + template + struct zip_t : public combining_operator_t + { + }; +} // namespace rpp::operators::details namespace rpp::operators { -/** - * @brief combines emissions from observables and emit single items for each combination based on the results of provided selector - * - * @marble zip_custom_selector - { - source observable : +------1 -2 -3-- ------| - source other_observable : +-5-6-- - ---7 --8---| - operator "zip: x,y =>std::pair{x,y}" : +------{1,5}-{2,6}---{3,7}------| - } - * - * - * @par Performance notes: - * - 1 heap allocation for disposable - * - each value from any observable copied/moved to internal storage - * - mutex acquired every time value obtained - * - * @param selector is applied to current emission of current observable and latests emissions from observables - * @param observables are observables whose emissions would be zipped with current observable - * @warning #include - * - * @ingroup combining_operators - * @see https://reactivex.io/documentation/operators/zip.html - */ -template - requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) -auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables) -{ - return details::zip_t, std::decay_t, std::decay_t...>{ - rpp::utils::tuple{std::forward(observable), std::forward(observables)...}, - std::forward(selector) - }; -} - -/** - * @brief combines emissions from observables and emit tuple of items for each combination - * - * @marble zip - { - source observable : +------1 -2 -3-- ------| - source other_observable : +-5-6-- - ---7 --8---| - operator "zip: make_tuple" : +------{1,5}-{2,6}---{3,7}------| - } - * - * - * @par Performance notes: - * - 1 heap allocation for disposable - * - each value from any observable copied/moved to internal storage - * - mutex acquired every time value obtained - * - * @param observables are observables whose emissions would be zipped with current observable - * @warning #include - * - * @ingroup combining_operators - * @see https://reactivex.io/documentation/operators/zip.html - */ -template -auto zip(TObservable&& observable, TObservables&&... observables) -{ - return zip(rpp::utils::pack_to_tuple{}, std::forward(observable), std::forward(observables)...); -} + /** + * @brief combines emissions from observables and emit single items for each combination based on the results of provided selector + * + * @marble zip_custom_selector + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: x,y =>std::pair{x,y}" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param selector is applied to current emission of current observable and latests emissions from observables + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ + template + requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) + auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables) + { + return details::zip_t, std::decay_t, std::decay_t...>{ + rpp::utils::tuple{std::forward(observable), std::forward(observables)...}, + std::forward(selector)}; + } + + /** + * @brief combines emissions from observables and emit tuple of items for each combination + * + * @marble zip + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: make_tuple" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ + template + auto zip(TObservable&& observable, TObservables&&... observables) + { + return zip(rpp::utils::pack_to_tuple{}, std::forward(observable), std::forward(observables)...); + } } // namespace rpp::operators \ No newline at end of file