Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
### Filtering
- [x] filter
- [x] take
- [ ] debounce
- [x] debounce
- [ ] distinct
- [ ] distinct
- [x] distinct_until_changed
Expand Down
42 changes: 42 additions & 0 deletions src/examples/rpp/doxygen/debounce.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example debounce.cpp
**/
int main()
{
{
//! [debounce]
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 5, 6, 9, 10)
.flat_map([](int v)
{
return rpp::source::just(v)
.delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
})
.tap([&](int v)
{
std::cout << "Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
})
.debounce(std::chrono::milliseconds{700}, rpp::schedulers::current_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[]() { std::cout << "completed" << std::endl; });


// Output:
// Sent value 1 at 504
// Sent value 2 at 1009
// new value 2
// Sent value 5 at 2505
// Sent value 6 at 3010
// new value 6
// Sent value 9 at 4507
// Sent value 10 at 5007
// new value 10
// completed
//! [debounce]
}
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct RPP_EMPTY_BASES interface_observable
, details::member_overload<Type, SpecificObservable, details::combine_latest_tag>
, details::member_overload<Type, SpecificObservable, details::concat_tag>
, details::member_overload<Type, SpecificObservable, details::delay_tag>
, details::member_overload<Type, SpecificObservable, details::debounce_tag>
, details::member_overload<Type, SpecificObservable, details::distinct_until_changed_tag>
, details::member_overload<Type, SpecificObservable, details::do_tag>
, details::member_overload<Type, SpecificObservable, details::filter_tag>
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* \ingroup operators
*/

#include <rpp/operators/debounce.hpp>
#include <rpp/operators/distinct_until_changed.hpp>
#include <rpp/operators/filter.hpp>
#include <rpp/operators/first.hpp>
Expand Down
162 changes: 162 additions & 0 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/early_unsubscribe.hpp>
#include <rpp/operators/details/serialized_subscriber.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp>
#include <rpp/operators/fwd/debounce.hpp>
#include <rpp/subscribers/constraints.hpp>

#include <rpp/utils/spinlock.hpp>

#include <type_traits>
#include <variant>

IMPLEMENTATION_FILE(debounce_tag);

namespace rpp::details
{
template<typename T, typename Scheduler>
class debounce_state : public early_unsubscribe_state
{
public:
debounce_state(schedulers::duration period, const Scheduler& scheduler, const composite_subscription& subscription_of_subscriber)
: early_unsubscribe_state(subscription_of_subscriber)
, m_period{period}
, m_worker{scheduler.create_worker(children_subscriptions)} {}

std::optional<schedulers::time_point> emplace_safe(auto&& v)
{
std::lock_guard lock{m_mutex};
m_value_to_be_emitted.emplace(std::forward<decltype(v)>(v));
const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value();
m_time_when_value_should_be_emitted = m_worker.now() + m_period;
return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional<schedulers::time_point>{};
}

std::variant<std::monostate, T, schedulers::duration> extract_value_or_time()
{
std::lock_guard lock{m_mutex};
if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
return std::monostate{};

const auto now = m_worker.now();
if (m_time_when_value_should_be_emitted > now)
return m_time_when_value_should_be_emitted.value() - now;

m_time_when_value_should_be_emitted.reset();
auto v = std::move(m_value_to_be_emitted).value();
m_value_to_be_emitted.reset();
return v;
}

std::optional<T> extract_value()
{
std::lock_guard lock{m_mutex};
std::optional<T> res{};
m_value_to_be_emitted.swap(res);
return res;
}

using Worker = decltype(std::declval<Scheduler>().create_worker(std::declval<composite_subscription>()));
const Worker& get_worker() const { return m_worker; }

private:
schedulers::duration m_period;
Worker m_worker;
std::mutex m_mutex{};
std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
std::optional<T> m_value_to_be_emitted{};
};

struct debounce_on_next
{
template<typename Value>
void operator()(Value&& v, const auto& state_ptr) const
{
if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward<Value>(v)))
{
state_ptr->get_worker().schedule(time_to_schedule.value(),
[state_ptr]() mutable -> schedulers::optional_duration
{
auto value_or_duration = state_ptr->extract_value_or_time();
if (auto* duration = std::get_if<schedulers::duration>(&value_or_duration))
return *duration;

if (auto* value = std::get_if<std::decay_t<Value>>(&value_or_duration))
state_ptr->subscriber.on_next(std::move(*value));

return std::nullopt;
});
}
}
};

struct debounce_on_error
{
void operator()(const std::exception_ptr& err, const auto& state) const
{
state->children_subscriptions.unsubscribe();
state->subscriber.on_error(err);
}
};

struct debounce_on_completed
{
void operator()(const auto& state_ptr) const
{
state_ptr->children_subscriptions.unsubscribe();

if (auto v = state_ptr->extract_value())
state_ptr->subscriber.on_next(std::move(v.value()));

state_ptr->subscriber.on_completed();
}
};

template<typename T, typename Scheduler, typename TSub>
struct debounce_state_with_serialized_spinlock : debounce_state<T, Scheduler>
{
debounce_state_with_serialized_spinlock(auto&& sub,
schedulers::duration period,
const Scheduler& scheduler,
const composite_subscription& subscription_of_subscriber)
: debounce_state<T, Scheduler>{std::move(period), scheduler, subscription_of_subscriber}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-move-const-arg ⚠️
std::move of the variable period of the trivially-copyable type schedulers::duration (aka duration<long, ratio<1, 1000000000>>) has no effect; remove std::move()

Suggested change
: debounce_state<T, Scheduler>{std::move(period), scheduler, subscription_of_subscriber}
: debounce_state<T, Scheduler>{eriod), scheduler, subscription_of_subscriber}

, subscriber(make_serialized_subscriber(std::forward<decltype(sub)>(sub), std::ref(spinlock))) {}

// spinlock because most part of time there is only one thread would be active
utils::spinlock spinlock{};

using InnerSub = decltype(make_serialized_subscriber(std::declval<TSub>(), std::declval<std::reference_wrapper<utils::spinlock>>()));
InnerSub subscriber;
};

template<constraint::decayed_type Type,schedulers::constraint::scheduler TScheduler>
struct debounce_impl
{
schedulers::duration period;
TScheduler scheduler;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<debounce_state_with_serialized_spinlock<Type, TScheduler, std::decay_t<TSub>>>(std::forward<TSub>(in_subscriber), period, scheduler, in_subscriber.get_subscription());

return create_subscriber_with_state<Type>(state->children_subscriptions,
debounce_on_next{},
debounce_on_error{},
debounce_on_completed{},
std::move(state));
}
};
} // namespace rpp::details
45 changes: 34 additions & 11 deletions src/rpp/rpp/operators/details/serialized_subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,44 @@

namespace rpp::details
{
template<typename T>
auto lock(const std::shared_ptr<T>& ptr)
{
return std::lock_guard{*ptr};
}

template<typename T>
auto lock(const std::reference_wrapper<T>& ref)
{
return std::lock_guard{ref.get()};
}

struct forwarding_on_next_under_lock
{
template<typename T, typename TSerializationPrimitive>
void operator()(T&& v, const auto& subscriber, const std::shared_ptr<TSerializationPrimitive>& primitive) const
template<typename T>
void operator()(T&& v, const auto& subscriber, const auto& primitive) const
{
std::lock_guard lock{*primitive};
auto lock_guard = lock(primitive);
subscriber.on_next(std::forward<T>(v));
}
};

struct forwarding_on_error_under_lock
{
template<typename TSerializationPrimitive>
void operator()(const std::exception_ptr& err,
const auto& subscriber,
const std::shared_ptr<TSerializationPrimitive>& primitive) const
void operator()(const std::exception_ptr& err,
const auto& subscriber,
const auto& primitive) const
{
std::lock_guard lock{*primitive};
auto lock_guard = lock(primitive);
subscriber.on_error(err);
}
};

struct forwarding_on_completed_under_lock
{
template<typename TSerializationPrimitive>
void operator()(const auto& subscriber, const std::shared_ptr<TSerializationPrimitive>& primitive) const
void operator()(const auto& subscriber, const auto& primitive) const
{
std::lock_guard lock{*primitive};
auto lock_guard = lock(primitive);
subscriber.on_completed();
}
};
Expand All @@ -62,4 +72,17 @@ auto make_serialized_subscriber(TSub&&
std::forward<TSub>(subscriber),
primitive);
}

template<typename TSerializationPrimitive, constraint::subscriber TSub>
auto make_serialized_subscriber(TSub&& subscriber,
std::reference_wrapper<TSerializationPrimitive> primitive)
{
auto sub = subscriber.get_subscription();
return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
forwarding_on_next_under_lock{},
forwarding_on_error_under_lock{},
forwarding_on_completed_under_lock{},
std::forward<TSub>(subscriber),
primitive);
}
} // namespace rpp::details
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <rpp/operators/fwd/buffer.hpp>
#include <rpp/operators/fwd/combine_latest.hpp>
#include <rpp/operators/fwd/concat.hpp>
#include <rpp/operators/fwd/debounce.hpp>
#include <rpp/operators/fwd/delay.hpp>
#include <rpp/operators/fwd/distinct_until_changed.hpp>
#include <rpp/operators/fwd/do.hpp>
Expand Down
60 changes: 60 additions & 0 deletions src/rpp/rpp/operators/fwd/debounce.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/schedulers/constraints.hpp>
#include <rpp/observables/details/member_overload.hpp>

namespace rpp::details
{
struct debounce_tag;
}

namespace rpp::details
{
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct debounce_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, debounce_tag>
{
/**
* \brief Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset.
*
* \marble debounce
{
source observable : +--1-2-----3---|
operator "debounce(4)" : +--------2-----3|
}
* \param period is duration of time should be passed since emission from original observable without any new emissions to emit this emission.
* \param scheduler is scheduler used to run timer for debounce
* \return new specific_observable with the debounce operator as most recent operator.
* \warning #include <rpp/operators/debounce.hpp>
*
* \par Example
* \snippet debounce.cpp debounce
*
* \ingroup utility_operators
* \see https://reactivex.io/documentation/operators/debounce.html
*/
template<schedulers::constraint::scheduler TScheduler>
auto debounce(schedulers::duration period,const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<debounce_tag, TScheduler>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(debounce_impl<Type, TScheduler>{period, scheduler});
}

template<schedulers::constraint::scheduler TScheduler>
auto debounce(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included<debounce_tag, TScheduler>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(debounce_impl<Type, TScheduler>{period, scheduler});
}
};
} // namespace rpp::details
Loading