Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,40 @@ namespace rpp
details::ref_count_on_subscribe_t<connectable_observable<OriginalObservable, Subject>>>{*this};
}

template<rpp::constraint::operator_observable_transform<const connectable_observable&> Op>
template<typename Op>
auto operator|(Op&& op) const &
{
return std::forward<Op>(op)(*this);
if constexpr (std::invocable<std::decay_t<Op>, const connectable_observable&>)
{
static_assert(rpp::constraint::observable<std::invoke_result_t<std::decay_t<Op>, const connectable_observable&>>, "Result of Op should be observable");
return std::forward<Op>(op)(*this);
}
else
return static_cast<const base&>(*this) | std::forward<Op>(op);
}

template<rpp::constraint::operator_observable_transform<connectable_observable&&> Op>
template<typename Op>
auto operator|(Op&& op)&&
{
return std::forward<Op>(op)(std::move(*this));
if constexpr (std::invocable<std::decay_t<Op>, connectable_observable&&>)
{
static_assert(rpp::constraint::observable<std::invoke_result_t<std::decay_t<Op>, connectable_observable&&>>, "Result of Op should be observable");
return std::forward<Op>(op)(std::move(*this));
}
else
return static_cast<base&&>(*this) | std::forward<Op>(op);
}

template<typename Op>
decltype(std::declval<const base>() | std::declval<Op>()) operator|(Op&& op) const &
auto pipe(Op && op) const &
{
return static_cast<const base&>(*this) | std::forward<Op>(op);
return *this | std::forward<Op>(op);
}

template<typename Op>
decltype(std::declval<base>() | std::declval<Op>()) operator|(Op&& op)&&
auto pipe(Op && op)&&
{
return static_cast<base&&>(*this) | std::forward<Op>(op);
return std::move(*this) | std::forward<Op>(op);
}

private:
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/observables/details/chain_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace rpp::details::observables

using operator_traits = typename TStrategy::template operator_traits<typename base::value_type>;

static_assert(rpp::constraint::operator_chain<TStrategy, typename base::value_type, typename base::expected_disposable_strategy>);

public:
using expected_disposable_strategy = deduce_updated_disposable_strategy<TStrategy, typename base::expected_disposable_strategy>;
using value_type = typename operator_traits::result_type;
Expand Down
35 changes: 13 additions & 22 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,44 +79,40 @@ namespace rpp::constraint
template<typename T, typename Type>
concept observable_of_type = observable<T> && std::same_as<utils::extract_observable_type_t<T>, std::decay_t<Type>>;

template<typename Op, typename TObs>
concept operator_observable_transform = requires(const Op& op, TObs obs) {
{
op(static_cast<TObs>(obs))
} -> rpp::constraint::observable;
};

template<typename Op, typename Type>
concept operator_base = requires(const Op& op) { typename std::decay_t<Op>::template operator_traits<Type>; } && details::observables::constraint::disposable_strategy<details::observables::deduce_updated_disposable_strategy<std::decay_t<Op>, typename details::observables::chain<details::observables::fake_strategy<Type>>::expected_disposable_strategy>>;
template<typename TObservable, typename... TObservables>
concept observables_of_same_type = rpp::constraint::observable<TObservable> && (rpp::constraint::observable<TObservables> && ...) && (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...);

template<typename Op, typename Type>
concept operator_subscribe = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer, const details::observables::chain<details::observables::fake_strategy<Type>>& chain) {
concept operator_subscribe = requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer, const details::observables::chain<details::observables::fake_strategy<Type>>& chain) {
{
op.subscribe(std::move(observer), chain)
};
};

template<typename Op, typename Type>
concept operator_lift = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer) {
concept operator_lift = requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer) {
{
op.template lift<Type>(std::move(observer))
} -> rpp::constraint::observer_of_type<Type>;
};

template<typename Op, typename Type, typename DisposableStrategy>
concept operator_lift_with_disposable_strategy = operator_base<Op, Type> && requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer) {
concept operator_lift_with_disposable_strategy = requires(const Op& op, rpp::details::observers::fake_observer<typename std::decay_t<Op>::template operator_traits<Type>::result_type>&& observer) {
{
op.template lift_with_disposable_strategy<Type, DisposableStrategy>(std::move(observer))
} -> rpp::constraint::observer_of_type<Type>;
};

template<typename Op, typename Type, typename DisposableStrategy>
concept operator_chain = operator_base<std::decay_t<Op>, Type>
&& requires { typename std::decay_t<Op>::template operator_traits<Type>::result_type; }
&& (operator_subscribe<std::decay_t<Op>, Type> || operator_lift<std::decay_t<Op>, Type> || operator_lift_with_disposable_strategy<std::decay_t<Op>, Type, DisposableStrategy>);
concept operator_chain =
requires() {
typename std::decay_t<Op>::template operator_traits<Type>;
typename std::decay_t<Op>::template operator_traits<Type>::result_type;
}
&& details::observables::constraint::disposable_strategy<details::observables::deduce_updated_disposable_strategy<std::decay_t<Op>,
typename details::observables::chain<details::observables::fake_strategy<Type>>::expected_disposable_strategy>>
&& (operator_subscribe<std::decay_t<Op>, Type> || operator_lift<std::decay_t<Op>, Type> || operator_lift_with_disposable_strategy<std::decay_t<Op>, Type, DisposableStrategy>);

template<typename TObservable, typename... TObservables>
concept observables_of_same_type = rpp::constraint::observable<TObservable> && (rpp::constraint::observable<TObservables> && ...) && (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...);
} // namespace rpp::constraint

namespace rpp
Expand All @@ -133,8 +129,3 @@ namespace rpp
template<constraint::decayed_type Type, rpp::constraint::observable_of_type<Type>... Observables>
class variant_observable;
} // namespace rpp


#define RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) \
/* operator_traits can be instantiated if all inner static_asserts are fine*/ \
if constexpr (requires { { typename std::decay_t<Op>::template operator_traits<Type>{}}; })
70 changes: 28 additions & 42 deletions src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,42 +310,41 @@ namespace rpp
*/
auto as_dynamic() && { return rpp::dynamic_observable<Type>{std::move(*this)}; }

template<constraint::operator_base<Type> Op>
auto operator|(Op&& op) const &
template<typename Subscribe>
requires rpp::utils::is_base_of_v<std::decay_t<Subscribe>, rpp::operators::details::subscribe_t>
auto operator|(Subscribe&& op) const
{
RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type)
return inner_make_chain_operator(std::forward<Op>(op));
return std::forward<Subscribe>(op)(*this);
}

template<constraint::operator_base<Type> Op>
auto operator|(Op&& op) &&
{
RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type)
return std::move(*this).inner_make_chain_operator(std::forward<Op>(op));
}

template<constraint::operator_observable_transform<const observable&> Op>
auto operator|(Op&& op) const &
{
return std::forward<Op>(op)(*this);
}

template<constraint::operator_observable_transform<observable&&> Op>
auto operator|(Op&& op) &&
{
return std::forward<Op>(op)(std::move(*this));
}

template<typename... Args>
auto operator|(const rpp::operators::details::subscribe_t<Args...>& op) const
template<typename Op>
requires (!rpp::utils::is_base_of_v<std::decay_t<Op>, rpp::operators::details::subscribe_t>)
rpp::constraint::observable auto operator|(Op&& op) const &
{
return op(*this);
if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>; })
{
using result_type = typename std::decay_t<Op>::template operator_traits<Type>::result_type;
return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
}
else
{
return std::forward<Op>(op)(*this);
}
}

template<typename... Args>
auto operator|(rpp::operators::details::subscribe_t<Args...>&& op) const
template<typename Op>
requires (!rpp::utils::is_base_of_v<std::decay_t<Op>, rpp::operators::details::subscribe_t>)
rpp::constraint::observable auto operator|(Op&& op) &&
{
return std::move(op)(*this);
if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>; })
{
using result_type = typename std::decay_t<Op>::template operator_traits<Type>::result_type;
return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
}
else
{
return std::forward<Op>(op)(std::move(*this));
}
}

template<typename Op>
Expand All @@ -360,19 +359,6 @@ namespace rpp
return std::move(*this) | std::forward<Op>(op);
}

private:
template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
auto inner_make_chain_operator(Op&& op) const &
{
return observable<typename std::decay_t<Op>::template operator_traits<Type>::result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
}

template<constraint::operator_chain<Type, expected_disposable_strategy> Op>
auto inner_make_chain_operator(Op&& op) &&
{
return observable<typename std::decay_t<Op>::template operator_traits<Type>::result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
}

private:
RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
};
Expand Down
6 changes: 2 additions & 4 deletions src/rpp/rpp/operators/flat_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ namespace rpp::operators::details
RPP_NO_UNIQUE_ADDRESS Fn m_fn;

template<rpp::constraint::observable TObservable>
requires (std::invocable<Fn, rpp::utils::extract_observable_type_t<TObservable>>
&& rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::extract_observable_type_t<TObservable>>>)
auto operator()(TObservable&& observable) const &
{
static_assert(std::invocable<Fn, rpp::utils::extract_observable_type_t<TObservable>> && rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::extract_observable_type_t<TObservable>>>, "fn should return observable");
return std::forward<TObservable>(observable)
| rpp::ops::map(m_fn)
| rpp::ops::merge();
}

template<rpp::constraint::observable TObservable>
requires (std::invocable<Fn, rpp::utils::extract_observable_type_t<TObservable>>
&& rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::extract_observable_type_t<TObservable>>>)
auto operator()(TObservable&& observable) &&
{
static_assert(std::invocable<Fn, rpp::utils::extract_observable_type_t<TObservable>> && rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::extract_observable_type_t<TObservable>>>, "fn should return observable");
return std::forward<TObservable>(observable)
| rpp::ops::map(std::move(m_fn))
| rpp::ops::merge();
Expand Down
5 changes: 3 additions & 2 deletions src/rpp/rpp/operators/multicast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ namespace rpp::operators::details
RPP_NO_UNIQUE_ADDRESS Subject m_subject;

template<rpp::constraint::observable TObservable>
requires std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::subjects::utils::extract_subject_type_t<Subject>>
auto operator()(TObservable&& observable) const
{
static_assert(std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::subjects::utils::extract_subject_type_t<Subject>>, "observable and subject should be of same type");
return rpp::connectable_observable<std::decay_t<TObservable>, Subject>{std::forward<TObservable>(observable), m_subject};
}
};
Expand All @@ -33,9 +33,10 @@ namespace rpp::operators::details
struct template_multicast_t
{
template<rpp::constraint::observable TObservable>
requires rpp::constraint::subject<Subject<rpp::utils::extract_observable_type_t<TObservable>>>
auto operator()(TObservable&& observable) const
{
static_assert(rpp::constraint::subject<Subject<rpp::utils::extract_observable_type_t<TObservable>>>, "subject should be constructible with type of observable");

return rpp::connectable_observable<std::decay_t<TObservable>,
Subject<rpp::utils::extract_observable_type_t<TObservable>>>{std::forward<TObservable>(observable),
Subject<rpp::utils::extract_observable_type_t<TObservable>>{}};
Expand Down
5 changes: 3 additions & 2 deletions src/rpp/rpp/operators/start_with.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace rpp::operators::details
rpp::utils::tuple<TObservables...> observables{};

template<rpp::constraint::observable TObservable>
requires rpp::constraint::observables_of_same_type<TObservable, TObservables...>
auto operator()(TObservable&& observable) const
{
static_assert(rpp::constraint::observables_of_same_type<TObservable, TObservables...>, "observables should be of same type");
return observables.apply(&apply<TObservable>, std::forward<TObservable>(observable));
}

Expand All @@ -50,9 +50,10 @@ namespace rpp::operators::details
{
}

template<rpp::constraint::observable_of_type<rpp::utils::iterable_value_t<PackedContainer>> TObservable>
template<rpp::constraint::observable TObservable>
auto operator()(TObservable&& observable) const
{
static_assert(rpp::constraint::observable_of_type<TObservable, rpp::utils::iterable_value_t<PackedContainer>>, "observables should be of same type");
return rpp::source::concat(rpp::source::from_iterable(container, scheduler), std::forward<TObservable>(observable));
}
};
Expand Down
Loading