Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace rpp::operators::details

private:
template<typename TDisposable>
static void apply_impl(const TDisposable& disposable, const pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
{
if ((vals.has_value() && ...))
observer->on_next(disposable->get_selector()(vals.value()...));
Expand Down
12 changes: 6 additions & 6 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/utils/utils.hpp>

#include <cassert>
#include <queue>
Expand Down Expand Up @@ -42,8 +42,8 @@ namespace rpp::operators::details
{
}

pointer_under_lock<TObserver> get_observer() { return pointer_under_lock{m_observer}; }
pointer_under_lock<std::queue<TObservable>> get_queue() { return pointer_under_lock{m_queue}; }
rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer; }
rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() { return m_queue; }

std::atomic<ConcatStage>& stage() { return m_stage; }

Expand Down Expand Up @@ -97,9 +97,9 @@ namespace rpp::operators::details
}

private:
value_with_mutex<TObserver> m_observer;
value_with_mutex<std::queue<TObservable>> m_queue;
std::atomic<ConcatStage> m_stage{};
rpp::utils::value_with_mutex<TObserver> m_observer;
rpp::utils::value_with_mutex<std::queue<TObservable>> m_queue;
std::atomic<ConcatStage> m_stage{};
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
Expand Down
10 changes: 5 additions & 5 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/utils/utils.hpp>

namespace rpp::operators::details
{
Expand Down Expand Up @@ -69,7 +69,7 @@ namespace rpp::operators::details
return std::exchange(m_value_to_be_emitted, std::optional<T>{});
}

pointer_under_lock<Observer> get_observer_under_lock() { return pointer_under_lock{m_observer}; }
rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return m_observer; }

private:
void schedule()
Expand Down Expand Up @@ -104,9 +104,9 @@ namespace rpp::operators::details
return v;
}

value_with_mutex<Observer> m_observer;
RPP_NO_UNIQUE_ADDRESS Worker m_worker;
rpp::schedulers::duration m_period;
rpp::utils::value_with_mutex<Observer> m_observer;
RPP_NO_UNIQUE_ADDRESS Worker m_worker;
rpp::schedulers::duration m_period;

std::mutex m_mutex{};
std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/utils/utils.hpp>

#include <memory>

Expand All @@ -31,7 +31,7 @@ namespace rpp::operators::details
{
}

pointer_under_lock<Observer> get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; }
rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return m_observer_with_mutex; }

bool decrement_on_completed()
{
Expand All @@ -40,7 +40,7 @@ namespace rpp::operators::details
}

private:
value_with_mutex<Observer> m_observer_with_mutex{};
rpp::utils::value_with_mutex<Observer> m_observer_with_mutex{};

std::atomic_size_t m_on_completed_needed{sizeof...(Args)};
};
Expand Down
59 changes: 0 additions & 59 deletions src/rpp/rpp/operators/details/utils.hpp

This file was deleted.

1 change: 0 additions & 1 deletion src/rpp/rpp/operators/group_by.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/observables/grouped_observable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/utils/function_traits.hpp>

Expand Down
8 changes: 4 additions & 4 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/utils/tuple.hpp>
#include <rpp/utils/utils.hpp>

#include <atomic>

Expand All @@ -38,11 +38,11 @@ namespace rpp::operators::details
// just need atomicity, not guarding anything
bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; }

pointer_under_lock<TObserver> get_observer_under_lock() { return pointer_under_lock{m_observer}; }
rpp::utils::pointer_under_lock<TObserver> get_observer_under_lock() { return m_observer; }

private:
value_with_mutex<TObserver> m_observer{};
std::atomic_size_t m_on_completed_needed{1};
rpp::utils::value_with_mutex<TObserver> m_observer{};
std::atomic_size_t m_on_completed_needed{1};
};

template<rpp::constraint::observer TObserver>
Expand Down
8 changes: 4 additions & 4 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <rpp/defs.hpp>
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/utils/utils.hpp>

namespace rpp::operators::details
{
Expand All @@ -33,13 +33,13 @@ namespace rpp::operators::details
switch_on_next_state_t(const switch_on_next_state_t&) = delete;
switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete;

pointer_under_lock<TObserver> get_observer()
rpp::utils::pointer_under_lock<TObserver> get_observer()
{
return pointer_under_lock{m_observer_with_mutex};
return m_observer_with_mutex;
}

private:
value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
};

template<rpp::constraint::observer TObserver>
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/take_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/utils/utils.hpp>

namespace rpp::operators::details
{
Expand All @@ -33,10 +33,10 @@ namespace rpp::operators::details
{
}

pointer_under_lock<TObserver> get_observer() { return pointer_under_lock{m_observer_with_mutex}; }
rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }

private:
value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
};

template<rpp::constraint::observer TObserver>
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include <rpp/operators/fwd.hpp>

#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/utils/exceptions.hpp>
#include <rpp/utils/utils.hpp>

namespace rpp::operators::details
{
Expand All @@ -35,14 +35,14 @@ namespace rpp::operators::details
, m_fallback{fallback}
{
}
pointer_under_lock<observer_with_timeout> get_observer_with_timeout_under_lock() { return pointer_under_lock{m_observer_with_timeout}; }
rpp::utils::pointer_under_lock<observer_with_timeout> get_observer_with_timeout_under_lock() { return m_observer_with_timeout; }

const TFallbackObservable& get_fallback() const { return m_fallback; }

rpp::schedulers::duration get_period() const { return m_period; }

private:
value_with_mutex<observer_with_timeout> m_observer_with_timeout;
rpp::utils::value_with_mutex<observer_with_timeout> m_observer_with_timeout;

const rpp::schedulers::duration m_period;
const TFallbackObservable m_fallback;
Expand Down
8 changes: 4 additions & 4 deletions src/rpp/rpp/operators/window_toggle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/operators/details/forwarding_subject.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/utils/utils.hpp>

#include <list>

Expand Down Expand Up @@ -50,7 +50,7 @@ namespace rpp::operators::details
{
}

rpp::operators::details::pointer_under_lock<state_t> get_state_under_lock() { return rpp::operators::details::pointer_under_lock<state_t>{m_state}; }
rpp::utils::pointer_under_lock<state_t> get_state_under_lock() { return rpp::utils::pointer_under_lock<state_t>{m_state}; }

template<typename T>
auto get_closing(T&& v) const
Expand All @@ -67,8 +67,8 @@ namespace rpp::operators::details
}

private:
rpp::operators::details::value_with_mutex<state_t> m_state{};
RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings;
rpp::utils::value_with_mutex<state_t> m_state{};
RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings;
};

template<rpp::constraint::decayed_type TState>
Expand Down
25 changes: 12 additions & 13 deletions src/rpp/rpp/operators/with_latest_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/utils/utils.hpp>

#include <memory>

Expand All @@ -32,16 +32,16 @@ namespace rpp::operators::details
{
}

pointer_under_lock<Observer> get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; }
rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return observer_with_mutex; }

rpp::utils::tuple<value_with_mutex<std::optional<RestArgs>>...>& get_values() { return values; }
rpp::utils::tuple<rpp::utils::value_with_mutex<std::optional<RestArgs>>...>& get_values() { return values; }

const TSelector& get_selector() const { return selector; }

private:
value_with_mutex<Observer> observer_with_mutex{};
rpp::utils::tuple<value_with_mutex<std::optional<RestArgs>>...> values{};
RPP_NO_UNIQUE_ADDRESS TSelector selector;
rpp::utils::value_with_mutex<Observer> observer_with_mutex{};
rpp::utils::tuple<rpp::utils::value_with_mutex<std::optional<RestArgs>>...> values{};
RPP_NO_UNIQUE_ADDRESS TSelector selector;
};

template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
Expand All @@ -62,9 +62,8 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
auto& [value, mutex] = disposable->get_values().template get<I>();
std::scoped_lock lock{mutex};
value.emplace(std::forward<T>(v));
auto locked_value = disposable->get_values().template get<I>().lock();
locked_value->emplace(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
Expand Down Expand Up @@ -99,11 +98,11 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
auto result = disposable->get_values().apply([this, &v](value_with_mutex<std::optional<RestArgs>>&... vals) -> std::optional<Result> {
auto lock = std::scoped_lock{vals.mutex...};
auto result = disposable->get_values().apply([this, &v](rpp::utils::value_with_mutex<std::optional<RestArgs>>&... vals) -> std::optional<Result> {
auto lock = std::scoped_lock{vals.get_mutex()...};

if ((vals.value.has_value() && ...))
return disposable->get_selector()(rpp::utils::as_const(std::forward<T>(v)), rpp::utils::as_const(vals.value.value())...);
if ((vals.get_value_unsafe().has_value() && ...))
return disposable->get_selector()(rpp::utils::as_const(std::forward<T>(v)), rpp::utils::as_const(vals.get_value_unsafe().value())...);
return std::nullopt;
});

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/zip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace rpp::operators::details

private:
template<typename TDisposable>
static void apply_impl(const TDisposable& disposable, const pointer_under_lock<Observer>& observer, std::deque<Args>&... values)
static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, std::deque<Args>&... values)
{
if ((!values.empty() && ...))
{
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/subjects.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
* \ingroup rpp
*/

#include <rpp/subjects/behavior_subject.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/replay_subject.hpp>
Loading