From 3b6d5e3cb9a2378626d50f92c11b7b3e1058a804 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 10 Feb 2024 11:49:18 +0100 Subject: [PATCH 1/2] Add tap operator --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/tap.hpp | 158 ++++++++++++++++++++++++++++++++++ src/tests/rpp/test_tap.cpp | 95 ++++++++++++++++++++ 3 files changed, 254 insertions(+) create mode 100644 src/rpp/rpp/operators/tap.hpp create mode 100644 src/tests/rpp/test_tap.cpp diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index a21a73327..d91434379 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -94,6 +94,7 @@ #include #include #include +#include /** * @defgroup connectable_operators Connectable Operators diff --git a/src/rpp/rpp/operators/tap.hpp b/src/rpp/rpp/operators/tap.hpp new file mode 100644 index 000000000..29aa775ec --- /dev/null +++ b/src/rpp/rpp/operators/tap.hpp @@ -0,0 +1,158 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ +template< + rpp::constraint::observer TObserver, + rpp::constraint::decayed_type OnNext, + rpp::constraint::decayed_type OnError, + rpp::constraint::decayed_type OnCompleted> +struct tap_observer_strategy +{ + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS OnNext onNext; + RPP_NO_UNIQUE_ADDRESS OnError onError; + RPP_NO_UNIQUE_ADDRESS OnCompleted onCompleted; + + template + void on_next(T&& v) const + { + onNext(utils::as_const(v)); + observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + onError(err); + observer.on_error(err); + } + + void on_completed() const + { + onCompleted(); + observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } +}; + +template< + rpp::constraint::decayed_type OnNext, + rpp::constraint::decayed_type OnError, + rpp::constraint::decayed_type OnCompleted> +struct tap_t : public operators::details::operator_observable_strategy +{ + using operators::details::operator_observable_strategy::operator_observable_strategy; + + template + requires rpp::constraint::invocable_r_v + using result_value = T; + + template + using updated_disposable_strategy = Prev; +}; +} + +namespace rpp::operators +{ +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_error error handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t, + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnError&& on_error) +{ + return details::tap_t, std::decay_t, std::decay_t>{ + OnNext{}, + std::forward(on_error), + OnCompleted{}}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t, + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnCompleted&& on_completed) +{ + return details::tap_t, std::decay_t, std::decay_t>{ + OnNext{}, + OnError{}, + std::forward(on_completed)}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_next next handler + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t, + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnNext&& on_next, + OnCompleted&& on_completed) +{ + return details::tap_t, std::decay_t, std::decay_t>{ + std::forward(on_next), + OnError{}, + std::forward(on_completed)}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_next next handler + * @param on_error error handler + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t, + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnNext&& on_next = {}, + OnError&& on_error = {}, + OnCompleted&& on_completed = {}) +{ + return details::tap_t, std::decay_t, std::decay_t>{ + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}; +} +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_tap.cpp b/src/tests/rpp/test_tap.cpp new file mode 100644 index 000000000..f08d5e5e0 --- /dev/null +++ b/src/tests/rpp/test_tap.cpp @@ -0,0 +1,95 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEMPLATE_TEST_CASE("tap observes emissions and doesn't modify them", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy{}; + + SECTION("observable with error emission") + { + auto obs = + rpp::source::concat(rpp::source::just(1, 2, 3), + rpp::source::error(std::make_exception_ptr(std::runtime_error{""}))); + + SECTION("subscribe") + { + size_t on_next_invoked = 0; + size_t on_error_invoked = 0; + + // clang-format off + obs | rpp::ops::tap( + [&](const int&) { ++on_next_invoked; }, + [&](const std::exception_ptr&) { ++on_error_invoked; }) + | rpp::ops::subscribe(mock); + // clang-format on + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + + CHECK(on_next_invoked == mock.get_total_on_next_count()); + CHECK(on_error_invoked == mock.get_on_error_count()); + } + } + + SECTION("observable with completed emission") + { + auto obs = rpp::source::just(1, 2, 3); + + SECTION("subscribe") + { + size_t on_next_invoked = 0; + size_t on_completed_invoked = 0; + + // clang-format off + obs | rpp::ops::tap( + [&](const int&) { ++on_next_invoked; }, + [&]() { ++on_completed_invoked; }) + | rpp::ops::subscribe(mock); + // clang-format on + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(on_next_invoked == mock.get_total_on_next_count()); + CHECK(on_completed_invoked == mock.get_on_completed_count()); + } + } +} + +TEST_CASE("tap doesn't produce extra copies") +{ + // clang-format off + copy_count_tracker::test_operator(rpp::ops::tap(), + { + .send_by_copy = { .copy_count = 1, // 1 copy on emission + .move_count = 0 }, + .send_by_move = { .copy_count = 0, + .move_count = 1 } // 1 move on emission + }); + // clang-format on +} + +TEST_CASE("tap satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::tap()); +} \ No newline at end of file From 5b39ceb5ecbb778823a718289c838e5bdf6c8e3a Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 10 Feb 2024 12:35:50 +0100 Subject: [PATCH 2/2] Address comments --- src/rpp/rpp/operators/tap.hpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/operators/tap.hpp b/src/rpp/rpp/operators/tap.hpp index 29aa775ec..17b44619f 100644 --- a/src/rpp/rpp/operators/tap.hpp +++ b/src/rpp/rpp/operators/tap.hpp @@ -82,11 +82,12 @@ namespace rpp::operators * @ingroup utility_operators * @see https://reactivex.io/documentation/operators/do.html */ -template OnError = rpp::utils::empty_function_t, - std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +template OnError = rpp::utils::empty_function_t> auto tap(OnError&& on_error) { + using OnNext = rpp::utils::empty_function_any_t; + using OnCompleted = rpp::utils::empty_function_t<>; + return details::tap_t, std::decay_t, std::decay_t>{ OnNext{}, std::forward(on_error), @@ -101,11 +102,12 @@ auto tap(OnError&& on_error) * @ingroup utility_operators * @see https://reactivex.io/documentation/operators/do.html */ -template OnError = rpp::utils::empty_function_t, - std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +template OnCompleted = rpp::utils::empty_function_t<>> auto tap(OnCompleted&& on_completed) { + using OnNext = rpp::utils::empty_function_any_t; + using OnError = rpp::utils::empty_function_t; + return details::tap_t, std::decay_t, std::decay_t>{ OnNext{}, OnError{}, @@ -122,11 +124,12 @@ auto tap(OnCompleted&& on_completed) * @see https://reactivex.io/documentation/operators/do.html */ template OnError = rpp::utils::empty_function_t, - std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> auto tap(OnNext&& on_next, OnCompleted&& on_completed) { + using OnError = rpp::utils::empty_function_t; + return details::tap_t, std::decay_t, std::decay_t>{ std::forward(on_next), OnError{},