diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp new file mode 100644 index 000000000..ebfb5fcc8 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/server_reactor.cpp @@ -0,0 +1,52 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" + +/** + * \example server_reactor.cpp + **/ + +class Server : public TestService::CallbackService +{ + + //! [read_reactor] + grpc::ServerReadReactor* ClientSide(grpc::CallbackServerContext* /*context*/, Response* /*response*/) override + { + const auto reactor = new rppgrpc::server_read_reactor(); + reactor->get_observable().subscribe([](const Request&) {}, []() { std::cout << "DONE" << std::endl; }); + return reactor; + } + //! [read_reactor] + + //! [bidi_reactor] + grpc::ServerBidiReactor* Bidirectional(grpc::CallbackServerContext* /*context*/) override + { + const auto reactor = new rppgrpc::server_bidi_reactor(); + reactor->get_observable().subscribe([](const Request&) {}, []() { std::cout << "DONE" << std::endl; }); + + reactor->get_observer().on_next(Response{}); + + return reactor; + } + //! [bidi_reactor] + + //! [write_reactor] + grpc::ServerWriteReactor* ServerSide(grpc::CallbackServerContext* /*context*/, const Request* /*request*/) override + { + const auto reactor = new rppgrpc::server_write_reactor(); + reactor->get_observable().subscribe([](rpp::utils::none) {}, []() { std::cout << "DONE" << std::endl; }); + + reactor->get_observer().on_next(Response{}); + + return reactor; + } + //! [write_reactor] +}; + +int main() +{ + return 0; +} diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 76a70c8ea..2e1ff1cfc 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -12,73 +12,60 @@ #include -#include - #include +#include #include #include -#include - namespace rppgrpc { /** * @brief RPP's based implementation for grpc client bidirectional reactor. * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcBidirectionalStream(ctx, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @snippet client_reactor.cpp bidi_reactor + * */ template class client_bidi_reactor final : public grpc::ClientBidiReactor + , private details::base_writer + , private details::base_reader { using Base = grpc::ClientBidiReactor; public: - client_bidi_reactor() - { - m_requests.get_observable().subscribe( - [this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); - }, - [this](const std::exception_ptr&) { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }, - [this]() { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }); - } + client_bidi_reactor() = default; void init() { Base::StartCall(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } - auto get_observer() + using details::base_writer::get_observer; + using details::base_reader::get_observable; + + private: + void start_read(Response& data) override { - return m_requests.get_observer(); + Base::StartRead(&data); } - auto get_observable() + void start_write(const Request& v) override { - return m_observer.get_observable(); + Base::StartWrite(&v); + } + + void finish_writes(const grpc::Status&) override + { + Base::StartWritesDone(); } - private: using Base::StartCall; using Base::StartRead; @@ -87,8 +74,7 @@ namespace rppgrpc if (!ok) return; - m_observer.get_observer().on_next(m_read); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(); } void OnWriteDone(bool ok) override @@ -96,101 +82,62 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); - - if (!m_write.empty()) - { - Base::StartWrite(&m_write.front()); - } - else if (m_finished) - { - Base::StartWritesDone(); - } + details::base_writer::handle_write_done(); } void OnDone(const grpc::Status& s) override { - m_requests.get_disposable().dispose(); - - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); delete this; } - - private: - rpp::subjects::serialized_publish_subject m_requests{}; - - rpp::subjects::publish_subject m_observer; - Response m_read{}; - - std::mutex m_write_mutex{}; - std::deque m_write{}; - bool m_finished{}; }; /** * @brief RPP's based implementation for grpc client write reactor * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcWriteStream(ctx, &request, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + * + * @snippet client_reactor.cpp write_reactor + * */ template class client_write_reactor final : public grpc::ClientWriteReactor + , private details::base_writer + , private details::base_reader { using Base = grpc::ClientWriteReactor; public: - client_write_reactor() - { - m_requests.get_observable().subscribe( - [this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); - }, - [this](const std::exception_ptr&) { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }, - [this]() { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }); - } + client_write_reactor() = default; void init() { Base::StartCall(); } - auto get_observer() + using details::base_writer::get_observer; + using details::base_reader::get_observable; + + private: + void start_read(rpp::utils::none& data) override { - return m_requests.get_observer(); } - auto get_observable() + void start_write(const Request& v) override { - return m_observer.get_observable(); + Base::StartWrite(&v); + } + + void finish_writes(const grpc::Status&) override + { + Base::StartWritesDone(); } - private: using Base::StartCall; void OnWriteDone(bool ok) override @@ -198,100 +145,67 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); - - if (!m_write.empty()) - { - Base::StartWrite(&m_write.front()); - } - else if (m_finished) - { - Base::StartWritesDone(); - } + details::base_writer::handle_write_done(); } void OnDone(const grpc::Status& s) override { - m_requests.get_disposable().dispose(); - - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + delete this; } - - private: - rpp::subjects::serialized_publish_subject m_requests{}; - rpp::subjects::publish_subject m_observer; - - std::mutex m_write_mutex{}; - std::deque m_write{}; - bool m_finished{}; }; /** * @brief RPP's based implementation for grpc client read reactor. * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcReadStream(ctx, &response, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * + * @snippet client_reactor.cpp read_reactor + * */ template class client_read_reactor final : public grpc::ClientReadReactor + , private details::base_reader { using Base = grpc::ClientReadReactor; public: - client_read_reactor() - { - } + client_read_reactor() = default; void init() { Base::StartCall(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } - auto get_observable() - { - return m_observer.get_observable(); - } + using details::base_reader::get_observable; private: using Base::StartCall; using Base::StartRead; + void start_read(Response& data) override + { + Base::StartRead(&data); + } + void OnReadDone(bool ok) override { if (!ok) return; - m_observer.get_observer().on_next(m_read); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(); } void OnDone(const grpc::Status& s) override { - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); delete this; } - - private: - rpp::subjects::publish_subject m_observer; - Response m_read{}; }; } // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp new file mode 100644 index 000000000..30a9dc96e --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -0,0 +1,143 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include + +#include "rpp/subjects/publish_subject.hpp" + +#include +#include + +namespace rppgrpc::details +{ + template + class base_writer + { + public: + base_writer() + { + m_subject.get_observable().subscribe(typename details::base_writer::observer_strategy{*this}); + } + + virtual ~base_writer() noexcept = default; + + auto get_observer() const + { + return m_subject.get_observer(); + } + + protected: + virtual void start_write(const TData& v) = 0; + virtual void finish_writes(const grpc::Status& status) = 0; + + void handle_on_done() + { + m_subject.get_disposable().dispose(); + } + + void handle_write_done() + { + std::lock_guard lock{write_mutex}; + write.pop_front(); + + if (!write.empty()) + { + start_write(write.front()); + } + else if (finished) + { + finish_writes(grpc::Status::OK); + } + } + + struct observer_strategy + { + std::reference_wrapper owner{}; + + template T> + void on_next(T&& message) const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().write.push_back(std::forward(message)); + if (owner.get().write.size() == 1) + owner.get().start_write(owner.get().write.front()); + } + + void on_error(const std::exception_ptr&) const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().finished = true; + + if (owner.get().write.size() == 0) + owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); + } + void on_completed() const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().finished = true; + + if (owner.get().write.size() == 0) + owner.get().finish_writes(grpc::Status::OK); + } + + static constexpr bool is_disposed() { return false; } + static constexpr void set_upstream(const rpp::disposable_wrapper&) {} + }; + + private: + rpp::subjects::serialized_publish_subject m_subject{}; + + std::mutex write_mutex{}; + std::deque write{}; + bool finished{}; + }; + + template + class base_reader + { + public: + base_reader() = default; + virtual ~base_reader() = default; + + auto get_observable() + { + return m_observer.get_observable(); + } + + protected: + virtual void start_read(TData& data) = 0; + + void handle_read_done(bool initial = false) + { + if (!initial) + m_observer.get_observer().on_next(m_data); + start_read(m_data); + } + + void handle_on_done(std::exception_ptr err) + { + if (err) + m_observer.get_observer().on_error(err); + else + m_observer.get_observer().on_completed(); + } + + private: + rpp::subjects::publish_subject m_observer; + TData m_data{}; + }; + +} // namespace rppgrpc::details diff --git a/src/extensions/rppgrpc/rppgrpc/fwd.hpp b/src/extensions/rppgrpc/rppgrpc/fwd.hpp index d807537f8..4f1114a43 100644 --- a/src/extensions/rppgrpc/rppgrpc/fwd.hpp +++ b/src/extensions/rppgrpc/rppgrpc/fwd.hpp @@ -27,4 +27,13 @@ namespace rppgrpc template class client_read_reactor; + + template + class server_bidi_reactor; + + template + class server_write_reactor; + + template + class server_read_reactor; } // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp index 78306d613..15153db75 100644 --- a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp +++ b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp @@ -12,4 +12,4 @@ #include #include -// #include +#include diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 44f7689cb..392d8aae1 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -12,102 +12,212 @@ #include -#include - #include +#include #include #include -#include - -namespace rppgrpc::details +namespace rppgrpc { - template - class server_bidi_reactor final : public grpc::ServerBidiReactor, Response> + /** + * @brief RPP's based implementation for grpc server bidirectional reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from bidirection method of CallbackService interface + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @warning grpc server reactor have to finish manually, so it is expected that you call `on_completed()` on reactor->get_observer() + * + * @snippet server_reactor.cpp bidi_reactor + * + */ + template + class server_bidi_reactor final : public grpc::ServerBidiReactor + , private details::base_writer + , private details::base_reader { - using Request = rpp::utils::extract_observer_type_t; - using Base = grpc::ServerBidiReactor; + using Base = grpc::ServerBidiReactor; public: - template Observable, rpp::constraint::decayed_same_as TObserver> - server_bidi_reactor(const Observable& messages, TObserver&& events) - : m_observer{std::forward(events)} - , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); }, - [this](const std::exception_ptr&) { - Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); - }, - [this]() { - Base::Finish(grpc::Status::OK); - })} + server_bidi_reactor() { Base::StartSendInitialMetadata(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } + using details::base_writer::get_observer; + using details::base_reader::get_observable; + private: + void start_write(const Response& v) override + { + Base::StartWrite(&v); + } + + void start_read(Request& data) override + { + Base::StartRead(&data); + } + + void finish_writes(const grpc::Status& status) override + { + Base::Finish(status); + } + void OnReadDone(bool ok) override { if (!ok) - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); return; - } - m_observer.on_next(m_read); - Base::StartRead(&m_read); + + details::base_reader::handle_read_done(); } void OnWriteDone(bool ok) override { if (!ok) - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); return; - } - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); + details::base_writer::handle_write_done(); + } + + void OnDone() override + { + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done({}); - if (!m_write.empty()) + delete this; + } + + void OnCancel() override + { + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + + Base::Finish(grpc::Status::CANCELLED); + } + }; + + /** + * @brief RPP's based implementation for grpc server write reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from write-based method of CallbackService interface + * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @snippet server_reactor.cpp write_reactor + * + */ + template + class server_write_reactor final : public grpc::ServerWriteReactor + , private details::base_writer + , private details::base_reader + { + using Base = grpc::ServerWriteReactor; + + public: + server_write_reactor() + { + Base::StartSendInitialMetadata(); + } + + using details::base_writer::get_observer; + using details::base_reader::get_observable; + + private: + void start_write(const Response& v) override + { + Base::StartWrite(&v); + } + + void start_read(rpp::utils::none& data) override {} + + void finish_writes(const grpc::Status& status) override + { + Base::Finish(status); + } + + void OnWriteDone(bool ok) override + { + if (!ok) { - Base::StartWrite(&m_write.front()); + Base::Finish(grpc::Status::OK); + return; } + + details::base_writer::handle_write_done(); } void OnDone() override { - m_observer.on_completed(); - Destroy(); + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done({}); + + delete this; } void OnCancel() override { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + Base::Finish(grpc::Status::CANCELLED); } + }; - private: - void Destroy() + /** + * @brief RPP's based implementation for grpc server read reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from read-based method of CallbackService interface + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * + * @snippet server_reactor.cpp read_reactor + * + */ + template + class server_read_reactor final : public grpc::ServerReadReactor + , private details::base_reader + { + using Base = grpc::ServerReadReactor; + + public: + server_read_reactor() { - m_disposable.dispose(); - delete this; + Base::StartSendInitialMetadata(); + details::base_reader::handle_read_done(true); } + using details::base_reader::get_observable; + private: - Observer m_observer; - rpp::disposable_wrapper m_disposable; + void start_read(Request& data) override + { + Base::StartRead(&data); + } - Request m_read{}; + void OnReadDone(bool ok) override + { + if (!ok) + { + Base::Finish(grpc::Status::OK); + return; + } - std::mutex m_write_mutex{}; - std::deque m_write{}; + details::base_reader::handle_read_done(); + } + + void OnDone() override + { + details::base_reader::handle_on_done({}); + + delete this; + } + + void OnCancel() override + { + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + } }; -} // namespace rppgrpc::details -namespace rppgrpc -{ } // namespace rppgrpc diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index f28168ae5..fb39b2cfe 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -25,8 +25,7 @@ macro(add_test_target target_name module files) endif() if (${module} STREQUAL rppgrpc) - rpp_add_proto_target(${TARGET}_proto rppgrpc/proto.proto) - target_link_libraries(${TARGET} PRIVATE ${TARGET}_proto) + target_link_libraries(${TARGET} PRIVATE rppgrpc_tests_proto) endif() target_compile_features(${TARGET} PRIVATE cxx_std_20) @@ -57,5 +56,6 @@ if (RPP_BUILD_QT_CODE) endif() if (RPP_BUILD_GRPC_CODE) + rpp_add_proto_target(rppgrpc_tests_proto rppgrpc/proto.proto) rpp_register_tests(rppgrpc) endif() diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 880314d96..ee870bdaf 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -15,20 +15,15 @@ #include -struct service : public trompeloeil::mock_interface +namespace { - IMPLEMENT_MOCK3(ServerSide); - IMPLEMENT_MOCK3(ClientSide); - IMPLEMENT_MOCK2(Bidirectional); -}; - -void wait(const std::unique_ptr& e) -{ - while (!e->is_satisfied()) + struct service : public trompeloeil::mock_interface { - std::this_thread::sleep_for(std::chrono::seconds{1}); - } -} + IMPLEMENT_MOCK3(ServerSide); + IMPLEMENT_MOCK3(ClientSide); + IMPLEMENT_MOCK2(Bidirectional); + }; +} // namespace TEST_CASE("async client reactor") { diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp new file mode 100644 index 000000000..76b7eb28a --- /dev/null +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -0,0 +1,187 @@ +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "rpp_trompeloil.hpp" + + +namespace +{ + struct service : public trompeloeil::mock_interface + { + using write_reactor_ptr = grpc::ServerWriteReactor*; + using read_reactor_ptr = grpc::ServerReadReactor*; + using bidi_reactor_ptr = grpc::ServerBidiReactor*; + + MAKE_MOCK2(ServerSide, write_reactor_ptr(grpc::CallbackServerContext* /*context*/, const Request* /*request*/)); + MAKE_MOCK2(ClientSide, read_reactor_ptr(grpc::CallbackServerContext* /*context*/, Response* /*response*/)); + MAKE_MOCK1(Bidirectional, bidi_reactor_ptr(grpc::CallbackServerContext* /*context*/)); + }; +} // namespace + + +TEST_CASE("Async server") +{ + grpc::ServerBuilder builder{}; + trompeloeil::sequence s{}; + + auto mock_service = std::make_unique(); + + builder.RegisterService(mock_service.get()); + + auto server(builder.BuildAndStart()); + + const auto channel = server->InProcessChannel({}); + const auto stub = TestService::NewStub(channel, {}); + + mock_observer out_mock{}; + grpc::ClientContext ctx{}; + grpc::CallbackServerContext* obtained_context{}; + + auto test_common = [&](const auto& writer, const auto* reactor) { + SECTION("writer immediate finish") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + if constexpr (requires { writer->WritesDone(); }) + writer->WritesDone(); + CHECK(writer->Finish().ok()); + }}; + + if constexpr (requires { reactor->get_observer(); }) + reactor->get_observer().on_completed(); + + t.join(); + wait(last); + } + + SECTION("writer cancels") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + ctx.TryCancel(); + wait(last); + } + + SECTION("server cancels") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + obtained_context->TryCancel(); + wait(last); + } + }; + + auto test_read = [&](const auto& writer, const auto* reactor) { + SECTION("writer writes") + { + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + std::thread{[&] { + Request request{}; + for (int i : {1, 2, 3}) + { + request.set_value(i); + REQUIRE(writer->Write(request)); + } + writer->WritesDone(); + }}.join(); + + if constexpr (requires { reactor->get_observer(); }) + reactor->get_observer().on_completed(); + + std::thread{[&] { + REQUIRE(writer->Finish().ok()); + }}.join(); + + wait(last); + } + }; + + auto test_write = [&](const auto& writer, const auto* reactor) { + SECTION("writer reads") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + Response response{}; + for (int i : {1, 2, 3}) + { + response.set_value(i); + reactor->get_observer().on_next(response); + } + reactor->get_observer().on_completed(); + + std::thread{[&] { + if constexpr (requires { writer->WritesDone(); }) + writer->WritesDone(); + + Response response{}; + for (int i : {1, 2, 3}) + { + REQUIRE(writer->Read(&response)); + REQUIRE(response.value() == i); + } + REQUIRE(!writer->Read(&response)); + REQUIRE(writer->Finish().ok()); + }}.join(); + wait(last); + } + }; + + SECTION("bidirectionl") + { + const auto reactor = new rppgrpc::server_bidi_reactor(); + reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + const auto writer = stub->Bidirectional(&ctx); + + wait(call); + + test_common(writer, reactor); + test_read(writer, reactor); + test_write(writer, reactor); + } + + SECTION("server-side") + { + const auto reactor = new rppgrpc::server_write_reactor(); + reactor->get_observable() | rpp::ops::map([](const rpp::utils::none&) { return 0; }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + const auto writer = stub->ServerSide(&ctx, {}); + + wait(call); + + test_common(writer, reactor); + test_write(writer, reactor); + } + + SECTION("client-side") + { + const auto reactor = new rppgrpc::server_read_reactor(); + reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + Response response{}; + const auto writer = stub->ClientSide(&ctx, &response); + + wait(call); + + test_common(writer, reactor); + test_read(writer, reactor); + } + + server->Shutdown(); + server->Wait(); +} diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index edd7bebc4..c2c817960 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -8,7 +8,7 @@ #include #include - +#include namespace trompeloeil { @@ -69,3 +69,11 @@ class mock_observer private: std::shared_ptr impl = std::make_shared(); }; + +inline void wait(const std::unique_ptr& e) +{ + while (!e->is_satisfied()) + { + std::this_thread::sleep_for(std::chrono::seconds{1}); + } +}