From 1dad21ea4a8b004d47af4fdab0eb5b1c9b7750a2 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 3 Aug 2024 19:33:22 +0200 Subject: [PATCH 1/4] element_at operator --- src/benchmarks/benchmarks.cpp | 15 ++++ src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/element_at.hpp | 99 ++++++++++++++++++++++++ src/rpp/rpp/operators/fwd.hpp | 2 + src/tests/rpp/test_element_at.cpp | 110 +++++++++++++++++++++++++++ 5 files changed, 227 insertions(+) create mode 100644 src/rpp/rpp/operators/element_at.hpp create mode 100644 src/tests/rpp/test_element_at.cpp diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index a90f0c962..f404f06b1 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -580,6 +580,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + + SECTION("immediate_just(1,2,3)+element_at(1)+subscribe") + { + TEST_RPP([&]() { + rpp::immediate_just(1, 2, 3) + | rpp::operators::element_at(1) + | rpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + rxcpp::immediate_just(1, 2, 3) + | rxcpp::operators::element_at(1) + | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } }; // BENCHMARK("Filtering Operators") BENCHMARK("Utility Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 0b2473a25..bc7c75bee 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/element_at.hpp b/src/rpp/rpp/operators/element_at.hpp new file mode 100644 index 000000000..7fd47445f --- /dev/null +++ b/src/rpp/rpp/operators/element_at.hpp @@ -0,0 +1,99 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +#include + +namespace rpp::operators::details +{ + template + struct element_at_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + size_t index{}; + mutable size_t current{}; + + template + void on_next(T&& v) const + { + if (current++ == index) + { + observer.on_next(std::forward(v)); + } + } + + void on_error(const std::exception_ptr& err) const { observer.on_error(err); } + + void on_completed() const + { + if (current <= index) + { + observer.on_error(std::make_exception_ptr(std::range_error{"index is out of bounds"})); + } + else + { + observer.on_completed(); + } + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } + }; + + struct element_at_t : lift_operator + { + using lift_operator::lift_operator; + + template + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = element_at_observer_strategy; + }; + + template + using updated_disposable_strategy = Prev; + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief Emit item located at specified `index` location in the sequence of items emitted by the source observable + * + * @marble take + { + source observable : +--1-2--3-4---| + operator "element_at(2)" : +-------3| + } + * @details If source observable completes without emitting at least `index` + 1 items, observable emits an error + * + * @param index index of the item to return + * @warning #include + * + * @ingroup filtering_operators + * @see https://reactivex.io/documentation/operators/elementat.html + */ + inline auto element_at(size_t index) + { + return details::element_at_t{index}; + } +} // namespace rpp::operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index efc52f2e6..a8d73803e 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -45,6 +45,8 @@ namespace rpp::operators requires (!utils::is_not_template_callable || std::same_as>) auto distinct_until_changed(EqualityFn&& equality_fn = {}); + auto element_at(size_t index); + auto first(); template diff --git a/src/tests/rpp/test_element_at.cpp b/src/tests/rpp/test_element_at.cpp new file mode 100644 index 000000000..9529b89b9 --- /dev/null +++ b/src/tests/rpp/test_element_at.cpp @@ -0,0 +1,110 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" + +TEST_CASE("element_at emits element at index") +{ + mock_observer_strategy mock{}; + + SECTION("sequence of values") + { + auto obs = rpp::source::just(1, 2, 3); + + SECTION("subscribe via element_at(1)") + { + (obs | rpp::operators::element_at(1)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("subscribe via element_at(3)") + { + (obs | rpp::operators::element_at(3)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + + SECTION("empty sequence") + { + auto obs = rpp::source::create([](auto&& obs) { + obs.on_completed(); + }); + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + + SECTION("error sequence") + { + auto obs = rpp::source::create([](auto&& obs) { + obs.on_error({}); + }); + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } +} + +TEST_CASE("element_at doesn't produce extra copies") +{ + SECTION("element_at(1)") + { + copy_count_tracker::test_operator(rpp::ops::element_at(1), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }, + 2); + } +} + +TEST_CASE("take satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::element_at(1)); +} \ No newline at end of file From 1f1051c1cc8eb598f2d7839a8dc9e1504fc4f9b3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 3 Aug 2024 17:35:06 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/publish.hpp | 2 +- src/tests/rpp/test_element_at.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/publish.hpp b/src/rpp/rpp/operators/publish.hpp index 31db520fd..25afff05d 100644 --- a/src/rpp/rpp/operators/publish.hpp +++ b/src/rpp/rpp/operators/publish.hpp @@ -1,7 +1,7 @@ #pragma once -#include #include +#include namespace rpp::operators { diff --git a/src/tests/rpp/test_element_at.cpp b/src/tests/rpp/test_element_at.cpp index 9529b89b9..5e2069445 100644 --- a/src/tests/rpp/test_element_at.cpp +++ b/src/tests/rpp/test_element_at.cpp @@ -107,4 +107,4 @@ TEST_CASE("element_at doesn't produce extra copies") TEST_CASE("take satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::element_at(1)); -} \ No newline at end of file +} From d586a714b092c4b0deb448156675af159e31b94e Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Mon, 5 Aug 2024 20:57:07 +0200 Subject: [PATCH 3/4] Address comments --- src/rpp/rpp/operators/element_at.hpp | 8 +++++--- src/rpp/rpp/utils/exceptions.hpp | 5 +++++ src/tests/rpp/test_element_at.cpp | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/operators/element_at.hpp b/src/rpp/rpp/operators/element_at.hpp index 7fd47445f..a2ac007ac 100644 --- a/src/rpp/rpp/operators/element_at.hpp +++ b/src/rpp/rpp/operators/element_at.hpp @@ -14,6 +14,7 @@ #include #include +#include #include @@ -34,6 +35,7 @@ namespace rpp::operators::details if (current++ == index) { observer.on_next(std::forward(v)); + observer.on_completed(); } } @@ -43,9 +45,9 @@ namespace rpp::operators::details { if (current <= index) { - observer.on_error(std::make_exception_ptr(std::range_error{"index is out of bounds"})); + observer.on_error(std::make_exception_ptr(utils::out_of_range{"index is out of bounds"})); } - else + else { observer.on_completed(); } @@ -79,7 +81,7 @@ namespace rpp::operators /** * @brief Emit item located at specified `index` location in the sequence of items emitted by the source observable * - * @marble take + * @marble element_at { source observable : +--1-2--3-4---| operator "element_at(2)" : +-------3| diff --git a/src/rpp/rpp/utils/exceptions.hpp b/src/rpp/rpp/utils/exceptions.hpp index 6569b0207..df693b26e 100644 --- a/src/rpp/rpp/utils/exceptions.hpp +++ b/src/rpp/rpp/utils/exceptions.hpp @@ -28,4 +28,9 @@ namespace rpp::utils { using std::runtime_error::runtime_error; }; + + struct out_of_range : public std::range_error + { + using std::range_error::range_error; + }; } // namespace rpp::utils diff --git a/src/tests/rpp/test_element_at.cpp b/src/tests/rpp/test_element_at.cpp index 5e2069445..c428e8606 100644 --- a/src/tests/rpp/test_element_at.cpp +++ b/src/tests/rpp/test_element_at.cpp @@ -104,7 +104,7 @@ TEST_CASE("element_at doesn't produce extra copies") } } -TEST_CASE("take satisfies disposable contracts") +TEST_CASE("element_at satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::element_at(1)); } From e2c46c62fb96625a5e35e093f4ab122d79e0ee34 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 5 Aug 2024 18:57:19 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/element_at.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/element_at.hpp b/src/rpp/rpp/operators/element_at.hpp index a2ac007ac..05dce746e 100644 --- a/src/rpp/rpp/operators/element_at.hpp +++ b/src/rpp/rpp/operators/element_at.hpp @@ -35,7 +35,7 @@ namespace rpp::operators::details if (current++ == index) { observer.on_next(std::forward(v)); - observer.on_completed(); + observer.on_completed(); } } @@ -47,7 +47,7 @@ namespace rpp::operators::details { observer.on_error(std::make_exception_ptr(utils::out_of_range{"index is out of bounds"})); } - else + else { observer.on_completed(); }