Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class observable
if constexpr (details::observers::has_disposable_strategy<ObserverStrategy>)
subscribe(rpp::observer<Type, std::decay_t<ObserverStrategy>>{std::forward<ObserverStrategy>(observer_strategy)});
else
subscribe(rpp::observer<Type, details::with_disposable_strategy<std::decay_t<ObserverStrategy>, typename expected_disposable_strategy::disposable_strategy>>{std::forward<ObserverStrategy>(observer_strategy)});
subscribe(rpp::observer_with_disposable<Type, std::decay_t<ObserverStrategy>, typename expected_disposable_strategy::disposable_strategy>{std::forward<ObserverStrategy>(observer_strategy)});
}

/**
Expand Down Expand Up @@ -109,7 +109,7 @@ class observable
composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, observer<Type, ObserverStrategy>&& obs) const
{
if (!d.is_disposed())
m_strategy.subscribe(observer<Type, rpp::details::with_external_disposable<observer<Type, ObserverStrategy>>>{d, std::move(obs)});
m_strategy.subscribe(observer_with_disposable<Type, observer<Type, ObserverStrategy>>{d, std::move(obs)});
return d;
}

Expand All @@ -125,7 +125,7 @@ class observable
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, ObserverStrategy&& observer_strategy) const
{
subscribe(rpp::observer<Type, rpp::details::with_external_disposable<std::decay_t<ObserverStrategy>>>{d, std::forward<ObserverStrategy>(observer_strategy)});
subscribe(observer_with_disposable<Type, std::decay_t<ObserverStrategy>>{d, std::forward<ObserverStrategy>(observer_strategy)});
return d;
}

Expand Down Expand Up @@ -186,9 +186,9 @@ class observable
{
using strategy = rpp::details::observers::lambda_strategy<Type, std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>;

subscribe(rpp::observer<Type, rpp::details::with_disposable_strategy<strategy, typename expected_disposable_strategy::disposable_strategy>>{std::forward<OnNext>(on_next),
std::forward<OnError>(on_error),
std::forward<OnCompleted>(on_completed)});
subscribe(observer_with_disposable<Type, strategy, typename expected_disposable_strategy::disposable_strategy>{std::forward<OnNext>(on_next),
std::forward<OnError>(on_error),
std::forward<OnCompleted>(on_completed)});
}

/**
Expand Down
10 changes: 6 additions & 4 deletions src/rpp/rpp/observers/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ struct with_disposable_strategy
static void set_upstream(const disposable_wrapper&) noexcept;
static bool is_disposed() noexcept;
};

template<typename S>
using with_external_disposable = with_disposable_strategy<S, observers::external_disposable_strategy>;
}

namespace rpp
{
template<constraint::decayed_type Type, constraint::observer_strategy<Type> Strategy>
class observer;

template<constraint::decayed_type Type,
constraint::observer_strategy<Type> Strategy,
rpp::details::observers::constraint::disposable_strategy DisposableStrategy = rpp::details::observers::external_disposable_strategy>
using observer_with_disposable = observer<Type, rpp::details::with_disposable_strategy<Strategy, DisposableStrategy>>;

template<constraint::decayed_type Type>
class dynamic_observer;

Expand All @@ -107,7 +109,7 @@ template<constraint::decayed_type Type, std::invocable<Type> OnNext, std::invoca
using lambda_observer = observer<Type, details::observers::lambda_strategy<Type, OnNext, OnError, OnCompleted>>;

template<constraint::decayed_type Type, std::invocable<Type> OnNext, std::invocable<const std::exception_ptr&> OnError, std::invocable<> OnCompleted>
using lambda_observer_with_disposable = observer<Type, details::with_external_disposable<details::observers::lambda_strategy<Type, OnNext, OnError, OnCompleted>>>;
using lambda_observer_with_disposable = observer_with_disposable<Type, details::observers::lambda_strategy<Type, OnNext, OnError, OnCompleted>>;

/**
* @brief Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on the fly" via lambdas and etc.
Expand Down
35 changes: 22 additions & 13 deletions src/rpp/rpp/observers/observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ class observer_impl
{
}

template<typename... Args>
requires (constraint::is_constructible_from<Strategy, Args && ...> && !rpp::constraint::variadic_decayed_same_as<observer_impl<Type, Strategy, DisposablesStrategy>, Args...>)
explicit observer_impl(Args&&... args)
: m_strategy{std::forward<Args>(args)...}
, m_disposable{}
{
}

observer_impl(const observer_impl&) = default;
observer_impl(observer_impl&&) noexcept = default;

Expand Down Expand Up @@ -180,10 +172,19 @@ template<constraint::decayed_type Type, constraint::observer_strategy<Type> Stra
class observer final : public details::observer_impl<Type, Strategy, details::observers::deduce_disposable_strategy_t<Strategy>>
{
public:
using DisposableStrategy = details::observers::deduce_disposable_strategy_t<Strategy>;
using Base = details::observer_impl<Type, Strategy, details::observers::deduce_disposable_strategy_t<Strategy>>;

template<typename... Args>
requires (!constraint::variadic_decayed_same_as<observer<Type, Strategy>, Args...> && constraint::is_constructible_from<Strategy, Args && ...>)
requires constraint::is_constructible_from<Strategy, Args&&...>
explicit observer(DisposableStrategy strategy, Args&&... args)
: Base{std::move(strategy), std::forward<Args>(args)...}
{}

template<typename... Args>
requires (constraint::is_constructible_from<Strategy, Args && ...> && !rpp::constraint::variadic_decayed_same_as<observer, Args...>)
explicit observer(Args&&... args)
: details::observer_impl<Type, Strategy, details::observers::deduce_disposable_strategy_t<Strategy>>{std::forward<Args>(args)...}
: Base{DisposableStrategy{}, std::forward<Args>(args)...}
{
}

Expand All @@ -204,10 +205,18 @@ class observer<Type, details::with_disposable_strategy<Strategy, DisposableStrat
: public details::observer_impl<Type, Strategy, DisposableStrategy>
{
public:
using Base = details::observer_impl<Type, Strategy, DisposableStrategy>;

template<typename... Args>
requires constraint::is_constructible_from<Strategy, Args&&...>
explicit observer(DisposableStrategy strategy, Args&&... args)
: Base{std::move(strategy), std::forward<Args>(args)...}
{}

template<typename... Args>
requires (!rpp::constraint::variadic_decayed_same_as<observer<Type, details::with_disposable_strategy<Strategy, DisposableStrategy>> , Args...>)
requires (constraint::is_constructible_from<Strategy, Args && ...> && !rpp::constraint::variadic_decayed_same_as<observer, Args...>)
explicit observer(Args&&... args)
: details::observer_impl<Type, Strategy, DisposableStrategy>{std::forward<Args>(args)...}
: Base{DisposableStrategy{}, std::forward<Args>(args)...}
{
}

Expand All @@ -231,7 +240,7 @@ class observer<Type, rpp::details::observers::dynamic_strategy<Type>>
template<constraint::observer_strategy<Type> TStrategy>
requires (!std::same_as<TStrategy, rpp::details::observers::dynamic_strategy<Type>>)
explicit observer(observer<Type, TStrategy>&& other)
: details::observer_impl<Type, rpp::details::observers::dynamic_strategy<Type>, details::observers::none_disposable_strategy>{std::move(other)}
: details::observer_impl<Type, rpp::details::observers::dynamic_strategy<Type>, details::observers::none_disposable_strategy>{details::observers::none_disposable_strategy{}, std::move(other)}
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/subscribe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class subscribe_t<rpp::composite_disposable_wrapper, observer<Type, ObserverStra
template<rpp::constraint::observable_strategy<Type> Strategy>
rpp::composite_disposable_wrapper operator()(const rpp::observable<Type, Strategy>& observable) &&
{
observable.subscribe(observer<Type, rpp::details::with_external_disposable<observer<Type, ObserverStrategy>>>{m_disposable, std::move(m_observer)});
observable.subscribe(observer_with_disposable<Type, observer<Type, ObserverStrategy>>{m_disposable, std::move(m_observer)});
return m_disposable;
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/utils/mock_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class mock_observer_strategy final
std::vector<Type> get_received_values() const {return m_state->vals; }

auto get_observer() const {return rpp::observer<Type, mock_observer_strategy<Type>>{*this}; }
auto get_observer(rpp::composite_disposable_wrapper d) const {return rpp::observer<Type, rpp::details::with_external_disposable<mock_observer_strategy<Type>>>{std::move(d), *this}; }
auto get_observer(rpp::composite_disposable_wrapper d) const {return rpp::observer_with_disposable<Type, mock_observer_strategy<Type>>{std::move(d), *this}; }

private:
struct State
Expand Down