From c189782e19609cb1f7e678c75e25c2838ece6c14 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 28 May 2024 00:07:22 +0300 Subject: [PATCH 1/7] asio strand scheduler --- BUILDING.md | 1 + CMakePresets.json | 29 ++-- cmake/dependencies.cmake | 10 ++ cmake/install-rules.cmake | 6 + cmake/variables.cmake | 4 + conanfile.py | 9 +- src/extensions/CMakeLists.txt | 1 + src/extensions/rppasio/CMakeLists.txt | 11 ++ src/extensions/rppasio/rppasio/fwd.hpp | 23 +++ src/extensions/rppasio/rppasio/rppasio.hpp | 14 ++ src/extensions/rppasio/rppasio/schedulers.hpp | 20 +++ .../rppasio/rppasio/schedulers/fwd.hpp | 16 ++ .../rppasio/rppasio/schedulers/strand.hpp | 105 +++++++++++++ src/tests/CMakeLists.txt | 8 + src/tests/rppasio/test_rppasio_scheduler.cpp | 138 ++++++++++++++++++ src/tests/utils/rpp_trompeloil.hpp | 4 + 16 files changed, 386 insertions(+), 13 deletions(-) create mode 100644 src/extensions/rppasio/CMakeLists.txt create mode 100644 src/extensions/rppasio/rppasio/fwd.hpp create mode 100644 src/extensions/rppasio/rppasio/rppasio.hpp create mode 100644 src/extensions/rppasio/rppasio/schedulers.hpp create mode 100644 src/extensions/rppasio/rppasio/schedulers/fwd.hpp create mode 100644 src/extensions/rppasio/rppasio/schedulers/strand.hpp create mode 100644 src/tests/rppasio/test_rppasio_scheduler.cpp diff --git a/BUILDING.md b/BUILDING.md index 76f001ec6..23ede557d 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -28,6 +28,7 @@ But RPP is header-only library, so, without enabling any extra options is just c - `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed - `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed - `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed +- `RPP_BUILD_ASIO_CODE` - (ON/OFF) build RPPASIO related code (examples/tests)(rppasio module doesn't requires this one) (default OFF) - requires asio to be installed By default, it provides rpp and rppqt INTERFACE modules. diff --git a/CMakePresets.json b/CMakePresets.json index 8000f9b99..149c07bd4 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -145,6 +145,13 @@ "RPP_BUILD_GRPC_CODE" : "ON" } }, + { + "name" : "build-asio", + "hidden": true, + "cacheVariables": { + "RPP_BUILD_ASIO_CODE" : "ON" + } + }, { "name" : "use-conan", "hidden": true, @@ -168,7 +175,7 @@ }, { "name": "ci-coverage-clang", - "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-O0 -g -fprofile-instr-generate -fcoverage-mapping --coverage", @@ -177,58 +184,58 @@ }, { "name": "ci-sanitize-tsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=thread -g -O1" } }, { "name": "ci-sanitize-asan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-lsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=leak -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-msan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=memory -fno-optimize-sibling-calls -fsanitize-memory-track-origins=2 -fno-omit-frame-pointer -g -O2" } }, { "name": "ci-sanitize-ubsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=undefined" } }, { "name": "ci-macos-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix"] }, { "name": "ci-ubuntu-clang-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] }, { "name": "ci-ubuntu-gcc-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] }, { "name": "ci-windows-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-win64"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-win64"] }, { "name": "ci-ubuntu-clang-tests-no-checks", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang" ] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang" ] }, diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 8bad63ed2..4e957a3e0 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -124,3 +124,13 @@ endif() if (RPP_BUILD_BENCHMARKS) rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git master) endif() + +# ==================== ASIO ===================== +if (RPP_BUILD_ASIO_CODE) + find_package(asio REQUIRED) + + macro(rpp_add_asio_support_to_executable TARGET) + target_link_libraries(${TARGET} PRIVATE asio::asio) + target_compile_definitions(${TARGET} PRIVATE "ASIO_NO_TYPEID") + endmacro() +endif() diff --git a/cmake/install-rules.cmake b/cmake/install-rules.cmake index f38560340..fbc4a425b 100644 --- a/cmake/install-rules.cmake +++ b/cmake/install-rules.cmake @@ -32,6 +32,12 @@ install( INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppgrpc" ) +install( + TARGETS rppasio + EXPORT RPPTargets + INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppasio" +) + write_basic_package_version_file( "${package}ConfigVersion.cmake" COMPATIBILITY SameMajorVersion diff --git a/cmake/variables.cmake b/cmake/variables.cmake index 93a57a500..297a9933d 100644 --- a/cmake/variables.cmake +++ b/cmake/variables.cmake @@ -83,6 +83,7 @@ endfunction() option(RPP_BUILD_SFML_CODE "Enable SFML support in examples/code." OFF) option(RPP_BUILD_QT_CODE "Enable QT support in examples/code." OFF) option(RPP_BUILD_GRPC_CODE "Enable grpc++ support in examples/code." OFF) +option(RPP_BUILD_ASIO_CODE "Enable ASIO support in examples/code." OFF) if (RPP_DEVELOPER_MODE) option(RPP_BUILD_TESTS "Build unit tests tree." OFF) @@ -104,6 +105,9 @@ if (RPP_DEVELOPER_MODE) if (RPP_BUILD_GRPC_CODE) set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_grpc=True") endif() + if (RPP_BUILD_ASIO_CODE) + set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_asio=True") + endif() endif() if(RPP_ENABLE_COVERAGE) diff --git a/conanfile.py b/conanfile.py index 355f386da..a65b223f4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -11,14 +11,16 @@ class RppConan(ConanFile): "with_sfml" : [False, True], "with_tests" : [False, True], "with_cmake" : [False, True], - "with_benchmarks" : [False, True] + "with_benchmarks" : [False, True], + "with_asio" : [False, True] } default_options = { "with_grpc" : False, "with_sfml" : False, "with_tests": False, "with_cmake": False, - "with_benchmarks" : False + "with_benchmarks" : False, + "with_asio" : False } def requirements(self): @@ -37,5 +39,8 @@ def requirements(self): self.requires("protobuf/3.21.12") self.requires("libmount/2.39", override=True) + if self.options.with_asio: + self.requires("asio/1.30.2") + if self.options.with_cmake: self.tool_requires("cmake/3.29.3") diff --git a/src/extensions/CMakeLists.txt b/src/extensions/CMakeLists.txt index 397ebb5b2..2c1afc7c7 100644 --- a/src/extensions/CMakeLists.txt +++ b/src/extensions/CMakeLists.txt @@ -1,2 +1,3 @@ add_subdirectory(rppqt) add_subdirectory(rppgrpc) +add_subdirectory(rppasio) diff --git a/src/extensions/rppasio/CMakeLists.txt b/src/extensions/rppasio/CMakeLists.txt new file mode 100644 index 000000000..f7fd97401 --- /dev/null +++ b/src/extensions/rppasio/CMakeLists.txt @@ -0,0 +1,11 @@ +# ReactivePlusPlus library +# +# Copyright Aleksey Loginov 2022 - present. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# https://www.boost.org/LICENSE_1_0.txt) +# +# Project home: https://github.com/victimsnino/ReactivePlusPlus +# + +rpp_add_library(rppasio) diff --git a/src/extensions/rppasio/rppasio/fwd.hpp b/src/extensions/rppasio/rppasio/fwd.hpp new file mode 100644 index 000000000..047846684 --- /dev/null +++ b/src/extensions/rppasio/rppasio/fwd.hpp @@ -0,0 +1,23 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +/** + * @defgroup rppasio RPPASIO + * @brief RppAsio is extension of RPP which enables support of boost-asio library. + */ + + +/** + * @defgroup asio_schedulers Asio Schedulers + * @ingroup rppasio + */ +#include diff --git a/src/extensions/rppasio/rppasio/rppasio.hpp b/src/extensions/rppasio/rppasio/rppasio.hpp new file mode 100644 index 000000000..92d130473 --- /dev/null +++ b/src/extensions/rppasio/rppasio/rppasio.hpp @@ -0,0 +1,14 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include diff --git a/src/extensions/rppasio/rppasio/schedulers.hpp b/src/extensions/rppasio/rppasio/schedulers.hpp new file mode 100644 index 000000000..71838075b --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers.hpp @@ -0,0 +1,20 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +/** + * @defgroup asio_schedulers Asio Schedulers + * @brief Scheduler is the way to introduce multi-threading in your application via RPP + * @see https://reactivex.io/documentation/scheduler.html + * @ingroup rppasio + */ + + #include \ No newline at end of file diff --git a/src/extensions/rppasio/rppasio/schedulers/fwd.hpp b/src/extensions/rppasio/rppasio/schedulers/fwd.hpp new file mode 100644 index 000000000..41e24d8a3 --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers/fwd.hpp @@ -0,0 +1,16 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +namespace rppasio::schedulers +{ + class strand; +} // namespace rppasio::schedulers diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp new file mode 100644 index 000000000..0c487182e --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -0,0 +1,105 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include + +namespace rppasio::schedulers +{ + /** + * @brief TODO + * @ingroup asio_schedulers + */ + class strand + { + private: + class state : public std::enable_shared_from_this + { + public: + state(const asio::io_context::executor_type& executor) + : m_strand{executor.context()} + { + } + + template Fn> + void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + { + if (handler.is_disposed()) + return; + + if (duration == rpp::schedulers::duration::zero()) + { + asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { + if (handler.is_disposed()) + return; + + if (const auto new_duration = fn(handler, args...)) + self->defer_for(new_duration->value, std::forward(fn), std::forward(handler), std::forward(args)...); + })); + } + else + { + auto timer = std::make_shared>(m_strand.context(), duration); + timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer = std::move(timer), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { + if (ec || handler.is_disposed()) + return; + + + if (const auto new_duration = fn(handler, args...)) + self->defer_for(new_duration->value, std::forward(fn), std::forward(handler), std::forward(args)...); + })); + } + } + + private: + asio::io_context::strand m_strand; + }; + + class worker_strategy + { + public: + explicit worker_strategy(const asio::io_context::executor_type& executor) + : m_state{std::make_shared(executor)} + { + } + + template Fn> + void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_state->defer_for(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } + static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } + + private: + std::shared_ptr m_state; + }; + + public: + explicit strand(asio::io_context::executor_type executor) + : m_executor{std::move(executor)} + { + } + + auto create_worker() const + { + return rpp::schedulers::worker{m_executor}; + } + + asio::io_context::executor_type m_executor; + }; +} // namespace rppasio::schedulers \ No newline at end of file diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index fb39b2cfe..a1a7210e5 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -28,6 +28,10 @@ macro(add_test_target target_name module files) target_link_libraries(${TARGET} PRIVATE rppgrpc_tests_proto) endif() + if (${module} STREQUAL rppasio) + rpp_add_asio_support_to_executable(${TARGET}) + endif() + target_compile_features(${TARGET} PRIVATE cxx_std_20) if(MSVC) @@ -59,3 +63,7 @@ if (RPP_BUILD_GRPC_CODE) rpp_add_proto_target(rppgrpc_tests_proto rppgrpc/proto.proto) rpp_register_tests(rppgrpc) endif() + +if (RPP_BUILD_ASIO_CODE) + rpp_register_tests(rppasio) +endif() \ No newline at end of file diff --git a/src/tests/rppasio/test_rppasio_scheduler.cpp b/src/tests/rppasio/test_rppasio_scheduler.cpp new file mode 100644 index 000000000..ea888b1ed --- /dev/null +++ b/src/tests/rppasio/test_rppasio_scheduler.cpp @@ -0,0 +1,138 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "rpp_trompeloil.hpp" + +#include + +using namespace std::chrono_literals; + +namespace +{ + void drain(const auto& disposable, asio::io_context& context) + { + while (!disposable.is_disposed()) + { + context.run_one(); + } + } +} // namespace + +TEST_CASE("strand utilized current_thread") +{ + asio::io_context context; + mock_observer mock; + + { + auto worker = rppasio::schedulers::strand{context.get_executor()}.create_worker(); + auto obs = mock.get_observer().as_dynamic(); + + worker.schedule([](const auto& obs) { + // As a strand can be executed on different threads it doesn't share queue with current_thread + bool inner_schedule_executed = false; + rpp::schedulers::current_thread::create_worker().schedule([&inner_schedule_executed](const auto&) { + inner_schedule_executed = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + CHECK(inner_schedule_executed); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + } + + context.run_one(); +} + +TEST_CASE("strand works till end") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto disposable = rpp::source::just(1, 2, 3) + | rpp::operators::subscribe_on(rppasio::schedulers::strand{context.get_executor()}) + | rpp::operators::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); +} + + +TEST_CASE("strand in combination with current_thread") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto scheduler = rppasio::schedulers::strand{context.get_executor()}; + auto disposable = rpp::source::just(scheduler, 1, 2, 3) + | rpp::ops::flat_map([](int v) { + return rpp::source::just(rpp::schedulers::current_thread{}, v); + }) + | rpp::ops::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); +} + +TEST_CASE("strand worker has correct execution order") +{ + asio::io_context context; + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + bool has_run_first_task = false; + bool has_run_second_task = false; + + auto worker = rppasio::schedulers::strand{context.get_executor()}.create_worker(); + + worker.schedule( + 50ms, + [&](const auto&) { + has_run_first_task = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + worker.schedule( + 5ms, + [&](const auto&) { + has_run_second_task = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + context.run_one(); + CHECK_FALSE(has_run_first_task); + CHECK(has_run_second_task); + + context.run_one(); + CHECK(has_run_first_task); +} \ No newline at end of file diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index e8508899c..44a01a3d4 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -66,6 +67,9 @@ class mock_observer static bool is_disposed() noexcept { return false; } static void set_upstream(const rpp::disposable_wrapper&) noexcept {} + auto get_observer() const { return rpp::observer>{*this}; } + auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable>{std::move(d), *this}; } + private: std::shared_ptr m_impl = std::make_shared(); }; From f60974eafa1b433cd5318187e774dd58f09e81d6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 29 Aug 2024 22:08:42 +0000 Subject: [PATCH 2/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/extensions/rppasio/rppasio/schedulers.hpp | 2 +- src/extensions/rppasio/rppasio/schedulers/strand.hpp | 2 +- src/tests/CMakeLists.txt | 2 +- src/tests/rppasio/test_rppasio_scheduler.cpp | 2 +- src/tests/utils/rpp_trompeloil.hpp | 3 ++- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/extensions/rppasio/rppasio/schedulers.hpp b/src/extensions/rppasio/rppasio/schedulers.hpp index 71838075b..52db0686d 100644 --- a/src/extensions/rppasio/rppasio/schedulers.hpp +++ b/src/extensions/rppasio/rppasio/schedulers.hpp @@ -17,4 +17,4 @@ * @ingroup rppasio */ - #include \ No newline at end of file +#include diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp index 0c487182e..60ff89054 100644 --- a/src/extensions/rppasio/rppasio/schedulers/strand.hpp +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -102,4 +102,4 @@ namespace rppasio::schedulers asio::io_context::executor_type m_executor; }; -} // namespace rppasio::schedulers \ No newline at end of file +} // namespace rppasio::schedulers diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index a1a7210e5..97b750c67 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -66,4 +66,4 @@ endif() if (RPP_BUILD_ASIO_CODE) rpp_register_tests(rppasio) -endif() \ No newline at end of file +endif() diff --git a/src/tests/rppasio/test_rppasio_scheduler.cpp b/src/tests/rppasio/test_rppasio_scheduler.cpp index ea888b1ed..994ae1479 100644 --- a/src/tests/rppasio/test_rppasio_scheduler.cpp +++ b/src/tests/rppasio/test_rppasio_scheduler.cpp @@ -135,4 +135,4 @@ TEST_CASE("strand worker has correct execution order") context.run_one(); CHECK(has_run_first_task); -} \ No newline at end of file +} diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index 44a01a3d4..f2fb3b7f1 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -3,9 +3,10 @@ #include #include -#include #include +#include + #include #include From fc3c93ca90375377ba9eab1a85cb1867b16ed371 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sun, 1 Sep 2024 20:08:18 +0200 Subject: [PATCH 3/7] Address comments --- BUILDING.md | 6 +++--- .../rppasio/rppasio/schedulers/strand.hpp | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/BUILDING.md b/BUILDING.md index 23ede557d..4b661b3b8 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -26,9 +26,9 @@ But RPP is header-only library, so, without enabling any extra options is just c - `RPP_BUILD_TESTS` - (ON/OFF) build unit tests (default OFF) - `RPP_BUILD_EXAMPLES` - (ON/OFF) build examples of usage of RPP (default OFF) - `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed -- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed -- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed -- `RPP_BUILD_ASIO_CODE` - (ON/OFF) build RPPASIO related code (examples/tests)(rppasio module doesn't requires this one) (default OFF) - requires asio to be installed +- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't require this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't require this one) (default OFF) - requires grpc++/protobuf to be installed +- `RPP_BUILD_ASIO_CODE` - (ON/OFF) build RPPASIO related code (examples/tests)(rppasio module doesn't require this one) (default OFF) - requires asio to be installed By default, it provides rpp and rppqt INTERFACE modules. diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp index 60ff89054..a9db46f29 100644 --- a/src/extensions/rppasio/rppasio/schedulers/strand.hpp +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -20,7 +20,12 @@ namespace rppasio::schedulers { /** - * @brief TODO + * @brief Asio based scheduler where each worker is assigned an asio `strand` to execute schedulables with the + * guarantee that none of those `schedulables` will execute concurrently. + * @details This scheduler can efficiently enable multi-threading execution when running provided io_context + * in multiple threads. Compared to the `thread_pool` scheduler, a worker is not pinned to a single thread and + * `schedulables` are instead dynamically dispatched to potentially different io_context threads depending + * on load. * @ingroup asio_schedulers */ class strand @@ -47,19 +52,18 @@ namespace rppasio::schedulers return; if (const auto new_duration = fn(handler, args...)) - self->defer_for(new_duration->value, std::forward(fn), std::forward(handler), std::forward(args)...); + self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); })); } else { auto timer = std::make_shared>(m_strand.context(), duration); - timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer = std::move(timer), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { + timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { if (ec || handler.is_disposed()) return; - if (const auto new_duration = fn(handler, args...)) - self->defer_for(new_duration->value, std::forward(fn), std::forward(handler), std::forward(args)...); + self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); })); } } From b64f6642f8b0fb787e968ffaa32bff08aa7117f9 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 5 Sep 2024 10:50:07 +0200 Subject: [PATCH 4/7] Handle current_thread queue --- .../rppasio/rppasio/schedulers/strand.hpp | 98 +++++++++++++++++-- src/rpp/rpp/schedulers/current_thread.hpp | 1 + src/rpp/rpp/schedulers/details/queue.hpp | 4 + src/tests/rppasio/test_rppasio_scheduler.cpp | 37 ++++++- 4 files changed, 129 insertions(+), 11 deletions(-) diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp index a9db46f29..ca0a482f0 100644 --- a/src/extensions/rppasio/rppasio/schedulers/strand.hpp +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -31,10 +31,12 @@ namespace rppasio::schedulers class strand { private: - class state : public std::enable_shared_from_this + class state_t : public std::enable_shared_from_this { + class current_thread_queue_guard; + public: - state(const asio::io_context::executor_type& executor) + state_t(const asio::io_context::executor_type& executor) : m_strand{executor.context()} { } @@ -51,6 +53,7 @@ namespace rppasio::schedulers if (handler.is_disposed()) return; + current_thread_queue_guard guard{*self}; if (const auto new_duration = fn(handler, args...)) self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); })); @@ -62,12 +65,96 @@ namespace rppasio::schedulers if (ec || handler.is_disposed()) return; + current_thread_queue_guard guard{*self}; if (const auto new_duration = fn(handler, args...)) self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); })); } } + template Fn> + void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + if (handler.is_disposed()) + return; + + auto timer = std::make_shared>(m_strand.context(), tp); + timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { + if (ec || handler.is_disposed()) + return; + + current_thread_queue_guard guard{*self}; + if (const auto new_duration = fn(handler, args...)) + self->defer_to(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); + })); + } + + private: + // Guard draining schedulables queued to thread local queue to schedule them back to strand queue + class current_thread_queue_guard + { + public: + current_thread_queue_guard(const state_t& state) + : m_process_on_destruction{!rpp::schedulers::current_thread::get_queue()} + , m_state(state) + { + if (m_process_on_destruction) + rpp::schedulers::current_thread::get_queue() = &m_queue; + } + ~current_thread_queue_guard() + { + if (m_process_on_destruction) + process_queue(); + } + current_thread_queue_guard(const current_thread_queue_guard&) = delete; + current_thread_queue_guard(current_thread_queue_guard&&) = delete; + + private: + struct handler + { + bool is_disposed() const noexcept + { + return m_schedulable->is_disposed(); + } + + void on_error(const std::exception_ptr& ep) const + { + m_schedulable->on_error(ep); + } + + std::shared_ptr m_schedulable; + }; + + private: + void process_queue() + { + while (!m_queue.is_empty()) + { + const auto top = m_queue.pop(); + if (top->is_disposed()) + continue; + + m_state.defer_to( + top->get_timepoint(), + [top](const auto&) -> rpp::schedulers::optional_delay_to { + if (const auto advanced_call = top->make_advanced_call()) + { + top->set_timepoint(worker_strategy::now()); + return rpp::schedulers::delay_to{top->handle_advanced_call(*advanced_call)}; + } + return std::nullopt; + }, + handler{top}); + } + rpp::schedulers::current_thread::get_queue() = nullptr; + } + + private: + rpp::schedulers::details::schedulables_queue m_queue; + bool m_process_on_destruction; + const state_t& m_state; + }; + private: asio::io_context::strand m_strand; }; @@ -76,7 +163,7 @@ namespace rppasio::schedulers { public: explicit worker_strategy(const asio::io_context::executor_type& executor) - : m_state{std::make_shared(executor)} + : m_state{std::make_shared(executor)} { } @@ -86,11 +173,10 @@ namespace rppasio::schedulers m_state->defer_for(duration, std::forward(fn), std::forward(handler), std::forward(args)...); } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } + static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } private: - std::shared_ptr m_state; + std::shared_ptr m_state; }; public: diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index b0d60c43e..f803214c8 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -84,6 +84,7 @@ namespace rpp::schedulers */ class current_thread { + public: friend class new_thread; class worker_strategy; diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 4bd73b526..d2578e417 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -75,6 +75,8 @@ namespace rpp::schedulers::details virtual bool is_disposed() const noexcept = 0; + virtual void on_error(const std::exception_ptr& ep) const = 0; + time_point get_timepoint() const { return m_time_point; } void set_timepoint(const time_point& timepoint) { m_time_point = timepoint; } @@ -159,6 +161,8 @@ namespace rpp::schedulers::details bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); } + void on_error(const std::exception_ptr& ep) const override { m_args.template get<0>().on_error(ep); } + private: RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple m_args; RPP_NO_UNIQUE_ADDRESS Fn m_fn; diff --git a/src/tests/rppasio/test_rppasio_scheduler.cpp b/src/tests/rppasio/test_rppasio_scheduler.cpp index 994ae1479..e54877f20 100644 --- a/src/tests/rppasio/test_rppasio_scheduler.cpp +++ b/src/tests/rppasio/test_rppasio_scheduler.cpp @@ -14,7 +14,9 @@ #include #include #include +#include #include +#include #include #include @@ -37,31 +39,56 @@ namespace } } // namespace -TEST_CASE("strand utilized current_thread") +TEST_CASE("strand shares current_thread queue") { asio::io_context context; mock_observer mock; + bool inner_schedule_executed = false; { auto worker = rppasio::schedulers::strand{context.get_executor()}.create_worker(); auto obs = mock.get_observer().as_dynamic(); - worker.schedule([](const auto& obs) { - // As a strand can be executed on different threads it doesn't share queue with current_thread - bool inner_schedule_executed = false; + worker.schedule([&](const auto& obs) { + // Strand and current_thread share same queue that can be potentially executed in different threads rpp::schedulers::current_thread::create_worker().schedule([&inner_schedule_executed](const auto&) { inner_schedule_executed = true; return rpp::schedulers::optional_delay_from_now{}; }, obs); - CHECK(inner_schedule_executed); + CHECK_FALSE(inner_schedule_executed); return rpp::schedulers::optional_delay_from_now{}; }, obs); } context.run_one(); + context.run_one(); + + CHECK(inner_schedule_executed); +} + +TEST_CASE("strand drains current_thread queue") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto disposable = rpp::source::just(rppasio::schedulers::strand{context.get_executor()}, 1, 2, 3) + | rpp::ops::flat_map([](int v) { + return rpp::source::interval(10ms, rpp::schedulers::current_thread{}) + | rpp::ops::map([v](long) { return v; }); + }) + | rpp::ops::take(3) + | rpp::ops::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_rvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); } TEST_CASE("strand works till end") From 5906592867e117e33f69a09fbe4c0e5411b9ad13 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 5 Sep 2024 11:08:00 +0200 Subject: [PATCH 5/7] Remove duplication --- .../rppasio/rppasio/schedulers/strand.hpp | 56 +++++++++---------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp index ca0a482f0..f6f3adb63 100644 --- a/src/extensions/rppasio/rppasio/schedulers/strand.hpp +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -41,51 +41,36 @@ namespace rppasio::schedulers { } - template Fn> - void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + template + void defer(Fn&& fn, Handler&& handler, Args&&... args) const { if (handler.is_disposed()) return; - if (duration == rpp::schedulers::duration::zero()) - { - asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { - if (handler.is_disposed()) - return; - - current_thread_queue_guard guard{*self}; - if (const auto new_duration = fn(handler, args...)) - self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); - })); - } - else - { - auto timer = std::make_shared>(m_strand.context(), duration); - timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { - if (ec || handler.is_disposed()) - return; - - current_thread_queue_guard guard{*self}; - if (const auto new_duration = fn(handler, args...)) - self->defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); - })); - } + asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { + if (handler.is_disposed()) + return; + + current_thread_queue_guard guard{*self}; + if (const auto new_duration = fn(handler, args...)) + self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); + })); } - template Fn> - void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + template + void defer_with_time(Time time, Fn&& fn, Handler&& handler, Args&&... args) const { if (handler.is_disposed()) return; - auto timer = std::make_shared>(m_strand.context(), tp); + auto timer = std::make_shared>(m_strand.context(), time); timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { if (ec || handler.is_disposed()) return; current_thread_queue_guard guard{*self}; if (const auto new_duration = fn(handler, args...)) - self->defer_to(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); + self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); })); } @@ -134,7 +119,7 @@ namespace rppasio::schedulers if (top->is_disposed()) continue; - m_state.defer_to( + m_state.defer_with_time( top->get_timepoint(), [top](const auto&) -> rpp::schedulers::optional_delay_to { if (const auto advanced_call = top->make_advanced_call()) @@ -170,7 +155,16 @@ namespace rppasio::schedulers template Fn> void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const { - m_state->defer_for(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + if (duration == rpp::schedulers::duration::zero()) + m_state->defer(std::forward(fn), std::forward(handler), std::forward(args)...); + else + m_state->defer_with_time(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + template Fn> + void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_state->defer_with_time(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } From 32570e37445dc4dcbfde189cca6623ec37e65505 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Fri, 6 Sep 2024 21:09:46 +0200 Subject: [PATCH 6/7] Address comment --- src/extensions/rppasio/rppasio/schedulers/strand.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp index f6f3adb63..feb6ad875 100644 --- a/src/extensions/rppasio/rppasio/schedulers/strand.hpp +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -124,8 +124,9 @@ namespace rppasio::schedulers [top](const auto&) -> rpp::schedulers::optional_delay_to { if (const auto advanced_call = top->make_advanced_call()) { - top->set_timepoint(worker_strategy::now()); - return rpp::schedulers::delay_to{top->handle_advanced_call(*advanced_call)}; + const auto tp = top->handle_advanced_call(*advanced_call); + top->set_timepoint(tp); + return rpp::schedulers::delay_to{tp}; } return std::nullopt; }, From c60a8c8314dce0446295bff0379aa9730a03c739 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sat, 7 Sep 2024 11:59:01 +0200 Subject: [PATCH 7/7] Fix build --- src/tests/rppasio/test_rppasio_scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rppasio/test_rppasio_scheduler.cpp b/src/tests/rppasio/test_rppasio_scheduler.cpp index e54877f20..87df9aa6a 100644 --- a/src/tests/rppasio/test_rppasio_scheduler.cpp +++ b/src/tests/rppasio/test_rppasio_scheduler.cpp @@ -78,7 +78,7 @@ TEST_CASE("strand drains current_thread queue") auto disposable = rpp::source::just(rppasio::schedulers::strand{context.get_executor()}, 1, 2, 3) | rpp::ops::flat_map([](int v) { return rpp::source::interval(10ms, rpp::schedulers::current_thread{}) - | rpp::ops::map([v](long) { return v; }); + | rpp::ops::map([v](size_t) { return v; }); }) | rpp::ops::take(3) | rpp::ops::subscribe_with_disposable(mock.get_observer());