From 2dcb8021937fabecec361ab970fb6cd07072fb41 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 7 Feb 2024 23:09:02 +0100 Subject: [PATCH 1/3] Add distinct operator --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/distinct.hpp | 81 ++++++++++++++++++++++++++++++ src/rpp/rpp/utils/constraints.hpp | 6 ++- src/rpp/rpp/utils/tuple.hpp | 2 +- src/tests/rpp/test_distinct.cpp | 71 ++++++++++++++++++++++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 src/rpp/rpp/operators/distinct.hpp create mode 100644 src/tests/rpp/test_distinct.cpp diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 0b19cccd1..a21a73327 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -47,6 +47,7 @@ */ #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp new file mode 100644 index 000000000..fc8a50853 --- /dev/null +++ b/src/rpp/rpp/operators/distinct.hpp @@ -0,0 +1,81 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace rpp::operators::details +{ +template +struct distinct_observer_strategy +{ + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + mutable std::unordered_set past_values{}; + + template + void on_next(T&& v) const + { + const auto [it, inserted] = past_values.insert(std::forward(v)); + if (inserted) + { + observer.on_next(*it); + } + } + + void on_error(const std::exception_ptr& err) const { observer.on_error(err); } + + void on_completed() const { observer.on_completed(); } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } +}; + +struct distinct_t : public operators::details::template_operator_observable_strategy +{ + template + requires constraint::hashable + using result_value = T; + + template + using updated_disposable_strategy = Prev; +}; +} + +namespace rpp::operators +{ +/** + * @brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted + * + * @marble distinct + { + source observable : +--1-1-2-2-3-2-1-| + operator "distinct" : +--1---2---3-----| + } + * + * @warning This operator keeps an `std::unordered_set` of past values, so std::hash specialization is required. + * + * @ingroup filtering_operators + * @see https://reactivex.io/documentation/operators/distinct.html + */ +inline auto distinct() +{ + return details::distinct_t{}; +} +} \ No newline at end of file diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index 9846adb9b..097399288 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -46,10 +46,14 @@ concept is_constructible_from = requires(Args... args) {T{static_cast(args)...}} -> std::same_as; }; - template concept invocable_r_v = std::invocable && std::same_as>; template concept is_nothrow_invocable = std::is_nothrow_invocable_v; + +template +concept hashable = requires(T a) { + { std::hash{}(a) } -> std::convertible_to; +}; } // namespace rpp::constraint diff --git a/src/rpp/rpp/utils/tuple.hpp b/src/rpp/rpp/utils/tuple.hpp index ffd637cc5..18620b216 100644 --- a/src/rpp/rpp/utils/tuple.hpp +++ b/src/rpp/rpp/utils/tuple.hpp @@ -18,7 +18,7 @@ namespace rpp::details { -template +template class tuple_leaf { public: diff --git a/src/tests/rpp/test_distinct.cpp b/src/tests/rpp/test_distinct.cpp new file mode 100644 index 000000000..6f6aa699a --- /dev/null +++ b/src/tests/rpp/test_distinct.cpp @@ -0,0 +1,71 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEMPLATE_TEST_CASE("distinct filters out repeated values and emit only items that have not already been emitted", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy{}; + auto obs = rpp::source::just(1, 1, 2, 2, 3, 4, 4, 2, 2, 1, 3); + + SECTION("WHEN subscribe on observable with duplicates via distinct THEN subscriber obtains values without consecutive duplicates") + { + obs | rpp::ops::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } +} + +TEST_CASE("distinct forwards error") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::error({}) | rpp::operators::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); +} + +TEST_CASE("distinct forwards completion") +{ + auto mock = mock_observer_strategy{}; + rpp::source::empty() | rpp::operators::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + +TEST_CASE("distinct doesn't produce extra copies") +{ + copy_count_tracker::test_operator(rpp::ops::distinct(), + { + .send_by_copy = {.copy_count = 0, + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 0} + }); +} + +/* +TEST_CASE("distinct satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::distinct()); +} +*/ \ No newline at end of file From f533f8fdfb2bfe620c2ded88bf30570b4aad6b91 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 7 Feb 2024 23:10:56 +0100 Subject: [PATCH 2/3] Small nits --- src/rpp/rpp/operators/distinct.hpp | 2 +- src/tests/rpp/test_distinct.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index fc8a50853..8e1ca5204 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -50,7 +50,7 @@ struct distinct_observer_strategy struct distinct_t : public operators::details::template_operator_observable_strategy { template - requires constraint::hashable + requires rpp::constraint::hashable using result_value = T; template diff --git a/src/tests/rpp/test_distinct.cpp b/src/tests/rpp/test_distinct.cpp index 6f6aa699a..59a6a5a21 100644 --- a/src/tests/rpp/test_distinct.cpp +++ b/src/tests/rpp/test_distinct.cpp @@ -24,7 +24,7 @@ TEMPLATE_TEST_CASE("distinct filters out repeated values and emit only items tha auto mock = mock_observer_strategy{}; auto obs = rpp::source::just(1, 1, 2, 2, 3, 4, 4, 2, 2, 1, 3); - SECTION("WHEN subscribe on observable with duplicates via distinct THEN subscriber obtains values without consecutive duplicates") + SECTION("WHEN subscribe on observable with duplicates via distinct THEN subscriber obtains values without duplicates") { obs | rpp::ops::distinct() | rpp::ops::subscribe(mock); CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4}); From e6c76af2bce49c67563660d5a3c90f09e5a6a961 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 8 Feb 2024 21:43:17 +0100 Subject: [PATCH 3/3] Make copy_count_tracker hashable + improve compile error --- src/rpp/rpp/operators/distinct.hpp | 3 ++- src/tests/rpp/test_distinct.cpp | 10 ++++------ src/tests/utils/copy_count_tracker.hpp | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index 8e1ca5204..dcc71d5f3 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -23,6 +23,8 @@ namespace rpp::operators::details template struct distinct_observer_strategy { + static_assert(rpp::constraint::hashable, "Distinct operator requires hashable type"); + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; RPP_NO_UNIQUE_ADDRESS TObserver observer; @@ -50,7 +52,6 @@ struct distinct_observer_strategy struct distinct_t : public operators::details::template_operator_observable_strategy { template - requires rpp::constraint::hashable using result_value = T; template diff --git a/src/tests/rpp/test_distinct.cpp b/src/tests/rpp/test_distinct.cpp index 59a6a5a21..2e5f0b02d 100644 --- a/src/tests/rpp/test_distinct.cpp +++ b/src/tests/rpp/test_distinct.cpp @@ -56,16 +56,14 @@ TEST_CASE("distinct doesn't produce extra copies") { copy_count_tracker::test_operator(rpp::ops::distinct(), { - .send_by_copy = {.copy_count = 0, + .send_by_copy = {.copy_count = 2, // 1 copy on emission + 1 copy to final subscriber .move_count = 0}, - .send_by_move = {.copy_count = 0, - .move_count = 0} + .send_by_move = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 1} // 1 move on emission }); } -/* TEST_CASE("distinct satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::distinct()); -} -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/src/tests/utils/copy_count_tracker.hpp b/src/tests/utils/copy_count_tracker.hpp index 8aac6ccfd..eedf8f1e1 100644 --- a/src/tests/utils/copy_count_tracker.hpp +++ b/src/tests/utils/copy_count_tracker.hpp @@ -116,3 +116,17 @@ class copy_count_tracker private: std::shared_ptr _state; }; + +namespace std +{ +// Make copy_count_tracker hashable for distinct operator +template<> +struct hash +{ + size_t operator()(const copy_count_tracker& tracker) const noexcept + { + return std::hash{}(tracker.get_copy_count()) + ^ (std::hash{}(tracker.get_move_count()) << 1); + } +}; +}