diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index ff9a2ba19..f609bc7dd 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -297,6 +297,22 @@ int main(int argc, char* argv[]) // NOLINT | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); }); } + SECTION("create+flat_map(just(v*2))+subscribe") + { + TEST_RPP([&]() + { + rpp::source::create([](const auto& obs){ obs.on_next(1); }) + | rpp::operators::flat_map([](int v) { return rpp::source::create([v](const auto& obs){ obs.on_next(v * 2); }); }) + | rpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() + { + rxcpp::observable<>::create([](const auto& obs){obs.on_next(1);}) + | rxcpp::operators::flat_map([](int v) { return rxcpp::observable<>::create([v](const auto& obs){ obs.on_next(v * 2); }); }) + | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } }; BENCHMARK("Filtering Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 20838bb73..52f02c6ac 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -30,6 +30,7 @@ * @ingroup operators */ +#include #include #include #include diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp new file mode 100644 index 000000000..0b6fba7f6 --- /dev/null +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -0,0 +1,76 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include + +namespace rpp::operators::details +{ + +template +struct flat_map_t +{ + RPP_NO_UNIQUE_ADDRESS Fn m_fn; + + template + requires (std::invocable> + && rpp::constraint::observable>>) + auto operator()(TObservable&& observable) const & + { + return std::forward(observable) + | rpp::ops::map(m_fn) + | rpp::ops::merge(); + } + + template + requires (std::invocable> + && rpp::constraint::observable>>) + auto operator()(TObservable&& observable) && + { + return std::forward(observable) + | rpp::ops::map(std::move(m_fn)) + | rpp::ops::merge(); + } +}; + +} // namespace rpp::operators::details + +namespace rpp::operators +{ + +/** + * @brief Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable + * + * @marble flat_map + { + source observable : +--1--2--3--| + operator "flat_map: x=>just(x,x+1)" : +--12-23-34-| + } + * + * @details Actually it makes `map(callable)` and then `merge`. + * @details Note that flat_map merges the emissions of these Observables, so that they may interleave. + * + * @param a function that returns an observable for each item emitted by the source observable. + * @warning #include + * + * @ingroup transforming_operators + * @see https://reactivex.io/documentation/operators/flatmap.html + */ +template + requires (!utils::is_not_template_callable || rpp::constraint::observable>) +auto flat_map(Fn&& callable) +{ + return details::flat_map_t>{std::forward(callable)}; +} + +} // namespace rpp::operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index fcd95b746..b80850c0c 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -33,6 +33,10 @@ template requires (!utils::is_not_template_callable || !std::same_as>) auto map(Fn&& callable); +template + requires (!utils::is_not_template_callable || rpp::constraint::observable>) +auto flat_map(Fn&& callable); + template requires constraint::observables_of_same_type, std::decay_t...> auto merge_with(TObservable&& observable, TObservables&& ...observables); diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp new file mode 100644 index 000000000..b9c601e3c --- /dev/null +++ b/src/tests/rpp/test_flat_map.cpp @@ -0,0 +1,137 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "mock_observer.hpp" + +#include +#include + +TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy(); + SECTION("observable of items") + { + auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3); + + SECTION("subscribe using flat_map with templated lambda") + { + obs | rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 2, 4, 6 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with templated lambda and pass operator by variable") + { + const auto multiply_by_two = rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }); + obs | multiply_by_two + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 2, 4, 6 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with error") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + SECTION("subscribe using flat_map with empty") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::empty(); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with never") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::never(); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with empty in middle") + { + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::empty().as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 1, 3 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with never in middle") + { + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::never().as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 1, 3 }); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with error in middle") + { + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + } +} \ No newline at end of file