From 236d374f89b03f8baa8c8c1c81c2115e04bf1a79 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 24 Nov 2022 23:53:39 +0300 Subject: [PATCH 01/17] Start to support macos --- .github/workflows/ci.yml | 7 ++-- .../schedulers/details/queue_worker_state.hpp | 34 +++++++++++++------ .../rpp/schedulers/new_thread_scheduler.hpp | 21 ++++++------ src/rpp/rpp/schedulers/run_loop_scheduler.hpp | 7 ++-- .../rpp/subscriptions/subscription_guard.hpp | 1 + 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a8a86c57..6c1a47020 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: download_deps: strategy: matrix: - os: [ubuntu-latest, windows-latest] + os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} name: Build Deps - ${{ matrix.os }} @@ -120,7 +120,8 @@ jobs: matrix: config: [{name: ci-ubuntu-gcc, os: ubuntu-latest}, {name: ci-ubuntu-clang,os: ubuntu-latest}, - {name: ci-windows, os: windows-latest}] + {name: ci-windows, os: windows-latest}, + {name: ci-macos, os: macos-latest}] type: [tests, benchmarks] timeout-minutes: 20 @@ -137,7 +138,7 @@ jobs: uses: jurplel/install-qt-action@v3 - name: Install deps - if: matrix.config.os != 'windows-latest' + if: matrix.config.os == 'ubuntu-latest' run: | sudo apt-get update -q && sudo apt-get install clang-tidy cppcheck libsfml-dev -y -q pip install pyyaml diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index 293876bee..50dfcef19 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -51,7 +51,9 @@ template class queue_worker_state { public: - queue_worker_state() noexcept = default; + queue_worker_state() = default; + queue_worker_state(const queue_worker_state&) = delete; + queue_worker_state(queue_worker_state&&) = delete; void emplace(time_point time_point, constraint::inner_schedulable_fn auto&& fn) { @@ -82,21 +84,23 @@ class queue_worker_state return true; } - bool pop_with_wait(std::optional& out, const std::stop_token& token) + bool pop_with_wait(std::optional& out) { - while (!token.stop_requested()) + while (m_subscription->is_subscribed()) { std::unique_lock lock{m_mutex}; - if (!m_cv.wait(lock, token, [&] { return !m_queue.empty(); })) + if (!m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); })) continue; if (!m_cv.wait_until(lock, - token, m_queue.top().get_time_point(), - std::bind_front(&queue_worker_state::is_any_ready_schedulable_unsafe, this))) + [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); })) continue; + if (!m_subscription->is_subscribed()) + break; + out.emplace(std::move(m_queue.top().extract_function())); m_queue.pop(); return true; @@ -104,17 +108,17 @@ class queue_worker_state return false; } - void reset() + void destroy() { - std::lock_guard lock{ m_mutex }; - m_queue = std::priority_queue>{}; + m_subscription->unsubscribe(); } private: void emplace_safe(time_point time_point, constraint::inner_schedulable_fn auto&& fn) { - std::lock_guard lock{ m_mutex }; - m_queue.emplace(time_point, ++m_current_id, std::forward(fn)); + std::lock_guard lock{m_mutex}; + if (m_subscription->is_subscribed()) + m_queue.emplace(time_point, ++m_current_id, std::forward(fn)); } bool is_any_ready_schedulable_unsafe() const @@ -127,5 +131,13 @@ class queue_worker_state std::condition_variable_any m_cv{}; std::priority_queue> m_queue{}; size_t m_current_id{}; + subscription_guard m_subscription = callback_subscription{[&] + { + { + std::lock_guard lock{m_mutex}; + m_queue = std::priority_queue>{}; + } + m_cv.notify_one(); + }}; }; } // namespace rpp::schedulers::details diff --git a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp index edb5ae12a..2582219a1 100644 --- a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp +++ b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp @@ -82,18 +82,19 @@ class new_thread final : public details::scheduler_tag void init_thread(const rpp::composite_subscription& sub) { - m_thread = std::jthread{[state = shared_from_this()](const std::stop_token& token) + m_thread = std::thread{[state = shared_from_this()]() { - state->data_thread(token); + state->data_thread(); }}; const auto callback = rpp::callback_subscription{[state = weak_from_this()] { - auto locked = state.lock(); + const auto locked = state.lock(); if (!locked) return; - locked->m_thread.request_stop(); - if (locked->m_thread.get_id() != std::this_thread::get_id()) + locked->m_queue.destroy(); + + if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id()) locked->m_thread.join(); else locked->m_thread.detach(); @@ -103,12 +104,12 @@ class new_thread final : public details::scheduler_tag } private: - void data_thread(const std::stop_token& token) + void data_thread() { std::optional fn{}; - while (!token.stop_requested()) + while (m_sub->is_subscribed()) { - if (m_queue.pop_with_wait(fn, token)) + if (m_queue.pop_with_wait(fn)) { (*fn)(); fn.reset(); @@ -116,11 +117,11 @@ class new_thread final : public details::scheduler_tag } // clear - m_queue.reset(); + m_queue.destroy(); } details::queue_worker_state m_queue{}; - std::jthread m_thread{}; + std::thread m_thread{}; rpp::subscription_guard m_sub = rpp::subscription_base::empty(); }; diff --git a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp index 71bceba9c..698497de4 100644 --- a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp +++ b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp @@ -76,13 +76,12 @@ class run_loop final : public details::scheduler_tag ~state() { - m_source.request_stop(); + m_queue.destroy(); m_sub.unsubscribe(); } details::queue_worker_state& get_queue() { return m_queue; } - std::stop_token get_token() const { return m_source.get_token(); } - + const composite_subscription& get_subscription() const { return m_sub; } private: rpp::composite_subscription m_sub; @@ -126,7 +125,7 @@ class run_loop final : public details::scheduler_tag void dispatch() const { std::optional fn{}; - if (m_state->get_queue().pop_with_wait(fn, m_state->get_token())) + if (m_state->get_queue().pop_with_wait(fn)) (*fn)(); } diff --git a/src/rpp/rpp/subscriptions/subscription_guard.hpp b/src/rpp/rpp/subscriptions/subscription_guard.hpp index 171f8237e..8ebb478c3 100644 --- a/src/rpp/rpp/subscriptions/subscription_guard.hpp +++ b/src/rpp/rpp/subscriptions/subscription_guard.hpp @@ -44,6 +44,7 @@ class subscription_guard } const subscription_base* operator->() const { return &m_sub; } + const subscription_base& operator*() const { return m_sub; } private: subscription_base m_sub; }; From 1641f49995a608514eab9c59cd7dad5bb6045f6b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:06:24 +0300 Subject: [PATCH 02/17] minor fix --- src/rpp/rpp/schedulers/details/queue_worker_state.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index 50dfcef19..2b4b2132c 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -90,8 +90,7 @@ class queue_worker_state { std::unique_lock lock{m_mutex}; - if (!m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); })) - continue; + m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); }); if (!m_cv.wait_until(lock, m_queue.top().get_time_point(), From 6e9042f9de9416ce52e0e2bedffb2a974ce4a3eb Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:18:25 +0300 Subject: [PATCH 03/17] get rid of std::ranges --- src/examples/rpp/petri/petri.cpp | 10 ++++----- src/rpp/rpp/sources/from.hpp | 22 +++++++++++-------- src/rpp/rpp/sources/fwd.hpp | 4 +--- .../rpp/subjects/details/subject_state.hpp | 16 ++++++++------ .../subscriptions/composite_subscription.hpp | 5 ++++- src/rpp/rpp/utils/constraints.hpp | 9 +++++++- src/rpp/rpp/utils/utilities.hpp | 16 ++++++++++++++ src/tests/rpp/test_group_by.cpp | 20 ++++++++--------- 8 files changed, 65 insertions(+), 37 deletions(-) diff --git a/src/examples/rpp/petri/petri.cpp b/src/examples/rpp/petri/petri.cpp index cdb07a45a..9c775b7ce 100644 --- a/src/examples/rpp/petri/petri.cpp +++ b/src/examples/rpp/petri/petri.cpp @@ -19,8 +19,6 @@ using PetriNet = std::map; using Marking = std::map; typedef void (*const TransitionFunction)(void); -using namespace std::ranges; - void foo() { std::cout << "foo" << std::endl; } void bar() { std::cout << "bar" << std::endl; } @@ -39,16 +37,16 @@ const auto& run_transition(Transition t) std::pair mutate_marking(Marking&& marking, const Places& produce) { // Produce tokens - for_each(produce, [&marking](const auto& place) { marking[place] += 1; }); + std::for_each(produce.cbegin(), produce.cend(), [&marking](const auto& place) { marking[place] += 1; }); // Consume tokens by exeucting enabled transitions Transitions transitions; for (const auto& [transition, mutations] : net) { const auto& c = mutations.consume; if (!c.empty() && - all_of(c, [&](const auto& place) { return marking[place] >= count(c, place); })) + std::all_of(c.cbegin(), c.cend(), [&](const auto& place) { return marking[place] >= std::count(c.cbegin(), c.cend(), place); })) { - for_each(c, [&marking](const auto& place) { marking[place] -= 1; }); + std::for_each(c.cbegin(), c.cend(), [&marking](const auto& place) { marking[place] -= 1; }); transitions.push_back(transition); } } @@ -78,7 +76,7 @@ int main() { Transitions transitions; std::tie(marking, transitions) = mutate_marking(std::forward(marking), places); - for_each(transitions, + std::for_each(transitions.cbegin(), transitions.cend(), [dispatcher = transitions_subject.get_subscriber()](const auto& t) { dispatcher.on_next(t); }); return std::forward(marking); diff --git a/src/rpp/rpp/sources/from.hpp b/src/rpp/rpp/sources/from.hpp index fa10e5151..113f179f3 100644 --- a/src/rpp/rpp/sources/from.hpp +++ b/src/rpp/rpp/sources/from.hpp @@ -21,7 +21,6 @@ #include -#include #include IMPLEMENTATION_FILE(just_tag); @@ -32,10 +31,13 @@ namespace rpp::observable::details template auto extract_iterable_from_packed(const T & v) -> const auto& { - if constexpr (std::ranges::range) - return v; - else - return *v; + return v; +} + +template +auto extract_iterable_from_packed(const std::shared_ptr & v) -> const auto& +{ + return *v; } void iterate(auto&& iterable, @@ -66,7 +68,7 @@ void iterate(auto&& iterable, const auto end = std::cend(extracted_iterable); auto itr = std::cbegin(extracted_iterable); - std::ranges::advance(itr, static_cast(index), end); + std::advance(itr, static_cast(index)); if (itr != end) { @@ -90,7 +92,9 @@ void iterate(auto&& iterable, } } -template + + +template auto pack_to_container(Ts&& ...items) { if constexpr (memory_model == memory_model::use_stack) @@ -215,10 +219,10 @@ auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included -auto from_iterable(std::ranges::range auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included +auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included { using Container = std::decay_t; - return create>(details::iterate_impl{ details::pack_to_container(std::forward(iterable)), scheduler }); + return create>(details::iterate_impl{ details::pack_to_container(std::forward(iterable)), scheduler }); } /** diff --git a/src/rpp/rpp/sources/fwd.hpp b/src/rpp/rpp/sources/fwd.hpp index ae82fdcbb..a4431640d 100644 --- a/src/rpp/rpp/sources/fwd.hpp +++ b/src/rpp/rpp/sources/fwd.hpp @@ -19,8 +19,6 @@ #include -#include - namespace rpp::details { struct create_tag; @@ -63,7 +61,7 @@ template && (constraint::decayed_same_as && ...)); template -auto from_iterable(std::ranges::range auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included ; +auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler = TScheduler{}) requires rpp::details::is_header_included ; template auto from_callable(std::invocable<> auto&& callable) requires rpp::details::is_header_included; diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index dcb603015..559c242f5 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -73,25 +74,25 @@ class subject_state : public std::enable_shared_from_this> void on_next(const T& v) { if (auto subs = extract_subscribers_under_lock_if_there()) - std::ranges::for_each(*subs, [&](const auto& sub) { sub.on_next(v); }); + rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_next(v); }); } void on_error(const std::exception_ptr& err) { if (auto subs = exchange_subscribers_under_lock_if_there(state_t{err})) - std::ranges::for_each(*subs, [&](const auto& sub) { sub.on_error(err); }); + rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_error(err); }); } void on_completed() { if (auto subs = exchange_subscribers_under_lock_if_there(completed{})) - std::ranges::for_each(*subs, std::mem_fn(&dynamic_subscriber::on_completed)); + rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber::on_completed)); } void on_unsubscribe() { if (auto subs = exchange_subscribers_under_lock_if_there(unsubscribed{})) - std::ranges::for_each(*subs, std::mem_fn(&dynamic_subscriber::unsubscribe)); + rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber::unsubscribe)); } private: @@ -104,9 +105,10 @@ class subject_state : public std::enable_shared_from_this> { auto subs = std::make_shared>>(); subs->reserve(expected_size); - std::ranges::copy_if(*current_subs, - std::back_inserter(*subs), - std::mem_fn(&dynamic_subscriber::is_subscribed)); + std::copy_if(current_subs->cbegin(), + current_subs->cend(), + std::back_inserter(*subs), + std::mem_fn(&dynamic_subscriber::is_subscribed)); return subs; } diff --git a/src/rpp/rpp/subscriptions/composite_subscription.hpp b/src/rpp/rpp/subscriptions/composite_subscription.hpp index afbb96a97..718a972f5 100644 --- a/src/rpp/rpp/subscriptions/composite_subscription.hpp +++ b/src/rpp/rpp/subscriptions/composite_subscription.hpp @@ -10,6 +10,8 @@ #pragma once +#include "rpp/utils/utilities.hpp" + #include #include #include @@ -154,7 +156,8 @@ class composite_subscription final : public subscription_base DepsState expected{DepsState::None}; if (m_state.compare_exchange_strong(expected, DepsState::Unsubscribed, std::memory_order::acq_rel)) { - std::ranges::for_each(m_deps, &subscription_state::unsubscribe); + for(const auto& state : m_deps) + state->unsubscribe(); m_deps.clear(); return; } diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index f2a0a77ac..63a417c2f 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -12,7 +12,7 @@ #include #include - +#include namespace rpp::constraint { @@ -21,4 +21,11 @@ template concept decayed_same_as = std::same_as< template concept decayed_type = std::same_as, T>; template concept variadic_is_same_type = sizeof...(Types) == 1 && (decayed_same_as && ...); + +template +concept iterable = requires(R& rng) +{ + std::cbegin(rng); + std::cend(rng); +}; } // namespace rpp::constraint \ No newline at end of file diff --git a/src/rpp/rpp/utils/utilities.hpp b/src/rpp/rpp/utils/utilities.hpp index d0e330054..503318df2 100644 --- a/src/rpp/rpp/utils/utilities.hpp +++ b/src/rpp/rpp/utils/utilities.hpp @@ -11,8 +11,10 @@ #pragma once #include +#include #include +#include #include #include #include @@ -36,6 +38,20 @@ using atomic_shared_ptr = std::shared_ptr; // used as interpetation of "void" struct none{}; +template +using iterable_value_t = typename decltype(std::begin(std::declval()))::value_type; + +template> Fn> +void for_each(Cont&& container, Fn&& fn) +{ + std::for_each(std::begin(container), std::end(container), std::forward(fn)); +} + +template> Fn> +bool all_of(const Cont& container, const Fn& fn) +{ + return std::all_of(std::cbegin(container), std::cend(container), fn); +} // some objects can't be copy/moved-assigned, but can be copy/move constructed (like immutable lambdas), so, use trick with optional and emplace. template diff --git a/src/tests/rpp/test_group_by.cpp b/src/tests/rpp/test_group_by.cpp index 0d6bef4f7..3f7dd5678 100644 --- a/src/tests/rpp/test_group_by.cpp +++ b/src/tests/rpp/test_group_by.cpp @@ -46,7 +46,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") CHECK(grouped_mocks.size() == 4); for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); CHECK(observer.get_total_on_next_count() == 2); CHECK(observer.get_on_error_count() == 0); CHECK(observer.get_on_completed_count() == 1); @@ -70,7 +70,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") CHECK(grouped_mocks.size() == 4); for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); if (key == 4) CHECK(observer.get_total_on_next_count() == 1); else @@ -92,7 +92,7 @@ SCENARIO("group_by emits grouped seqences of values", "[group_by]") for(const auto& [key, observer] : grouped_mocks) { - CHECK(std::ranges::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); + CHECK(rpp::utils::all_of(observer.get_received_values(), [key=key](int v){return v == key;})); CHECK(observer.get_total_on_next_count() == 2); CHECK(observer.get_on_error_count() == 0); CHECK(observer.get_on_completed_count() == 1); @@ -133,18 +133,18 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") { CHECK(sub.is_subscribed()); CHECK(sub_subscriptions.size() == 2); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); AND_WHEN("unsubscribe root") { sub.unsubscribe(); THEN("sub-subscriptions are still alive") { - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return sub.is_subscribed();})); } } AND_WHEN("unsubscribe sub-subscriptions") { - std::ranges::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); THEN("root subscription is still alive") { CHECK(sub.is_subscribed()); @@ -153,11 +153,11 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") AND_WHEN("unsubscribe all") { sub.unsubscribe(); - std::ranges::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); } } AND_WHEN("send on_error") @@ -166,7 +166,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); CHECK(on_error_count == sub_subscriptions.size()+1); } } @@ -176,7 +176,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); - CHECK(std::ranges::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); + CHECK(rpp::utils::all_of(sub_subscriptions, [](const auto& sub){return !sub.is_subscribed();})); CHECK(on_completed_count == sub_subscriptions.size()+1); } From 3e183f3285acdf0e91db92bae6444dd0718db72b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:19:10 +0300 Subject: [PATCH 04/17] one more --- src/examples/rpp/sfml/snake/snake.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/examples/rpp/sfml/snake/snake.cpp b/src/examples/rpp/sfml/snake/snake.cpp index 1f352a6e6..5d8d9fe90 100644 --- a/src/examples/rpp/sfml/snake/snake.cpp +++ b/src/examples/rpp/sfml/snake/snake.cpp @@ -54,7 +54,7 @@ static bool is_snake_eat_self(const SnakeBody& body) static Coordinates update_apple_position_if_eat(Coordinates&& apple_position, const SnakeBody& snake) { - if (std::ranges::find(snake, apple_position) == snake.cend()) + if (std::find(snake.cbegin(), snake.cend(), apple_position) == snake.cend()) return apple_position; static std::random_device rd; From d4a40f0916e761ea3ae12752991929caccae5ecf Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:21:54 +0300 Subject: [PATCH 05/17] add caching --- .github/workflows/ci.yml | 10 ++++++++++ .github/workflows/sonarcloud_analyze.yml | 2 ++ 2 files changed, 12 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c1a47020..3f13ed0ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,6 +63,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Restore Catch2 uses: actions/cache@v3 @@ -98,6 +100,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Restore Catch2 uses: actions/cache@v3 @@ -136,6 +140,8 @@ jobs: - name: Install Qt if: matrix.type == 'tests' uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Install deps if: matrix.config.os == 'ubuntu-latest' @@ -199,6 +205,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Cache SonarCloud packages uses: actions/cache@v3 @@ -284,6 +292,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Install project and build env: diff --git a/.github/workflows/sonarcloud_analyze.yml b/.github/workflows/sonarcloud_analyze.yml index 3b77d7d6f..dfe75e253 100644 --- a/.github/workflows/sonarcloud_analyze.yml +++ b/.github/workflows/sonarcloud_analyze.yml @@ -24,6 +24,8 @@ jobs: - name: Install Qt uses: jurplel/install-qt-action@v3 + with: + cache: true - name: Cmake run: cmake -S . -B build -DRPP_BUILD_TESTS=1 -DRPP_BUILD_QT_CODE=1 From c4053fcea2ba507b2bd6b2573bbb5b0ef6a7a19d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:22:40 +0300 Subject: [PATCH 06/17] remove stop_source --- src/rpp/rpp/schedulers/run_loop_scheduler.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp index 698497de4..f92e71d1d 100644 --- a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp +++ b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp @@ -86,7 +86,6 @@ class run_loop final : public details::scheduler_tag private: rpp::composite_subscription m_sub; details::queue_worker_state m_queue{}; - std::stop_source m_source{}; }; public: From 5a58e431945d2bbbf32b496a5f3fbfe879ba8f86 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:29:10 +0300 Subject: [PATCH 07/17] try to compile --- src/rpp/rpp/utils/utilities.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/utils/utilities.hpp b/src/rpp/rpp/utils/utilities.hpp index 503318df2..1a60e45d4 100644 --- a/src/rpp/rpp/utils/utilities.hpp +++ b/src/rpp/rpp/utils/utilities.hpp @@ -39,7 +39,7 @@ using atomic_shared_ptr = std::shared_ptr; struct none{}; template -using iterable_value_t = typename decltype(std::begin(std::declval()))::value_type; +using iterable_value_t = typename decltype(std::cbegin(std::declval()))::value_type; template> Fn> void for_each(Cont&& container, Fn&& fn) From 9fd9d3d5fa6d8fe9e3f98e5b6267c2b9604622a5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 00:36:00 +0300 Subject: [PATCH 08/17] fix traits --- src/rpp/rpp/utils/utilities.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/utils/utilities.hpp b/src/rpp/rpp/utils/utilities.hpp index 1a60e45d4..6a487a183 100644 --- a/src/rpp/rpp/utils/utilities.hpp +++ b/src/rpp/rpp/utils/utilities.hpp @@ -39,7 +39,7 @@ using atomic_shared_ptr = std::shared_ptr; struct none{}; template -using iterable_value_t = typename decltype(std::cbegin(std::declval()))::value_type; +using iterable_value_t = std::iter_value_t()))>; template> Fn> void for_each(Cont&& container, Fn&& fn) From 93cc833800affac6897a51e644109a2c9e0a86d9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 10:21:46 +0300 Subject: [PATCH 09/17] Compile --- src/rpp/rpp/subscriptions/composite_subscription.hpp | 3 +-- src/tests/rpp/test_group_by.cpp | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/subscriptions/composite_subscription.hpp b/src/rpp/rpp/subscriptions/composite_subscription.hpp index 718a972f5..8e2a5d391 100644 --- a/src/rpp/rpp/subscriptions/composite_subscription.hpp +++ b/src/rpp/rpp/subscriptions/composite_subscription.hpp @@ -156,8 +156,7 @@ class composite_subscription final : public subscription_base DepsState expected{DepsState::None}; if (m_state.compare_exchange_strong(expected, DepsState::Unsubscribed, std::memory_order::acq_rel)) { - for(const auto& state : m_deps) - state->unsubscribe(); + rpp::utils::for_each(m_deps, std::mem_fn(&details::subscription_state::unsubscribe)); m_deps.clear(); return; } diff --git a/src/tests/rpp/test_group_by.cpp b/src/tests/rpp/test_group_by.cpp index 3f7dd5678..92b22953a 100644 --- a/src/tests/rpp/test_group_by.cpp +++ b/src/tests/rpp/test_group_by.cpp @@ -144,7 +144,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") } AND_WHEN("unsubscribe sub-subscriptions") { - rpp::utils::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, std::mem_fn(&rpp::composite_subscription::unsubscribe)); THEN("root subscription is still alive") { CHECK(sub.is_subscribed()); @@ -153,7 +153,7 @@ SCENARIO("group_by keeps subscription till anyone subscribed", "[group_by]") AND_WHEN("unsubscribe all") { sub.unsubscribe(); - rpp::utils::for_each(sub_subscriptions, &rpp::composite_subscription::unsubscribe); + rpp::utils::for_each(sub_subscriptions, std::mem_fn(&rpp::composite_subscription::unsubscribe)); THEN("no any active subscriptions") { CHECK(!sub.is_subscribed()); From 390f9b3691b4351cd9f0072e535c44dc06d06938 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 10:25:53 +0300 Subject: [PATCH 10/17] Update queue_worker_state.hpp --- src/rpp/rpp/schedulers/details/queue_worker_state.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index 2b4b2132c..eb4bae6e1 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -11,6 +11,7 @@ #include // own forwarding #include +#include #include #include From 5445e84eea82048e29bf45a1b1f3292a1f500ed9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 10:34:03 +0300 Subject: [PATCH 11/17] Compile --- .github/workflows/ci.yml | 4 ++-- Readme.md | 6 +++++- src/tests/rpp/test_schedulers.cpp | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3f13ed0ad..2018debb3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: download_deps: strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-latest] + os: [ubuntu-latest, windows-latest, macos-10.15] runs-on: ${{ matrix.os }} name: Build Deps - ${{ matrix.os }} @@ -125,7 +125,7 @@ jobs: config: [{name: ci-ubuntu-gcc, os: ubuntu-latest}, {name: ci-ubuntu-clang,os: ubuntu-latest}, {name: ci-windows, os: windows-latest}, - {name: ci-macos, os: macos-latest}] + {name: ci-macos, os: macos-10.15}] type: [tests, benchmarks] timeout-minutes: 20 diff --git a/Readme.md b/Readme.md index 763ed7570..620916099 100644 --- a/Readme.md +++ b/Readme.md @@ -70,7 +70,11 @@ amount_of_clicks Main advantages of ReactivePlusPlus are that it is written in Modern C++ with Performance and Usage in mind. As a result it is fast, readable, easy to use and well-documented. And it is proven with [continous benchmarking results and comparison with RxCpp](https://victimsnino.github.io/ReactivePlusPlus/benchmark) -**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. CI uses gcc-10, clang-11, visual studio 2022 +**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. CI uses successfully: +- (ubuntu) gcc-10 +- (ubuntu) clang-11 +- (windows) visual studio 2022 +- (macos) Apple Clang 12 # Useful links - [Why ReactivePlusPlus? What about existing Reactive Extension libraries for C++?](https://victimsnino.github.io/ReactivePlusPlus/docs/html/why_rpp.html) diff --git a/src/tests/rpp/test_schedulers.cpp b/src/tests/rpp/test_schedulers.cpp index 7313b1d26..bea28c975 100644 --- a/src/tests/rpp/test_schedulers.cpp +++ b/src/tests/rpp/test_schedulers.cpp @@ -31,7 +31,7 @@ static std::string get_thread_id_as_string(std::thread::id id = std::this_thread static std::string simulate_nested_scheduling(const auto& worker, std::vector& out) { - const std::jthread thread([&] + const std::thread thread([&] { worker.schedule([&] { @@ -56,7 +56,9 @@ static std::string simulate_nested_scheduling(const auto& worker, std::vector Date: Fri, 25 Nov 2022 10:38:31 +0300 Subject: [PATCH 12/17] One more macos version --- .github/workflows/ci.yml | 4 ++-- Readme.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2018debb3..f97956057 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: download_deps: strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-10.15] + os: [ubuntu-latest, windows-latest, macos-11] runs-on: ${{ matrix.os }} name: Build Deps - ${{ matrix.os }} @@ -125,7 +125,7 @@ jobs: config: [{name: ci-ubuntu-gcc, os: ubuntu-latest}, {name: ci-ubuntu-clang,os: ubuntu-latest}, {name: ci-windows, os: windows-latest}, - {name: ci-macos, os: macos-10.15}] + {name: ci-macos, os: macos-11}] type: [tests, benchmarks] timeout-minutes: 20 diff --git a/Readme.md b/Readme.md index 620916099..ebf6e75a8 100644 --- a/Readme.md +++ b/Readme.md @@ -70,11 +70,11 @@ amount_of_clicks Main advantages of ReactivePlusPlus are that it is written in Modern C++ with Performance and Usage in mind. As a result it is fast, readable, easy to use and well-documented. And it is proven with [continous benchmarking results and comparison with RxCpp](https://victimsnino.github.io/ReactivePlusPlus/benchmark) -**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. CI uses successfully: +**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. List of minimal supported compilers: - (ubuntu) gcc-10 - (ubuntu) clang-11 - (windows) visual studio 2022 -- (macos) Apple Clang 12 +- (macos) Apple Clang 13 # Useful links - [Why ReactivePlusPlus? What about existing Reactive Extension libraries for C++?](https://victimsnino.github.io/ReactivePlusPlus/docs/html/why_rpp.html) From 25deea2e17098dd2a4dc057161d78ff8cd229145 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 10:43:30 +0300 Subject: [PATCH 13/17] One more macos version --- .github/workflows/ci.yml | 4 ++-- Readme.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f97956057..3f13ed0ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: download_deps: strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-11] + os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} name: Build Deps - ${{ matrix.os }} @@ -125,7 +125,7 @@ jobs: config: [{name: ci-ubuntu-gcc, os: ubuntu-latest}, {name: ci-ubuntu-clang,os: ubuntu-latest}, {name: ci-windows, os: windows-latest}, - {name: ci-macos, os: macos-11}] + {name: ci-macos, os: macos-latest}] type: [tests, benchmarks] timeout-minutes: 20 diff --git a/Readme.md b/Readme.md index ebf6e75a8..6abc7004c 100644 --- a/Readme.md +++ b/Readme.md @@ -74,7 +74,7 @@ Main advantages of ReactivePlusPlus are that it is written in Modern C++ with Pe - (ubuntu) gcc-10 - (ubuntu) clang-11 - (windows) visual studio 2022 -- (macos) Apple Clang 13 +- (macos) Apple Clang 14 # Useful links - [Why ReactivePlusPlus? What about existing Reactive Extension libraries for C++?](https://victimsnino.github.io/ReactivePlusPlus/docs/html/why_rpp.html) From 16573f268c1f3a71a35b5675fbae615f06df1049 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 10:48:42 +0300 Subject: [PATCH 14/17] Update test_schedulers.cpp --- src/tests/rpp/test_schedulers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rpp/test_schedulers.cpp b/src/tests/rpp/test_schedulers.cpp index bea28c975..c4fefcb93 100644 --- a/src/tests/rpp/test_schedulers.cpp +++ b/src/tests/rpp/test_schedulers.cpp @@ -31,7 +31,7 @@ static std::string get_thread_id_as_string(std::thread::id id = std::this_thread static std::string simulate_nested_scheduling(const auto& worker, std::vector& out) { - const std::thread thread([&] + std::thread thread([&] { worker.schedule([&] { From fe214eb7b2b08bc3074d6f99a6277fe8542a6eda Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 11:30:54 +0300 Subject: [PATCH 15/17] Update queue_worker_state.hpp --- .../rpp/schedulers/details/queue_worker_state.hpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index eb4bae6e1..ba764d14f 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -1,10 +1,10 @@ // ReactivePlusPlus library -// +// // Copyright Aleksey Loginov 2022 - present. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // https://www.boost.org/LICENSE_1_0.txt) -// +// // Project home: https://github.com/victimsnino/ReactivePlusPlus #pragma once @@ -93,13 +93,13 @@ class queue_worker_state m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); }); - if (!m_cv.wait_until(lock, - m_queue.top().get_time_point(), - [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); })) + if (m_queue.empty() || !m_cv.wait_until(lock, + m_queue.top().get_time_point(), + [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); })) continue; if (!m_subscription->is_subscribed()) - break; + return false; out.emplace(std::move(m_queue.top().extract_function())); m_queue.pop(); From 928a364857b5e82cf55cb1018c10f21eaa11bf81 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 11:44:39 +0300 Subject: [PATCH 16/17] compile win --- src/rpp/rpp/subscriptions/composite_subscription.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/subscriptions/composite_subscription.hpp b/src/rpp/rpp/subscriptions/composite_subscription.hpp index 8e2a5d391..a6403dc72 100644 --- a/src/rpp/rpp/subscriptions/composite_subscription.hpp +++ b/src/rpp/rpp/subscriptions/composite_subscription.hpp @@ -18,6 +18,7 @@ #include #include +#include #include namespace rpp From aad83b7c2cb93a93a7a010c2a4879b6bf857787e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 25 Nov 2022 23:42:30 +0300 Subject: [PATCH 17/17] try to fix --- src/rpp/rpp/schedulers/details/queue_worker_state.hpp | 7 ++++++- src/rpp/rpp/schedulers/new_thread_scheduler.hpp | 9 ++++++--- src/rpp/rpp/schedulers/run_loop_scheduler.hpp | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp index ba764d14f..ae0814a08 100644 --- a/src/rpp/rpp/schedulers/details/queue_worker_state.hpp +++ b/src/rpp/rpp/schedulers/details/queue_worker_state.hpp @@ -108,7 +108,12 @@ class queue_worker_state return false; } - void destroy() + bool is_subscribed() const + { + return m_subscription->is_subscribed(); + } + + void unsubscribe() { m_subscription->unsubscribe(); } diff --git a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp index 2582219a1..830b45e97 100644 --- a/src/rpp/rpp/schedulers/new_thread_scheduler.hpp +++ b/src/rpp/rpp/schedulers/new_thread_scheduler.hpp @@ -38,6 +38,9 @@ class new_thread final : public details::scheduler_tag worker_strategy(const rpp::composite_subscription& sub) { + if (!sub.is_subscribed()) + return; + auto shared = std::make_shared(); // init while it is alive as shared shared->init_thread(sub); @@ -92,7 +95,7 @@ class new_thread final : public details::scheduler_tag if (!locked) return; - locked->m_queue.destroy(); + locked->m_queue.unsubscribe(); if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id()) locked->m_thread.join(); @@ -107,7 +110,7 @@ class new_thread final : public details::scheduler_tag void data_thread() { std::optional fn{}; - while (m_sub->is_subscribed()) + while (m_queue.is_subscribed()) { if (m_queue.pop_with_wait(fn)) { @@ -117,7 +120,7 @@ class new_thread final : public details::scheduler_tag } // clear - m_queue.destroy(); + m_queue.unsubscribe(); } details::queue_worker_state m_queue{}; diff --git a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp index f92e71d1d..98f778553 100644 --- a/src/rpp/rpp/schedulers/run_loop_scheduler.hpp +++ b/src/rpp/rpp/schedulers/run_loop_scheduler.hpp @@ -76,7 +76,7 @@ class run_loop final : public details::scheduler_tag ~state() { - m_queue.destroy(); + m_queue.unsubscribe(); m_sub.unsubscribe(); }