From a4f2158ae548fed240c11bf3ed56df3f4fe39231 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 8 Mar 2024 21:46:16 +0300 Subject: [PATCH 1/6] handle current_thread more elegant --- .../observables/details/chain_strategy.hpp | 16 +++++++++++++- .../operators/details/combining_strategy.hpp | 21 +++++++++---------- src/rpp/rpp/operators/merge.hpp | 13 +++++------- src/rpp/rpp/operators/take_until.hpp | 13 ++++++------ src/rpp/rpp/operators/timeout.hpp | 4 ++++ src/rpp/rpp/operators/window_toggle.hpp | 11 +++++----- src/rpp/rpp/operators/with_latest_from.hpp | 18 +++++++--------- src/tests/rpp/test_timeout.cpp | 13 ++++++++++++ 8 files changed, 66 insertions(+), 43 deletions(-) diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index ad412aa26..39e0bc441 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -11,6 +11,7 @@ #include #include +#include #include @@ -21,9 +22,10 @@ namespace rpp { using base = observable_chain_strategy; + using operator_traits = typename TStrategy::template operator_traits; public: using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy; - using value_type = typename TStrategy::template operator_traits::result_type; + using value_type = typename operator_traits::result_type; observable_chain_strategy(const TStrategy& strategy, const TStrategies&... strategies) : m_strategy(strategy) @@ -40,6 +42,8 @@ namespace rpp template Observer> void subscribe(Observer&& observer) const { + [[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed(); + if constexpr (rpp::constraint::operator_lift_with_disposable_strategy) m_strategies.subscribe(m_strategy.template lift_with_disposable_strategy(std::forward(observer))); else if constexpr (rpp::constraint::operator_lift) @@ -47,6 +51,16 @@ namespace rpp else m_strategy.subscribe(std::forward(observer), m_strategies); } + private: + static auto own_current_thread_if_needed() requires requires { requires operator_traits::own_current_queue; } + { + return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + } + + static auto own_current_thread_if_needed() + { + return rpp::utils::none{}; + } private: RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy; diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index c08a106f7..94149938f 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -88,32 +88,31 @@ namespace rpp::operators::details static_assert(std::invocable...>, "Selector is not callable with passed T type"); using result_type = std::invoke_result_t...>; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + return observables.apply(&subscribe_impl, std::forward(observer), selector); } private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + template + static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using ExpectedValue = typename observable_chain_strategy::value_type; - using Disposable = TDisposable...>; + using Disposable = TDisposable...>; const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); auto locked = disposable.lock(); locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(locked, std::index_sequence_for{}, observables...); + subscribe>(locked, std::index_sequence_for{}, observables...); - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(locked)}); + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; } template diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 9e53d2712..7fcaa0b1f 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -139,20 +139,17 @@ namespace rpp::operators::details static_assert(rpp::constraint::observable, "T is not observable"); using result_type = rpp::utils::extract_observable_type_t; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - - using InnerObservable = typename observable_chain_strategy::value_type; - - strategy.subscribe(rpp::observer>>{std::forward(observer)}); + return rpp::observer>>{std::forward(observer)}; } }; diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index e9d5adc65..38549cc42 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -93,24 +93,23 @@ namespace rpp::operators::details struct operator_traits { using result_type = T; + + constexpr static bool own_current_queue = true; + }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); auto ptr = d.lock(); ptr->get_observer()->set_upstream(d.as_weak()); - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); - - using expected_value = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer>>(std::move(ptr))); + return rpp::observer>>(std::move(ptr)); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index da63b4e70..494a66ca7 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -117,6 +117,8 @@ namespace rpp::operators::details static_assert(rpp::constraint::observable_of_type, "TFallbackObservable should be the same type as T"); using result_type = T; + + constexpr static bool own_current_queue = true; }; rpp::schedulers::duration period; @@ -170,6 +172,8 @@ namespace rpp::operators::details struct operator_traits { using result_type = T; + + constexpr static bool own_current_queue = true; }; rpp::schedulers::duration period; diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 8830a4cbd..7b421781d 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -197,18 +197,17 @@ namespace rpp::operators::details struct operator_traits { using result_type = rpp::window_toggle_observable; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - using expected_value = typename observable_chain_strategy::value_type; - observable_strategy.subscribe(rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}); + return rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 639599e57..c966bbda5 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -136,22 +136,22 @@ namespace rpp::operators::details static_assert(std::invocable...>, "TSelector is not invocable with T and types of rest observables"); using result_type = std::invoke_result_t...>; + + constexpr static bool own_current_queue = true; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + template + auto lift(Observer&& observer) const { - // Need to take ownership over current_thread in case of inner-observables also using it - auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + return observables.apply(&subscribe_impl, std::forward(observer), selector); } private: - template - static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + template + static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { using Disposable = with_latest_from_disposable...>; @@ -160,9 +160,7 @@ namespace rpp::operators::details ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); subscribe(ptr, std::index_sequence_for{}, observables...); - using ExpectedValue = typename observable_chain_strategy::value_type; - - observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}); + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 063563018..1cb437390 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -141,6 +141,19 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") } } +TEST_CASE("timeout handles current_thread scheduling") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::timeout(std::chrono::seconds{2}, rpp::source::error({}), rpp::schedulers::current_thread{}) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1,2,3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("timeout satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, test_scheduler{})); From 2a1904e7095492fc191e815db4b98c2af5b262f2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 8 Mar 2024 21:50:27 +0300 Subject: [PATCH 2/6] minor --- src/rpp/rpp/operators/take_until.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 38549cc42..04a9b1e23 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -95,7 +95,6 @@ namespace rpp::operators::details using result_type = T; constexpr static bool own_current_queue = true; - }; template From 6ab0e33c6f6a079306997137363b8a22dde0471d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 8 Mar 2024 22:02:37 +0300 Subject: [PATCH 3/6] provide explicit lift_operator --- src/rpp/rpp/operators/buffer.hpp | 2 ++ src/rpp/rpp/operators/concat.hpp | 2 ++ src/rpp/rpp/operators/distinct.hpp | 2 ++ src/rpp/rpp/operators/distinct_until_changed.hpp | 2 ++ src/rpp/rpp/operators/filter.hpp | 2 ++ src/rpp/rpp/operators/first.hpp | 2 ++ src/rpp/rpp/operators/last.hpp | 2 ++ src/rpp/rpp/operators/map.hpp | 2 ++ src/rpp/rpp/operators/merge.hpp | 13 ++++++------- src/rpp/rpp/operators/on_error_resume_next.hpp | 2 ++ src/rpp/rpp/operators/reduce.hpp | 2 ++ src/rpp/rpp/operators/scan.hpp | 2 ++ src/rpp/rpp/operators/skip.hpp | 2 ++ src/rpp/rpp/operators/switch_on_next.hpp | 2 ++ src/rpp/rpp/operators/take.hpp | 2 ++ src/rpp/rpp/operators/take_last.hpp | 2 ++ src/rpp/rpp/operators/take_while.hpp | 2 ++ src/rpp/rpp/operators/throttle.hpp | 2 ++ src/rpp/rpp/operators/window.hpp | 2 ++ src/rpp/rpp/operators/window_toggle.hpp | 16 ++++++---------- 20 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index 480a02812..f03bbb0db 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -66,6 +66,8 @@ namespace rpp::operators::details struct buffer_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 89bb5ee32..1f26946de 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -196,6 +196,8 @@ namespace rpp::operators::details struct concat_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index eb2a4feae..a235cde1f 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -49,6 +49,8 @@ namespace rpp::operators::details struct distinct_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index a27cdb37c..da5f1c808 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -50,6 +50,8 @@ namespace rpp::operators::details template struct distinct_until_changed_t : public operators::details::lift_operator, EqualityFn> { + using operators::details::lift_operator, EqualityFn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index abcad4b22..aa5837d16 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -46,6 +46,8 @@ namespace rpp::operators::details template struct filter_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index abeaec1fb..ee80420ba 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -45,6 +45,8 @@ namespace rpp::operators::details struct first_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index 81c99a9c8..bafddb7c5 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -53,6 +53,8 @@ namespace rpp::operators::details struct last_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index b43ce7ebb..3d3b11fb3 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -45,6 +45,8 @@ namespace rpp::operators::details template struct map_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 7fcaa0b1f..b6238fa67 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -131,8 +131,10 @@ namespace rpp::operators::details } }; - struct merge_t + struct merge_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { @@ -141,16 +143,13 @@ namespace rpp::operators::details using result_type = rpp::utils::extract_observable_type_t; constexpr static bool own_current_queue = true; + + template TObserver> + using observer_strategy = merge_observer_strategy>; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - auto lift(Observer&& observer) const - { - return rpp::observer>>{std::forward(observer)}; - } }; template diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index c7719df7e..26749f016 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -59,6 +59,8 @@ namespace rpp::operators::details template struct on_error_resume_next_t : lift_operator, Selector> { + using lift_operator, Selector>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 0c29ce559..520e2dae6 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -101,6 +101,8 @@ namespace rpp::operators::details template struct reduce_no_seed_t : lift_operator, Accumulator> { + using lift_operator, Accumulator>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 814fd8a0f..553e15bb5 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -102,6 +102,8 @@ namespace rpp::operators::details template struct scan_no_seed_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index 033094c7d..54d8d7565 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -47,6 +47,8 @@ namespace rpp::operators::details struct skip_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 561c728c0..4be916958 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -137,6 +137,8 @@ namespace rpp::operators::details struct switch_on_next_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index b64e9b411..2d6e61a2a 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -51,6 +51,8 @@ namespace rpp::operators::details struct take_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index ba4d34fa2..8457af0e1 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -82,6 +82,8 @@ namespace rpp::operators::details struct take_last_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index 7a8392853..e4ddcce6c 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -46,6 +46,8 @@ namespace rpp::operators::details template struct take_while_t : lift_operator, Fn> { + using lift_operator, Fn>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/throttle.hpp b/src/rpp/rpp/operators/throttle.hpp index 0abf94b60..0e033aa35 100644 --- a/src/rpp/rpp/operators/throttle.hpp +++ b/src/rpp/rpp/operators/throttle.hpp @@ -53,6 +53,8 @@ namespace rpp::operators::details template struct throttle_t : lift_operator, rpp::schedulers::duration> { + using lift_operator, rpp::schedulers::duration>::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 1fa61ad33..bdfaa72a9 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -104,6 +104,8 @@ namespace rpp::operators::details struct window_t : lift_operator { + using lift_operator::lift_operator; + template struct operator_traits { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 7b421781d..98d44b03b 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -188,27 +188,23 @@ namespace rpp::operators::details template requires rpp::constraint::observable>> - struct window_toggle_t + struct window_toggle_t : lift_operator, TOpeningsObservable, TClosingsSelectorFn> { - RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings; - RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; - + using lift_operator, TOpeningsObservable, TClosingsSelectorFn>::lift_operator; + template struct operator_traits { using result_type = rpp::window_toggle_observable; constexpr static bool own_current_queue = true; + + template TObserver> + using observer_strategy = window_toggle_observer_strategy, TOpeningsObservable, TClosingsSelectorFn>; }; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - - template - auto lift(Observer&& observer) const - { - return rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}; - } }; } // namespace rpp::operators::details From b63efac798642ed63ff0d921f46a3e781549fc3e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 8 Mar 2024 23:17:15 +0300 Subject: [PATCH 4/6] add tests --- src/tests/rpp/test_take_until.cpp | 13 +++++++++++++ src/tests/rpp/test_with_lastest_from.cpp | 14 ++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index a4b765826..31eae773c 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -191,6 +191,19 @@ TEST_CASE("take_until can handle race condition") } } +TEST_CASE("take_until handles current_thread scheduling") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::take_until(rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::current_thread{})) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{1,2,3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("take_until doesn't produce extra copies") { SECTION("take_until(other)") diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_lastest_from.cpp index 44c21203d..0c5a57786 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_lastest_from.cpp @@ -179,6 +179,20 @@ TEST_CASE("with_latest_from handles race condition") } } + +TEST_CASE("with_latest_from handles current_thread scheduling") +{ + auto mock = mock_observer_strategy>{}; + + rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) + | rpp::operators::with_latest_from(rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3)) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{std::tuple{1,1}, std::tuple{2,2}, std::tuple{3,3}}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); +} + TEST_CASE("with_latest_from satisfies disposable contracts") { auto observable_disposable = rpp::composite_disposable_wrapper::make(); From fd075cef5e00458326bb99369c8d00e813d1c290 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 8 Mar 2024 20:17:24 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/observables/details/chain_strategy.hpp | 7 +++++-- src/rpp/rpp/operators/buffer.hpp | 2 +- src/rpp/rpp/operators/concat.hpp | 2 +- src/rpp/rpp/operators/details/combining_strategy.hpp | 2 +- src/rpp/rpp/operators/distinct.hpp | 2 +- src/rpp/rpp/operators/distinct_until_changed.hpp | 2 +- src/rpp/rpp/operators/filter.hpp | 2 +- src/rpp/rpp/operators/first.hpp | 2 +- src/rpp/rpp/operators/last.hpp | 2 +- src/rpp/rpp/operators/map.hpp | 2 +- src/rpp/rpp/operators/merge.hpp | 2 +- src/rpp/rpp/operators/on_error_resume_next.hpp | 2 +- src/rpp/rpp/operators/reduce.hpp | 2 +- src/rpp/rpp/operators/scan.hpp | 2 +- src/rpp/rpp/operators/skip.hpp | 2 +- src/rpp/rpp/operators/switch_on_next.hpp | 2 +- src/rpp/rpp/operators/take.hpp | 2 +- src/rpp/rpp/operators/take_last.hpp | 2 +- src/rpp/rpp/operators/take_while.hpp | 2 +- src/rpp/rpp/operators/throttle.hpp | 2 +- src/rpp/rpp/operators/window.hpp | 2 +- src/rpp/rpp/operators/window_toggle.hpp | 2 +- src/tests/rpp/test_take_until.cpp | 6 +++--- src/tests/rpp/test_timeout.cpp | 6 +++--- src/tests/rpp/test_with_lastest_from.cpp | 6 +++--- 25 files changed, 35 insertions(+), 32 deletions(-) diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index 39e0bc441..e9ac6c431 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -11,9 +11,9 @@ #include #include -#include #include +#include namespace rpp { @@ -23,6 +23,7 @@ namespace rpp using base = observable_chain_strategy; using operator_traits = typename TStrategy::template operator_traits; + public: using expected_disposable_strategy = details::observables::deduce_updated_disposable_strategy; using value_type = typename operator_traits::result_type; @@ -51,8 +52,10 @@ namespace rpp else m_strategy.subscribe(std::forward(observer), m_strategies); } + private: - static auto own_current_thread_if_needed() requires requires { requires operator_traits::own_current_queue; } + static auto own_current_thread_if_needed() + requires requires { requires operator_traits::own_current_queue; } { return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); } diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index f03bbb0db..59f8b743a 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -67,7 +67,7 @@ namespace rpp::operators::details struct buffer_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 1f26946de..cb5a28ee3 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -197,7 +197,7 @@ namespace rpp::operators::details struct concat_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 94149938f..206c44727 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -105,7 +105,7 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using Disposable = TDisposable...>; + using Disposable = TDisposable...>; const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); auto locked = disposable.lock(); diff --git a/src/rpp/rpp/operators/distinct.hpp b/src/rpp/rpp/operators/distinct.hpp index a235cde1f..ff4286885 100644 --- a/src/rpp/rpp/operators/distinct.hpp +++ b/src/rpp/rpp/operators/distinct.hpp @@ -50,7 +50,7 @@ namespace rpp::operators::details struct distinct_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/distinct_until_changed.hpp b/src/rpp/rpp/operators/distinct_until_changed.hpp index da5f1c808..da57bab38 100644 --- a/src/rpp/rpp/operators/distinct_until_changed.hpp +++ b/src/rpp/rpp/operators/distinct_until_changed.hpp @@ -51,7 +51,7 @@ namespace rpp::operators::details struct distinct_until_changed_t : public operators::details::lift_operator, EqualityFn> { using operators::details::lift_operator, EqualityFn>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/filter.hpp b/src/rpp/rpp/operators/filter.hpp index aa5837d16..7844e77b5 100644 --- a/src/rpp/rpp/operators/filter.hpp +++ b/src/rpp/rpp/operators/filter.hpp @@ -47,7 +47,7 @@ namespace rpp::operators::details struct filter_t : lift_operator, Fn> { using lift_operator, Fn>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/first.hpp b/src/rpp/rpp/operators/first.hpp index ee80420ba..50f6da93e 100644 --- a/src/rpp/rpp/operators/first.hpp +++ b/src/rpp/rpp/operators/first.hpp @@ -46,7 +46,7 @@ namespace rpp::operators::details struct first_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/last.hpp b/src/rpp/rpp/operators/last.hpp index bafddb7c5..0a2833672 100644 --- a/src/rpp/rpp/operators/last.hpp +++ b/src/rpp/rpp/operators/last.hpp @@ -54,7 +54,7 @@ namespace rpp::operators::details struct last_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/map.hpp b/src/rpp/rpp/operators/map.hpp index 3d3b11fb3..6bcb48968 100644 --- a/src/rpp/rpp/operators/map.hpp +++ b/src/rpp/rpp/operators/map.hpp @@ -46,7 +46,7 @@ namespace rpp::operators::details struct map_t : lift_operator, Fn> { using lift_operator, Fn>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index b6238fa67..80634c823 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -134,7 +134,7 @@ namespace rpp::operators::details struct merge_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 26749f016..f648f817e 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -60,7 +60,7 @@ namespace rpp::operators::details struct on_error_resume_next_t : lift_operator, Selector> { using lift_operator, Selector>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 520e2dae6..d72ad586b 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -102,7 +102,7 @@ namespace rpp::operators::details struct reduce_no_seed_t : lift_operator, Accumulator> { using lift_operator, Accumulator>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/scan.hpp b/src/rpp/rpp/operators/scan.hpp index 553e15bb5..fde385524 100644 --- a/src/rpp/rpp/operators/scan.hpp +++ b/src/rpp/rpp/operators/scan.hpp @@ -103,7 +103,7 @@ namespace rpp::operators::details struct scan_no_seed_t : lift_operator, Fn> { using lift_operator, Fn>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/skip.hpp b/src/rpp/rpp/operators/skip.hpp index 54d8d7565..e050a2b2f 100644 --- a/src/rpp/rpp/operators/skip.hpp +++ b/src/rpp/rpp/operators/skip.hpp @@ -48,7 +48,7 @@ namespace rpp::operators::details struct skip_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index 4be916958..a789cd7e8 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -138,7 +138,7 @@ namespace rpp::operators::details struct switch_on_next_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take.hpp b/src/rpp/rpp/operators/take.hpp index 2d6e61a2a..814ba6f93 100644 --- a/src/rpp/rpp/operators/take.hpp +++ b/src/rpp/rpp/operators/take.hpp @@ -52,7 +52,7 @@ namespace rpp::operators::details struct take_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_last.hpp b/src/rpp/rpp/operators/take_last.hpp index 8457af0e1..6203db1f1 100644 --- a/src/rpp/rpp/operators/take_last.hpp +++ b/src/rpp/rpp/operators/take_last.hpp @@ -83,7 +83,7 @@ namespace rpp::operators::details struct take_last_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/take_while.hpp b/src/rpp/rpp/operators/take_while.hpp index e4ddcce6c..a6ed05aae 100644 --- a/src/rpp/rpp/operators/take_while.hpp +++ b/src/rpp/rpp/operators/take_while.hpp @@ -47,7 +47,7 @@ namespace rpp::operators::details struct take_while_t : lift_operator, Fn> { using lift_operator, Fn>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/throttle.hpp b/src/rpp/rpp/operators/throttle.hpp index 0e033aa35..1aadb929b 100644 --- a/src/rpp/rpp/operators/throttle.hpp +++ b/src/rpp/rpp/operators/throttle.hpp @@ -54,7 +54,7 @@ namespace rpp::operators::details struct throttle_t : lift_operator, rpp::schedulers::duration> { using lift_operator, rpp::schedulers::duration>::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index bdfaa72a9..b717bd432 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -105,7 +105,7 @@ namespace rpp::operators::details struct window_t : lift_operator { using lift_operator::lift_operator; - + template struct operator_traits { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 98d44b03b..5a67b28ad 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -191,7 +191,7 @@ namespace rpp::operators::details struct window_toggle_t : lift_operator, TOpeningsObservable, TClosingsSelectorFn> { using lift_operator, TOpeningsObservable, TClosingsSelectorFn>::lift_operator; - + template struct operator_traits { diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index 31eae773c..b35d16019 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -196,10 +196,10 @@ TEST_CASE("take_until handles current_thread scheduling") auto mock = mock_observer_strategy{}; rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) - | rpp::operators::take_until(rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::current_thread{})) - | rpp::operators::subscribe(mock); + | rpp::operators::take_until(rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::current_thread{})) + | rpp::operators::subscribe(mock); - CHECK(mock.get_received_values() == std::vector{1,2,3}); + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 1cb437390..4a2e5d170 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -146,10 +146,10 @@ TEST_CASE("timeout handles current_thread scheduling") auto mock = mock_observer_strategy{}; rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) - | rpp::operators::timeout(std::chrono::seconds{2}, rpp::source::error({}), rpp::schedulers::current_thread{}) - | rpp::operators::subscribe(mock); + | rpp::operators::timeout(std::chrono::seconds{2}, rpp::source::error({}), rpp::schedulers::current_thread{}) + | rpp::operators::subscribe(mock); - CHECK(mock.get_received_values() == std::vector{1,2,3}); + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_lastest_from.cpp index 0c5a57786..6af2d8b6a 100644 --- a/src/tests/rpp/test_with_lastest_from.cpp +++ b/src/tests/rpp/test_with_lastest_from.cpp @@ -185,10 +185,10 @@ TEST_CASE("with_latest_from handles current_thread scheduling") auto mock = mock_observer_strategy>{}; rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3) - | rpp::operators::with_latest_from(rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3)) - | rpp::operators::subscribe(mock); + | rpp::operators::with_latest_from(rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3)) + | rpp::operators::subscribe(mock); - CHECK(mock.get_received_values() == std::vector{std::tuple{1,1}, std::tuple{2,2}, std::tuple{3,3}}); + CHECK(mock.get_received_values() == std::vector{std::tuple{1, 1}, std::tuple{2, 2}, std::tuple{3, 3}}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } From 68d1a1a3e6651fc2faee81c405fcc72d31fb7425 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 8 Mar 2024 23:21:03 +0300 Subject: [PATCH 6/6] simplify --- src/rpp/rpp/observables/details/chain_strategy.hpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/observables/details/chain_strategy.hpp b/src/rpp/rpp/observables/details/chain_strategy.hpp index e9ac6c431..ee35fc035 100644 --- a/src/rpp/rpp/observables/details/chain_strategy.hpp +++ b/src/rpp/rpp/observables/details/chain_strategy.hpp @@ -55,14 +55,11 @@ namespace rpp private: static auto own_current_thread_if_needed() - requires requires { requires operator_traits::own_current_queue; } { - return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); - } - - static auto own_current_thread_if_needed() - { - return rpp::utils::none{}; + if constexpr (requires { requires operator_traits::own_current_queue; }) + return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + else + return rpp::utils::none{}; } private: