From 43f089e3c4293d3c6052c09bb43d45b4876dc94e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 8 Jun 2024 19:52:44 +0300 Subject: [PATCH 01/20] all ine one --- BUILDING.md | 3 +- CMakePresets.json | 32 ++- Doxyfile | 4 +- Readme.md | 35 +++ cmake/dependencies.cmake | 83 ++++-- cmake/install-rules.cmake | 6 + cmake/variables.cmake | 4 + conanfile.py | 8 +- src/examples/CMakeLists.txt | 4 + src/examples/rppgrpc/CMakeLists.txt | 2 + .../rppgrpc/communication/CMakeLists.txt | 9 + src/examples/rppgrpc/communication/client.cpp | 78 ++++++ .../rppgrpc/communication/protocol.proto | 15 + src/examples/rppgrpc/communication/server.cpp | 68 +++++ src/examples/rppgrpc/doxygen/CMakeLists.txt | 11 + .../rppgrpc/doxygen/client_reactor.cpp | 48 ++++ src/examples/rppgrpc/doxygen/protocol.proto | 15 + .../rppgrpc/doxygen/server_reactor.cpp | 0 src/extensions/CMakeLists.txt | 1 + src/extensions/rppgrpc/CMakeLists.txt | 11 + .../rppgrpc/rppgrpc/client_reactor.hpp | 265 ++++++++++++++++++ src/extensions/rppgrpc/rppgrpc/fwd.hpp | 24 ++ src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp | 15 + .../rppgrpc/rppgrpc/server_reactor.hpp | 263 +++++++++++++++++ .../rppgrpc/rppgrpc/utils/exceptions.hpp | 22 ++ src/rpp/rpp/subjects/behavior_subject.hpp | 1 - src/tests/CMakeLists.txt | 9 + src/tests/rppgrpc/proto.proto | 15 + src/tests/rppgrpc/test_async_client.cpp | 211 ++++++++++++++ 29 files changed, 1221 insertions(+), 41 deletions(-) create mode 100644 src/examples/rppgrpc/CMakeLists.txt create mode 100644 src/examples/rppgrpc/communication/CMakeLists.txt create mode 100644 src/examples/rppgrpc/communication/client.cpp create mode 100644 src/examples/rppgrpc/communication/protocol.proto create mode 100644 src/examples/rppgrpc/communication/server.cpp create mode 100644 src/examples/rppgrpc/doxygen/CMakeLists.txt create mode 100644 src/examples/rppgrpc/doxygen/client_reactor.cpp create mode 100644 src/examples/rppgrpc/doxygen/protocol.proto create mode 100644 src/examples/rppgrpc/doxygen/server_reactor.cpp create mode 100644 src/extensions/rppgrpc/CMakeLists.txt create mode 100644 src/extensions/rppgrpc/rppgrpc/client_reactor.hpp create mode 100644 src/extensions/rppgrpc/rppgrpc/fwd.hpp create mode 100644 src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp create mode 100644 src/extensions/rppgrpc/rppgrpc/server_reactor.hpp create mode 100644 src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp create mode 100644 src/tests/rppgrpc/proto.proto create mode 100644 src/tests/rppgrpc/test_async_client.cpp diff --git a/BUILDING.md b/BUILDING.md index d6834db5e..76f001ec6 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -26,7 +26,8 @@ But RPP is header-only library, so, without enabling any extra options is just c - `RPP_BUILD_TESTS` - (ON/OFF) build unit tests (default OFF) - `RPP_BUILD_EXAMPLES` - (ON/OFF) build examples of usage of RPP (default OFF) - `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed -- `RPP_BUILD_QT_CODE` - (ON/OFF) build RPPQT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed By default, it provides rpp and rppqt INTERFACE modules. diff --git a/CMakePresets.json b/CMakePresets.json index 09a71abae..ff76dd7e8 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -138,6 +138,13 @@ "RPP_BUILD_QT_CODE" : "ON" } }, + { + "name" : "build-grpc", + "hidden": true, + "cacheVariables": { + "RPP_BUILD_GRPC_CODE" : "ON" + } + }, { "name" : "use-conan", "hidden": true, @@ -149,6 +156,7 @@ }, { "name": "build-dir", + "hidden": true, "binaryDir": "${sourceDir}/build" }, @@ -160,7 +168,7 @@ }, { "name": "ci-coverage-gcc", - "inherits": ["ci-build", "build-tests", "build-qt", "ci-unix", "ci-gcc"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-gcc"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-O0 -g --coverage -fkeep-inline-functions -fkeep-static-functions -fprofile-arcs -ftest-coverage -fno-inline -fno-omit-frame-pointer -fno-optimize-sibling-calls", @@ -171,7 +179,7 @@ }, { "name": "ci-coverage-clang", - "inherits": ["ci-build", "build-tests", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-fprofile-instr-generate -fcoverage-mapping", @@ -182,58 +190,58 @@ }, { "name": "ci-sanitize-tsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=thread -g -O1" } }, { "name": "ci-sanitize-asan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-lsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=leak -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-msan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=memory -fno-optimize-sibling-calls -fsanitize-memory-track-origins=2 -fno-omit-frame-pointer -g -O2" } }, { "name": "ci-sanitize-ubsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=undefined" } }, { "name": "ci-macos-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix"] }, { "name": "ci-ubuntu-clang-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] }, { "name": "ci-ubuntu-gcc-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] }, { "name": "ci-windows-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-win64"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-win64"] }, { "name": "ci-ubuntu-clang-tests-no-checks", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-clang" ] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang" ] }, diff --git a/Doxyfile b/Doxyfile index 92b818846..393db99f7 100644 --- a/Doxyfile +++ b/Doxyfile @@ -947,6 +947,7 @@ INPUT = src/rpp \ src/extensions \ src/examples/rpp/doxygen \ src/examples/rppqt/doxygen \ + src/examples/rppgrpc/doxygen \ docs # This tag can be used to specify the character encoding of the source files @@ -1078,7 +1079,8 @@ EXCLUDE_SYMBOLS = # command). EXAMPLE_PATH = src/examples/rpp/doxygen \ - src/examples/rppqt/doxygen + src/examples/rppqt/doxygen \ + src/examples/rppgrpc/doxygen # If the value of the EXAMPLE_PATH tag contains directories, you can use the # EXAMPLE_PATTERNS tag to specify one or more wildcard pattern (like *.cpp and diff --git a/Readme.md b/Readme.md index ab2190494..23d6de960 100644 --- a/Readme.md +++ b/Readme.md @@ -32,6 +32,7 @@ rpp::source::from_callable(&::getchar) There we are creating observable (soure of emissions/values/data) to emit value via invoking of `getchar` function, `repeat`-ing it infinite amount of time till termination event happening. It emits values while symbol is not equal to `0`, takeing only **not** digits, maping them to upper case and then just printing to console. +<<<<<<< HEAD Also RPP supports QT out of box. Checkout [RPPQT reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__rppqt.html). For example: ```cpp @@ -47,6 +48,40 @@ rppqt::source::from_signal(*button, &QPushButton::clicked); // <------ react on { clicks_count_label->setText(QString{"Clicked %1 times in total!"}.arg(clicks)); }); +======= +## QT +Also it supports QT out of box. Checkout [RPPQT reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__rppqt.html) + +```cpp +auto button = new QPushButton("Click me!"); +auto label = new QLabel(); + +rppqt::source::from_signal(*button, &QPushButton::clicked); + | rpp::operators::observe_on(rpp::schedulers::new_thread{}) + // some heavy job + | rpp::operators::scan(0, [](int seed, auto) { return ++seed; }) + | rpp::operators::observe_on(rppqt::schedulers::main_thread_scheduler{}) // <--- go back to main QT scheduler + | rpp::operators::subscribe([&clicks_count_label](int clicks) + { + clicks_count_label->setText(QString{"Clicked %1 times in total!"}.arg(clicks)); + }); +``` + + + + +## Why do you need it? + +Check the [User Guide](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/md_docs_2readme.html) for a detailed overview of the Reactive Programming concept and RPP itself. + +In short, RPP can help you build complex pipelines to distribute values over time, connect "some sources of data" without directly connecting them. + +Take a look at the example code for QT. Here, you can see how to connect a button to a label and update it based on the number of clicks. +```cpp +auto button = new QPushButton("Click me!"); +auto clicks_count_label = new QLabel(); +rppqt::source::from_signal(*button_1, &QPushButton::clicked); +>>>>>>> 9561a40d (all ine one) ``` ## What about existing Reactive Extension libraries for C++? diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index d343ba044..26c018cb6 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -14,6 +14,33 @@ macro(rpp_handle_3rdparty TARGET_NAME) set_target_properties(${TARGET_NAME} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $) endmacro() +macro(rpp_fetch_library_extended NAME URL TAG TARGET_NAME) + Include(FetchContent) + set(BUILD_SHARED_LIBS OFF CACHE INTERNAL "Build SHARED libraries") + + Set(FETCHCONTENT_QUIET FALSE) + + FetchContent_Declare( + ${NAME} + GIT_REPOSITORY ${URL} + GIT_TAG ${TAG} + GIT_SHALLOW TRUE + GIT_PROGRESS TRUE + GIT_SUBMODULES "" + ) + + FetchContent_MakeAvailable(${NAME}) + rpp_handle_3rdparty(${TARGET_NAME}) +endmacro() + +macro(rpp_fetch_library NAME URL TAG) + find_package(${NAME} QUIET) + if (NOT ${NAME}_FOUND) + message("-- RPP: Fetching ${NAME}...") + rpp_fetch_library_extended(${NAME} ${URL} ${TAG} ${NAME}) + endif() +endmacro() + # ===================== SFML ======================= if (RPP_BUILD_SFML_CODE AND RPP_BUILD_EXAMPLES) find_package(SFML COMPONENTS graphics system window REQUIRED) @@ -42,32 +69,44 @@ if (RPP_BUILD_QT_CODE AND (RPP_BUILD_TESTS OR RPP_BUILD_EXAMPLES)) endmacro() endif() -macro(rpp_fetch_library_extended NAME URL TAG TARGET_NAME) - Include(FetchContent) - set(BUILD_SHARED_LIBS OFF CACHE INTERNAL "Build SHARED libraries") +# ========================== GRPC ==================================== +if (RPP_BUILD_GRPC_CODE AND (RPP_BUILD_TESTS OR RPP_BUILD_EXAMPLES)) + find_package(Protobuf CONFIG REQUIRED) + find_package(gRPC CONFIG REQUIRED) - Set(FETCHCONTENT_QUIET FALSE) + rpp_handle_3rdparty(gRPC::grpc++) + rpp_handle_3rdparty(protobuf::protobuf) - FetchContent_Declare( - ${NAME} - GIT_REPOSITORY ${URL} - GIT_TAG ${TAG} - GIT_SHALLOW TRUE - GIT_PROGRESS TRUE - GIT_SUBMODULES "" - ) + macro(rpp_add_proto_target TARGET FILES) + add_library(${TARGET} STATIC ${FILES}) - FetchContent_MakeAvailable(${NAME}) - rpp_handle_3rdparty(${TARGET_NAME}) -endmacro() + target_link_libraries(${TARGET} + PUBLIC + gRPC::grpc++ + protobuf::libprotobuf + ${grpc_LIBRARIES_TARGETS} + ) -macro(rpp_fetch_library NAME URL TAG) - find_package(${NAME} QUIET) - if (NOT ${NAME}_FOUND) - message("-- RPP: Fetching ${NAME} from ${URL} by ${TAG}...") - rpp_fetch_library_extended(${NAME} ${URL} ${TAG} ${NAME}) - endif() -endmacro() + target_include_directories(${TARGET} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) + + get_target_property(grpc_cpp_plugin_location gRPC::grpc_cpp_plugin LOCATION ) + protobuf_generate(TARGET ${TARGET} OUT_VAR PROTO_FILES LANGUAGE cpp ) + + protobuf_generate( + TARGET ${TARGET} + LANGUAGE grpc + OUT_VAR GRPC_PROTO_FILES + # PROTOC_OUT_DIR "${PROTO_BINARY_DIR}" + PLUGIN protoc-gen-grpc=${grpc_cpp_plugin_location} + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc) + + + set_target_properties(${TARGET} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $) + set_target_properties(${TARGET} PROPERTIES CXX_CLANG_TIDY "") + set_target_properties(${TARGET} PROPERTIES CXX_CPPCHECK "") + endmacro() +endif() +>>>>>>> 9561a40d (all ine one) # ==================== RXCPP ======================= if (RPP_BUILD_RXCPP AND RPP_BUILD_BENCHMARKS) diff --git a/cmake/install-rules.cmake b/cmake/install-rules.cmake index 500b20bab..f38560340 100644 --- a/cmake/install-rules.cmake +++ b/cmake/install-rules.cmake @@ -8,6 +8,7 @@ install( DIRECTORY src/rpp src/extensions/rppqt + src/extensions/rppgrpc DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}" COMPONENT @@ -25,6 +26,11 @@ install( EXPORT RPPTargets INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppqt" ) +install( + TARGETS rppgrpc + EXPORT RPPTargets + INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppgrpc" +) write_basic_package_version_file( "${package}ConfigVersion.cmake" diff --git a/cmake/variables.cmake b/cmake/variables.cmake index 8d91fdb67..93a57a500 100644 --- a/cmake/variables.cmake +++ b/cmake/variables.cmake @@ -82,6 +82,7 @@ endfunction() # ------------ Options to tweak --------------------- option(RPP_BUILD_SFML_CODE "Enable SFML support in examples/code." OFF) option(RPP_BUILD_QT_CODE "Enable QT support in examples/code." OFF) +option(RPP_BUILD_GRPC_CODE "Enable grpc++ support in examples/code." OFF) if (RPP_DEVELOPER_MODE) option(RPP_BUILD_TESTS "Build unit tests tree." OFF) @@ -100,6 +101,9 @@ if (RPP_DEVELOPER_MODE) if (RPP_BUILD_BENCHMARKS) set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_benchmarks=True") endif() + if (RPP_BUILD_GRPC_CODE) + set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_grpc=True") + endif() endif() if(RPP_ENABLE_COVERAGE) diff --git a/conanfile.py b/conanfile.py index f7bb8f74a..355f386da 100644 --- a/conanfile.py +++ b/conanfile.py @@ -32,10 +32,10 @@ def requirements(self): if self.options.with_sfml: self.requires("sfml/2.6.1", options={"audio": False}) - # if self.options.with_grpc: - # self.requires("grpc/1.54.3", transitive_libs=True, transitive_headers=True) - # self.requires("protobuf/3.21.12") - # self.requires("libmount/2.39", override=True) + if self.options.with_grpc: + self.requires("grpc/1.54.3", transitive_libs=True, transitive_headers=True) + self.requires("protobuf/3.21.12") + self.requires("libmount/2.39", override=True) if self.options.with_cmake: self.tool_requires("cmake/3.29.3") diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt index 041d2c3ef..fa5f9428a 100644 --- a/src/examples/CMakeLists.txt +++ b/src/examples/CMakeLists.txt @@ -3,3 +3,7 @@ add_subdirectory(rpp) if(RPP_BUILD_QT_CODE) add_subdirectory(rppqt) endif() + +if (RPP_BUILD_GRPC_CODE) + add_subdirectory(rppgrpc) +endif() diff --git a/src/examples/rppgrpc/CMakeLists.txt b/src/examples/rppgrpc/CMakeLists.txt new file mode 100644 index 000000000..f928b4394 --- /dev/null +++ b/src/examples/rppgrpc/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(communication) +add_subdirectory(doxygen) diff --git a/src/examples/rppgrpc/communication/CMakeLists.txt b/src/examples/rppgrpc/communication/CMakeLists.txt new file mode 100644 index 000000000..779b39cad --- /dev/null +++ b/src/examples/rppgrpc/communication/CMakeLists.txt @@ -0,0 +1,9 @@ +set(TARGET rppgrpc_communication) + +rpp_add_proto_target(${TARGET}_proto protocol.proto) + +add_executable(${TARGET}_server server.cpp) +target_link_libraries(${TARGET}_server PRIVATE ${TARGET}_proto rppgrpc) + +add_executable(${TARGET}_client client.cpp) +target_link_libraries(${TARGET}_client PRIVATE ${TARGET}_proto rppgrpc) diff --git a/src/examples/rppgrpc/communication/client.cpp b/src/examples/rppgrpc/communication/client.cpp new file mode 100644 index 000000000..b3ad2d9e8 --- /dev/null +++ b/src/examples/rppgrpc/communication/client.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" +#include "protocol.pb.h" + + +int main() +{ + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + if (!channel) + { + std::cout << "NO CHANNEL" << std::endl; + return 0; + } + auto stub = TestService::NewStub(channel); + if (!stub) + { + std::cout << "NO STUB" << std::endl; + return 0; + } + + std::array ctx{}; + auto d = rpp::composite_disposable_wrapper::make(); + + rpp::subjects::publish_subject bidi_requests{}; + rpp::subjects::publish_subject bidi_responses{}; + bidi_responses.get_observable().subscribe(d, [](const Response& v) { + std::cout << "[BidireactionalResponse]: " << v.ShortDebugString() << std::endl; + }); + + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::Bidirectional, + *stub->async(), + &ctx[0], + bidi_requests.get_observable() + | rpp::ops::take_while([](const std::string& v) { return v != "0"; }) + | rpp::ops::map([](const std::string& v) { + Request i{}; + i.set_value(std::string{"BidiRequest "} + v); + return i; + }), + bidi_responses.get_observer()); + + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, + *stub->async(), + &ctx[1], + bidi_responses.get_observable() + | rpp::ops::map([](const Response& response) { + Request request{}; + request.set_value(std::string{"ClientSideRequest "} + response.value()); + return request; + }), + rpp::make_lambda_observer(d, [](const Response& v) { + std::cout << "[ClientsideResponse]: " << v.ShortDebugString() << std::endl; + })); + Request req{}; + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, + *stub->async(), + &ctx[2], + &req, + rpp::make_lambda_observer(d, [](const Response& v) { + std::cout << "[ServerSideResponse]: " << v.ShortDebugString() << std::endl; + })); + + std::cout << "SUBSCRIBED" << std::endl; + + std::string in{}; + while (!d.is_disposed()) + { + std::getline(std::cin, in); + bidi_requests.get_observer().on_next(in); + in.clear(); + } + + return 0; +} diff --git a/src/examples/rppgrpc/communication/protocol.proto b/src/examples/rppgrpc/communication/protocol.proto new file mode 100644 index 000000000..8703bde05 --- /dev/null +++ b/src/examples/rppgrpc/communication/protocol.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + string Value = 1; +} + +message Response { + string Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/examples/rppgrpc/communication/server.cpp b/src/examples/rppgrpc/communication/server.cpp new file mode 100644 index 000000000..90654870f --- /dev/null +++ b/src/examples/rppgrpc/communication/server.cpp @@ -0,0 +1,68 @@ + +#include + +#include +#include + +#include "protocol.grpc.pb.h" +#include "protocol.pb.h" + +class Service : public TestService::CallbackService +{ +public: + Service() + { + client_side_requests.get_observable().subscribe([](const Request& s) { std::cout << "[ClientSideRequest]: " << s.ShortDebugString() << std::endl; }); + } + + grpc::ServerBidiReactor<::Request, ::Response>* Bidirectional(::grpc::CallbackServerContext* /*context*/) override + { + rpp::subjects::publish_subject response{}; + rpp::subjects::publish_subject request{}; + request.get_observable() + | rpp::ops::subscribe([](const Request& s) { std::cout << "[BidireactionalRequest]: " << s.ShortDebugString() << std::endl; }); + request.get_observable() + | rpp::ops::map([](const Request& request) { + Response response{}; + response.set_value(std::string{"BidiResponse "} + request.value()); + return response; + }) + | rpp::ops::subscribe(response.get_observer()); + return rppgrpc::make_server_reactor(response.get_observable(), request.get_observer()); + } + + ::grpc::ServerReadReactor<::Request>* ClientSide(::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/) override + { + return rppgrpc::make_server_reactor(client_side_requests.get_observer()); + } + + ::grpc::ServerWriteReactor<::Response>* ServerSide(::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/) override + { + return rppgrpc::make_server_reactor(client_side_requests.get_observable() + | rpp::ops::map([](const Request& v) { + Response response{}; + response.set_value(std::string{"ServerSideResponse "} + v.value()); + return response; + })); + } + +private: + rpp::subjects::publish_subject client_side_requests{}; +}; + +int main() +{ + Service service{}; + grpc::ServerBuilder builder{}; + + std::string server_address("localhost:50051"); + + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + auto server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + server->Wait(); + + return 0; +} diff --git a/src/examples/rppgrpc/doxygen/CMakeLists.txt b/src/examples/rppgrpc/doxygen/CMakeLists.txt new file mode 100644 index 000000000..e99bba244 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/CMakeLists.txt @@ -0,0 +1,11 @@ +file(GLOB_RECURSE FILES "*.cpp") + +rpp_add_proto_target(doyxgen_grpc_proto protocol.proto) + +foreach(SOURCE ${FILES}) + get_filename_component(BASE_NAME ${SOURCE} NAME_WE) + set(TARGET ${BASE_NAME}_doxygen_sample) + add_executable(${TARGET} ${SOURCE}) + target_link_libraries(${TARGET} PRIVATE rpp rppgrpc doyxgen_grpc_proto) + set_target_properties(${TARGET} PROPERTIES FOLDER Examples/rppqt/Doxygen) +endforeach() diff --git a/src/examples/rppgrpc/doxygen/client_reactor.cpp b/src/examples/rppgrpc/doxygen/client_reactor.cpp new file mode 100644 index 000000000..657dc7688 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/client_reactor.cpp @@ -0,0 +1,48 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" +/** + * \example client_reactor.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + { + //! [bidi_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + rpp::subjects::publish_subject requests{}; + rpp::subjects::publish_subject responses{}; + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::Bidirectional, *stub->async(), &ctx, requests.get_observable(), responses.get_observer()); + //! [bidi_reactor] + } + { + //! [read_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + rpp::subjects::publish_subject responses{}; + Request request{}; + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, *stub->async(), &ctx, &request, responses.get_observer()); + //! [read_reactor] + } + { + //! [write_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + rpp::subjects::publish_subject requests{}; + rpp::subjects::publish_subject responses{}; + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, *stub->async(), &ctx, requests.get_observable(), responses.get_observer()); + //! [write_reactor] + } + + return 0; +} diff --git a/src/examples/rppgrpc/doxygen/protocol.proto b/src/examples/rppgrpc/doxygen/protocol.proto new file mode 100644 index 000000000..8703bde05 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/protocol.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + string Value = 1; +} + +message Response { + string Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/src/extensions/CMakeLists.txt b/src/extensions/CMakeLists.txt index 63cf503d3..397ebb5b2 100644 --- a/src/extensions/CMakeLists.txt +++ b/src/extensions/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(rppqt) +add_subdirectory(rppgrpc) diff --git a/src/extensions/rppgrpc/CMakeLists.txt b/src/extensions/rppgrpc/CMakeLists.txt new file mode 100644 index 000000000..68720d72c --- /dev/null +++ b/src/extensions/rppgrpc/CMakeLists.txt @@ -0,0 +1,11 @@ +# ReactivePlusPlus library +# +# Copyright Aleksey Loginov 2022 - present. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# https://www.boost.org/LICENSE_1_0.txt) +# +# Project home: https://github.com/victimsnino/ReactivePlusPlus +# + +rpp_add_library(rppgrpc) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp new file mode 100644 index 000000000..7e791fb63 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -0,0 +1,265 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +namespace rppgrpc::details +{ + // template + // class client_write_reactor final : public grpc::ClientWriteReactor + // { + // using Response = rpp::utils::extract_observer_type_t; + // using Base = grpc::ClientWriteReactor; + + // public: + // template Observable, rpp::constraint::decayed_same_as TObserver> + // client_write_reactor(const Observable& messages, TObserver&& events, Response*& ptr_to_write_response) + // : m_observer{std::forward(events)} + // , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { + // std::lock_guard lock{m_write_mutex}; + // m_write.push_back(std::forward(message)); + // if (m_write.size() == 1) + // Base::StartWrite(&m_write.front()); }, + // [this](const std::exception_ptr&) { + // // Base::StartWritesDone(); + // context_.TryCancel(); + // }, + // [this]() { + // Base::StartWritesDone(); + // })} + // { + // ptr_to_write_response = &m_read; + // } + + // void Init() + // { + // Base::StartCall(); + // } + + // private: + // void OnWriteDone(bool ok) override + // { + // if (!ok) + // { + // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + // context_.TryCancel(); + // return; + // } + + // std::lock_guard lock{m_write_mutex}; + // m_write.pop_front(); + + // if (!m_write.empty()) + // { + // Base::StartWrite(&m_write.front()); + // } + // } + + // void OnDone(const grpc::Status& s) override + // { + // if (s.ok()) + // { + // m_observer.on_next(std::move(m_read)); + // m_observer.on_completed(); + // } + // else + // { + // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + // } + // Destroy(); + // } + + // private: + // void Destroy() + // { + // m_disposable.dispose(); + // delete this; + // } + + // private: + // Observer m_observer; + // rpp::disposable_wrapper m_disposable; + + // Response m_read{}; + + // std::mutex m_write_mutex{}; + // std::list m_write{}; + // }; + + // template + // class client_read_reactor final : public grpc::ClientReadReactor> + // { + // using Response = rpp::utils::extract_observer_type_t; + // using Base = grpc::ClientReadReactor; + + // public: + // template TObserver> + // requires (!rpp::constraint::decayed_same_as>) + // explicit client_read_reactor(TObserver&& events) + // : m_observer{std::forward(events)} + // { + // } + + // client_read_reactor(const client_read_reactor&) = delete; + // client_read_reactor(client_read_reactor&&) = delete; + + // void Init() + // { + // Base::StartCall(); + // Base::StartRead(&m_read); + // } + + // private: + // void OnReadDone(bool ok) override + // { + // if (!ok) + // { + // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); + // context_.TryCancel(); + // return; + // } + // m_observer.on_next(m_read); + // Base::StartRead(&m_read); + // } + + // void OnDone(const grpc::Status& s) override + // { + // if (s.ok()) + // { + // m_observer.on_completed(); + // } + // else + // { + // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + // } + // Destroy(); + // } + + // private: + // void Destroy() + // { + // delete this; + // } + + // private: + // Observer m_observer; + // Response m_read{}; + // }; +} // namespace rppgrpc::details +namespace rppgrpc +{ + template + class client_bidi_reactor final : public grpc::ClientBidiReactor + { + using Base = grpc::ClientBidiReactor; + + public: + client_bidi_reactor() + { + m_requests.get_observable().subscribe( + [this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); + }, + [this](const std::exception_ptr&) { + Base::StartWritesDone(); + }, + [this]() { + Base::StartWritesDone(); + }); + } + + void init() + { + Base::StartCall(); + Base::StartRead(&m_read); + } + + auto get_observer() + { + return m_requests.get_observer(); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + using Base::StartRead; + + void OnReadDone(bool ok) override + { + if (!ok) + { + m_requests.get_disposable().dispose(); + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); + return; + } + m_observer.get_observer().on_next(m_read); + Base::StartRead(&m_read); + } + + void OnWriteDone(bool ok) override + { + if (!ok) + { + m_requests.get_disposable().dispose(); + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + return; + } + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + } + + void OnDone(const grpc::Status& s) override + { + m_requests.get_disposable().dispose(); + + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::serialized_publish_subject m_requests{}; + + rpp::subjects::publish_subject m_observer; + Response m_read{}; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + }; +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/fwd.hpp b/src/extensions/rppgrpc/rppgrpc/fwd.hpp new file mode 100644 index 000000000..03059bd13 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/fwd.hpp @@ -0,0 +1,24 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +/** + * @defgroup rppgrpc RPPGRPC + * @brief RppGrpc is extension of RPP which enables support of grpc library. + */ + +namespace rppgrpc +{ + template + class client_bidi_reactor; +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp new file mode 100644 index 000000000..15153db75 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp @@ -0,0 +1,15 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp new file mode 100644 index 000000000..40ddb30e8 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -0,0 +1,263 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +namespace rppgrpc::details +{ + template + class server_bidi_reactor final : public grpc::ServerBidiReactor, Response> + { + using Request = rpp::utils::extract_observer_type_t; + using Base = grpc::ServerBidiReactor; + + public: + template Observable, rpp::constraint::decayed_same_as TObserver> + server_bidi_reactor(const Observable& messages, TObserver&& events) + : m_observer{std::forward(events)} + , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); }, + [this](const std::exception_ptr&) { + Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); + }, + [this]() { + Base::Finish(grpc::Status::OK); + })} + { + Base::StartSendInitialMetadata(); + Base::StartRead(&m_read); + } + + private: + void OnReadDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + m_observer.on_next(m_read); + Base::StartRead(&m_read); + } + + void OnWriteDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + } + + void OnDone() override + { + m_observer.on_completed(); + Destroy(); + } + + void OnCancel() override + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + Base::Finish(grpc::Status::CANCELLED); + } + + private: + void Destroy() + { + m_disposable.dispose(); + delete this; + } + + private: + Observer m_observer; + rpp::disposable_wrapper m_disposable; + + Request m_read{}; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + }; + + template + class server_write_reactor final : public grpc::ServerWriteReactor + { + using Base = grpc::ServerWriteReactor; + + public: + template Observable, rpp::constraint::decayed_same_as TObserver> + requires (!rpp::constraint::decayed_same_as>) + server_write_reactor(const Observable& messages, TObserver&& observer) + : m_observer{std::forward(observer)} + , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); }, + [this](const std::exception_ptr&) { + Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); + }, + [this]() { + Base::Finish(grpc::Status::OK); + })} + { + Base::StartSendInitialMetadata(); + } + + + server_write_reactor(const server_write_reactor&) = delete; + server_write_reactor(server_write_reactor&&) = delete; + + private: + void OnWriteDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + } + + void OnDone() override + { + m_observer.on_completed(); + Destroy(); + } + + void OnCancel() override + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + Base::Finish(grpc::Status::CANCELLED); + } + + private: + void Destroy() + { + m_disposable.dispose(); + delete this; + } + + private: + Observer m_observer; + rpp::disposable_wrapper m_disposable; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + }; + + template + class server_reader_reactor final : public grpc::ServerReadReactor> + { + using Request = rpp::utils::extract_observer_type_t; + using Base = grpc::ServerReadReactor; + + public: + template TObserver> + requires (!rpp::constraint::decayed_same_as>) + explicit server_reader_reactor(TObserver&& events) + : m_observer{std::forward(events)} + { + Base::StartSendInitialMetadata(); + Base::StartRead(&m_read); + } + + server_reader_reactor(const server_reader_reactor&) = delete; + server_reader_reactor(server_reader_reactor&&) = delete; + + private: + void OnReadDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + m_observer.on_next(m_read); + Base::StartRead(&m_read); + } + + void OnDone() override + { + m_observer.on_completed(); + Destroy(); + } + + void OnCancel() override + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + Base::Finish(grpc::Status::CANCELLED); + } + + private: + void Destroy() + { + m_disposable.dispose(); + delete this; + } + + private: + Observer m_observer; + rpp::disposable_wrapper m_disposable; + + Request m_read{}; + }; +} // namespace rppgrpc::details +namespace rppgrpc +{ + // template + // auto make_server_reactor(const Observable& responses, Observer&& requests) + // { + // return new details::server_bidi_reactor, std::decay_t>(responses, std::forward(requests)); + // } + + // template + // auto make_server_reactor(Observer&& requests) + // { + // return new details::server_reader_reactor>(std::forward(requests)); + // } + + // template + // auto make_server_reactor(const Observable& responses) + // { + // return new details::server_write_reactor>(responses); + // } +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp b/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp new file mode 100644 index 000000000..6e3124496 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp @@ -0,0 +1,22 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +namespace rppgrpc::utils +{ + struct reactor_failed : public std::runtime_error + { + using std::runtime_error::runtime_error; + }; + +} // namespace rppgrpc::utils diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index 261548c88..3ab5dc960 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -17,7 +17,6 @@ #include #include -#include #include namespace rpp::subjects::details diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 855142c86..f28168ae5 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -24,6 +24,11 @@ macro(add_test_target target_name module files) rpp_add_qt_support_to_executable(${TARGET}) endif() + if (${module} STREQUAL rppgrpc) + rpp_add_proto_target(${TARGET}_proto rppgrpc/proto.proto) + target_link_libraries(${TARGET} PRIVATE ${TARGET}_proto) + endif() + target_compile_features(${TARGET} PRIVATE cxx_std_20) if(MSVC) @@ -50,3 +55,7 @@ rpp_register_tests(rpp) if (RPP_BUILD_QT_CODE) rpp_register_tests(rppqt) endif() + +if (RPP_BUILD_GRPC_CODE) + rpp_register_tests(rppgrpc) +endif() diff --git a/src/tests/rppgrpc/proto.proto b/src/tests/rppgrpc/proto.proto new file mode 100644 index 000000000..a7e1c0f32 --- /dev/null +++ b/src/tests/rppgrpc/proto.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + uint32 Value = 1; +} + +message Response { + uint32 Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp new file mode 100644 index 000000000..31b8c575d --- /dev/null +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -0,0 +1,211 @@ +#include + +#include +#include +#include + +#include +#include +#include + +#include "rpp_trompeloil.hpp" + +struct async_interface : public trompeloeil::mock_interface +{ + IMPLEMENT_MOCK3(ServerSide); + IMPLEMENT_MOCK3(ClientSide); + IMPLEMENT_MOCK2(Bidirectional); +}; + +struct ClientCallbackReaderWriter : public trompeloeil::mock_interface> +{ + using grpc::ClientCallbackReaderWriter::BindReactor; + + IMPLEMENT_MOCK0(StartCall); + IMPLEMENT_MOCK2(Write); + IMPLEMENT_MOCK0(WritesDone); + IMPLEMENT_MOCK1(Read); + IMPLEMENT_MOCK1(AddHold); + IMPLEMENT_MOCK0(RemoveHold); +}; + +struct ClientCallbackReader : public trompeloeil::mock_interface> +{ + using grpc::ClientCallbackReader::BindReactor; + + IMPLEMENT_MOCK0(StartCall); + IMPLEMENT_MOCK1(Read); + IMPLEMENT_MOCK1(AddHold); + IMPLEMENT_MOCK0(RemoveHold); +}; + +struct ClientCallbackWriter : public trompeloeil::mock_interface> +{ + using grpc::ClientCallbackWriter::BindReactor; + + IMPLEMENT_MOCK0(StartCall); + MAKE_MOCK2(Write, void(const Request* req, grpc::WriteOptions options), override); + IMPLEMENT_MOCK0(WritesDone); + IMPLEMENT_MOCK1(AddHold); + IMPLEMENT_MOCK0(RemoveHold); +}; + +TEST_CASE("async client can be casted to rppgrpc") +{ + grpc::ClientContext ctx{}; + Response* resp{}; + + rpp::subjects::publish_subject subj{}; + rpp::subjects::publish_subject out_subj{}; + + async_interface stub_mock{}; + mock_observer_strategy mock{}; + out_subj.get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::subscribe(mock); + + + auto validate_write = [&](auto& stream_mock, auto*& reactor) { + SECTION("write to stream") + { + for (auto v : {10, 3, 15, 20}) + { + Request request{}; + request.set_value(v); + + REQUIRE_CALL(stream_mock, Write(trompeloeil::_, trompeloeil::_)).WITH(_1->value() == v); + + subj.get_observer().on_next(request); + reactor->OnWriteDone(true); + } + } + SECTION("write failed") + { + CHECK(mock.get_on_error_count() == 0); + reactor->OnWriteDone(false); + reactor = nullptr; + CHECK(mock.get_on_error_count() == 1); + } + }; + + auto validate_read = [&](auto& stream_mock, auto*& reactor) { + SECTION("read from stream") + { + std::vector expected_values{}; + for (auto v : {3, 5, 2, 1}) + { + resp->set_value(v); + CHECK(mock.get_received_values() == expected_values); + + REQUIRE_CALL(stream_mock, Read(trompeloeil::_)); + reactor->OnReadDone(true); + + expected_values.push_back(v); + CHECK(mock.get_received_values() == expected_values); + } + } + SECTION("read failed") + { + CHECK(mock.get_on_error_count() == 0); + reactor->OnReadDone(false); + reactor = nullptr; + CHECK(mock.get_on_error_count() == 1); + } + }; + + SECTION("bidirectional") + { + grpc::ClientBidiReactor<::Request, ::Response>* reactor{}; + + ClientCallbackReaderWriter stream_mock{}; + trompeloeil::sequence stream_sequence{}; + + REQUIRE_CALL(stub_mock, Bidirectional(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ + reactor = _2; + CHECK(_1 == &ctx); + stream_mock.BindReactor(reactor); + }); + + const auto temp_reactor = new rppgrpc::client_bidi_reactor(); + stub_mock.Bidirectional(&ctx, temp_reactor); + temp_reactor->get_observable().subscribe(out_subj.get_observer()); + subj.get_observable().subscribe(temp_reactor->get_observer()); + + REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); + REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); + + temp_reactor->init(); + + validate_write(stream_mock, reactor); + validate_read(stream_mock, reactor); + + if (reactor) + { + reactor->OnDone(grpc::Status::OK); + } + } + // SECTION("server-side") + // { + // grpc::ClientReadReactor* reactor{}; + + // ClientCallbackReader stream_mock{}; + // trompeloeil::sequence stream_sequence{}; + + // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); + // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); + + // Request message{}; + + // REQUIRE_CALL(stub_mock, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ + // reactor = _3; + // CHECK(_2 == &message); + // CHECK(_1 == &ctx); + // stream_mock.BindReactor(reactor); + // }); + + // rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, stub_mock, &ctx, &message, out_subj.get_observer()); + + // validate_read(stream_mock, reactor); + + // if (reactor) + // { + // reactor->OnDone(grpc::Status::OK); + // } + // } + // SECTION("client-side") + // { + // grpc::ClientWriteReactor<::Request>* reactor{}; + + // ClientCallbackWriter stream_mock{}; + // trompeloeil::sequence stream_sequence{}; + + // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); + + // REQUIRE_CALL(stub_mock, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ + // reactor = _3; + // resp = _2; + // CHECK(_1 == &ctx); + // stream_mock.BindReactor(reactor); + // }); + + // rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, stub_mock, &ctx, subj.get_observable(), out_subj.get_observer()); + + // validate_write(stream_mock, reactor); + + // SECTION("get response") + // { + // CHECK(resp); + // resp->set_value(30); + + // REQUIRE_CALL(stream_mock, WritesDone()); + // subj.get_observer().on_completed(); + // CHECK(mock.get_total_on_next_count() == 0); + // reactor->OnDone(grpc::Status::OK); + // reactor = nullptr; + + // CHECK(mock.get_received_values() == std::vector{30}); + // } + // if (reactor) + // { + // reactor->OnDone(grpc::Status::OK); + // } + // } +} From 00ad5dcfaaa718d621696a69cd50ea4773a8fb51 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 8 Jun 2024 19:54:44 +0300 Subject: [PATCH 02/20] fixes --- Readme.md | 35 ----------------------------------- cmake/dependencies.cmake | 1 - 2 files changed, 36 deletions(-) diff --git a/Readme.md b/Readme.md index 23d6de960..ab2190494 100644 --- a/Readme.md +++ b/Readme.md @@ -32,7 +32,6 @@ rpp::source::from_callable(&::getchar) There we are creating observable (soure of emissions/values/data) to emit value via invoking of `getchar` function, `repeat`-ing it infinite amount of time till termination event happening. It emits values while symbol is not equal to `0`, takeing only **not** digits, maping them to upper case and then just printing to console. -<<<<<<< HEAD Also RPP supports QT out of box. Checkout [RPPQT reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__rppqt.html). For example: ```cpp @@ -48,40 +47,6 @@ rppqt::source::from_signal(*button, &QPushButton::clicked); // <------ react on { clicks_count_label->setText(QString{"Clicked %1 times in total!"}.arg(clicks)); }); -======= -## QT -Also it supports QT out of box. Checkout [RPPQT reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__rppqt.html) - -```cpp -auto button = new QPushButton("Click me!"); -auto label = new QLabel(); - -rppqt::source::from_signal(*button, &QPushButton::clicked); - | rpp::operators::observe_on(rpp::schedulers::new_thread{}) - // some heavy job - | rpp::operators::scan(0, [](int seed, auto) { return ++seed; }) - | rpp::operators::observe_on(rppqt::schedulers::main_thread_scheduler{}) // <--- go back to main QT scheduler - | rpp::operators::subscribe([&clicks_count_label](int clicks) - { - clicks_count_label->setText(QString{"Clicked %1 times in total!"}.arg(clicks)); - }); -``` - - - - -## Why do you need it? - -Check the [User Guide](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/md_docs_2readme.html) for a detailed overview of the Reactive Programming concept and RPP itself. - -In short, RPP can help you build complex pipelines to distribute values over time, connect "some sources of data" without directly connecting them. - -Take a look at the example code for QT. Here, you can see how to connect a button to a label and update it based on the number of clicks. -```cpp -auto button = new QPushButton("Click me!"); -auto clicks_count_label = new QLabel(); -rppqt::source::from_signal(*button_1, &QPushButton::clicked); ->>>>>>> 9561a40d (all ine one) ``` ## What about existing Reactive Extension libraries for C++? diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 26c018cb6..8bad63ed2 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -106,7 +106,6 @@ if (RPP_BUILD_GRPC_CODE AND (RPP_BUILD_TESTS OR RPP_BUILD_EXAMPLES)) set_target_properties(${TARGET} PROPERTIES CXX_CPPCHECK "") endmacro() endif() ->>>>>>> 9561a40d (all ine one) # ==================== RXCPP ======================= if (RPP_BUILD_RXCPP AND RPP_BUILD_BENCHMARKS) From c4d4a9487766fef29934627e874996934e6b406d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 8 Jun 2024 22:10:21 +0300 Subject: [PATCH 03/20] fixes --- .../rppgrpc/rppgrpc/client_reactor.hpp | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 7e791fb63..10cfcca61 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -181,10 +181,18 @@ namespace rppgrpc Base::StartWrite(&m_write.front()); }, [this](const std::exception_ptr&) { - Base::StartWritesDone(); + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); }, [this]() { - Base::StartWritesDone(); + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); }); } @@ -211,11 +219,8 @@ namespace rppgrpc void OnReadDone(bool ok) override { if (!ok) - { - m_requests.get_disposable().dispose(); - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); return; - } + m_observer.get_observer().on_next(m_read); Base::StartRead(&m_read); } @@ -223,11 +228,7 @@ namespace rppgrpc void OnWriteDone(bool ok) override { if (!ok) - { - m_requests.get_disposable().dispose(); - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); return; - } std::lock_guard lock{m_write_mutex}; m_write.pop_front(); @@ -236,6 +237,10 @@ namespace rppgrpc { Base::StartWrite(&m_write.front()); } + else if (m_finished) + { + Base::StartWritesDone(); + } } void OnDone(const grpc::Status& s) override @@ -261,5 +266,6 @@ namespace rppgrpc std::mutex m_write_mutex{}; std::deque m_write{}; + bool m_finished{}; }; } // namespace rppgrpc From 8758c7edc9448abaf72f5f523958b7664b7c6658 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 8 Jun 2024 22:10:30 +0300 Subject: [PATCH 04/20] tests --- src/tests/rppgrpc/test_async_client.cpp | 310 ++++++++++++++++-------- 1 file changed, 213 insertions(+), 97 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 31b8c575d..9bfdf8151 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -4,144 +4,258 @@ #include #include +#include #include #include #include #include "rpp_trompeloil.hpp" -struct async_interface : public trompeloeil::mock_interface +#include + +struct service : public trompeloeil::mock_interface { IMPLEMENT_MOCK3(ServerSide); IMPLEMENT_MOCK3(ClientSide); IMPLEMENT_MOCK2(Bidirectional); }; -struct ClientCallbackReaderWriter : public trompeloeil::mock_interface> +void wait(const std::unique_ptr& e) { - using grpc::ClientCallbackReaderWriter::BindReactor; - - IMPLEMENT_MOCK0(StartCall); - IMPLEMENT_MOCK2(Write); - IMPLEMENT_MOCK0(WritesDone); - IMPLEMENT_MOCK1(Read); - IMPLEMENT_MOCK1(AddHold); - IMPLEMENT_MOCK0(RemoveHold); -}; + const auto start = std::chrono::system_clock::now(); + while (!e->is_satisfied() && std::chrono::system_clock::now() - start < std::chrono::seconds{1}) + { + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + } +} -struct ClientCallbackReader : public trompeloeil::mock_interface> +TEST_CASE("async client reactor") { - using grpc::ClientCallbackReader::BindReactor; + grpc::ServerBuilder builder{}; + trompeloeil::sequence s{}; - IMPLEMENT_MOCK0(StartCall); - IMPLEMENT_MOCK1(Read); - IMPLEMENT_MOCK1(AddHold); - IMPLEMENT_MOCK0(RemoveHold); -}; + auto mock_service = std::make_unique(); -struct ClientCallbackWriter : public trompeloeil::mock_interface> -{ - using grpc::ClientCallbackWriter::BindReactor; + builder.RegisterService(mock_service.get()); - IMPLEMENT_MOCK0(StartCall); - MAKE_MOCK2(Write, void(const Request* req, grpc::WriteOptions options), override); - IMPLEMENT_MOCK0(WritesDone); - IMPLEMENT_MOCK1(AddHold); - IMPLEMENT_MOCK0(RemoveHold); -}; + auto server(builder.BuildAndStart()); + const auto channel = server->InProcessChannel({}); -TEST_CASE("async client can be casted to rppgrpc") -{ - grpc::ClientContext ctx{}; - Response* resp{}; + const auto stub = TestService::NewStub(channel, {}); + + rpp::subjects::publish_subject subj{}; + mock_observer out_mock{}; - rpp::subjects::publish_subject subj{}; - rpp::subjects::publish_subject out_subj{}; - async_interface stub_mock{}; - mock_observer_strategy mock{}; - out_subj.get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::subscribe(mock); + SECTION("bidirectional") + { + grpc::ClientContext ctx{}; + const auto bidi_reactor = new rppgrpc::client_bidi_reactor(); + bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::subscribe(out_mock); + subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(bidi_reactor->get_observer()); - auto validate_write = [&](auto& stream_mock, auto*& reactor) { - SECTION("write to stream") - { - for (auto v : {10, 3, 15, 20}) - { - Request request{}; - request.set_value(v); - REQUIRE_CALL(stream_mock, Write(trompeloeil::_, trompeloeil::_)).WITH(_1->value() == v); + stub->async()->Bidirectional(&ctx, bidi_reactor); + SECTION("no stream job - completion") + { + const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + bidi_reactor->init(); - subj.get_observer().on_next(request); - reactor->OnWriteDone(true); - } + wait(initial_call); } - SECTION("write failed") + SECTION("error status - error") { - CHECK(mock.get_on_error_count() == 0); - reactor->OnWriteDone(false); - reactor = nullptr; - CHECK(mock.get_on_error_count() == 1); + const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + bidi_reactor->init(); + + wait(initial_call); } - }; + SECTION("manual client-side completion") + { + const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + } + }); + + bidi_reactor->init(); - auto validate_read = [&](auto& stream_mock, auto*& reactor) { - SECTION("read from stream") + const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + wait(completed); + } + SECTION("client-side write + completion") { - std::vector expected_values{}; - for (auto v : {3, 5, 2, 1}) - { - resp->set_value(v); - CHECK(mock.get_received_values() == expected_values); - - REQUIRE_CALL(stream_mock, Read(trompeloeil::_)); - reactor->OnReadDone(true); - - expected_values.push_back(v); - CHECK(mock.get_received_values() == expected_values); - } + std::promise> results{}; + const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + std::vector reads{}; + Request request{}; + while (_2->Read(&request)) + { + reads.push_back(request.value()); + } + results.set_value(reads); + }); + + bidi_reactor->init(); + subj.get_observer().on_next(1); + subj.get_observer().on_next(2); + + const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + auto f = results.get_future(); + REQUIRE(f.wait_for(std::chrono::seconds{1}) == std::future_status::ready); + CHECK(f.get() == std::vector{1, 2}); + + wait(completed); } - SECTION("read failed") + SECTION("client-side read + completion") { - CHECK(mock.get_on_error_count() == 0); - reactor->OnReadDone(false); - reactor = nullptr; - CHECK(mock.get_on_error_count() == 1); + const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Response response{}; + std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + + for (int v : {1, 2, 3}) + { + std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + response.set_value(v); + _2->Write(response); + } + std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + }); + std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + bidi_reactor->init(); + + std::thread{[&] { + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + + std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + + wait(initial_call); + }}.join(); } - }; + // SECTION("successful read-write") + // { + // REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + // .RETURN(grpc::Status::OK) + // .LR_SIDE_EFFECT({ + // Request request{}; + // while(_2->Read(&request)) { + // Response response{}; + // response.set_value(request.value()*10); + // _2->Write(response); + // } + // }); + + // bidi_reactor->init(); + + // REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s); + // subj.get_observer().on_next(1); + + // REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s); + // subj.get_observer().on_next(2); + + // const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + // subj.get_observer().on_completed(); + + // wait(completed); + // } + } - SECTION("bidirectional") - { - grpc::ClientBidiReactor<::Request, ::Response>* reactor{}; - ClientCallbackReaderWriter stream_mock{}; - trompeloeil::sequence stream_sequence{}; + // auto validate_write = [&](auto& stream_mock, auto*& reactor) { + // SECTION("write to stream") + // { + // for (auto v : {10, 3, 15, 20}) + // { + // Request request{}; + // request.set_value(v); + + // REQUIRE_CALL(stream_mock, Write(trompeloeil::_, trompeloeil::_)).WITH(_1->value() == v); + + // subj.get_observer().on_next(request); + // reactor->OnWriteDone(true); + // } + // } + // SECTION("write failed") + // { + // CHECK(mock.get_on_error_count() == 0); + // reactor->OnWriteDone(false); + // reactor = nullptr; + // CHECK(mock.get_on_error_count() == 1); + // } + // }; + + // auto validate_read = [&](auto& stream_mock, auto*& reactor) { + // SECTION("read from stream") + // { + // std::vector expected_values{}; + // for (auto v : {3, 5, 2, 1}) + // { + // resp->set_value(v); + // CHECK(mock.get_received_values() == expected_values); + + // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)); + // reactor->OnReadDone(true); + + // expected_values.push_back(v); + // CHECK(mock.get_received_values() == expected_values); + // } + // } + // SECTION("read failed") + // { + // CHECK(mock.get_on_error_count() == 0); + // reactor->OnReadDone(false); + // reactor = nullptr; + // CHECK(mock.get_on_error_count() == 1); + // } + // }; + + // SECTION("bidirectional") + // { + // grpc::ClientBidiReactor<::Request, ::Response>* reactor{}; + + // ClientCallbackReaderWriter stream_mock{}; + // trompeloeil::sequence stream_sequence{}; - REQUIRE_CALL(stub_mock, Bidirectional(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ - reactor = _2; - CHECK(_1 == &ctx); - stream_mock.BindReactor(reactor); - }); + // REQUIRE_CALL(stub_mock, Bidirectional(trompeloeil::_, trompeloeil::_)).LR_WITH(_1 == &ctx).LR_SIDE_EFFECT({ + // reactor = _2; + // stream_mock.BindReactor(reactor); + // }); - const auto temp_reactor = new rppgrpc::client_bidi_reactor(); - stub_mock.Bidirectional(&ctx, temp_reactor); - temp_reactor->get_observable().subscribe(out_subj.get_observer()); - subj.get_observable().subscribe(temp_reactor->get_observer()); + // const auto temp_reactor = new rppgrpc::client_bidi_reactor(); + // stub_mock.Bidirectional(&ctx, temp_reactor); + // temp_reactor->get_observable().subscribe(out_subj.get_observer()); + // subj.get_observable().subscribe(temp_reactor->get_observer()); - REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); - REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); + // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); + // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); - temp_reactor->init(); + // temp_reactor->init(); - validate_write(stream_mock, reactor); - validate_read(stream_mock, reactor); + // validate_write(stream_mock, reactor); + // validate_read(stream_mock, reactor); - if (reactor) - { - reactor->OnDone(grpc::Status::OK); - } - } + // if (reactor) + // { + // reactor->OnDone(grpc::Status::OK); + // } + // } // SECTION("server-side") // { // grpc::ClientReadReactor* reactor{}; @@ -208,4 +322,6 @@ TEST_CASE("async client can be casted to rppgrpc") // reactor->OnDone(grpc::Status::OK); // } // } + server->Shutdown(); + server->Wait(); } From 4086b96fa4d11556cc08d29b7af34d202d039791 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 14 Jun 2024 00:19:46 +0300 Subject: [PATCH 05/20] fix tests --- src/tests/rppgrpc/test_async_client.cpp | 94 ++++++++++++------------- 1 file changed, 44 insertions(+), 50 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 9bfdf8151..26ac40702 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -52,7 +54,7 @@ TEST_CASE("async client reactor") grpc::ClientContext ctx{}; const auto bidi_reactor = new rppgrpc::client_bidi_reactor(); - bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::subscribe(out_mock); + bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(bidi_reactor->get_observer()); @@ -121,63 +123,55 @@ TEST_CASE("async client reactor") } SECTION("client-side read + completion") { - const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) - .RETURN(grpc::Status::OK) - .LR_SIDE_EFFECT({ - Response response{}; - std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Response response{}; + + for (int v : {1, 2, 3}) + { + response.set_value(v); + _2->Write(response); + } + }); + bidi_reactor->init(); + + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + + wait(completed); + } + SECTION("client-side read-write + completeion") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + Response response{}; + response.set_value(request.value() * 10); + _2->Write(response); + } + }); - for (int v : {1, 2, 3}) - { - std::cout << std::this_thread::get_id() << __LINE__ << std::endl; - response.set_value(v); - _2->Write(response); - } - std::cout << std::this_thread::get_id() << __LINE__ << std::endl; - }); - std::cout << std::this_thread::get_id() << __LINE__ << std::endl; bidi_reactor->init(); - std::thread{[&] { - REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); - REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); - REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); - const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s); + subj.get_observer().on_next(1); + + REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s); + subj.get_observer().on_next(2); - std::cout << std::this_thread::get_id() << __LINE__ << std::endl; + const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); - wait(initial_call); - }}.join(); + wait(completed); } - // SECTION("successful read-write") - // { - // REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) - // .RETURN(grpc::Status::OK) - // .LR_SIDE_EFFECT({ - // Request request{}; - // while(_2->Read(&request)) { - // Response response{}; - // response.set_value(request.value()*10); - // _2->Write(response); - // } - // }); - - // bidi_reactor->init(); - - // REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s); - // subj.get_observer().on_next(1); - - // REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s); - // subj.get_observer().on_next(2); - - // const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); - // subj.get_observer().on_completed(); - - // wait(completed); - // } } - // auto validate_write = [&](auto& stream_mock, auto*& reactor) { // SECTION("write to stream") // { From 9e937a114752280892d2382d9a480fdbcddd2a7d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 14 Jun 2024 00:21:03 +0300 Subject: [PATCH 06/20] minor updates --- src/tests/rppgrpc/test_async_client.cpp | 44 ++++++++++++++----------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 26ac40702..9f7de72ad 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -57,7 +57,6 @@ TEST_CASE("async client reactor") bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(bidi_reactor->get_observer()); - stub->async()->Bidirectional(&ctx, bidi_reactor); SECTION("no stream job - completion") { @@ -67,6 +66,7 @@ TEST_CASE("async client reactor") wait(initial_call); } + SECTION("error status - error") { const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); @@ -75,16 +75,17 @@ TEST_CASE("async client reactor") wait(initial_call); } + SECTION("manual client-side completion") { - const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) - .RETURN(grpc::Status::OK) - .LR_SIDE_EFFECT({ - Request request{}; - while (_2->Read(&request)) - { - } - }); + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + } + }); bidi_reactor->init(); @@ -93,20 +94,21 @@ TEST_CASE("async client reactor") wait(completed); } + SECTION("client-side write + completion") { std::promise> results{}; - const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) - .RETURN(grpc::Status::OK) - .LR_SIDE_EFFECT({ - std::vector reads{}; - Request request{}; - while (_2->Read(&request)) - { - reads.push_back(request.value()); - } - results.set_value(reads); - }); + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + std::vector reads{}; + Request request{}; + while (_2->Read(&request)) + { + reads.push_back(request.value()); + } + results.set_value(reads); + }); bidi_reactor->init(); subj.get_observer().on_next(1); @@ -121,6 +123,7 @@ TEST_CASE("async client reactor") wait(completed); } + SECTION("client-side read + completion") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) @@ -143,6 +146,7 @@ TEST_CASE("async client reactor") wait(completed); } + SECTION("client-side read-write + completeion") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) From 97b852c8558831b8e25c534ca4a42dab5cd136c3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 14 Jun 2024 00:21:20 +0300 Subject: [PATCH 07/20] temp comment --- src/examples/rppgrpc/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/examples/rppgrpc/CMakeLists.txt b/src/examples/rppgrpc/CMakeLists.txt index f928b4394..36e62e1b1 100644 --- a/src/examples/rppgrpc/CMakeLists.txt +++ b/src/examples/rppgrpc/CMakeLists.txt @@ -1,2 +1,2 @@ -add_subdirectory(communication) +# add_subdirectory(communication) add_subdirectory(doxygen) From 41e6cbded602067a9e53f2adf4ef6c0bfdb0c29f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 14 Jun 2024 11:35:05 +0300 Subject: [PATCH 08/20] minor --- src/examples/rppgrpc/CMakeLists.txt | 2 +- src/tests/rppgrpc/test_async_client.cpp | 145 ------------------------ 2 files changed, 1 insertion(+), 146 deletions(-) diff --git a/src/examples/rppgrpc/CMakeLists.txt b/src/examples/rppgrpc/CMakeLists.txt index 36e62e1b1..4871b5ba6 100644 --- a/src/examples/rppgrpc/CMakeLists.txt +++ b/src/examples/rppgrpc/CMakeLists.txt @@ -1,2 +1,2 @@ # add_subdirectory(communication) -add_subdirectory(doxygen) +# add_subdirectory(doxygen) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 9f7de72ad..cd24b299d 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -175,151 +175,6 @@ TEST_CASE("async client reactor") wait(completed); } } - - // auto validate_write = [&](auto& stream_mock, auto*& reactor) { - // SECTION("write to stream") - // { - // for (auto v : {10, 3, 15, 20}) - // { - // Request request{}; - // request.set_value(v); - - // REQUIRE_CALL(stream_mock, Write(trompeloeil::_, trompeloeil::_)).WITH(_1->value() == v); - - // subj.get_observer().on_next(request); - // reactor->OnWriteDone(true); - // } - // } - // SECTION("write failed") - // { - // CHECK(mock.get_on_error_count() == 0); - // reactor->OnWriteDone(false); - // reactor = nullptr; - // CHECK(mock.get_on_error_count() == 1); - // } - // }; - - // auto validate_read = [&](auto& stream_mock, auto*& reactor) { - // SECTION("read from stream") - // { - // std::vector expected_values{}; - // for (auto v : {3, 5, 2, 1}) - // { - // resp->set_value(v); - // CHECK(mock.get_received_values() == expected_values); - - // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)); - // reactor->OnReadDone(true); - - // expected_values.push_back(v); - // CHECK(mock.get_received_values() == expected_values); - // } - // } - // SECTION("read failed") - // { - // CHECK(mock.get_on_error_count() == 0); - // reactor->OnReadDone(false); - // reactor = nullptr; - // CHECK(mock.get_on_error_count() == 1); - // } - // }; - - // SECTION("bidirectional") - // { - // grpc::ClientBidiReactor<::Request, ::Response>* reactor{}; - - // ClientCallbackReaderWriter stream_mock{}; - // trompeloeil::sequence stream_sequence{}; - - // REQUIRE_CALL(stub_mock, Bidirectional(trompeloeil::_, trompeloeil::_)).LR_WITH(_1 == &ctx).LR_SIDE_EFFECT({ - // reactor = _2; - // stream_mock.BindReactor(reactor); - // }); - - // const auto temp_reactor = new rppgrpc::client_bidi_reactor(); - // stub_mock.Bidirectional(&ctx, temp_reactor); - // temp_reactor->get_observable().subscribe(out_subj.get_observer()); - // subj.get_observable().subscribe(temp_reactor->get_observer()); - - // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); - // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); - - // temp_reactor->init(); - - // validate_write(stream_mock, reactor); - // validate_read(stream_mock, reactor); - - // if (reactor) - // { - // reactor->OnDone(grpc::Status::OK); - // } - // } - // SECTION("server-side") - // { - // grpc::ClientReadReactor* reactor{}; - - // ClientCallbackReader stream_mock{}; - // trompeloeil::sequence stream_sequence{}; - - // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); - // REQUIRE_CALL(stream_mock, Read(trompeloeil::_)).LR_SIDE_EFFECT({ resp = _1; }).IN_SEQUENCE(stream_sequence); - - // Request message{}; - - // REQUIRE_CALL(stub_mock, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ - // reactor = _3; - // CHECK(_2 == &message); - // CHECK(_1 == &ctx); - // stream_mock.BindReactor(reactor); - // }); - - // rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, stub_mock, &ctx, &message, out_subj.get_observer()); - - // validate_read(stream_mock, reactor); - - // if (reactor) - // { - // reactor->OnDone(grpc::Status::OK); - // } - // } - // SECTION("client-side") - // { - // grpc::ClientWriteReactor<::Request>* reactor{}; - - // ClientCallbackWriter stream_mock{}; - // trompeloeil::sequence stream_sequence{}; - - // REQUIRE_CALL(stream_mock, StartCall()).IN_SEQUENCE(stream_sequence); - - // REQUIRE_CALL(stub_mock, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT({ - // reactor = _3; - // resp = _2; - // CHECK(_1 == &ctx); - // stream_mock.BindReactor(reactor); - // }); - - // rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, stub_mock, &ctx, subj.get_observable(), out_subj.get_observer()); - - // validate_write(stream_mock, reactor); - - // SECTION("get response") - // { - // CHECK(resp); - // resp->set_value(30); - - // REQUIRE_CALL(stream_mock, WritesDone()); - // subj.get_observer().on_completed(); - // CHECK(mock.get_total_on_next_count() == 0); - // reactor->OnDone(grpc::Status::OK); - // reactor = nullptr; - - // CHECK(mock.get_received_values() == std::vector{30}); - // } - // if (reactor) - // { - // reactor->OnDone(grpc::Status::OK); - // } - // } server->Shutdown(); server->Wait(); } From 7c2db09c858c23c47a73b0ca875b37571672dace Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 16 Jun 2024 22:21:04 +0300 Subject: [PATCH 09/20] one more attempt --- src/tests/rppgrpc/test_async_client.cpp | 33 ++++++++++++------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index cd24b299d..ab8205b22 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -24,10 +24,9 @@ struct service : public trompeloeil::mock_interface void wait(const std::unique_ptr& e) { - const auto start = std::chrono::system_clock::now(); - while (!e->is_satisfied() && std::chrono::system_clock::now() - start < std::chrono::seconds{1}) + while (!e->is_satisfied()) { - std::this_thread::sleep_for(std::chrono::milliseconds{10}); + std::this_thread::yield(); } } @@ -60,20 +59,20 @@ TEST_CASE("async client reactor") stub->async()->Bidirectional(&ctx, bidi_reactor); SECTION("no stream job - completion") { - const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); - REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); bidi_reactor->init(); - wait(initial_call); + wait(last); } SECTION("error status - error") { - const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); - REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); bidi_reactor->init(); - wait(initial_call); + wait(last); } SECTION("manual client-side completion") @@ -89,10 +88,10 @@ TEST_CASE("async client reactor") bidi_reactor->init(); - const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); - wait(completed); + wait(last); } SECTION("client-side write + completion") @@ -114,14 +113,14 @@ TEST_CASE("async client reactor") subj.get_observer().on_next(1); subj.get_observer().on_next(2); - const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); auto f = results.get_future(); REQUIRE(f.wait_for(std::chrono::seconds{1}) == std::future_status::ready); CHECK(f.get() == std::vector{1, 2}); - wait(completed); + wait(last); } SECTION("client-side read + completion") @@ -142,9 +141,9 @@ TEST_CASE("async client reactor") REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); - const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); - wait(completed); + wait(last); } SECTION("client-side read-write + completeion") @@ -169,10 +168,10 @@ TEST_CASE("async client reactor") REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s); subj.get_observer().on_next(2); - const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); - wait(completed); + wait(last); } } server->Shutdown(); From e2d3a9ea5f2b67c11aa0bb40e8fbd0ca8b3d809f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 16 Jun 2024 23:58:14 +0300 Subject: [PATCH 10/20] rever --- CMakePresets.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakePresets.json b/CMakePresets.json index ff76dd7e8..50eaa16eb 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -190,7 +190,7 @@ }, { "name": "ci-sanitize-tsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=thread -g -O1" } From 138d47f66ce0688e49cfef9ef6221d90ea8f01f4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 17 Jun 2024 13:54:34 +0300 Subject: [PATCH 11/20] Update test_async_client.cpp --- src/tests/rppgrpc/test_async_client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index ab8205b22..97f129fe6 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -26,7 +26,7 @@ void wait(const std::unique_ptr& e) { while (!e->is_satisfied()) { - std::this_thread::yield(); + std::this_thread::sleep_for(std::chrono::nanoseconds{100}); } } From 78fbea7ac208e1a82855d173b00b708d904712f7 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 18 Jun 2024 17:30:25 +0300 Subject: [PATCH 12/20] Update test_async_client.cpp --- src/tests/rppgrpc/test_async_client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 97f129fe6..62013cca5 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -26,7 +26,7 @@ void wait(const std::unique_ptr& e) { while (!e->is_satisfied()) { - std::this_thread::sleep_for(std::chrono::nanoseconds{100}); + std::this_thread::sleep_for(std::chrono::seconds{1}); } } From ab1dadd2215ea071a2a0547a57efe5332aa7db32 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 19 Jun 2024 09:46:03 +0300 Subject: [PATCH 13/20] Update test_async_client.cpp --- src/tests/rppgrpc/test_async_client.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 62013cca5..bb866090d 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -108,8 +108,9 @@ TEST_CASE("async client reactor") } results.set_value(reads); }); - - bidi_reactor->init(); + + auto t = std::thread{[&]{ bidi_reactor->init();}}; + subj.get_observer().on_next(1); subj.get_observer().on_next(2); @@ -121,6 +122,7 @@ TEST_CASE("async client reactor") CHECK(f.get() == std::vector{1, 2}); wait(last); + t.join(); } SECTION("client-side read + completion") From 88866312cfa445b6eb95e741786f51c95bf5c8ec Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 19 Jun 2024 06:46:44 +0000 Subject: [PATCH 14/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rppgrpc/test_async_client.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index bb866090d..01b0335dd 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -108,9 +108,11 @@ TEST_CASE("async client reactor") } results.set_value(reads); }); - - auto t = std::thread{[&]{ bidi_reactor->init();}}; - + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + subj.get_observer().on_next(1); subj.get_observer().on_next(2); From da08d0ddf5d400a3e531cbec0b929a5143991c6f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 19 Jun 2024 13:06:51 +0300 Subject: [PATCH 15/20] Update test_async_client.cpp --- src/tests/rppgrpc/test_async_client.cpp | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 01b0335dd..490dcf10e 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -61,18 +61,24 @@ TEST_CASE("async client reactor") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); - bidi_reactor->init(); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; wait(last); + t.join(); } SECTION("error status - error") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); - bidi_reactor->init(); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; wait(last); + t.join(); } SECTION("manual client-side completion") @@ -86,12 +92,15 @@ TEST_CASE("async client reactor") } }); - bidi_reactor->init(); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); wait(last); + t.join(); } SECTION("client-side write + completion") @@ -140,7 +149,9 @@ TEST_CASE("async client reactor") _2->Write(response); } }); - bidi_reactor->init(); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); @@ -148,6 +159,7 @@ TEST_CASE("async client reactor") const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); wait(last); + t.join(); } SECTION("client-side read-write + completeion") @@ -164,7 +176,9 @@ TEST_CASE("async client reactor") } }); - bidi_reactor->init(); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s); subj.get_observer().on_next(1); @@ -176,6 +190,7 @@ TEST_CASE("async client reactor") subj.get_observer().on_completed(); wait(last); + t.join(); } } server->Shutdown(); From 19c53b79f58e1b64347ed472dac374d047a0104e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 19 Jun 2024 10:07:20 +0000 Subject: [PATCH 16/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rppgrpc/test_async_client.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 490dcf10e..9b6489b54 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -61,7 +61,7 @@ TEST_CASE("async client reactor") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); - auto t = std::thread{[&] { + auto t = std::thread{[&] { bidi_reactor->init(); }}; @@ -73,7 +73,7 @@ TEST_CASE("async client reactor") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); - auto t = std::thread{[&] { + auto t = std::thread{[&] { bidi_reactor->init(); }}; From bddd7869878785a56ccbefa5e90f24871dfc3adf Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 19 Jun 2024 23:27:47 +0300 Subject: [PATCH 17/20] extend tests --- .../rppgrpc/rppgrpc/client_reactor.hpp | 286 +++++++++--------- src/tests/rppgrpc/test_async_client.cpp | 219 +++++++++++++- 2 files changed, 361 insertions(+), 144 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 10cfcca61..26f0a8a00 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -20,149 +20,6 @@ #include -namespace rppgrpc::details -{ - // template - // class client_write_reactor final : public grpc::ClientWriteReactor - // { - // using Response = rpp::utils::extract_observer_type_t; - // using Base = grpc::ClientWriteReactor; - - // public: - // template Observable, rpp::constraint::decayed_same_as TObserver> - // client_write_reactor(const Observable& messages, TObserver&& events, Response*& ptr_to_write_response) - // : m_observer{std::forward(events)} - // , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { - // std::lock_guard lock{m_write_mutex}; - // m_write.push_back(std::forward(message)); - // if (m_write.size() == 1) - // Base::StartWrite(&m_write.front()); }, - // [this](const std::exception_ptr&) { - // // Base::StartWritesDone(); - // context_.TryCancel(); - // }, - // [this]() { - // Base::StartWritesDone(); - // })} - // { - // ptr_to_write_response = &m_read; - // } - - // void Init() - // { - // Base::StartCall(); - // } - - // private: - // void OnWriteDone(bool ok) override - // { - // if (!ok) - // { - // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); - // context_.TryCancel(); - // return; - // } - - // std::lock_guard lock{m_write_mutex}; - // m_write.pop_front(); - - // if (!m_write.empty()) - // { - // Base::StartWrite(&m_write.front()); - // } - // } - - // void OnDone(const grpc::Status& s) override - // { - // if (s.ok()) - // { - // m_observer.on_next(std::move(m_read)); - // m_observer.on_completed(); - // } - // else - // { - // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - // } - // Destroy(); - // } - - // private: - // void Destroy() - // { - // m_disposable.dispose(); - // delete this; - // } - - // private: - // Observer m_observer; - // rpp::disposable_wrapper m_disposable; - - // Response m_read{}; - - // std::mutex m_write_mutex{}; - // std::list m_write{}; - // }; - - // template - // class client_read_reactor final : public grpc::ClientReadReactor> - // { - // using Response = rpp::utils::extract_observer_type_t; - // using Base = grpc::ClientReadReactor; - - // public: - // template TObserver> - // requires (!rpp::constraint::decayed_same_as>) - // explicit client_read_reactor(TObserver&& events) - // : m_observer{std::forward(events)} - // { - // } - - // client_read_reactor(const client_read_reactor&) = delete; - // client_read_reactor(client_read_reactor&&) = delete; - - // void Init() - // { - // Base::StartCall(); - // Base::StartRead(&m_read); - // } - - // private: - // void OnReadDone(bool ok) override - // { - // if (!ok) - // { - // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); - // context_.TryCancel(); - // return; - // } - // m_observer.on_next(m_read); - // Base::StartRead(&m_read); - // } - - // void OnDone(const grpc::Status& s) override - // { - // if (s.ok()) - // { - // m_observer.on_completed(); - // } - // else - // { - // m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - // } - // Destroy(); - // } - - // private: - // void Destroy() - // { - // delete this; - // } - - // private: - // Observer m_observer; - // Response m_read{}; - // }; -} // namespace rppgrpc::details namespace rppgrpc { template @@ -268,4 +125,147 @@ namespace rppgrpc std::deque m_write{}; bool m_finished{}; }; + + template + class client_write_reactor final : public grpc::ClientWriteReactor + { + using Base = grpc::ClientWriteReactor; + + public: + client_write_reactor() + { + m_requests.get_observable().subscribe( + [this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); + }, + [this](const std::exception_ptr&) { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }, + [this]() { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }); + } + + void init() + { + Base::StartCall(); + } + + auto get_observer() + { + return m_requests.get_observer(); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + + void OnWriteDone(bool ok) override + { + if (!ok) + return; + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + else if (m_finished) + { + Base::StartWritesDone(); + } + } + + void OnDone(const grpc::Status& s) override + { + m_requests.get_disposable().dispose(); + + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::serialized_publish_subject m_requests{}; + rpp::subjects::publish_subject m_observer; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + bool m_finished{}; + }; + + template + class client_read_reactor final : public grpc::ClientReadReactor + { + using Base = grpc::ClientReadReactor; + + public: + client_read_reactor() + { + } + + void init() + { + Base::StartCall(); + Base::StartRead(&m_read); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + using Base::StartRead; + + void OnReadDone(bool ok) override + { + if (!ok) + return; + + m_observer.get_observer().on_next(m_read); + Base::StartRead(&m_read); + } + + void OnDone(const grpc::Status& s) override + { + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::publish_subject m_observer; + Response m_read{}; + }; } // namespace rppgrpc diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 9b6489b54..880314d96 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -47,7 +47,6 @@ TEST_CASE("async client reactor") rpp::subjects::publish_subject subj{}; mock_observer out_mock{}; - SECTION("bidirectional") { grpc::ClientContext ctx{}; @@ -81,6 +80,24 @@ TEST_CASE("async client reactor") t.join(); } + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + SECTION("manual client-side completion") { REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) @@ -193,6 +210,206 @@ TEST_CASE("async client reactor") t.join(); } } + SECTION("server-side") + { + grpc::ClientContext ctx{}; + + const auto read_reactor = new rppgrpc::client_read_reactor(); + read_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + SECTION("empty request") + { + stub->async()->ServerSide(&ctx, nullptr, read_reactor); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + SECTION("normal request") + { + Request r{}; + stub->async()->ServerSide(&ctx, &r, read_reactor); + + SECTION("no stream job - completion") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("error status - error") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("client-side read + completion") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Response response{}; + + for (int v : {1, 2, 3}) + { + response.set_value(v); + _3->Write(response); + } + }); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + } + } + SECTION("client-side") + { + grpc::ClientContext ctx{}; + + const auto write_reactor = new rppgrpc::client_write_reactor(); + write_reactor->get_observable() | rpp::ops::map([](const rpp::utils::none& out) { return 0; }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(write_reactor->get_observer()); + + Response r{}; + stub->async()->ClientSide(&ctx, &r, write_reactor); + SECTION("no stream job - completion") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("error status - error") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("manual client-side completion") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + } + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + wait(last); + t.join(); + } + + SECTION("client-side write + completion") + { + std::promise> results{}; + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + std::vector reads{}; + Request request{}; + while (_2->Read(&request)) + { + reads.push_back(request.value()); + } + results.set_value(reads); + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + subj.get_observer().on_next(1); + subj.get_observer().on_next(2); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + auto f = results.get_future(); + REQUIRE(f.wait_for(std::chrono::seconds{1}) == std::future_status::ready); + CHECK(f.get() == std::vector{1, 2}); + + wait(last); + t.join(); + } + } server->Shutdown(); server->Wait(); } From 3928997eab8315dac135012cfc700fadc9855e4c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 19 Jun 2024 23:45:40 +0300 Subject: [PATCH 18/20] doxygen --- src/examples/rppgrpc/CMakeLists.txt | 2 +- .../rppgrpc/doxygen/client_reactor.cpp | 36 ++++++++++++------- .../rppgrpc/rppgrpc/client_reactor.hpp | 26 ++++++++++++++ src/extensions/rppgrpc/rppgrpc/fwd.hpp | 6 ++++ 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/src/examples/rppgrpc/CMakeLists.txt b/src/examples/rppgrpc/CMakeLists.txt index 4871b5ba6..36e62e1b1 100644 --- a/src/examples/rppgrpc/CMakeLists.txt +++ b/src/examples/rppgrpc/CMakeLists.txt @@ -1,2 +1,2 @@ # add_subdirectory(communication) -# add_subdirectory(doxygen) +add_subdirectory(doxygen) diff --git a/src/examples/rppgrpc/doxygen/client_reactor.cpp b/src/examples/rppgrpc/doxygen/client_reactor.cpp index 657dc7688..db5562478 100644 --- a/src/examples/rppgrpc/doxygen/client_reactor.cpp +++ b/src/examples/rppgrpc/doxygen/client_reactor.cpp @@ -15,10 +15,14 @@ int main() // NOLINT(bugprone-exception-escape) auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); auto stub = TestService::NewStub(channel); - grpc::ClientContext ctx{}; - rpp::subjects::publish_subject requests{}; - rpp::subjects::publish_subject responses{}; - rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::Bidirectional, *stub->async(), &ctx, requests.get_observable(), responses.get_observer()); + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_bidi_reactor(); + stub->async()->Bidirectional(&ctx, reactor); + reactor->get_observable().subscribe([](const Response&) {}); + + reactor->init(); + + reactor->get_observer().on_next(Request{}); //! [bidi_reactor] } { @@ -26,10 +30,13 @@ int main() // NOLINT(bugprone-exception-escape) auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); auto stub = TestService::NewStub(channel); - grpc::ClientContext ctx{}; - rpp::subjects::publish_subject responses{}; - Request request{}; - rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, *stub->async(), &ctx, &request, responses.get_observer()); + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_read_reactor(); + Request req{}; + stub->async()->ServerSide(&ctx, &req, reactor); + reactor->get_observable().subscribe([](const Response&) {}); + + reactor->init(); //! [read_reactor] } { @@ -37,10 +44,15 @@ int main() // NOLINT(bugprone-exception-escape) auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); auto stub = TestService::NewStub(channel); - grpc::ClientContext ctx{}; - rpp::subjects::publish_subject requests{}; - rpp::subjects::publish_subject responses{}; - rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, *stub->async(), &ctx, requests.get_observable(), responses.get_observer()); + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_write_reactor(); + Response resp{}; + stub->async()->ClientSide(&ctx, &resp, reactor); + reactor->get_observable().subscribe([](const rpp::utils::none&) {}); + + reactor->init(); + + reactor->get_observer().on_next(Request{}); //! [write_reactor] } diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 26f0a8a00..76a70c8ea 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -22,6 +22,15 @@ namespace rppgrpc { + /** + * @brief RPP's based implementation for grpc client bidirectional reactor. + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcBidirectionalStream(ctx, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + */ template class client_bidi_reactor final : public grpc::ClientBidiReactor { @@ -126,6 +135,15 @@ namespace rppgrpc bool m_finished{}; }; + /** + * @brief RPP's based implementation for grpc client write reactor + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcWriteStream(ctx, &request, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + */ template class client_write_reactor final : public grpc::ClientWriteReactor { @@ -217,6 +235,14 @@ namespace rppgrpc bool m_finished{}; }; + /** + * @brief RPP's based implementation for grpc client read reactor. + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcReadStream(ctx, &response, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + */ template class client_read_reactor final : public grpc::ClientReadReactor { diff --git a/src/extensions/rppgrpc/rppgrpc/fwd.hpp b/src/extensions/rppgrpc/rppgrpc/fwd.hpp index 03059bd13..d807537f8 100644 --- a/src/extensions/rppgrpc/rppgrpc/fwd.hpp +++ b/src/extensions/rppgrpc/rppgrpc/fwd.hpp @@ -21,4 +21,10 @@ namespace rppgrpc { template class client_bidi_reactor; + + template + class client_write_reactor; + + template + class client_read_reactor; } // namespace rppgrpc From abb2bc869005fd50e6a6427cdb3a78c58ebe3544 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 19 Jun 2024 23:47:22 +0300 Subject: [PATCH 19/20] hide server reactor --- src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp | 2 +- .../rppgrpc/rppgrpc/server_reactor.hpp | 150 ------------------ 2 files changed, 1 insertion(+), 151 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp index 15153db75..78306d613 100644 --- a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp +++ b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp @@ -12,4 +12,4 @@ #include #include -#include +// #include diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 40ddb30e8..44f7689cb 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -107,157 +107,7 @@ namespace rppgrpc::details std::mutex m_write_mutex{}; std::deque m_write{}; }; - - template - class server_write_reactor final : public grpc::ServerWriteReactor - { - using Base = grpc::ServerWriteReactor; - - public: - template Observable, rpp::constraint::decayed_same_as TObserver> - requires (!rpp::constraint::decayed_same_as>) - server_write_reactor(const Observable& messages, TObserver&& observer) - : m_observer{std::forward(observer)} - , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); }, - [this](const std::exception_ptr&) { - Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); - }, - [this]() { - Base::Finish(grpc::Status::OK); - })} - { - Base::StartSendInitialMetadata(); - } - - - server_write_reactor(const server_write_reactor&) = delete; - server_write_reactor(server_write_reactor&&) = delete; - - private: - void OnWriteDone(bool ok) override - { - if (!ok) - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); - return; - } - - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); - - if (!m_write.empty()) - { - Base::StartWrite(&m_write.front()); - } - } - - void OnDone() override - { - m_observer.on_completed(); - Destroy(); - } - - void OnCancel() override - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); - Base::Finish(grpc::Status::CANCELLED); - } - - private: - void Destroy() - { - m_disposable.dispose(); - delete this; - } - - private: - Observer m_observer; - rpp::disposable_wrapper m_disposable; - - std::mutex m_write_mutex{}; - std::deque m_write{}; - }; - - template - class server_reader_reactor final : public grpc::ServerReadReactor> - { - using Request = rpp::utils::extract_observer_type_t; - using Base = grpc::ServerReadReactor; - - public: - template TObserver> - requires (!rpp::constraint::decayed_same_as>) - explicit server_reader_reactor(TObserver&& events) - : m_observer{std::forward(events)} - { - Base::StartSendInitialMetadata(); - Base::StartRead(&m_read); - } - - server_reader_reactor(const server_reader_reactor&) = delete; - server_reader_reactor(server_reader_reactor&&) = delete; - - private: - void OnReadDone(bool ok) override - { - if (!ok) - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); - return; - } - m_observer.on_next(m_read); - Base::StartRead(&m_read); - } - - void OnDone() override - { - m_observer.on_completed(); - Destroy(); - } - - void OnCancel() override - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); - Base::Finish(grpc::Status::CANCELLED); - } - - private: - void Destroy() - { - m_disposable.dispose(); - delete this; - } - - private: - Observer m_observer; - rpp::disposable_wrapper m_disposable; - - Request m_read{}; - }; } // namespace rppgrpc::details namespace rppgrpc { - // template - // auto make_server_reactor(const Observable& responses, Observer&& requests) - // { - // return new details::server_bidi_reactor, std::decay_t>(responses, std::forward(requests)); - // } - - // template - // auto make_server_reactor(Observer&& requests) - // { - // return new details::server_reader_reactor>(std::forward(requests)); - // } - - // template - // auto make_server_reactor(const Observable& responses) - // { - // return new details::server_write_reactor>(responses); - // } } // namespace rppgrpc From 75cd453f3e8fd7962f0aeca4a704eda1dff779e1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 20 Jun 2024 09:46:19 +0300 Subject: [PATCH 20/20] Delete src/examples/rppgrpc/doxygen/server_reactor.cpp --- src/examples/rppgrpc/doxygen/server_reactor.cpp | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/examples/rppgrpc/doxygen/server_reactor.cpp diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp deleted file mode 100644 index e69de29bb..000000000