From 640df5f96d58f4f0c9fa6f090bb40342e0d7ef8c Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 13 Mar 2020 08:51:15 -0400
Subject: [PATCH 1/3] ARROW-8112: [FlightRPC][C++] make sure status codes
round-trip through gRPC
---
cpp/src/arrow/flight/flight_test.cc | 18 ++++++++++++++++++
cpp/src/arrow/flight/internal.cc | 4 ++++
2 files changed, 22 insertions(+)
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index 7e0d414a07cf..ad4ebc9875e3 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -258,6 +258,24 @@ TEST(TestFlight, RoundtripStatus) {
MakeFlightError(FlightStatusCode::Unavailable, "Test message"));
ASSERT_NE(nullptr, detail);
ASSERT_EQ(FlightStatusCode::Unavailable, detail->code());
+
+ Status status = internal::FromGrpcStatus(
+ internal::ToGrpcStatus(Status::NotImplemented("Sentinel")));
+ ASSERT_TRUE(status.IsNotImplemented());
+ ASSERT_THAT(status.message(), ::testing::HasSubstr("Sentinel"));
+
+ status = internal::FromGrpcStatus(internal::ToGrpcStatus(Status::Invalid("Sentinel")));
+ ASSERT_TRUE(status.IsInvalid());
+ ASSERT_THAT(status.message(), ::testing::HasSubstr("Sentinel"));
+
+ status = internal::FromGrpcStatus(internal::ToGrpcStatus(Status::KeyError("Sentinel")));
+ ASSERT_TRUE(status.IsKeyError());
+ ASSERT_THAT(status.message(), ::testing::HasSubstr("Sentinel"));
+
+ status =
+ internal::FromGrpcStatus(internal::ToGrpcStatus(Status::AlreadyExists("Sentinel")));
+ ASSERT_TRUE(status.IsAlreadyExists());
+ ASSERT_THAT(status.message(), ::testing::HasSubstr("Sentinel"));
}
TEST(TestFlight, GetPort) {
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
index 39c0b92e263e..b96dad7478d4 100644
--- a/cpp/src/arrow/flight/internal.cc
+++ b/cpp/src/arrow/flight/internal.cc
@@ -164,6 +164,10 @@ grpc::Status ToGrpcStatus(const Status& arrow_status) {
grpc_code = grpc::StatusCode::UNIMPLEMENTED;
} else if (arrow_status.IsInvalid()) {
grpc_code = grpc::StatusCode::INVALID_ARGUMENT;
+ } else if (arrow_status.IsKeyError()) {
+ grpc_code = grpc::StatusCode::NOT_FOUND;
+ } else if (arrow_status.IsAlreadyExists()) {
+ grpc_code = grpc::StatusCode::ALREADY_EXISTS;
}
return grpc::Status(grpc_code, message);
}
From fef48460a461f39d292d5e38daf8d183180d38d1 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 13 Mar 2020 09:58:36 -0400
Subject: [PATCH 2/3] ARROW-8112: [FlightRPC][C++] Store exact Arrow status in
headers
---
cpp/src/arrow/flight/client.cc | 16 ++--
cpp/src/arrow/flight/flight_test.cc | 7 ++
cpp/src/arrow/flight/internal.cc | 115 +++++++++++++++++++++++++++-
cpp/src/arrow/flight/internal.h | 20 ++++-
cpp/src/arrow/flight/server.cc | 24 +++---
cpp/src/arrow/flight/test_util.cc | 6 ++
python/pyarrow/tests/test_flight.py | 13 +++-
7 files changed, 175 insertions(+), 26 deletions(-)
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 036d6d334d4a..eae31a525534 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -306,7 +306,7 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
protected:
Status OverrideWithServerError(Status&& st) {
// Get the gRPC status if not OK, to propagate any server error message
- RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish()));
+ RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish(), &rpc_->context));
return std::move(st);
}
@@ -458,7 +458,7 @@ class DoPutPayloadWriter : public ipc::internal::IpcPayloadWriter {
pb::PutResult message;
while (writer_->Read(&message)) {
}
- RETURN_NOT_OK(internal::FromGrpcStatus(writer_->Finish()));
+ RETURN_NOT_OK(internal::FromGrpcStatus(writer_->Finish(), &rpc_->context));
if (!finished_writes) {
return Status::UnknownError(
"Could not finish writing record batches before closing");
@@ -577,7 +577,7 @@ class FlightClient::FlightClientImpl {
RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
// Explicitly close our side of the connection
bool finished_writes = stream->WritesDone();
- RETURN_NOT_OK(internal::FromGrpcStatus(stream->Finish()));
+ RETURN_NOT_OK(internal::FromGrpcStatus(stream->Finish(), &rpc.context));
if (!finished_writes) {
return Status::UnknownError("Could not finish writing before closing");
}
@@ -604,7 +604,7 @@ class FlightClient::FlightClientImpl {
}
listing->reset(new SimpleFlightListing(std::move(flights)));
- return internal::FromGrpcStatus(stream->Finish());
+ return internal::FromGrpcStatus(stream->Finish(), &rpc.context);
}
Status DoAction(const FlightCallOptions& options, const Action& action,
@@ -628,7 +628,7 @@ class FlightClient::FlightClientImpl {
*results = std::unique_ptr(
new SimpleResultStream(std::move(materialized_results)));
- return internal::FromGrpcStatus(stream->Finish());
+ return internal::FromGrpcStatus(stream->Finish(), &rpc.context);
}
Status ListActions(const FlightCallOptions& options, std::vector* types) {
@@ -645,7 +645,7 @@ class FlightClient::FlightClientImpl {
RETURN_NOT_OK(internal::FromProto(pb_type, &type));
types->emplace_back(std::move(type));
}
- return internal::FromGrpcStatus(stream->Finish());
+ return internal::FromGrpcStatus(stream->Finish(), &rpc.context);
}
Status GetFlightInfo(const FlightCallOptions& options,
@@ -659,7 +659,7 @@ class FlightClient::FlightClientImpl {
ClientRpc rpc(options);
RETURN_NOT_OK(rpc.SetToken(auth_handler_.get()));
Status s = internal::FromGrpcStatus(
- stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response));
+ stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response), &rpc.context);
RETURN_NOT_OK(s);
FlightInfo::Data info_data;
@@ -678,7 +678,7 @@ class FlightClient::FlightClientImpl {
ClientRpc rpc(options);
RETURN_NOT_OK(rpc.SetToken(auth_handler_.get()));
Status s = internal::FromGrpcStatus(
- stub_->GetSchema(&rpc.context, pb_descriptor, &pb_response));
+ stub_->GetSchema(&rpc.context, pb_descriptor, &pb_response), &rpc.context);
RETURN_NOT_OK(s);
std::string str;
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index ad4ebc9875e3..5b1b273f3beb 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -983,6 +983,13 @@ TEST_F(TestFlightClient, DoAction) {
ASSERT_EQ(nullptr, result);
}
+TEST_F(TestFlightClient, RoundTripStatus) {
+ const auto descr = FlightDescriptor::Command("status-outofmemory");
+ std::unique_ptr info;
+ const auto status = client_->GetFlightInfo(descr, &info);
+ ASSERT_RAISES(OutOfMemory, status);
+}
+
TEST_F(TestFlightClient, Issue5095) {
// Make sure the server-side error message is reflected to the
// client
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
index b96dad7478d4..cac68bca258f 100644
--- a/cpp/src/arrow/flight/internal.cc
+++ b/cpp/src/arrow/flight/internal.cc
@@ -20,6 +20,7 @@
#include "arrow/flight/protocol_internal.h"
#include
+#include