Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("immediate_just(1,2,3)+element_at(1)+subscribe")
{
TEST_RPP([&]() {
rpp::immediate_just(1, 2, 3)
| rpp::operators::element_at(1)
| rpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
rxcpp::immediate_just(1, 2, 3)
| rxcpp::operators::element_at(1)
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}
}; // BENCHMARK("Filtering Operators")

BENCHMARK("Utility Operators")
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <rpp/operators/debounce.hpp>
#include <rpp/operators/distinct.hpp>
#include <rpp/operators/distinct_until_changed.hpp>
#include <rpp/operators/element_at.hpp>
#include <rpp/operators/filter.hpp>
#include <rpp/operators/first.hpp>
#include <rpp/operators/last.hpp>
Expand Down
101 changes: 101 additions & 0 deletions src/rpp/rpp/operators/element_at.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/exceptions.hpp>

#include <cstddef>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver>
struct element_at_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

RPP_NO_UNIQUE_ADDRESS TObserver observer;
size_t index{};
mutable size_t current{};
Comment thread
AlexInLog marked this conversation as resolved.

template<typename T>
void on_next(T&& v) const
{
if (current++ == index)
{
observer.on_next(std::forward<T>(v));
Comment thread
AlexInLog marked this conversation as resolved.
observer.on_completed();
}
}

void on_error(const std::exception_ptr& err) const { observer.on_error(err); }

void on_completed() const
{
if (current <= index)
{
observer.on_error(std::make_exception_ptr(utils::out_of_range{"index is out of bounds"}));
}
else
{
observer.on_completed();
}
}

void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }

bool is_disposed() const { return observer.is_disposed(); }
};

struct element_at_t : lift_operator<element_at_t, size_t>
{
using lift_operator<element_at_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = element_at_observer_strategy<TObserver>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief Emit item located at specified `index` location in the sequence of items emitted by the source observable
*
* @marble element_at
{
source observable : +--1-2--3-4---|
operator "element_at(2)" : +-------3|
}
* @details If source observable completes without emitting at least `index` + 1 items, observable emits an error
*
* @param index index of the item to return
* @warning #include <rpp/operators/element_at.hpp>
*
* @ingroup filtering_operators
* @see https://reactivex.io/documentation/operators/elementat.html
*/
inline auto element_at(size_t index)
{
return details::element_at_t{index};
}
} // namespace rpp::operators
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ namespace rpp::operators
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto distinct_until_changed(EqualityFn&& equality_fn = {});

auto element_at(size_t index);

auto first();

template<typename Fn>
Expand Down
5 changes: 5 additions & 0 deletions src/rpp/rpp/utils/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ namespace rpp::utils
{
using std::runtime_error::runtime_error;
};

struct out_of_range : public std::range_error
{
using std::range_error::range_error;
};
} // namespace rpp::utils
110 changes: 110 additions & 0 deletions src/tests/rpp/test_element_at.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#include <catch2/catch_template_test_macros.hpp>
#include <catch2/catch_test_macros.hpp>

#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/element_at.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/just.hpp>

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"

TEST_CASE("element_at emits element at index")
{
mock_observer_strategy<int> mock{};

SECTION("sequence of values")
{
auto obs = rpp::source::just(1, 2, 3);

SECTION("subscribe via element_at(1)")
{
(obs | rpp::operators::element_at(1)).subscribe(mock);

CHECK(mock.get_received_values() == std::vector{2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

SECTION("subscribe via element_at(0)")
{
(obs | rpp::operators::element_at(0)).subscribe(mock);

CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

SECTION("subscribe via element_at(3)")
{
(obs | rpp::operators::element_at(3)).subscribe(mock);

CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}

SECTION("empty sequence")
{
auto obs = rpp::source::create<int>([](auto&& obs) {
obs.on_completed();
});

SECTION("subscribe via element_at(0)")
{
(obs | rpp::operators::element_at(0)).subscribe(mock);

CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}

SECTION("error sequence")
{
auto obs = rpp::source::create<int>([](auto&& obs) {
obs.on_error({});
});

SECTION("subscribe via element_at(0)")
{
(obs | rpp::operators::element_at(0)).subscribe(mock);

CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}
}

TEST_CASE("element_at doesn't produce extra copies")
{
SECTION("element_at(1)")
{
copy_count_tracker::test_operator(rpp::ops::element_at(1),
{
.send_by_copy = {.copy_count = 1, // 1 copy to final subscriber
.move_count = 0},
.send_by_move = {.copy_count = 0,
.move_count = 1} // 1 move to final subscriber
},
2);
}
}

TEST_CASE("element_at satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::element_at(1));
}