Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/benchmarks/rpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,20 +1083,24 @@ TEST_CASE("on_error_resume_next")
{
BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rpp::observable::create<int>([](const auto& subscriber)
auto err = std::make_exception_ptr(std::runtime_error{""});
const auto obs = rpp::observable::create<int>([&err](const auto& subscriber)
{
subscriber.on_error(std::make_exception_ptr(std::runtime_error{""}));
subscriber.on_error(err);
});
auto subscriber = rpp::specific_subscriber{[](const int&) {}};

meter.measure([&]
std::vector<rpp::dynamic_subscriber<int>> subs{};
for (int i = 0; i < meter.runs(); ++i)
subs.push_back(rpp::dynamic_subscriber<int>{});

meter.measure([&](int i)
{
return obs
.on_error_resume_next([](auto&&)
{
return rpp::observable::just(1);
return rpp::observable::never<int>();
})
.subscribe(subscriber);
.subscribe(subs[i]);
});
};
}
16 changes: 10 additions & 6 deletions src/benchmarks/rxcpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1049,20 +1049,24 @@ TEST_CASE("on_error_resume_next")
{
BENCHMARK_ADVANCED("on_error_resume_next construction from observable via dot + subscribe")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rxcpp::sources::create<int>([](const auto& subscriber)
auto err = std::make_exception_ptr(std::runtime_error{""});
const auto obs = rxcpp::sources::create<int>([&err](const auto& subscriber)
{
subscriber.on_error(std::make_exception_ptr(std::runtime_error{""}));
subscriber.on_error(err);
});
auto subscriber = rxcpp::make_subscriber<int>();

meter.measure([&](int)
std::vector<rxcpp::subscriber<int>> subs{};
for (int i = 0; i < meter.runs(); ++i)
subs.push_back(rxcpp::make_subscriber<int>());

meter.measure([&](int i)
{
return obs
.on_error_resume_next([](auto&&)
{
return rxcpp::observable<>::just(1);
return rxcpp::observable<>::never<int>();
})
.subscribe(subscriber);
.subscribe(subs[i]);
});
};
}
2 changes: 1 addition & 1 deletion src/rpp/rpp/observables/details/member_overload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@

namespace rpp::details
{
template<rpp::constraint::decayed_type Type, typename SpecificObservable, typename MemberTag>
template<constraint::decayed_type Type, typename SpecificObservable, typename MemberTag>
struct member_overload;
} // namespace rpp::details
8 changes: 3 additions & 5 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@

#pragma once

#include <rpp/defs.hpp> // RPP_EMPTY_BASES
#include <rpp/observables/blocking_observable.hpp> // as_blocking
#include <rpp/observables/constraints.hpp> // own constraints
#include <rpp/observables/fwd.hpp> // own forwarding
#include <rpp/operators/fwd.hpp> // forwarding of member_overaloads
#include <rpp/operators/lift.hpp> // must-have operators
#include <rpp/observables/blocking_observable.hpp> // as_blocking

#include <rpp/defs.hpp> // RPP_EMPTY_BASES
#include <rpp/utils/function_traits.hpp>
#include <rpp/utils/function_traits.hpp> // decayed_invoke_result_t

#include <type_traits>

Expand Down
3 changes: 3 additions & 0 deletions src/rpp/rpp/observers/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ template<constraint::decayed_type T,
constraint::on_error_fn OnError,
constraint::on_completed_fn OnCompleted>
class specific_observer;

template<typename...Args>
using specific_observer_with_decayed_args = rpp::specific_observer<std::decay_t<Args>...>;
} // namespace rpp
3 changes: 0 additions & 3 deletions src/rpp/rpp/observers/specific_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ specific_observer(OnNext, OnError, Args...) -> specific_observer<utils::decayed_
template<typename OnNext, constraint::on_completed_fn OnCompleted>
specific_observer(OnNext, OnCompleted) -> specific_observer<utils::decayed_function_argument_t<OnNext>, OnNext, utils::rethrow_error_t, OnCompleted>;

template<typename...Args>
using specific_observer_with_decayed_args = rpp::specific_observer<std::decay_t<Args>...>;

/**
* \brief Create specific_observer with manually specified Type. In case of type can be deduced from argument of OnNext use direct constructor of rpp::specific_observer
* \tparam Type manually specific type of observer
Expand Down
13 changes: 6 additions & 7 deletions src/rpp/rpp/operators/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@

#pragma once

#include <algorithm>

#include <rpp/operators/fwd/buffer.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/utils/functors.hpp>
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_dynamic_state
#include <rpp/operators/fwd/buffer.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // subscriber_of_type
#include <rpp/utils/functors.hpp> // forwarding_on_error

#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <algorithm>


IMPLEMENTATION_FILE(buffer_tag);
Expand Down
17 changes: 7 additions & 10 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@

#pragma once

#include <algorithm>

#include <rpp/operators/fwd/combine_latest.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/utils/functors.hpp>
#include <rpp/operators/merge.hpp>
#include <rpp/defs.hpp>
#include <rpp/utils/spinlock.hpp>

#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/merge.hpp> // merge_state
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/combine_latest.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/spinlock.hpp> // spinlock

#include <algorithm>

IMPLEMENTATION_FILE(combine_latest_tag);

Expand Down
18 changes: 8 additions & 10 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@

#pragma once

#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/fwd/concat.hpp>
#include <rpp/operators/merge.hpp>
#include <rpp/observables/dynamic_observable.hpp>

#include <rpp/subscriptions/composite_subscription.hpp>
#include <rpp/observables/dynamic_observable.hpp> // dynamic_observable
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/merge.hpp> // merge_forwarding_on_next/merge_on_error
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state

#include <rpp/sources/just.hpp>
#include <rpp/operators/fwd/concat.hpp> // own forwarding
#include <rpp/sources/just.hpp>
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/subscriptions/composite_subscription.hpp> // composite_subscription
#include <rpp/utils/functors.hpp>
#include <rpp/utils/spinlock.hpp>


#include <mutex>
#include <memory>
#include <mutex>
#include <queue>


Expand Down
7 changes: 4 additions & 3 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@

#pragma once

#include <rpp/defs.hpp>
#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/delay.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/fwd/delay.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type

IMPLEMENTATION_FILE(delay_tag);

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/details/serialized_subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#pragma once

#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/subscriptions/composite_subscription.hpp>

#include <memory>
Expand Down
17 changes: 7 additions & 10 deletions src/rpp/rpp/operators/distinct_until_changed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@

#pragma once

#include <rpp/observables/constraints.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/fwd/distinct_until_changed.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/utils/functors.hpp>
#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_dynamic_state
#include <rpp/operators/fwd/distinct_until_changed.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/functors.hpp> // forwarding_on_error/forwarding_on_completed
#include <rpp/utils/utilities.hpp> // as_const

#include <rpp/defs.hpp>

#include <rpp/utils/utilities.hpp>

#include <memory>
#include <optional>


Expand Down
10 changes: 5 additions & 5 deletions src/rpp/rpp/operators/do.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

#pragma once

#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/fwd/do.hpp>

#include <rpp/utils/utilities.hpp>

#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/fwd/do.hpp> // own forwarding
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/utilities.hpp> // utils::as_const

IMPLEMENTATION_FILE(do_tag);

Expand Down
11 changes: 5 additions & 6 deletions src/rpp/rpp/operators/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@

#pragma once

#include <rpp/observables/constraints.hpp>
#include <rpp/operators/fwd/filter.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/utilities.hpp>

#include <rpp/defs.hpp>
#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/fwd/filter.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/utilities.hpp> // utils::as_const

#include <utility>

Expand Down
15 changes: 7 additions & 8 deletions src/rpp/rpp/operators/first.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@

#pragma once

#include <rpp/operators/fwd/first.hpp>
#include <rpp/operators/take.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/exceptions.hpp>
#include <rpp/utils/functors.hpp>
#include <rpp/utils/utilities.hpp>

#include <memory>
#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/take.hpp> // take_state
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/first.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
#include <rpp/utils/exceptions.hpp> // not_enough_emissions
#include <rpp/utils/functors.hpp> // forwarding_on_error

IMPLEMENTATION_FILE(first_tag);

Expand Down
3 changes: 1 addition & 2 deletions src/rpp/rpp/operators/flat_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@

#pragma once

#include <rpp/operators/fwd/flat_map.hpp>

#include <rpp/operators/map.hpp>
#include <rpp/operators/merge.hpp>
#include <rpp/operators/fwd/flat_map.hpp>

IMPLEMENTATION_FILE(flat_map_tag);

Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <rpp/operators/fwd/flat_map.hpp>
#include <rpp/operators/fwd/group_by.hpp>
#include <rpp/operators/fwd/last.hpp>
#include <rpp/operators/fwd/lift.hpp>
#include <rpp/operators/fwd/map.hpp>
#include <rpp/operators/fwd/merge.hpp>
#include <rpp/operators/fwd/multicast.hpp>
Expand Down
5 changes: 2 additions & 3 deletions src/rpp/rpp/operators/fwd/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

#pragma once

#include <vector>

#include <rpp/observables/constraints.hpp>
#include <rpp/observables/details/member_overload.hpp>

#include <vector>

namespace rpp::details
{
struct buffer_tag;
Expand Down
9 changes: 4 additions & 5 deletions src/rpp/rpp/operators/fwd/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

#pragma once

#include <rpp/observables/details/member_overload.hpp>
#include <rpp/observables/constraints.hpp>
#include <rpp/utils/functors.hpp>
#include <rpp/utils/function_traits.hpp>

#include <rpp/observables/constraints.hpp> // constraint::observable
#include <rpp/observables/details/member_overload.hpp> // member_overload
#include <rpp/utils/function_traits.hpp> // decayed_invoke_result_t
#include <rpp/utils/functors.hpp> // pack_to_tuple

#include <tuple>

Expand Down
5 changes: 2 additions & 3 deletions src/rpp/rpp/operators/fwd/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

#pragma once

#include <rpp/observables/details/member_overload.hpp>

#include <rpp/observables/constraints.hpp>
#include <rpp/observables/constraints.hpp> // constraint::observable_of_type
#include <rpp/observables/details/member_overload.hpp> // member_overload


namespace rpp::details
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/operators/fwd/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@

#pragma once

#include <rpp/schedulers/constraints.hpp>
#include <rpp/observables/details/member_overload.hpp>
#include <rpp/utils/function_traits.hpp>
#include <rpp/utils/functors.hpp>
#include <rpp/schedulers/constraints.hpp> // schedulers::constraint::scheduler
#include <rpp/observables/details/member_overload.hpp> // member_overload

namespace rpp::details
{
Expand All @@ -22,7 +20,6 @@ struct delay_tag;

namespace rpp::details
{

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct delay_impl;

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/fwd/distinct_until_changed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#pragma once

#include <rpp/observables/details/member_overload.hpp>
#include <rpp/observables/details/member_overload.hpp> // member_overload

#include <concepts>
#include <functional>
Expand Down
5 changes: 2 additions & 3 deletions src/rpp/rpp/operators/fwd/do.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

#pragma once

#include <rpp/observables/details/member_overload.hpp>
#include <rpp/observers/fwd.hpp>
#include <rpp/observers/specific_observer.hpp>
#include <rpp/observables/details/member_overload.hpp> // member_overload
#include <rpp/observers/constraints.hpp> // constraint::observer_of_type

namespace rpp::details
{
Expand Down
Loading