diff --git a/.vscode/settings.json b/.vscode/settings.json index f74ccbc62..9116f6d65 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,7 +3,8 @@ "--header-insertion=never", "--background-index", "--clang-tidy", - "-j=32" + "-j=32", + "--function-arg-placeholders=0" ], "sonarlint.pathToCompileCommands": "${workspaceFolder}/build/compile_commands.json", } \ No newline at end of file diff --git a/docs/readme.md b/docs/readme.md index bf43eaeeb..5fe7f8e7b 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -269,15 +269,25 @@ There we convert observable to concatenation of original observable and `just(2) One more posible but a bit more advanced way to implement operators - is to lift observer. To do this, your functor-adapter must to satisfy `rpp::constraint::operator_lift` concept. Actually, your class must to have: - member function `lift` accepting downstream observer and returning new upstream observer -- using `result_value` accepting typename of upstream and providing new value for downstream (== typename of original observable and return new resulting type for new observable) +- inner `template struct traits` struct accepting typename of upstream and providing: + - `using result_type =` with typename of new resulting type for new observable + - (optionally) `struct requirements` with static_asserts over passed type Example: ```cpp template struct map { - template - using result_value = std::invoke_result_t; + template + struct traits + { + struct requirements + { + static_assert(std::invocable, "Fn is not invocable with T"); + }; + + using result_type = std::invoke_result_t; + }; Fn fn{}; diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index e50a2c5e3..c03fbf2b6 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -23,7 +23,7 @@ class observable_chain_strategy public: using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy; - using value_type = typename TStrategy::template result_value; + using value_type = typename TStrategy::template operator_traits::result_type; observable_chain_strategy(const TStrategy& strategy, const TStrategies&... strategies) : m_strategy(strategy) diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp index 085fd0196..b6e33876e 100644 --- a/src/rpp/rpp/observables/fwd.hpp +++ b/src/rpp/rpp/observables/fwd.hpp @@ -91,34 +91,37 @@ concept operator_observable_transform = requires(const Op& op, TObs obs) }; template -concept operator_base = requires(const Op& op) { - typename std::decay_t::template result_value; - requires details::observables::constraint::disposable_strategy, typename observable_chain_strategy>::expected_disposable_strategy>>; -}; +concept operator_base = requires(const Op& op) { typename std::decay_t::template operator_traits; } && details::observables::constraint::disposable_strategy, typename observable_chain_strategy>::expected_disposable_strategy>>; template -concept operator_subscribe = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template result_value>&& observer, const observable_chain_strategy>& chain) +concept operator_subscribe = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer, const observable_chain_strategy>& chain) { {op.subscribe(std::move(observer), chain)}; }; template -concept operator_lift = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template result_value>&& observer) +concept operator_lift = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { {op.template lift(std::move(observer))} -> rpp::constraint::observer_of_type; }; template -concept operator_lift_with_disposable_strategy = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template result_value>&& observer) +concept operator_lift_with_disposable_strategy = operator_base && requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { {op.template lift_with_disposable_strategy(std::move(observer))} -> rpp::constraint::observer_of_type; }; template -concept operator_chain = operator_subscribe || operator_lift || operator_lift_with_disposable_strategy; +concept operator_chain = operator_base, Type> + && requires { typename std::decay_t::template operator_traits::result_type; } + && (operator_subscribe, Type> || operator_lift, Type> || operator_lift_with_disposable_strategy, Type, DisposableStrategy>); template concept observables_of_same_type = rpp::constraint::observable && (rpp::constraint::observable && ...) && (std::same_as, rpp::utils::extract_observable_type_t> && ...); } + +#define RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) \ + /* operator_traits can be instantiated if all inner static_asserts are fine*/ \ + if constexpr (requires { { typename std::decay_t::template operator_traits{}}; }) diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 30828eae8..309f9c785 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -36,7 +36,7 @@ class observable { public: using value_type = Type; - + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; template @@ -310,16 +310,18 @@ class observable */ auto as_dynamic() && { return rpp::dynamic_observable{std::move(*this)}; } - template Op> + template Op> auto operator|(Op&& op) const & { - return observable::template result_value, make_chain_observable_t, Strategy>>{std::forward(op), m_strategy}; + RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) + return inner_make_chain_operator(std::forward(op)); } - template Op> + template Op> auto operator|(Op&& op) && { - return observable::template result_value, make_chain_observable_t, Strategy>>{std::forward(op), std::move(m_strategy)}; + RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) + return std::move(*this).inner_make_chain_operator(std::forward(op)); } template Op> @@ -358,6 +360,19 @@ class observable return std::move(*this) | std::forward(op); } +private: + template Op> + auto inner_make_chain_operator(Op&& op) const & + { + return observable::template operator_traits::result_type, make_chain_observable_t, Strategy>>{std::forward(op), m_strategy}; + } + + template Op> + auto inner_make_chain_operator(Op&& op) && + { + return observable::template operator_traits::result_type, make_chain_observable_t, Strategy>>{std::forward(op), std::move(m_strategy)}; + } + private: RPP_NO_UNIQUE_ADDRESS Strategy m_strategy; }; diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 3e9be1406..de0e9553a 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -64,10 +64,16 @@ class buffer_observer_strategy mutable std::vector m_bucket; }; -struct buffer_t : public operators::details::operator_observable_strategy_different_types, size_t> +struct buffer_t : lift_operator { template - using result_value = std::vector; + struct operator_traits + { + using result_type = std::vector; + + template TObserver> + using observer_strategy = buffer_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 17d62352d..49b74ae0c 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -103,8 +103,12 @@ struct combine_latest_t RPP_NO_UNIQUE_ADDRESS TSelector selector; template - requires std::invocable...> - using result_value = std::invoke_result_t...>; + struct operator_traits + { + static_assert(std::invocable...>, "Selector is not callable with passed T type"); + + using result_type = std::invoke_result_t...>; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index d8de38db9..4f365faa1 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -71,7 +71,7 @@ class concat_state_t final : public rpp::refcount_disposable { if (handle_observable_impl(observable, refcounted)) return; - + drain(refcounted); } @@ -168,12 +168,12 @@ struct concat_observer_strategy : public concat_observer_strategy_base - void on_next(T&& v) const - { + void on_next(T&& v) const + { ConcatStage current = ConcatStage::None; if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) base::state->handle_observable(std::forward(v), base::state->add_ref()); - else + else base::state->get_queue()->push(std::forward(v)); } @@ -195,11 +195,18 @@ struct concat_observer_strategy : public concat_observer_strategy_base +struct concat_t : lift_operator { template - requires rpp::constraint::observable - using result_value = rpp::utils::extract_observable_type_t; + struct operator_traits + { + static_assert(rpp::constraint::observable, "T is not observable"); + + using result_type = rpp::utils::extract_observable_type_t; + + template TObserver> + using observer_strategy = concat_observer_strategy; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 9541efbc7..6030b1490 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -156,7 +156,10 @@ template struct debounce_t { template - using result_value = T; + struct operator_traits + { + using result_type = T; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 5959f23a8..61b22ee7b 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -170,7 +170,10 @@ template struct delay_t { template - using result_value = T; + struct operator_traits + { + using result_type = T; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/details/strategy.hpp b/src/rpp/rpp/operators/details/strategy.hpp index ee5737d1a..13778739d 100644 --- a/src/rpp/rpp/operators/details/strategy.hpp +++ b/src/rpp/rpp/operators/details/strategy.hpp @@ -23,61 +23,33 @@ namespace rpp::operators::details { -template -class operator_observable_strategy_base +template +class lift_operator { public: - template... TArgs> - operator_observable_strategy_base(TArgs&&... args) - : m_vals{std::forward(args)...} + template... TTArgs> + lift_operator(TTArgs&&... args) + : m_vals{std::forward(args)...} { } template auto lift(Observer&& observer) const { - return m_vals.apply(&SubscribeStrategy::template apply, std::forward(observer)); + return m_vals.apply(&apply, std::forward(observer)); } private: - RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple m_vals{}; -}; - -template typename Strategy, typename Types> -struct identity_subscribe_strategy; - -template typename Strategy, typename... Types> -struct identity_subscribe_strategy> -{ - template + template static auto apply(Observer&& observer, const Args&... vals) { - return rpp::observer, Types...>>{std::forward(observer), vals...}; + static_assert(rpp::constraint::observer_of_type, typename Operator::template operator_traits::result_type>); + return rpp::observer::template observer_strategy>>{std::forward(observer), vals...}; } -}; - -template typename Strategy, rpp::constraint::decayed_type... Args> -using operator_observable_strategy = operator_observable_strategy_base>, Args...>; - -template typename Strategy, typename Types = rpp::utils::types<>, rpp::constraint::decayed_type... Args> -using operator_observable_strategy_different_types = operator_observable_strategy_base, Args...>; - -template typename Strategy, typename Types> -struct template_subscribe_strategy; -template typename Strategy, typename... Types> -struct template_subscribe_strategy> -{ - template - static auto apply(Observer&& observer, const Args&... vals) - { - return rpp::observer, Types...>>{std::forward(observer), vals...}; - } +private: + RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple m_vals{}; }; - -template typename Strategy, rpp::constraint::decayed_type... Args> -using template_operator_observable_strategy = operator_observable_strategy_base>, Args...>; - -template typename Strategy, typename Types = rpp::utils::types<>, rpp::constraint::decayed_type... Args> -using template_operator_observable_strategy_different_types = operator_observable_strategy_base, Args...>; } diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index dcc71d5f3..0446e01d4 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -23,8 +23,6 @@ namespace rpp::operators::details template struct distinct_observer_strategy { - static_assert(rpp::constraint::hashable, "Distinct operator requires hashable type"); - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; RPP_NO_UNIQUE_ADDRESS TObserver observer; @@ -49,10 +47,18 @@ struct distinct_observer_strategy bool is_disposed() const { return observer.is_disposed(); } }; -struct distinct_t : public operators::details::template_operator_observable_strategy +struct distinct_t : lift_operator { template - using result_value = T; + struct operator_traits + { + static_assert(rpp::constraint::hashable, "T is not hashable"); + + using result_type = T; + + template TObserver> + using observer_strategy = distinct_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index 876bbe016..5caf9aa6a 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -48,11 +48,18 @@ struct distinct_until_changed_observer_strategy }; template -struct distinct_until_changed_t : public operators::details::template_operator_observable_strategy +struct distinct_until_changed_t : public operators::details::lift_operator, EqualityFn> { template - requires rpp::constraint::invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(rpp::constraint::invocable_r_v, "EqualityFn is not invocable with T and T returning bool"); + + using result_type = T; + + template TObserver> + using observer_strategy = distinct_until_changed_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index d3a16e615..6ebcddb6c 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -44,11 +44,18 @@ struct filter_observer_strategy }; template -struct filter_t : public operators::details::operator_observable_strategy +struct filter_t : lift_operator, Fn> { template - requires std::is_invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Fn is not invocable with T returning bool"); + + using result_type = T; + + template TObserver> + using observer_strategy = filter_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index 838960ea7..6fcad3145 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -43,10 +43,16 @@ struct first_observer_strategy bool is_disposed() const { return observer.is_disposed(); } }; -struct first_t : public operators::details::operator_observable_strategy +struct first_t : lift_operator { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = first_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index 75de84d6f..6efe5099e 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -121,7 +121,7 @@ struct group_by_observer_strategy const subject_observer* deduce_observer(const rpp::constraint::observer auto& obs, const TT& val) const { const auto key = key_selector(utils::as_const(val)); - + if (const auto itr = key_to_observer.find(key); itr != key_to_observer.cend()) return &itr->second; @@ -162,13 +162,22 @@ struct group_by_observable_strategy }; template -struct group_by_t : public operators::details::template_operator_observable_strategy +struct group_by_t : lift_operator, KeySelector, ValueSelector, KeyComparator> { - using operators::details::template_operator_observable_strategy::template_operator_observable_strategy; + using operators::details::lift_operator, KeySelector, ValueSelector, KeyComparator>::lift_operator; template - requires std::invocable && std::invocable && std::strict_weak_order, rpp::utils::decayed_invoke_result_t> - using result_value = grouped_observable, rpp::utils::decayed_invoke_result_t, group_by_observable_strategy>>; + struct operator_traits + { + static_assert(std::invocable, "KeySelector is not invocacble with T"); + static_assert(std::invocable, "ValueSelector is not invocable with T"); + static_assert(std::strict_weak_order, rpp::utils::decayed_invoke_result_t>, "KeyComparator is not invocable with result of KeySelector"); + + using result_type = grouped_observable, rpp::utils::decayed_invoke_result_t, group_by_observable_strategy>>; + + template TObserver> + using observer_strategy = group_by_observer_strategy; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index de187f3ff..e971f3ddf 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -51,10 +51,16 @@ struct last_observer_strategy bool is_disposed() const { return observer.is_disposed(); } }; -struct last_t : public operators::details::template_operator_observable_strategy +struct last_t : lift_operator { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = last_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index 7ae7aa03d..9cf964fd2 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -43,11 +43,18 @@ struct map_observer_strategy }; template -struct map_t : public operators::details::operator_observable_strategy +struct map_t : lift_operator, Fn> { template - requires std::invocable - using result_value = std::invoke_result_t; + struct operator_traits + { + static_assert(std::invocable, "Fn is not invocable with T"); + + using result_type = std::invoke_result_t; + + template TObserver> + using observer_strategy = map_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 0571ebe97..0491ca8c9 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -133,8 +133,12 @@ class merge_observer_strategy final : public merge_observer_base_strategy - requires rpp::constraint::observable - using result_value = rpp::utils::extract_observable_type_t; + struct operator_traits + { + static_assert(rpp::constraint::observable, "T is not observable"); + + using result_type = rpp::utils::extract_observable_type_t; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; @@ -157,8 +161,12 @@ struct merge_with_t RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables{}; template - requires (std::same_as> && ...) - using result_value = T; + struct operator_traits + { + static_assert((std::same_as> && ...), "T is not same as values of other observables"); + + using result_type = T; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index ad69db487..36ab5e04d 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -47,13 +47,20 @@ struct reduce_observer_strategy }; template -struct reduce_t : public operators::details::operator_observable_strategy_different_types, Seed, Accumulator> +struct reduce_t : lift_operator, Seed, Accumulator> { - using operators::details::operator_observable_strategy_different_types, Seed, Accumulator>::operator_observable_strategy_different_types; + using operators::details::lift_operator, Seed, Accumulator>::lift_operator; template - requires std::is_invocable_r_v - using result_value = Seed; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Accumulator is not invocable with Seed&& abnd T returning Seed"); + + using result_type = Seed; + + template TObserver> + using observer_strategy = reduce_observer_strategy; + }; template using updated_disposable_strategy = Prev; @@ -92,11 +99,18 @@ struct reduce_no_seed_observer_strategy }; template -struct reduce_no_seed_t : public operators::details::operator_observable_strategy +struct reduce_no_seed_t : lift_operator, Accumulator> { template - requires std::is_invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Accumulator is not invocable with T&& abnd T returning T"); + + using result_type = T; + + template TObserver> + using observer_strategy = reduce_no_seed_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 512619715..005972de8 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -52,23 +52,32 @@ struct scan_observer_strategy }; template -struct scan_t : public operators::details::operator_observable_strategy +struct scan_t : lift_operator, InitialValue, Fn> { - using operators::details::operator_observable_strategy::operator_observable_strategy; + using operators::details::lift_operator, InitialValue, Fn>::lift_operator; template - requires std::is_invocable_r_v - using result_value = InitialValue; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Accumulator is not invocable with Seed&& abnd T returning Seed"); + + using result_type = InitialValue; + + template TObserver> + using observer_strategy = scan_observer_strategy; + }; template using updated_disposable_strategy = Prev; }; -template +template struct scan_no_seed_observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using Seed = rpp::utils::extract_observer_type_t; + RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Fn fn; mutable std::optional seed{}; @@ -94,11 +103,18 @@ struct scan_no_seed_observer_strategy }; template -struct scan_no_seed_t : public operators::details::template_operator_observable_strategy +struct scan_no_seed_t : lift_operator, Fn> { template - requires std::is_invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Accumulator is not invocable with T&& abnd T returning T"); + + using result_type = T; + + template TObserver> + using observer_strategy = scan_no_seed_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index 6158b887a..7ef47f385 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -45,10 +45,16 @@ struct skip_observer_strategy bool is_disposed() const { return observer.is_disposed(); } }; -struct skip_t : public operators::details::operator_observable_strategy_different_types, size_t> +struct skip_t : lift_operator { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = skip_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/subscribe_on.hpp b/src/rpp/rpp/operators/subscribe_on.hpp index 4264a2386..a6b722187 100644 --- a/src/rpp/rpp/operators/subscribe_on.hpp +++ b/src/rpp/rpp/operators/subscribe_on.hpp @@ -36,7 +36,10 @@ template struct subscribe_on_t { template - using result_value = T; + struct operator_traits + { + using result_type = T; + }; template using updated_disposable_strategy = typename Prev::template add::is_none_disposable ? 0 : 1>; diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index c2ee5cb35..934036385 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -89,7 +89,7 @@ class switch_on_next_observer_strategy using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; switch_on_next_observer_strategy(TObserver&& obs) - : m_state{init_state(std::move(obs))} + : m_state{init_state(std::move(obs))} { } @@ -127,17 +127,25 @@ class switch_on_next_observer_strategy ptr->get_observer()->set_upstream(d.as_weak()); return ptr; } - + private: std::shared_ptr> m_state; rpp::composite_disposable_wrapper m_this_refcount = m_state->add_ref(); mutable rpp::composite_disposable_wrapper m_last_refcount = composite_disposable_wrapper::empty(); }; -struct switch_on_next_t : public operators::details::operator_observable_strategy +struct switch_on_next_t : lift_operator { - template - using result_value = rpp::utils::extract_observable_type_t; + template + struct operator_traits + { + static_assert(rpp::constraint::observable, "T is not observable"); + + using result_type = rpp::utils::extract_observable_type_t; + + template TObserver> + using observer_strategy = switch_on_next_observer_strategy; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index 6ac92125c..aed98fe54 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -49,10 +49,16 @@ struct take_observer_strategy bool is_disposed() const { return observer.is_disposed(); } }; -struct take_t : public operator_observable_strategy_different_types, size_t> +struct take_t : lift_operator { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = take_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index bd37d5e9f..c4ffaf032 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -52,15 +52,15 @@ class take_last_observer_strategy void on_error(const std::exception_ptr& err) const { m_observer.on_error(err); } - void on_completed() const - { + void on_completed() const + { for (size_t i =0; i < m_data.size(); ++i) { m_observer.on_next(std::move(m_data[m_current_end])); m_current_end = get_next(m_current_end); } - m_observer.on_completed(); + m_observer.on_completed(); } void set_upstream(const disposable_wrapper& d) { m_observer.set_upstream(d); } @@ -80,10 +80,16 @@ class take_last_observer_strategy mutable size_t m_current_end{}; }; -struct take_last_t : public operator_observable_strategy_different_types, size_t> +struct take_last_t : lift_operator { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = take_last_observer_strategy; + }; template using updated_disposable_strategy = Prev; @@ -94,7 +100,7 @@ namespace rpp::operators { /** * @brief Emit only last `count` items provided by observable, then send `on_completed` - * + * * @marble take_last { source observable : +--1-2-3-4-5-6-| @@ -105,7 +111,7 @@ namespace rpp::operators * * @param count amount of last items to be emitted * @warning #include - * + * * @par Example * @snippet take_last.cpp take_last * diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 05a92feaf..b8156f79b 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -46,13 +46,13 @@ struct take_until_observer_strategy_base std::shared_ptr> state; - void on_error(const std::exception_ptr& err) const + void on_error(const std::exception_ptr& err) const { state->dispose(); state->get_observer()->on_error(err); } - void on_completed() const + void on_completed() const { state->dispose(); state->get_observer()->on_completed(); @@ -90,7 +90,10 @@ struct take_until_t RPP_NO_UNIQUE_ADDRESS TObservable observable{}; template - using result_value = T; + struct operator_traits + { + using result_type = T; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 834e8916e..b8acc9867 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -44,11 +44,18 @@ struct take_while_observer_strategy }; template -struct take_while_t : public operators::details::operator_observable_strategy +struct take_while_t : lift_operator, Fn> { template - requires std::is_invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(std::is_invocable_r_v, "Fn is not invocable with T returning bool"); + + using result_type = T; + + template TObserver> + using observer_strategy = take_while_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/tap.hpp b/src/rpp/rpp/operators/tap.hpp index 17b44619f..ea4098e4f 100644 --- a/src/rpp/rpp/operators/tap.hpp +++ b/src/rpp/rpp/operators/tap.hpp @@ -59,13 +59,20 @@ template< rpp::constraint::decayed_type OnNext, rpp::constraint::decayed_type OnError, rpp::constraint::decayed_type OnCompleted> -struct tap_t : public operators::details::operator_observable_strategy +struct tap_t : public operators::details::lift_operator, OnNext, OnError, OnCompleted> { - using operators::details::operator_observable_strategy::operator_observable_strategy; + using operators::details::lift_operator, OnNext, OnError, OnCompleted>::lift_operator; template - requires rpp::constraint::invocable_r_v - using result_value = T; + struct operator_traits + { + static_assert(rpp::constraint::invocable_r_v, "OnNext is not invocable with T"); + + using result_type = T; + + template TObserver> + using observer_strategy = tap_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/throttle.hpp b/src/rpp/rpp/operators/throttle.hpp index 2decc7234..55e5aaf56 100644 --- a/src/rpp/rpp/operators/throttle.hpp +++ b/src/rpp/rpp/operators/throttle.hpp @@ -51,10 +51,16 @@ struct throttle_observer_strategy }; template -struct throttle_t : public operators::details::operator_observable_strategy_different_types, rpp::schedulers::duration> +struct throttle_t : lift_operator, rpp::schedulers::duration> { template - using result_value = T; + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = throttle_observer_strategy; + }; template using updated_disposable_strategy = Prev; diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 30d6d0ef2..f5d32a074 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -102,10 +102,16 @@ class window_observer_strategy mutable size_t m_items_in_current_window = m_window_size; }; -struct window_t : public operators::details::operator_observable_strategy_different_types, size_t> +struct window_t : lift_operator { template - using result_value = window_observable; + struct operator_traits + { + using result_type = window_observable; + + template TObserver> + using observer_strategy = window_observer_strategy; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 72e64491b..8517d959e 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -185,13 +185,16 @@ class window_toggle_observer_strategy template requires rpp::constraint::observable>> -struct window_toggle_t //: public operators::details::operator_observable_strategy +struct window_toggle_t { RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings; RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; template - using result_value = rpp::window_toggle_observable; + struct operator_traits + { + using result_type = rpp::window_toggle_observable; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 94900b5fa..ba3814526 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -131,8 +131,12 @@ struct with_latest_from_t RPP_NO_UNIQUE_ADDRESS TSelector selector; template - requires std::invocable...> - using result_value = std::invoke_result_t...>; + struct operator_traits + { + static_assert(std::invocable...>, "TSelector is not invocable with T and types of rest observables"); + + using result_type = std::invoke_result_t...>; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;