diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 0b19cccd1..a21a73327 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -47,6 +47,7 @@ */ #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp new file mode 100644 index 000000000..dcc71d5f3 --- /dev/null +++ b/src/rpp/rpp/operators/distinct.hpp @@ -0,0 +1,82 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace rpp::operators::details +{ +template +struct distinct_observer_strategy +{ + static_assert(rpp::constraint::hashable, "Distinct operator requires hashable type"); + + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + mutable std::unordered_set past_values{}; + + template + void on_next(T&& v) const + { + const auto [it, inserted] = past_values.insert(std::forward(v)); + if (inserted) + { + observer.on_next(*it); + } + } + + void on_error(const std::exception_ptr& err) const { observer.on_error(err); } + + void on_completed() const { observer.on_completed(); } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } +}; + +struct distinct_t : public operators::details::template_operator_observable_strategy +{ + template + using result_value = T; + + template + using updated_disposable_strategy = Prev; +}; +} + +namespace rpp::operators +{ +/** + * @brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted + * + * @marble distinct + { + source observable : +--1-1-2-2-3-2-1-| + operator "distinct" : +--1---2---3-----| + } + * + * @warning This operator keeps an `std::unordered_set` of past values, so std::hash specialization is required. + * + * @ingroup filtering_operators + * @see https://reactivex.io/documentation/operators/distinct.html + */ +inline auto distinct() +{ + return details::distinct_t{}; +} +} \ No newline at end of file diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index 9846adb9b..097399288 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -46,10 +46,14 @@ concept is_constructible_from = requires(Args... args) {T{static_cast(args)...}} -> std::same_as; }; - template concept invocable_r_v = std::invocable && std::same_as>; template concept is_nothrow_invocable = std::is_nothrow_invocable_v; + +template +concept hashable = requires(T a) { + { std::hash{}(a) } -> std::convertible_to; +}; } // namespace rpp::constraint diff --git a/src/rpp/rpp/utils/tuple.hpp b/src/rpp/rpp/utils/tuple.hpp index ffd637cc5..18620b216 100644 --- a/src/rpp/rpp/utils/tuple.hpp +++ b/src/rpp/rpp/utils/tuple.hpp @@ -18,7 +18,7 @@ namespace rpp::details { -template +template class tuple_leaf { public: diff --git a/src/tests/rpp/test_distinct.cpp b/src/tests/rpp/test_distinct.cpp new file mode 100644 index 000000000..2e5f0b02d --- /dev/null +++ b/src/tests/rpp/test_distinct.cpp @@ -0,0 +1,69 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEMPLATE_TEST_CASE("distinct filters out repeated values and emit only items that have not already been emitted", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy{}; + auto obs = rpp::source::just(1, 1, 2, 2, 3, 4, 4, 2, 2, 1, 3); + + SECTION("WHEN subscribe on observable with duplicates via distinct THEN subscriber obtains values without duplicates") + { + obs | rpp::ops::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } +} + +TEST_CASE("distinct forwards error") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::error({}) | rpp::operators::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); +} + +TEST_CASE("distinct forwards completion") +{ + auto mock = mock_observer_strategy{}; + rpp::source::empty() | rpp::operators::distinct() | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + +TEST_CASE("distinct doesn't produce extra copies") +{ + copy_count_tracker::test_operator(rpp::ops::distinct(), + { + .send_by_copy = {.copy_count = 2, // 1 copy on emission + 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 1} // 1 move on emission + }); +} + +TEST_CASE("distinct satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::distinct()); +} \ No newline at end of file diff --git a/src/tests/utils/copy_count_tracker.hpp b/src/tests/utils/copy_count_tracker.hpp index 8aac6ccfd..eedf8f1e1 100644 --- a/src/tests/utils/copy_count_tracker.hpp +++ b/src/tests/utils/copy_count_tracker.hpp @@ -116,3 +116,17 @@ class copy_count_tracker private: std::shared_ptr _state; }; + +namespace std +{ +// Make copy_count_tracker hashable for distinct operator +template<> +struct hash +{ + size_t operator()(const copy_count_tracker& tracker) const noexcept + { + return std::hash{}(tracker.get_copy_count()) + ^ (std::hash{}(tracker.get_move_count()) << 1); + } +}; +}