diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index ad412aa26..ee35fc035 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -13,6 +13,7 @@ #include #include +#include namespace rpp { @@ -21,9 +22,11 @@ namespace rpp { using base = observable_chain_strategy; + using operator_traits = typename TStrategy::template operator_traits; + public: using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy; - using value_type = typename TStrategy::template operator_traits::result_type; + using value_type = typename operator_traits::result_type; observable_chain_strategy(const TStrategy& strategy, const TStrategies&... strategies) : m_strategy(strategy) @@ -40,6 +43,8 @@ namespace rpp template Observer> void subscribe(Observer&& observer) const { + [[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed(); + if constexpr (rpp::constraint::operator_lift_with_disposable_strategy) m_strategies.subscribe(m_strategy.template lift_with_disposable_strategy(std::forward(observer))); else if constexpr (rpp::constraint::operator_lift) @@ -48,6 +53,15 @@ namespace rpp m_strategy.subscribe(std::forward(observer), m_strategies); } + private: + static auto own_current_thread_if_needed() + { + if constexpr (requires { requires operator_traits::own_current_queue; }) + return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + else + return rpp::utils::none{}; + } + private: RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy; RPP_NO_UNIQUE_ADDRESS observable_chain_strategy m_strategies; diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 480a02812..59f8b743a 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -66,6 +66,8 @@ namespace rpp::operators::details struct buffer_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 89bb5ee32..cb5a28ee3 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -196,6 +196,8 @@ namespace rpp::operators::details struct concat_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index c08a106f7..206c44727 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -88,32 +88,31 @@ namespace rpp::operators::details static_assert(std::invocable...>, "Selector is not callable with passed T type"); using result_type = std::invoke_result_t...>; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + return observables.apply(&subscribe_impl, std::forward(observer), selector); } private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + template + static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using ExpectedValue = typename observable_chain_strategy::value_type; - using Disposable = TDisposable...>; + using Disposable = TDisposable...>; const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); auto locked = disposable.lock(); locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); + subscribe>(locked, std::index_sequence_for{}, observables...); - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; } template diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index eb2a4feae..ff4286885 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -49,6 +49,8 @@ namespace rpp::operators::details struct distinct_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index a27cdb37c..da57bab38 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -50,6 +50,8 @@ namespace rpp::operators::details template struct distinct_until_changed_t : public operators::details::lift_operator, EqualityFn> { + using operators::details::lift_operator, EqualityFn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index abcad4b22..7844e77b5 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -46,6 +46,8 @@ namespace rpp::operators::details template struct filter_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index abeaec1fb..50f6da93e 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -45,6 +45,8 @@ namespace rpp::operators::details struct first_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index 81c99a9c8..0a2833672 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -53,6 +53,8 @@ namespace rpp::operators::details struct last_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index b43ce7ebb..6bcb48968 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -45,6 +45,8 @@ namespace rpp::operators::details template struct map_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 9e53d2712..80634c823 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -131,29 +131,25 @@ namespace rpp::operators::details } }; - struct merge_t + struct merge_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { static_assert(rpp::constraint::observable, "T is not observable"); using result_type = rpp::utils::extract_observable_type_t; + + constexpr static bool own_current_queue = true; + + template TObserver> + using observer_strategy = merge_observer_strategy>; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - void subscribe(Observer&& observer, const observable_chain_strategy& strategy) const - { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - - using InnerObservable = typename observable_chain_strategy::value_type; - - strategy.subscribe(rpp::observer>>{std::forward(observer)}); - } }; template diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index c7719df7e..f648f817e 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -59,6 +59,8 @@ namespace rpp::operators::details template struct on_error_resume_next_t : lift_operator, Selector> { + using lift_operator, Selector>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 0c29ce559..d72ad586b 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -101,6 +101,8 @@ namespace rpp::operators::details template struct reduce_no_seed_t : lift_operator, Accumulator> { + using lift_operator, Accumulator>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 814fd8a0f..fde385524 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -102,6 +102,8 @@ namespace rpp::operators::details template struct scan_no_seed_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index 033094c7d..e050a2b2f 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -47,6 +47,8 @@ namespace rpp::operators::details struct skip_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 561c728c0..a789cd7e8 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -137,6 +137,8 @@ namespace rpp::operators::details struct switch_on_next_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index b64e9b411..814ba6f93 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -51,6 +51,8 @@ namespace rpp::operators::details struct take_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index ba4d34fa2..6203db1f1 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -82,6 +82,8 @@ namespace rpp::operators::details struct take_last_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index e9d5adc65..04a9b1e23 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -93,24 +93,22 @@ namespace rpp::operators::details struct operator_traits { using result_type = T; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); auto ptr = d.lock(); ptr->get_observer()->set_upstream(d.as_weak()); - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); - - using expected_value = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer>>(std::move(ptr))); + return rpp::observer>>(std::move(ptr)); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 7a8392853..a6ed05aae 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -46,6 +46,8 @@ namespace rpp::operators::details template struct take_while_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/throttle.hpp b/src/rpp/rpp/operators/throttle.hpp index 0abf94b60..1aadb929b 100644 --- a/src/rpp/rpp/operators/throttle.hpp +++ b/src/rpp/rpp/operators/throttle.hpp @@ -53,6 +53,8 @@ namespace rpp::operators::details template struct throttle_t : lift_operator, rpp::schedulers::duration> { + using lift_operator, rpp::schedulers::duration>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index da63b4e70..494a66ca7 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -117,6 +117,8 @@ namespace rpp::operators::details static_assert(rpp::constraint::observable_of_type, "TFallbackObservable should be the same type as T"); using result_type = T; + + constexpr static bool own_current_queue = true; }; rpp::schedulers::duration period; @@ -170,6 +172,8 @@ namespace rpp::operators::details struct operator_traits { using result_type = T; + + constexpr static bool own_current_queue = true; }; rpp::schedulers::duration period; diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 1fa61ad33..b717bd432 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -104,6 +104,8 @@ namespace rpp::operators::details struct window_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 8830a4cbd..5a67b28ad 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -188,28 +188,23 @@ namespace rpp::operators::details template requires rpp::constraint::observable>> - struct window_toggle_t + struct window_toggle_t : lift_operator, TOpeningsObservable, TClosingsSelectorFn> { - RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings; - RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; + using lift_operator, TOpeningsObservable, TClosingsSelectorFn>::lift_operator; template struct operator_traits { using result_type = rpp::window_toggle_observable; + + constexpr static bool own_current_queue = true; + + template TObserver> + using observer_strategy = window_toggle_observer_strategy, TOpeningsObservable, TClosingsSelectorFn>; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const - { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - using expected_value = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}); - } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 639599e57..c966bbda5 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -136,22 +136,22 @@ namespace rpp::operators::details static_assert(std::invocable...>, "TSelector is not invocable with T and types of rest observables"); using result_type = std::invoke_result_t...>; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + return observables.apply(&subscribe_impl, std::forward(observer), selector); } private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + template + static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { using Disposable = with_latest_from_disposable...>; @@ -160,9 +160,7 @@ namespace rpp::operators::details ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); subscribe(ptr, std::index_sequence_for{}, observables...); - using ExpectedValue = typename observable_chain_strategy::value_type; - - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}); + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index a4b765826..b35d16019 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -191,6 +191,19 @@ TEST_CASE("take_until can handle race condition") } } +TEST_CASE("take_until handles current_thread scheduling") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::take_until(rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::current_thread{})) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("take_until doesn't produce extra copies") { SECTION("take_until(other)") diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 063563018..4a2e5d170 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -141,6 +141,19 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") } } +TEST_CASE("timeout handles current_thread scheduling") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::timeout(std::chrono::seconds{2}, rpp::source::error({}), rpp::schedulers::current_thread{}) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("timeout satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, test_scheduler{})); diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_lastest_from.cpp index 44c21203d..6af2d8b6a 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_lastest_from.cpp @@ -179,6 +179,20 @@ TEST_CASE("with_latest_from handles race condition") } } + +TEST_CASE("with_latest_from handles current_thread scheduling") +{ + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::with_latest_from(rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3)) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{std::tuple{1, 1}, std::tuple{2, 2}, std::tuple{3, 3}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("with_latest_from satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make();