Skip to content
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"--header-insertion=never",
"--background-index",
"--clang-tidy",
"-j=32"
"-j=32",
"--function-arg-placeholders=0"
],
"sonarlint.pathToCompileCommands": "${workspaceFolder}/build/compile_commands.json",
}
16 changes: 13 additions & 3 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,25 @@ There we convert observable to concatenation of original observable and `just(2)

One more posible but a bit more advanced way to implement operators - is to lift observer. To do this, your functor-adapter must to satisfy `rpp::constraint::operator_lift` concept. Actually, your class must to have:
- member function `lift` accepting downstream observer and returning new upstream observer
- using `result_value<T>` accepting typename of upstream and providing new value for downstream (== typename of original observable and return new resulting type for new observable)
- inner `template<rpp::constraint::decayed_type T> struct traits` struct accepting typename of upstream and providing:
- `using result_type =` with typename of new resulting type for new observable
- (optionally) `struct requirements` with static_asserts over passed type

Example:
```cpp
template<typename Fn>
struct map
{
template<typename T>
using result_value = std::invoke_result_t<Fn, T>;
template<rpp::constraint::decayed_type T>
struct traits
{
struct requirements
{
static_assert(std::invocable<Fn, T>, "Fn is not invocable with T");
};

using result_type = std::invoke_result_t<Fn, T>;
};

Fn fn{};

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/observables/details/chain_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class observable_chain_strategy

public:
using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy<TStrategy, typename base::expected_disposable_strategy>;
using value_type = typename TStrategy::template result_value<typename base::value_type>;
using value_type = typename TStrategy::template operator_traits<typename base::value_type>::result_type;

observable_chain_strategy(const TStrategy& strategy, const TStrategies&... strategies)
: m_strategy(strategy)
Expand Down
19 changes: 11 additions & 8 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,34 +91,37 @@ concept operator_observable_transform = requires(const Op& op, TObs obs)
};

template<typename Op, typename Type>
concept operator_base = requires(const Op& op) {
typename std::decay_t<Op>::template result_value<Type>;
requires details::observables::constraint::disposable_strategy<details::observables::deduce_updated_disposable_strategy<std::decay_t<Op>, typename observable_chain_strategy<details::observables::fake_strategy<Type>>::expected_disposable_strategy>>;
};
concept operator_base = requires(const Op& op) { typename std::decay_t<Op>::template operator_traits<Type>; } && details::observables::constraint::disposable_strategy<details::observables::deduce_updated_disposable_strategy<std::decay_t<Op>, typename observable_chain_strategy<details::observables::fake_strategy<Type>>::expected_disposable_strategy>>;

template<typename Op, typename Type>
concept operator_subscribe = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template result_value<Type>>&& observer, const observable_chain_strategy<details::observables::fake_strategy<Type>>& chain)
concept operator_subscribe = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer, const observable_chain_strategy<details::observables::fake_strategy<Type>>& chain)
{
{op.subscribe(std::move(observer), chain)};
};

template<typename Op, typename Type>
concept operator_lift = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template result_value<Type>>&& observer)
concept operator_lift = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer)
{
{op.template lift<Type>(std::move(observer))} -> rpp::constraint::observer_of_type<Type>;
};

template<typename Op, typename Type, typename DisposableStrategy>
concept operator_lift_with_disposable_strategy = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template result_value<Type>>&& observer)
concept operator_lift_with_disposable_strategy = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer)
{
{op.template lift_with_disposable_strategy<Type, DisposableStrategy>(std::move(observer))} -> rpp::constraint::observer_of_type<Type>;
};

template<typename Op, typename Type, typename DisposableStrategy>
concept operator_chain = operator_subscribe<Op, Type> || operator_lift<Op, Type> || operator_lift_with_disposable_strategy<Op, Type, DisposableStrategy>;
concept operator_chain = operator_base<std::decay_t<Op>, Type>
&& requires { typename std::decay_t<Op>::template operator_traits<Type>::result_type; }
&& (operator_subscribe<std::decay_t<Op>, Type> || operator_lift<std::decay_t<Op>, Type> || operator_lift_with_disposable_strategy<std::decay_t<Op>, Type, DisposableStrategy>);

template<typename TObservable, typename... TObservables>
concept observables_of_same_type = rpp::constraint::observable<TObservable> &&
(rpp::constraint::observable<TObservables> && ...) &&
(std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...);
}

#define RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) \
/* operator_traits can be instantiated if all inner static_asserts are fine*/ \
if constexpr (requires { { typename std::decay_t<Op>::template operator_traits<Type>{}}; })
25 changes: 20 additions & 5 deletions src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class observable
{
public:
using value_type = Type;

using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t<Strategy>;

template<typename... Args>
Expand Down Expand Up @@ -310,16 +310,18 @@ class observable
*/
auto as_dynamic() && { return rpp::dynamic_observable<Type>{std::move(*this)}; }

template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
template<constraint::operator_base<Type> Op>
auto operator|(Op&& op) const &
{
return observable<typename std::decay_t<Op>::template result_value<Type>, make_chain_observable_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type)
return inner_make_chain_operator(std::forward<Op>(op));
}

template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
template<constraint::operator_base<Type> Op>
auto operator|(Op&& op) &&
{
return observable<typename std::decay_t<Op>::template result_value<Type>, make_chain_observable_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type)
return std::move(*this).inner_make_chain_operator(std::forward<Op>(op));
}

template<constraint::operator_observable_transform<const observable&> Op>
Expand Down Expand Up @@ -358,6 +360,19 @@ class observable
return std::move(*this) | std::forward<Op>(op);
}

private:
template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
auto inner_make_chain_operator(Op&& op) const &
{
return observable<typename std::decay_t<Op>::template operator_traits<Type>::result_type, make_chain_observable_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
}

template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
auto inner_make_chain_operator(Op&& op) &&
{
return observable<typename std::decay_t<Op>::template operator_traits<Type>::result_type, make_chain_observable_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
}

private:
RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
};
Expand Down
10 changes: 8 additions & 2 deletions src/rpp/rpp/operators/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ class buffer_observer_strategy
mutable std::vector<value_type> m_bucket;
};

struct buffer_t : public operators::details::operator_observable_strategy_different_types<buffer_observer_strategy, rpp::utils::types<>, size_t>
struct buffer_t : lift_operator<buffer_t, size_t>
{
template<rpp::constraint::decayed_type T>
using result_value = std::vector<T>;
struct operator_traits
{
using result_type = std::vector<T>;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = buffer_observer_strategy<TObserver>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
Expand Down
8 changes: 6 additions & 2 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ struct combine_latest_t
RPP_NO_UNIQUE_ADDRESS TSelector selector;

template<rpp::constraint::decayed_type T>
requires std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>
using result_value = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
struct operator_traits
{
static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "Selector is not callable with passed T type");

using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;
Expand Down
21 changes: 14 additions & 7 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class concat_state_t final : public rpp::refcount_disposable
{
if (handle_observable_impl(observable, refcounted))
return;

drain(refcounted);
}

Expand Down Expand Up @@ -168,12 +168,12 @@ struct concat_observer_strategy : public concat_observer_strategy_base<TObservab
}

template<typename T>
void on_next(T&& v) const
{
void on_next(T&& v) const
{
ConcatStage current = ConcatStage::None;
if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::state->handle_observable(std::forward<T>(v), base::state->add_ref());
else
else
base::state->get_queue()->push(std::forward<T>(v));
}

Expand All @@ -195,11 +195,18 @@ struct concat_observer_strategy : public concat_observer_strategy_base<TObservab
}
};

struct concat_t: public operators::details::template_operator_observable_strategy<concat_observer_strategy>
struct concat_t : lift_operator<concat_t>
{
template<rpp::constraint::decayed_type T>
requires rpp::constraint::observable<T>
using result_value = rpp::utils::extract_observable_type_t<T>;
struct operator_traits
{
static_assert(rpp::constraint::observable<T>, "T is not observable");

using result_type = rpp::utils::extract_observable_type_t<T>;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = concat_observer_strategy<T, TObserver>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;
Expand Down
5 changes: 4 additions & 1 deletion src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ template<rpp::schedulers::constraint::scheduler Scheduler>
struct debounce_t
{
template<rpp::constraint::decayed_type T>
using result_value = T;
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;
Expand Down
5 changes: 4 additions & 1 deletion src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ template<rpp::schedulers::constraint::scheduler Scheduler, bool ClearOnError>
struct delay_t
{
template<rpp::constraint::decayed_type T>
using result_value = T;
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;
Expand Down
54 changes: 13 additions & 41 deletions src/rpp/rpp/operators/details/strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,33 @@

namespace rpp::operators::details
{
template<typename SubscribeStrategy, rpp::constraint::decayed_type... Args>
class operator_observable_strategy_base
template<typename Operator, rpp::constraint::decayed_type... TArgs>
class lift_operator
{
public:
template<rpp::constraint::decayed_same_as<Args>... TArgs>
operator_observable_strategy_base(TArgs&&... args)
: m_vals{std::forward<TArgs>(args)...}
template<rpp::constraint::decayed_same_as<TArgs>... TTArgs>
lift_operator(TTArgs&&... args)
: m_vals{std::forward<TTArgs>(args)...}
{
}

template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
return m_vals.apply(&SubscribeStrategy::template apply<Type, Observer, Args...>, std::forward<Observer>(observer));
return m_vals.apply(&apply<Type, Observer, TArgs...>, std::forward<Observer>(observer));
}

private:
RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<Args...> m_vals{};
};

template<template<typename, typename...> typename Strategy, typename Types>
struct identity_subscribe_strategy;

template<template<typename, typename...> typename Strategy, typename... Types>
struct identity_subscribe_strategy<Strategy, rpp::utils::types<Types...>>
{
template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer, typename... Args>
template<rpp::constraint::decayed_type Type,
rpp::constraint::observer Observer,
typename... Args>
static auto apply(Observer&& observer, const Args&... vals)
{
return rpp::observer<Type, Strategy<std::decay_t<Observer>, Types...>>{std::forward<Observer>(observer), vals...};
static_assert(rpp::constraint::observer_of_type<std::decay_t<Observer>, typename Operator::template operator_traits<Type>::result_type>);
return rpp::observer<Type, typename Operator::template operator_traits<Type>::template observer_strategy<std::decay_t<Observer>>>{std::forward<Observer>(observer), vals...};
}
};

template<template<typename, typename...> typename Strategy, rpp::constraint::decayed_type... Args>
using operator_observable_strategy = operator_observable_strategy_base<identity_subscribe_strategy<Strategy, rpp::utils::types<Args...>>, Args...>;

template<template<typename, typename...> typename Strategy, typename Types = rpp::utils::types<>, rpp::constraint::decayed_type... Args>
using operator_observable_strategy_different_types = operator_observable_strategy_base<identity_subscribe_strategy<Strategy, Types>, Args...>;

template<template<typename, typename, typename...> typename Strategy, typename Types>
struct template_subscribe_strategy;

template<template<typename, typename, typename...> typename Strategy, typename... Types>
struct template_subscribe_strategy<Strategy, rpp::utils::types<Types...>>
{
template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer, typename... Args>
static auto apply(Observer&& observer, const Args&... vals)
{
return rpp::observer<Type, Strategy<Type, std::decay_t<Observer>, Types...>>{std::forward<Observer>(observer), vals...};
}
private:
RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TArgs...> m_vals{};
};

template<template<typename, typename, typename...> typename Strategy, rpp::constraint::decayed_type... Args>
using template_operator_observable_strategy = operator_observable_strategy_base<template_subscribe_strategy<Strategy, rpp::utils::types<Args...>>, Args...>;

template<template<typename, typename, typename...> typename Strategy, typename Types = rpp::utils::types<>, rpp::constraint::decayed_type... Args>
using template_operator_observable_strategy_different_types = operator_observable_strategy_base<template_subscribe_strategy<Strategy, Types>, Args...>;
}
14 changes: 10 additions & 4 deletions src/rpp/rpp/operators/distinct.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ namespace rpp::operators::details
template<rpp::constraint::decayed_type Type, rpp::constraint::observer TObserver>
struct distinct_observer_strategy
{
static_assert(rpp::constraint::hashable<Type>, "Distinct operator requires hashable type");

using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

RPP_NO_UNIQUE_ADDRESS TObserver observer;
Expand All @@ -49,10 +47,18 @@ struct distinct_observer_strategy
bool is_disposed() const { return observer.is_disposed(); }
};

struct distinct_t : public operators::details::template_operator_observable_strategy<distinct_observer_strategy>
struct distinct_t : lift_operator<distinct_t>
{
template<rpp::constraint::decayed_type T>
using result_value = T;
struct operator_traits
{
static_assert(rpp::constraint::hashable<T>, "T is not hashable");

using result_type = T;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = distinct_observer_strategy<T, TObserver>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
Expand Down
13 changes: 10 additions & 3 deletions src/rpp/rpp/operators/distinct_until_changed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@ struct distinct_until_changed_observer_strategy
};

template<rpp::constraint::decayed_type EqualityFn>
struct distinct_until_changed_t : public operators::details::template_operator_observable_strategy<distinct_until_changed_observer_strategy, EqualityFn>
struct distinct_until_changed_t : public operators::details::lift_operator<distinct_until_changed_t<EqualityFn>, EqualityFn>
{
template<rpp::constraint::decayed_type T>
requires rpp::constraint::invocable_r_v<bool, EqualityFn, T, T>
using result_value = T;
struct operator_traits
{
static_assert(rpp::constraint::invocable_r_v<bool, EqualityFn, T, T>, "EqualityFn is not invocable with T and T returning bool");

using result_type = T;

template<rpp::constraint::observer_of_type<result_type> TObserver>
using observer_strategy = distinct_until_changed_observer_strategy<T, TObserver, EqualityFn>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
Expand Down
Loading