From 4f58a84cbad2d50afe549c7f06d29f6deab5cc4b Mon Sep 17 00:00:00 2001 From: Corentin Date: Sat, 10 Jun 2023 12:30:25 +0200 Subject: [PATCH 1/8] Implement flat_map operator --- src/benchmarks/benchmarks.cpp | 16 ++++ src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/flat_map.hpp | 68 ++++++++++++++++ src/rpp/rpp/operators/fwd.hpp | 4 + src/tests/rpp/test_flat_map.cpp | 126 +++++++++++++++++++++++++++++ 5 files changed, 215 insertions(+) create mode 100644 src/rpp/rpp/operators/flat_map.hpp create mode 100644 src/tests/rpp/test_flat_map.cpp diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index ff9a2ba19..a24ab7106 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -297,6 +297,22 @@ int main(int argc, char* argv[]) // NOLINT | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); }); } + SECTION("create+flat_map(just(v*2))+subscribe") + { + TEST_RPP([&]() + { + rpp::source::create([](const auto& obs){ obs.on_next(1); }) + | rpp::operators::flat_map([](int v) { return rpp::source::just(v * 2); }) + | rpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() + { + rxcpp::observable<>::create([](const auto& obs){obs.on_next(1);}) + | rxcpp::operators::flat_map([](int v) { return rxcpp::observable<>::just(v * 2); }) + | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } }; BENCHMARK("Filtering Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 20838bb73..52f02c6ac 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -30,6 +30,7 @@ * @ingroup operators */ +#include #include #include #include diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp new file mode 100644 index 000000000..3df6a79f9 --- /dev/null +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -0,0 +1,68 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include + +namespace rpp::operators::details +{ + +template +struct flat_map_t +{ + RPP_NO_UNIQUE_ADDRESS Fn m_fn; + + template + requires std::invocable> + auto operator()(TObservable&& observable) const & + { + return std::forward(observable) + | rpp::ops::map(m_fn) + | rpp::ops::merge(); + + } + + template + requires std::invocable> + auto operator()(TObservable&& observable) && + { + return std::forward(observable) + | rpp::ops::map(std::forward(m_fn)) + | rpp::ops::merge(); + } +}; + +} // namespace rpp::operators::details + +namespace rpp::operators +{ + +/** + * @brief Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable + * + * @details Note that flat_map merges the emissions of these Observables, so that they may interleave. + * + * @param a function that returns an observable for each item emitted by the source observable. + * @warning #include + * + * @ingroup transforming_operators + * @see https://reactivex.io/documentation/operators/flatmap.html + */ +template + requires rpp::constraint::observable> +auto flat_map(Fn&& callable) +{ + return details::flat_map_t>{std::forward(callable)}; +} + +} // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index fcd95b746..9c55cac0e 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -33,6 +33,10 @@ template requires (!utils::is_not_template_callable || !std::same_as>) auto map(Fn&& callable); +template + requires rpp::constraint::observable> +auto flat_map(Fn&& callable); + template requires constraint::observables_of_same_type, std::decay_t...> auto merge_with(TObservable&& observable, TObservables&& ...observables); diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp new file mode 100644 index 000000000..080f95158 --- /dev/null +++ b/src/tests/rpp/test_flat_map.cpp @@ -0,0 +1,126 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "mock_observer.hpp" + +#include +#include + +TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy(); + SECTION("observable of items") + { + auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3); + + SECTION("subscribe using flat_map") + { + obs | rpp::operators::flat_map([](int v) { return rpp::source::just(v * 2); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 2, 4, 6 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with error") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + SECTION("subscribe using flat_map with empty") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::empty(); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with never") + { + obs | rpp::operators::flat_map([](int) { return rpp::source::never(); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with empty in middle") + { + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::empty().as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 1, 3 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } + SECTION("subscribe using flat_map with error in middle") + { + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); + } + } + } + /* + SECTION("flat_map doesn't produce extra copies") + { + copy_count_tracker verifier{}; + auto obs = rpp::source::just(std::move(verifier)) + | rpp::ops::flat_map([](copy_count_tracker&& verifier) { return rpp::source::just(std::move(verifier)); }); + SECTION("subscribe") + { + obs.subscribe([](const auto&){}); + SECTION("no extra copies") + { + REQUIRE(verifier.get_copy_count() == ??); + REQUIRE(verifier.get_move_count() == ??); + } + } + } + */ +} \ No newline at end of file From a748a6f2997d75f16f5e5998d75f83696543da7d Mon Sep 17 00:00:00 2001 From: Corentin Date: Sat, 10 Jun 2023 18:24:53 +0200 Subject: [PATCH 2/8] Address comments --- src/benchmarks/benchmarks.cpp | 4 ++-- src/rpp/rpp/operators/flat_map.hpp | 12 +++++----- src/rpp/rpp/operators/fwd.hpp | 2 +- src/tests/rpp/test_flat_map.cpp | 38 +++++++++++++++++++++++------- 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index a24ab7106..f609bc7dd 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -302,14 +302,14 @@ int main(int argc, char* argv[]) // NOLINT TEST_RPP([&]() { rpp::source::create([](const auto& obs){ obs.on_next(1); }) - | rpp::operators::flat_map([](int v) { return rpp::source::just(v * 2); }) + | rpp::operators::flat_map([](int v) { return rpp::source::create([v](const auto& obs){ obs.on_next(v * 2); }); }) | rpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); }); TEST_RXCPP([&]() { rxcpp::observable<>::create([](const auto& obs){obs.on_next(1);}) - | rxcpp::operators::flat_map([](int v) { return rxcpp::observable<>::just(v * 2); }) + | rxcpp::operators::flat_map([](int v) { return rxcpp::observable<>::create([v](const auto& obs){ obs.on_next(v * 2); }); }) | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); }); } diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp index 3df6a79f9..2320aabfd 100644 --- a/src/rpp/rpp/operators/flat_map.hpp +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -22,8 +22,8 @@ struct flat_map_t { RPP_NO_UNIQUE_ADDRESS Fn m_fn; - template - requires std::invocable> + template> + requires (std::invocable && rpp::constraint::observable>) auto operator()(TObservable&& observable) const & { return std::forward(observable) @@ -32,12 +32,12 @@ struct flat_map_t } - template - requires std::invocable> + template> + requires (std::invocable && rpp::constraint::observable>) auto operator()(TObservable&& observable) && { return std::forward(observable) - | rpp::ops::map(std::forward(m_fn)) + | rpp::ops::map(std::move(m_fn)) | rpp::ops::merge(); } }; @@ -59,7 +59,7 @@ namespace rpp::operators * @see https://reactivex.io/documentation/operators/flatmap.html */ template - requires rpp::constraint::observable> + requires (!utils::is_not_template_callable || rpp::constraint::observable>) auto flat_map(Fn&& callable) { return details::flat_map_t>{std::forward(callable)}; diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 9c55cac0e..b80850c0c 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -34,7 +34,7 @@ template auto map(Fn&& callable); template - requires rpp::constraint::observable> + requires (!utils::is_not_template_callable || rpp::constraint::observable>) auto flat_map(Fn&& callable); template diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp index 080f95158..3adb988f7 100644 --- a/src/tests/rpp/test_flat_map.cpp +++ b/src/tests/rpp/test_flat_map.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -105,22 +106,43 @@ TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_mod CHECK(mock.get_on_error_count() == 1); } } + SECTION("subscribe using flat_map with templated lambda") + { + obs | rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_on_completed_count() == 1); + } + } } - /* +} + +TEST_CASE("flat_map copies/moves") +{ SECTION("flat_map doesn't produce extra copies") { copy_count_tracker verifier{}; - auto obs = rpp::source::just(std::move(verifier)) - | rpp::ops::flat_map([](copy_count_tracker&& verifier) { return rpp::source::just(std::move(verifier)); }); - SECTION("subscribe") + auto obs = rpp::source::create([verifier = std::move(verifier)](const auto& obs) { obs.on_next(verifier); }) + | rpp::ops::map([](copy_count_tracker verifier) { return std::move(verifier); }) // copy from source to map + | rpp::ops::flat_map([](copy_count_tracker&& verifier) { // no copy + return rpp::source::create([verifier = std::move(verifier)](const auto& obs) { obs.on_next(std::move(verifier)); }); + }); + SECTION("first subscribe") + { + obs.subscribe([](const auto&){}); // subscribe by const lvalue reference so no copy + SECTION("no extra copies") + { + REQUIRE(verifier.get_copy_count() == 1); // only one copy from source to first operator + } + } + SECTION("second subscribe") { - obs.subscribe([](const auto&){}); + obs.subscribe([](auto){}); // subscribe by value so one additional copy SECTION("no extra copies") { - REQUIRE(verifier.get_copy_count() == ??); - REQUIRE(verifier.get_move_count() == ??); + REQUIRE(verifier.get_copy_count() == 1 + 1); } } } - */ } \ No newline at end of file From bd37f38cf450c7c23513e041c8ce66a61cfca0d9 Mon Sep 17 00:00:00 2001 From: Corentin Date: Sun, 11 Jun 2023 16:21:00 +0200 Subject: [PATCH 3/8] Address comments --- src/rpp/rpp/operators/flat_map.hpp | 10 ++++++---- src/tests/rpp/test_flat_map.cpp | 19 +++++-------------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp index 2320aabfd..176eb5723 100644 --- a/src/rpp/rpp/operators/flat_map.hpp +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -22,8 +22,9 @@ struct flat_map_t { RPP_NO_UNIQUE_ADDRESS Fn m_fn; - template> - requires (std::invocable && rpp::constraint::observable>) + template + requires (std::invocable> + && rpp::constraint::observable>>) auto operator()(TObservable&& observable) const & { return std::forward(observable) @@ -32,8 +33,9 @@ struct flat_map_t } - template> - requires (std::invocable && rpp::constraint::observable>) + template + requires (std::invocable> + && rpp::constraint::observable>>) auto operator()(TObservable&& observable) && { return std::forward(observable) diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp index 3adb988f7..b96fbb54c 100644 --- a/src/tests/rpp/test_flat_map.cpp +++ b/src/tests/rpp/test_flat_map.cpp @@ -32,9 +32,9 @@ TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_mod { auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3); - SECTION("subscribe using flat_map") + SECTION("subscribe using flat_map with templated lambda") { - obs | rpp::operators::flat_map([](int v) { return rpp::source::just(v * 2); }) + obs | rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }) | rpp::ops::subscribe(mock.get_observer()); SECTION("observer obtains values from underlying observables") { @@ -106,15 +106,6 @@ TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_mod CHECK(mock.get_on_error_count() == 1); } } - SECTION("subscribe using flat_map with templated lambda") - { - obs | rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }) - | rpp::ops::subscribe(mock.get_observer()); - SECTION("observer obtains values from underlying observables") - { - CHECK(mock.get_on_completed_count() == 1); - } - } } } @@ -126,7 +117,7 @@ TEST_CASE("flat_map copies/moves") auto obs = rpp::source::create([verifier = std::move(verifier)](const auto& obs) { obs.on_next(verifier); }) | rpp::ops::map([](copy_count_tracker verifier) { return std::move(verifier); }) // copy from source to map | rpp::ops::flat_map([](copy_count_tracker&& verifier) { // no copy - return rpp::source::create([verifier = std::move(verifier)](const auto& obs) { obs.on_next(std::move(verifier)); }); + return rpp::source::create([&](const auto& obs) { obs.on_next(std::move(verifier)); }); }); SECTION("first subscribe") { @@ -138,10 +129,10 @@ TEST_CASE("flat_map copies/moves") } SECTION("second subscribe") { - obs.subscribe([](auto){}); // subscribe by value so one additional copy + obs.subscribe([](auto){}); // subscribe by value doesn't copy either as value is moved upstream SECTION("no extra copies") { - REQUIRE(verifier.get_copy_count() == 1 + 1); + REQUIRE(verifier.get_copy_count() == 1); } } } From 47e25ddaccb2da4d390f564160543151670cb38e Mon Sep 17 00:00:00 2001 From: Corentin Date: Sun, 11 Jun 2023 16:23:54 +0200 Subject: [PATCH 4/8] Remove whitespace --- src/rpp/rpp/operators/flat_map.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp index 176eb5723..d2fa58a1d 100644 --- a/src/rpp/rpp/operators/flat_map.hpp +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -30,7 +30,6 @@ struct flat_map_t return std::forward(observable) | rpp::ops::map(m_fn) | rpp::ops::merge(); - } template From 9e99db8de96afe89d27e104dc36682d958ce989c Mon Sep 17 00:00:00 2001 From: Corentin Date: Sun, 11 Jun 2023 16:26:18 +0200 Subject: [PATCH 5/8] Added comments --- src/tests/rpp/test_flat_map.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp index b96fbb54c..797d94e61 100644 --- a/src/tests/rpp/test_flat_map.cpp +++ b/src/tests/rpp/test_flat_map.cpp @@ -114,10 +114,10 @@ TEST_CASE("flat_map copies/moves") SECTION("flat_map doesn't produce extra copies") { copy_count_tracker verifier{}; - auto obs = rpp::source::create([verifier = std::move(verifier)](const auto& obs) { obs.on_next(verifier); }) + auto obs = rpp::source::create([&](const auto& obs) { obs.on_next(verifier); }) // pass by value as obs is subscribed multiple times | rpp::ops::map([](copy_count_tracker verifier) { return std::move(verifier); }) // copy from source to map | rpp::ops::flat_map([](copy_count_tracker&& verifier) { // no copy - return rpp::source::create([&](const auto& obs) { obs.on_next(std::move(verifier)); }); + return rpp::source::create([&](const auto& obs) { obs.on_next(std::move(verifier)); }); // move verifier }); SECTION("first subscribe") { From aba97faed02ad0736e1b00a0b3f4e722de299522 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Jun 2023 18:40:25 +0300 Subject: [PATCH 6/8] add marble --- src/rpp/rpp/operators/flat_map.hpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/flat_map.hpp b/src/rpp/rpp/operators/flat_map.hpp index d2fa58a1d..0b6fba7f6 100644 --- a/src/rpp/rpp/operators/flat_map.hpp +++ b/src/rpp/rpp/operators/flat_map.hpp @@ -51,6 +51,13 @@ namespace rpp::operators /** * @brief Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable * + * @marble flat_map + { + source observable : +--1--2--3--| + operator "flat_map: x=>just(x,x+1)" : +--12-23-34-| + } + * + * @details Actually it makes `map(callable)` and then `merge`. * @details Note that flat_map merges the emissions of these Observables, so that they may interleave. * * @param a function that returns an observable for each item emitted by the source observable. @@ -66,4 +73,4 @@ auto flat_map(Fn&& callable) return details::flat_map_t>{std::forward(callable)}; } -} // namespace rpp::operators \ No newline at end of file +} // namespace rpp::operators From 627a88e9026e7a9d3550d9858761052ab41baa9d Mon Sep 17 00:00:00 2001 From: Corentin Date: Sun, 11 Jun 2023 18:17:20 +0200 Subject: [PATCH 7/8] Address comments --- src/tests/rpp/test_flat_map.cpp | 44 +++++++++++---------------------- 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp index 797d94e61..ae93cc134 100644 --- a/src/tests/rpp/test_flat_map.cpp +++ b/src/tests/rpp/test_flat_map.cpp @@ -91,48 +91,34 @@ TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_mod CHECK(mock.get_on_error_count() == 0); } } - SECTION("subscribe using flat_map with error in middle") + SECTION("subscribe using flat_map with never in middle") { obs | rpp::operators::flat_map([](int v) { if (v == 2) - return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(); + return rpp::source::never().as_dynamic(); return rpp::source::just(v).as_dynamic(); }) | rpp::ops::subscribe(mock.get_observer()); SECTION("observer obtains values from underlying observables") { - CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_received_values() == std::vector{ 1, 3 }); CHECK(mock.get_on_completed_count() == 0); - CHECK(mock.get_on_error_count() == 1); - } - } - } -} - -TEST_CASE("flat_map copies/moves") -{ - SECTION("flat_map doesn't produce extra copies") - { - copy_count_tracker verifier{}; - auto obs = rpp::source::create([&](const auto& obs) { obs.on_next(verifier); }) // pass by value as obs is subscribed multiple times - | rpp::ops::map([](copy_count_tracker verifier) { return std::move(verifier); }) // copy from source to map - | rpp::ops::flat_map([](copy_count_tracker&& verifier) { // no copy - return rpp::source::create([&](const auto& obs) { obs.on_next(std::move(verifier)); }); // move verifier - }); - SECTION("first subscribe") - { - obs.subscribe([](const auto&){}); // subscribe by const lvalue reference so no copy - SECTION("no extra copies") - { - REQUIRE(verifier.get_copy_count() == 1); // only one copy from source to first operator + CHECK(mock.get_on_error_count() == 0); } } - SECTION("second subscribe") + SECTION("subscribe using flat_map with error in middle") { - obs.subscribe([](auto){}); // subscribe by value doesn't copy either as value is moved upstream - SECTION("no extra copies") + obs | rpp::operators::flat_map([](int v) { + if (v == 2) + return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(); + return rpp::source::just(v).as_dynamic(); + }) + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") { - REQUIRE(verifier.get_copy_count() == 1); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + CHECK(mock.get_on_error_count() == 1); } } } From 45e0ec6adbb0fce6ab7c8e51a02f7a003cbb46f9 Mon Sep 17 00:00:00 2001 From: Corentin Date: Sun, 11 Jun 2023 18:55:31 +0200 Subject: [PATCH 8/8] Increase code coverage --- src/tests/rpp/test_flat_map.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/tests/rpp/test_flat_map.cpp b/src/tests/rpp/test_flat_map.cpp index ae93cc134..b9c601e3c 100644 --- a/src/tests/rpp/test_flat_map.cpp +++ b/src/tests/rpp/test_flat_map.cpp @@ -43,6 +43,18 @@ TEMPLATE_TEST_CASE("flat_map", "", rpp::memory_model::use_stack, rpp::memory_mod CHECK(mock.get_on_error_count() == 0); } } + SECTION("subscribe using flat_map with templated lambda and pass operator by variable") + { + const auto multiply_by_two = rpp::operators::flat_map([](auto v) { return rpp::source::just(v * 2); }); + obs | multiply_by_two + | rpp::ops::subscribe(mock.get_observer()); + SECTION("observer obtains values from underlying observables") + { + CHECK(mock.get_received_values() == std::vector{ 2, 4, 6 }); + CHECK(mock.get_on_completed_count() == 1); + CHECK(mock.get_on_error_count() == 0); + } + } SECTION("subscribe using flat_map with error") { obs | rpp::operators::flat_map([](int) { return rpp::source::error(std::make_exception_ptr(std::runtime_error{""})); })