diff --git a/Doxyfile b/Doxyfile index 7fc2f7bc0..445bebe4b 100644 --- a/Doxyfile +++ b/Doxyfile @@ -230,7 +230,7 @@ JAVADOC_BANNER = NO # If the QT_AUTOBRIEF tag is set to YES then Doxygen will interpret the first # line (until the first dot) of a Qt-style comment as the brief description. If # set to NO, the Qt-style will behave just like regular Qt-style comments (thus -# requiring an explicit \brief command for a brief description.) +# requiring an explicit @brief command for a brief description.) # The default value is: NO. QT_AUTOBRIEF = NO @@ -432,7 +432,7 @@ DISTRIBUTE_GROUP_DOC = NO # If one adds a struct or class to a group and this option is enabled, then also # any nested class or struct is added to the same group. By default this option -# is disabled and one has to add nested compounds explicitly via \ingroup. +# is disabled and one has to add nested compounds explicitly via @ingroup. # The default value is: NO. GROUP_NESTED_COMPOUNDS = NO @@ -447,7 +447,7 @@ GROUP_NESTED_COMPOUNDS = NO SUBGROUPING = YES # When the INLINE_GROUPED_CLASSES tag is set to YES, classes, structs and unions -# are shown inside the group in which they are included (e.g. using \ingroup) +# are shown inside the group in which they are included (e.g. using @ingroup) # instead of on a separate page (for HTML and Man pages) or section (for LaTeX # and RTF). # diff --git a/docs/readme.md b/docs/readme.md index 39fe5f90c..83bf032de 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -131,98 +131,15 @@ In such an way it is not powerful enough, so Reactive Programming provides a lis \copydoc observables -### Operators - -\copydoc operators - -#### How Operators Work? - -Example: - -```cpp -rpp::source::create([](const auto& observer){ - observer.on_next(1); - observer.on_completed(); -}); -``` - -This example shows next: we create observble of `int` via operator `create`. This observable just emits to observer value `1` and then completes. Type of this observable is `rpp::observable` where `...` implementation defined type. So, actually it is `observable of ints`. Let's say we want to convert `int` to `std::string`. We could subscribe and then convert it or use `map` operator (also known as `transform`) to transform some original value to some another value: +### Observers: -```cpp -rpp::source::create([](const auto& observer){ - observer.on_next(1); - observer.on_completed(); -}) -| rpp::operators::map([](int v){ return std::string{v}; }); -``` - -For now it is `observable of strings` due to it is `rpp::observable`. But what is `rpp::operators::map` then? Actually it is functor-adaptor - just functor accepting observable and returning another observable. It accepts original observable and converts it to observable of "final type". "final type" is result of invocation of passed function against original observable's type. In our case it is `decltype([](int v){ return std::string{v}; }(int{}))` is it is `std::string`. So, `map` can be implemented in the following way: - -```cpp -template -struct map -{ - Fn fn{}; +\copydoc observers - template - auto operator()(const rpp::observable& observable) const { - using FinalType = std::invoke_result_t; - return rpp::source::create([observable, fn](const rpp::dynamic_observer& observer) - { - observable.subscribe([observer, fn](const auto& v) { observer.on_next(fn(v)); }, - [observer](const std::exception_ptr& err) { observer.on_error(err); }, - [observer]() { observer.on_completed(); }); - };); - } -} -``` - -It is template for such an functor-adaptor. Provided example - is simplest possible way to implement new operators - just provide function for transformation of observable. For example, it is fully valid example: -```cpp -rpp::source::just(1) - | [](const auto& observable) { return rpp::source::concat(observable, rpp::source::just(2)); }; -``` - -There we convert observable to concatenation of original observable and `just(2)`. - -One more posible but a bit more advanced way to implement operators - is to lift observer. To do this, your functor-adapter must to satisfy `rpp::constraint::operator_lift` concept. Actually, your class must to have: -- member function `lift` accepting downstream observer and returning new upstream observer -- inner `template struct traits` struct accepting typename of upstream and providing: - - `using result_type =` with typename of new resulting type for new observable - - (optionally) `struct requirements` with static_asserts over passed type - -Example: -```cpp -template -struct map -{ - template - struct traits - { - struct requirements - { - static_assert(std::invocable, "Fn is not invocable with T"); - }; - - using result_type = std::invoke_result_t; - }; - - Fn fn{}; - - template - auto lift(const rpp::dynamic_observer& observer) const - { - return rpp::make_lambda_observer([observer, fn](const auto& v){ observer.on_next(fn(v)); }, - [observer](const std::exception_ptr& err) { observer.on_error(err); }, - [observer]() { observer.on_completed(); }); - } -} +### Operators -``` -In this case you providing logic how to convert downstream observer to upstream observer. Actually this implementation is equal to previous one, but without handling of observable - you are expressing your operator in terms of observers +\copydoc operators -**(Advanced)** -In case of implementing operator via `lift` you can control disposable strategy via `updated_disposable_strategy` parameter. It accepts disposable strategy of upstream and returns disposable strategy for downstream. It needed only for optimization and reducing disposables handling cost and it is purely advanced thing. Not sure if anyone is going to use it by its own for now =) +Check the @link operators @endlink for more details about operators. ### Schedulers @@ -419,14 +336,14 @@ Below you can find list of extensions for RPP with adaption to external framewor ### rppqt \copydoc rppqt -Check API reference of \link rppqt \endlink for more details +Check API reference of @link rppqt @endlink for more details ### rppgrpc \copydoc rppgrpc -Check API reference of \link rppgrpc \endlink for more details +Check API reference of @link rppgrpc @endlink for more details ### rppasio \copydoc rppasio -Check API reference of \link rppasio \endlink for more details +Check API reference of @link rppasio @endlink for more details diff --git a/src/examples/rpp/doxygen/as_blocking.cpp b/src/examples/rpp/doxygen/as_blocking.cpp index 42d62fb3e..de1f6ae8e 100644 --- a/src/examples/rpp/doxygen/as_blocking.cpp +++ b/src/examples/rpp/doxygen/as_blocking.cpp @@ -5,7 +5,7 @@ #include /** - * \example as_blocking.cpp + * @example as_blocking.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/concat.cpp b/src/examples/rpp/doxygen/concat.cpp index 9d0ca9723..418891ea4 100644 --- a/src/examples/rpp/doxygen/concat.cpp +++ b/src/examples/rpp/doxygen/concat.cpp @@ -5,7 +5,7 @@ #include /** - * \example concat.cpp + * @example concat.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/connect.cpp b/src/examples/rpp/doxygen/connect.cpp index 826213409..5a70e1f5f 100644 --- a/src/examples/rpp/doxygen/connect.cpp +++ b/src/examples/rpp/doxygen/connect.cpp @@ -5,7 +5,7 @@ #include /** - * \example connect.cpp + * @example connect.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/create.cpp b/src/examples/rpp/doxygen/create.cpp index c4d4e8b47..d0637fed1 100644 --- a/src/examples/rpp/doxygen/create.cpp +++ b/src/examples/rpp/doxygen/create.cpp @@ -3,7 +3,7 @@ #include /** - * \example create.cpp + * @example create.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/defer.cpp b/src/examples/rpp/doxygen/defer.cpp index d4075eaef..843335a54 100644 --- a/src/examples/rpp/doxygen/defer.cpp +++ b/src/examples/rpp/doxygen/defer.cpp @@ -3,7 +3,7 @@ #include /** - * \example defer.cpp + * @example defer.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/delay.cpp b/src/examples/rpp/doxygen/delay.cpp index 8a0756d85..c8b176d73 100644 --- a/src/examples/rpp/doxygen/delay.cpp +++ b/src/examples/rpp/doxygen/delay.cpp @@ -5,7 +5,7 @@ #include /** - * \example delay.cpp + * @example delay.cpp **/ int main() // NOLINT(bugprone-exception-escape) { diff --git a/src/examples/rpp/doxygen/distinct_until_changed.cpp b/src/examples/rpp/doxygen/distinct_until_changed.cpp index b6201ad30..f02a4cc87 100644 --- a/src/examples/rpp/doxygen/distinct_until_changed.cpp +++ b/src/examples/rpp/doxygen/distinct_until_changed.cpp @@ -3,7 +3,7 @@ #include /** - * \example distinct_until_changed.cpp + * @example distinct_until_changed.cpp **/ int main() diff --git a/src/examples/rpp/doxygen/from.cpp b/src/examples/rpp/doxygen/from.cpp index 24b87fee4..28c1321cc 100644 --- a/src/examples/rpp/doxygen/from.cpp +++ b/src/examples/rpp/doxygen/from.cpp @@ -3,7 +3,7 @@ #include /** - * \example from.cpp + * @example from.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/group_by.cpp b/src/examples/rpp/doxygen/group_by.cpp index 31eeebd47..86f833c1e 100644 --- a/src/examples/rpp/doxygen/group_by.cpp +++ b/src/examples/rpp/doxygen/group_by.cpp @@ -3,7 +3,7 @@ #include /** - * \example group_by.cpp + * @example group_by.cpp **/ int main() { diff --git a/src/examples/rpp/doxygen/interval.cpp b/src/examples/rpp/doxygen/interval.cpp index 89cb5ab90..d3281de47 100644 --- a/src/examples/rpp/doxygen/interval.cpp +++ b/src/examples/rpp/doxygen/interval.cpp @@ -4,7 +4,7 @@ #include /** - * \example interval.cpp + * @example interval.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/just.cpp b/src/examples/rpp/doxygen/just.cpp index 2fd41b3cc..48ad72641 100644 --- a/src/examples/rpp/doxygen/just.cpp +++ b/src/examples/rpp/doxygen/just.cpp @@ -4,7 +4,7 @@ #include /** - * \example just.cpp + * @example just.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/last.cpp b/src/examples/rpp/doxygen/last.cpp index cb5af243b..8f2e899be 100644 --- a/src/examples/rpp/doxygen/last.cpp +++ b/src/examples/rpp/doxygen/last.cpp @@ -3,7 +3,7 @@ #include /** - * \example last.cpp + * @example last.cpp **/ int main() { diff --git a/src/examples/rpp/doxygen/map.cpp b/src/examples/rpp/doxygen/map.cpp index 4e22e1332..3c27f5543 100644 --- a/src/examples/rpp/doxygen/map.cpp +++ b/src/examples/rpp/doxygen/map.cpp @@ -3,7 +3,7 @@ #include /** - * \example map.cpp + * @example map.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rpp/doxygen/observe_on.cpp b/src/examples/rpp/doxygen/observe_on.cpp index 430747203..39fd2611b 100644 --- a/src/examples/rpp/doxygen/observe_on.cpp +++ b/src/examples/rpp/doxygen/observe_on.cpp @@ -5,7 +5,7 @@ #include /** - * \example observe_on.cpp + * @example observe_on.cpp **/ int main() // NOLINT(bugprone-exception-escape) { diff --git a/src/examples/rpp/doxygen/readme.cpp b/src/examples/rpp/doxygen/readme.cpp index 1fc29392e..9d159f834 100644 --- a/src/examples/rpp/doxygen/readme.cpp +++ b/src/examples/rpp/doxygen/readme.cpp @@ -7,6 +7,54 @@ * @example readme.cpp */ + +// ![simple_custom_map] +template +struct simple_map +{ + simple_map(const Fn& fn) + : fn(fn) + { + } + + Fn fn{}; + + // 1: define traits for the operator with upstream (previous type) type + template + struct operator_traits + { + // 1.1: it could have static asserts to be sure T is applicable for this operator + static_assert(std::invocable, "Fn is not invocable with T"); + + // 1.2: it should have `result_type` is type of new observable after applying this operator + using result_type = std::invoke_result_t; + }; + + // 2: define updated optimal disposables strategy. Set to `rpp::details::observables::default_disposables_strategy` if you don't know what is that. + template + using updated_optimal_disposables_strategy = Prev; + + + // 3: implement core logic of operator: accept downstream observer (of result_type) and convert it to upstream observer (of T). + template + auto lift(Observer&& observer) const + { + const auto dynamic_observer = std::forward(observer).as_dynamic(); + return rpp::make_lambda_observer([dynamic_observer, fn = fn](const auto& v) { dynamic_observer.on_next(fn(v)); }, + [dynamic_observer](const std::exception_ptr& err) { dynamic_observer.on_error(err); }, + [dynamic_observer]() { dynamic_observer.on_completed(); }); + } +}; + +template +simple_map(const Fn& fn) -> simple_map; + +void test() +{ + rpp::source::just(1) | simple_map([](int v) { return std::to_string(v); }) | rpp::ops::subscribe(); +} +// ![simple_custom_map] + int main() // NOLINT(bugprone-exception-escape) { // ![readme] @@ -18,5 +66,7 @@ int main() // NOLINT(bugprone-exception-escape) | rpp::operators::subscribe([](char v) { std::cout << v; }); // ![readme] + rpp::source::just(1) | simple_map([](int v) { return std::to_string(v); }) | rpp::ops::subscribe(); + return 0; } diff --git a/src/examples/rpp/doxygen/skip.cpp b/src/examples/rpp/doxygen/skip.cpp index dfaa8defa..0da8518a7 100644 --- a/src/examples/rpp/doxygen/skip.cpp +++ b/src/examples/rpp/doxygen/skip.cpp @@ -3,7 +3,7 @@ #include /** - * \example skip.cpp + * @example skip.cpp **/ int main() { diff --git a/src/examples/rpp/doxygen/switch_on_next.cpp b/src/examples/rpp/doxygen/switch_on_next.cpp index 346310426..b58ecebf1 100644 --- a/src/examples/rpp/doxygen/switch_on_next.cpp +++ b/src/examples/rpp/doxygen/switch_on_next.cpp @@ -3,7 +3,7 @@ #include /** - * \example switch_on_next.cpp + * @example switch_on_next.cpp **/ int main() { diff --git a/src/examples/rpp/doxygen/take.cpp b/src/examples/rpp/doxygen/take.cpp index dca51dab2..d308797c0 100644 --- a/src/examples/rpp/doxygen/take.cpp +++ b/src/examples/rpp/doxygen/take.cpp @@ -3,7 +3,7 @@ #include /** - * \example take.cpp + * @example take.cpp **/ int main() // NOLINT(bugprone-exception-escape) { diff --git a/src/examples/rpp/doxygen/take_while.cpp b/src/examples/rpp/doxygen/take_while.cpp index 3eea416fb..52193992e 100644 --- a/src/examples/rpp/doxygen/take_while.cpp +++ b/src/examples/rpp/doxygen/take_while.cpp @@ -3,7 +3,7 @@ #include /** - * \example take_while.cpp + * @example take_while.cpp **/ int main() // NOLINT(bugprone-exception-escape) { diff --git a/src/examples/rpp/doxygen/thread_pool.cpp b/src/examples/rpp/doxygen/thread_pool.cpp index 91d52c242..e8f53d2bc 100644 --- a/src/examples/rpp/doxygen/thread_pool.cpp +++ b/src/examples/rpp/doxygen/thread_pool.cpp @@ -3,7 +3,7 @@ #include /** - * \example thread_pool.cpp + * @example thread_pool.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rppgrpc/doxygen/client_reactor.cpp b/src/examples/rppgrpc/doxygen/client_reactor.cpp index db5562478..06ee998a8 100644 --- a/src/examples/rppgrpc/doxygen/client_reactor.cpp +++ b/src/examples/rppgrpc/doxygen/client_reactor.cpp @@ -5,7 +5,7 @@ #include "protocol.grpc.pb.h" /** - * \example client_reactor.cpp + * @example client_reactor.cpp **/ int main() // NOLINT(bugprone-exception-escape) diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp index 86f5822ce..1e5aa046f 100644 --- a/src/examples/rppgrpc/doxygen/server_reactor.cpp +++ b/src/examples/rppgrpc/doxygen/server_reactor.cpp @@ -6,7 +6,7 @@ #include "protocol.grpc.pb.h" /** - * \example server_reactor.cpp + * @example server_reactor.cpp **/ class server : public TestService::CallbackService diff --git a/src/examples/rppqt/doxygen/from_signal.cpp b/src/examples/rppqt/doxygen/from_signal.cpp index e9535c705..5d41de202 100644 --- a/src/examples/rppqt/doxygen/from_signal.cpp +++ b/src/examples/rppqt/doxygen/from_signal.cpp @@ -7,7 +7,7 @@ #include /** - * \example from_signal.cpp + * @example from_signal.cpp **/ int main(int argc, char* argv[]) diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp index c5e73facb..60582d90f 100644 --- a/src/extensions/rppgrpc/rppgrpc/details/base.hpp +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -65,6 +65,8 @@ namespace rppgrpc::details struct observer_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::reference_wrapper owner{}; template T> diff --git a/src/rpp/rpp/disposables/composite_disposable.hpp b/src/rpp/rpp/disposables/composite_disposable.hpp index 25b05d9a7..0ee87b544 100644 --- a/src/rpp/rpp/disposables/composite_disposable.hpp +++ b/src/rpp/rpp/disposables/composite_disposable.hpp @@ -26,7 +26,7 @@ namespace rpp * * @ingroup disposables */ - template + template class composite_disposable_impl : public interface_composite_disposable { public: @@ -171,7 +171,7 @@ namespace rpp * * @ingroup disposables */ - class composite_disposable : public composite_disposable_impl> + class composite_disposable : public composite_disposable_impl { }; } // namespace rpp diff --git a/src/rpp/rpp/disposables/details/container.hpp b/src/rpp/rpp/disposables/details/container.hpp index 8a4c84b5a..3c77df649 100644 --- a/src/rpp/rpp/disposables/details/container.hpp +++ b/src/rpp/rpp/disposables/details/container.hpp @@ -17,13 +17,16 @@ namespace rpp::details::disposables { - class dynamic_disposables_container_base + class dynamic_disposables_container { public: - explicit dynamic_disposables_container_base(size_t count) - { - m_data.reserve(count); - } + explicit dynamic_disposables_container() = default; + + dynamic_disposables_container(const dynamic_disposables_container&) = delete; + dynamic_disposables_container(dynamic_disposables_container&& other) noexcept = default; + + dynamic_disposables_container& operator=(const dynamic_disposables_container& other) = delete; + dynamic_disposables_container& operator=(dynamic_disposables_container&& other) noexcept = default; void push_back(const rpp::disposable_wrapper& d) { @@ -57,16 +60,6 @@ namespace rpp::details::disposables mutable std::vector m_data{}; }; - template - class dynamic_disposables_container : public dynamic_disposables_container_base - { - public: - dynamic_disposables_container() - : dynamic_disposables_container_base{Count} - { - } - }; - template class static_disposables_container { @@ -159,11 +152,13 @@ namespace rpp::details::disposables size_t m_size{}; }; - struct none_disposables_container + template<> + class static_disposables_container<0> { + public: [[noreturn]] static void push_back(const rpp::disposable_wrapper&) { - throw rpp::utils::more_disposables_than_expected{"none_disposables_container expected none disposables but obtained one"}; + throw rpp::utils::more_disposables_than_expected{"static_disposables_container<0> expected no disposables but received at least one"}; } static void remove(const rpp::disposable_wrapper&) {} diff --git a/src/rpp/rpp/disposables/fwd.hpp b/src/rpp/rpp/disposables/fwd.hpp index 6cfb7344a..ea7dffed5 100644 --- a/src/rpp/rpp/disposables/fwd.hpp +++ b/src/rpp/rpp/disposables/fwd.hpp @@ -43,23 +43,28 @@ namespace rpp namespace rpp::details::disposables { - template - class dynamic_disposables_container; - - template - class static_disposables_container; - - struct none_disposables_container; - namespace constraint { template - concept disposable_container = requires(T& c, const T& const_c, const rpp::disposable_wrapper& d) { + concept disposables_container = requires(T& c, const T& const_c, const rpp::disposable_wrapper& d) { c.push_back(d); const_c.dispose(); c.clear(); }; } // namespace constraint + + /** + * @brief Container with std::vector as underlying storage. + */ + class dynamic_disposables_container; + + /** + * @brief Container with fixed std::array as underlying storage. + */ + template + class static_disposables_container; + + using default_disposables_container = dynamic_disposables_container; } // namespace rpp::details::disposables namespace rpp diff --git a/src/rpp/rpp/observables.hpp b/src/rpp/rpp/observables.hpp index a5c16f2b8..d67892f8c 100644 --- a/src/rpp/rpp/observables.hpp +++ b/src/rpp/rpp/observables.hpp @@ -34,7 +34,7 @@ * 4. This is true **EXCEPT FOR** subjects if used manually. Use serialized_* instead if you can't guarantee serial emissions. * * For example: - * \code{.cpp} + * @code{.cpp} * auto s1 = rpp::source::just(1) | rpp::operators::repeat() | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}); * auto s2 = rpp::source::just(2) | rpp::operators::repeat() | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}); * s1 | rpp::operators::merge_with(s2) @@ -47,18 +47,18 @@ * }) * | rpp::operators::as_blocking() * | rpp::operators::subscribe([](int){}); - * \endcode + * @endcode * * This will never produce: - * \code{.log} + * @code{.log} * enter 1 * enter 2 * exit 2 * exit 1 - * \endcode + * @endcode * * Only serially: - * \code{.log} + * @code{.log} * enter 1 * exit 1 * enter 1 @@ -67,7 +67,7 @@ * exit 2 * enter 2 * exit 2 - * \endcode + * @endcode * @see https://reactivex.io/documentation/observable.html * @ingroup rpp */ diff --git a/src/rpp/rpp/observables/blocking_observable.hpp b/src/rpp/rpp/observables/blocking_observable.hpp index 137b76cd7..c95451126 100644 --- a/src/rpp/rpp/observables/blocking_observable.hpp +++ b/src/rpp/rpp/observables/blocking_observable.hpp @@ -48,7 +48,7 @@ namespace rpp::details::observables { public: using value_type = Type; - using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t::template add<1>; + using optimal_disposables_strategy = typename Strategy::optimal_disposables_strategy::template add<1>; blocking_strategy(observable&& observable) : m_original{std::move(observable)} diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index 557699e4b..9ef71cf4f 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -35,7 +35,7 @@ namespace rpp::details std::shared_ptr m_state = std::make_shared(); using value_type = rpp::utils::extract_observable_type_t; - using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>; + using optimal_disposables_strategy = typename rpp::connectable_observable::optimal_disposables_strategy::template add<1>; template Strategy> void subscribe(observer&& obs) const diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index 0d2d1948f..9bb42e98c 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -24,10 +24,10 @@ namespace rpp::details::observables using operator_traits = typename TStrategy::template operator_traits; - static_assert(rpp::constraint::operator_chain); + static_assert(rpp::constraint::operator_); public: - using expected_disposable_strategy = deduce_updated_disposable_strategy; + using optimal_disposables_strategy = typename TStrategy::template updated_optimal_disposables_strategy; using value_type = typename operator_traits::result_type; chain(const TStrategy& strategy, const TStrategies&... strategies) @@ -47,8 +47,8 @@ namespace rpp::details::observables { [[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed(); - if constexpr (rpp::constraint::operator_lift_with_disposable_strategy) - m_strategies.subscribe(m_strategy.template lift_with_disposable_strategy(std::forward(observer))); + if constexpr (rpp::constraint::operator_lift_with_disposables_strategy) + m_strategies.subscribe(m_strategy.template lift_with_disposables_strategy(std::forward(observer))); else if constexpr (rpp::constraint::operator_lift) m_strategies.subscribe(m_strategy.template lift(std::forward(observer))); else @@ -73,7 +73,7 @@ namespace rpp::details::observables class chain { public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; + using optimal_disposables_strategy = typename TStrategy::optimal_disposables_strategy; using value_type = typename TStrategy::value_type; chain(const TStrategy& strategy) diff --git a/src/rpp/rpp/observables/details/disposable_strategy.hpp b/src/rpp/rpp/observables/details/disposable_strategy.hpp deleted file mode 100644 index dfcb0cdbb..000000000 --- a/src/rpp/rpp/observables/details/disposable_strategy.hpp +++ /dev/null @@ -1,119 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2023 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus - -#pragma once - -#include - -#include - -namespace rpp::details::observables -{ - enum class AtomicMode - { - NonAtomic = 0, - Atomic = 1 - }; - - template - using deduce_atomic_bool = std::conditional_t; - - template - struct dynamic_disposable_strategy_selector - { - template - using add = dynamic_disposable_strategy_selector; - - using disposable_container = disposables::dynamic_disposables_container; - using disposable_strategy = observers::dynamic_local_disposable_strategy>; - }; - - template - using atomic_dynamic_disposable_strategy_selector = dynamic_disposable_strategy_selector; - - struct default_disposable_strategy_selector - { - template - using add = default_disposable_strategy_selector; - - using disposable_container = dynamic_disposable_strategy_selector<0, AtomicMode::Atomic>::disposable_container; - using disposable_strategy = dynamic_disposable_strategy_selector<0, AtomicMode::Atomic>::disposable_strategy; - }; - - template - struct fixed_disposable_strategy_selector - { - template - using add = fixed_disposable_strategy_selector; - - using disposable_container = disposables::static_disposables_container; - using disposable_strategy = observers::static_local_disposable_strategy>; - }; - - template - struct fixed_disposable_strategy_selector<0, Mode> - { - template - using add = fixed_disposable_strategy_selector; - - using disposable_container = default_disposable_strategy_selector::disposable_container; - using disposable_strategy = observers::bool_local_disposable_strategy>; - }; - - template - using atomic_fixed_disposable_strategy_selector = fixed_disposable_strategy_selector; - - using bool_disposable_strategy_selector = fixed_disposable_strategy_selector<0, AtomicMode::NonAtomic>; - using atomic_bool_disposable_strategy_selector = fixed_disposable_strategy_selector<0, AtomicMode::Atomic>; - - - namespace details - { - template - concept has_expected_disposable_strategy = requires { typename T::expected_disposable_strategy; }; - - template - consteval auto* deduce_disposable_strategy() - { - if constexpr (has_expected_disposable_strategy) - return static_cast(nullptr); - else - return static_cast(nullptr); - } - - template - concept has_updated_disposable_strategy = requires { typename T::template updated_disposable_strategy; }; - - template - consteval auto* deduce_updated_disposable_strategy() - { - if constexpr (has_updated_disposable_strategy) - return static_cast*>(nullptr); - else - return static_cast(nullptr); - } - } // namespace details - - template - using deduce_disposable_strategy_t = std::remove_pointer_t())>; - - template - using deduce_updated_disposable_strategy = std::remove_pointer_t())>; - - namespace constraint - { - template - concept disposable_strategy = requires(const T&) { - typename T::template add; - typename T::disposable_strategy; - typename T::disposable_container; - requires observers::constraint::disposable_strategy; - }; - } // namespace constraint -} // namespace rpp::details::observables diff --git a/src/rpp/rpp/observables/details/disposables_strategy.hpp b/src/rpp/rpp/observables/details/disposables_strategy.hpp new file mode 100644 index 000000000..e290e9e19 --- /dev/null +++ b/src/rpp/rpp/observables/details/disposables_strategy.hpp @@ -0,0 +1,49 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include + +#include + +namespace rpp::details::observables +{ + struct dynamic_disposables_strategy + { + template + using add = dynamic_disposables_strategy; + + using disposables_container = disposables::dynamic_disposables_container; + using observer_disposables_strategy = observers::dynamic_disposables_strategy; + }; + + template + struct fixed_disposables_strategy + { + template + using add = fixed_disposables_strategy; + + using disposables_container = disposables::static_disposables_container; + using observer_disposables_strategy = observers::static_disposables_strategy; + }; + + using default_disposables_strategy = dynamic_disposables_strategy; + + namespace constraint + { + template + concept disposables_strategy = requires(const T&) { + typename T::template add; + typename T::observer_disposables_strategy; + typename T::disposables_container; + requires observers::constraint::disposables_strategy; + }; + } // namespace constraint +} // namespace rpp::details::observables diff --git a/src/rpp/rpp/observables/dynamic_observable.hpp b/src/rpp/rpp/observables/dynamic_observable.hpp index db0b2a201..60f586a4e 100644 --- a/src/rpp/rpp/observables/dynamic_observable.hpp +++ b/src/rpp/rpp/observables/dynamic_observable.hpp @@ -29,7 +29,8 @@ namespace rpp::details::observables class dynamic_strategy final { public: - using value_type = Type; + using value_type = Type; + using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; template Strategy> requires (!rpp::constraint::decayed_same_as>) diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp index 6cd897baf..3de20b121 100644 --- a/src/rpp/rpp/observables/fwd.hpp +++ b/src/rpp/rpp/observables/fwd.hpp @@ -11,18 +11,35 @@ #include -#include +#include #include #include namespace rpp::constraint { + /** + * @concept observable_strategy + * @brief A concept that defines the requirements for an observable strategy. + * + * This concept ensures that a type `S` meets the following criteria: + * - It has a `subscribe` method that accepts observer of type `T` and returns `void`. + * - It defines a nested type `value_type` to represent the type of values emitted by the observable. + * - It defines a nested type `optimal_disposables_strategy` to define the optimal disposables strategy observer could/should use to handle current observable properly. + * + * @tparam S The type to be checked against the concept. + * @tparam T The type of the values emitted by the observable. + * + * @ingroup observables + */ template concept observable_strategy = requires(const S& strategy, rpp::details::observers::fake_observer&& observer) { { strategy.subscribe(std::move(observer)) } -> std::same_as; + typename S::value_type; + typename S::optimal_disposables_strategy; + requires rpp::details::observables::constraint::disposables_strategy; }; } // namespace rpp::constraint @@ -37,9 +54,10 @@ namespace rpp::details::observables template struct fake_strategy { - using value_type = Type; + using value_type = Type; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; - static void subscribe(const auto&) {} + consteval static void subscribe(const auto&) {} }; } // namespace rpp::details::observables @@ -82,6 +100,11 @@ namespace rpp::constraint template concept observables_of_same_type = rpp::constraint::observable && (rpp::constraint::observable && ...) && (std::same_as, rpp::utils::extract_observable_type_t> && ...); + /** + * @concept operator_subscribe + * @brief Simple operator defining logic how to subscribe passed observer to passed observable. In most cases it means operator have some custom logic over observable too, so, you need to have access to observable, for example, subscribe to observable multiple times. + * @ingroup operators + */ template concept operator_subscribe = requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer, const details::observables::chain>& chain) { { @@ -89,6 +112,11 @@ namespace rpp::constraint }; }; + /** + * @concept operator_lift + * @brief Accept downstream observer and return new upstream (of type Type) observer. + * @ingroup operators + */ template concept operator_lift = requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { { @@ -96,22 +124,46 @@ namespace rpp::constraint } -> rpp::constraint::observer_of_type; }; + /** + * @concept operator_lift_with_disposables_strategy + * @brief Same as @link rpp::constraint::operator_lift @endlink but with custom disposables logic. For example, if you are manually create storage for disposables and want to do it optimal. + * @ingroup operators + */ + template - concept operator_lift_with_disposable_strategy = requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { + concept operator_lift_with_disposables_strategy = requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { { - op.template lift_with_disposable_strategy(std::move(observer)) + op.template lift_with_disposables_strategy(std::move(observer)) } -> rpp::constraint::observer_of_type; }; + template + concept has_operator_traits = requires() { + typename std::decay_t::template operator_traits; + typename std::decay_t::template operator_traits::result_type; + }; + + template + concept has_operator_disposables_strategy = requires() { + typename std::decay_t::template updated_optimal_disposables_strategy; + } && details::observables::constraint::disposables_strategy::template updated_optimal_disposables_strategy>; + + /** + * @concept operator_ + * @details Concept for any RPP-related operator: + * - operator should have type-traits: template sub-struct `operator_traits` where template typename is type of upstream. + * - Such and sub-struct should have `result_type` using to type of final observable/downstream observer + * - You can place any static_asserts to this `operator_traits` if you have some specific requiremenets + * - operator should have template using `updated_optimal_disposables_strategy` accepting `rpp::details::observables::disposbles_strategy` and returning new (updated) strategy to provide optimal behavior. For example, your operator could add+1 disposables to the strategy + * - operator should satisfy `rpp::constraint::operator_subscribe`, `rpp::constraint::operator_lift` or `rpp::constraint::operator_lift_with_disposables_strategy` + * + * @ingroup operators + */ template - concept operator_chain = - requires() { - typename std::decay_t::template operator_traits; - typename std::decay_t::template operator_traits::result_type; - } - && details::observables::constraint::disposable_strategy, - typename details::observables::chain>::expected_disposable_strategy>> - && (operator_subscribe, Type> || operator_lift, Type> || operator_lift_with_disposable_strategy, Type, DisposableStrategy>); + concept operator_ = + has_operator_traits + && has_operator_disposables_strategy + && (operator_subscribe, Type> || operator_lift, Type> || operator_lift_with_disposables_strategy, Type, DisposableStrategy>); } // namespace rpp::constraint diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 2f12ebfb7..ab8bdcec9 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -40,7 +40,7 @@ namespace rpp using value_type = Type; using strategy_type = Strategy; - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; + using optimal_disposables_strategy = typename Strategy::optimal_disposables_strategy; template requires (!constraint::variadic_decayed_same_as, Args...> && constraint::is_constructible_from) @@ -77,10 +77,10 @@ namespace rpp requires (!constraint::observer) void subscribe(ObserverStrategy&& observer_strategy) const { - if constexpr (details::observers::has_disposable_strategy) - subscribe(rpp::observer>{std::forward(observer_strategy)}); + if constexpr (std::decay_t::preferred_disposables_mode == rpp::details::observers::disposables_mode::Auto) + subscribe(rpp::observer, typename optimal_disposables_strategy::observer_disposables_strategy>>{std::forward(observer_strategy)}); else - subscribe(rpp::observer_with_disposable, typename expected_disposable_strategy::disposable_strategy>{std::forward(observer_strategy)}); + subscribe(rpp::observer>{std::forward(observer_strategy)}); } /** @@ -93,7 +93,7 @@ namespace rpp * @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -103,14 +103,14 @@ namespace rpp * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template ObserverStrategy> composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, observer&& obs) const { if (!d.is_disposed()) - m_strategy.subscribe(observer_with_disposable>{d, std::move(obs)}); + m_strategy.subscribe(observer_with_external_disposable>{d, std::move(obs)}); return d; } @@ -127,7 +127,7 @@ namespace rpp requires (!constraint::observer) composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, ObserverStrategy&& observer_strategy) const { - subscribe(observer_with_disposable>{d, std::forward(observer_strategy)}); + subscribe(observer_with_external_disposable>{d, std::forward(observer_strategy)}); return d; } @@ -144,7 +144,7 @@ namespace rpp [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer&& observer) const { if (!observer.is_disposed()) - return subscribe(rpp::composite_disposable_wrapper::make>(), std::move(observer)); + return subscribe(rpp::composite_disposable_wrapper::make>(), std::move(observer)); return composite_disposable_wrapper::empty(); } @@ -159,7 +159,7 @@ namespace rpp requires (!constraint::observer) [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const { - return subscribe(rpp::composite_disposable_wrapper::make>(), std::forward(observer_strategy)); + return subscribe(rpp::composite_disposable_wrapper::make>(), std::forward(observer_strategy)); } /** @@ -173,7 +173,7 @@ namespace rpp */ [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(dynamic_observer observer) const { - return subscribe>(rpp::composite_disposable_wrapper::make>(), std::move(observer)); + return subscribe>(rpp::composite_disposable_wrapper::make>(), std::move(observer)); } /** @@ -188,9 +188,9 @@ namespace rpp { using strategy = rpp::details::observers::lambda_strategy, std::decay_t, std::decay_t>; - subscribe(observer_with_disposable{std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}); + subscribe(observer>{std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}); } /** @@ -216,7 +216,7 @@ namespace rpp std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> [[nodiscard("Use returned disposable or use subscribe(on_next, on_error, on_completed) instead")]] composite_disposable_wrapper subscribe_with_disposable(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const { - auto res = rpp::composite_disposable_wrapper::make>(); + auto res = rpp::composite_disposable_wrapper::make>(); subscribe(make_lambda_observer(res, std::forward(on_next), std::forward(on_error), @@ -250,7 +250,7 @@ namespace rpp * @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -260,7 +260,7 @@ namespace rpp * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template OnNext, @@ -288,7 +288,7 @@ namespace rpp * @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -298,7 +298,7 @@ namespace rpp * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template OnNext, @@ -332,7 +332,8 @@ namespace rpp if constexpr (requires { typename std::decay_t::template operator_traits; }) { using result_type = typename std::decay_t::template operator_traits::result_type; - return observable, Strategy>>{std::forward(op), m_strategy}; + if constexpr (requires { typename std::decay_t::template operator_traits::result_type; }) // narrow compilataion error a bit + return observable, Strategy>>{std::forward(op), m_strategy}; } else { diff --git a/src/rpp/rpp/observables/variant_observable.hpp b/src/rpp/rpp/observables/variant_observable.hpp index 06e0bc3cb..fe82e64bf 100644 --- a/src/rpp/rpp/observables/variant_observable.hpp +++ b/src/rpp/rpp/observables/variant_observable.hpp @@ -19,6 +19,8 @@ namespace rpp::details template... Observables> struct variant_observable_strategy { + using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + using value_type = Type; template TT> requires (!constraint::decayed_same_as) @@ -27,7 +29,6 @@ namespace rpp::details { } - variant_observable_strategy(const variant_observable_strategy& other) = default; variant_observable_strategy(variant_observable_strategy&& other) noexcept = default; diff --git a/src/rpp/rpp/observers.hpp b/src/rpp/rpp/observers.hpp index 600126365..61bafbb42 100644 --- a/src/rpp/rpp/observers.hpp +++ b/src/rpp/rpp/observers.hpp @@ -12,11 +12,32 @@ /** * @defgroup observers Observers - * @brief Observer subscribes on Observable and obtains values provided by Observable. - * @details Observer is kind of wrapper over 3 core functions: - * - on_next(T) - callback with new emission provided by observable - * - on_error(err) - failure termination callback with reason of failure of observable (why observable can't continue processing) - * - on_completed() - succeed termination callback - observable is done, no any future emissions from this + * + * @details Observer subscribes on Observable and obtains values provided by Observable. + * + * In fact observer is kind of wrapper over 3 core functions: + * - `on_next(T)` - callback with new emission provided by observable + * - `on_error(err)` - failure termination callback with reason of failure of observable (why observable can't continue processing) + * - `on_completed()` - succeed termination callback - observable is done, no any future emissions from this + * + * Additionally in RPP observer handles @link disposables @endlink related logic: + * - `set_upstream(disposable)` - observable could pass to observer it's own disposable to provide ability for observer to terminate observable's internal actions/state. + * - `is_disposed()` - observable could check if observer is still interested in emissions (`false`) or done and no any futher calls would be success (`true`) + * + * @par Observer creation: + * - **Observer creation inside subscribe:**
+ * RPP expects user to create observers only inside `subscribe` function of observables. Something like this: + * @code{.cpp} + * rpp::source::just(1).subscribe([](int){}, [](const std::exception_ptr&){}, [](){}); + * rpp::source::just(1) | rpp::operators::subscribe([](int){}, [](const std::exception_ptr&){}, [](){}); + * @endcode + * Some of the callbacks (on_next/on_error/on_completed) can be omitted. Check @link rpp::operators::subscribe @endlink for more details. + * + * - **Advanced observer creation:**
+ * Technically it is possible to create custom observer via creating new class/struct which satisfies concept @link rpp::constraint::observer_strategy @endlink, but it is **highly not-recommended for most cases**
+ * Also *technically* you could create your observer via `make_lambda_observer` function, but it is not recommended too: it could disable some built-in optimizations and cause worse performance.
+ * Also it is **most probably** bad pattern and invalid usage of RX if you want to keep/store observers as member variables/fields. Most probably you are doing something wrong IF you are not implementing custom observable/operator. + * * @see https://reactivex.io/documentation/observable.html * @ingroup rpp */ diff --git a/src/rpp/rpp/observers/details/disposable_strategy.hpp b/src/rpp/rpp/observers/details/disposable_strategy.hpp deleted file mode 100644 index 30db40c10..000000000 --- a/src/rpp/rpp/observers/details/disposable_strategy.hpp +++ /dev/null @@ -1,106 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2022 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus -// - -#pragma once - -#include - -#include -#include - -#include - -namespace rpp::details::observers -{ - class atomic_bool - { - public: - atomic_bool() = default; - atomic_bool(atomic_bool&& other) noexcept - // just need atomicity, not guarding anything - : m_value{other.m_value.load(std::memory_order::seq_cst)} - { - } - - bool test() const noexcept - { - // just need atomicity, not guarding anything - return m_value.load(std::memory_order::seq_cst); - } - - void set() noexcept - { - // just need atomicity, not guarding anything - m_value.store(true, std::memory_order::seq_cst); - } - - private: - std::atomic_bool m_value{}; - }; - - class non_atomic_bool - { - public: - non_atomic_bool() = default; - non_atomic_bool(non_atomic_bool&& other) noexcept = default; - - bool test() const noexcept - { - return m_value; - } - - void set() noexcept - { - m_value = true; - } - - private: - bool m_value{}; - }; - - template Bool> - class local_disposable_strategy - { - public: - local_disposable_strategy() = default; - local_disposable_strategy(local_disposable_strategy&& other) noexcept = default; - - void add(const disposable_wrapper& d) - { - m_upstreams.push_back(d); - } - - bool is_disposed() const noexcept - { - // just need atomicity, not guarding anything - return m_is_disposed.test(); - } - - void dispose() const - { - // just need atomicity, not guarding anything - m_is_disposed.set(); - m_upstreams.dispose(); - } - - private: - RPP_NO_UNIQUE_ADDRESS DisposableContainer m_upstreams{}; - mutable Bool m_is_disposed{}; - }; - - struct none_disposable_strategy - { - static void add(const rpp::disposable_wrapper&) {} - - static bool is_disposed() noexcept { return false; } - - static void dispose() {} - }; -} // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/details/disposables_strategy.hpp b/src/rpp/rpp/observers/details/disposables_strategy.hpp new file mode 100644 index 000000000..966079736 --- /dev/null +++ b/src/rpp/rpp/observers/details/disposables_strategy.hpp @@ -0,0 +1,58 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +#include + +namespace rpp::details::observers +{ + template + class local_disposables_strategy + { + public: + local_disposables_strategy() = default; + local_disposables_strategy(local_disposables_strategy&& other) noexcept = default; + + void add(const disposable_wrapper& d) + { + m_upstreams.push_back(d); + } + + bool is_disposed() const noexcept + { + return m_is_disposed; + } + + void dispose() const + { + m_is_disposed = true; + m_upstreams.dispose(); + } + + private: + RPP_NO_UNIQUE_ADDRESS DisposableContainer m_upstreams{}; + mutable bool m_is_disposed{}; + }; + + struct none_disposables_strategy + { + static constexpr void add(const rpp::disposable_wrapper&) {} + + static constexpr bool is_disposed() noexcept { return false; } + + static constexpr void dispose() {} + }; +} // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/details/fwd.hpp b/src/rpp/rpp/observers/details/fwd.hpp index 377b9f295..0ac6be5be 100644 --- a/src/rpp/rpp/observers/details/fwd.hpp +++ b/src/rpp/rpp/observers/details/fwd.hpp @@ -18,44 +18,20 @@ namespace rpp::details::observers { - class atomic_bool; - class non_atomic_bool; - - template Bool> - class local_disposable_strategy; - - /** - * @brief No any disposable logic at all. Used only inside proxy-forwarding operators where extra disposable logic not requires - */ - struct none_disposable_strategy; - - /** - * @brief Dynamic disposable logic based on pre-allocated vector - */ - template Bool> - using dynamic_local_disposable_strategy = local_disposable_strategy, Bool>; - - /** - * @brief Same as dynamic strategy, but based on array. - */ - template Bool> - using static_local_disposable_strategy = local_disposable_strategy, Bool>; - - /** - * @brief Just an boolean with no any disposables - */ - template Bool> - using bool_local_disposable_strategy = local_disposable_strategy; - - /** - * @brief External disposable used as strategy - */ - using external_disposable_strategy = composite_disposable_wrapper; + enum class disposables_mode : uint8_t + { + // Let observer deduce disposables mode + Auto = 0, + // No any disposables logic for observer expected + None = 1, + // Use external (passed to constructor) composite_disposable_wrapper as disposable + External = 2 + }; namespace constraint { template - concept disposable_strategy = requires(T& v, const T& const_v, const disposable_wrapper& d) { + concept disposables_strategy = requires(T& v, const T& const_v, const disposable_wrapper& d) { v.add(d); { const_v.is_disposed() @@ -64,21 +40,45 @@ namespace rpp::details::observers }; } // namespace constraint - template - concept has_disposable_strategy = requires { typename T::preferred_disposable_strategy; }; + template + class local_disposables_strategy; + + /** + * @brief No any disposable logic at all. Used only inside proxy-forwarding operators where extra disposable logic not requires + */ + struct none_disposables_strategy; + + /** + * @brief Keep disposables inside dynamic_disposables_container container (based on std::vector) + */ + using dynamic_disposables_strategy = local_disposables_strategy; + + /** + * @brief Keep disposables inside static_disposables_container container (based on std::array) + */ + template + using static_disposables_strategy = local_disposables_strategy>; + + using default_disposables_strategy = dynamic_disposables_strategy; namespace details { - template - consteval auto* deduce_disposable_strategy() + template + consteval auto* deduce_optimal_disposables_strategy() { - if constexpr (has_disposable_strategy) - return static_cast(nullptr); + static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External); + + if constexpr (mode == disposables_mode::Auto) + return static_cast(nullptr); + else if constexpr (mode == disposables_mode::None) + return static_cast(nullptr); + else if constexpr (mode == disposables_mode::External) + return static_cast(nullptr); else - return static_cast*>(nullptr); + return static_cast(nullptr); } } // namespace details - template - using deduce_disposable_strategy_t = std::remove_pointer_t())>; + template + using deduce_optimal_disposables_strategy_t = std::remove_pointer_t())>; } // namespace rpp::details::observers diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 0ec799162..6a585961e 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -91,6 +91,8 @@ namespace rpp::details::observers class dynamic_strategy final { public: + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + template Strategy> requires (!rpp::constraint::decayed_same_as>) explicit dynamic_strategy(observer&& obs) diff --git a/src/rpp/rpp/observers/fwd.hpp b/src/rpp/rpp/observers/fwd.hpp index d7236777a..a5abf0e92 100644 --- a/src/rpp/rpp/observers/fwd.hpp +++ b/src/rpp/rpp/observers/fwd.hpp @@ -36,18 +36,31 @@ namespace rpp::constraint }; /** - * @brief Concept to define strategy to override observer behavior. Strategy must be able to handle all observer's - * callbacks: on_next/on_error/on_completed + * @concept observer_strategy + * @brief Concept defines requirements for an user-defined observer strategy. * * @tparam S is Strategy * @tparam Type is type of value observer would obtain * + * @details Strategy should be able to handle: + * - on_next for both: const lvalue ref and rvalue ref of Type + * - on_error(exception_ptr) for unsuccessful termination event + * - on_completed() for successful termination event + * - set_upstream(disposable) for custom disposables related logic. In most cases you should OR do nothing OR just forward disposable to downstream observer (and set preferred_disposables_mode to None) OR fully handle disposales related logic properly + * - is_disposed() for extending custom disposables related logic with indicating current status. + * - `static constexpr rpp::details::observers::disposables_mode preferred_disposables_mode` with preferred disposables logic for observer over this strategy + * * @ingroup observers */ template concept observer_strategy = observer_strategy_base && requires(const S& const_strategy, const Type& v, Type& mv) { const_strategy.on_next(v); const_strategy.on_next(std::move(mv)); + + // strategy has to provide it's preferred disposable mode: minimal level of disposable logic it could work with. + // if observer_strategy fully controls disposable logic or just forwards disposable to downstream observer: rpp::details::observers::disposables_mode::None + // if you not sure about this field - just use rpp::details::observers::disposables_mode::Auto + { std::decay_t::preferred_disposables_mode } -> rpp::constraint::decayed_same_as; /* = rpp::details::observers::disposables_mode::Auto */ }; } // namespace rpp::constraint @@ -61,35 +74,33 @@ namespace rpp::details::observers std::invocable OnError, std::invocable<> OnCompleted> struct lambda_strategy; -} // namespace rpp::details::observers -namespace rpp::details -{ - template - struct with_disposable_strategy + template + struct override_disposables_strategy { - using preferred_disposable_strategy = Strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - with_disposable_strategy() = delete; + override_disposables_strategy() = delete; - static void on_next(const auto&) noexcept; - static void on_error(const std::exception_ptr&) noexcept; - static void on_completed() noexcept; + consteval static void on_next(const auto&) noexcept {} + consteval static void on_error(const std::exception_ptr&) noexcept {} + consteval static void on_completed() noexcept {} - static void set_upstream(const disposable_wrapper&) noexcept; - static bool is_disposed() noexcept; + consteval static void set_upstream(const disposable_wrapper&) noexcept {} + consteval static bool is_disposed() noexcept { return false; } }; -} // namespace rpp::details - +} // namespace rpp::details::observers namespace rpp { template Strategy> class observer; - template Strategy, - rpp::details::observers::constraint::disposable_strategy DisposableStrategy = rpp::details::observers::external_disposable_strategy> - using observer_with_disposable = observer>; + /* + * @brief Same as rpp::observer, but with passed rpp::composite_disposable_wrapper to constructor instead (as a result, it's possible to dispose observer early outside) + * @ingroup observers + */ + template Strategy> + using observer_with_external_disposable = observer>>; template class dynamic_observer; @@ -107,8 +118,12 @@ namespace rpp template OnNext, std::invocable OnError, std::invocable<> OnCompleted> using lambda_observer = observer>; + /* + * @brief Same as rpp::lambda_observer, but with passed rpp::composite_disposable_wrapper to constructor instead (as a result, it's possible to dispose observer early outside) + * @ingroup observers + */ template OnNext, std::invocable OnError, std::invocable<> OnCompleted> - using lambda_observer_with_disposable = observer_with_disposable>; + using lambda_observer_with_external_disposable = observer_with_external_disposable>; /** * @brief Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on the fly" via lambdas and etc. @@ -149,10 +164,10 @@ namespace rpp auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, OnNext&& on_next, OnError&& on_error = {}, - OnCompleted&& on_completed = {}) -> lambda_observer_with_disposable, - std::decay_t, - std::decay_t>; + OnCompleted&& on_completed = {}) -> lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t>; /** * @brief Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on the fly" via lambdas and etc. @@ -206,7 +221,7 @@ namespace rpp::details::observers { struct fake_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; static void on_next(const auto&) noexcept {} diff --git a/src/rpp/rpp/observers/lambda_observer.hpp b/src/rpp/rpp/observers/lambda_observer.hpp index 411057d4a..96709d95d 100644 --- a/src/rpp/rpp/observers/lambda_observer.hpp +++ b/src/rpp/rpp/observers/lambda_observer.hpp @@ -22,6 +22,8 @@ namespace rpp::details::observers std::invocable<> OnCompleted> struct lambda_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + template TOnNext, rpp::constraint::decayed_same_as TOnError, rpp::constraint::decayed_same_as TOnCompleted> explicit lambda_strategy(TOnNext&& on_next, TOnError&& on_error, TOnCompleted&& on_completed) : on_next{std::forward(on_next)} @@ -69,15 +71,15 @@ namespace rpp auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, OnNext&& on_next, OnError&& on_error, - OnCompleted&& on_completed) -> lambda_observer_with_disposable, - std::decay_t, - std::decay_t> + OnCompleted&& on_completed) -> lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t> { - return lambda_observer_with_disposable, - std::decay_t, - std::decay_t>{ + return lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t>{ d, std::forward(on_next), std::forward(on_error), diff --git a/src/rpp/rpp/observers/mock_observer.hpp b/src/rpp/rpp/observers/mock_observer.hpp index 98cf7e29e..0f91726c0 100644 --- a/src/rpp/rpp/observers/mock_observer.hpp +++ b/src/rpp/rpp/observers/mock_observer.hpp @@ -20,6 +20,8 @@ template class mock_observer_strategy final { public: + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + explicit mock_observer_strategy(bool copy_values = true) : m_state{std::make_shared(copy_values)} { @@ -54,7 +56,7 @@ class mock_observer_strategy final std::vector get_received_values() const { return m_state->vals; } auto get_observer() const { return rpp::observer>{*this}; } - auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable>{std::move(d), *this}; } + auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_external_disposable>{std::move(d), *this}; } private: struct state diff --git a/src/rpp/rpp/observers/observer.hpp b/src/rpp/rpp/observers/observer.hpp index 462467412..6e6750f90 100644 --- a/src/rpp/rpp/observers/observer.hpp +++ b/src/rpp/rpp/observers/observer.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -24,7 +24,7 @@ namespace rpp::details { - template Strategy, observers::constraint::disposable_strategy DisposablesStrategy> + template Strategy, observers::constraint::disposables_strategy DisposablesStrategy> class observer_impl { protected: @@ -37,7 +37,7 @@ namespace rpp::details } public: - using preferred_disposable_strategy = observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; using on_next_lvalue = void (observer_impl::*)(const Type&) const noexcept; using on_next_rvalue = void (observer_impl::*)(Type&&) const noexcept; @@ -157,7 +157,7 @@ namespace rpp * @warning By default observer is not copyable, only movable. If you need to COPY your observer, you need to convert it to rpp::dynamic_observer via rpp::observer::as_dynamic * @warning Expected that observer would be subscribed only to ONE observable ever. It can keep internal state and track it it was disposed or not. So, subscribing same observer multiple time follows unspecified behavior. * @warning If you are passing disposable to ctor, then state of this disposable would be used used (if empty disposable or disposed -> observer is disposed by default) - * @warning It is expected, that member of this observer would be called in SERIAL way. It means, no any parallel calls allowed, only serial ones from one observable. + * @warning It is expected that members of this observer are called in a SERIAL manner. This means no parallel or concurrent calls are allowed—only serial calls or those guarded under a lock. * * @tparam Type of value this observer can handle * @tparam Strategy used to provide logic over observer's callbacks @@ -168,11 +168,11 @@ namespace rpp class observer; template Strategy> - class observer final : public details::observer_impl> + class observer final : public details::observer_impl> { public: - using DisposableStrategy = details::observers::deduce_disposable_strategy_t; - using Base = details::observer_impl>; + using DisposableStrategy = details::observers::deduce_optimal_disposables_strategy_t; + using Base = details::observer_impl; template requires constraint::is_constructible_from @@ -200,9 +200,8 @@ namespace rpp } }; - template Strategy, rpp::details::observers::constraint::disposable_strategy DisposableStrategy> - class observer> final - : public details::observer_impl + template Strategy, rpp::details::observers::constraint::disposables_strategy DisposableStrategy> + class observer> final : public details::observer_impl { public: using Base = details::observer_impl; @@ -235,13 +234,13 @@ namespace rpp template class observer> - : public details::observer_impl, details::observers::none_disposable_strategy> + : public details::observer_impl, details::observers::none_disposables_strategy> { public: template TStrategy> requires (!std::same_as>) observer(observer&& other) - : details::observer_impl, details::observers::none_disposable_strategy>{details::observers::none_disposable_strategy{}, std::move(other)} + : details::observer_impl, details::observers::none_disposables_strategy>{details::observers::none_disposables_strategy{}, std::move(other)} { } diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 5f8d89847..89744d67e 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -12,10 +12,10 @@ /** * @defgroup operators Operators - * @brief Operators provide a way to modify observables and extend them with custom logic. - * @details By default, an observable emits values based on some underlying logic. For example, it might iterate over a vector and emit values. Operators allow you to make such a stream more complex, for example, by emitting only certain values, transforming them to strings, etc. As a result, you get another stream of different values, but more suitable for a specific case. + * @brief Operators modify observables and extend them with custom logic. + * @details Observables emit values based on underlying logic, such as iterating over a vector and etc. Operators allow you to enhance this stream, for example, by filtering values, transforming them, etc., resulting in a more suitable stream for specific cases. * - * For example, you can create an observable to get characters from console input, continue until the '0' character is encountered, filter out non-letter characters, and send the remaining letters as uppercase to the observer. With operators, this is straightforward to implement correctly: + * Example: Create an observable to read characters from console input, continue until '0' is encountered, filter out non-letter characters, and send the remaining letters as uppercase to the observer: * * @code{.cpp} * #include @@ -37,7 +37,63 @@ * } * @endcode * - * Check the [API Reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__operators.html) for more details about operators. + * @par How operators work and how to create your own? + * Example: + * + * @code{cpp} + * rpp::source::create([](const auto& observer){ + * observer.on_next(1); + * observer.on_completed(); + * }); + * @endcode + * + * This example creates an observable of `int` using the `create` operator, which emits the value `1` and then completes. + * The type of this observable is `rpp::observable`, where `...` is an implementation-defined type. + * To convert `int` to `std::string`, you can use the `map` operator: + * + * @code{cpp} + * rpp::source::create([](const auto& observer){ + * observer.on_next(1); + * observer.on_completed(); + * }) + * | rpp::operators::map([](int v){ return std::to_string(v); }); + * @endcode + * + * Now it is an `observable of strings` (`rpp::observable`). The `map` operator is a functor-adaptor that accepts an observable and returns another observable. + * It transforms the original observable's type to the "final type" by invoking the passed function. In this case, the final type is `std::string`. + * The `map` operator can be implemented in multiple ways: + * + * 1) call-based (function/functor or others) - operator accepts (old) observable and returns new (modified) observable + * @code{cpp} + * template + * struct map + * { + * Fn fn{}; + * + * template + * auto operator()(const rpp::observable& observable) const { + * using FinalType = std::invoke_result_t; + * return rpp::source::create([observable, fn](const rpp::dynamic_observer& observer) + * { + * observable.subscribe([observer, fn](const auto& v) { observer.on_next(fn(v)); }, + * [observer](const std::exception_ptr& err) { observer.on_error(err); }, + * [observer]() { observer.on_completed(); }); + * }); + * } + * } + * @endcode + * It is template for such an functor-adaptor. It is also fully valid example of call-based operator: + * @code{cpp} + * rpp::source::just(1) + * | [](const auto& observable) { return rpp::source::concat(observable, rpp::source::just(2)); }; + * @endcode + * This converts the observable to a concatenation of the original observable and `just(2)`. + * + * 2) type-traits based - should satisfy @link rpp::constraint::operator_ @endlink concept.
+ * For example, you can implement such an operator like this: + * @snippet readme.cpp simple_custom_map + * But in this case you are missing disposables-related functionality. + * So, it is better to implement it via providing custom observer's strategy with correct handling of disposables. Check real @link rpp::operators::map @endlink implementation for it =) * * @see https://reactivex.io/documentation/operators.html * @ingroup rpp diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index b82286f8e..d45c19e1c 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -27,7 +27,7 @@ namespace rpp::operators::details static_assert(std::same_as>); public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; buffer_observer_strategy(TObserver&& observer, size_t count) : m_observer{std::move(observer)} @@ -80,8 +80,8 @@ namespace rpp::operators::details using observer_strategy = buffer_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 053725378..fbe29fced 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -138,8 +138,9 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy : public concat_observer_strategy_base { - using base = concat_observer_strategy_base; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + using base = concat_observer_strategy_base; using base::concat_observer_strategy_base; template @@ -163,8 +164,8 @@ namespace rpp::operators::details template struct concat_observer_strategy : public concat_observer_strategy_base { - using base = concat_observer_strategy_base; - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using base = concat_observer_strategy_base; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; concat_observer_strategy(TObserver&& observer) : base{std::make_shared>(std::move(observer))} @@ -204,8 +205,8 @@ namespace rpp::operators::details using observer_strategy = concat_observer_strategy; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 0bb9c1274..3130fe9e8 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -110,7 +110,7 @@ namespace rpp::operators::details template struct debounce_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state{}; @@ -153,8 +153,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = typename Prev::template add<1>; + template + using updated_optimal_disposables_strategy = typename Prev::template add<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 612df5c35..8b924ee4b 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -68,6 +68,8 @@ namespace rpp::operators::details template struct delay_observer_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const @@ -170,8 +172,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index b07327b92..89fbe5f60 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -49,6 +49,8 @@ namespace rpp::operators::details template struct combining_observer_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr state{}; void set_upstream(const rpp::disposable_wrapper& d) const @@ -89,8 +91,8 @@ namespace rpp::operators::details constexpr static bool own_current_queue = true; }; - template - using updated_disposable_strategy = ::rpp::details::observables::default_disposable_strategy_selector; // TODO: sum of Prev + TObservables + template + using updated_optimal_disposables_strategy = ::rpp::details::observables::default_disposables_strategy; // TODO: sum of Prev + TObservables template auto lift(Observer&& observer) const diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 91b0f3e60..b6d916316 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -26,7 +26,7 @@ namespace rpp::operators::details { struct observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state{}; @@ -42,7 +42,7 @@ namespace rpp::operators::details }; public: - using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>; + using optimal_disposables_strategy = typename subjects::details::subject_state::optimal_disposables_strategy::template add<1>; explicit forwarding_subject(disposable_wrapper_impl refcount) : m_refcount{std::move(refcount)} @@ -56,7 +56,7 @@ namespace rpp::operators::details auto get_observable() const { - return subjects::details::create_subject_on_subscribe_observable([state = m_state.as_weak(), refcount = m_refcount] TObs>(TObs&& observer) { + return subjects::details::create_subject_on_subscribe_observable([state = m_state.as_weak(), refcount = m_refcount] TObs>(TObs&& observer) { if (const auto locked_state = state.lock()) { if (const auto locked = refcount.lock()) diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index 9f2151fc8..264784af5 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details template struct distinct_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; mutable std::unordered_set past_values{}; @@ -62,8 +62,8 @@ namespace rpp::operators::details using observer_strategy = distinct_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index 18f676acd..3b5065759 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct distinct_until_changed_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS EqualityFn comparator; @@ -63,8 +63,8 @@ namespace rpp::operators::details using observer_strategy = distinct_until_changed_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/element_at.hpp b/src/rpp/rpp/operators/element_at.hpp index 1cf3afa61..c9063e561 100644 --- a/src/rpp/rpp/operators/element_at.hpp +++ b/src/rpp/rpp/operators/element_at.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details template struct element_at_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; mutable size_t count; @@ -75,8 +75,8 @@ namespace rpp::operators::details using observer_strategy = element_at_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index 887757957..aca3abd74 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct filter_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Fn fn; @@ -59,8 +59,8 @@ namespace rpp::operators::details using observer_strategy = filter_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/finally.hpp b/src/rpp/rpp/operators/finally.hpp index cdcf843fc..3735cdf95 100644 --- a/src/rpp/rpp/operators/finally.hpp +++ b/src/rpp/rpp/operators/finally.hpp @@ -26,8 +26,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = typename Prev::template add<1>; + template + using updated_optimal_disposables_strategy = typename Prev::template add<1>; RPP_NO_UNIQUE_ADDRESS LastFn last_fn; diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index fd8f79cc1..93c9b88a9 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -20,7 +20,7 @@ namespace rpp::operators::details template struct first_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; @@ -56,8 +56,8 @@ namespace rpp::operators::details using observer_strategy = first_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/group_by.hpp b/src/rpp/rpp/operators/group_by.hpp index 1e6cccba4..16c4f5a17 100644 --- a/src/rpp/rpp/operators/group_by.hpp +++ b/src/rpp/rpp/operators/group_by.hpp @@ -38,7 +38,7 @@ namespace rpp::operators::details template struct group_by_inner_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; rpp::composite_disposable_wrapper disposable; @@ -61,7 +61,7 @@ namespace rpp::operators::details template struct group_by_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; using TKey = rpp::utils::decayed_invoke_result_t; using Type = rpp::utils::decayed_invoke_result_t; @@ -140,7 +140,8 @@ namespace rpp::operators::details template struct group_by_observable_strategy { - using value_type = T; + using value_type = T; + using optimal_disposables_strategy = typename rpp::subjects::publish_subject::optimal_disposables_strategy; rpp::subjects::publish_subject subj; std::weak_ptr disposable; @@ -176,8 +177,8 @@ namespace rpp::operators::details using observer_strategy = group_by_observer_strategy; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index 9126303b9..fc77d6cdb 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct last_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; mutable std::optional value{}; @@ -64,8 +64,8 @@ namespace rpp::operators::details using observer_strategy = last_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index 73159588d..6356c1300 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct map_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Fn fn; @@ -58,8 +58,8 @@ namespace rpp::operators::details using observer_strategy = map_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 72c1897a1..ed5a4ae61 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -51,6 +51,8 @@ namespace rpp::operators::details template struct merge_observer_base_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + merge_observer_base_strategy(std::shared_ptr>&& state) : m_state{std::move(state)} { @@ -143,8 +145,8 @@ namespace rpp::operators::details using observer_strategy = merge_observer_strategy>; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; template @@ -160,8 +162,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template void subscribe(Observer&& observer, const rpp::details::observables::chain& observable_strategy) const diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 4426cc250..87ce69773 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -20,7 +20,7 @@ namespace rpp::operators::details template struct on_error_resume_next_inner_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr observer; @@ -49,7 +49,7 @@ namespace rpp::operators::details template struct on_error_resume_next_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) : state{std::make_shared(std::move(observer))} @@ -112,8 +112,8 @@ namespace rpp::operators::details using observer_strategy = on_error_resume_next_observer_strategy; }; - template - using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; + template + using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index de3c6fbba..b4fb4d24d 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -20,8 +20,8 @@ namespace rpp::operators::details template struct reduce_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - using Seed = rpp::utils::extract_observer_type_t; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + using Seed = rpp::utils::extract_observer_type_t; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS mutable Seed seed; @@ -62,15 +62,15 @@ namespace rpp::operators::details using observer_strategy = reduce_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; template struct reduce_no_seed_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - using Seed = rpp::utils::extract_observer_type_t; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; + using Seed = rpp::utils::extract_observer_type_t; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Accumulator accumulator; @@ -115,8 +115,8 @@ namespace rpp::operators::details using observer_strategy = reduce_no_seed_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index dbeb667f8..73797acdc 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -44,7 +44,7 @@ namespace rpp::operators::details template struct retry_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state; mutable bool locally_disposed{}; @@ -120,8 +120,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template void subscribe(TObserver&& observer, TObservable&& observble) const diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index ef9f28a22..79155d8aa 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -50,7 +50,7 @@ namespace rpp::operators::details typename TNotifier> struct retry_when_impl_inner_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state; mutable bool locally_disposed{}; @@ -89,7 +89,7 @@ namespace rpp::operators::details typename TNotifier> struct retry_when_impl_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state; @@ -154,8 +154,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template void subscribe(TObserver&& observer, TObservable&& observable) const diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 1fdff676f..9d72f7f02 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details template struct scan_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS mutable Seed seed; @@ -64,14 +64,14 @@ namespace rpp::operators::details using observer_strategy = scan_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; template struct scan_no_seed_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; using Seed = rpp::utils::extract_observer_type_t; @@ -115,8 +115,8 @@ namespace rpp::operators::details using observer_strategy = scan_no_seed_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index dd6aaea46..af68c2173 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct skip_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; mutable size_t count{}; @@ -58,8 +58,8 @@ namespace rpp::operators::details using observer_strategy = skip_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/subscribe.hpp b/src/rpp/rpp/operators/subscribe.hpp index b76ca1bf9..8db7d3561 100644 --- a/src/rpp/rpp/operators/subscribe.hpp +++ b/src/rpp/rpp/operators/subscribe.hpp @@ -90,7 +90,7 @@ namespace rpp::operators::details template Strategy> rpp::composite_disposable_wrapper operator()(const rpp::observable& observable) && { - observable.subscribe(observer_with_disposable>{m_disposable, std::move(m_observer)}); + observable.subscribe(observer_with_external_disposable>{m_disposable, std::move(m_observer)}); return m_disposable; } diff --git a/src/rpp/rpp/operators/subscribe_on.hpp b/src/rpp/rpp/operators/subscribe_on.hpp index 918137bb7..cac118664 100644 --- a/src/rpp/rpp/operators/subscribe_on.hpp +++ b/src/rpp/rpp/operators/subscribe_on.hpp @@ -41,8 +41,8 @@ namespace rpp::operators::details using result_type = T; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 5a578b0d9..5ad52dd4e 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -46,7 +46,7 @@ namespace rpp::operators::details class switch_on_next_inner_observer_strategy { public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; switch_on_next_inner_observer_strategy(const std::shared_ptr>& state, const composite_disposable_wrapper& refcounted) : m_state{state} @@ -85,7 +85,7 @@ namespace rpp::operators::details class switch_on_next_observer_strategy { public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; switch_on_next_observer_strategy(TObserver&& obs) : m_state{init_state(std::move(obs))} @@ -148,8 +148,8 @@ namespace rpp::operators::details using observer_strategy = switch_on_next_observer_strategy; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index dce510d1b..9b376e375 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -22,7 +22,7 @@ namespace rpp::operators::details template struct take_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; mutable size_t count{}; @@ -62,8 +62,8 @@ namespace rpp::operators::details using observer_strategy = take_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index f12b26620..fcfbc5a87 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details class take_last_observer_strategy { public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; take_last_observer_strategy(TObserver&& observer, size_t count) : m_observer{std::move(observer)} @@ -93,8 +93,8 @@ namespace rpp::operators::details using observer_strategy = take_last_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 2046bf0bc..6a9933953 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -46,7 +46,7 @@ namespace rpp::operators::details template struct take_until_observer_strategy_base { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state; @@ -102,8 +102,8 @@ namespace rpp::operators::details constexpr static bool own_current_queue = true; }; - template - using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; + template + using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; template auto lift(Observer&& observer) const diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 574d4047e..c66423134 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -20,7 +20,7 @@ namespace rpp::operators::details template struct take_while_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Fn fn; @@ -59,8 +59,8 @@ namespace rpp::operators::details using observer_strategy = take_while_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/tap.hpp b/src/rpp/rpp/operators/tap.hpp index be29f9117..4e4680011 100644 --- a/src/rpp/rpp/operators/tap.hpp +++ b/src/rpp/rpp/operators/tap.hpp @@ -24,7 +24,7 @@ namespace rpp::operators::details rpp::constraint::decayed_type OnCompleted> struct tap_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS OnNext onNext; @@ -74,8 +74,8 @@ namespace rpp::operators::details using observer_strategy = tap_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/throttle.hpp b/src/rpp/rpp/operators/throttle.hpp index bd52d9904..afebf2f6a 100644 --- a/src/rpp/rpp/operators/throttle.hpp +++ b/src/rpp/rpp/operators/throttle.hpp @@ -24,7 +24,7 @@ namespace rpp::operators::details template struct throttle_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; RPP_NO_UNIQUE_ADDRESS TObserver observer; rpp::schedulers::duration duration{}; @@ -64,8 +64,8 @@ namespace rpp::operators::details using observer_strategy = throttle_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 4d2ce2709..2994372e1 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -19,7 +19,7 @@ namespace rpp::operators::details { - template + template class timeout_disposable final : public rpp::composite_disposable_impl { public: @@ -48,7 +48,7 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS const TFallbackObservable m_fallback; }; - template + template struct timeout_disposable_wrapper { std::shared_ptr> disposable; @@ -61,10 +61,10 @@ namespace rpp::operators::details } }; - template + template struct timeout_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> disposable; @@ -114,15 +114,18 @@ namespace rpp::operators::details constexpr static bool own_current_queue = true; }; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; + rpp::schedulers::duration period; RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift_with_disposables_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::disposable_container; + using container = typename DisposableStrategy::disposables_container; const auto timeout = worker_t::now() + period; @@ -163,14 +166,17 @@ namespace rpp::operators::details constexpr static bool own_current_queue = true; }; + template + using updated_optimal_disposables_strategy = typename timeout_t, TScheduler>::template updated_optimal_disposables_strategy; + rpp::schedulers::duration period; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; - template - auto lift_with_disposable_strategy(Observer&& observer) const + template + auto lift_with_disposables_strategy(Observer&& observer) const { return timeout_t, TScheduler>{period, rpp::source::error>(std::make_exception_ptr(rpp::utils::timeout_reached{"Timeout reached"})), scheduler} - .template lift_with_disposable_strategy(std::forward(observer)); + .template lift_with_disposables_strategy(std::forward(observer)); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index c62ce7ee8..3d1fa14cc 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -36,7 +36,7 @@ namespace rpp::operators::details static_assert(std::same_as().get_observable())>); public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; window_observer_strategy(TObserver&& observer, size_t count) : m_observer{std::move(observer)} @@ -123,8 +123,8 @@ namespace rpp::operators::details using observer_strategy = window_observer_strategy; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index c51d918ec..6b55680e9 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -74,7 +74,7 @@ namespace rpp::operators::details template struct window_toggle_closing_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr disposable; std::shared_ptr state; @@ -112,6 +112,8 @@ namespace rpp::operators::details template struct window_toggle_opening_observer_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr disposable; std::shared_ptr state; @@ -145,7 +147,7 @@ namespace rpp::operators::details using TState = window_toggle_state; public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings) : m_state{std::make_shared(std::move(observer), closings)} @@ -203,8 +205,8 @@ namespace rpp::operators::details using observer_strategy = window_toggle_observer_strategy, TOpeningsObservable, TClosingsSelectorFn>; }; - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index c446ca27b..f952d43f8 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -50,6 +50,8 @@ namespace rpp::operators::details template struct with_latest_from_inner_observer_strategy { + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> state{}; void set_upstream(const rpp::disposable_wrapper& d) const @@ -81,9 +83,9 @@ namespace rpp::operators::details requires std::invocable struct with_latest_from_observer_strategy { - using Disposable = with_latest_from_state; - using Result = std::invoke_result_t; - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using Disposable = with_latest_from_state; + using Result = std::invoke_result_t; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr state{}; @@ -139,8 +141,8 @@ namespace rpp::operators::details constexpr static bool own_current_queue = true; }; - template - using updated_disposable_strategy = rpp::details::observables::default_disposable_strategy_selector; + template + using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; template auto lift(Observer&& observer) const diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index bf2c9ef7e..d3ab4f60c 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -27,7 +27,7 @@ namespace rpp::schedulers * * @par Why do we need it? * This scheduler used to prevent recursion calls and making planar linear execution of schedulables. For example: - * \code{.cpp} + * @code{.cpp} * auto worker = rpp::schedulers::current_thread::create_worker(); * worker.schedule([&worker](const auto& handler) * { @@ -54,7 +54,7 @@ namespace rpp::schedulers * std::cout << "Task 1 ends" << std::endl; * return rpp::schedulers::optional_delay_from_now{}; * }, handler); - * \endcode + * @endcode * Would lead to: * - "Task 1 starts" * - "Task 1 ends" @@ -67,17 +67,17 @@ namespace rpp::schedulers * To have any visible impact you need to use it at least **twice** during same observable. For example, `rpp::source::just` source uses it as default scheduler as well as `rpp::operators::merge` operator (which just "owns" it during subscription). * * For example, this one - * \code{.cpp} + * @code{.cpp} * rpp::source::just(1, 2, 3) * | rpp::operators::merge_with(rpp::source::just(4, 5, 6)) * | rpp::operators::subscribe([](int v) { std::cout << v << " "; }); - * \endcode + * @endcode * Procedes output `1 4 2 5 3 6` due to `merge_with` takes ownership over this scheduler during subscription, both sources schedule their first emissions into scheduler, then `merge_with` frees scheduler and it starts to proceed scheduled actions. As a result it continues interleaving of values. In case of usingg `rpp::schedulers::immediate` it would be: - * \code{.cpp} + * @code{.cpp} * rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3) * | rpp::operators::merge_with(rpp::source::just(rpp::schedulers::immediate{}, 4, 5, 6)) * | rpp::operators::subscribe([](int v) { std::cout << v << " "; }); - * \endcode + * @endcode * With output `1 2 3 4 5 6` * * @ingroup schedulers diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index 67ea51c32..106a1d9df 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -27,10 +27,10 @@ namespace rpp::schedulers * @brief Timepoint of next execution would be calculcated from NOW timpoint (time of returning from schedulable) * * @details Implementation looks like this - * \code{.cpp} + * @code{.cpp} * const auto duration_from_now = schedulable(); * schedule(now() + duration_from_now, schedulable); - * \endcode + * @endcode */ struct delay_from_now { @@ -46,12 +46,12 @@ namespace rpp::schedulers * @brief Timepoint of next execution would be calculcated from timepoint of current scheduling * * @details Implementation looks like this - * \code{.cpp} + * @code{.cpp} * const auto timepoint_for_schedulable = schedulable->get_timepoint(); * sleep_until(timepoint_for_schedulable); * const auto duration_from_this_timepoint = schedulable(); * schedule(timepoint_for_schedulable + duration_from_this_timepoint, schedulable); - * \endcode + * @endcode */ struct delay_from_this_timepoint { diff --git a/src/rpp/rpp/schedulers/immediate.hpp b/src/rpp/rpp/schedulers/immediate.hpp index 2c539610f..6d8ccee63 100644 --- a/src/rpp/rpp/schedulers/immediate.hpp +++ b/src/rpp/rpp/schedulers/immediate.hpp @@ -22,7 +22,7 @@ namespace rpp::schedulers /** * @brief immediately calls provided schedulable or waits for time_point (in the caller-thread) * @par Example - * \code{.cpp} + * @code{.cpp} * auto worker = rpp::schedulers::immediate::create_worker(); * worker.schedule([&worker](const auto& handler) * { @@ -49,7 +49,7 @@ namespace rpp::schedulers * std::cout << "Task 1 ends" << std::endl; * return rpp::schedulers::optional_delay_from_now{}; * }, handler); - * \endcode + * @endcode * * Would lead to: * - "Task 1 starts" diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index a3a661f40..d8e74722d 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -52,7 +52,7 @@ namespace rpp::details template struct concat_source_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state{}; mutable bool locally_disposed{}; @@ -127,6 +127,9 @@ namespace rpp::details using value_type = rpp::utils::extract_observable_type_t>; + using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + + template Strategy> void subscribe(observer&& obs) const { diff --git a/src/rpp/rpp/sources/create.hpp b/src/rpp/rpp/sources/create.hpp index da1bdd27c..e91af1626 100644 --- a/src/rpp/rpp/sources/create.hpp +++ b/src/rpp/rpp/sources/create.hpp @@ -19,7 +19,7 @@ namespace rpp::details struct create_strategy { using value_type = Type; - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; + using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe; }; diff --git a/src/rpp/rpp/sources/defer.hpp b/src/rpp/rpp/sources/defer.hpp index d9578a045..0d8e8ce24 100644 --- a/src/rpp/rpp/sources/defer.hpp +++ b/src/rpp/rpp/sources/defer.hpp @@ -20,7 +20,7 @@ namespace rpp::details struct defer_strategy { using value_type = rpp::utils::extract_observable_type_t>; - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + using optimal_disposables_strategy = typename std::invoke_result_t::optimal_disposables_strategy; RPP_NO_UNIQUE_ADDRESS Factory observable_factory; diff --git a/src/rpp/rpp/sources/empty.hpp b/src/rpp/rpp/sources/empty.hpp index 49826b6b1..ec9049f3c 100644 --- a/src/rpp/rpp/sources/empty.hpp +++ b/src/rpp/rpp/sources/empty.hpp @@ -20,7 +20,7 @@ namespace rpp::details struct empty_strategy { using value_type = Type; - using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; static void subscribe(const auto& obs) { obs.on_completed(); } }; diff --git a/src/rpp/rpp/sources/error.hpp b/src/rpp/rpp/sources/error.hpp index f75ea7059..e286f5a94 100644 --- a/src/rpp/rpp/sources/error.hpp +++ b/src/rpp/rpp/sources/error.hpp @@ -19,7 +19,7 @@ namespace rpp::details struct error_strategy { using value_type = Type; - using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; std::exception_ptr err{}; diff --git a/src/rpp/rpp/sources/from.hpp b/src/rpp/rpp/sources/from.hpp index af447bb8e..b9332926a 100644 --- a/src/rpp/rpp/sources/from.hpp +++ b/src/rpp/rpp/sources/from.hpp @@ -84,7 +84,7 @@ namespace rpp::details { public: using value_type = rpp::utils::iterable_value_t; - using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; template from_iterable_strategy(const TScheduler& scheduler, Args&&... args) diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index 42f3b9233..36d146231 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -29,7 +29,7 @@ namespace rpp::details struct interval_strategy { using value_type = size_t; - using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; TimePointOrDuration initial; diff --git a/src/rpp/rpp/sources/never.hpp b/src/rpp/rpp/sources/never.hpp index da4d10ad1..b0c52d9c7 100644 --- a/src/rpp/rpp/sources/never.hpp +++ b/src/rpp/rpp/sources/never.hpp @@ -19,7 +19,7 @@ namespace rpp::details struct never_strategy { using value_type = Type; - using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<0>; static void subscribe(const auto&) {} }; diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index 43c04ff7b..abfde6ab9 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -11,10 +11,10 @@ #pragma once /** - * \defgroup subjects Subjects - * \brief Observable is the observable and observer at the same time. Uses as a bridge and for manual sending of values. - * \see https://reactivex.io/documentation/subject.html - * \ingroup rpp + * @defgroup subjects Subjects + * @brief Observable is the observable and observer at the same time. Uses as a bridge and for manual sending of values. + * @see https://reactivex.io/documentation/subject.html + * @ingroup rpp */ #include diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index 3ab5dc960..d584203c9 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -44,7 +44,7 @@ namespace rpp::subjects::details struct observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr state; @@ -64,7 +64,7 @@ namespace rpp::subjects::details }; public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + using optimal_disposables_strategy = typename details::subject_state::optimal_disposables_strategy; explicit behavior_subject_base(const Type& value) : m_state{disposable_wrapper_impl::make(value)} @@ -83,7 +83,7 @@ namespace rpp::subjects::details auto get_observable() const { - return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { const auto locked = state.lock(); if (!locked->is_disposed()) { diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp index 06841aa02..7a8753e00 100644 --- a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp +++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp @@ -19,7 +19,7 @@ namespace rpp::subjects::details struct subject_on_subscribe_strategy { using value_type = Type; - using expected_disposable_strategy = DisposableStrategy; + using optimal_disposables_strategy = DisposableStrategy; RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe; }; diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 12969ffe3..78bfd6908 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -72,7 +72,7 @@ namespace rpp::subjects::details using state_t = std::variant; public: - using expected_disposable_strategy = rpp::details::observables::atomic_fixed_disposable_strategy_selector<1>; + using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; subject_state() = default; @@ -165,7 +165,7 @@ namespace rpp::subjects::details current_subs->cend(), std::back_inserter(*subs), [&to_delete](const observer& obs) { - return to_delete != obs.get() && !obs->is_disposed(); + return to_delete != obs.get(); }); } return subs; diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index c095fc501..96ade98ba 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -23,7 +23,7 @@ namespace rpp::subjects::details { struct observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr> state{}; @@ -39,7 +39,7 @@ namespace rpp::subjects::details }; public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + using optimal_disposables_strategy = typename details::subject_state::optimal_disposables_strategy; publish_subject_base() = default; @@ -50,7 +50,7 @@ namespace rpp::subjects::details auto get_observable() const { - return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); } rpp::disposable_wrapper get_disposable() const diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index a4a549d3a..01d06be4b 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -84,7 +84,7 @@ namespace rpp::subjects::details struct observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; std::shared_ptr state; @@ -104,7 +104,7 @@ namespace rpp::subjects::details }; public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + using optimal_disposables_strategy = typename details::subject_state::optimal_disposables_strategy; replay_subject_base() : m_state{disposable_wrapper_impl::make()} @@ -128,7 +128,7 @@ namespace rpp::subjects::details auto get_observable() const { - return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { const auto locked = state.lock(); for (auto&& value : locked->get_actual_values()) observer.on_next(std::move(value.value)); diff --git a/src/tests/rpp/test_disposables.cpp b/src/tests/rpp/test_disposables.cpp index 5a659bdf7..137c8633f 100644 --- a/src/tests/rpp/test_disposables.cpp +++ b/src/tests/rpp/test_disposables.cpp @@ -27,7 +27,7 @@ namespace }; } // namespace -TEST_CASE_TEMPLATE("disposable keeps state", TestType, rpp::details::disposables::dynamic_disposables_container<0>, rpp::details::disposables::static_disposables_container<1>) +TEST_CASE_TEMPLATE("disposable keeps state", TestType, rpp::details::disposables::dynamic_disposables_container, rpp::details::disposables::static_disposables_container<1>) { auto d = rpp::composite_disposable_wrapper::make>(); @@ -282,7 +282,7 @@ TEST_CASE("composite_disposable correctly handles exception") CHECK(!d2.is_disposed()); } -TEST_CASE("static_disposable_container works as expected") +TEST_CASE("static_disposables_container works as expected") { rpp::details::disposables::static_disposables_container<2> container{}; diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index 8c982300c..4246120a2 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -221,19 +221,24 @@ TEST_CASE_TEMPLATE("merge handles race condition", TestType, rpp::memory_model:: { SUBCASE("on_error can't interleave with on_next") { + std::optional t{}; source | rpp::ops::as_blocking() | rpp::ops::subscribe([&](auto&&) { REQUIRE(extracted_obs.has_value()); - CHECK(!on_error_called); - std::thread{[extracted_obs] + if (!t) { - extracted_obs->on_error(std::exception_ptr{}); - }}.detach(); - std::this_thread::sleep_for(std::chrono::seconds{1}); - CHECK(!on_error_called); }, + CHECK(!on_error_called); + t = std::thread{[extracted_obs] + { + extracted_obs->on_error(std::exception_ptr{}); + }}; + std::this_thread::sleep_for(std::chrono::seconds{1}); + CHECK(!on_error_called); + } }, [&](auto) { on_error_called = true; }); - + REQUIRE(t.has_value()); + t->join(); CHECK(on_error_called); } } diff --git a/src/tests/utils/disposable_observable.hpp b/src/tests/utils/disposable_observable.hpp index e5b527b95..b457d07f0 100644 --- a/src/tests/utils/disposable_observable.hpp +++ b/src/tests/utils/disposable_observable.hpp @@ -24,11 +24,11 @@ auto observable_with_disposable(rpp::disposable_wrapper d) }); } -template +template struct wrapped_observable_strategy_set_upstream { using value_type = Type; - using expected_disposable_strategy = Strategy; + using optimal_disposables_strategy = Strategy; auto subscribe(auto&& observer) const { @@ -36,11 +36,11 @@ struct wrapped_observable_strategy_set_upstream } }; -template +template struct wrapped_observable_strategy_no_set_upstream { using value_type = Type; - using expected_disposable_strategy = Strategy; + using optimal_disposables_strategy = Strategy; auto subscribe(auto&&) const {} }; @@ -133,45 +133,45 @@ void test_operator_over_observable_with_disposable(auto&& op) })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); } - SUBCASE("set_upstream with fixed_disposable_strategy_selector<1>") + SUBCASE("set_upstream with fixed_disposables_strategy<1>") { - CHECK_NOTHROW(op(rpp::observable>>{}) + CHECK_NOTHROW(op(rpp::observable>>{}) .subscribe([](const auto&) {}, rpp::utils::rethrow_error_t{})); } - SUBCASE("set_upstream with dynamic_disposable_strategy_selector<0>") + SUBCASE("set_upstream with dynamic_disposables_strategy") { - CHECK_NOTHROW(op(rpp::observable>>{}) + CHECK_NOTHROW(op(rpp::observable>{}) .subscribe([](const auto&) {}, rpp::utils::rethrow_error_t{})); } - SUBCASE("none_disposable_strategy") + SUBCASE("none_disposables_strategy") { - CHECK_NOTHROW(op(rpp::observable>{}) + CHECK_NOTHROW(op(rpp::observable>>{}) .subscribe([](const auto&) {}, rpp::utils::rethrow_error_t{})); } - SUBCASE("fixed_disposable_strategy_selector<0>") + SUBCASE("fixed_disposables_strategy<0>") { - CHECK_NOTHROW(op(rpp::observable>>{}) + CHECK_NOTHROW(op(rpp::observable>>{}) .subscribe([](const auto&) {}, rpp::utils::rethrow_error_t{})); } - SUBCASE("dynamic_disposable_strategy_selector<0>") + SUBCASE("dynamic_disposables_strategy") { - CHECK_NOTHROW(op(rpp::observable>>{}) + CHECK_NOTHROW(op(rpp::observable>{}) .subscribe([](const auto&) {}, rpp::utils::rethrow_error_t{})); } - SUBCASE("set_upstream with none_disposable_strategy calls on_error") + SUBCASE("set_upstream with none_disposables_strategy calls on_error") { - CHECK_NOTHROW(op(rpp::observable>{}) + CHECK_NOTHROW(op(rpp::observable>>{}) .subscribe([](const auto&) {}, [](const std::exception_ptr& err) { CHECK_THROWS_AS(std::rethrow_exception(err), rpp::utils::more_disposables_than_expected); })); } - SUBCASE("set_upstream with fixed_disposable_strategy_selector<0> calls on_error") + SUBCASE("set_upstream with fixed_disposables_strategy<0> calls on_error") { - CHECK_NOTHROW(op(rpp::observable>>{}) + CHECK_NOTHROW(op(rpp::observable>>{}) .subscribe([](const auto&) {}, [](const std::exception_ptr& err) { CHECK_THROWS_AS(std::rethrow_exception(err), rpp::utils::more_disposables_than_expected); })); } } diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index 578e21fa4..23234c56a 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -16,6 +16,8 @@ template class mock_observer { public: + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + struct impl_t { impl_t() = default; @@ -45,7 +47,7 @@ class mock_observer static void set_upstream(const rpp::disposable_wrapper&) noexcept {} auto get_observer() const { return rpp::observer>{*this}; } - auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable>{std::move(d), *this}; } + auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_external_disposable>{std::move(d), *this}; } private: std::shared_ptr m_impl = std::make_shared();