From c433d200fb63d151faf2c565c76b72c9dcd42d4b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 28 Jul 2024 23:46:51 +0300 Subject: [PATCH 01/20] minor client refactoring --- .../rppgrpc/rppgrpc/client_reactor.hpp | 126 +++++++++--------- 1 file changed, 65 insertions(+), 61 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 76a70c8ea..002273cd2 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -20,6 +20,52 @@ #include +namespace rppgrpc::details +{ + template + struct write_data + { + std::mutex write_mutex{}; + std::deque write{}; + bool finished{}; + }; + + template + struct write_observer_strategy + { + template T> + void on_next(T&& message) const + { + std::lock_guard lock{owner->m_write_data.write_mutex}; + owner->m_write_data.write.push_back(std::forward(message)); + if (owner->m_write_data.write.size() == 1) + owner->StartWrite(&owner->m_write_data.write.front()); + } + + void on_error(const std::exception_ptr&) const + { + std::lock_guard lock{owner->m_write_data.write_mutex}; + owner->m_write_data.finished = true; + + if (owner->m_write_data.write.size() == 0) + owner->StartWritesDone(); + } + void on_completed() const + { + std::lock_guard lock{owner->m_write_data.write_mutex}; + owner->m_write_data.finished = true; + + if (owner->m_write_data.write.size() == 0) + owner->StartWritesDone(); + } + + static constexpr bool is_disposed() { return false; } + static constexpr void set_upstream(const rpp::disposable_wrapper&) {} + + std::reference_wrapper owner{}; + }; +} // namespace rppgrpc::details + namespace rppgrpc { /** @@ -37,29 +83,11 @@ namespace rppgrpc using Base = grpc::ClientBidiReactor; public: + friend struct details::write_observer_strategy; + client_bidi_reactor() { - m_requests.get_observable().subscribe( - [this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); - }, - [this](const std::exception_ptr&) { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }, - [this]() { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }); + m_requests.get_observable().subscribe(details::write_observer_strategy{*this}); } void init() @@ -96,14 +124,14 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); + std::lock_guard lock{m_write_data.write_mutex}; + m_write_data.write.pop_front(); - if (!m_write.empty()) + if (!m_write_data.write.empty()) { - Base::StartWrite(&m_write.front()); + Base::StartWrite(&m_write_data.write.front()); } - else if (m_finished) + else if (m_write_data.finished) { Base::StartWritesDone(); } @@ -130,9 +158,7 @@ namespace rppgrpc rpp::subjects::publish_subject m_observer; Response m_read{}; - std::mutex m_write_mutex{}; - std::deque m_write{}; - bool m_finished{}; + details::write_data m_write_data{}; }; /** @@ -150,29 +176,11 @@ namespace rppgrpc using Base = grpc::ClientWriteReactor; public: + friend struct details::write_observer_strategy; + client_write_reactor() { - m_requests.get_observable().subscribe( - [this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); - }, - [this](const std::exception_ptr&) { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }, - [this]() { - std::lock_guard lock{m_write_mutex}; - m_finished = true; - - if (m_write.size() == 0) - Base::StartWritesDone(); - }); + m_requests.get_observable().subscribe(details::write_observer_strategy{*this}); } void init() @@ -198,14 +206,14 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); + std::lock_guard lock{m_write_data.write_mutex}; + m_write_data.write.pop_front(); - if (!m_write.empty()) + if (!m_write_data.write.empty()) { - Base::StartWrite(&m_write.front()); + Base::StartWrite(&m_write_data.write.front()); } - else if (m_finished) + else if (m_write_data.finished) { Base::StartWritesDone(); } @@ -230,9 +238,7 @@ namespace rppgrpc rpp::subjects::serialized_publish_subject m_requests{}; rpp::subjects::publish_subject m_observer; - std::mutex m_write_mutex{}; - std::deque m_write{}; - bool m_finished{}; + details::write_data m_write_data{}; }; /** @@ -249,9 +255,7 @@ namespace rppgrpc using Base = grpc::ClientReadReactor; public: - client_read_reactor() - { - } + client_read_reactor() = default; void init() { From 79de0ac8ecf8f963f9a9c37d3590669b8be3df0e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 29 Jul 2024 00:07:05 +0300 Subject: [PATCH 02/20] start next work --- src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp | 2 +- .../rppgrpc/rppgrpc/server_reactor.hpp | 5 +-- src/tests/rppgrpc/test_async_server.cpp | 44 +++++++++++++++++++ 3 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 src/tests/rppgrpc/test_async_server.cpp diff --git a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp index 78306d613..15153db75 100644 --- a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp +++ b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp @@ -12,4 +12,4 @@ #include #include -// #include +#include diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 44f7689cb..b64d5ad41 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -20,7 +20,7 @@ #include -namespace rppgrpc::details +namespace rppgrpc { template class server_bidi_reactor final : public grpc::ServerBidiReactor, Response> @@ -107,7 +107,4 @@ namespace rppgrpc::details std::mutex m_write_mutex{}; std::deque m_write{}; }; -} // namespace rppgrpc::details -namespace rppgrpc -{ } // namespace rppgrpc diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp new file mode 100644 index 000000000..627e77dc7 --- /dev/null +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -0,0 +1,44 @@ +#include + +#include +#include +#include +#include + +#include "rpp_trompeloil.hpp" + +struct service : public trompeloeil::mock_interface +{ + MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor<::Response>*)(::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/)); + MAKE_MOCK2(ClientSide, (::grpc::ServerReadReactor<::Request>*)(::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/)); + MAKE_MOCK1(Bidirectional, (::grpc::ServerBidiReactor<::Request, ::Response>*)(::grpc::CallbackServerContext* /*context*/)); +}; + +TEST_CASE("Async server") +{ + grpc::ServerBuilder builder{}; + + auto mock_service = std::make_unique(); + + builder.RegisterService(mock_service.get()); + + auto server(builder.BuildAndStart()); + + const auto channel = server->InProcessChannel({}); + const auto stub = TestService::NewStub(channel, {}); + + SECTION("bidirectionl") + { + grpc::ClientContext ctx{}; + Request req{}; + req.set_value(32); + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)) + .WITH(_1.value() == 32) + .RETURN(new rppgrpc::server_bidi_reactor()); + + const auto writer = stub->Bidirectional(&ctx); + } + + server->Shutdown(); + server->Wait(); +} From f6c92e63e4569edae5ca8f761ba2b2a116656aba Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 29 Jul 2024 23:04:25 +0300 Subject: [PATCH 03/20] Extend server reactor --- .../rppgrpc/rppgrpc/client_reactor.hpp | 40 +++--- .../rppgrpc/rppgrpc/server_reactor.hpp | 119 ++++++++++++------ src/tests/rppgrpc/test_async_server.cpp | 103 ++++++++++++++- 3 files changed, 200 insertions(+), 62 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 002273cd2..c97923f6d 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -23,7 +23,7 @@ namespace rppgrpc::details { template - struct write_data + struct client_write_data { std::mutex write_mutex{}; std::deque write{}; @@ -31,32 +31,32 @@ namespace rppgrpc::details }; template - struct write_observer_strategy + struct client_write_observer_strategy { template T> void on_next(T&& message) const { - std::lock_guard lock{owner->m_write_data.write_mutex}; - owner->m_write_data.write.push_back(std::forward(message)); - if (owner->m_write_data.write.size() == 1) - owner->StartWrite(&owner->m_write_data.write.front()); + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.write.push_back(std::forward(message)); + if (owner.get().m_write_data.write.size() == 1) + owner.get().StartWrite(&owner.get().m_write_data.write.front()); } void on_error(const std::exception_ptr&) const { - std::lock_guard lock{owner->m_write_data.write_mutex}; - owner->m_write_data.finished = true; + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.finished = true; - if (owner->m_write_data.write.size() == 0) - owner->StartWritesDone(); + if (owner.get().m_write_data.write.size() == 0) + owner.get().StartWritesDone(); } void on_completed() const { - std::lock_guard lock{owner->m_write_data.write_mutex}; - owner->m_write_data.finished = true; + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.finished = true; - if (owner->m_write_data.write.size() == 0) - owner->StartWritesDone(); + if (owner.get().m_write_data.write.size() == 0) + owner.get().StartWritesDone(); } static constexpr bool is_disposed() { return false; } @@ -83,11 +83,11 @@ namespace rppgrpc using Base = grpc::ClientBidiReactor; public: - friend struct details::write_observer_strategy; + friend struct details::client_write_observer_strategy; client_bidi_reactor() { - m_requests.get_observable().subscribe(details::write_observer_strategy{*this}); + m_requests.get_observable().subscribe(details::client_write_observer_strategy{*this}); } void init() @@ -158,7 +158,7 @@ namespace rppgrpc rpp::subjects::publish_subject m_observer; Response m_read{}; - details::write_data m_write_data{}; + details::client_write_data m_write_data{}; }; /** @@ -176,11 +176,11 @@ namespace rppgrpc using Base = grpc::ClientWriteReactor; public: - friend struct details::write_observer_strategy; + friend struct details::client_write_observer_strategy; client_write_reactor() { - m_requests.get_observable().subscribe(details::write_observer_strategy{*this}); + m_requests.get_observable().subscribe(details::client_write_observer_strategy{*this}); } void init() @@ -238,7 +238,7 @@ namespace rppgrpc rpp::subjects::serialized_publish_subject m_requests{}; rpp::subjects::publish_subject m_observer; - details::write_data m_write_data{}; + details::client_write_data m_write_data{}; }; /** diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index b64d5ad41..1e354f51e 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -20,44 +20,87 @@ #include +namespace rppgrpc::details +{ + template + struct server_write_data + { + std::mutex write_mutex{}; + std::deque write{}; + bool finished{}; + }; + + template + struct server_write_observer_strategy + { + template T> + void on_next(T&& message) const + { + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.write.push_back(std::forward(message)); + if (owner.get().m_write_data.write.size() == 1) + owner.get().StartWrite(&owner.get().m_write_data.write.front()); + } + + void on_error(const std::exception_ptr&) const + { + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.finished = true; + + if (owner.get().m_write_data.write.size() == 0) + owner.get().Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); + } + void on_completed() const + { + std::lock_guard lock{owner.get().m_write_data.write_mutex}; + owner.get().m_write_data.finished = true; + + if (owner.get().m_write_data.write.size() == 0) + owner.get().Finish(grpc::Status::OK); + } + + static constexpr bool is_disposed() { return false; } + static constexpr void set_upstream(const rpp::disposable_wrapper&) {} + + std::reference_wrapper owner{}; + }; +} // namespace rppgrpc::details + namespace rppgrpc { - template - class server_bidi_reactor final : public grpc::ServerBidiReactor, Response> + template + class server_bidi_reactor final : public grpc::ServerBidiReactor { - using Request = rpp::utils::extract_observer_type_t; - using Base = grpc::ServerBidiReactor; + using Base = grpc::ServerBidiReactor; public: - template Observable, rpp::constraint::decayed_same_as TObserver> - server_bidi_reactor(const Observable& messages, TObserver&& events) - : m_observer{std::forward(events)} - , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { - std::lock_guard lock{m_write_mutex}; - m_write.push_back(std::forward(message)); - if (m_write.size() == 1) - Base::StartWrite(&m_write.front()); }, - [this](const std::exception_ptr&) { - Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); - }, - [this]() { - Base::Finish(grpc::Status::OK); - })} + friend struct details::server_write_observer_strategy; + + server_bidi_reactor() { + m_responses.get_observable().subscribe(details::server_write_observer_strategy{*this}); + Base::StartSendInitialMetadata(); Base::StartRead(&m_read); } + auto get_observer() + { + return m_responses.get_observer(); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + private: void OnReadDone(bool ok) override { if (!ok) - { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); return; - } - m_observer.on_next(m_read); + + m_observer.get_observer().on_next(m_read); Base::StartRead(&m_read); } @@ -65,46 +108,50 @@ namespace rppgrpc { if (!ok) { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); Base::Finish(grpc::Status::CANCELLED); return; } - std::lock_guard lock{m_write_mutex}; - m_write.pop_front(); + std::lock_guard lock{m_write_data.write_mutex}; + m_write_data.write.pop_front(); - if (!m_write.empty()) + if (!m_write_data.write.empty()) + { + Base::StartWrite(&m_write_data.write.front()); + } + else if (m_write_data.finished) { - Base::StartWrite(&m_write.front()); + Base::Finish(grpc::Status::OK); } } void OnDone() override { - m_observer.on_completed(); + m_responses.get_disposable().dispose(); + m_observer.get_observer().on_completed(); Destroy(); } void OnCancel() override { - m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + m_responses.get_disposable().dispose(); + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); Base::Finish(grpc::Status::CANCELLED); } private: void Destroy() { - m_disposable.dispose(); delete this; } private: - Observer m_observer; - rpp::disposable_wrapper m_disposable; + rpp::subjects::serialized_publish_subject m_responses{}; - Request m_read{}; + rpp::subjects::publish_subject m_observer; + Request m_read{}; - std::mutex m_write_mutex{}; - std::deque m_write{}; + details::server_write_data m_write_data{}; }; } // namespace rppgrpc diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index 627e77dc7..a41bf1e86 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -1,5 +1,10 @@ #include +#include +#include +#include +#include + #include #include #include @@ -14,9 +19,19 @@ struct service : public trompeloeil::mock_interface*)(::grpc::CallbackServerContext* /*context*/)); }; +void wait(const std::unique_ptr& e) +{ + while (!e->is_satisfied()) + { + std::this_thread::sleep_for(std::chrono::seconds{1}); + } +} + + TEST_CASE("Async server") { - grpc::ServerBuilder builder{}; + grpc::ServerBuilder builder{}; + trompeloeil::sequence s{}; auto mock_service = std::make_unique(); @@ -27,16 +42,92 @@ TEST_CASE("Async server") const auto channel = server->InProcessChannel({}); const auto stub = TestService::NewStub(channel, {}); + mock_observer out_mock{}; + SECTION("bidirectionl") { grpc::ClientContext ctx{}; - Request req{}; - req.set_value(32); - REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)) - .WITH(_1.value() == 32) - .RETURN(new rppgrpc::server_bidi_reactor()); + const auto reactor = new rppgrpc::server_bidi_reactor(); + reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + ::grpc::CallbackServerContext* obtained_context{}; + const auto bidirectional_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); const auto writer = stub->Bidirectional(&ctx); + + SECTION("writer immediate finish") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + writer->WritesDone(); + CHECK(writer->Finish().ok()); + }}; + + reactor->get_observer().on_completed(); + t.join(); + wait(last); + } + + SECTION("writer cancels") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + ctx.TryCancel(); + wait(last); + } + + SECTION("server cancels") + { + wait(bidirectional_call); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + obtained_context->TryCancel(); + wait(last); + } + + SECTION("writer writes") + { + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + std::thread{[&] { + Request request{}; + for (int i : {1, 2, 3}) + { + request.set_value(i); + writer->Write(request); + } + writer->WritesDone(); + REQUIRE(writer->Finish().ok()); + }}.join(); + wait(last); + } + + SECTION("writer reads") + { + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + Response response{}; + for (int i : {1, 2, 3}) + { + response.set_value(i); + reactor->get_observer().on_next(response); + } + reactor->get_observer().on_completed(); + + std::thread{[&] { + writer->WritesDone(); + + Response response{}; + for (int i : {1, 2, 3}) + { + REQUIRE(writer->Read(&response)); + REQUIRE(response.value() == i); + } + // REQUIRE(!writer->Read(&response)); + REQUIRE(writer->Finish().ok()); + }}.join(); + wait(last); + } } server->Shutdown(); From 5e3efb83d48affb0401559b1cdfd79d2c61a320d Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 29 Jul 2024 23:48:46 +0300 Subject: [PATCH 04/20] fix tests --- src/tests/rppgrpc/test_async_server.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index a41bf1e86..70769e07d 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -55,6 +55,9 @@ TEST_CASE("Async server") const auto writer = stub->Bidirectional(&ctx); + wait(bidirectional_call); + + SECTION("writer immediate finish") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); @@ -77,8 +80,6 @@ TEST_CASE("Async server") SECTION("server cancels") { - wait(bidirectional_call); - const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); obtained_context->TryCancel(); wait(last); @@ -86,6 +87,7 @@ TEST_CASE("Async server") SECTION("writer writes") { + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); @@ -98,8 +100,14 @@ TEST_CASE("Async server") writer->Write(request); } writer->WritesDone(); + }}.join(); + + reactor->get_observer().on_completed(); + + std::thread{[&] { REQUIRE(writer->Finish().ok()); }}.join(); + wait(last); } @@ -123,7 +131,7 @@ TEST_CASE("Async server") REQUIRE(writer->Read(&response)); REQUIRE(response.value() == i); } - // REQUIRE(!writer->Read(&response)); + REQUIRE(!writer->Read(&response)); REQUIRE(writer->Finish().ok()); }}.join(); wait(last); From fdd268eb7bb9898ddea3f6ad4949e6490432435c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 29 Jul 2024 23:51:04 +0300 Subject: [PATCH 05/20] add checks --- src/tests/rppgrpc/test_async_server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index 70769e07d..b8bffee24 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -97,7 +97,7 @@ TEST_CASE("Async server") for (int i : {1, 2, 3}) { request.set_value(i); - writer->Write(request); + REQUIRE(writer->Write(request)); } writer->WritesDone(); }}.join(); From ce9bf15cde45fbf5fab97bf3937e32828cb04297 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Jul 2024 23:01:58 +0300 Subject: [PATCH 06/20] add base writer --- .../rppgrpc/rppgrpc/client_reactor.hpp | 130 +++++------------- .../rppgrpc/rppgrpc/details/base.hpp | 106 ++++++++++++++ src/tests/rppgrpc/test_async_server.cpp | 1 - 3 files changed, 138 insertions(+), 99 deletions(-) create mode 100644 src/extensions/rppgrpc/rppgrpc/details/base.hpp diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index c97923f6d..235298fe7 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -15,57 +15,12 @@ #include #include +#include #include #include #include -namespace rppgrpc::details -{ - template - struct client_write_data - { - std::mutex write_mutex{}; - std::deque write{}; - bool finished{}; - }; - - template - struct client_write_observer_strategy - { - template T> - void on_next(T&& message) const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.write.push_back(std::forward(message)); - if (owner.get().m_write_data.write.size() == 1) - owner.get().StartWrite(&owner.get().m_write_data.write.front()); - } - - void on_error(const std::exception_ptr&) const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.finished = true; - - if (owner.get().m_write_data.write.size() == 0) - owner.get().StartWritesDone(); - } - void on_completed() const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.finished = true; - - if (owner.get().m_write_data.write.size() == 0) - owner.get().StartWritesDone(); - } - - static constexpr bool is_disposed() { return false; } - static constexpr void set_upstream(const rpp::disposable_wrapper&) {} - - std::reference_wrapper owner{}; - }; -} // namespace rppgrpc::details - namespace rppgrpc { /** @@ -79,16 +34,12 @@ namespace rppgrpc */ template class client_bidi_reactor final : public grpc::ClientBidiReactor + , private details::base_writer { using Base = grpc::ClientBidiReactor; public: - friend struct details::client_write_observer_strategy; - - client_bidi_reactor() - { - m_requests.get_observable().subscribe(details::client_write_observer_strategy{*this}); - } + client_bidi_reactor() = default; void init() { @@ -96,10 +47,7 @@ namespace rppgrpc Base::StartRead(&m_read); } - auto get_observer() - { - return m_requests.get_observer(); - } + using details::base_writer::get_observer; auto get_observable() { @@ -107,6 +55,16 @@ namespace rppgrpc } private: + void start_write(const Request& v) override + { + Base::StartWrite(&v); + } + + void finish_writes() override + { + Base::StartWritesDone(); + } + using Base::StartCall; using Base::StartRead; @@ -124,22 +82,12 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_data.write_mutex}; - m_write_data.write.pop_front(); - - if (!m_write_data.write.empty()) - { - Base::StartWrite(&m_write_data.write.front()); - } - else if (m_write_data.finished) - { - Base::StartWritesDone(); - } + details::base_writer::handle_write_done(); } void OnDone(const grpc::Status& s) override { - m_requests.get_disposable().dispose(); + details::base_writer::handle_on_done(); if (s.ok()) { @@ -153,12 +101,8 @@ namespace rppgrpc } private: - rpp::subjects::serialized_publish_subject m_requests{}; - rpp::subjects::publish_subject m_observer; Response m_read{}; - - details::client_write_data m_write_data{}; }; /** @@ -172,26 +116,19 @@ namespace rppgrpc */ template class client_write_reactor final : public grpc::ClientWriteReactor + , private details::base_writer { using Base = grpc::ClientWriteReactor; public: - friend struct details::client_write_observer_strategy; - - client_write_reactor() - { - m_requests.get_observable().subscribe(details::client_write_observer_strategy{*this}); - } + client_write_reactor() = default; void init() { Base::StartCall(); } - auto get_observer() - { - return m_requests.get_observer(); - } + using details::base_writer::get_observer; auto get_observable() { @@ -199,6 +136,16 @@ namespace rppgrpc } private: + void start_write(const Request& v) override + { + Base::StartWrite(&v); + } + + void finish_writes() override + { + Base::StartWritesDone(); + } + using Base::StartCall; void OnWriteDone(bool ok) override @@ -206,22 +153,12 @@ namespace rppgrpc if (!ok) return; - std::lock_guard lock{m_write_data.write_mutex}; - m_write_data.write.pop_front(); - - if (!m_write_data.write.empty()) - { - Base::StartWrite(&m_write_data.write.front()); - } - else if (m_write_data.finished) - { - Base::StartWritesDone(); - } + details::base_writer::handle_write_done(); } void OnDone(const grpc::Status& s) override { - m_requests.get_disposable().dispose(); + details::base_writer::handle_on_done(); if (s.ok()) { @@ -235,10 +172,7 @@ namespace rppgrpc } private: - rpp::subjects::serialized_publish_subject m_requests{}; - rpp::subjects::publish_subject m_observer; - - details::client_write_data m_write_data{}; + rpp::subjects::publish_subject m_observer; }; /** diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp new file mode 100644 index 000000000..a65661f76 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -0,0 +1,106 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include "rpp/subjects/publish_subject.hpp" + +#include +#include + +namespace rppgrpc::details +{ + template + class base_writer + { + public: + base_writer() + { + m_subject.get_observable().subscribe(typename details::base_writer::observer_strategy{*this}); + } + + virtual ~base_writer() noexcept = default; + + auto get_observer() const + { + return m_subject.get_observer(); + } + + protected: + virtual void start_write(const TData& v) = 0; + virtual void finish_writes() = 0; + + void handle_on_done() + { + m_subject.get_disposable().dispose(); + } + + void handle_write_done() + { + std::lock_guard lock{write_mutex}; + write.pop_front(); + + if (!write.empty()) + { + start_write(write.front()); + } + else if (finished) + { + finish_writes(); + } + } + + struct observer_strategy + { + std::reference_wrapper owner{}; + + template T> + void on_next(T&& message) const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().write.push_back(std::forward(message)); + if (owner.get().write.size() == 1) + owner.get().start_write(owner.get().write.front()); + } + + void on_error(const std::exception_ptr&) const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().finished = true; + + if (owner.get().write.size() == 0) + owner.get().finish_writes(); + } + void on_completed() const + { + std::lock_guard lock{owner.get().write_mutex}; + owner.get().finished = true; + + if (owner.get().write.size() == 0) + owner.get().finish_writes(); + } + + static constexpr bool is_disposed() { return false; } + static constexpr void set_upstream(const rpp::disposable_wrapper&) {} + }; + + private: + rpp::subjects::serialized_publish_subject m_subject{}; + + std::mutex write_mutex{}; + std::deque write{}; + bool finished{}; + }; + +} // namespace rppgrpc::details diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index b8bffee24..354bcaed8 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -57,7 +57,6 @@ TEST_CASE("Async server") wait(bidirectional_call); - SECTION("writer immediate finish") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); From 3dbcdb8393e80877c433d6fe8686cffe839085ab Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Jul 2024 23:14:03 +0300 Subject: [PATCH 07/20] refactor --- .../rppgrpc/rppgrpc/client_reactor.hpp | 77 ++++++------------- .../rppgrpc/rppgrpc/details/base.hpp | 35 +++++++++ 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 235298fe7..0facfeaba 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -19,8 +19,6 @@ #include #include -#include - namespace rppgrpc { /** @@ -35,6 +33,7 @@ namespace rppgrpc template class client_bidi_reactor final : public grpc::ClientBidiReactor , private details::base_writer + , private details::base_reader { using Base = grpc::ClientBidiReactor; @@ -44,17 +43,18 @@ namespace rppgrpc void init() { Base::StartCall(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } using details::base_writer::get_observer; + using details::base_reader::get_observable; - auto get_observable() + private: + void start_read(Response& data) override { - return m_observer.get_observable(); + Base::StartRead(&data); } - private: void start_write(const Request& v) override { Base::StartWrite(&v); @@ -73,8 +73,7 @@ namespace rppgrpc if (!ok) return; - m_observer.get_observer().on_next(m_read); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(); } void OnWriteDone(bool ok) override @@ -88,21 +87,9 @@ namespace rppgrpc void OnDone(const grpc::Status& s) override { details::base_writer::handle_on_done(); - - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); delete this; } - - private: - rpp::subjects::publish_subject m_observer; - Response m_read{}; }; /** @@ -117,6 +104,7 @@ namespace rppgrpc template class client_write_reactor final : public grpc::ClientWriteReactor , private details::base_writer + , private details::base_reader { using Base = grpc::ClientWriteReactor; @@ -129,13 +117,13 @@ namespace rppgrpc } using details::base_writer::get_observer; + using details::base_reader::get_observable; - auto get_observable() + private: + void start_read(rpp::utils::none& data) override { - return m_observer.get_observable(); } - private: void start_write(const Request& v) override { Base::StartWrite(&v); @@ -159,20 +147,10 @@ namespace rppgrpc void OnDone(const grpc::Status& s) override { details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } delete this; } - - private: - rpp::subjects::publish_subject m_observer; }; /** @@ -185,6 +163,7 @@ namespace rppgrpc */ template class client_read_reactor final : public grpc::ClientReadReactor + , private details::base_reader { using Base = grpc::ClientReadReactor; @@ -194,42 +173,32 @@ namespace rppgrpc void init() { Base::StartCall(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } - auto get_observable() - { - return m_observer.get_observable(); - } + using details::base_reader::get_observable; private: using Base::StartCall; using Base::StartRead; + void start_read(Response& data) override + { + Base::StartRead(&data); + } + void OnReadDone(bool ok) override { if (!ok) return; - m_observer.get_observer().on_next(m_read); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(); } void OnDone(const grpc::Status& s) override { - if (s.ok()) - { - m_observer.get_observer().on_completed(); - } - else - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); - } + details::base_reader::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); delete this; } - - private: - rpp::subjects::publish_subject m_observer; - Response m_read{}; }; } // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp index a65661f76..a85f22b07 100644 --- a/src/extensions/rppgrpc/rppgrpc/details/base.hpp +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -103,4 +103,39 @@ namespace rppgrpc::details bool finished{}; }; + template + class base_reader + { + public: + base_reader() = default; + virtual ~base_reader() = default; + + auto get_observable() + { + return m_observer.get_observable(); + } + + protected: + virtual void start_read(TData& data) = 0; + + void handle_read_done(bool initial = false) + { + if (!initial) + m_observer.get_observer().on_next(m_data); + start_read(m_data); + } + + void handle_on_done(std::exception_ptr err) + { + if (err) + m_observer.get_observer().on_error(err); + else + m_observer.get_observer().on_completed(); + } + + private: + rpp::subjects::publish_subject m_observer; + TData m_data{}; + }; + } // namespace rppgrpc::details From a2de96339d25a11d22bafd13c02fc5c0308c22a3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Jul 2024 23:22:06 +0300 Subject: [PATCH 08/20] reuse same code --- .../rppgrpc/rppgrpc/client_reactor.hpp | 6 +- .../rppgrpc/rppgrpc/details/base.hpp | 12 +- .../rppgrpc/rppgrpc/server_reactor.hpp | 124 ++++-------------- 3 files changed, 36 insertions(+), 106 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 0facfeaba..6cc241aab 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -12,8 +12,6 @@ #include -#include - #include #include #include @@ -60,7 +58,7 @@ namespace rppgrpc Base::StartWrite(&v); } - void finish_writes() override + void finish_writes(const grpc::Status) override { Base::StartWritesDone(); } @@ -129,7 +127,7 @@ namespace rppgrpc Base::StartWrite(&v); } - void finish_writes() override + void finish_writes(const grpc::Status&) override { Base::StartWritesDone(); } diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp index a85f22b07..30a9dc96e 100644 --- a/src/extensions/rppgrpc/rppgrpc/details/base.hpp +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -14,6 +14,8 @@ #include +#include + #include "rpp/subjects/publish_subject.hpp" #include @@ -38,8 +40,8 @@ namespace rppgrpc::details } protected: - virtual void start_write(const TData& v) = 0; - virtual void finish_writes() = 0; + virtual void start_write(const TData& v) = 0; + virtual void finish_writes(const grpc::Status& status) = 0; void handle_on_done() { @@ -57,7 +59,7 @@ namespace rppgrpc::details } else if (finished) { - finish_writes(); + finish_writes(grpc::Status::OK); } } @@ -80,7 +82,7 @@ namespace rppgrpc::details owner.get().finished = true; if (owner.get().write.size() == 0) - owner.get().finish_writes(); + owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); } void on_completed() const { @@ -88,7 +90,7 @@ namespace rppgrpc::details owner.get().finished = true; if (owner.get().write.size() == 0) - owner.get().finish_writes(); + owner.get().finish_writes(grpc::Status::OK); } static constexpr bool is_disposed() { return false; } diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 1e354f51e..4368abe3a 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -12,146 +12,76 @@ #include -#include - #include +#include #include #include -#include - -namespace rppgrpc::details -{ - template - struct server_write_data - { - std::mutex write_mutex{}; - std::deque write{}; - bool finished{}; - }; - - template - struct server_write_observer_strategy - { - template T> - void on_next(T&& message) const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.write.push_back(std::forward(message)); - if (owner.get().m_write_data.write.size() == 1) - owner.get().StartWrite(&owner.get().m_write_data.write.front()); - } - - void on_error(const std::exception_ptr&) const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.finished = true; - - if (owner.get().m_write_data.write.size() == 0) - owner.get().Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); - } - void on_completed() const - { - std::lock_guard lock{owner.get().m_write_data.write_mutex}; - owner.get().m_write_data.finished = true; - - if (owner.get().m_write_data.write.size() == 0) - owner.get().Finish(grpc::Status::OK); - } - - static constexpr bool is_disposed() { return false; } - static constexpr void set_upstream(const rpp::disposable_wrapper&) {} - - std::reference_wrapper owner{}; - }; -} // namespace rppgrpc::details - namespace rppgrpc { template class server_bidi_reactor final : public grpc::ServerBidiReactor + , private details::base_writer + , private details::base_reader { using Base = grpc::ServerBidiReactor; public: - friend struct details::server_write_observer_strategy; - server_bidi_reactor() { - m_responses.get_observable().subscribe(details::server_write_observer_strategy{*this}); - Base::StartSendInitialMetadata(); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(true); } - auto get_observer() + using details::base_writer::get_observer; + using details::base_reader::get_observable; + + private: + void start_write(const Response& v) override { - return m_responses.get_observer(); + Base::StartWrite(&v); } - auto get_observable() + void start_read(Request& data) override { - return m_observer.get_observable(); + Base::StartRead(&data); + } + + void finish_writes(const grpc::Status& status) override + { + Base::Finish(status); } - private: void OnReadDone(bool ok) override { if (!ok) return; - m_observer.get_observer().on_next(m_read); - Base::StartRead(&m_read); + details::base_reader::handle_read_done(); } void OnWriteDone(bool ok) override { if (!ok) - { - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); - Base::Finish(grpc::Status::CANCELLED); return; - } - - std::lock_guard lock{m_write_data.write_mutex}; - m_write_data.write.pop_front(); - - if (!m_write_data.write.empty()) - { - Base::StartWrite(&m_write_data.write.front()); - } - else if (m_write_data.finished) - { - Base::Finish(grpc::Status::OK); - } + + details::base_writer::handle_write_done(); } void OnDone() override { - m_responses.get_disposable().dispose(); - m_observer.get_observer().on_completed(); - Destroy(); + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done({}); + + delete this; } void OnCancel() override { - m_responses.get_disposable().dispose(); - m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); - Base::Finish(grpc::Status::CANCELLED); - } + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); - private: - void Destroy() - { - delete this; + Base::Finish(grpc::Status::CANCELLED); } - - private: - rpp::subjects::serialized_publish_subject m_responses{}; - - rpp::subjects::publish_subject m_observer; - Request m_read{}; - - details::server_write_data m_write_data{}; }; } // namespace rppgrpc From 781427a913f316ad744a74ece18517e42c30ebe5 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 31 Jul 2024 14:55:25 +0300 Subject: [PATCH 09/20] fix --- src/extensions/rppgrpc/rppgrpc/client_reactor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 6cc241aab..c4f183983 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -58,7 +58,7 @@ namespace rppgrpc Base::StartWrite(&v); } - void finish_writes(const grpc::Status) override + void finish_writes(const grpc::Status&) override { Base::StartWritesDone(); } From c6ee2b776c1ed55294042e22dc0b2c56fbd80cdf Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 31 Jul 2024 15:49:06 +0300 Subject: [PATCH 10/20] Update CMakeLists.txt --- src/tests/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index f28168ae5..fb39b2cfe 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -25,8 +25,7 @@ macro(add_test_target target_name module files) endif() if (${module} STREQUAL rppgrpc) - rpp_add_proto_target(${TARGET}_proto rppgrpc/proto.proto) - target_link_libraries(${TARGET} PRIVATE ${TARGET}_proto) + target_link_libraries(${TARGET} PRIVATE rppgrpc_tests_proto) endif() target_compile_features(${TARGET} PRIVATE cxx_std_20) @@ -57,5 +56,6 @@ if (RPP_BUILD_QT_CODE) endif() if (RPP_BUILD_GRPC_CODE) + rpp_add_proto_target(rppgrpc_tests_proto rppgrpc/proto.proto) rpp_register_tests(rppgrpc) endif() From b534acd8cedcf1f1269d0b1b738b406ff79515ae Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Aug 2024 21:56:06 +0300 Subject: [PATCH 11/20] change parentheses --- src/tests/rppgrpc/test_async_server.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index 354bcaed8..441202f8a 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -14,9 +14,9 @@ struct service : public trompeloeil::mock_interface { - MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor<::Response>*)(::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/)); - MAKE_MOCK2(ClientSide, (::grpc::ServerReadReactor<::Request>*)(::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/)); - MAKE_MOCK1(Bidirectional, (::grpc::ServerBidiReactor<::Request, ::Response>*)(::grpc::CallbackServerContext* /*context*/)); + MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor<::Response> * (::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/))); + MAKE_MOCK2(ClientSide, (::grpc::ServerReadReactor<::Request> * (::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/))); + MAKE_MOCK1(Bidirectional, (::grpc::ServerBidiReactor<::Request, ::Response> * (::grpc::CallbackServerContext* /*context*/))); }; void wait(const std::unique_ptr& e) From 074fbf21559e9fec3517b1414d1d82ad4a381bfe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 4 Aug 2024 18:56:11 +0000 Subject: [PATCH 12/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/publish.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpp/rpp/operators/publish.hpp b/src/rpp/rpp/operators/publish.hpp index 86ef0bd10..25afff05d 100644 --- a/src/rpp/rpp/operators/publish.hpp +++ b/src/rpp/rpp/operators/publish.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include namespace rpp::operators { From cf4c6405dd494289e325e0b8d75e70d92b35ee8c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Aug 2024 22:55:40 +0300 Subject: [PATCH 13/20] extend reactors --- .../rppgrpc/rppgrpc/server_reactor.hpp | 102 ++++++++++++++++++ src/tests/rppgrpc/test_async_server.cpp | 92 ++++++++++++---- 2 files changed, 171 insertions(+), 23 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 4368abe3a..1765c0c7c 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -84,4 +84,106 @@ namespace rppgrpc Base::Finish(grpc::Status::CANCELLED); } }; + + template + class server_write_reactor final : public grpc::ServerWriteReactor + , private details::base_writer + , private details::base_reader + { + using Base = grpc::ServerWriteReactor; + + public: + server_write_reactor() + { + Base::StartSendInitialMetadata(); + } + + using details::base_writer::get_observer; + using details::base_reader::get_observable; + + private: + void start_write(const Response& v) override + { + Base::StartWrite(&v); + } + + void start_read(rpp::utils::none& data) override {} + + void finish_writes(const grpc::Status& status) override + { + Base::Finish(status); + } + + void OnWriteDone(bool ok) override + { + if (!ok) + { + Base::Finish(grpc::Status::OK); + return; + } + + details::base_writer::handle_write_done(); + } + + void OnDone() override + { + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done({}); + + delete this; + } + + void OnCancel() override + { + details::base_writer::handle_on_done(); + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + + Base::Finish(grpc::Status::CANCELLED); + } + }; + + template + class server_read_reactor final : public grpc::ServerReadReactor + , private details::base_reader + { + using Base = grpc::ServerReadReactor; + + public: + server_read_reactor() + { + Base::StartSendInitialMetadata(); + details::base_reader::handle_read_done(true); + } + + using details::base_reader::get_observable; + + private: + void start_read(Request& data) override + { + Base::StartRead(&data); + } + + void OnReadDone(bool ok) override + { + if (!ok) + { + Base::Finish(grpc::Status::OK); + return; + } + + details::base_reader::handle_read_done(); + } + + void OnDone() override + { + details::base_reader::handle_on_done({}); + + delete this; + } + + void OnCancel() override + { + details::base_reader::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + } + }; } // namespace rppgrpc diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index 441202f8a..061ca58ee 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -14,16 +14,17 @@ struct service : public trompeloeil::mock_interface { - MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor<::Response> * (::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/))); - MAKE_MOCK2(ClientSide, (::grpc::ServerReadReactor<::Request> * (::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/))); - MAKE_MOCK1(Bidirectional, (::grpc::ServerBidiReactor<::Request, ::Response> * (::grpc::CallbackServerContext* /*context*/))); + MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor * (grpc::CallbackServerContext* /*context*/, const Request* /*request*/))); + MAKE_MOCK2(ClientSide, (grpc::ServerReadReactor * (grpc::CallbackServerContext* /*context*/, Response* /*response*/))); + MAKE_MOCK1(Bidirectional, (grpc::ServerBidiReactor * (grpc::CallbackServerContext* /*context*/))); }; void wait(const std::unique_ptr& e) { while (!e->is_satisfied()) { - std::this_thread::sleep_for(std::chrono::seconds{1}); + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + std::this_thread::yield(); } } @@ -42,30 +43,23 @@ TEST_CASE("Async server") const auto channel = server->InProcessChannel({}); const auto stub = TestService::NewStub(channel, {}); - mock_observer out_mock{}; - - SECTION("bidirectionl") - { - grpc::ClientContext ctx{}; - const auto reactor = new rppgrpc::server_bidi_reactor(); - reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); - - ::grpc::CallbackServerContext* obtained_context{}; - const auto bidirectional_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); - - const auto writer = stub->Bidirectional(&ctx); - - wait(bidirectional_call); + mock_observer out_mock{}; + grpc::ClientContext ctx{}; + grpc::CallbackServerContext* obtained_context{}; + auto test_common = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { SECTION("writer immediate finish") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); auto t = std::thread{[&] { - writer->WritesDone(); + if constexpr (requires { writer->WritesDone(); }) + writer->WritesDone(); CHECK(writer->Finish().ok()); }}; - reactor->get_observer().on_completed(); + if constexpr (requires { reactor->get_observer(); }) + reactor->get_observer().on_completed(); + t.join(); wait(last); } @@ -83,10 +77,11 @@ TEST_CASE("Async server") obtained_context->TryCancel(); wait(last); } + }; + auto test_read = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { SECTION("writer writes") { - REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); @@ -101,7 +96,8 @@ TEST_CASE("Async server") writer->WritesDone(); }}.join(); - reactor->get_observer().on_completed(); + if constexpr (requires { reactor->get_observer(); }) + reactor->get_observer().on_completed(); std::thread{[&] { REQUIRE(writer->Finish().ok()); @@ -109,7 +105,9 @@ TEST_CASE("Async server") wait(last); } + }; + auto test_write = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { SECTION("writer reads") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); @@ -122,7 +120,8 @@ TEST_CASE("Async server") reactor->get_observer().on_completed(); std::thread{[&] { - writer->WritesDone(); + if constexpr (requires { writer->WritesDone(); }) + writer->WritesDone(); Response response{}; for (int i : {1, 2, 3}) @@ -135,6 +134,53 @@ TEST_CASE("Async server") }}.join(); wait(last); } + }; + + SECTION("bidirectionl") + { + const auto reactor = new rppgrpc::server_bidi_reactor(); + reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + const auto writer = stub->Bidirectional(&ctx); + + wait(call); + + test_common(writer, reactor); + test_read(writer, reactor); + test_write(writer, reactor); + } + + SECTION("server-side") + { + const auto reactor = new rppgrpc::server_write_reactor(); + reactor->get_observable() | rpp::ops::map([](const rpp::utils::none&) { return 0; }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + const auto writer = stub->ServerSide(&ctx, {}); + + wait(call); + + test_common(writer, reactor); + test_write(writer, reactor); + } + + SECTION("client-side") + { + const auto reactor = new rppgrpc::server_read_reactor(); + reactor->get_observable() | rpp::ops::map([](const Request& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + const auto call = NAMED_REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_)).LR_SIDE_EFFECT(obtained_context = _1;).RETURN(reactor).IN_SEQUENCE(s); + + Response response{}; + const auto writer = stub->ClientSide(&ctx, &response); + + wait(call); + + test_common(writer, reactor); + test_read(writer, reactor); } server->Shutdown(); From 1e32271020642e14a208873e70d56013fe5058c8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Aug 2024 23:15:58 +0300 Subject: [PATCH 14/20] compile fix --- src/tests/rppgrpc/test_async_server.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index 061ca58ee..d8b25001c 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -47,7 +47,7 @@ TEST_CASE("Async server") grpc::ClientContext ctx{}; grpc::CallbackServerContext* obtained_context{}; - auto test_common = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { + auto test_common = [&](const auto& writer, const auto* reactor) { SECTION("writer immediate finish") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); @@ -79,7 +79,7 @@ TEST_CASE("Async server") } }; - auto test_read = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { + auto test_read = [&](const auto& writer, const auto* reactor) { SECTION("writer writes") { REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); @@ -107,7 +107,7 @@ TEST_CASE("Async server") } }; - auto test_write = [&out_mock, &ctx, &obtained_context, &s](const auto& writer, const auto* reactor) { + auto test_write = [&](const auto& writer, const auto* reactor) { SECTION("writer reads") { const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); From a21cc668dcd70e42514823632d2948bd5854c5d8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 4 Aug 2024 20:38:06 +0000 Subject: [PATCH 15/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/publish.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpp/rpp/operators/publish.hpp b/src/rpp/rpp/operators/publish.hpp index eb386c760..25afff05d 100644 --- a/src/rpp/rpp/operators/publish.hpp +++ b/src/rpp/rpp/operators/publish.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include From 6de781bbf694dfc346cb2a679bcfbcb8f8b93a9a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 4 Aug 2024 23:53:28 +0300 Subject: [PATCH 16/20] fix --- src/tests/rppgrpc/test_async_server.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index d8b25001c..f8f7c9562 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -14,9 +14,13 @@ struct service : public trompeloeil::mock_interface { - MAKE_MOCK2(ServerSide, (grpc::ServerWriteReactor * (grpc::CallbackServerContext* /*context*/, const Request* /*request*/))); - MAKE_MOCK2(ClientSide, (grpc::ServerReadReactor * (grpc::CallbackServerContext* /*context*/, Response* /*response*/))); - MAKE_MOCK1(Bidirectional, (grpc::ServerBidiReactor * (grpc::CallbackServerContext* /*context*/))); + using write_reactor_ptr = grpc::ServerWriteReactor*; + using read_reactor_ptr = grpc::ServerReadReactor*; + using bidi_reactor_ptr = grpc::ServerBidiReactor*; + + MAKE_MOCK2(ServerSide, write_reactor_ptr(grpc::CallbackServerContext* /*context*/, const Request* /*request*/)); + MAKE_MOCK2(ClientSide, read_reactor_ptr(grpc::CallbackServerContext* /*context*/, Response* /*response*/)); + MAKE_MOCK1(Bidirectional, bidi_reactor_ptr(grpc::CallbackServerContext* /*context*/)); }; void wait(const std::unique_ptr& e) From 0fada07560cfdbe0104f2042f0237af3f100a4b8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 5 Aug 2024 22:28:44 +0300 Subject: [PATCH 17/20] hide in namesapce --- src/tests/rppgrpc/test_async_client.cpp | 19 +++++++---------- src/tests/rppgrpc/test_async_server.cpp | 27 ++++++++++--------------- src/tests/utils/rpp_trompeloil.hpp | 10 ++++++++- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp index 880314d96..ee870bdaf 100644 --- a/src/tests/rppgrpc/test_async_client.cpp +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -15,20 +15,15 @@ #include -struct service : public trompeloeil::mock_interface +namespace { - IMPLEMENT_MOCK3(ServerSide); - IMPLEMENT_MOCK3(ClientSide); - IMPLEMENT_MOCK2(Bidirectional); -}; - -void wait(const std::unique_ptr& e) -{ - while (!e->is_satisfied()) + struct service : public trompeloeil::mock_interface { - std::this_thread::sleep_for(std::chrono::seconds{1}); - } -} + IMPLEMENT_MOCK3(ServerSide); + IMPLEMENT_MOCK3(ClientSide); + IMPLEMENT_MOCK2(Bidirectional); + }; +} // namespace TEST_CASE("async client reactor") { diff --git a/src/tests/rppgrpc/test_async_server.cpp b/src/tests/rppgrpc/test_async_server.cpp index f8f7c9562..76b7eb28a 100644 --- a/src/tests/rppgrpc/test_async_server.cpp +++ b/src/tests/rppgrpc/test_async_server.cpp @@ -12,25 +12,20 @@ #include "rpp_trompeloil.hpp" -struct service : public trompeloeil::mock_interface -{ - using write_reactor_ptr = grpc::ServerWriteReactor*; - using read_reactor_ptr = grpc::ServerReadReactor*; - using bidi_reactor_ptr = grpc::ServerBidiReactor*; - - MAKE_MOCK2(ServerSide, write_reactor_ptr(grpc::CallbackServerContext* /*context*/, const Request* /*request*/)); - MAKE_MOCK2(ClientSide, read_reactor_ptr(grpc::CallbackServerContext* /*context*/, Response* /*response*/)); - MAKE_MOCK1(Bidirectional, bidi_reactor_ptr(grpc::CallbackServerContext* /*context*/)); -}; -void wait(const std::unique_ptr& e) +namespace { - while (!e->is_satisfied()) + struct service : public trompeloeil::mock_interface { - std::this_thread::sleep_for(std::chrono::milliseconds{1}); - std::this_thread::yield(); - } -} + using write_reactor_ptr = grpc::ServerWriteReactor*; + using read_reactor_ptr = grpc::ServerReadReactor*; + using bidi_reactor_ptr = grpc::ServerBidiReactor*; + + MAKE_MOCK2(ServerSide, write_reactor_ptr(grpc::CallbackServerContext* /*context*/, const Request* /*request*/)); + MAKE_MOCK2(ClientSide, read_reactor_ptr(grpc::CallbackServerContext* /*context*/, Response* /*response*/)); + MAKE_MOCK1(Bidirectional, bidi_reactor_ptr(grpc::CallbackServerContext* /*context*/)); + }; +} // namespace TEST_CASE("Async server") diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index edd7bebc4..c2c817960 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -8,7 +8,7 @@ #include #include - +#include namespace trompeloeil { @@ -69,3 +69,11 @@ class mock_observer private: std::shared_ptr impl = std::make_shared(); }; + +inline void wait(const std::unique_ptr& e) +{ + while (!e->is_satisfied()) + { + std::this_thread::sleep_for(std::chrono::seconds{1}); + } +} From aa44888329ecded69fc8359f47c55c2905c18bb9 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Thu, 8 Aug 2024 22:02:57 +0300 Subject: [PATCH 18/20] comments --- .../rppgrpc/rppgrpc/client_reactor.hpp | 6 ++--- .../rppgrpc/rppgrpc/server_reactor.hpp | 25 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index c4f183983..7109fe879 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -22,7 +22,7 @@ namespace rppgrpc /** * @brief RPP's based implementation for grpc client bidirectional reactor. * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcBidirectionalStream(ctx, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) @@ -93,7 +93,7 @@ namespace rppgrpc /** * @brief RPP's based implementation for grpc client write reactor * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcWriteStream(ctx, &request, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` @@ -154,7 +154,7 @@ namespace rppgrpc /** * @brief RPP's based implementation for grpc client read reactor. * @details To use it you need: - * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - create it via `new` operator * - pass it to `stub->async()->GrpcReadStream(ctx, &response, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 1765c0c7c..2e79fb501 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -19,6 +19,16 @@ namespace rppgrpc { + /** + * @brief RPP's based implementation for grpc server bidirectional reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from bidirection method of CallbackService interface + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @warning grpc server reactor have to finish manually, so it is expected that you call `on_completed()` on reactor->get_observer() + */ template class server_bidi_reactor final : public grpc::ServerBidiReactor , private details::base_writer @@ -85,6 +95,14 @@ namespace rppgrpc } }; + /** + * @brief RPP's based implementation for grpc server write reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from write-based method of CallbackService interface + * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + */ template class server_write_reactor final : public grpc::ServerWriteReactor , private details::base_writer @@ -142,6 +160,13 @@ namespace rppgrpc } }; + /** + * @brief RPP's based implementation for grpc server read reactor. + * @details To use it you need: + * - create it via `new` operator + * - return it from read-based method of CallbackService interface + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + */ template class server_read_reactor final : public grpc::ServerReadReactor , private details::base_reader From 6c7dd4cb99b9e4cbe5ce34d4a8e0cab69af12aae Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Aug 2024 20:22:19 +0300 Subject: [PATCH 19/20] extend docs --- .../rppgrpc/doxygen/server_reactor.cpp | 52 +++++++++++++++++++ .../rppgrpc/rppgrpc/client_reactor.hpp | 9 ++++ src/extensions/rppgrpc/rppgrpc/fwd.hpp | 9 ++++ .../rppgrpc/rppgrpc/server_reactor.hpp | 9 ++++ src/rpp/rpp/observables/observable.hpp | 9 ++++ src/rpp/rpp/operators/subscribe.hpp | 11 ++++ 6 files changed, 99 insertions(+) create mode 100644 src/examples/rppgrpc/doxygen/server_reactor.cpp diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp new file mode 100644 index 000000000..8f74c7ceb --- /dev/null +++ b/src/examples/rppgrpc/doxygen/server_reactor.cpp @@ -0,0 +1,52 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" + +/** + * \example server_reactor.cpp + **/ + +class Server : public TestService::CallbackService +{ + + //! [read_reactor] + grpc::ServerReadReactor* ClientSide(grpc::CallbackServerContext* /*context*/, Response* /*response*/) override + { + const auto reactor = new rppgrpc::server_read_reactor(); + reactor->get_observable().subscribe([](const Request&) {}, []() { std::cout << "DONE" << std::endl; }); + return reactor; + } + //! [read_reactor] + + //! [bidi_reactor] + grpc::ServerBidiReactor* Bidirectional(grpc::CallbackServerContext* /*context*/) override + { + const auto reactor = new rppgrpc::server_bidi_reactor(); + reactor->get_observable().subscribe([](const Request&) {}, []() { std::cout << "DONE" << std::endl; }); + + reactor->get_observer().on_next(Response{}); + + return reactor; + } + //! [bidi_reactor] + + //! [write_reactor] + grpc::ServerWriteReactor* ServerSide(grpc::CallbackServerContext* /*context*/, const Request* /*request*/) override + { + const auto reactor = new rppgrpc::server_write_reactor(); + reactor->get_observable().subscribe([]() { std::cout << "DONE" << std::endl; }); + + reactor->get_observer().on_next(Response{}); + + return reactor; + } + //! [write_reactor] +}; + +int main() +{ + return 0; +} diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp index 7109fe879..2e1ff1cfc 100644 --- a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -27,6 +27,9 @@ namespace rppgrpc * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @snippet client_reactor.cpp bidi_reactor + * */ template class client_bidi_reactor final : public grpc::ClientBidiReactor @@ -98,6 +101,9 @@ namespace rppgrpc * - call `reactor->init()` method for actual starting of grpc logic * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + * + * @snippet client_reactor.cpp write_reactor + * */ template class client_write_reactor final : public grpc::ClientWriteReactor @@ -158,6 +164,9 @@ namespace rppgrpc * - pass it to `stub->async()->GrpcReadStream(ctx, &response, reactor);` * - call `reactor->init()` method for actual starting of grpc logic * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * + * @snippet client_reactor.cpp read_reactor + * */ template class client_read_reactor final : public grpc::ClientReadReactor diff --git a/src/extensions/rppgrpc/rppgrpc/fwd.hpp b/src/extensions/rppgrpc/rppgrpc/fwd.hpp index d807537f8..4f1114a43 100644 --- a/src/extensions/rppgrpc/rppgrpc/fwd.hpp +++ b/src/extensions/rppgrpc/rppgrpc/fwd.hpp @@ -27,4 +27,13 @@ namespace rppgrpc template class client_read_reactor; + + template + class server_bidi_reactor; + + template + class server_write_reactor; + + template + class server_read_reactor; } // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp index 2e79fb501..392d8aae1 100644 --- a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -28,6 +28,9 @@ namespace rppgrpc * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` * * @warning grpc server reactor have to finish manually, so it is expected that you call `on_completed()` on reactor->get_observer() + * + * @snippet server_reactor.cpp bidi_reactor + * */ template class server_bidi_reactor final : public grpc::ServerBidiReactor @@ -102,6 +105,9 @@ namespace rppgrpc * - return it from write-based method of CallbackService interface * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * + * @snippet server_reactor.cpp write_reactor + * */ template class server_write_reactor final : public grpc::ServerWriteReactor @@ -166,6 +172,9 @@ namespace rppgrpc * - create it via `new` operator * - return it from read-based method of CallbackService interface * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * + * @snippet server_reactor.cpp read_reactor + * */ template class server_read_reactor final : public grpc::ServerReadReactor diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 6d3d6ca29..a1c86ed32 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -202,6 +202,15 @@ namespace rpp subscribe(std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)); } + /** + * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable + */ + template OnCompleted> + void subscribe(OnCompleted&& on_completed) const + { + subscribe(rpp::utils::empty_function_any_t{}, rpp::utils::rethrow_error_t{}, std::forward(on_completed)); + } + /** * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable * diff --git a/src/rpp/rpp/operators/subscribe.hpp b/src/rpp/rpp/operators/subscribe.hpp index 84ca2933d..6b494b01a 100644 --- a/src/rpp/rpp/operators/subscribe.hpp +++ b/src/rpp/rpp/operators/subscribe.hpp @@ -319,6 +319,17 @@ namespace rpp::operators return details::subscribe_t{std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)}; } + /** + * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable + * + * @ingroup utility_operators + */ + template OnCompleted> + auto subscribe(OnCompleted&& on_completed) + { + return details::subscribe_t{rpp::utils::empty_function_any_t{}, rpp::utils::rethrow_error_t{}, std::forward(on_completed)}; + } + /** * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable * @details This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed. From c16c66a127bb22224a7816ad670181956e32716a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 11 Aug 2024 22:36:51 +0300 Subject: [PATCH 20/20] fix --- src/examples/rppgrpc/doxygen/server_reactor.cpp | 2 +- src/rpp/rpp/observables/observable.hpp | 9 --------- src/rpp/rpp/operators/subscribe.hpp | 11 ----------- 3 files changed, 1 insertion(+), 21 deletions(-) diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp index 8f74c7ceb..ebfb5fcc8 100644 --- a/src/examples/rppgrpc/doxygen/server_reactor.cpp +++ b/src/examples/rppgrpc/doxygen/server_reactor.cpp @@ -37,7 +37,7 @@ class Server : public TestService::CallbackService grpc::ServerWriteReactor* ServerSide(grpc::CallbackServerContext* /*context*/, const Request* /*request*/) override { const auto reactor = new rppgrpc::server_write_reactor(); - reactor->get_observable().subscribe([]() { std::cout << "DONE" << std::endl; }); + reactor->get_observable().subscribe([](rpp::utils::none) {}, []() { std::cout << "DONE" << std::endl; }); reactor->get_observer().on_next(Response{}); diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index a1c86ed32..6d3d6ca29 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -202,15 +202,6 @@ namespace rpp subscribe(std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)); } - /** - * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable - */ - template OnCompleted> - void subscribe(OnCompleted&& on_completed) const - { - subscribe(rpp::utils::empty_function_any_t{}, rpp::utils::rethrow_error_t{}, std::forward(on_completed)); - } - /** * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable * diff --git a/src/rpp/rpp/operators/subscribe.hpp b/src/rpp/rpp/operators/subscribe.hpp index 6b494b01a..84ca2933d 100644 --- a/src/rpp/rpp/operators/subscribe.hpp +++ b/src/rpp/rpp/operators/subscribe.hpp @@ -319,17 +319,6 @@ namespace rpp::operators return details::subscribe_t{std::forward(on_next), rpp::utils::rethrow_error_t{}, std::forward(on_completed)}; } - /** - * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable - * - * @ingroup utility_operators - */ - template OnCompleted> - auto subscribe(OnCompleted&& on_completed) - { - return details::subscribe_t{rpp::utils::empty_function_any_t{}, rpp::utils::rethrow_error_t{}, std::forward(on_completed)}; - } - /** * @brief Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable * @details This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.