diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index ea9a6e5e0..af3ca7490 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -310,6 +310,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + + SECTION("immediate_just(1) + zip(immediate_just(2)) + subscribe") + { + TEST_RPP([&]() { + rpp::immediate_just(1) + | rpp::operators::zip(rpp::immediate_just(2)) + | rpp::operators::subscribe([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + rxcpp::immediate_just(1) + | rxcpp::operators::zip(rxcpp::immediate_just(2)) + | rxcpp::operators::subscribe>([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } } // BENCHMARK("Combining Operators") BENCHMARK("Conditional Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 0a1255d85..3ec4ec6ba 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -80,6 +80,7 @@ #include #include #include +#include /** * @defgroup utility_operators Utility Operators diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 0ac197dfa..e712f6628 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -13,59 +13,36 @@ #include #include -#include +#include #include -#include -#include - -#include namespace rpp::operators::details { template - class combine_latest_disposable final : public composite_disposable + class combine_latest_disposable final : public combining_disposable { public: explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) - : observer_with_mutex{std::move(observer)} - , selector{selector} + : combining_disposable(std::move(observer)) + , m_selector(selector) { } - pointer_under_lock get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; } - - rpp::utils::tuple...>& get_values() { return values; } + const auto& get_selector() const { return m_selector; } - const TSelector& get_selector() const { return selector; } - - bool decrement_on_completed() - { - // just need atomicity, not guarding anything - return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; - } + auto& get_values() { return m_values; } private: - value_with_mutex observer_with_mutex{}; - rpp::utils::tuple...> values{}; - RPP_NO_UNIQUE_ADDRESS TSelector selector; + rpp::utils::tuple...> m_values{}; - std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template - struct combine_latest_observer_strategy + struct combine_latest_observer_strategy final + : public combining_observer_strategy> { - std::shared_ptr> disposable{}; - - void set_upstream(const rpp::disposable_wrapper& d) const - { - disposable->add(d); - } - - bool is_disposed() const - { - return disposable->is_disposed(); - } + using combining_observer_strategy>::disposable; template void on_next(T&& v) const @@ -74,73 +51,21 @@ namespace rpp::operators::details const auto observer = disposable->get_observer_under_lock(); disposable->get_values().template get().emplace(std::forward(v)); - disposable->get_values().apply([this, &observer](const std::optional&... vals) { - if ((vals.has_value() && ...)) - observer->on_next(disposable->get_selector()(vals.value()...)); - }); - } - - void on_error(const std::exception_ptr& err) const - { - disposable->dispose(); - disposable->get_observer_under_lock()->on_error(err); + disposable->get_values().apply(&apply_impl, disposable, observer); } - void on_completed() const + private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, const std::optional&... vals) { - if (disposable->decrement_on_completed()) - { - disposable->dispose(); - disposable->get_observer_under_lock()->on_completed(); - } + if ((vals.has_value() && ...)) + observer->on_next(disposable->get_selector()(vals.value()...)); } }; template - struct combine_latest_t + struct combine_latest_t : public combining_operator_t { - RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; - RPP_NO_UNIQUE_ADDRESS TSelector selector; - - template - struct operator_traits - { - static_assert(std::invocable...>, "Selector is not callable with passed T type"); - - using result_type = std::invoke_result_t...>; - }; - - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const - { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); - } - - private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) - { - using ExpectedValue = typename observable_chain_strategy::value_type; - using Disposable = combine_latest_disposable...>; - - const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); - auto locked = disposable.lock(); - locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); - - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); - } - - template - static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) - { - (..., observables.subscribe(rpp::observer, combine_latest_observer_strategy...>>{disposable})); - } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp new file mode 100644 index 000000000..c08a106f7 --- /dev/null +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -0,0 +1,125 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include + +namespace rpp::operators::details +{ + template + class combining_disposable : public composite_disposable + { + public: + explicit combining_disposable(Observer&& observer) + : m_observer_with_mutex{std::move(observer)} + { + } + + pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; } + + bool decrement_on_completed() + { + // just need atomicity, not guarding anything + return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; + } + + private: + value_with_mutex m_observer_with_mutex{}; + + std::atomic_size_t m_on_completed_needed{sizeof...(Args)}; + }; + + template + struct combining_observer_strategy + { + std::shared_ptr disposable{}; + + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d); + } + + bool is_disposed() const + { + return disposable->is_disposed(); + } + + void on_error(const std::exception_ptr& err) const + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_error(err); + } + + void on_completed() const + { + if (disposable->decrement_on_completed()) + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_completed(); + } + } + }; + + template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + struct combining_operator_t + { + RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; + RPP_NO_UNIQUE_ADDRESS TSelector selector; + + template + struct operator_traits + { + static_assert(std::invocable...>, "Selector is not callable with passed T type"); + + using result_type = std::invoke_result_t...>; + }; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also using it + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + } + + private: + template + static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + { + using ExpectedValue = typename observable_chain_strategy::value_type; + using Disposable = TDisposable...>; + + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); + subscribe>(locked, std::index_sequence_for{}, observables...); + + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); + } + + template + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) + { + (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); + } + }; +} // namespace rpp::operators::details \ No newline at end of file diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp new file mode 100644 index 000000000..10664c5ac --- /dev/null +++ b/src/rpp/rpp/operators/zip.hpp @@ -0,0 +1,137 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace rpp::operators::details +{ + template + class zip_disposable final : public combining_disposable + { + public: + explicit zip_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer)) + , m_selector(selector) + { + } + + const auto& get_selector() const { return m_selector; } + + auto& get_pendings() { return m_pendings; } + + private: + utils::tuple...> m_pendings{}; + + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; + }; + + template + struct zip_observer_strategy final + : public combining_observer_strategy> + { + using combining_observer_strategy>::disposable; + + template + void on_next(T&& v) const + { + const auto observer = disposable->get_observer_under_lock(); + disposable->get_pendings().template get().push_back(std::forward(v)); + + disposable->get_pendings().apply(&apply_impl, disposable, observer); + } + + private: + template + static void apply_impl(const TDisposable& disposable, const pointer_under_lock& observer, std::deque&... values) + { + if ((!values.empty() && ...)) + { + observer->on_next(disposable->get_selector()(std::move(values.front())...)); + (values.pop_front(), ...); + } + } + }; + + template + struct zip_t : public combining_operator_t + { + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief combines emissions from observables and emit single items for each combination based on the results of provided selector + * + * @marble zip_custom_selector + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: x,y =>std::pair{x,y}" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param selector is applied to current emission of current observable and latests emissions from observables + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ + template + requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) + auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables) + { + return details::zip_t, std::decay_t, std::decay_t...>{ + rpp::utils::tuple{std::forward(observable), std::forward(observables)...}, + std::forward(selector)}; + } + + /** + * @brief combines emissions from observables and emit tuple of items for each combination + * + * @marble zip + { + source observable : +------1 -2 -3-- ------| + source other_observable : +-5-6-- - ---7 --8---| + operator "zip: make_tuple" : +------{1,5}-{2,6}---{3,7}------| + } + * + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from any observable copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param observables are observables whose emissions would be zipped with current observable + * @warning #include + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/zip.html + */ + template + auto zip(TObservable&& observable, TObservables&&... observables) + { + return zip(rpp::utils::pack_to_tuple{}, std::forward(observable), std::forward(observables)...); + } +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_zip.cpp b/src/tests/rpp/test_zip.cpp new file mode 100644 index 000000000..a3e3ec27c --- /dev/null +++ b/src/tests/rpp/test_zip.cpp @@ -0,0 +1,188 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEST_CASE("zip zips items") +{ + SECTION("observable of -1-2-3-| zip with -4-5-6-| on immediate scheduler") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3) + | rpp::ops::zip(rpp::source::just(rpp::schedulers::immediate{}, 4, 5, 6)) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4), + std::make_tuple(2, 5), + std::make_tuple(3, 6), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + + SECTION("observable of -1-2-3-| zip with -4-5-6-| on current_thread") + { + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) // source 1 + | rpp::ops::zip(rpp::source::just(rpp::schedulers::current_thread{}, 4, 5, 6)) // source 2 + | rpp::ops::subscribe(mock); + + // Above stream should output in such sequence + // source 1: -1---2---3-| + // source 2: -4---5---6-| + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4), + std::make_tuple(2, 5), + std::make_tuple(3, 6), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + + SECTION("observable of -1-2-3-| zip with two other sources on current_thread") + { + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) // source 1 + | rpp::ops::zip( + rpp::source::just(rpp::schedulers::current_thread{}, 4, 5, 6), // source 2 + rpp::source::just(rpp::schedulers::current_thread{}, 7, 8, 9)) // source 3 + | rpp::ops::subscribe(mock); + + // Above stream should output in such sequence + // source 1: --1---2---3-| + // source 2: -4---5---6-| + // source 3: --7---8---9-| + + CHECK(mock.get_received_values() == std::vector>{ + std::make_tuple(1, 4, 7), + std::make_tuple(2, 5, 8), + std::make_tuple(3, 6, 9), + }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } +} + +TEST_CASE("zip waits for all emissions") +{ + SECTION("observable of -1-2-3-| and never") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(1, 2, 3) + | rpp::ops::zip(rpp::source::never()) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 0); + } +} + +TEST_CASE("zip forwards errors") +{ + SECTION("observable of -1-2-3-| combines with error") + { + auto mock = mock_observer_strategy>{}; + rpp::source::just(1, 2, 3) + | rpp::ops::zip(rpp::source::error(std::make_exception_ptr(std::runtime_error{""}))) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } +} + +TEST_CASE("zip handles race conditions") +{ + SECTION("source observable in current thread pairs with error in other thread") + { + std::atomic_bool on_error_called{false}; + auto subject = rpp::subjects::publish_subject{}; + + SECTION("subscribe on it") + { + SECTION("on_error can't interleave with on_next") + { + rpp::source::just(1, 1, 1) + | rpp::ops::zip(rpp::source::concat(rpp::source::just(2), subject.get_observable())) + | rpp::ops::as_blocking() + | rpp::ops::subscribe([&](auto&&) { + CHECK(!on_error_called); + std::thread{[&] + { + subject.get_observer().on_error(std::exception_ptr{}); + }}.detach(); + std::this_thread::sleep_for(std::chrono::seconds{1}); + CHECK(!on_error_called); }, + [&](auto) { on_error_called = true; }); + + CHECK(on_error_called); + } + } + } +} + +TEST_CASE("zip doesn't produce extra copies") +{ + SECTION("send value by copy") + { + copy_count_tracker verifier{}; + auto obs = verifier.get_observable() + | rpp::ops::zip( + [](copy_count_tracker&& verifier, auto&&) { return std::move(verifier); }, + rpp::source::just(1)); + obs.subscribe([](copy_count_tracker) {}); // NOLINT + REQUIRE(verifier.get_copy_count() == 1); // 1 copy to internal state + REQUIRE(verifier.get_move_count() == 2); // 1 move to selector + 1 move to final subscriber + } + + SECTION("send value by move") + { + copy_count_tracker verifier{}; + auto obs = verifier.get_observable_for_move() + | rpp::ops::zip( + [](copy_count_tracker&& verifier, auto&&) { return std::move(verifier); }, + rpp::source::just(1)); + obs.subscribe([](copy_count_tracker) {}); // NOLINT + REQUIRE(verifier.get_copy_count() == 0); + REQUIRE(verifier.get_move_count() == 3); // 1 move to interal state + 1 move to selector + 1 move to final subscriber + } +} + +TEST_CASE("zip satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + + test_operator_with_disposable(rpp::ops::zip(observable)); + } + + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); +} \ No newline at end of file