Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
download_deps:
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
os: [ubuntu-latest, windows-latest, macos-latest]

runs-on: ${{ matrix.os }}
name: Build Deps - ${{ matrix.os }}
Expand Down Expand Up @@ -63,6 +63,8 @@ jobs:

- name: Install Qt
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Restore Catch2
uses: actions/cache@v3
Expand Down Expand Up @@ -98,6 +100,8 @@ jobs:

- name: Install Qt
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Restore Catch2
uses: actions/cache@v3
Expand All @@ -120,7 +124,8 @@ jobs:
matrix:
config: [{name: ci-ubuntu-gcc, os: ubuntu-latest},
{name: ci-ubuntu-clang,os: ubuntu-latest},
{name: ci-windows, os: windows-latest}]
{name: ci-windows, os: windows-latest},
{name: ci-macos, os: macos-latest}]
type: [tests, benchmarks]

timeout-minutes: 20
Expand All @@ -135,9 +140,11 @@ jobs:
- name: Install Qt
if: matrix.type == 'tests'
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Install deps
if: matrix.config.os != 'windows-latest'
if: matrix.config.os == 'ubuntu-latest'
run: |
sudo apt-get update -q && sudo apt-get install clang-tidy cppcheck libsfml-dev -y -q
pip install pyyaml
Expand Down Expand Up @@ -198,6 +205,8 @@ jobs:

- name: Install Qt
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Cache SonarCloud packages
uses: actions/cache@v3
Expand Down Expand Up @@ -283,6 +292,8 @@ jobs:

- name: Install Qt
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Install project and build
env:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/sonarcloud_analyze.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:

- name: Install Qt
uses: jurplel/install-qt-action@v3
with:
cache: true

- name: Cmake
run: cmake -S . -B build -DRPP_BUILD_TESTS=1 -DRPP_BUILD_QT_CODE=1
Expand Down
6 changes: 5 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ amount_of_clicks

Main advantages of ReactivePlusPlus are that it is written in Modern C++ with Performance and Usage in mind. As a result it is fast, readable, easy to use and well-documented. And it is proven with [continous benchmarking results and comparison with RxCpp](https://victimsnino.github.io/ReactivePlusPlus/benchmark)

**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. CI uses gcc-10, clang-11, visual studio 2022
**NOTE**: ReactivePlusPlus is library for C++20. So, it works only on compilers that supports most C++20 features. List of minimal supported compilers:
- (ubuntu) gcc-10
- (ubuntu) clang-11
- (windows) visual studio 2022
- (macos) Apple Clang 14

# Useful links
- [Why ReactivePlusPlus? What about existing Reactive Extension libraries for C++?](https://victimsnino.github.io/ReactivePlusPlus/docs/html/why_rpp.html)
Expand Down
10 changes: 4 additions & 6 deletions src/examples/rpp/petri/petri.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ using PetriNet = std::map<Transition, Mutations>;
using Marking = std::map<Place, int>;
typedef void (*const TransitionFunction)(void);

using namespace std::ranges;

void foo() { std::cout << "foo" << std::endl; }
void bar() { std::cout << "bar" << std::endl; }

Expand All @@ -39,16 +37,16 @@ const auto& run_transition(Transition t)
std::pair<Marking, Transitions> mutate_marking(Marking&& marking, const Places& produce)
{
// Produce tokens
for_each(produce, [&marking](const auto& place) { marking[place] += 1; });
std::for_each(produce.cbegin(), produce.cend(), [&marking](const auto& place) { marking[place] += 1; });
// Consume tokens by exeucting enabled transitions
Transitions transitions;
for (const auto& [transition, mutations] : net)
{
const auto& c = mutations.consume;
if (!c.empty() &&
all_of(c, [&](const auto& place) { return marking[place] >= count(c, place); }))
std::all_of(c.cbegin(), c.cend(), [&](const auto& place) { return marking[place] >= std::count(c.cbegin(), c.cend(), place); }))
{
for_each(c, [&marking](const auto& place) { marking[place] -= 1; });
std::for_each(c.cbegin(), c.cend(), [&marking](const auto& place) { marking[place] -= 1; });
transitions.push_back(transition);
}
}
Expand Down Expand Up @@ -78,7 +76,7 @@ int main()
{
Transitions transitions;
std::tie(marking, transitions) = mutate_marking(std::forward<decltype(marking)>(marking), places);
for_each(transitions,
std::for_each(transitions.cbegin(), transitions.cend(),
[dispatcher = transitions_subject.get_subscriber()](const auto& t)
{ dispatcher.on_next(t); });
return std::forward<decltype(marking)>(marking);
Expand Down
2 changes: 1 addition & 1 deletion src/examples/rpp/sfml/snake/snake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static bool is_snake_eat_self(const SnakeBody& body)

static Coordinates update_apple_position_if_eat(Coordinates&& apple_position, const SnakeBody& snake)
{
if (std::ranges::find(snake, apple_position) == snake.cend())
if (std::find(snake.cbegin(), snake.cend(), apple_position) == snake.cend())
return apple_position;

static std::random_device rd;
Expand Down
49 changes: 33 additions & 16 deletions src/rpp/rpp/schedulers/details/queue_worker_state.hpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// ReactivePlusPlus library
//
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/schedulers/fwd.hpp> // own forwarding
#include <rpp/schedulers/constraints.hpp>
#include <rpp/subscriptions/subscription_guard.hpp>

#include <mutex>
#include <condition_variable>
Expand Down Expand Up @@ -51,7 +52,9 @@ template<typename SchedulableFn>
class queue_worker_state
{
public:
queue_worker_state() noexcept = default;
queue_worker_state() = default;
queue_worker_state(const queue_worker_state&) = delete;
queue_worker_state(queue_worker_state&&) = delete;

void emplace(time_point time_point, constraint::inner_schedulable_fn auto&& fn)
{
Expand Down Expand Up @@ -82,39 +85,45 @@ class queue_worker_state
return true;
}

bool pop_with_wait(std::optional<SchedulableFn>& out, const std::stop_token& token)
bool pop_with_wait(std::optional<SchedulableFn>& out)
{
while (!token.stop_requested())
while (m_subscription->is_subscribed())
{
std::unique_lock lock{m_mutex};

if (!m_cv.wait(lock, token, [&] { return !m_queue.empty(); }))
continue;
m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); });

if (!m_cv.wait_until(lock,
token,
m_queue.top().get_time_point(),
std::bind_front(&queue_worker_state<SchedulableFn>::is_any_ready_schedulable_unsafe, this)))
if (m_queue.empty() || !m_cv.wait_until(lock,
m_queue.top().get_time_point(),
[&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); }))
continue;

if (!m_subscription->is_subscribed())
return false;

out.emplace(std::move(m_queue.top().extract_function()));
m_queue.pop();
return true;
}
return false;
}

void reset()
bool is_subscribed() const
{
std::lock_guard lock{ m_mutex };
m_queue = std::priority_queue<schedulable<SchedulableFn>>{};
return m_subscription->is_subscribed();
}

void unsubscribe()
{
m_subscription->unsubscribe();
}

private:
void emplace_safe(time_point time_point, constraint::inner_schedulable_fn auto&& fn)
{
std::lock_guard lock{ m_mutex };
m_queue.emplace(time_point, ++m_current_id, std::forward<decltype(fn)>(fn));
std::lock_guard lock{m_mutex};
if (m_subscription->is_subscribed())
m_queue.emplace(time_point, ++m_current_id, std::forward<decltype(fn)>(fn));
}

bool is_any_ready_schedulable_unsafe() const
Expand All @@ -127,5 +136,13 @@ class queue_worker_state
std::condition_variable_any m_cv{};
std::priority_queue<schedulable<SchedulableFn>> m_queue{};
size_t m_current_id{};
subscription_guard m_subscription = callback_subscription{[&]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ clang-diagnostic-error ⚠️
no viable conversion from rpp::callback_subscription to rpp::subscription_guard

{
{
std::lock_guard lock{m_mutex};
m_queue = std::priority_queue<schedulable<SchedulableFn>>{};
}
m_cv.notify_one();
}};
};
} // namespace rpp::schedulers::details
24 changes: 14 additions & 10 deletions src/rpp/rpp/schedulers/new_thread_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class new_thread final : public details::scheduler_tag

worker_strategy(const rpp::composite_subscription& sub)
{
if (!sub.is_subscribed())
return;

auto shared = std::make_shared<state>();
// init while it is alive as shared
shared->init_thread(sub);
Expand Down Expand Up @@ -82,18 +85,19 @@ class new_thread final : public details::scheduler_tag

void init_thread(const rpp::composite_subscription& sub)
{
m_thread = std::jthread{[state = shared_from_this()](const std::stop_token& token)
m_thread = std::thread{[state = shared_from_this()]()
{
state->data_thread(token);
state->data_thread();
}};
const auto callback = rpp::callback_subscription{[state = weak_from_this()]
{
auto locked = state.lock();
const auto locked = state.lock();
if (!locked)
return;
locked->m_thread.request_stop();

if (locked->m_thread.get_id() != std::this_thread::get_id())
locked->m_queue.unsubscribe();

if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id())
locked->m_thread.join();
else
locked->m_thread.detach();
Expand All @@ -103,24 +107,24 @@ class new_thread final : public details::scheduler_tag
}

private:
void data_thread(const std::stop_token& token)
void data_thread()
{
std::optional<new_thread_schedulable> fn{};
while (!token.stop_requested())
while (m_queue.is_subscribed())
{
if (m_queue.pop_with_wait(fn, token))
if (m_queue.pop_with_wait(fn))
{
(*fn)();
fn.reset();
}
}

// clear
m_queue.reset();
m_queue.unsubscribe();
}

details::queue_worker_state<new_thread_schedulable> m_queue{};
std::jthread m_thread{};
std::thread m_thread{};
rpp::subscription_guard m_sub = rpp::subscription_base::empty();
};

Expand Down
8 changes: 3 additions & 5 deletions src/rpp/rpp/schedulers/run_loop_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,16 @@ class run_loop final : public details::scheduler_tag

~state()
{
m_source.request_stop();
m_queue.unsubscribe();
m_sub.unsubscribe();
}

details::queue_worker_state<run_loop_schedulable>& get_queue() { return m_queue; }
std::stop_token get_token() const { return m_source.get_token(); }


const composite_subscription& get_subscription() const { return m_sub; }
private:
rpp::composite_subscription m_sub;
details::queue_worker_state<run_loop_schedulable> m_queue{};
std::stop_source m_source{};
};

public:
Expand Down Expand Up @@ -126,7 +124,7 @@ class run_loop final : public details::scheduler_tag
void dispatch() const
{
std::optional<run_loop_schedulable> fn{};
if (m_state->get_queue().pop_with_wait(fn, m_state->get_token()))
if (m_state->get_queue().pop_with_wait(fn))
(*fn)();
}

Expand Down
22 changes: 13 additions & 9 deletions src/rpp/rpp/sources/from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@


#include <array>
#include <ranges>
#include <type_traits>

IMPLEMENTATION_FILE(just_tag);
Expand All @@ -32,10 +31,13 @@ namespace rpp::observable::details
template<typename T>
auto extract_iterable_from_packed(const T & v) -> const auto&
{
if constexpr (std::ranges::range<T>)
return v;
else
return *v;
return v;
}

template<typename T>
auto extract_iterable_from_packed(const std::shared_ptr<T> & v) -> const auto&
{
return *v;
}

void iterate(auto&& iterable,
Expand Down Expand Up @@ -66,7 +68,7 @@ void iterate(auto&& iterable,
const auto end = std::cend(extracted_iterable);
auto itr = std::cbegin(extracted_iterable);

std::ranges::advance(itr, static_cast<int64_t>(index), end);
std::advance(itr, static_cast<int64_t>(index));

if (itr != end)
{
Expand All @@ -90,7 +92,9 @@ void iterate(auto&& iterable,
}
}

template<memory_model memory_model, std::ranges::range Container, typename ...Ts>


template<memory_model memory_model, constraint::iterable Container, typename ...Ts>
auto pack_to_container(Ts&& ...items)
{
if constexpr (memory_model == memory_model::use_stack)
Expand Down Expand Up @@ -215,10 +219,10 @@ auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rp
* \see https://reactivex.io/documentation/operators/from.html
*/
template<memory_model memory_model /* = memory_model::use_stack */, schedulers::constraint::scheduler TScheduler /* = schedulers::immediate */>
auto from_iterable(std::ranges::range auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
{
using Container = std::decay_t<decltype(iterable)>;
return create<std::ranges::range_value_t<Container>>(details::iterate_impl{ details::pack_to_container<memory_model, Container>(std::forward<decltype(iterable)>(iterable)), scheduler });
return create<utils::iterable_value_t<Container>>(details::iterate_impl{ details::pack_to_container<memory_model, Container>(std::forward<decltype(iterable)>(iterable)), scheduler });
}

/**
Expand Down
Loading