Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,21 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("immediate_just(1) + zip(immediate_just(2)) + subscribe")
{
TEST_RPP([&]() {
rpp::immediate_just(1)
| rpp::operators::zip(rpp::immediate_just(2))
| rpp::operators::subscribe([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
rxcpp::immediate_just(1)
| rxcpp::operators::zip(rxcpp::immediate_just(2))
| rxcpp::operators::subscribe<std::tuple<int, int>>([](auto&& v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}
} // BENCHMARK("Combining Operators")

BENCHMARK("Conditional Operators")
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include <rpp/operators/start_with.hpp>
#include <rpp/operators/switch_on_next.hpp>
#include <rpp/operators/with_latest_from.hpp>
#include <rpp/operators/zip.hpp>

/**
* @defgroup utility_operators Utility Operators
Expand Down
111 changes: 18 additions & 93 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,36 @@
#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/combining_strategy.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>

#include <memory>

namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
class combine_latest_disposable final : public composite_disposable
class combine_latest_disposable final : public combining_disposable<Observer, Args...>
{
public:
explicit combine_latest_disposable(Observer&& observer, const TSelector& selector)
: observer_with_mutex{std::move(observer)}
, selector{selector}
: combining_disposable<Observer, Args...>(std::move(observer))
, m_selector(selector)
{
}

pointer_under_lock<Observer> get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; }

rpp::utils::tuple<std::optional<Args>...>& get_values() { return values; }
const auto& get_selector() const { return m_selector; }

const TSelector& get_selector() const { return selector; }

bool decrement_on_completed()
{
// just need atomicity, not guarding anything
return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1;
}
auto& get_values() { return m_values; }

private:
value_with_mutex<Observer> observer_with_mutex{};
rpp::utils::tuple<std::optional<Args>...> values{};
RPP_NO_UNIQUE_ADDRESS TSelector selector;
rpp::utils::tuple<std::optional<Args>...> m_values{};

std::atomic_size_t m_on_completed_needed{sizeof...(Args)};
RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
};

template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
struct combine_latest_observer_strategy
struct combine_latest_observer_strategy final
: public combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>
{
std::shared_ptr<combine_latest_disposable<Observer, TSelector, Args...>> disposable{};

void set_upstream(const rpp::disposable_wrapper& d) const
{
disposable->add(d);
}

bool is_disposed() const
{
return disposable->is_disposed();
}
using combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>::disposable;

template<typename T>
void on_next(T&& v) const
Expand All @@ -74,73 +51,21 @@ namespace rpp::operators::details
const auto observer = disposable->get_observer_under_lock();
disposable->get_values().template get<I>().emplace(std::forward<T>(v));

disposable->get_values().apply([this, &observer](const std::optional<Args>&... vals) {
if ((vals.has_value() && ...))
observer->on_next(disposable->get_selector()(vals.value()...));
});
}

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_error(err);
disposable->get_values().apply(&apply_impl<decltype(disposable)>, disposable, observer);
}

void on_completed() const
private:
template<typename TDisposable>
static void apply_impl(const TDisposable& disposable, const pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
{
if (disposable->decrement_on_completed())
{
disposable->dispose();
disposable->get_observer_under_lock()->on_completed();
}
if ((vals.has_value() && ...))
observer->on_next(disposable->get_selector()(vals.value()...));
}
};

template<typename TSelector, rpp::constraint::observable... TObservables>
struct combine_latest_t
struct combine_latest_t : public combining_operator_t<combine_latest_disposable, combine_latest_observer_strategy, TSelector, TObservables...>
{
RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TObservables...> observables;
RPP_NO_UNIQUE_ADDRESS TSelector selector;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "Selector is not callable with passed T type");

using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observables.apply(&subscribe_impl<Observer, Strategies...>, std::forward<Observer>(observer), observable_strategy, selector);
}

private:
template<rpp::constraint::observer Observer, typename... Strategies>
static void subscribe_impl(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy, const TSelector& selector, const TObservables&... observables)
{
using ExpectedValue = typename observable_chain_strategy<Strategies...>::value_type;
using Disposable = combine_latest_disposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>;

const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector);
auto locked = disposable.lock();
locked->get_observer_under_lock()->set_upstream(disposable.as_weak());
subscribe<std::decay_t<ExpectedValue>>(locked, std::index_sequence_for<TObservables...>{}, observables...);

observable_strategy.subscribe(rpp::observer<ExpectedValue, combine_latest_observer_strategy<0, std::decay_t<Observer>, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(locked)});
}

template<typename ExpectedValue, rpp::constraint::observer Observer, size_t... I>
static void subscribe(const std::shared_ptr<combine_latest_disposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>, const TObservables&... observables)
{
(..., observables.subscribe(rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, combine_latest_observer_strategy<I + 1, Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
}
};
} // namespace rpp::operators::details

Expand Down
125 changes: 125 additions & 0 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Comment thread
AlexInLog marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>
#include <rpp/schedulers/current_thread.hpp>

#include <memory>

namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, rpp::constraint::decayed_type... Args>
class combining_disposable : public composite_disposable
{
public:
explicit combining_disposable(Observer&& observer)
: m_observer_with_mutex{std::move(observer)}
{
}

pointer_under_lock<Observer> get_observer_under_lock() { return pointer_under_lock{m_observer_with_mutex}; }

bool decrement_on_completed()
{
// just need atomicity, not guarding anything
return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1;
}

private:
value_with_mutex<Observer> m_observer_with_mutex{};

std::atomic_size_t m_on_completed_needed{sizeof...(Args)};
};

template<typename TDisposable>
struct combining_observer_strategy
{
std::shared_ptr<TDisposable> disposable{};

void set_upstream(const rpp::disposable_wrapper& d) const
{
disposable->add(d);
}

bool is_disposed() const
{
return disposable->is_disposed();
}

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_error(err);
}

void on_completed() const
{
if (disposable->decrement_on_completed())
{
disposable->dispose();
disposable->get_observer_under_lock()->on_completed();
}
}
};

template<template<typename...> typename TDisposable, template<auto, typename...> typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables>
struct combining_operator_t
{
RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TObservables...> observables;
RPP_NO_UNIQUE_ADDRESS TSelector selector;

template<rpp::constraint::decayed_type T>
struct operator_traits
{
static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "Selector is not callable with passed T type");

using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observables.apply(&subscribe_impl<Observer, Strategies...>, std::forward<Observer>(observer), observable_strategy, selector);
}

private:
template<rpp::constraint::observer Observer, typename... Strategies>
static void subscribe_impl(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy, const TSelector& selector, const TObservables&... observables)
{
using ExpectedValue = typename observable_chain_strategy<Strategies...>::value_type;
using Disposable = TDisposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>;

const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector);
auto locked = disposable.lock();
locked->get_observer_under_lock()->set_upstream(disposable.as_weak());
subscribe<std::decay_t<ExpectedValue>>(locked, std::index_sequence_for<TObservables...>{}, observables...);

observable_strategy.subscribe(rpp::observer<ExpectedValue, TStrategy<0, std::decay_t<Observer>, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(locked)});
}

template<typename ExpectedValue, rpp::constraint::observer Observer, size_t... I>
static void subscribe(const std::shared_ptr<TDisposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>, const TObservables&... observables)
{
(..., observables.subscribe(rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, TStrategy<I + 1, Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
}
};
} // namespace rpp::operators::details
Loading