diff --git a/BUILDING.md b/BUILDING.md index 76f001ec6..4b661b3b8 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -26,8 +26,9 @@ But RPP is header-only library, so, without enabling any extra options is just c - `RPP_BUILD_TESTS` - (ON/OFF) build unit tests (default OFF) - `RPP_BUILD_EXAMPLES` - (ON/OFF) build examples of usage of RPP (default OFF) - `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed -- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed -- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed +- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't require this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't require this one) (default OFF) - requires grpc++/protobuf to be installed +- `RPP_BUILD_ASIO_CODE` - (ON/OFF) build RPPASIO related code (examples/tests)(rppasio module doesn't require this one) (default OFF) - requires asio to be installed By default, it provides rpp and rppqt INTERFACE modules. diff --git a/CMakePresets.json b/CMakePresets.json index 8000f9b99..149c07bd4 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -145,6 +145,13 @@ "RPP_BUILD_GRPC_CODE" : "ON" } }, + { + "name" : "build-asio", + "hidden": true, + "cacheVariables": { + "RPP_BUILD_ASIO_CODE" : "ON" + } + }, { "name" : "use-conan", "hidden": true, @@ -168,7 +175,7 @@ }, { "name": "ci-coverage-clang", - "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-O0 -g -fprofile-instr-generate -fcoverage-mapping --coverage", @@ -177,58 +184,58 @@ }, { "name": "ci-sanitize-tsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=thread -g -O1" } }, { "name": "ci-sanitize-asan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-lsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=leak -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-msan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=memory -fno-optimize-sibling-calls -fsanitize-memory-track-origins=2 -fno-omit-frame-pointer -g -O2" } }, { "name": "ci-sanitize-ubsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=undefined" } }, { "name": "ci-macos-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix"] }, { "name": "ci-ubuntu-clang-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] }, { "name": "ci-ubuntu-gcc-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] }, { "name": "ci-windows-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-win64"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-win64"] }, { "name": "ci-ubuntu-clang-tests-no-checks", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang" ] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang" ] }, diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 8bad63ed2..4e957a3e0 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -124,3 +124,13 @@ endif() if (RPP_BUILD_BENCHMARKS) rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git master) endif() + +# ==================== ASIO ===================== +if (RPP_BUILD_ASIO_CODE) + find_package(asio REQUIRED) + + macro(rpp_add_asio_support_to_executable TARGET) + target_link_libraries(${TARGET} PRIVATE asio::asio) + target_compile_definitions(${TARGET} PRIVATE "ASIO_NO_TYPEID") + endmacro() +endif() diff --git a/cmake/install-rules.cmake b/cmake/install-rules.cmake index f38560340..fbc4a425b 100644 --- a/cmake/install-rules.cmake +++ b/cmake/install-rules.cmake @@ -32,6 +32,12 @@ install( INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppgrpc" ) +install( + TARGETS rppasio + EXPORT RPPTargets + INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppasio" +) + write_basic_package_version_file( "${package}ConfigVersion.cmake" COMPATIBILITY SameMajorVersion diff --git a/cmake/variables.cmake b/cmake/variables.cmake index 93a57a500..297a9933d 100644 --- a/cmake/variables.cmake +++ b/cmake/variables.cmake @@ -83,6 +83,7 @@ endfunction() option(RPP_BUILD_SFML_CODE "Enable SFML support in examples/code." OFF) option(RPP_BUILD_QT_CODE "Enable QT support in examples/code." OFF) option(RPP_BUILD_GRPC_CODE "Enable grpc++ support in examples/code." OFF) +option(RPP_BUILD_ASIO_CODE "Enable ASIO support in examples/code." OFF) if (RPP_DEVELOPER_MODE) option(RPP_BUILD_TESTS "Build unit tests tree." OFF) @@ -104,6 +105,9 @@ if (RPP_DEVELOPER_MODE) if (RPP_BUILD_GRPC_CODE) set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_grpc=True") endif() + if (RPP_BUILD_ASIO_CODE) + set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_asio=True") + endif() endif() if(RPP_ENABLE_COVERAGE) diff --git a/conanfile.py b/conanfile.py index 355f386da..a65b223f4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -11,14 +11,16 @@ class RppConan(ConanFile): "with_sfml" : [False, True], "with_tests" : [False, True], "with_cmake" : [False, True], - "with_benchmarks" : [False, True] + "with_benchmarks" : [False, True], + "with_asio" : [False, True] } default_options = { "with_grpc" : False, "with_sfml" : False, "with_tests": False, "with_cmake": False, - "with_benchmarks" : False + "with_benchmarks" : False, + "with_asio" : False } def requirements(self): @@ -37,5 +39,8 @@ def requirements(self): self.requires("protobuf/3.21.12") self.requires("libmount/2.39", override=True) + if self.options.with_asio: + self.requires("asio/1.30.2") + if self.options.with_cmake: self.tool_requires("cmake/3.29.3") diff --git a/src/extensions/CMakeLists.txt b/src/extensions/CMakeLists.txt index 397ebb5b2..2c1afc7c7 100644 --- a/src/extensions/CMakeLists.txt +++ b/src/extensions/CMakeLists.txt @@ -1,2 +1,3 @@ add_subdirectory(rppqt) add_subdirectory(rppgrpc) +add_subdirectory(rppasio) diff --git a/src/extensions/rppasio/CMakeLists.txt b/src/extensions/rppasio/CMakeLists.txt new file mode 100644 index 000000000..f7fd97401 --- /dev/null +++ b/src/extensions/rppasio/CMakeLists.txt @@ -0,0 +1,11 @@ +# ReactivePlusPlus library +# +# Copyright Aleksey Loginov 2022 - present. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# https://www.boost.org/LICENSE_1_0.txt) +# +# Project home: https://github.com/victimsnino/ReactivePlusPlus +# + +rpp_add_library(rppasio) diff --git a/src/extensions/rppasio/rppasio/fwd.hpp b/src/extensions/rppasio/rppasio/fwd.hpp new file mode 100644 index 000000000..047846684 --- /dev/null +++ b/src/extensions/rppasio/rppasio/fwd.hpp @@ -0,0 +1,23 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +/** + * @defgroup rppasio RPPASIO + * @brief RppAsio is extension of RPP which enables support of boost-asio library. + */ + + +/** + * @defgroup asio_schedulers Asio Schedulers + * @ingroup rppasio + */ +#include diff --git a/src/extensions/rppasio/rppasio/rppasio.hpp b/src/extensions/rppasio/rppasio/rppasio.hpp new file mode 100644 index 000000000..92d130473 --- /dev/null +++ b/src/extensions/rppasio/rppasio/rppasio.hpp @@ -0,0 +1,14 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include diff --git a/src/extensions/rppasio/rppasio/schedulers.hpp b/src/extensions/rppasio/rppasio/schedulers.hpp new file mode 100644 index 000000000..52db0686d --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers.hpp @@ -0,0 +1,20 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +/** + * @defgroup asio_schedulers Asio Schedulers + * @brief Scheduler is the way to introduce multi-threading in your application via RPP + * @see https://reactivex.io/documentation/scheduler.html + * @ingroup rppasio + */ + +#include diff --git a/src/extensions/rppasio/rppasio/schedulers/fwd.hpp b/src/extensions/rppasio/rppasio/schedulers/fwd.hpp new file mode 100644 index 000000000..41e24d8a3 --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers/fwd.hpp @@ -0,0 +1,16 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +namespace rppasio::schedulers +{ + class strand; +} // namespace rppasio::schedulers diff --git a/src/extensions/rppasio/rppasio/schedulers/strand.hpp b/src/extensions/rppasio/rppasio/schedulers/strand.hpp new file mode 100644 index 000000000..feb6ad875 --- /dev/null +++ b/src/extensions/rppasio/rppasio/schedulers/strand.hpp @@ -0,0 +1,190 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include + +namespace rppasio::schedulers +{ + /** + * @brief Asio based scheduler where each worker is assigned an asio `strand` to execute schedulables with the + * guarantee that none of those `schedulables` will execute concurrently. + * @details This scheduler can efficiently enable multi-threading execution when running provided io_context + * in multiple threads. Compared to the `thread_pool` scheduler, a worker is not pinned to a single thread and + * `schedulables` are instead dynamically dispatched to potentially different io_context threads depending + * on load. + * @ingroup asio_schedulers + */ + class strand + { + private: + class state_t : public std::enable_shared_from_this + { + class current_thread_queue_guard; + + public: + state_t(const asio::io_context::executor_type& executor) + : m_strand{executor.context()} + { + } + + template + void defer(Fn&& fn, Handler&& handler, Args&&... args) const + { + if (handler.is_disposed()) + return; + + asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { + if (handler.is_disposed()) + return; + + current_thread_queue_guard guard{*self}; + if (const auto new_duration = fn(handler, args...)) + self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); + })); + } + + template + void defer_with_time(Time time, Fn&& fn, Handler&& handler, Args&&... args) const + { + if (handler.is_disposed()) + return; + + auto timer = std::make_shared>(m_strand.context(), time); + timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)](const asio::error_code& ec) mutable { + if (ec || handler.is_disposed()) + return; + + current_thread_queue_guard guard{*self}; + if (const auto new_duration = fn(handler, args...)) + self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...); + })); + } + + private: + // Guard draining schedulables queued to thread local queue to schedule them back to strand queue + class current_thread_queue_guard + { + public: + current_thread_queue_guard(const state_t& state) + : m_process_on_destruction{!rpp::schedulers::current_thread::get_queue()} + , m_state(state) + { + if (m_process_on_destruction) + rpp::schedulers::current_thread::get_queue() = &m_queue; + } + ~current_thread_queue_guard() + { + if (m_process_on_destruction) + process_queue(); + } + current_thread_queue_guard(const current_thread_queue_guard&) = delete; + current_thread_queue_guard(current_thread_queue_guard&&) = delete; + + private: + struct handler + { + bool is_disposed() const noexcept + { + return m_schedulable->is_disposed(); + } + + void on_error(const std::exception_ptr& ep) const + { + m_schedulable->on_error(ep); + } + + std::shared_ptr m_schedulable; + }; + + private: + void process_queue() + { + while (!m_queue.is_empty()) + { + const auto top = m_queue.pop(); + if (top->is_disposed()) + continue; + + m_state.defer_with_time( + top->get_timepoint(), + [top](const auto&) -> rpp::schedulers::optional_delay_to { + if (const auto advanced_call = top->make_advanced_call()) + { + const auto tp = top->handle_advanced_call(*advanced_call); + top->set_timepoint(tp); + return rpp::schedulers::delay_to{tp}; + } + return std::nullopt; + }, + handler{top}); + } + rpp::schedulers::current_thread::get_queue() = nullptr; + } + + private: + rpp::schedulers::details::schedulables_queue m_queue; + bool m_process_on_destruction; + const state_t& m_state; + }; + + private: + asio::io_context::strand m_strand; + }; + + class worker_strategy + { + public: + explicit worker_strategy(const asio::io_context::executor_type& executor) + : m_state{std::make_shared(executor)} + { + } + + template Fn> + void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const + { + if (duration == rpp::schedulers::duration::zero()) + m_state->defer(std::forward(fn), std::forward(handler), std::forward(args)...); + else + m_state->defer_with_time(duration, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + template Fn> + void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const + { + m_state->defer_with_time(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + } + + static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } + + private: + std::shared_ptr m_state; + }; + + public: + explicit strand(asio::io_context::executor_type executor) + : m_executor{std::move(executor)} + { + } + + auto create_worker() const + { + return rpp::schedulers::worker{m_executor}; + } + + asio::io_context::executor_type m_executor; + }; +} // namespace rppasio::schedulers diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 3fa99c84b..bf2c9ef7e 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -84,6 +84,7 @@ namespace rpp::schedulers */ class current_thread { + public: friend class new_thread; class worker_strategy; diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 4bd73b526..d2578e417 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -75,6 +75,8 @@ namespace rpp::schedulers::details virtual bool is_disposed() const noexcept = 0; + virtual void on_error(const std::exception_ptr& ep) const = 0; + time_point get_timepoint() const { return m_time_point; } void set_timepoint(const time_point& timepoint) { m_time_point = timepoint; } @@ -159,6 +161,8 @@ namespace rpp::schedulers::details bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); } + void on_error(const std::exception_ptr& ep) const override { m_args.template get<0>().on_error(ep); } + private: RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple m_args; RPP_NO_UNIQUE_ADDRESS Fn m_fn; diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index fb39b2cfe..97b750c67 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -28,6 +28,10 @@ macro(add_test_target target_name module files) target_link_libraries(${TARGET} PRIVATE rppgrpc_tests_proto) endif() + if (${module} STREQUAL rppasio) + rpp_add_asio_support_to_executable(${TARGET}) + endif() + target_compile_features(${TARGET} PRIVATE cxx_std_20) if(MSVC) @@ -59,3 +63,7 @@ if (RPP_BUILD_GRPC_CODE) rpp_add_proto_target(rppgrpc_tests_proto rppgrpc/proto.proto) rpp_register_tests(rppgrpc) endif() + +if (RPP_BUILD_ASIO_CODE) + rpp_register_tests(rppasio) +endif() diff --git a/src/tests/rppasio/test_rppasio_scheduler.cpp b/src/tests/rppasio/test_rppasio_scheduler.cpp new file mode 100644 index 000000000..87df9aa6a --- /dev/null +++ b/src/tests/rppasio/test_rppasio_scheduler.cpp @@ -0,0 +1,165 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "rpp_trompeloil.hpp" + +#include + +using namespace std::chrono_literals; + +namespace +{ + void drain(const auto& disposable, asio::io_context& context) + { + while (!disposable.is_disposed()) + { + context.run_one(); + } + } +} // namespace + +TEST_CASE("strand shares current_thread queue") +{ + asio::io_context context; + mock_observer mock; + bool inner_schedule_executed = false; + + { + auto worker = rppasio::schedulers::strand{context.get_executor()}.create_worker(); + auto obs = mock.get_observer().as_dynamic(); + + worker.schedule([&](const auto& obs) { + // Strand and current_thread share same queue that can be potentially executed in different threads + rpp::schedulers::current_thread::create_worker().schedule([&inner_schedule_executed](const auto&) { + inner_schedule_executed = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + CHECK_FALSE(inner_schedule_executed); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + } + + context.run_one(); + context.run_one(); + + CHECK(inner_schedule_executed); +} + +TEST_CASE("strand drains current_thread queue") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto disposable = rpp::source::just(rppasio::schedulers::strand{context.get_executor()}, 1, 2, 3) + | rpp::ops::flat_map([](int v) { + return rpp::source::interval(10ms, rpp::schedulers::current_thread{}) + | rpp::ops::map([v](size_t) { return v; }); + }) + | rpp::ops::take(3) + | rpp::ops::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_rvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_rvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); +} + +TEST_CASE("strand works till end") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto disposable = rpp::source::just(1, 2, 3) + | rpp::operators::subscribe_on(rppasio::schedulers::strand{context.get_executor()}) + | rpp::operators::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); +} + + +TEST_CASE("strand in combination with current_thread") +{ + asio::io_context context; + mock_observer mock; + trompeloeil::sequence seq; + + auto scheduler = rppasio::schedulers::strand{context.get_executor()}; + auto disposable = rpp::source::just(scheduler, 1, 2, 3) + | rpp::ops::flat_map([](int v) { + return rpp::source::just(rpp::schedulers::current_thread{}, v); + }) + | rpp::ops::subscribe_with_disposable(mock.get_observer()); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + drain(disposable, context); +} + +TEST_CASE("strand worker has correct execution order") +{ + asio::io_context context; + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + bool has_run_first_task = false; + bool has_run_second_task = false; + + auto worker = rppasio::schedulers::strand{context.get_executor()}.create_worker(); + + worker.schedule( + 50ms, + [&](const auto&) { + has_run_first_task = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + worker.schedule( + 5ms, + [&](const auto&) { + has_run_second_task = true; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + context.run_one(); + CHECK_FALSE(has_run_first_task); + CHECK(has_run_second_task); + + context.run_one(); + CHECK(has_run_first_task); +} diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index e8508899c..f2fb3b7f1 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -5,6 +5,8 @@ #include +#include + #include #include @@ -66,6 +68,9 @@ class mock_observer static bool is_disposed() noexcept { return false; } static void set_upstream(const rpp::disposable_wrapper&) noexcept {} + auto get_observer() const { return rpp::observer>{*this}; } + auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable>{std::move(d), *this}; } + private: std::shared_ptr m_impl = std::make_shared(); };