Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.idea/
.vs/
.vscode/
.cache/
build/
_build/
cmake/open-cpp-coverage.cmake
Expand Down
18 changes: 17 additions & 1 deletion src/rpp/rpp/operators/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ IMPLEMENTATION_FILE(filter_tag);
namespace rpp::details
{
template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct filter_impl
struct filter_impl_on_next
{
RPP_NO_UNIQUE_ADDRESS Predicate predicate;

Expand All @@ -35,4 +35,20 @@ struct filter_impl
}
};

template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct filter_impl
{
RPP_NO_UNIQUE_ADDRESS filter_impl_on_next<Type, Predicate> on_next;

template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details
44 changes: 39 additions & 5 deletions src/rpp/rpp/operators/lift.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@

#pragma once

#include "rpp/observables/specific_observable.hpp"
#include "rpp/subscribers/constraints.hpp"
#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/lift.hpp> // own forwarding
#include <rpp/sources/create.hpp> // create observable
#include <utility>

IMPLEMENTATION_FILE(lift_tag);

Expand All @@ -38,18 +41,25 @@ struct lift_action_by_callbacks
}
};

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn>
using subscriber_type_of_list_fn = utils::extract_subscriber_type_t<utils::decayed_invoke_result_t<OperatorFn, dynamic_subscriber<NewType>>>;

/**
* \brief Functor of "lift" operator for on_subscribe overload function.
* \details Each observable has an on_subscribe function and observable is activated (pub-sub channel is established) after on_subscribe is called. The on_subscribe is called when the observable is subscribed by a subscriber
*
* \param _this is the current observable.
* \param op is the functor that provides the "operator()(subscriber_of_new_type) -> subscriber_of_old_type".
*/
template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename TObs>
struct lift_on_subscribe
template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ...ChildLiftArgs>
struct lift_on_subscribe : public lift_on_subscribe<NewType, OperatorFn, lift_on_subscribe<subscriber_type_of_list_fn<NewType, OperatorFn>, ChildLiftArgs...>> {};

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename TOnSubscribe>
struct lift_on_subscribe<NewType, OperatorFn, TOnSubscribe>
{
RPP_NO_UNIQUE_ADDRESS TObs _this;
RPP_NO_UNIQUE_ADDRESS OperatorFn op;
using T = subscriber_type_of_list_fn<NewType, OperatorFn>;
RPP_NO_UNIQUE_ADDRESS specific_observable<T, TOnSubscribe> _this;
RPP_NO_UNIQUE_ADDRESS OperatorFn op;

template<constraint::subscriber_of_type<NewType> TSub>
void operator()(TSub&& subscriber) const
Expand All @@ -58,9 +68,33 @@ struct lift_on_subscribe
}
};

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename ...ChildLiftArgs>
auto lift_impl_internal(OperatorFn&& op, specific_observable<ObservableValue, lift_on_subscribe<ObservableValue, ChildLiftArgs...>>&& _this)
{
return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, ChildLiftArgs...>>({ std::move(_this), std::forward<OperatorFn>(op) });
}

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename ...ChildLiftArgs>
auto lift_impl_internal(OperatorFn&& op, const specific_observable<ObservableValue, lift_on_subscribe<ObservableValue, ChildLiftArgs...>>& _this)
{
return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, ChildLiftArgs...>>({ _this, std::forward<OperatorFn>(op) });
}

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename OnSubscribe>
auto lift_impl_internal(OperatorFn&& op, specific_observable<ObservableValue, OnSubscribe>&& _this)
{
return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, OnSubscribe>>({ std::move(_this), std::forward<OperatorFn>(op) });
}

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename OnSubscribe>
auto lift_impl_internal(OperatorFn&& op, const specific_observable<ObservableValue, OnSubscribe>& _this)
{
return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, OnSubscribe>>({ _this, std::forward<OperatorFn>(op) });
}

template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename TObs>
auto lift_impl(OperatorFn&& op, TObs&& _this)
{
return rpp::observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, std::decay_t<TObs>>>({ std::forward<TObs>(_this), std::forward<OperatorFn>(op) });
return lift_impl_internal<NewType>(std::forward<OperatorFn>(op), std::forward<TObs>(_this));
}
} // namespace rpp::details
19 changes: 18 additions & 1 deletion src/rpp/rpp/operators/map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ IMPLEMENTATION_FILE(map_tag);
namespace rpp::details
{
template<constraint::decayed_type Type, std::invocable<Type> Callable>
struct map_impl
struct map_impl_on_next
{
RPP_NO_UNIQUE_ADDRESS Callable callable;

Expand All @@ -34,4 +34,21 @@ struct map_impl
subscriber.on_next(callable(utils::as_const(std::forward<TVal>(value))));
}
};

template<constraint::decayed_type Type, std::invocable<Type> Callable>
struct map_impl
{
RPP_NO_UNIQUE_ADDRESS map_impl_on_next<Type, Callable> on_next;

template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details
19 changes: 18 additions & 1 deletion src/rpp/rpp/operators/take_while.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ IMPLEMENTATION_FILE(take_while_tag);
namespace rpp::details
{
template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct take_while_impl
struct take_while_impl_on_next
{
RPP_NO_UNIQUE_ADDRESS Predicate predicate;

Expand All @@ -35,4 +35,21 @@ struct take_while_impl
subscriber.on_completed();
}
};

template<constraint::decayed_type Type, std::predicate<const Type&> Predicate>
struct take_while_impl
{
RPP_NO_UNIQUE_ADDRESS take_while_impl_on_next<Type, Predicate> on_next;

template<constraint::subscriber TSub>
auto operator()(TSub&& subscriber) const
{
auto subscription = subscriber.get_subscription();
return create_subscriber_with_state<Type>(std::move(subscription),
on_next,
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber));
}
};
} // namespace rpp::details