Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/benchmarks/rpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <rpp/subjects.hpp>
#include <rpp/schedulers/run_loop_scheduler.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/schedulers/immediate_scheduler.hpp>
#include <rpp/utils/spinlock.hpp>

#include <array>
Expand Down
3 changes: 2 additions & 1 deletion src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include <rpp/schedulers/immediate_scheduler.hpp>
#include <rpp/observables/dynamic_observable.hpp> // dynamic_observable
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/merge.hpp> // merge_forwarding_on_next/merge_on_error
Expand Down Expand Up @@ -147,6 +148,6 @@ struct concat_impl
template<constraint::decayed_type Type, constraint::observable_of_type<Type> ... TObservables>
auto concat_with_impl(TObservables&&... observables)
{
return source::just(std::forward<TObservables>(observables).as_dynamic()...).concat();
return source::just(rpp::schedulers::immediate{}, std::forward<TObservables>(observables).as_dynamic()...).concat();
}
} // namespace rpp::details
3 changes: 2 additions & 1 deletion src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include <rpp/schedulers/immediate_scheduler.hpp>
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/early_unsubscribe.hpp> // early_unsubscribe
#include <rpp/operators/details/serialized_subscriber.hpp> // make_serialized_subscriber
Expand Down Expand Up @@ -101,6 +102,6 @@ struct merge_impl
template<constraint::decayed_type Type, constraint::observable_of_type<Type> ... TObservables>
auto merge_with_impl(TObservables&&... observables)
{
return source::just(std::forward<TObservables>(observables).as_dynamic()...).merge();
return source::just(rpp::schedulers::immediate{}, std::forward<TObservables>(observables).as_dynamic()...).merge();
}
} // namespace rpp::details
8 changes: 4 additions & 4 deletions src/rpp/rpp/sources/from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#pragma once

#include <rpp/memory_model.hpp>
#include <rpp/schedulers/immediate_scheduler.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/sources/fwd.hpp>
#include <rpp/utils/utilities.hpp>
Expand Down Expand Up @@ -172,7 +172,7 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts&

/**
* \brief Creates rpp::specific_observable that emits a particular items and completes
* \warning this overloading uses immediate scheduler as default
* \warning this overloading uses trampoline scheduler as default
*
* \marble just
{
Expand All @@ -195,7 +195,7 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts&
template<memory_model memory_model /* = memory_model::use_stack */, typename T, typename ...Ts>
auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
{
return just<memory_model>(schedulers::immediate{}, std::forward<T>(item), std::forward<Ts>(items)...);
return just<memory_model>(schedulers::trampoline{}, std::forward<T>(item), std::forward<Ts>(items)...);
}

/**
Expand All @@ -218,7 +218,7 @@ auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rp
* \ingroup creational_operators
* \see https://reactivex.io/documentation/operators/from.html
*/
template<memory_model memory_model /* = memory_model::use_stack */, schedulers::constraint::scheduler TScheduler /* = schedulers::immediate */>
template<memory_model memory_model /* = memory_model::use_stack */, schedulers::constraint::scheduler TScheduler /* = schedulers::trampoline */>
auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
{
using Container = std::decay_t<decltype(iterable)>;
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/sources/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts&
template<memory_model memory_model = memory_model::use_stack, typename T, typename ...Ts>
auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...));

template<memory_model memory_model= memory_model::use_stack, schedulers::constraint::scheduler TScheduler = schedulers::immediate>
template<memory_model memory_model= memory_model::use_stack, schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included <rpp::details::from_tag, TScheduler > ;

template<memory_model memory_model = memory_model::use_stack>
auto from_callable(std::invocable<> auto&& callable) requires rpp::details::is_header_included<rpp::details::from_tag, decltype(callable)>;

//************************ INTERVAL *********************//
template<schedulers::constraint::scheduler TScheduler = schedulers::immediate>
template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
auto interval(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>;

template<schedulers::constraint::scheduler TScheduler = schedulers::immediate>
template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
auto interval(schedulers::duration first_delay, schedulers::duration period, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>;
} // namespace rpp::observable

Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/sources/interval.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <rpp/sources/fwd.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/schedulers/fwd.hpp>
#include <rpp/schedulers/immediate_scheduler.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>

#include <type_traits>

Expand Down Expand Up @@ -42,7 +42,7 @@ namespace rpp::observable
* \ingroup creational_operators
* \see https://reactivex.io/documentation/operators/interval.html
*/
template<schedulers::constraint::scheduler TScheduler /*= schedulers::immediate*/>
template<schedulers::constraint::scheduler TScheduler /*= schedulers::trampoline */>
auto interval(schedulers::duration period, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
{
return interval(period, period, scheduler);
Expand All @@ -67,7 +67,7 @@ namespace rpp::observable
* \ingroup creational_operators
* \see https://reactivex.io/documentation/operators/interval.html
*/
template<schedulers::constraint::scheduler TScheduler /*= schedulers::immediate*/>
template<schedulers::constraint::scheduler TScheduler /*= schedulers::trampoline*/>
auto interval(schedulers::duration first_delay, schedulers::duration period, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
{
return source::create<size_t>([first_delay, period, scheduler](auto&& subscriber)
Expand Down
6 changes: 3 additions & 3 deletions src/tests/rpp/test_combine_latest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

SCENARIO("combine_latest bundles items", "[combine_latest]")
{
GIVEN("observable of -1-2-3-| combines with -4-5-6-| on default scheduler")
GIVEN("observable of -1-2-3-| combines with -4-5-6-| on immediate scheduler")
{
auto mock = mock_observer<std::tuple<int, int>>{};
rpp::source::just(1, 2, 3)
.combine_latest(rpp::source::just(4, 5, 6))
rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3)
.combine_latest(rpp::source::just(rpp::schedulers::immediate{}, 4, 5, 6))
.subscribe(mock);

CHECK(mock.get_received_values() == std::vector<std::tuple<int, int>>{
Expand Down
10 changes: 5 additions & 5 deletions src/tests/rpp/test_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

#include "copy_count_tracker.hpp"
#include "mock_observer.hpp"
#include "rpp/schedulers/run_loop_scheduler.hpp"

#include <rpp/sources/from.hpp>
#include <rpp/schedulers/new_thread_scheduler.hpp>
#include <rpp/schedulers/run_loop_scheduler.hpp>
#include <rpp/schedulers/immediate_scheduler.hpp>
#include <catch2/catch_test_macros.hpp>


Expand Down Expand Up @@ -122,7 +122,7 @@ SCENARIO("from iterable", "[source][from][track_copy]")

GIVEN("observable from copied iterable")
{
auto obs = rpp::source::from_iterable(vals);
auto obs = rpp::source::from_iterable(vals, rpp::schedulers::immediate{});
WHEN("subscribe on it")
{
obs.subscribe();
Expand All @@ -135,7 +135,7 @@ SCENARIO("from iterable", "[source][from][track_copy]")
}
GIVEN("observable from moved iterable")
{
auto obs = rpp::source::from_iterable(std::move(vals));
auto obs = rpp::source::from_iterable(std::move(vals), rpp::schedulers::immediate{});
WHEN("subscribe on it")
{
obs.subscribe();
Expand Down Expand Up @@ -220,7 +220,7 @@ SCENARIO("just")
GIVEN("observable with copied item")
{
copy_count_tracker v{};
auto obs = rpp::observable::just(v);
auto obs = rpp::observable::just(rpp::schedulers::immediate{}, v);
WHEN("subscribe on this observable")
{
obs.subscribe(mock);
Expand All @@ -237,7 +237,7 @@ SCENARIO("just")
GIVEN("observable with moved item")
{
copy_count_tracker v{};
auto obs = rpp::observable::just(std::move(v));
auto obs = rpp::observable::just(rpp::schedulers::immediate{}, std::move(v));
WHEN("subscribe on this observable")
{
obs.subscribe(mock);
Expand Down
4 changes: 2 additions & 2 deletions src/tests/rpp/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ SCENARIO("merge for observable of observables", "[operators][merge]")
{
auto obs = rpp::source::create<rpp::dynamic_observable<int>>([](const auto& sub)
{
sub.on_next(rpp::source::just(1).as_dynamic());
sub.on_next(rpp::source::just(rpp::schedulers::immediate{}, 1).as_dynamic());
sub.on_next(rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""})).as_dynamic());
sub.on_next(rpp::source::just(2).as_dynamic());
sub.on_next(rpp::source::just(rpp::schedulers::immediate{}, 2).as_dynamic());
});

WHEN("subscribe on merge of observable")
Expand Down
8 changes: 4 additions & 4 deletions src/tests/rpp/test_switch_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour
auto mock = mock_observer<int>{};
GIVEN("observable of items")
{
auto obs = rpp::source::just(1,2,3);
auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1,2,3);
WHEN("subscribe on it via switch_map")
{
obs.switch_map([](int v){return rpp::source::just(v, v*10);})
obs.switch_map([](int v){return rpp::source::just(rpp::schedulers::immediate{}, v, v*10);})
.subscribe(mock);
THEN("subscriber obtains values from observables obtained via switch_map")
{
Expand Down Expand Up @@ -91,7 +91,7 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour
.as_dynamic();
}

return rpp::source::just(2 * val).as_dynamic();
return rpp::source::just(rpp::schedulers::immediate{}, 2 * val).as_dynamic();
})
.subscribe(mock);
THEN("subscriber observes last two emission")
Expand All @@ -113,7 +113,7 @@ SCENARIO("switch_map acts like flat_map except it unsubscribes the previous sour
.as_dynamic();
}

return rpp::source::just(2 * val).as_dynamic();
return rpp::source::just(rpp::schedulers::immediate{}, 2 * val).as_dynamic();
})
.subscribe(mock);
THEN("subscriber observes first two emission and shall not see complete event")
Expand Down