From 97d42559fbfc247d57e14f5d886777449cbf610a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 12 Dec 2022 23:32:40 +0300 Subject: [PATCH 1/2] Trampoline scheduler as default --- src/rpp/rpp/sources/from.hpp | 8 ++++---- src/rpp/rpp/sources/fwd.hpp | 6 +++--- src/rpp/rpp/sources/interval.hpp | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/rpp/rpp/sources/from.hpp b/src/rpp/rpp/sources/from.hpp index 113f179f3..843c90109 100644 --- a/src/rpp/rpp/sources/from.hpp +++ b/src/rpp/rpp/sources/from.hpp @@ -11,7 +11,7 @@ #pragma once #include -#include +#include #include #include #include @@ -172,7 +172,7 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts& /** * \brief Creates rpp::specific_observable that emits a particular items and completes - * \warning this overloading uses immediate scheduler as default + * \warning this overloading uses trampoline scheduler as default * * \marble just { @@ -195,7 +195,7 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts& template auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included && (constraint::decayed_same_as && ...)) { - return just(schedulers::immediate{}, std::forward(item), std::forward(items)...); + return just(schedulers::trampoline{}, std::forward(item), std::forward(items)...); } /** @@ -218,7 +218,7 @@ auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included +template auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included { using Container = std::decay_t; diff --git a/src/rpp/rpp/sources/fwd.hpp b/src/rpp/rpp/sources/fwd.hpp index a4431640d..b157576df 100644 --- a/src/rpp/rpp/sources/fwd.hpp +++ b/src/rpp/rpp/sources/fwd.hpp @@ -60,17 +60,17 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts& template auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included && (constraint::decayed_same_as && ...)); -template +template auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included ; template auto from_callable(std::invocable<> auto&& callable) requires rpp::details::is_header_included; //************************ INTERVAL *********************// -template +template auto interval(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included; -template +template auto interval(schedulers::duration first_delay, schedulers::duration period, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included; } // namespace rpp::observable diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index 4a2958382..19ff935d6 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include @@ -42,7 +42,7 @@ namespace rpp::observable * \ingroup creational_operators * \see https://reactivex.io/documentation/operators/interval.html */ - template + template auto interval(schedulers::duration period, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included { return interval(period, period, scheduler); @@ -67,7 +67,7 @@ namespace rpp::observable * \ingroup creational_operators * \see https://reactivex.io/documentation/operators/interval.html */ - template + template auto interval(schedulers::duration first_delay, schedulers::duration period, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included { return source::create([first_delay, period, scheduler](auto&& subscriber) From 1bcac130e8ad281a0c48d3f5bf20ec015980e000 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 12 Dec 2022 23:48:39 +0300 Subject: [PATCH 2/2] test fixes --- src/benchmarks/rpp_benchmark.cpp | 1 + src/rpp/rpp/operators/concat.hpp | 3 ++- src/rpp/rpp/operators/merge.hpp | 3 ++- src/tests/rpp/test_combine_latest.cpp | 6 +++--- src/tests/rpp/test_from.cpp | 10 +++++----- src/tests/rpp/test_merge.cpp | 4 ++-- src/tests/rpp/test_switch_map.cpp | 8 ++++---- 7 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 52fd6eff6..d7af0c480 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 081f3efd1..977e1ff27 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -10,6 +10,7 @@ #pragma once +#include #include // dynamic_observable #include // required due to operator uses lift #include // merge_forwarding_on_next/merge_on_error @@ -147,6 +148,6 @@ struct concat_impl template ... TObservables> auto concat_with_impl(TObservables&&... observables) { - return source::just(std::forward(observables).as_dynamic()...).concat(); + return source::just(rpp::schedulers::immediate{}, std::forward(observables).as_dynamic()...).concat(); } } // namespace rpp::details diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 3742a68bc..6c5cd2993 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -10,6 +10,7 @@ #pragma once +#include #include // required due to operator uses lift #include // early_unsubscribe #include // make_serialized_subscriber @@ -101,6 +102,6 @@ struct merge_impl template ... TObservables> auto merge_with_impl(TObservables&&... observables) { - return source::just(std::forward(observables).as_dynamic()...).merge(); + return source::just(rpp::schedulers::immediate{}, std::forward(observables).as_dynamic()...).merge(); } } // namespace rpp::details diff --git a/src/tests/rpp/test_combine_latest.cpp b/src/tests/rpp/test_combine_latest.cpp index d5dc601a9..bfd967505 100644 --- a/src/tests/rpp/test_combine_latest.cpp +++ b/src/tests/rpp/test_combine_latest.cpp @@ -23,11 +23,11 @@ SCENARIO("combine_latest bundles items", "[combine_latest]") { - GIVEN("observable of -1-2-3-| combines with -4-5-6-| on default scheduler") + GIVEN("observable of -1-2-3-| combines with -4-5-6-| on immediate scheduler") { auto mock = mock_observer>{}; - rpp::source::just(1, 2, 3) - .combine_latest(rpp::source::just(4, 5, 6)) + rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3) + .combine_latest(rpp::source::just(rpp::schedulers::immediate{}, 4, 5, 6)) .subscribe(mock); CHECK(mock.get_received_values() == std::vector>{ diff --git a/src/tests/rpp/test_from.cpp b/src/tests/rpp/test_from.cpp index 881a768ce..1ea1e9bd7 100644 --- a/src/tests/rpp/test_from.cpp +++ b/src/tests/rpp/test_from.cpp @@ -10,11 +10,11 @@ #include "copy_count_tracker.hpp" #include "mock_observer.hpp" -#include "rpp/schedulers/run_loop_scheduler.hpp" #include #include #include +#include #include @@ -122,7 +122,7 @@ SCENARIO("from iterable", "[source][from][track_copy]") GIVEN("observable from copied iterable") { - auto obs = rpp::source::from_iterable(vals); + auto obs = rpp::source::from_iterable(vals, rpp::schedulers::immediate{}); WHEN("subscribe on it") { obs.subscribe(); @@ -135,7 +135,7 @@ SCENARIO("from iterable", "[source][from][track_copy]") } GIVEN("observable from moved iterable") { - auto obs = rpp::source::from_iterable(std::move(vals)); + auto obs = rpp::source::from_iterable(std::move(vals), rpp::schedulers::immediate{}); WHEN("subscribe on it") { obs.subscribe(); @@ -220,7 +220,7 @@ SCENARIO("just") GIVEN("observable with copied item") { copy_count_tracker v{}; - auto obs = rpp::observable::just(v); + auto obs = rpp::observable::just(rpp::schedulers::immediate{}, v); WHEN("subscribe on this observable") { obs.subscribe(mock); @@ -237,7 +237,7 @@ SCENARIO("just") GIVEN("observable with moved item") { copy_count_tracker v{}; - auto obs = rpp::observable::just(std::move(v)); + auto obs = rpp::observable::just(rpp::schedulers::immediate{}, std::move(v)); WHEN("subscribe on this observable") { obs.subscribe(mock); diff --git a/src/tests/rpp/test_merge.cpp b/src/tests/rpp/test_merge.cpp index d6adf451e..cf55725fc 100644 --- a/src/tests/rpp/test_merge.cpp +++ b/src/tests/rpp/test_merge.cpp @@ -74,9 +74,9 @@ SCENARIO("merge for observable of observables", "[operators][merge]") { auto obs = rpp::source::create>([](const auto& sub) { - sub.on_next(rpp::source::just(1).as_dynamic()); + sub.on_next(rpp::source::just(rpp::schedulers::immediate{}, 1).as_dynamic()); sub.on_next(rpp::source::error(std::make_exception_ptr(std::runtime_error{""})).as_dynamic()); - sub.on_next(rpp::source::just(2).as_dynamic()); + sub.on_next(rpp::source::just(rpp::schedulers::immediate{}, 2).as_dynamic()); }); WHEN("subscribe on merge of observable") diff --git a/src/tests/rpp/test_switch_map.cpp b/src/tests/rpp/test_switch_map.cpp index a0c5e15bb..5ba7925ff 100644 --- a/src/tests/rpp/test_switch_map.cpp +++ b/src/tests/rpp/test_switch_map.cpp @@ -25,10 +25,10 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour auto mock = mock_observer{}; GIVEN("observable of items") { - auto obs = rpp::source::just(1,2,3); + auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1,2,3); WHEN("subscribe on it via switch_map") { - obs.switch_map([](int v){return rpp::source::just(v, v*10);}) + obs.switch_map([](int v){return rpp::source::just(rpp::schedulers::immediate{}, v, v*10);}) .subscribe(mock); THEN("subscriber obtains values from observables obtained via switch_map") { @@ -91,7 +91,7 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour .as_dynamic(); } - return rpp::source::just(2 * val).as_dynamic(); + return rpp::source::just(rpp::schedulers::immediate{}, 2 * val).as_dynamic(); }) .subscribe(mock); THEN("subscriber observes last two emission") @@ -113,7 +113,7 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour .as_dynamic(); } - return rpp::source::just(2 * val).as_dynamic(); + return rpp::source::just(rpp::schedulers::immediate{}, 2 * val).as_dynamic(); }) .subscribe(mock); THEN("subscriber observes first two emission and shall not see complete event")