Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ endif()
# ===================== Tests ===================
if (RPP_BUILD_TESTS)
rpp_fetch_library(Catch2 https://github.com/catchorg/Catch2.git v3.6.0)
target_compile_features(Catch2::Catch2WithMain INTERFACE cxx_std_20)
rpp_fetch_library(trompeloeil https://github.com/rollbear/trompeloeil.git main)
endif()

Expand Down
11 changes: 11 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
});
}

SECTION("concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe")
{
TEST_RPP([&]() {
rpp::source::concat(rpp::source::just(rpp::schedulers::immediate{}, 1), rpp::source::just(rpp::schedulers::immediate{}, 1, 2)).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
rxcpp::observable<>::from(rxcpp::identity_immediate(), rxcpp::observable<>::just(1, rxcpp::identity_immediate()).as_dynamic(), rxcpp::observable<>::from(rxcpp::identity_immediate(), 1, 2).as_dynamic()) | rxcpp::operators::concat() | rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("defer from array of 1 - defer + create + subscribe + immediate")
{
TEST_RPP([&]() {
Expand Down
20 changes: 20 additions & 0 deletions src/examples/rpp/doxygen/switch_on_next.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example switch_on_next.cpp
**/
int main()
{
//! [switch_on_next]
rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
| rpp::operators::switch_on_next()
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
//! [switch_on_next]

return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
#include <rpp/observables/dynamic_observable.hpp>
#include <rpp/observables/grouped_observable.hpp>
#include <rpp/observables/observable.hpp>
#include <rpp/observables/variant_observable.hpp>
26 changes: 17 additions & 9 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,9 @@ namespace rpp

template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class observable;

template<constraint::decayed_type Type>
class dynamic_observable;

template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class blocking_observable;

template<constraint::decayed_type KeyType, constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class grouped_observable;
} // namespace rpp


namespace rpp::constraint
{
template<typename T>
Expand Down Expand Up @@ -124,6 +116,22 @@ namespace rpp::constraint
concept observables_of_same_type = rpp::constraint::observable<TObservable> && (rpp::constraint::observable<TObservables> && ...) && (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...);
} // namespace rpp::constraint

namespace rpp
{
template<constraint::decayed_type Type>
class dynamic_observable;

template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class blocking_observable;

template<constraint::decayed_type KeyType, constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class grouped_observable;

template<constraint::decayed_type Type, rpp::constraint::observable_of_type<Type>... Observables>
class variant_observable;
} // namespace rpp


#define RPP_CHECK_IF_TRAIT_ASSERTS_SATISFIED(Op, Type) \
/* operator_traits can be instantiated if all inner static_asserts are fine*/ \
if constexpr (requires { { typename std::decay_t<Op>::template operator_traits<Type>{}}; })
62 changes: 62 additions & 0 deletions src/rpp/rpp/observables/variant_observable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/observables/observable.hpp>
#include <rpp/utils/utils.hpp>

#include <variant>

namespace rpp::details
{
template<constraint::decayed_type Type, constraint::observable_of_type<Type>... Observables>
struct variant_observable_strategy
{
using value_type = Type;
template<constraint::decayed_any_of<Observables...> TT>
requires (!constraint::decayed_same_as<variant_observable_strategy, TT>)
explicit variant_observable_strategy(TT&& observable) // NOLINT
: observables(std::forward<TT>(observable))
{
}


variant_observable_strategy(const variant_observable_strategy& other) = default;
variant_observable_strategy(variant_observable_strategy&& other) noexcept = default;

utils::unique_variant<Observables...> observables;

template<rpp::constraint::observer_of_type<value_type> TObs>
void subscribe(TObs&& obs) const
{
std::visit([&](const auto& o) { o.subscribe(std::forward<TObs>(obs)); }, observables);
}
};
} // namespace rpp::details

namespace rpp
{
/**
* @brief Extension over rpp::observable to provide ability statically keep one of multiple observables
*
* @ingroup observables
*/
template<constraint::decayed_type Type, constraint::observable_of_type<Type>... Observables>
class variant_observable : public rpp::observable<Type, details::variant_observable_strategy<Type, Observables...>>
{
using base = rpp::observable<Type, details::variant_observable_strategy<Type, Observables...>>;

public:
using base::base;
};

template<constraint::observable T, constraint::observable_of_type<rpp::utils::extract_observable_type_t<T>>... Observables>
variant_observable(std::variant<T, Observables...> variant) -> variant_observable<rpp::utils::extract_observable_type_t<T>, T, Observables...>;
} // namespace rpp
24 changes: 24 additions & 0 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,30 @@ namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained
*
* @marble switch_on_next
{
source observable:
{
+--1-2-3-5--|
.....+4--6-9|
.......+7-8-|
}
operator "switch_on_next" : +--1-24-7-8|
}
*
* @details Actually this operator just unsubscribes from previous observable and subscribes on new observable when obtained in `on_next`
*
* @warning #include <rpp/operators/switch_on_next.hpp>
*
* @par Example:
* @snippet switch_on_next.cpp switch_on_next
*
* @ingroup combining_operators
* @see https://reactivex.io/documentation/operators/switch.html
*/
inline auto switch_on_next()
{
return details::switch_on_next_t{};
Expand Down
7 changes: 5 additions & 2 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
#include <rpp/sources/fwd.hpp>

#include <rpp/memory_model.hpp>
#include <rpp/observables/dynamic_observable.hpp>
#include <rpp/observables/observable.hpp>
#include <rpp/observables/variant_observable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/sources/from.hpp>

Expand Down Expand Up @@ -183,7 +183,10 @@ namespace rpp::source
return rpp::details::make_concat_from_iterable<container>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...);
}
else
return concat<MemoryModel>(std::forward<TObservable>(obs).as_dynamic(), std::forward<TObservables>(others).as_dynamic()...);
{
using variant_observable_t = rpp::variant_observable<rpp::utils::extract_observable_type_t<TObservable>, std::decay_t<TObservable>, std::decay_t<TObservables>...>;
return concat<MemoryModel>(variant_observable_t{std::forward<TObservable>(obs)}, variant_observable_t{std::forward<TObservables>(others)}...);
}
}

/**
Expand Down
23 changes: 23 additions & 0 deletions src/rpp/rpp/utils/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <algorithm>
#include <mutex>
#include <variant>

namespace rpp::utils
{
Expand Down Expand Up @@ -312,6 +313,28 @@ namespace rpp::utils
template<typename T>
using pointer_under_lock = typename value_with_mutex<T>::pointer_under_lock;

namespace details
{
template<typename T, typename... Ts>
struct unique_variant_t : std::type_identity<T>
{
};

template<typename... Ts, typename U, typename... Us>
requires (std::is_same_v<U, Ts> || ...)
struct unique_variant_t<std::variant<Ts...>, U, Us...> : unique_variant_t<std::variant<Ts...>, Us...>
{
};

template<typename... Ts, typename U, typename... Us>
struct unique_variant_t<std::variant<Ts...>, U, Us...> : public unique_variant_t<std::variant<Ts..., U>, Us...>
{
};
} // namespace details

template<typename... Ts>
using unique_variant = typename details::unique_variant_t<std::variant<>, Ts...>::type; // inspired by https://stackoverflow.com/a/57528226/17771792


#define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \
__VA_ARGS__; \
Expand Down