diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a8a86c57..3f13ed0ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: download_deps: strategy: matrix: - os: [ubuntu-latest, windows-latest] + os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} name: Build Deps - ${{ matrix.os }} @@ -63,6 +63,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Restore Catch2 uses: actions/cache@v3 @@ -98,6 +100,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Restore Catch2 uses: actions/cache@v3 @@ -120,7 +124,8 @@ jobs: matrix: config: [{name: ci-ubuntu-gcc, os: ubuntu-latest}, {name: ci-ubuntu-clang,os: ubuntu-latest}, - {name: ci-windows, os: windows-latest}] + {name: ci-windows, os: windows-latest}, + {name: ci-macos, os: macos-latest}] type: [tests, benchmarks] timeout-minutes: 20 @@ -135,9 +140,11 @@ jobs: - name: Install Qt if: matrix.type == 'tests' uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Install deps - if: matrix.config.os != 'windows-latest' + if: matrix.config.os == 'ubuntu-latest' run: | sudo apt-get update -q && sudo apt-get install clang-tidy cppcheck libsfml-dev -y -q pip install pyyaml @@ -198,6 +205,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Cache SonarCloud packages uses: actions/cache@v3 @@ -283,6 +292,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Install project and build env: diff --git a/.github/workflows/sonarcloud_analyze.yml b/.github/workflows/sonarcloud_analyze.yml index 3b77d7d6f..dfe75e253 100644 --- a/.github/workflows/sonarcloud_analyze.yml +++ b/.github/workflows/sonarcloud_analyze.yml @@ -24,6 +24,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Cmake run: cmake -S . -B build -DRPP_BUILD_TESTS=1 -DRPP_BUILD_QT_CODE=1 diff --git a/Readme.md b/Readme.md index 763ed7570..6abc7004c 100644 --- a/Readme.md +++ b/Readme.md @@ -70,7 +70,11 @@ amount_of_clicks Main advantages of ReactivePlusPlus are that it is written in Modern C++ with Performance and Usage in mind. As a result it is fast, readable, easy to use and well-documented. And it is proven with [continous benchmarking results and comparison with RxCpp](https://victimsnino.github.io/ReactivePlusPlus/benchmark) -**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. CI uses gcc-10, clang-11, visual studio 2022 +**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. List of minimal supported compilers: +- (ubuntu) gcc-10 +- (ubuntu) clang-11 +- (windows) visual studio 2022 +- (macos) Apple Clang 14 # Useful links - [Why ReactivePlusPlus? What about existing Reactive Extension libraries for C++?](https://victimsnino.github.io/ReactivePlusPlus/docs/html/why_rpp.html) diff --git a/src/examples/rpp/petri/petri.cpp b/src/examples/rpp/petri/petri.cpp index cdb07a45a..9c775b7ce 100644 --- a/src/examples/rpp/petri/petri.cpp +++ b/src/examples/rpp/petri/petri.cpp @@ -19,8 +19,6 @@ using PetriNet = std::map; using Marking = std::map; typedef void (*const TransitionFunction)(void); -using namespace std::ranges; - void foo() { std::cout << "foo" << std::endl; } void bar() { std::cout << "bar" << std::endl; } @@ -39,16 +37,16 @@ const auto& run_transition(Transition t) std::pair mutate_marking(Marking&& marking, const Places& produce) { // Produce tokens - for_each(produce, [&marking](const auto& place) { marking[place] += 1; }); + std::for_each(produce.cbegin(), produce.cend(), [&marking](const auto& place) { marking[place] += 1; }); // Consume tokens by exeucting enabled transitions Transitions transitions; for (const auto& [transition, mutations] : net) { const auto& c = mutations.consume; if (!c.empty() && - all_of(c, [&](const auto& place) { return marking[place] >= count(c, place); })) + std::all_of(c.cbegin(), c.cend(), [&](const auto& place) { return marking[place] >= std::count(c.cbegin(), c.cend(), place); })) { - for_each(c, [&marking](const auto& place) { marking[place] -= 1; }); + std::for_each(c.cbegin(), c.cend(), [&marking](const auto& place) { marking[place] -= 1; }); transitions.push_back(transition); } } @@ -78,7 +76,7 @@ int main() { Transitions transitions; std::tie(marking, transitions) = mutate_marking(std::forward(marking), places); - for_each(transitions, + std::for_each(transitions.cbegin(), transitions.cend(), [dispatcher = transitions_subject.get_subscriber()](const auto& t) { dispatcher.on_next(t); }); return std::forward(marking); diff --git a/src/examples/rpp/sfml/snake/snake.cpp b/src/examples/rpp/sfml/snake/snake.cpp index 1f352a6e6..5d8d9fe90 100644 --- a/src/examples/rpp/sfml/snake/snake.cpp +++ b/src/examples/rpp/sfml/snake/snake.cpp @@ -54,7 +54,7 @@ static bool is_snake_eat_self(const SnakeBody& body) static Coordinates update_apple_position_if_eat(Coordinates&& apple_position, const SnakeBody& snake) { - if (std::ranges::find(snake, apple_position) == snake.cend()) + if (std::find(snake.cbegin(), snake.cend(), apple_position) == snake.cend()) return apple_position; static std::random_device rd; diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index 293876bee..ae0814a08 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -1,16 +1,17 @@ // ReactivePlusPlus library -// +// // Copyright Aleksey Loginov 2022 - present. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // https://www.boost.org/LICENSE_1_0.txt) -// +// // Project home: https://github.com/victimsnino/ReactivePlusPlus #pragma once #include // own forwarding #include +#include #include #include @@ -51,7 +52,9 @@ template class queue_worker_state { public: - queue_worker_state() noexcept = default; + queue_worker_state() = default; + queue_worker_state(const queue_worker_state&) = delete; + queue_worker_state(queue_worker_state&&) = delete; void emplace(time_point time_point, constraint::inner_schedulable_fn auto&& fn) { @@ -82,21 +85,22 @@ class queue_worker_state return true; } - bool pop_with_wait(std::optional& out, const std::stop_token& token) + bool pop_with_wait(std::optional& out) { - while (!token.stop_requested()) + while (m_subscription->is_subscribed()) { std::unique_lock lock{m_mutex}; - if (!m_cv.wait(lock, token, [&] { return !m_queue.empty(); })) - continue; + m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); }); - if (!m_cv.wait_until(lock, - token, - m_queue.top().get_time_point(), - std::bind_front(&queue_worker_state::is_any_ready_schedulable_unsafe, this))) + if (m_queue.empty() || !m_cv.wait_until(lock, + m_queue.top().get_time_point(), + [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); })) continue; + if (!m_subscription->is_subscribed()) + return false; + out.emplace(std::move(m_queue.top().extract_function())); m_queue.pop(); return true; @@ -104,17 +108,22 @@ class queue_worker_state return false; } - void reset() + bool is_subscribed() const { - std::lock_guard lock{ m_mutex }; - m_queue = std::priority_queue>{}; + return m_subscription->is_subscribed(); + } + + void unsubscribe() + { + m_subscription->unsubscribe(); } private: void emplace_safe(time_point time_point, constraint::inner_schedulable_fn auto&& fn) { - std::lock_guard lock{ m_mutex }; - m_queue.emplace(time_point, ++m_current_id, std::forward(fn)); + std::lock_guard lock{m_mutex}; + if (m_subscription->is_subscribed()) + m_queue.emplace(time_point, ++m_current_id, std::forward(fn)); } bool is_any_ready_schedulable_unsafe() const @@ -127,5 +136,13 @@ class queue_worker_state std::condition_variable_any m_cv{}; std::priority_queue> m_queue{}; size_t m_current_id{}; + subscription_guard m_subscription = callback_subscription{[&] + { + { + std::lock_guard lock{m_mutex}; + m_queue = std::priority_queue>{}; + } + m_cv.notify_one(); + }}; }; } // namespace rpp::schedulers::details diff --git a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp index edb5ae12a..830b45e97 100644 --- a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp +++ b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp @@ -38,6 +38,9 @@ class new_thread final : public details::scheduler_tag worker_strategy(const rpp::composite_subscription& sub) { + if (!sub.is_subscribed()) + return; + auto shared = std::make_shared(); // init while it is alive as shared shared->init_thread(sub); @@ -82,18 +85,19 @@ class new_thread final : public details::scheduler_tag void init_thread(const rpp::composite_subscription& sub) { - m_thread = std::jthread{[state = shared_from_this()](const std::stop_token& token) + m_thread = std::thread{[state = shared_from_this()]() { - state->data_thread(token); + state->data_thread(); }}; const auto callback = rpp::callback_subscription{[state = weak_from_this()] { - auto locked = state.lock(); + const auto locked = state.lock(); if (!locked) return; - locked->m_thread.request_stop(); - if (locked->m_thread.get_id() != std::this_thread::get_id()) + locked->m_queue.unsubscribe(); + + if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id()) locked->m_thread.join(); else locked->m_thread.detach(); @@ -103,12 +107,12 @@ class new_thread final : public details::scheduler_tag } private: - void data_thread(const std::stop_token& token) + void data_thread() { std::optional fn{}; - while (!token.stop_requested()) + while (m_queue.is_subscribed()) { - if (m_queue.pop_with_wait(fn, token)) + if (m_queue.pop_with_wait(fn)) { (*fn)(); fn.reset(); @@ -116,11 +120,11 @@ class new_thread final : public details::scheduler_tag } // clear - m_queue.reset(); + m_queue.unsubscribe(); } details::queue_worker_state m_queue{}; - std::jthread m_thread{}; + std::thread m_thread{}; rpp::subscription_guard m_sub = rpp::subscription_base::empty(); }; diff --git a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp index 71bceba9c..98f778553 100644 --- a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp +++ b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp @@ -76,18 +76,16 @@ class run_loop final : public details::scheduler_tag ~state() { - m_source.request_stop(); + m_queue.unsubscribe(); m_sub.unsubscribe(); } details::queue_worker_state& get_queue() { return m_queue; } - std::stop_token get_token() const { return m_source.get_token(); } - + const composite_subscription& get_subscription() const { return m_sub; } private: rpp::composite_subscription m_sub; details::queue_worker_state m_queue{}; - std::stop_source m_source{}; }; public: @@ -126,7 +124,7 @@ class run_loop final : public details::scheduler_tag void dispatch() const { std::optional fn{}; - if (m_state->get_queue().pop_with_wait(fn, m_state->get_token())) + if (m_state->get_queue().pop_with_wait(fn)) (*fn)(); } diff --git a/src/rpp/rpp/sources/from.hpp b/src/rpp/rpp/sources/from.hpp index fa10e5151..113f179f3 100644 --- a/src/rpp/rpp/sources/from.hpp +++ b/src/rpp/rpp/sources/from.hpp @@ -21,7 +21,6 @@ #include -#include #include IMPLEMENTATION_FILE(just_tag); @@ -32,10 +31,13 @@ namespace rpp::observable::details template auto extract_iterable_from_packed(const T & v) -> const auto& { - if constexpr (std::ranges::range) - return v; - else - return *v; + return v; +} + +template +auto extract_iterable_from_packed(const std::shared_ptr & v) -> const auto& +{ + return *v; } void iterate(auto&& iterable, @@ -66,7 +68,7 @@ void iterate(auto&& iterable, const auto end = std::cend(extracted_iterable); auto itr = std::cbegin(extracted_iterable); - std::ranges::advance(itr, static_cast(index), end); + std::advance(itr, static_cast(index)); if (itr != end) { @@ -90,7 +92,9 @@ void iterate(auto&& iterable, } } -template + + +template auto pack_to_container(Ts&& ...items) { if constexpr (memory_model == memory_model::use_stack) @@ -215,10 +219,10 @@ auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included -auto from_iterable(std::ranges::range auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included +auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included { using Container = std::decay_t; - return create>(details::iterate_impl{ details::pack_to_container(std::forward(iterable)), scheduler }); + return create>(details::iterate_impl{ details::pack_to_container(std::forward(iterable)), scheduler }); } /** diff --git a/src/rpp/rpp/sources/fwd.hpp b/src/rpp/rpp/sources/fwd.hpp index ae82fdcbb..a4431640d 100644 --- a/src/rpp/rpp/sources/fwd.hpp +++ b/src/rpp/rpp/sources/fwd.hpp @@ -19,8 +19,6 @@ #include -#include - namespace rpp::details { struct create_tag; @@ -63,7 +61,7 @@ template && (constraint::decayed_same_as && ...)); template -auto from_iterable(std::ranges::range auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included ; +auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included ; template auto from_callable(std::invocable<> auto&& callable) requires rpp::details::is_header_included; diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index dcb603015..559c242f5 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -73,25 +74,25 @@ class subject_state : public std::enable_shared_from_this> void on_next(const T& v) { if (auto subs = extract_subscribers_under_lock_if_there()) - std::ranges::for_each(*subs, [&](const auto& sub) { sub.on_next(v); }); + rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_next(v); }); } void on_error(const std::exception_ptr& err) { if (auto subs = exchange_subscribers_under_lock_if_there(state_t{err})) - std::ranges::for_each(*subs, [&](const auto& sub) { sub.on_error(err); }); + rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_error(err); }); } void on_completed() { if (auto subs = exchange_subscribers_under_lock_if_there(completed{})) - std::ranges::for_each(*subs, std::mem_fn(&dynamic_subscriber::on_completed)); + rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber::on_completed)); } void on_unsubscribe() { if (auto subs = exchange_subscribers_under_lock_if_there(unsubscribed{})) - std::ranges::for_each(*subs, std::mem_fn(&dynamic_subscriber::unsubscribe)); + rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber::unsubscribe)); } private: @@ -104,9 +105,10 @@ class subject_state : public std::enable_shared_from_this> { auto subs = std::make_shared>>(); subs->reserve(expected_size); - std::ranges::copy_if(*current_subs, - std::back_inserter(*subs), - std::mem_fn(&dynamic_subscriber::is_subscribed)); + std::copy_if(current_subs->cbegin(), + current_subs->cend(), + std::back_inserter(*subs), + std::mem_fn(&dynamic_subscriber::is_subscribed)); return subs; } diff --git a/src/rpp/rpp/subscriptions/composite_subscription.hpp b/src/rpp/rpp/subscriptions/composite_subscription.hpp index afbb96a97..a6403dc72 100644 --- a/src/rpp/rpp/subscriptions/composite_subscription.hpp +++ b/src/rpp/rpp/subscriptions/composite_subscription.hpp @@ -10,12 +10,15 @@ #pragma once +#include "rpp/utils/utilities.hpp" + #include #include #include #include #include +#include #include namespace rpp @@ -154,7 +157,7 @@ class composite_subscription final : public subscription_base DepsState expected{DepsState::None}; if (m_state.compare_exchange_strong(expected, DepsState::Unsubscribed, std::memory_order::acq_rel)) { - std::ranges::for_each(m_deps, &subscription_state::unsubscribe); + rpp::utils::for_each(m_deps, std::mem_fn(&details::subscription_state::unsubscribe)); m_deps.clear(); return; } diff --git a/src/rpp/rpp/subscriptions/subscription_guard.hpp b/src/rpp/rpp/subscriptions/subscription_guard.hpp index 171f8237e..8ebb478c3 100644 --- a/src/rpp/rpp/subscriptions/subscription_guard.hpp +++ b/src/rpp/rpp/subscriptions/subscription_guard.hpp @@ -44,6 +44,7 @@ class subscription_guard } const subscription_base* operator->() const { return &m_sub; } + const subscription_base& operator*() const { return m_sub; } private: subscription_base m_sub; }; diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index f2a0a77ac..63a417c2f 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -12,7 +12,7 @@ #include #include - +#include namespace rpp::constraint { @@ -21,4 +21,11 @@ template concept decayed_same_as = std::same_as< template concept decayed_type = std::same_as, T>; template concept variadic_is_same_type = sizeof...(Types) == 1 && (decayed_same_as && ...); + +template +concept iterable = requires(R& rng) +{ + std::cbegin(rng); + std::cend(rng); +}; } // namespace rpp::constraint \ No newline at end of file diff --git a/src/rpp/rpp/utils/utilities.hpp b/src/rpp/rpp/utils/utilities.hpp index d0e330054..6a487a183 100644 --- a/src/rpp/rpp/utils/utilities.hpp +++ b/src/rpp/rpp/utils/utilities.hpp @@ -11,8 +11,10 @@ #pragma once #include +#include #include +#include #include #include #include @@ -36,6 +38,20 @@ using atomic_shared_ptr = std::shared_ptr; // used as interpetation of "void" struct none{}; +template +using iterable_value_t = std::iter_value_t()))>; + +template> Fn> +void for_each(Cont&& container, Fn&& fn) +{ + std::for_each(std::begin(container), std::end(container), std::forward(fn)); +} + +template> Fn> +bool all_of(const Cont& container, const Fn& fn) +{ + return std::all_of(std::cbegin(container), std::cend(container), fn); +} // some objects can't be copy/moved-assigned, but can be copy/move constructed (like immutable lambdas), so, use trick with optional and emplace. template diff --git a/src/tests/rpp/test_group_by.cpp b/src/tests/rpp/test_group_by.cpp index 0d6bef4f7..92b22953a 100644 --- a/src/tests/rpp/test_group_by.cpp +++ b/src/tests/rpp/test_group_by.cpp @@ -46,7 +46,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") CHECK(grouped_mocks.size() == 4); for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); CHECK(observer.get_total_on_next_count() == 2); CHECK(observer.get_on_error_count() == 0); CHECK(observer.get_on_completed_count() == 1); @@ -70,7 +70,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") CHECK(grouped_mocks.size() == 4); for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); if (key == 4) CHECK(observer.get_total_on_next_count() == 1); else @@ -92,7 +92,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); CHECK(observer.get_total_on_next_count() == 2); CHECK(observer.get_on_error_count() == 0); CHECK(observer.get_on_completed_count() == 1); @@ -133,18 +133,18 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") { CHECK(sub.is_subscribed()); CHECK(sub_subscriptions.size() == 2); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); AND_WHEN("unsubscribe root") { sub.unsubscribe(); THEN("sub-subscriptions are still alive") { - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); } } AND_WHEN("unsubscribe sub-subscriptions") { - std::ranges::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, std::mem_fn(&rpp::composite_subscription::unsubscribe)); THEN("root subscription is still alive") { CHECK(sub.is_subscribed()); @@ -153,11 +153,11 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") AND_WHEN("unsubscribe all") { sub.unsubscribe(); - std::ranges::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, std::mem_fn(&rpp::composite_subscription::unsubscribe)); THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); } } AND_WHEN("send on_error") @@ -166,7 +166,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); CHECK(on_error_count == sub_subscriptions.size()+1); } } @@ -176,7 +176,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); CHECK(on_completed_count == sub_subscriptions.size()+1); } diff --git a/src/tests/rpp/test_schedulers.cpp b/src/tests/rpp/test_schedulers.cpp index 7313b1d26..c4fefcb93 100644 --- a/src/tests/rpp/test_schedulers.cpp +++ b/src/tests/rpp/test_schedulers.cpp @@ -31,7 +31,7 @@ static std::string get_thread_id_as_string(std::thread::id id = std::this_thread static std::string simulate_nested_scheduling(const auto& worker, std::vector& out) { - const std::jthread thread([&] + std::thread thread([&] { worker.schedule([&] { @@ -56,7 +56,9 @@ static std::string simulate_nested_scheduling(const auto& worker, std::vector