Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/rpp/rpp/observables/details/chain_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <rpp/observers/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/schedulers/current_thread.hpp>

namespace rpp
{
Expand All @@ -21,9 +22,11 @@ namespace rpp
{
using base = observable_chain_strategy<TStrategies...>;

using operator_traits = typename TStrategy::template operator_traits<typename base::value_type>;

public:
using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy<TStrategy, typename base::expected_disposable_strategy>;
using value_type = typename TStrategy::template operator_traits<typename base::value_type>::result_type;
using value_type = typename operator_traits::result_type;

observable_chain_strategy(const TStrategy& strategy, const TStrategies&... strategies)
: m_strategy(strategy)
Expand All @@ -40,6 +43,8 @@ namespace rpp
template<rpp::constraint::observer_of_type<value_type> Observer>
void subscribe(Observer&& observer) const
{
[[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed();

if constexpr (rpp::constraint::operator_lift_with_disposable_strategy<TStrategy, typename base::value_type, typename base::expected_disposable_strategy>)
m_strategies.subscribe(m_strategy.template lift_with_disposable_strategy<typename base::value_type, typename base::expected_disposable_strategy>(std::forward<Observer>(observer)));
else if constexpr (rpp::constraint::operator_lift<TStrategy, typename base::value_type>)
Expand All @@ -48,6 +53,15 @@ namespace rpp
m_strategy.subscribe(std::forward<Observer>(observer), m_strategies);
}

private:
static auto own_current_thread_if_needed()
{
if constexpr (requires { requires operator_traits::own_current_queue; })
return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
else
return rpp::utils::none{};
}

private:
RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
RPP_NO_UNIQUE_ADDRESS observable_chain_strategy<TStrategies...> m_strategies;
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ namespace rpp::operators::details

struct buffer_t : lift_operator<buffer_t, size_t>
{
using lift_operator<buffer_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ namespace rpp::operators::details

struct concat_t : lift_operator<concat_t>
{
using lift_operator<concat_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
21 changes: 10 additions & 11 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,31 @@ namespace rpp::operators::details
static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "Selector is not callable with passed T type");

using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;

constexpr static bool own_current_queue = true;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observables.apply(&subscribe_impl<Observer, Strategies...>, std::forward<Observer>(observer), observable_strategy, selector);
return observables.apply(&subscribe_impl<Type, Observer>, std::forward<Observer>(observer), selector);
}

private:
template<rpp::constraint::observer Observer, typename... Strategies>
static void subscribe_impl(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy, const TSelector& selector, const TObservables&... observables)
template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables)
{
using ExpectedValue = typename observable_chain_strategy<Strategies...>::value_type;
using Disposable = TDisposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>;
using Disposable = TDisposable<Observer, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>;

const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector);
auto locked = disposable.lock();
locked->get_observer_under_lock()->set_upstream(disposable.as_weak());
subscribe<std::decay_t<ExpectedValue>>(locked, std::index_sequence_for<TObservables...>{}, observables...);
subscribe<std::decay_t<Type>>(locked, std::index_sequence_for<TObservables...>{}, observables...);

observable_strategy.subscribe(rpp::observer<ExpectedValue, TStrategy<0, std::decay_t<Observer>, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(locked)});
return rpp::observer<Type, TStrategy<0, std::decay_t<Observer>, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(locked)};
}

template<typename ExpectedValue, rpp::constraint::observer Observer, size_t... I>
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/distinct.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace rpp::operators::details

struct distinct_t : lift_operator<distinct_t>
{
using lift_operator<distinct_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/distinct_until_changed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type EqualityFn>
struct distinct_until_changed_t : public operators::details::lift_operator<distinct_until_changed_t<EqualityFn>, EqualityFn>
{
using operators::details::lift_operator<distinct_until_changed_t<EqualityFn>, EqualityFn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Fn>
struct filter_t : lift_operator<filter_t<Fn>, Fn>
{
using lift_operator<filter_t<Fn>, Fn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/first.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ namespace rpp::operators::details

struct first_t : lift_operator<first_t>
{
using lift_operator<first_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/last.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace rpp::operators::details

struct last_t : lift_operator<last_t>
{
using lift_operator<last_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Fn>
struct map_t : lift_operator<map_t<Fn>, Fn>
{
using lift_operator<map_t<Fn>, Fn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
20 changes: 8 additions & 12 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,29 +131,25 @@ namespace rpp::operators::details
}
};

struct merge_t
struct merge_t : lift_operator<merge_t>
{
using lift_operator<merge_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
static_assert(rpp::constraint::observable<T>, "T is not observable");

using result_type = rpp::utils::extract_observable_type_t<T>;

constexpr static bool own_current_queue = true;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = merge_observer_strategy<std::decay_t<TObserver>>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();

using InnerObservable = typename observable_chain_strategy<Strategies...>::value_type;

strategy.subscribe(rpp::observer<InnerObservable, merge_observer_strategy<std::decay_t<Observer>>>{std::forward<Observer>(observer)});
}
};

template<rpp::constraint::observable... TObservables>
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Selector>
struct on_error_resume_next_t : lift_operator<on_error_resume_next_t<Selector>, Selector>
{
using lift_operator<on_error_resume_next_t<Selector>, Selector>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Accumulator>
struct reduce_no_seed_t : lift_operator<reduce_no_seed_t<Accumulator>, Accumulator>
{
using lift_operator<reduce_no_seed_t<Accumulator>, Accumulator>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Fn>
struct scan_no_seed_t : lift_operator<scan_no_seed_t<Fn>, Fn>
{
using lift_operator<scan_no_seed_t<Fn>, Fn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/skip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace rpp::operators::details

struct skip_t : lift_operator<skip_t, size_t>
{
using lift_operator<skip_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ namespace rpp::operators::details

struct switch_on_next_t : lift_operator<switch_on_next_t>
{
using lift_operator<switch_on_next_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/take.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace rpp::operators::details

struct take_t : lift_operator<take_t, size_t>
{
using lift_operator<take_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/take_last.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ namespace rpp::operators::details

struct take_last_t : lift_operator<take_last_t, size_t>
{
using lift_operator<take_last_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
12 changes: 5 additions & 7 deletions src/rpp/rpp/operators/take_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,22 @@ namespace rpp::operators::details
struct operator_traits
{
using result_type = T;

constexpr static bool own_current_queue = true;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
const auto d = disposable_wrapper_impl<take_until_disposable<std::decay_t<Observer>>>::make(std::forward<Observer>(observer));
auto ptr = d.lock();
ptr->get_observer()->set_upstream(d.as_weak());

// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observable.subscribe(take_until_throttle_observer_strategy<std::decay_t<Observer>>{ptr});

using expected_value = typename observable_chain_strategy<Strategies...>::value_type;
observable_strategy.subscribe(rpp::observer<expected_value, take_until_observer_strategy<std::decay_t<Observer>>>(std::move(ptr)));
return rpp::observer<Type, take_until_observer_strategy<std::decay_t<Observer>>>(std::move(ptr));
}
};
} // namespace rpp::operators::details
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/take_while.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Fn>
struct take_while_t : lift_operator<take_while_t<Fn>, Fn>
{
using lift_operator<take_while_t<Fn>, Fn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/throttle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace rpp::operators::details
template<rpp::schedulers::constraint::scheduler Scheduler>
struct throttle_t : lift_operator<throttle_t<Scheduler>, rpp::schedulers::duration>
{
using lift_operator<throttle_t<Scheduler>, rpp::schedulers::duration>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
4 changes: 4 additions & 0 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ namespace rpp::operators::details
static_assert(rpp::constraint::observable_of_type<TFallbackObservable, T>, "TFallbackObservable should be the same type as T");

using result_type = T;

constexpr static bool own_current_queue = true;
};

rpp::schedulers::duration period;
Expand Down Expand Up @@ -170,6 +172,8 @@ namespace rpp::operators::details
struct operator_traits
{
using result_type = T;

constexpr static bool own_current_queue = true;
};

rpp::schedulers::duration period;
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/window.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ namespace rpp::operators::details

struct window_t : lift_operator<window_t, size_t>
{
using lift_operator<window_t, size_t>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
Expand Down
19 changes: 7 additions & 12 deletions src/rpp/rpp/operators/window_toggle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,23 @@ namespace rpp::operators::details

template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
struct window_toggle_t
struct window_toggle_t : lift_operator<window_toggle_t<TOpeningsObservable, TClosingsSelectorFn>, TOpeningsObservable, TClosingsSelectorFn>
{
RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings;
RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector;
using lift_operator<window_toggle_t<TOpeningsObservable, TClosingsSelectorFn>, TOpeningsObservable, TClosingsSelectorFn>::lift_operator;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = rpp::window_toggle_observable<T>;

constexpr static bool own_current_queue = true;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = window_toggle_observer_strategy<std::decay_t<TObserver>, TOpeningsObservable, TClosingsSelectorFn>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
using expected_value = typename observable_chain_strategy<Strategies...>::value_type;
observable_strategy.subscribe(rpp::observer<expected_value, window_toggle_observer_strategy<std::decay_t<Observer>, TOpeningsObservable, TClosingsSelectorFn>>{std::forward<Observer>(observer), openings, closings_selector});
}
};
} // namespace rpp::operators::details

Expand Down
Loading