diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index a90f0c962..f404f06b1 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -580,6 +580,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + + SECTION("immediate_just(1,2,3)+element_at(1)+subscribe") + { + TEST_RPP([&]() { + rpp::immediate_just(1, 2, 3) + | rpp::operators::element_at(1) + | rpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + rxcpp::immediate_just(1, 2, 3) + | rxcpp::operators::element_at(1) + | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } }; // BENCHMARK("Filtering Operators") BENCHMARK("Utility Operators") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 0b2473a25..bc7c75bee 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/element_at.hpp b/src/rpp/rpp/operators/element_at.hpp new file mode 100644 index 000000000..05dce746e --- /dev/null +++ b/src/rpp/rpp/operators/element_at.hpp @@ -0,0 +1,101 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace rpp::operators::details +{ + template + struct element_at_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + size_t index{}; + mutable size_t current{}; + + template + void on_next(T&& v) const + { + if (current++ == index) + { + observer.on_next(std::forward(v)); + observer.on_completed(); + } + } + + void on_error(const std::exception_ptr& err) const { observer.on_error(err); } + + void on_completed() const + { + if (current <= index) + { + observer.on_error(std::make_exception_ptr(utils::out_of_range{"index is out of bounds"})); + } + else + { + observer.on_completed(); + } + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } + }; + + struct element_at_t : lift_operator + { + using lift_operator::lift_operator; + + template + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = element_at_observer_strategy; + }; + + template + using updated_disposable_strategy = Prev; + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief Emit item located at specified `index` location in the sequence of items emitted by the source observable + * + * @marble element_at + { + source observable : +--1-2--3-4---| + operator "element_at(2)" : +-------3| + } + * @details If source observable completes without emitting at least `index` + 1 items, observable emits an error + * + * @param index index of the item to return + * @warning #include + * + * @ingroup filtering_operators + * @see https://reactivex.io/documentation/operators/elementat.html + */ + inline auto element_at(size_t index) + { + return details::element_at_t{index}; + } +} // namespace rpp::operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index efc52f2e6..a8d73803e 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -45,6 +45,8 @@ namespace rpp::operators requires (!utils::is_not_template_callable || std::same_as>) auto distinct_until_changed(EqualityFn&& equality_fn = {}); + auto element_at(size_t index); + auto first(); template diff --git a/src/rpp/rpp/utils/exceptions.hpp b/src/rpp/rpp/utils/exceptions.hpp index 6569b0207..df693b26e 100644 --- a/src/rpp/rpp/utils/exceptions.hpp +++ b/src/rpp/rpp/utils/exceptions.hpp @@ -28,4 +28,9 @@ namespace rpp::utils { using std::runtime_error::runtime_error; }; + + struct out_of_range : public std::range_error + { + using std::range_error::range_error; + }; } // namespace rpp::utils diff --git a/src/tests/rpp/test_element_at.cpp b/src/tests/rpp/test_element_at.cpp new file mode 100644 index 000000000..c428e8606 --- /dev/null +++ b/src/tests/rpp/test_element_at.cpp @@ -0,0 +1,110 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" + +TEST_CASE("element_at emits element at index") +{ + mock_observer_strategy mock{}; + + SECTION("sequence of values") + { + auto obs = rpp::source::just(1, 2, 3); + + SECTION("subscribe via element_at(1)") + { + (obs | rpp::operators::element_at(1)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{2}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("subscribe via element_at(3)") + { + (obs | rpp::operators::element_at(3)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + + SECTION("empty sequence") + { + auto obs = rpp::source::create([](auto&& obs) { + obs.on_completed(); + }); + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + + SECTION("error sequence") + { + auto obs = rpp::source::create([](auto&& obs) { + obs.on_error({}); + }); + + SECTION("subscribe via element_at(0)") + { + (obs | rpp::operators::element_at(0)).subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } +} + +TEST_CASE("element_at doesn't produce extra copies") +{ + SECTION("element_at(1)") + { + copy_count_tracker::test_operator(rpp::ops::element_at(1), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }, + 2); + } +} + +TEST_CASE("element_at satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::element_at(1)); +}