Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 14 additions & 50 deletions src/rpp/rpp/observers/dynamic_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,71 +63,35 @@ std::shared_ptr<dynamic_observer_state_base<T>> make_dynamic_observer_state_from
}

template<constraint::decayed_type T, constraint::decayed_type ...States>
class dynamic_state_observer : public details::typed_observer_tag<T>
class dynamic_state_observer : public state_observer<T, utils::forwarding_on_next_for_pointer,utils::forwarding_on_error_for_pointer, utils::forwarding_on_completed_for_pointer, std::shared_ptr<dynamic_observer_state_base<T>>>
{
using base = state_observer<T, utils::forwarding_on_next_for_pointer,utils::forwarding_on_error_for_pointer, utils::forwarding_on_completed_for_pointer, std::shared_ptr<dynamic_observer_state_base<T>>>;
public:
template<typename ...TStates>
requires (constraint::decayed_same_as<States, TStates> && ...)
dynamic_state_observer(std::invocable<T, States...> auto&& on_next,
std::invocable<std::exception_ptr, States...> auto&& on_error,
std::invocable<States...> auto&& on_completed,
TStates&& ... states)
: m_state{make_dynamic_observer_state_from_fns<T>(std::forward<decltype(on_next)>(on_next),
std::forward<decltype(on_error)>(on_error),
std::forward<decltype(on_completed)>(on_completed),
std::forward<TStates>(states)...)} {}
: base{utils::forwarding_on_next_for_pointer{},
utils::forwarding_on_error_for_pointer{},
utils::forwarding_on_completed_for_pointer{},
make_dynamic_observer_state_from_fns<T>(std::forward<decltype(on_next)>(on_next),
std::forward<decltype(on_error)>(on_error),
std::forward<decltype(on_completed)>(on_completed),
std::forward<TStates>(states)...)} {}

dynamic_state_observer(std::shared_ptr<details::dynamic_observer_state_base<T>> state)
: m_state{ std::move(state) } {}
: base{utils::forwarding_on_next_for_pointer{},
utils::forwarding_on_error_for_pointer{},
utils::forwarding_on_completed_for_pointer{},
std::move(state)} {}


template<constraint::observer_of_type<T> TObserver>
requires (!std::is_same_v<std::decay_t<TObserver>, dynamic_state_observer<T, States...>>)
dynamic_state_observer(TObserver&& obs)
: m_state{details::make_dynamic_observer_state<T, std::decay_t<TObserver>>(std::forward<TObserver>(obs))} {}


/**
* \brief Observable calls this methods to notify observer about new value.
*
* \note obtains value by const-reference to original object.
*/
void on_next(const T& v) const
{
m_state->on_next(v);
}

/**
* \brief Observable calls this methods to notify observer about new value.
*
* \note obtains value by rvalue-reference to original object
*/
void on_next(T&& v) const
{
m_state->on_next(std::move(v));
}

/**
* \brief Observable calls this method to notify observer about some error during generation next data.
* \warning Obtaining this call means no any further on_next or on_completed calls
* \param err details of error
*/
void on_error(const std::exception_ptr& err) const
{
m_state->on_error(err);
}

/**
* \brief Observable calls this method to notify observer about finish of work.
* \warning Obtaining this call means no any further on_next calls
*/
void on_completed() const
{
m_state->on_completed();
}

private:
std::shared_ptr<dynamic_observer_state_base<T>> m_state{};
: dynamic_state_observer{details::make_dynamic_observer_state<T, std::decay_t<TObserver>>(std::forward<TObserver>(obs))} {}
};
} // namespace rpp::details

Expand Down
6 changes: 0 additions & 6 deletions src/rpp/rpp/operators/fwd/subscribe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,5 @@ struct member_overload<Type, SpecificObservable, subscribe_tag>
{
return static_cast<const SpecificObservable*>(this)->subscribe_impl(subscriber);
}

template<constraint::observer_of_type<Type> Obs>
auto subscribe_impl(specific_subscriber<Type, Obs>&& subscriber) const
{
return static_cast<const SpecificObservable*>(this)->subscribe_impl(std::move(subscriber));
}
};
} // namespace rpp::details
14 changes: 4 additions & 10 deletions src/rpp/rpp/sources/from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ auto extract_iterable_from_packed(const std::shared_ptr<T> & v) -> const auto&
return *v;
}

void iterate(auto&& iterable,
void iterate(const auto& iterable,
const schedulers::constraint::scheduler auto& scheduler,
constraint::subscriber auto&& subscriber)
{
Expand All @@ -58,7 +58,7 @@ void iterate(auto&& iterable,
else
{
auto worker = scheduler.create_worker(subscriber.get_subscription());
worker.schedule([iterable=std::forward<decltype(iterable)>(iterable),
worker.schedule([iterable=iterable,
subscriber=std::forward<decltype(subscriber)>(subscriber),
index = size_t{0}]() mutable-> schedulers::optional_duration
{
Expand Down Expand Up @@ -123,19 +123,13 @@ class iterate_impl
, m_scheduler{scheduler} {}

template<constraint::subscriber TSub>
void operator()(TSub&& subscriber) const &
void operator()(TSub&& subscriber) const
{
details::iterate(m_iterable, m_scheduler, std::forward<TSub>(subscriber));
}

template<constraint::subscriber TSub>
void operator()(TSub&& subscriber) const &&
{
details::iterate(std::move(m_iterable), m_scheduler, std::forward<TSub>(subscriber));
}

private:
mutable PackedIterable m_iterable;
PackedIterable m_iterable;
RPP_NO_UNIQUE_ADDRESS TScheduler m_scheduler;
};
} // namespace rpp::observable::details
Expand Down