Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
- [ ] ignore_elements
- [x] last
- [ ] sample
- [ ] sample (observable)
- [x] sample_with_time
- [x] skip
- [ ] skip_last
- [x] take_last
Expand Down
17 changes: 17 additions & 0 deletions src/examples/doxygen/sample.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include <rpp/rpp.hpp>
#include <iostream>

/**
* \example sample.cpp
**/
int main()
{
//! [sample_with_time]
rpp::source::interval(std::chrono::milliseconds{100}, rpp::schedulers::trampoline{})
.sample_with_time(std::chrono::milliseconds{300}, rpp::schedulers::trampoline{})
.take(5)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 4 7 10 13
//! [sample_with_time]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct RPP_EMPTY_BASES interface_observable
, details::member_overload<Type, SpecificObservable, details::observe_on_tag>
, details::member_overload<Type, SpecificObservable, details::publish_tag>
, details::member_overload<Type, SpecificObservable, details::repeat_tag>
, details::member_overload<Type, SpecificObservable, details::sample_tag>
, details::member_overload<Type, SpecificObservable, details::scan_tag>
, details::member_overload<Type, SpecificObservable, details::skip_tag>
, details::member_overload<Type, SpecificObservable, details::start_with_tag>
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <rpp/operators/filter.hpp>
#include <rpp/operators/first.hpp>
#include <rpp/operators/last.hpp>
#include <rpp/operators/sample.hpp>
#include <rpp/operators/skip.hpp>
#include <rpp/operators/take.hpp>
#include <rpp/operators/take_last.hpp>
Expand Down
8 changes: 3 additions & 5 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,14 @@ struct delay_impl
template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& subscriber) const
{
// convert it to dynamic due to expected amount of copies == amount of items
auto dynamic_subscriber = std::forward<TSub>(subscriber).as_dynamic();
auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription());
auto worker = scheduler.create_worker(subscriber.get_subscription());

auto subscription = dynamic_subscriber.get_subscription().make_child();
auto subscription = subscriber.get_subscription().make_child();
return create_subscriber_with_state<Type>(std::move(subscription),
delay_on_next{delay},
delay_on_error{},
delay_on_completed{delay},
std::move(dynamic_subscriber),
std::forward<TSub>(subscriber),
std::move(worker));
}
};
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <rpp/operators/fwd/publish.hpp>
#include <rpp/operators/fwd/ref_count.hpp>
#include <rpp/operators/fwd/repeat.hpp>
#include <rpp/operators/fwd/sample.hpp>
#include <rpp/operators/fwd/scan.hpp>
#include <rpp/operators/fwd/skip.hpp>
#include <rpp/operators/fwd/start_with.hpp>
Expand Down
63 changes: 63 additions & 0 deletions src/rpp/rpp/operators/fwd/sample.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/observables/details/member_overload.hpp>
#include <rpp/schedulers/constraints.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>

namespace rpp::details
{
struct sample_tag;
}

namespace rpp::details
{
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct sample_with_time_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, sample_tag>
{
/**
* \brief Emit most recent emitted from original observable emission obtained during last period of time.
* \details Emit item immediately in case of completion of the original observable
*
* \marble sample_with_time
{
source observable : +--1---2-3-4---5-6-7-|
operator "sample_with_time(2)" : +--1---2---4---5---7-|
}
*
* \param period sampling period
* \scheduler scheduler to use to schedule emissions with provided sampling period
* \return new specific_observable with the sample_with_time operator as most recent operator.
* \warning #include <rpp/operators/sample.hpp>
*
* \par Example
* \snippet sample.cpp sample_with_time
*
* \ingroup filtering_operators
* \see https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html
*/
template<schedulers::constraint::scheduler TScheduler, typename ...Args>
auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) const & requires is_header_included<sample_tag, TScheduler, Args...>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(sample_with_time_impl<Type, TScheduler>{period, scheduler});
}

template<schedulers::constraint::scheduler TScheduler, typename ...Args>
auto sample_with_time(schedulers::duration period, const TScheduler& scheduler) && requires is_header_included<sample_tag, TScheduler, Args...>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(sample_with_time_impl<Type, TScheduler>{period, scheduler});
}
};
} // namespace rpp::details
106 changes: 106 additions & 0 deletions src/rpp/rpp/operators/sample.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/details/early_unsubscribe.hpp>
#include <rpp/operators/details/serialized_subscriber.hpp>
#include <rpp/operators/fwd/sample.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/spinlock.hpp>

#include <mutex>
#include <optional>

IMPLEMENTATION_FILE(sample_tag);

namespace rpp::details
{
template<constraint::decayed_type Type>
struct sample_state : early_unsubscribe_state
{
using early_unsubscribe_state::early_unsubscribe_state;

std::mutex value_mutex{};
std::optional<Type> value{};
};

template<constraint::decayed_type Type>
struct sample_state_with_serialized_spinlock : sample_state<Type>
{
using sample_state<Type>::sample_state;

utils::spinlock spinlock{};
};

struct sample_on_next
{
template<typename Value>
void operator()(Value&& value, const auto&, const std::shared_ptr<sample_state<std::decay_t<Value>>>& state) const
{
std::lock_guard lock{state->value_mutex};
state->value.emplace(std::forward<Value>(value));
}
};

using sample_on_error = early_unsubscribe_on_error;

struct sample_on_completed
{
void operator()(const auto& subscriber, const auto& state) const
{
state->children_subscriptions.unsubscribe();

{
std::lock_guard lock{state->value_mutex};
if (state->value.has_value())
subscriber.on_next(std::move(state->value.value()));
}
subscriber.on_completed();
}
};

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct sample_with_time_impl
{
schedulers::duration period;
TScheduler scheduler;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<sample_state_with_serialized_spinlock<Type>>(in_subscriber.get_subscription());
// change subscriber to serialized to avoid manual using of mutex
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
std::shared_ptr<utils::spinlock>{state, &state->spinlock});

scheduler.create_worker(state->children_subscriptions)
.schedule(period,
[period = period, subscriber = subscriber, state]() -> rpp::schedulers::optional_duration
{
std::optional<Type> extracted{};
{
std::lock_guard lock{state->value_mutex};
std::swap(extracted, state->value);
}
if (extracted.has_value())
subscriber.on_next(std::move(extracted.value()));
return period;
});

return create_subscriber_with_state<Type>(state->children_subscriptions,
sample_on_next{},
sample_on_error{},
sample_on_completed{},
std::move(subscriber),
std::move(state));
}
};
} // namespace rpp::details
13 changes: 11 additions & 2 deletions src/rpp/rpp/utils/spinlock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <atomic>
#include <thread>

namespace rpp::utils
{
Expand All @@ -19,9 +21,16 @@ class spinlock

void lock()
{
while(m_lock_flag.exchange(true, std::memory_order_acquire))
while (m_lock_flag.exchange(true, std::memory_order_acquire))
{
while(m_lock_flag.load(std::memory_order_relaxed)){};
for (uint8_t i = 0; m_lock_flag.load(std::memory_order_relaxed); ++i)
{
if (i == 30u)
{
std::this_thread::yield();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no member named this_thread in namespace std

i = 0;
}
}
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/tests/test_interval.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ SCENARIO("interval emit values with provided interval", "[interval]")
initial_time + 3*interval});
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval,
initial_time + 2*interval,
initial_time + 3*interval,
initial_time + 4*interval});
initial_time + 3*interval});
}
}
}
Expand Down Expand Up @@ -113,8 +112,7 @@ SCENARIO("interval emit values with provided interval", "[interval]")
{
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval,
initial_time + initial_delay + 3 * interval });
initial_time + initial_delay + 2 * interval });
CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval });
Expand Down
Loading