Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
*/

#include <rpp/operators/debounce.hpp>
#include <rpp/operators/distinct.hpp>
#include <rpp/operators/distinct_until_changed.hpp>
#include <rpp/operators/filter.hpp>
#include <rpp/operators/first.hpp>
Expand Down
82 changes: 82 additions & 0 deletions src/rpp/rpp/operators/distinct.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/constraints.hpp>

#include <unordered_set>

namespace rpp::operators::details
{
template<rpp::constraint::decayed_type Type, rpp::constraint::observer TObserver>
struct distinct_observer_strategy
{
static_assert(rpp::constraint::hashable<Type>, "Distinct operator requires hashable type");

using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

RPP_NO_UNIQUE_ADDRESS TObserver observer;
mutable std::unordered_set<Type> past_values{};

template<typename T>
void on_next(T&& v) const
{
const auto [it, inserted] = past_values.insert(std::forward<T>(v));
if (inserted)
{
observer.on_next(*it);
}
}

void on_error(const std::exception_ptr& err) const { observer.on_error(err); }

void on_completed() const { observer.on_completed(); }

void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }

bool is_disposed() const { return observer.is_disposed(); }
};

struct distinct_t : public operators::details::template_operator_observable_strategy<distinct_observer_strategy>
{
template<rpp::constraint::decayed_type T>
using result_value = T;

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
};
}

namespace rpp::operators
{
/**
* @brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted
*
* @marble distinct
{
source observable : +--1-1-2-2-3-2-1-|
operator "distinct" : +--1---2---3-----|
}
*
* @warning This operator keeps an `std::unordered_set<T>` of past values, so std::hash<T> specialization is required.
*
* @ingroup filtering_operators
* @see https://reactivex.io/documentation/operators/distinct.html
*/
inline auto distinct()
{
return details::distinct_t{};
}
}
6 changes: 5 additions & 1 deletion src/rpp/rpp/utils/constraints.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ concept is_constructible_from = requires(Args... args)
{T{static_cast<Args&&>(args)...}} -> std::same_as<T>;
};


template<typename Ret, typename Fn, typename... Args>
concept invocable_r_v = std::invocable<Fn, Args...> && std::same_as<Ret, std::invoke_result_t<Fn, Args...>>;

template<typename Fn, typename... Args>
concept is_nothrow_invocable = std::is_nothrow_invocable_v<Fn, Args...>;

template<typename T>
concept hashable = requires(T a) {
{ std::hash<T>{}(a) } -> std::convertible_to<std::size_t>;
};
} // namespace rpp::constraint
2 changes: 1 addition & 1 deletion src/rpp/rpp/utils/tuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace rpp::details
{
template<size_t index, typename T>
template<size_t, typename T>
class tuple_leaf
{
public:
Expand Down
69 changes: 69 additions & 0 deletions src/tests/rpp/test_distinct.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#include <snitch/snitch.hpp>

#include <rpp/operators/distinct.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/sources/just.hpp>

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"
#include "mock_observer.hpp"

TEMPLATE_TEST_CASE("distinct filters out repeated values and emit only items that have not already been emitted", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared)
{
auto mock = mock_observer_strategy<int>{};
auto obs = rpp::source::just<TestType>(1, 1, 2, 2, 3, 4, 4, 2, 2, 1, 3);

SECTION("WHEN subscribe on observable with duplicates via distinct THEN subscriber obtains values without duplicates")
{
obs | rpp::ops::distinct() | rpp::ops::subscribe(mock);
CHECK(mock.get_received_values() == std::vector{1, 2, 3, 4});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}

TEST_CASE("distinct forwards error")
{
auto mock = mock_observer_strategy<int>{};

rpp::source::error<int>({}) | rpp::operators::distinct() | rpp::ops::subscribe(mock);
CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}

TEST_CASE("distinct forwards completion")
{
auto mock = mock_observer_strategy<int>{};
rpp::source::empty<int>() | rpp::operators::distinct() | rpp::ops::subscribe(mock);
CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

TEST_CASE("distinct doesn't produce extra copies")
{
copy_count_tracker::test_operator(rpp::ops::distinct(),
Comment thread
AlexInLog marked this conversation as resolved.
{
.send_by_copy = {.copy_count = 2, // 1 copy on emission + 1 copy to final subscriber
.move_count = 0},
.send_by_move = {.copy_count = 1, // 1 copy to final subscriber
.move_count = 1} // 1 move on emission
});
}

TEST_CASE("distinct satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::distinct());
}
14 changes: 14 additions & 0 deletions src/tests/utils/copy_count_tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,17 @@ class copy_count_tracker
private:
std::shared_ptr<state> _state;
};

namespace std
{
// Make copy_count_tracker hashable for distinct operator
template<>
struct hash<copy_count_tracker>
{
size_t operator()(const copy_count_tracker& tracker) const noexcept
{
return std::hash<int>{}(tracker.get_copy_count())
^ (std::hash<int>{}(tracker.get_move_count()) << 1);
}
};
}