From 30945c3399ac267d6e9f295a6fef96823439ecb4 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Wed, 9 Apr 2025 22:22:55 +0800 Subject: [PATCH 1/2] Support various payload of baidu-std: json, proto-json and proto-text --- src/brpc/compress.cpp | 8 +- src/brpc/compress.h | 15 ++ src/brpc/controller.cpp | 2 + src/brpc/controller.h | 19 ++ src/brpc/global.cpp | 23 ++- src/brpc/options.proto | 7 + src/brpc/policy/baidu_rpc_meta.proto | 1 + src/brpc/policy/baidu_rpc_protocol.cpp | 217 ++++++++++++++++++++--- src/brpc/policy/baidu_rpc_protocol.h | 7 + src/brpc/policy/gzip_compress.cpp | 216 ++++++++++++++++++---- src/brpc/policy/gzip_compress.h | 25 +++ src/brpc/policy/snappy_compress.cpp | 107 +++++++++-- src/brpc/policy/snappy_compress.h | 12 ++ src/brpc/protocol.cpp | 7 +- src/brpc/serialized_request.h | 1 - src/brpc/serialized_response.h | 1 - src/json2pb/json_to_pb.cpp | 7 +- src/json2pb/json_to_pb.h | 6 +- src/json2pb/pb_to_json.cpp | 9 +- src/json2pb/pb_to_json.h | 13 +- src/json2pb/protobuf_type_resolver.h | 2 + test/brpc_http_rpc_protocol_unittest.cpp | 11 +- test/brpc_server_unittest.cpp | 189 ++++++++++++++------ test/brpc_socket_unittest.cpp | 43 +++-- test/bthread_cond_unittest.cpp | 2 +- 25 files changed, 771 insertions(+), 179 deletions(-) diff --git a/src/brpc/compress.cpp b/src/brpc/compress.cpp index dd9ba825aa..f068678a36 100644 --- a/src/brpc/compress.cpp +++ b/src/brpc/compress.cpp @@ -17,14 +17,16 @@ #include "butil/logging.h" +#include "json2pb/json_to_pb.h" #include "brpc/compress.h" #include "brpc/protocol.h" - namespace brpc { static const int MAX_HANDLER_SIZE = 1024; -static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } }; +static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { + { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL } +}; int RegisterCompressHandler(CompressType type, CompressHandler handler) { @@ -47,7 +49,7 @@ int RegisterCompressHandler(CompressType type, // Find CompressHandler by type. // Returns NULL if not found -inline const CompressHandler* FindCompressHandler(CompressType type) { +const CompressHandler* FindCompressHandler(CompressType type) { int index = type; if (index < 0 || index >= MAX_HANDLER_SIZE) { LOG(ERROR) << "CompressType=" << type << " is out of range"; diff --git a/src/brpc/compress.h b/src/brpc/compress.h index 0b0fcb17e2..725cf1ec5a 100644 --- a/src/brpc/compress.h +++ b/src/brpc/compress.h @@ -21,6 +21,8 @@ #include // Message #include "butil/iobuf.h" // butil::IOBuf +#include "json2pb/pb_to_json.h" +#include "json2pb/json_to_pb.h" #include "brpc/options.pb.h" // CompressType namespace brpc { @@ -29,10 +31,20 @@ struct CompressHandler { // Compress serialized `msg' into `buf'. // Returns true on success, false otherwise bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf); + bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options); + bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options); + bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse decompressed `data' as `msg'. // Returns true on success, false otherwise bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg); + bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options); + bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options); + bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg); // Name of the compression algorithm, must be string constant. const char* name; @@ -42,6 +54,9 @@ struct CompressHandler { // Returns 0 on success, -1 otherwise int RegisterCompressHandler(CompressType type, CompressHandler handler); +// Returns CompressHandler pointer of `type' if registered, NULL otherwise. +const CompressHandler* FindCompressHandler(CompressType type); + // Returns the `name' of the CompressType if registered const char* CompressTypeToCStr(CompressType type); diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 271ccfa485..0cb83dc50e 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -290,6 +290,8 @@ void Controller::ResetPods() { _http_response = NULL; _request_user_fields = NULL; _response_user_fields = NULL; + _request_content_type = CONTENT_TYPE_PB; + _response_content_type = CONTENT_TYPE_PB; _request_streams.clear(); _response_streams.clear(); _remote_stream_settings = NULL; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index d9799f889b..000aee2f50 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -616,6 +616,20 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res); + void set_request_content_type(ContentType type) { + _request_content_type = type; + } + ContentType request_content_type() const { + return _request_content_type; + } + + void set_response_content_type(ContentType type) { + _response_content_type = type; + } + ContentType response_content_type() const { + return _response_content_type; + } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -859,6 +873,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); butil::IOBuf _request_attachment; butil::IOBuf _response_attachment; + // Only SerializedRequest supports `_request_content_type'. + ContentType _request_content_type; + // Only SerializedResponse supports `_response_content_type'. + ContentType _response_content_type; + // Writable progressive attachment butil::intrusive_ptr _wpa; // Readable progressive attachment diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index d45c67f054..a98ac49068 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -388,25 +388,34 @@ static void GlobalInitializeOrDieImpl() { LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb); // Compress Handlers - const CompressHandler gzip_compress = - { GzipCompress, GzipDecompress, "gzip" }; + CompressHandler gzip_compress = { + GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson, + GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson, + GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip" + }; if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) { exit(1); } - const CompressHandler zlib_compress = - { ZlibCompress, ZlibDecompress, "zlib" }; + CompressHandler zlib_compress = { + ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson, + ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson, + ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib" + }; if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) { exit(1); } - const CompressHandler snappy_compress = - { SnappyCompress, SnappyDecompress, "snappy" }; + CompressHandler snappy_compress = { + SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson, + SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson, + SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy" + }; if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) { exit(1); } // Protocols Protocol baidu_protocol = { ParseRpcMessage, - SerializeRequestDefault, PackRpcRequest, + SerializeRpcRequest, PackRpcRequest, ProcessRpcRequest, ProcessRpcResponse, VerifyRpcRequest, NULL, NULL, CONNECTION_TYPE_ALL, "baidu_std" }; diff --git a/src/brpc/options.proto b/src/brpc/options.proto index 3e34b5f6f6..e334c48ea0 100644 --- a/src/brpc/options.proto +++ b/src/brpc/options.proto @@ -74,6 +74,13 @@ enum CompressType { COMPRESS_TYPE_LZ4 = 4; } +enum ContentType { + CONTENT_TYPE_PB = 0; + CONTENT_TYPE_JSON = 1; + CONTENT_TYPE_PROTO_JSON = 2; + CONTENT_TYPE_PROTO_TEXT = 3; +} + message ChunkInfo { required int64 stream_id = 1; required int64 chunk_id = 2; diff --git a/src/brpc/policy/baidu_rpc_meta.proto b/src/brpc/policy/baidu_rpc_meta.proto index 300564bbee..591798310a 100644 --- a/src/brpc/policy/baidu_rpc_meta.proto +++ b/src/brpc/policy/baidu_rpc_meta.proto @@ -33,6 +33,7 @@ message RpcMeta { optional bytes authentication_data = 7; optional StreamSettings stream_settings = 8; map user_fields = 9; + optional ContentType content_type = 10; } message RpcRequestMeta { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 269f0645a9..9b6065afcc 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -20,11 +20,12 @@ #include // Message #include #include +#include #include "butil/logging.h" // LOG() -#include "butil/time.h" #include "butil/iobuf.h" // butil::IOBuf #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "butil/memory/scope_guard.h" +#include "json2pb/json_to_pb.h" #include "brpc/controller.h" // Controller #include "brpc/socket.h" // Socket #include "brpc/server.h" // Server @@ -56,6 +57,8 @@ DEFINE_bool(baidu_protocol_use_fullname, true, DEFINE_bool(baidu_std_protocol_deliver_timeout_ms, false, "If this flag is true, baidu_std puts timeout_ms in requests."); +DECLARE_bool(pb_enum_as_number); + // Notes: // 1. 12-byte header [PRPC][body_size][meta_size] // 2. body_size and meta_size are in network byte order @@ -137,9 +140,64 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, return MakeMessage(msg); } +static json2pb::Pb2JsonOptions MakePb2JsonOptions(Controller& cntl) { + json2pb::Pb2JsonOptions options; + options.bytes_to_base64 = cntl.has_pb_bytes_to_base64(); + options.jsonify_empty_array = cntl.has_pb_jsonify_empty_array(); + options.always_print_primitive_fields = cntl.has_always_print_primitive_fields(); + options.single_repeated_to_array = cntl.has_pb_single_repeated_to_array(); + options.enum_option = FLAGS_pb_enum_as_number + ? json2pb::OUTPUT_ENUM_BY_NUMBER + : json2pb::OUTPUT_ENUM_BY_NAME; + return options; +} + +bool SerializeRpcMessage(const google::protobuf::Message& message, + Controller& cntl, ContentType content_type, + CompressType compress_type, butil::IOBuf* buf, + std::string* error) { + if (COMPRESS_TYPE_NONE == compress_type) { + butil::IOBufAsZeroCopyOutputStream wrapper(buf); + if (CONTENT_TYPE_PB == content_type) { + return message.SerializeToZeroCopyStream(&wrapper); + } else if (CONTENT_TYPE_JSON == content_type ) { + json2pb::Pb2JsonOptions options = MakePb2JsonOptions(cntl); + return json2pb::ProtoMessageToJson(message, &wrapper, options, error); + } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + json2pb::Pb2ProtoJsonOptions options; + AlwaysPrintPrimitiveFields(options) = cntl.has_always_print_primitive_fields(); + return json2pb::ProtoMessageToProtoJson(message, &wrapper, options, error); + } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { + return google::protobuf::TextFormat::Print(message, &wrapper); + } + return false; + } + + const CompressHandler* handler = FindCompressHandler(compress_type); + if (NULL == handler) { + return false; + } + if (CONTENT_TYPE_PB == content_type) { + return handler->Compress(message, buf); + } + butil::IOBufAsZeroCopyOutputStream wrapper(buf); + if (CONTENT_TYPE_JSON == content_type) { + json2pb::Pb2JsonOptions options = MakePb2JsonOptions(cntl); + return handler->Compress2Json(message, buf, options); + } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + json2pb::Pb2ProtoJsonOptions options; + options.always_print_enums_as_ints = FLAGS_pb_enum_as_number; + AlwaysPrintPrimitiveFields(options) = cntl.has_always_print_primitive_fields(); + return handler->Compress2ProtoJson(message, buf, options); + } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { + return handler->Compress2ProtoText(message, buf); + } + + return false; +} + static bool SerializeResponse(const google::protobuf::Message& res, - Controller& cntl, CompressType compress_type, - butil::IOBuf& buf) { + Controller& cntl, butil::IOBuf& buf) { if (res.GetDescriptor() == SerializedResponse::descriptor()) { buf.swap(((SerializedResponse&)res).serialized_data()); return true; @@ -150,9 +208,16 @@ static bool SerializeResponse(const google::protobuf::Message& res, "Missing required fields in response: %s", res.InitializationErrorString().c_str()); return false; - } else if (!SerializeAsCompressedData(res, &buf, compress_type)) { - cntl.SetFailed(ERESPONSE, - "Fail to serialize response, CompressType=%s", + } + + ContentType content_type = cntl.response_content_type(); + CompressType compress_type = cntl.response_compress_type(); + std::string error; + if (!SerializeRpcMessage(res, cntl, content_type, + compress_type, &buf, &error)) { + cntl.SetFailed(ERESPONSE, "Fail to serialize response, " + "ContentType=%s, CompressType=%s", + ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type)); return false; } @@ -234,8 +299,7 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, // response either CompressType compress_type = cntl->response_compress_type(); if (res != NULL && !cntl->Failed()) { - append_body = SerializeResponse( - *res, *cntl, compress_type, res_body); + append_body = SerializeResponse(*res, *cntl, res_body); } // Don't use res->ByteSize() since it may be compressed @@ -262,6 +326,7 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, } meta.set_correlation_id(correlation_id); meta.set_compress_type(compress_type); + meta.set_content_type(cntl->response_content_type()); if (attached_size > 0) { meta.set_attachment_size(attached_size); } @@ -408,6 +473,52 @@ void EndRunningCallMethodInPool( return EndRunningUserCodeInPool(CallMethodInBackupThread, args); }; +bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl, + ContentType content_type, CompressType compress_type, + google::protobuf::Message* message, std::string* error) { + if (COMPRESS_TYPE_NONE != compress_type) { + const CompressHandler* handler = FindCompressHandler(compress_type); + if (NULL == handler) { + return false; + } + + if (CONTENT_TYPE_PB == content_type) { + return handler->Decompress(data, message); + } else if (CONTENT_TYPE_JSON == content_type) { + butil::IOBufAsZeroCopyInputStream wrapper(data); + json2pb::Json2PbOptions options; + options.base64_to_bytes = cntl.has_pb_bytes_to_base64(); + options.array_to_single_repeated = cntl.has_pb_single_repeated_to_array(); + return handler->DecompressFromJson(data, message, options); + } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + json2pb::ProtoJson2PbOptions options; + options.ignore_unknown_fields = true; + return handler->DecompressFromProtoJson(data, message, options); + } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { + return handler->DecompressFromProtoText(data, message); + } + return false; + } + + butil::IOBufAsZeroCopyInputStream wrapper(data); + if (CONTENT_TYPE_PB == content_type) { + return ParsePbFromZeroCopyStream(message, &wrapper); + } else if (CONTENT_TYPE_JSON == content_type) { + json2pb::Json2PbOptions options; + options.base64_to_bytes = cntl.has_pb_bytes_to_base64(); + options.array_to_single_repeated = cntl.has_pb_single_repeated_to_array(); + return json2pb::JsonToProtoMessage(&wrapper, message, options, error); + } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + json2pb::ProtoJson2PbOptions options; + options.ignore_unknown_fields = true; + return json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, error); + } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { + return google::protobuf::TextFormat::Parse(&wrapper, message); + } + + return false; +} + void ProcessRpcRequest(InputMessageBase* msg_base) { const int64_t start_parse_us = butil::cpuwide_time_us(); DestroyingPtr msg(static_cast(msg_base)); @@ -458,6 +569,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { if (request_meta.has_timeout_ms()) { cntl->set_timeout_ms(request_meta.timeout_ms()); } + cntl->set_request_content_type(meta.content_type()); cntl->set_request_compress_type((CompressType)meta.compress_type()); accessor.set_server(server) .set_security_mode(security_mode) @@ -640,12 +752,17 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { cntl->request_attachment().swap(msg->payload); } - auto req_cmp_type = static_cast(meta.compress_type()); + ContentType content_type = meta.content_type(); + auto compress_type = static_cast(meta.compress_type()); messages = server->options().rpc_pb_message_factory->Get(*svc, *method); - if (!ParseFromCompressedData(req_buf, messages->Request(), req_cmp_type)) { - cntl->SetFailed(EREQUEST, "Fail to parse request message, " - "CompressType=%s, request_size=%d", - CompressTypeToCStr(req_cmp_type), req_size); + std::string error; + if (!DeserializeRpcMessage(req_buf, *cntl, content_type, compress_type, + messages->Request(), &error)) { + cntl->SetFailed( + EREQUEST, "Fail to parse request message, ContentType=%s," + "CompressType=%s, request_size=%d : %s", + ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type), + req_size, error.c_str()); break; } req_buf.clear(); @@ -654,11 +771,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { // `socket' will be held until response has been sent google::protobuf::Closure* done = ::brpc::NewCallback< int64_t, Controller*, RpcPBMessages*, - const Server*, MethodStatus*, int64_t>(&SendRpcResponse, - meta.correlation_id(), - cntl.get(), messages, - server, method_status, - msg->received_us()); + const Server*, MethodStatus*, int64_t>( + &SendRpcResponse, meta.correlation_id(),cntl.get(), + messages, server, method_status, msg->received_us()); // optional, just release resource ASAP msg.reset(); @@ -799,8 +914,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { if (meta.has_attachment_size()) { if (meta.attachment_size() > res_size) { cntl->SetFailed( - ERESPONSE, - "attachment_size=%d is larger than response_size=%d", + ERESPONSE, "attachment_size=%d is larger than response_size=%d", meta.attachment_size(), res_size); break; } @@ -810,20 +924,24 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { cntl->response_attachment().swap(msg->payload); } - auto res_cmp_type = (CompressType)meta.compress_type(); - cntl->set_response_compress_type(res_cmp_type); + ContentType content_type = meta.content_type(); + auto compress_type = (CompressType)meta.compress_type(); + cntl->set_response_content_type(content_type); + cntl->set_response_compress_type(compress_type); if (cntl->response()) { + std::string error; if (cntl->response()->GetDescriptor() == SerializedResponse::descriptor()) { ((SerializedResponse*)cntl->response())-> serialized_data().append(*res_buf_ptr); - } else if (!ParseFromCompressedData( - *res_buf_ptr, cntl->response(), res_cmp_type)) { + } else if (!DeserializeRpcMessage(*res_buf_ptr, *cntl, content_type, + compress_type, cntl->response(), &error)) { cntl->SetFailed( - ERESPONSE, "Fail to parse response message, " - "CompressType=%s, response_size=%d", - CompressTypeToCStr(res_cmp_type), res_size); + EREQUEST, "Fail to parse request message, ContentType=%s," + "CompressType=%s, request_size=%d : %s", + ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type), + res_size, error.c_str()); } - } // else silently ignore the response. + } // else silently ignore the response. } while (0); // Unlocks correlation_id inside. Revert controller's // error code if it version check of `cid' fails @@ -831,6 +949,33 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { accessor.OnResponse(cid, saved_error); } +void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, + const google::protobuf::Message* request) { + // Check sanity of request. + if (NULL == request) { + return cntl->SetFailed(EREQUEST, "`request' is NULL"); + } + if (request->GetDescriptor() == SerializedRequest::descriptor()) { + request_buf->append(((SerializedRequest*)request)->serialized_data()); + return; + } + if (!request->IsInitialized()) { + return cntl->SetFailed(EREQUEST, "Missing required fields in request: %s", + request->InitializationErrorString().c_str()); + } + + ContentType content_type = cntl->request_content_type(); + CompressType compress_type = cntl->request_compress_type(); + std::string error; + if (!SerializeRpcMessage(*request, *cntl, content_type, + compress_type, request_buf, &error)) { + return cntl->SetFailed(EREQUEST, "Fail to compress request, " + "ContentType=%s, CompressType=%s", + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type)); + } +} + void PackRpcRequest(butil::IOBuf* req_buf, SocketMessage**, uint64_t correlation_id, @@ -904,6 +1049,7 @@ void PackRpcRequest(butil::IOBuf* req_buf, request_meta->set_timeout_ms(accessor.real_timeout_ms()); } } + meta.set_content_type(cntl->request_content_type()); Span* span = accessor.span(); if (span) { @@ -919,5 +1065,20 @@ void PackRpcRequest(butil::IOBuf* req_buf, } } +const char* ContentTypeToCStr(ContentType content_type) { + switch (content_type) { + case CONTENT_TYPE_PB: + return "pb"; + case CONTENT_TYPE_JSON: + return "json"; + case CONTENT_TYPE_PROTO_JSON: + return "proto-json"; + case CONTENT_TYPE_PROTO_TEXT: + return "proto-text"; + default: + return "unknown"; + } +} + } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/baidu_rpc_protocol.h b/src/brpc/policy/baidu_rpc_protocol.h index e3e4954b0c..77ecc780a2 100644 --- a/src/brpc/policy/baidu_rpc_protocol.h +++ b/src/brpc/policy/baidu_rpc_protocol.h @@ -37,6 +37,10 @@ void ProcessRpcResponse(InputMessageBase* msg); // Verify authentication information in baidu_std format bool VerifyRpcRequest(const InputMessageBase* msg); +// Serialize `request' into `buf'. +void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, + const google::protobuf::Message* request); + // Pack `request' to `method' into `buf'. void PackRpcRequest(butil::IOBuf* buf, SocketMessage**, @@ -46,6 +50,9 @@ void PackRpcRequest(butil::IOBuf* buf, const butil::IOBuf& request, const Authenticator* auth); +// Returns the `name' of the 'content_type'. +const char* ContentTypeToCStr(ContentType content_type); + } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/gzip_compress.cpp b/src/brpc/policy/gzip_compress.cpp index 35367e3ea1..95327750d8 100644 --- a/src/brpc/policy/gzip_compress.cpp +++ b/src/brpc/policy/gzip_compress.cpp @@ -17,6 +17,7 @@ #include // GzipXXXStream +#include #include "butil/logging.h" #include "brpc/policy/gzip_compress.h" #include "brpc/protocol.h" @@ -25,45 +26,171 @@ namespace brpc { namespace policy { -static void LogError(const google::protobuf::io::GzipOutputStream& gzip) { - if (gzip.ZlibErrorMessage()) { - LOG(WARNING) << "Fail to decompress: " << gzip.ZlibErrorMessage(); +template +void LogError(const char* error_message1 = NULL, + const char* error_message2 = NULL) { + if (IsCompress) { + LOG(WARNING) << "Fail to compress. " + << (NULL == error_message1 ? "" : error_message1) << " " + << (NULL == error_message2 ? "" : error_message2); } else { - LOG(WARNING) << "Fail to decompress."; + LOG(WARNING) << "Fail to decompress." + << error_message1 << " " + << error_message2; } } -static void LogError(const google::protobuf::io::GzipInputStream& gzip) { - if (gzip.ZlibErrorMessage()) { - LOG(WARNING) << "Fail to decompress: " << gzip.ZlibErrorMessage(); - } else { - LOG(WARNING) << "Fail to decompress."; +static bool Compress(const google::protobuf::Message& msg, butil::IOBuf* buf, + google::protobuf::io::GzipOutputStream::Format format) { + butil::IOBufAsZeroCopyOutputStream wrapper(buf); + google::protobuf::io::GzipOutputStream::Options options; + options.format = format; + google::protobuf::io::GzipOutputStream gzip(&wrapper, options); + if (!msg.SerializeToZeroCopyStream(&gzip)) { + LogError(gzip.ZlibErrorMessage()); + return false; } + return gzip.Close(); } -bool GzipCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { +static bool Compress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options, + google::protobuf::io::GzipOutputStream::Format format) { butil::IOBufAsZeroCopyOutputStream wrapper(buf); google::protobuf::io::GzipOutputStream::Options gzip_opt; - gzip_opt.format = google::protobuf::io::GzipOutputStream::GZIP; + gzip_opt.format = format; google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); - if (!msg.SerializeToZeroCopyStream(&gzip)) { - LogError(gzip); + std::string error; + if (!json2pb::ProtoMessageToJson(msg, &gzip, options, &error)) { + LogError(error.c_str(), gzip.ZlibErrorMessage()); return false; } return gzip.Close(); } -bool GzipDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { +static bool Compress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options, + google::protobuf::io::GzipOutputStream::Format format) { + butil::IOBufAsZeroCopyOutputStream wrapper(buf); + google::protobuf::io::GzipOutputStream::Options gzip_opt; + gzip_opt.format = format; + google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); + std::string error; + if (!json2pb::ProtoMessageToProtoJson(msg, &gzip, options, &error)) { + LogError(error.c_str(), gzip.ZlibErrorMessage()); + return false; + } + return gzip.Close(); +} + +static bool Compress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf, + google::protobuf::io::GzipOutputStream::Format format) { + butil::IOBufAsZeroCopyOutputStream wrapper(buf); + google::protobuf::io::GzipOutputStream::Options gzip_opt; + gzip_opt.format = format; + google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); + if (!google::protobuf::TextFormat::Print(msg, &gzip)) { + LogError(gzip.ZlibErrorMessage()); + return false; + } + return gzip.Close(); +} + +static bool Decompress(const butil::IOBuf& data, google::protobuf::Message* msg, + google::protobuf::io::GzipInputStream::Format format) { butil::IOBufAsZeroCopyInputStream wrapper(data); - google::protobuf::io::GzipInputStream gzip( - &wrapper, google::protobuf::io::GzipInputStream::GZIP); + google::protobuf::io::GzipInputStream gzip(&wrapper, format); if (!ParsePbFromZeroCopyStream(msg, &gzip)) { - LogError(gzip); + LogError(gzip.ZlibErrorMessage(), gzip.ZlibErrorMessage()); return false; } return true; } +static bool DecompressFromJson(const butil::IOBuf& data, + google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options, + google::protobuf::io::GzipInputStream::Format format) { + butil::IOBufAsZeroCopyInputStream wrapper(data); + google::protobuf::io::GzipInputStream gzip(&wrapper, format); + std::string error; + if (!json2pb::JsonToProtoMessage(&gzip, msg, options, &error)) { + LogError(error.c_str(), gzip.ZlibErrorMessage()); + return false; + } + return true; +} + +static bool DecompressFromProtoJson(const butil::IOBuf& data, + google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options, + google::protobuf::io::GzipInputStream::Format format) { + butil::IOBufAsZeroCopyInputStream wrapper(data); + google::protobuf::io::GzipInputStream gzip(&wrapper, format); + std::string error; + if (!json2pb::ProtoJsonToProtoMessage(&gzip, msg, options, &error)) { + LogError(error.c_str(), gzip.ZlibErrorMessage()); + return false; + } + return true; +} + +static bool DecompressFromProtoText(const butil::IOBuf& data, + google::protobuf::Message* msg, + google::protobuf::io::GzipInputStream::Format format) { + butil::IOBufAsZeroCopyInputStream wrapper(data); + google::protobuf::io::GzipInputStream gzip(&wrapper, format); + std::string error; + if (!google::protobuf::TextFormat::Parse(&gzip, msg)) { + LogError(error.c_str(), gzip.ZlibErrorMessage()); + return false; + } + return true; +} + +bool GzipCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { + return Compress(msg, buf, google::protobuf::io::GzipOutputStream::GZIP); +} + +bool GzipCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options) { + return Compress2Json( + msg, buf, options, google::protobuf::io::GzipOutputStream::GZIP); +} + +bool GzipCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options) { + return Compress2ProtoJson( + msg, buf, options, google::protobuf::io::GzipOutputStream::GZIP); +} + +bool GzipCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { + return Compress2ProtoText(msg, buf, google::protobuf::io::GzipOutputStream::GZIP); +} + +bool GzipDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { + return Decompress(data, msg, google::protobuf::io::GzipInputStream::GZIP); +} + +bool GzipDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options) { + return DecompressFromJson( + data, msg, options, google::protobuf::io::GzipInputStream::GZIP); +} + +bool GzipDecompressFromProtoJson(const butil::IOBuf& data, + google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options) { + return DecompressFromProtoJson( + data, msg, options, google::protobuf::io::GzipInputStream::GZIP); +} + +bool GzipDecompressFromProtoText(const butil::IOBuf& data, + google::protobuf::Message* msg) { + return DecompressFromProtoText( + data, msg, google::protobuf::io::GzipInputStream::GZIP); +} + bool GzipCompress(const butil::IOBuf& msg, butil::IOBuf* buf, const GzipCompressOptions* options_in) { butil::IOBufAsZeroCopyOutputStream wrapper(buf); @@ -93,7 +220,7 @@ bool GzipCompress(const butil::IOBuf& msg, butil::IOBuf* buf, } if (size_in != 0 || (size_t)in.ByteCount() != msg.size()) { // If any stage is not fully consumed, something went wrong. - LogError(out); + LogError(out.ZlibErrorMessage()); return false; } if (size_out != 0) { @@ -132,7 +259,7 @@ inline bool GzipDecompressBase( // If any stage is not fully consumed, something went wrong. // Here we call in.Next addtitionally to make sure that the gzip // "blackbox" does not have buffer left. - LogError(in); + LogError(in.ZlibErrorMessage()); return false; } if (size_out != 0) { @@ -141,19 +268,48 @@ inline bool GzipDecompressBase( return true; } -bool ZlibCompress(const google::protobuf::Message& res, butil::IOBuf* buf) { - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options zlib_opt; - zlib_opt.format = google::protobuf::io::GzipOutputStream::ZLIB; - google::protobuf::io::GzipOutputStream zlib(&wrapper, zlib_opt); - return res.SerializeToZeroCopyStream(&zlib) && zlib.Close(); +bool ZlibCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { + return Compress(msg, buf, google::protobuf::io::GzipOutputStream::ZLIB); } -bool ZlibDecompress(const butil::IOBuf& data, google::protobuf::Message* req) { - butil::IOBufAsZeroCopyInputStream wrapper(data); - google::protobuf::io::GzipInputStream zlib( - &wrapper, google::protobuf::io::GzipInputStream::ZLIB); - return ParsePbFromZeroCopyStream(req, &zlib); +bool ZlibCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options) { + return Compress2Json( + msg, buf, options, google::protobuf::io::GzipOutputStream::ZLIB); +} + +bool ZlibCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options) { + return Compress2ProtoJson( + msg, buf, options, google::protobuf::io::GzipOutputStream::ZLIB); +} + +bool ZlibCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { + return Compress2ProtoText(msg, buf, google::protobuf::io::GzipOutputStream::ZLIB); +} + +bool ZlibDecompress(const butil::IOBuf& data, + google::protobuf::Message* msg) { + return Decompress(data, msg, google::protobuf::io::GzipInputStream::ZLIB); +} + +bool ZlibDecompressFromJson(const butil::IOBuf& data, + google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options) { + return DecompressFromJson( + data, msg, options, google::protobuf::io::GzipInputStream::ZLIB); +} + +bool ZlibDecompressFromProtoJson(const butil::IOBuf& data, + google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options) { + return DecompressFromProtoJson( + data, msg, options, google::protobuf::io::GzipInputStream::ZLIB); +} + +bool ZlibDecompressFromProtoText(const butil::IOBuf& data, + google::protobuf::Message* msg) { + return DecompressFromProtoText(data, msg, google::protobuf::io::GzipInputStream::ZLIB); } bool GzipDecompress(const butil::IOBuf& data, butil::IOBuf* msg) { diff --git a/src/brpc/policy/gzip_compress.h b/src/brpc/policy/gzip_compress.h index 426aaf504b..fe1efb0250 100644 --- a/src/brpc/policy/gzip_compress.h +++ b/src/brpc/policy/gzip_compress.h @@ -22,6 +22,8 @@ #include // Message #include #include "butil/iobuf.h" // butil::IOBuf +#include "json2pb/pb_to_json.h" +#include "json2pb/json_to_pb.h" namespace brpc { @@ -31,11 +33,34 @@ typedef google::protobuf::io::GzipOutputStream::Options GzipCompressOptions; // Compress serialized `msg' into `buf'. bool GzipCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); +bool GzipCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options); +bool GzipCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options); +bool GzipCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); + bool ZlibCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); +bool ZlibCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options); +bool ZlibCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options); +bool ZlibCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse `msg' from decompressed `buf'. bool GzipDecompress(const butil::IOBuf& buf, google::protobuf::Message* msg); +bool GzipDecompressFromJson(const butil::IOBuf& buf, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options); +bool GzipDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options); +bool GzipDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); + + bool ZlibDecompress(const butil::IOBuf& buf, google::protobuf::Message* msg); +bool ZlibDecompressFromJson(const butil::IOBuf& buf, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options); +bool ZlibDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options); +bool ZlibDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); // Put compressed `in' into `out'. bool GzipCompress(const butil::IOBuf& in, butil::IOBuf* out, diff --git a/src/brpc/policy/snappy_compress.cpp b/src/brpc/policy/snappy_compress.cpp index 77e80170e1..1f731a6beb 100644 --- a/src/brpc/policy/snappy_compress.cpp +++ b/src/brpc/policy/snappy_compress.cpp @@ -16,6 +16,7 @@ // under the License. +#include #include "butil/logging.h" #include "butil/third_party/snappy/snappy.h" #include "brpc/policy/snappy_compress.h" @@ -25,29 +26,113 @@ namespace brpc { namespace policy { -bool SnappyCompress(const google::protobuf::Message& res, butil::IOBuf* buf) { +bool SnappyCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { butil::IOBuf serialized_pb; butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); - if (res.SerializeToZeroCopyStream(&wrapper)) { - butil::IOBufAsSnappySource source(serialized_pb); - butil::IOBufAsSnappySink sink(*buf); - return butil::snappy::Compress(&source, &sink); + if (msg.SerializeToZeroCopyStream(&wrapper)) { + return SnappyCompress(serialized_pb, buf); } - LOG(WARNING) << "Fail to serialize input pb=" << &res; + + LOG(WARNING) << "Fail to serialize input pb=" << &msg; return false; } -bool SnappyDecompress(const butil::IOBuf& data, google::protobuf::Message* req) { - butil::IOBufAsSnappySource source(data); +bool SnappyCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options) { + butil::IOBuf serialized_pb; + butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); + std::string error; + if (json2pb::ProtoMessageToJson(msg, &wrapper, options, &error)) { + return SnappyCompress(serialized_pb, buf); + } + + LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; + return false; + +} + +bool SnappyCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options) { + butil::IOBuf serialized_pb; + butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); + std::string error; + if (json2pb::ProtoMessageToProtoJson(msg, &wrapper, options, &error)) { + return SnappyCompress(serialized_pb, buf); + } + + LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; + return false; +} + +bool SnappyCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { + butil::IOBuf serialized_pb; + butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); + if (google::protobuf::TextFormat::Print(msg, &wrapper)) { + return SnappyCompress(serialized_pb, buf); + } + + LOG(WARNING) << "Fail to serialize input pb=" << &msg; + return false; +} + +bool SnappyDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { butil::IOBuf binary_pb; - butil::IOBufAsSnappySink sink(binary_pb); - if (butil::snappy::Uncompress(&source, &sink)) { - return ParsePbFromIOBuf(req, binary_pb); + if (SnappyDecompress(data, &binary_pb)) { + return ParsePbFromIOBuf(msg, binary_pb); } LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); return false; } +bool SnappyDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options) { + butil::IOBuf json; + if (!SnappyDecompress(data, &json)) { + LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); + return false; + } + + butil::IOBufAsZeroCopyInputStream wrapper(json); + std::string error; + if (json2pb::JsonToProtoMessage(&wrapper, msg, options, &error)) { + return true; + } + + LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; + return false; +} +bool SnappyDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options) { + butil::IOBuf json; + if (!SnappyDecompress(data, &json)) { + LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); + return false; + } + + butil::IOBufAsZeroCopyInputStream wrapper(json); + std::string error; + if (json2pb::ProtoJsonToProtoMessage(&wrapper, msg, options, &error)) { + return true; + } + LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; + return false; +} + +bool SnappyDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg) { + butil::IOBuf json; + if (!SnappyDecompress(data, &json)) { + LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); + return false; + } + + butil::IOBufAsZeroCopyInputStream wrapper(json); + if (google::protobuf::TextFormat::Parse(&wrapper, msg)) { + return true; + } + LOG(WARNING) << "Fail to serialize input pb=" << &msg; + return false; +} + bool SnappyCompress(const butil::IOBuf& in, butil::IOBuf* out) { butil::IOBufAsSnappySource source(in); butil::IOBufAsSnappySink sink(*out); diff --git a/src/brpc/policy/snappy_compress.h b/src/brpc/policy/snappy_compress.h index 082aba90b9..85d61b3fcd 100644 --- a/src/brpc/policy/snappy_compress.h +++ b/src/brpc/policy/snappy_compress.h @@ -21,6 +21,8 @@ #include // Message #include "butil/iobuf.h" // IOBuf +#include "json2pb/pb_to_json.h" +#include "json2pb/json_to_pb.h" namespace brpc { @@ -28,9 +30,19 @@ namespace policy { // Compress serialized `msg' into `buf'. bool SnappyCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); +bool SnappyCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2JsonOptions& options); +bool SnappyCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, + const json2pb::Pb2ProtoJsonOptions& options); +bool SnappyCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse `msg' from decompressed `buf' bool SnappyDecompress(const butil::IOBuf& data, google::protobuf::Message* msg); +bool SnappyDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::Json2PbOptions& options); +bool SnappyDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, + const json2pb::ProtoJson2PbOptions& options); +bool SnappyDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); // Put compressed `in' into `out'. bool SnappyCompress(const butil::IOBuf& in, butil::IOBuf* out); diff --git a/src/brpc/protocol.cpp b/src/brpc/protocol.cpp index e0468c22ff..9bb1fde315 100644 --- a/src/brpc/protocol.cpp +++ b/src/brpc/protocol.cpp @@ -130,17 +130,12 @@ void ListProtocols(std::vector >* vec) { } } -void SerializeRequestDefault(butil::IOBuf* buf, - Controller* cntl, +void SerializeRequestDefault(butil::IOBuf* buf, Controller* cntl, const google::protobuf::Message* request) { // Check sanity of request. if (!request) { return cntl->SetFailed(EREQUEST, "`request' is NULL"); } - if (request->GetDescriptor() == SerializedRequest::descriptor()) { - buf->append(((SerializedRequest*)request)->serialized_data()); - return; - } if (!request->IsInitialized()) { return cntl->SetFailed( EREQUEST, "Missing required fields in request: %s", diff --git a/src/brpc/serialized_request.h b/src/brpc/serialized_request.h index 00d959f36f..4d69aa42a0 100644 --- a/src/brpc/serialized_request.h +++ b/src/brpc/serialized_request.h @@ -54,7 +54,6 @@ class SerializedRequest : public NonreflectableMessage { void SharedCtor(); void SharedDtor(); -private: butil::IOBuf _serialized; }; diff --git a/src/brpc/serialized_response.h b/src/brpc/serialized_response.h index a724be4d40..acd18a2a0d 100644 --- a/src/brpc/serialized_response.h +++ b/src/brpc/serialized_response.h @@ -54,7 +54,6 @@ class SerializedResponse : public NonreflectableMessage { void SharedCtor(); void SharedDtor(); -private: butil::IOBuf _serialized; }; diff --git a/src/json2pb/json_to_pb.cpp b/src/json2pb/json_to_pb.cpp index 53887a3826..42d4772edc 100644 --- a/src/json2pb/json_to_pb.cpp +++ b/src/json2pb/json_to_pb.cpp @@ -718,14 +718,11 @@ bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, const ProtoJson2PbOptions& options, std::string* error) { TypeResolverUniqueptr type_resolver = GetTypeResolver(*message); + std::string type_url = GetTypeUrl(*message); butil::IOBuf buf; butil::IOBufAsZeroCopyOutputStream output_stream(&buf); - std::string type_url = GetTypeUrl(*message); auto st = google::protobuf::util::JsonToBinaryStream( type_resolver.get(), type_url, json, &output_stream, options); - - butil::IOBufAsZeroCopyInputStream input_stream(buf); - google::protobuf::io::CodedInputStream decoder(&input_stream); if (!st.ok()) { if (NULL != error) { *error = st.ToString(); @@ -733,6 +730,8 @@ bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, return false; } + butil::IOBufAsZeroCopyInputStream input_stream(buf); + google::protobuf::io::CodedInputStream decoder(&input_stream); bool ok = message->ParseFromCodedStream(&decoder); if (!ok && NULL != error) { *error = "Fail to ParseFromCodedStream"; diff --git a/src/json2pb/json_to_pb.h b/src/json2pb/json_to_pb.h index 78eb15b602..3734ef313e 100644 --- a/src/json2pb/json_to_pb.h +++ b/src/json2pb/json_to_pb.h @@ -92,11 +92,11 @@ using ProtoJson2PbOptions = google::protobuf::util::JsonParseOptions; // See https://protobuf.dev/programming-guides/json/ for details. bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, google::protobuf::Message* message, - const ProtoJson2PbOptions& options, + const ProtoJson2PbOptions& options = ProtoJson2PbOptions(), std::string* error = NULL); -// Use default GoogleJson2PbOptions. bool ProtoJsonToProtoMessage(const std::string& json, google::protobuf::Message* message, - const ProtoJson2PbOptions& options, std::string* error = NULL); + const ProtoJson2PbOptions& options = ProtoJson2PbOptions(), + std::string* error = NULL); } // namespace json2pb diff --git a/src/json2pb/pb_to_json.cpp b/src/json2pb/pb_to_json.cpp index 0dc948148d..c23ccdf757 100644 --- a/src/json2pb/pb_to_json.cpp +++ b/src/json2pb/pb_to_json.cpp @@ -349,16 +349,15 @@ bool ProtoMessageToJson(const google::protobuf::Message& message, } bool ProtoMessageToProtoJson(const google::protobuf::Message& message, - google::protobuf::io::ZeroCopyOutputStream* json, - const Pb2ProtoJsonOptions& options, std::string* error) { - TypeResolverUniqueptr type_resolver = GetTypeResolver(message); + google::protobuf::io::ZeroCopyOutputStream* json, + const Pb2ProtoJsonOptions& options, std::string* error) { butil::IOBuf buf; butil::IOBufAsZeroCopyOutputStream output_stream(&buf); - google::protobuf::io::CodedOutputStream coded_stream(&output_stream); - if (!message.SerializeToCodedStream(&coded_stream)) { + if (!message.SerializeToZeroCopyStream(&output_stream)) { return false; } + TypeResolverUniqueptr type_resolver = GetTypeResolver(message); butil::IOBufAsZeroCopyInputStream input_stream(buf); auto st = google::protobuf::util::BinaryToJsonStream( type_resolver.get(), GetTypeUrl(message), &input_stream, json, options); diff --git a/src/json2pb/pb_to_json.h b/src/json2pb/pb_to_json.h index 33311ffb76..8de635170e 100644 --- a/src/json2pb/pb_to_json.h +++ b/src/json2pb/pb_to_json.h @@ -95,14 +95,21 @@ bool ProtoMessageToJson(const google::protobuf::Message& message, // See for details. using Pb2ProtoJsonOptions = google::protobuf::util::JsonOptions; +#if GOOGLE_PROTOBUF_VERSION >= 5026002 +#define AlwaysPrintPrimitiveFields(options) options.always_print_fields_with_no_presence +#else +#define AlwaysPrintPrimitiveFields(options) options.always_print_primitive_fields +#endif + // Convert protobuf `messge' to `json' in ProtoJSON format according to `options'. // See https://protobuf.dev/programming-guides/json/ for details. bool ProtoMessageToProtoJson(const google::protobuf::Message& message, google::protobuf::io::ZeroCopyOutputStream* json, - const Pb2ProtoJsonOptions& options, std::string* error = NULL); -// Using default GooglePb2JsonOptions. + const Pb2ProtoJsonOptions& options = Pb2ProtoJsonOptions(), + std::string* error = NULL); bool ProtoMessageToProtoJson(const google::protobuf::Message& message, std::string* json, - const Pb2ProtoJsonOptions& options, std::string* error = NULL); + const Pb2ProtoJsonOptions& options = Pb2ProtoJsonOptions(), + std::string* error = NULL); } // namespace json2pb #endif // BRPC_JSON2PB_PB_TO_JSON_H diff --git a/src/json2pb/protobuf_type_resolver.h b/src/json2pb/protobuf_type_resolver.h index 18993f1838..a73a42315e 100644 --- a/src/json2pb/protobuf_type_resolver.h +++ b/src/json2pb/protobuf_type_resolver.h @@ -35,6 +35,8 @@ inline std::string GetTypeUrl(const google::protobuf::Message& message) { message.GetDescriptor()->full_name().c_str()); } +// unique_ptr deleter for TypeResolver only deletes the object +// when it's not from the generated pool. class TypeResolverDeleter { public: explicit TypeResolverDeleter(bool is_generated_pool) diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index 578e8f8936..0eca15326a 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -189,7 +189,7 @@ class HttpTest : public ::testing::Test{ return msg; } - brpc::policy::HttpContext* MakePostJsonStdRequestMessage(const std::string& path) { + brpc::policy::HttpContext* MakePostProtoJsonRequestMessage(const std::string& path) { brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false); msg->header().uri().set_path(path); msg->header().set_content_type("application/proto-json"); @@ -199,7 +199,8 @@ class HttpTest : public ::testing::Test{ req.set_message(EXP_REQUEST); butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body()); json2pb::Pb2ProtoJsonOptions options; - EXPECT_TRUE(json2pb::ProtoMessageToProtoJson(req, &req_stream, options)); + std::string error; + EXPECT_TRUE(json2pb::ProtoMessageToProtoJson(req, &req_stream, options, &error)) << error; return msg; } @@ -366,7 +367,7 @@ TEST_F(HttpTest, verify_request) { } { brpc::policy::HttpContext* msg = - MakePostJsonStdRequestMessage("/EchoService/Echo"); + MakePostProtoJsonRequestMessage("/EchoService/Echo"); VerifyMessage(msg, false); msg->Destroy(); } @@ -1847,7 +1848,7 @@ TEST_F(HttpTest, proto_json_content_type) { butil::IOBufAsZeroCopyOutputStream output_stream(&cntl.request_attachment()); ASSERT_TRUE(json2pb::ProtoMessageToProtoJson(req, &output_stream, json_options)); channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); - ASSERT_FALSE(cntl.Failed()); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_EQ("application/proto-json", cntl.http_response().content_type()); json2pb::ProtoJson2PbOptions parse_options; parse_options.ignore_unknown_fields = true; @@ -1860,7 +1861,7 @@ TEST_F(HttpTest, proto_json_content_type) { cntl.http_request().set_content_type("application/proto-json"); res.Clear(); stub.Echo(&cntl, &req, &res, nullptr); - ASSERT_FALSE(cntl.Failed()); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_EQ(EXP_RESPONSE, res.message()); ASSERT_EQ("application/proto-json", cntl.http_response().content_type()); } diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 676f16626d..f0359132b9 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -30,6 +30,7 @@ #include "butil/files/scoped_file.h" #include "brpc/socket.h" #include "butil/object_pool.h" +#include "brpc/policy/baidu_rpc_protocol.h" #include "brpc/builtin/version_service.h" #include "brpc/builtin/health_service.h" #include "brpc/builtin/list_service.h" @@ -70,6 +71,13 @@ DECLARE_bool(enable_dir_service); namespace policy { DECLARE_bool(use_http_error_code); + +extern bool SerializeRpcMessage(const google::protobuf::Message& message, Controller& cntl, + ContentType content_type, CompressType compress_type, + butil::IOBuf* buf, std::string* error); +extern bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl, + ContentType content_type, CompressType compress_type, + google::protobuf::Message* message, std::string* error); } } @@ -1693,53 +1701,124 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { cntl->sampled_request()->meta.service_name()); ASSERT_TRUE(cntl->sampled_request()->meta.has_method_name()); ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name()); + brpc::ContentType content_type = cntl->request_content_type(); + brpc::CompressType compress_type = cntl->request_compress_type(); + test::EchoRequest echo_request; test::EchoResponse echo_response; - brpc::CompressType type = cntl->request_compress_type(); - ASSERT_TRUE(brpc::ParseFromCompressedData( - request->serialized_data(), &echo_request, type)); + std::string error; + ASSERT_TRUE(brpc::policy::DeserializeRpcMessage( + request->serialized_data(), *cntl, content_type, + compress_type, &echo_request, &error)) << error; ASSERT_EQ(EXP_REQUEST, echo_request.message()); ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string()); - echo_response.set_message(EXP_RESPONSE); - butil::IOBuf compressed_data; - ASSERT_TRUE(brpc::SerializeAsCompressedData( - echo_response, &response->serialized_data(), type)); - cntl->set_response_compress_type(type); + content_type = (brpc::ContentType)_content_type_index; + compress_type = (brpc::CompressType)_compress_type_index; + ++_compress_type_index; + if (_compress_type_index == brpc::COMPRESS_TYPE_LZ4) { + ++_compress_type_index; + } + if (_compress_type_index > brpc::CompressType_MAX) { + _compress_type_index = brpc::CompressType_MIN; + + ++_content_type_index; + if (_content_type_index > brpc::ContentType_MAX) { + _content_type_index = brpc::ContentType_MIN; + } + } + + cntl->set_response_content_type(content_type); + cntl->set_response_compress_type(compress_type); cntl->response_attachment().append(EXP_RESPONSE); + echo_response.set_message(EXP_RESPONSE); + ASSERT_TRUE(brpc::policy::SerializeRpcMessage( + echo_response, *cntl, content_type, compress_type, + &response->serialized_data(), &error)) << error; } +private: + int _content_type_index = brpc::ContentType_MIN; + int _compress_type_index = brpc::CompressType_MIN; }; -TEST_F(ServerTest, baidu_master_service) { - butil::EndPoint ep; - ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); - brpc::Server server; - EchoServiceImpl service; - ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); - brpc::ServerOptions opt; - opt.baidu_master_service = new BaiduMasterServiceImpl; - ASSERT_EQ(0, server.Start(ep, &opt)); - - brpc::Channel chan; - brpc::ChannelOptions copt; - copt.protocol = "baidu_std"; - ASSERT_EQ(0, chan.Init(ep, &copt)); +void TestBaiduMasterService(brpc::Channel& channel, brpc::CompressType compress_type) { brpc::Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(EXP_REQUEST); cntl.request_attachment().append(EXP_REQUEST); - cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP); - test::EchoService_Stub stub(&chan); + cntl.set_request_compress_type(compress_type); + test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &req, &res, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_EQ(EXP_RESPONSE, res.message()); ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); +} + +TEST_F(ServerTest, baidu_master_service) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceImpl service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions server_options; + server_options.baidu_master_service = new BaiduMasterServiceImpl; + ASSERT_EQ(0, server.Start(ep, &server_options)); + + brpc::Channel channel; + brpc::ChannelOptions channel_options; + channel_options.protocol = "baidu_std"; + ASSERT_EQ(0, channel.Init(ep, &channel_options)); + + for (int i = 0; i < 10; ++i) { + TestBaiduMasterService(channel, brpc::COMPRESS_TYPE_ZLIB); + TestBaiduMasterService(channel, brpc::COMPRESS_TYPE_GZIP); + TestBaiduMasterService(channel, brpc::COMPRESS_TYPE_SNAPPY); + TestBaiduMasterService(channel, brpc::COMPRESS_TYPE_NONE); + } ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); } +void TestGenericCall(brpc::Channel& channel, + brpc::ContentType content_type, + brpc::CompressType compress_type) { + LOG(INFO) << "TestGenericCall: content_type=" << content_type + << ", compress_type=" << compress_type; + test::EchoRequest request; + test::EchoResponse response; + request.set_message(EXP_REQUEST); + + brpc::SerializedResponse serialized_response; + brpc::SerializedRequest serialized_request; + + brpc::Controller cntl; + cntl.set_request_content_type(content_type); + cntl.set_request_compress_type(compress_type); + cntl.request_attachment().append(EXP_REQUEST); + + std::string error; + ASSERT_TRUE(brpc::policy::SerializeRpcMessage( + request, cntl, content_type, compress_type, + &serialized_request.serialized_data(), &error)) << error; + auto sampled_request = new (std::nothrow) brpc::SampledRequest(); + sampled_request->meta.set_service_name( + test::EchoService::descriptor()->full_name()); + sampled_request->meta.set_method_name( + test::EchoService::descriptor()->FindMethodByName("Echo")->name()); + cntl.reset_sampled_request(sampled_request); + + channel.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + + ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(serialized_response.serialized_data(), + cntl, cntl.response_content_type(), + cntl.response_compress_type(), + &response, &error)) << error; + ASSERT_EQ(EXP_RESPONSE, response.message()); + ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); +} TEST_F(ServerTest, generic_call) { butil::EndPoint ep; @@ -1747,42 +1826,34 @@ TEST_F(ServerTest, generic_call) { brpc::Server server; EchoServiceImpl service; ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); - brpc::ServerOptions opt; - opt.baidu_master_service = new BaiduMasterServiceImpl; - ASSERT_EQ(0, server.Start(ep, &opt)); - - { - brpc::Channel chan; - brpc::ChannelOptions copt; - copt.protocol = "baidu_std"; - ASSERT_EQ(0, chan.Init(ep, &copt)); - brpc::Controller cntl; - test::EchoRequest req; - test::EchoResponse res; - req.set_message(EXP_REQUEST); - - brpc::SerializedResponse serialized_response; - brpc::SerializedRequest serialized_request; - brpc::CompressType type = brpc::COMPRESS_TYPE_GZIP; - ASSERT_TRUE(brpc::SerializeAsCompressedData( - req, &serialized_request.serialized_data(), type)); - cntl.request_attachment().append(EXP_REQUEST); - cntl.set_request_compress_type(type); - auto sampled_request = new (std::nothrow) brpc::SampledRequest(); - sampled_request->meta.set_service_name( - test::EchoService::descriptor()->full_name()); - sampled_request->meta.set_method_name( - test::EchoService::descriptor()->FindMethodByName("Echo")->name()); - cntl.reset_sampled_request(sampled_request); - chan.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + brpc::ServerOptions server_options; + server_options.baidu_master_service = new BaiduMasterServiceImpl; + ASSERT_EQ(0, server.Start(ep, &server_options)); - ASSERT_TRUE(brpc::ParseFromCompressedData(serialized_response.serialized_data(), - &res, cntl.response_compress_type())) - << serialized_response.serialized_data().size(); - ASSERT_EQ(EXP_RESPONSE, res.message()); - ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); - } + brpc::Channel channel; + brpc::ChannelOptions channel_options; + channel_options.protocol = "baidu_std"; + ASSERT_EQ(0, channel.Init(ep, &channel_options)); + + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_SNAPPY); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_ZLIB); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_GZIP); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_SNAPPY); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_ZLIB); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_GZIP); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_SNAPPY); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_NONE); ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 0f35863fb7..8e9f90e833 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -421,6 +421,7 @@ TEST_F(SocketTest, single_threaded_connect_and_write) { ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); messenger->StopAccept(0); + messenger->Join(); ASSERT_EQ(-1, messenger->listened_fd()); ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); ASSERT_EQ(EBADF, errno); @@ -789,6 +790,7 @@ TEST_F(SocketTest, health_check) { // Must stop messenger before SetFailed the id otherwise StartHealthCheck // still has chance to get reconnected and revive the id. messenger->StopAccept(0); + messenger->Join(); ASSERT_EQ(-1, messenger->listened_fd()); ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); ASSERT_EQ(EBADF, errno); @@ -1408,6 +1410,7 @@ TEST_F(SocketTest, keepalive_input_message) { } messenger->StopAccept(0); + messenger->Join(); ASSERT_EQ(-1, messenger->listened_fd()); ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); ASSERT_EQ(EBADF, errno); @@ -1422,11 +1425,24 @@ void CheckTCPUserTimeout(int fd, int expect_tcp_user_timeout) { } TEST_F(SocketTest, tcp_user_timeout) { + brpc::Acceptor* messenger = new brpc::Acceptor; + int listening_fd = -1; + butil::EndPoint point(butil::IP_ANY, 7878); + for (int i = 0; i < 100; ++i) { + point.port += i; + listening_fd = tcp_listen(point); + if (listening_fd >= 0) { + break; + } + } + ASSERT_GT(listening_fd, 0) << berror(); + ASSERT_EQ(0, butil::make_non_blocking(listening_fd)); + ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false)); + { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; @@ -1436,10 +1452,9 @@ TEST_F(SocketTest, tcp_user_timeout) { { int tcp_user_timeout_ms = 1000; - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.tcp_user_timeout_ms = tcp_user_timeout_ms; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); @@ -1450,10 +1465,9 @@ TEST_F(SocketTest, tcp_user_timeout) { brpc::FLAGS_socket_tcp_user_timeout_ms = 2000; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; @@ -1462,10 +1476,9 @@ TEST_F(SocketTest, tcp_user_timeout) { } { int tcp_user_timeout_ms = 3000; - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.tcp_user_timeout_ms = tcp_user_timeout_ms; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); @@ -1473,6 +1486,12 @@ TEST_F(SocketTest, tcp_user_timeout) { ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; CheckTCPUserTimeout(ptr->fd(), tcp_user_timeout_ms); } + + messenger->StopAccept(0); + messenger->Join(); + ASSERT_EQ(-1, messenger->listened_fd()); + ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); + ASSERT_EQ(EBADF, errno); } #endif diff --git a/test/bthread_cond_unittest.cpp b/test/bthread_cond_unittest.cpp index 7342cea184..d01ef69c26 100644 --- a/test/bthread_cond_unittest.cpp +++ b/test/bthread_cond_unittest.cpp @@ -397,6 +397,7 @@ class BthreadCond { bthread_mutex_t _mutex; }; +#ifndef BUTIL_USE_ASAN volatile bool g_stop = false; bool started_wait = false; bool ended_wait = false; @@ -449,7 +450,6 @@ TEST(CondTest, too_many_bthreads_from_pthread) { launch_many_bthreads(); } -#ifndef BUTIL_USE_ASAN static void* run_launch_many_bthreads(void*) { launch_many_bthreads(); return NULL; From b9a2a73c25bdc22cc60f1885e727342dc3ea0bd8 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Fri, 11 Apr 2025 22:53:43 +0800 Subject: [PATCH 2/2] Refactor implementation of compress --- src/brpc/compress.cpp | 39 +++- src/brpc/compress.h | 145 +++++++++++++-- src/brpc/global.cpp | 18 +- src/brpc/memcache.cpp | 4 +- src/brpc/nonreflectable_message.h | 2 +- src/brpc/nshead_message.cpp | 2 +- src/brpc/policy/baidu_rpc_protocol.cpp | 242 ++++++++++++++----------- src/brpc/policy/gzip_compress.cpp | 224 +++++------------------ src/brpc/policy/gzip_compress.h | 25 --- src/brpc/policy/snappy_compress.cpp | 116 +++--------- src/brpc/policy/snappy_compress.h | 12 -- src/brpc/proto_base.proto | 3 + src/brpc/redis.cpp | 4 +- src/brpc/serialized_request.cpp | 2 +- src/brpc/serialized_response.cpp | 2 +- test/brpc_server_unittest.cpp | 22 +-- 16 files changed, 395 insertions(+), 467 deletions(-) diff --git a/src/brpc/compress.cpp b/src/brpc/compress.cpp index f068678a36..36e55c7cdc 100644 --- a/src/brpc/compress.cpp +++ b/src/brpc/compress.cpp @@ -20,13 +20,12 @@ #include "json2pb/json_to_pb.h" #include "brpc/compress.h" #include "brpc/protocol.h" +#include "brpc/proto_base.pb.h" namespace brpc { static const int MAX_HANDLER_SIZE = 1024; -static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { - { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL } -}; +static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } }; int RegisterCompressHandler(CompressType type, CompressHandler handler) { @@ -85,10 +84,14 @@ bool ParseFromCompressedData(const butil::IOBuf& data, return ParsePbFromIOBuf(msg, data); } const CompressHandler* handler = FindCompressHandler(compress_type); - if (NULL != handler) { - return handler->Decompress(data, msg); + if (NULL == handler) { + return false; } - return false; + + Deserializer deserializer([msg](google::protobuf::io::ZeroCopyInputStream* input) { + return msg->ParseFromZeroCopyStream(input); + }); + return handler->Decompress(data, &deserializer); } bool SerializeAsCompressedData(const google::protobuf::Message& msg, @@ -98,10 +101,28 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg, return msg.SerializeToZeroCopyStream(&wrapper); } const CompressHandler* handler = FindCompressHandler(compress_type); - if (NULL != handler) { - return handler->Compress(msg, buf); + if (NULL == handler) { + return false; } - return false; + + Serializer serializer([&msg](google::protobuf::io::ZeroCopyOutputStream* output) { + return msg.SerializeToZeroCopyStream(output); + }); + return handler->Compress(serializer, buf); +} + +::google::protobuf::Metadata Serializer::GetMetadata() const { + ::google::protobuf::Metadata metadata{}; + metadata.descriptor = SerializerBase::descriptor(); + metadata.reflection = nullptr; + return metadata; +} + +::google::protobuf::Metadata Deserializer::GetMetadata() const { + ::google::protobuf::Metadata metadata{}; + metadata.descriptor = DeserializerBase::descriptor(); + metadata.reflection = nullptr; + return metadata; } } // namespace brpc diff --git a/src/brpc/compress.h b/src/brpc/compress.h index 725cf1ec5a..4529959873 100644 --- a/src/brpc/compress.h +++ b/src/brpc/compress.h @@ -21,30 +21,151 @@ #include // Message #include "butil/iobuf.h" // butil::IOBuf -#include "json2pb/pb_to_json.h" -#include "json2pb/json_to_pb.h" +#include "butil/logging.h" #include "brpc/options.pb.h" // CompressType +#include "brpc/nonreflectable_message.h" namespace brpc { +// Serializer can be used to implement custom serialization +// before compression with user callback. +class Serializer : public NonreflectableMessage { +public: + using Callback = std::function; + + Serializer() :Serializer(NULL) {} + + explicit Serializer(Callback callback) + :_callback(std::move(callback)) { + SharedCtor(); + } + + ~Serializer() override { + SharedDtor(); + } + + Serializer(const Serializer& from) + : NonreflectableMessage(from) { + SharedCtor(); + MergeFrom(from); + } + + Serializer& operator=(const Serializer& from) { + CopyFrom(from); + return *this; + } + + void Swap(Serializer* other) { + if (other != this) { + } + } + + void MergeFrom(const Serializer& from) override { + CHECK_NE(&from, this); + } + + // implements Message ---------------------------------------------- + void Clear() override { + _callback = nullptr; + } + size_t ByteSizeLong() const override { return 0; } + int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); } + + ::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE; + + // Converts the data into `output' for later compression. + bool SerializeTo(google::protobuf::io::ZeroCopyOutputStream* output) const { + if (!_callback) { + LOG(WARNING) << "Serializer::SerializeTo() called without callback"; + return false; + } + return _callback(output); + } + + void SetCallback(Callback callback) { + _callback = std::move(callback); + } + +private: + void SharedCtor() {} + void SharedDtor() {} + + Callback _callback; +}; + +// Deserializer can be used to implement custom deserialization +// after decompression with user callback. +class Deserializer : public NonreflectableMessage { +public: +public: + using Callback = std::function; + + Deserializer() :Deserializer(NULL) {} + + explicit Deserializer(Callback callback) : _callback(std::move(callback)) { + SharedCtor(); + } + + ~Deserializer() override { + SharedDtor(); + } + + Deserializer(const Deserializer& from) + : NonreflectableMessage(from) { + SharedCtor(); + MergeFrom(from); + } + + Deserializer& operator=(const Deserializer& from) { + CopyFrom(from); + return *this; + } + + void Swap(Deserializer* other) { + if (other != this) { + _callback.swap(other->_callback); + } + } + + void MergeFrom(const Deserializer& from) override { + CHECK_NE(&from, this); + _callback = from._callback; + } + + // implements Message ---------------------------------------------- + void Clear() override { _callback = nullptr; } + size_t ByteSizeLong() const override { return 0; } + int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); } + + ::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE; + + // Converts the decompressed `input'. + bool DeserializeFrom(google::protobuf::io::ZeroCopyInputStream* intput) const { + if (!_callback) { + LOG(WARNING) << "Deserializer::DeserializeFrom() called without callback"; + return false; + } + return _callback(intput); + } + void SetCallback(Callback callback) { + _callback = std::move(callback); + } + +private: + void SharedCtor() {} + void SharedDtor() {} + + Callback _callback; +}; + struct CompressHandler { // Compress serialized `msg' into `buf'. // Returns true on success, false otherwise bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf); - bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options); - bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options); - bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse decompressed `data' as `msg'. // Returns true on success, false otherwise bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg); - bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options); - bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options); - bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg); // Name of the compression algorithm, must be string constant. const char* name; diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index a98ac49068..6b3310ec7f 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -388,27 +388,15 @@ static void GlobalInitializeOrDieImpl() { LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb); // Compress Handlers - CompressHandler gzip_compress = { - GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson, - GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson, - GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip" - }; + CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" }; if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) { exit(1); } - CompressHandler zlib_compress = { - ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson, - ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson, - ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib" - }; + CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" }; if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) { exit(1); } - CompressHandler snappy_compress = { - SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson, - SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson, - SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy" - }; + CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" }; if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) { exit(1); } diff --git a/src/brpc/memcache.cpp b/src/brpc/memcache.cpp index c198d16873..489d84db16 100644 --- a/src/brpc/memcache.cpp +++ b/src/brpc/memcache.cpp @@ -32,7 +32,7 @@ MemcacheRequest::MemcacheRequest() } MemcacheRequest::MemcacheRequest(const MemcacheRequest& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } @@ -143,7 +143,7 @@ MemcacheResponse::MemcacheResponse() } MemcacheResponse::MemcacheResponse(const MemcacheResponse& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } diff --git a/src/brpc/nonreflectable_message.h b/src/brpc/nonreflectable_message.h index 54a479fc79..1494cd1b75 100644 --- a/src/brpc/nonreflectable_message.h +++ b/src/brpc/nonreflectable_message.h @@ -21,7 +21,7 @@ #include #include -#include "pb_compat.h" +#include "brpc/pb_compat.h" namespace brpc { diff --git a/src/brpc/nshead_message.cpp b/src/brpc/nshead_message.cpp index fe9e4c96c0..46081c702a 100644 --- a/src/brpc/nshead_message.cpp +++ b/src/brpc/nshead_message.cpp @@ -28,7 +28,7 @@ NsheadMessage::NsheadMessage() } NsheadMessage::NsheadMessage(const NsheadMessage& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 9b6065afcc..8efff06546 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -26,6 +26,7 @@ #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "butil/memory/scope_guard.h" #include "json2pb/json_to_pb.h" +#include "json2pb/pb_to_json.h" #include "brpc/controller.h" // Controller #include "brpc/socket.h" // Socket #include "brpc/server.h" // Server @@ -140,59 +141,70 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, return MakeMessage(msg); } -static json2pb::Pb2JsonOptions MakePb2JsonOptions(Controller& cntl) { - json2pb::Pb2JsonOptions options; - options.bytes_to_base64 = cntl.has_pb_bytes_to_base64(); - options.jsonify_empty_array = cntl.has_pb_jsonify_empty_array(); - options.always_print_primitive_fields = cntl.has_always_print_primitive_fields(); - options.single_repeated_to_array = cntl.has_pb_single_repeated_to_array(); - options.enum_option = FLAGS_pb_enum_as_number - ? json2pb::OUTPUT_ENUM_BY_NUMBER - : json2pb::OUTPUT_ENUM_BY_NAME; - return options; -} - bool SerializeRpcMessage(const google::protobuf::Message& message, Controller& cntl, ContentType content_type, - CompressType compress_type, butil::IOBuf* buf, - std::string* error) { - if (COMPRESS_TYPE_NONE == compress_type) { - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - if (CONTENT_TYPE_PB == content_type) { - return message.SerializeToZeroCopyStream(&wrapper); - } else if (CONTENT_TYPE_JSON == content_type ) { - json2pb::Pb2JsonOptions options = MakePb2JsonOptions(cntl); - return json2pb::ProtoMessageToJson(message, &wrapper, options, error); - } else if (CONTENT_TYPE_PROTO_JSON == content_type) { - json2pb::Pb2ProtoJsonOptions options; - AlwaysPrintPrimitiveFields(options) = cntl.has_always_print_primitive_fields(); - return json2pb::ProtoMessageToProtoJson(message, &wrapper, options, error); - } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { - return google::protobuf::TextFormat::Print(message, &wrapper); + CompressType compress_type, butil::IOBuf* buf) { + auto serialize = [&](Serializer& serializer) -> bool { + bool ok; + if (COMPRESS_TYPE_NONE == compress_type) { + butil::IOBufAsZeroCopyOutputStream stream(buf); + ok = serializer.SerializeTo(&stream); + } else { + const CompressHandler* handler = FindCompressHandler(compress_type); + if (NULL == handler) { + return false; + } + ok = handler->Compress(serializer, buf); } - return false; - } + return ok; + }; - const CompressHandler* handler = FindCompressHandler(compress_type); - if (NULL == handler) { - return false; - } if (CONTENT_TYPE_PB == content_type) { - return handler->Compress(message, buf); - } - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - if (CONTENT_TYPE_JSON == content_type) { - json2pb::Pb2JsonOptions options = MakePb2JsonOptions(cntl); - return handler->Compress2Json(message, buf, options); + Serializer serializer([&message](google::protobuf::io::ZeroCopyOutputStream* output) -> bool { + return message.SerializeToZeroCopyStream(output); + }); + return serialize(serializer); + } else if (CONTENT_TYPE_JSON == content_type) { + Serializer serializer([&message, &cntl](google::protobuf::io::ZeroCopyOutputStream* output) -> bool { + json2pb::Pb2JsonOptions options; + options.bytes_to_base64 = cntl.has_pb_bytes_to_base64(); + options.jsonify_empty_array = cntl.has_pb_jsonify_empty_array(); + options.always_print_primitive_fields = cntl.has_always_print_primitive_fields(); + options.single_repeated_to_array = cntl.has_pb_single_repeated_to_array(); + options.enum_option = FLAGS_pb_enum_as_number + ? json2pb::OUTPUT_ENUM_BY_NUMBER + : json2pb::OUTPUT_ENUM_BY_NAME; + std::string error; + bool ok = json2pb::ProtoMessageToJson(message, output, options, &error); + if (!ok) { + LOG(INFO) << "Fail to serialize message=" + << message.GetDescriptor()->full_name() + << " to json :" << error; + } + return ok; + }); + return serialize(serializer); } else if (CONTENT_TYPE_PROTO_JSON == content_type) { - json2pb::Pb2ProtoJsonOptions options; - options.always_print_enums_as_ints = FLAGS_pb_enum_as_number; - AlwaysPrintPrimitiveFields(options) = cntl.has_always_print_primitive_fields(); - return handler->Compress2ProtoJson(message, buf, options); + Serializer serializer([&message, &cntl](google::protobuf::io::ZeroCopyOutputStream* output) -> bool { + json2pb::Pb2ProtoJsonOptions options; + options.always_print_enums_as_ints = FLAGS_pb_enum_as_number; + AlwaysPrintPrimitiveFields(options) = cntl.has_always_print_primitive_fields(); + std::string error; + bool ok = json2pb::ProtoMessageToProtoJson(message, output, options, &error); + if (!ok) { + LOG(INFO) << "Fail to serialize message=" + << message.GetDescriptor()->full_name() + << " to proto-json :" << error; + } + return ok; + }); + return serialize(serializer); } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { - return handler->Compress2ProtoText(message, buf); + Serializer serializer([&message](google::protobuf::io::ZeroCopyOutputStream* output) -> bool { + return google::protobuf::TextFormat::Print(message, output); + }); + return serialize(serializer); } - return false; } @@ -204,21 +216,20 @@ static bool SerializeResponse(const google::protobuf::Message& res, } if (!res.IsInitialized()) { - cntl.SetFailed(ERESPONSE, - "Missing required fields in response: %s", + cntl.SetFailed(ERESPONSE, "Missing required fields in response: %s", res.InitializationErrorString().c_str()); return false; } ContentType content_type = cntl.response_content_type(); CompressType compress_type = cntl.response_compress_type(); - std::string error; - if (!SerializeRpcMessage(res, cntl, content_type, - compress_type, &buf, &error)) { - cntl.SetFailed(ERESPONSE, "Fail to serialize response, " - "ContentType=%s, CompressType=%s", - ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type)); + if (!SerializeRpcMessage(res, cntl, content_type, compress_type, &buf)) { + cntl.SetFailed( + ERESPONSE, "Fail to serialize response=%s, " + "ContentType=%s, CompressType=%s", + res.GetDescriptor()->full_name().c_str(), + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type)); return false; } return true; @@ -475,47 +486,66 @@ void EndRunningCallMethodInPool( bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl, ContentType content_type, CompressType compress_type, - google::protobuf::Message* message, std::string* error) { - if (COMPRESS_TYPE_NONE != compress_type) { - const CompressHandler* handler = FindCompressHandler(compress_type); - if (NULL == handler) { - return false; + google::protobuf::Message* message) { + auto deserialize = [&](Deserializer& deserializer) -> bool { + bool ok; + if (COMPRESS_TYPE_NONE == compress_type) { + butil::IOBufAsZeroCopyInputStream stream(data); + ok = deserializer.DeserializeFrom(&stream); + } else { + const CompressHandler* handler = FindCompressHandler(compress_type); + if (NULL == handler) { + return false; + } + ok = handler->Decompress(data, &deserializer); } + return ok; + }; - if (CONTENT_TYPE_PB == content_type) { - return handler->Decompress(data, message); - } else if (CONTENT_TYPE_JSON == content_type) { - butil::IOBufAsZeroCopyInputStream wrapper(data); + if (CONTENT_TYPE_PB == content_type) { + Deserializer deserializer([message]( + google::protobuf::io::ZeroCopyInputStream* input) -> bool { + return message->ParseFromZeroCopyStream(input); + }); + return deserialize(deserializer); + } else if (CONTENT_TYPE_JSON == content_type) { + Deserializer deserializer([message, &cntl]( + google::protobuf::io::ZeroCopyInputStream* input) -> bool { json2pb::Json2PbOptions options; options.base64_to_bytes = cntl.has_pb_bytes_to_base64(); options.array_to_single_repeated = cntl.has_pb_single_repeated_to_array(); - return handler->DecompressFromJson(data, message, options); - } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + std::string error; + bool ok = json2pb::JsonToProtoMessage(input, message, options, &error); + if (!ok) { + LOG(INFO) << "Fail to parse json to " + << message->GetDescriptor()->full_name() + << ": "<< error; + } + return ok; + }); + return deserialize(deserializer); + } else if (CONTENT_TYPE_PROTO_JSON == content_type) { + Deserializer deserializer([message]( + google::protobuf::io::ZeroCopyInputStream* input) -> bool { json2pb::ProtoJson2PbOptions options; options.ignore_unknown_fields = true; - return handler->DecompressFromProtoJson(data, message, options); - } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { - return handler->DecompressFromProtoText(data, message); - } - return false; - } - - butil::IOBufAsZeroCopyInputStream wrapper(data); - if (CONTENT_TYPE_PB == content_type) { - return ParsePbFromZeroCopyStream(message, &wrapper); - } else if (CONTENT_TYPE_JSON == content_type) { - json2pb::Json2PbOptions options; - options.base64_to_bytes = cntl.has_pb_bytes_to_base64(); - options.array_to_single_repeated = cntl.has_pb_single_repeated_to_array(); - return json2pb::JsonToProtoMessage(&wrapper, message, options, error); - } else if (CONTENT_TYPE_PROTO_JSON == content_type) { - json2pb::ProtoJson2PbOptions options; - options.ignore_unknown_fields = true; - return json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, error); + std::string error; + bool ok = json2pb::ProtoJsonToProtoMessage(input, message, options, &error); + if (!ok) { + LOG(INFO) << "Fail to parse proto-json to " + << message->GetDescriptor()->full_name() + << ": "<< error; + } + return ok; + }); + return deserialize(deserializer); } else if (CONTENT_TYPE_PROTO_TEXT == content_type) { - return google::protobuf::TextFormat::Parse(&wrapper, message); + Deserializer deserializer([message]( + google::protobuf::io::ZeroCopyInputStream* input) -> bool { + return google::protobuf::TextFormat::Parse(input, message); + }); + return deserialize(deserializer); } - return false; } @@ -755,14 +785,14 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { ContentType content_type = meta.content_type(); auto compress_type = static_cast(meta.compress_type()); messages = server->options().rpc_pb_message_factory->Get(*svc, *method); - std::string error; - if (!DeserializeRpcMessage(req_buf, *cntl, content_type, compress_type, - messages->Request(), &error)) { + if (!DeserializeRpcMessage(req_buf, *cntl, content_type, + compress_type, messages->Request())) { cntl->SetFailed( - EREQUEST, "Fail to parse request message, ContentType=%s," - "CompressType=%s, request_size=%d : %s", - ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type), - req_size, error.c_str()); + EREQUEST, "Fail to parse request=%s, ContentType=%s, " + "CompressType=%s, request_size=%d", + messages->Request()->GetDescriptor()->full_name().c_str(), + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type), req_size); break; } req_buf.clear(); @@ -929,17 +959,17 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { cntl->set_response_content_type(content_type); cntl->set_response_compress_type(compress_type); if (cntl->response()) { - std::string error; if (cntl->response()->GetDescriptor() == SerializedResponse::descriptor()) { ((SerializedResponse*)cntl->response())-> serialized_data().append(*res_buf_ptr); } else if (!DeserializeRpcMessage(*res_buf_ptr, *cntl, content_type, - compress_type, cntl->response(), &error)) { + compress_type, cntl->response())) { cntl->SetFailed( - EREQUEST, "Fail to parse request message, ContentType=%s," - "CompressType=%s, request_size=%d : %s", - ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type), - res_size, error.c_str()); + EREQUEST, "Fail to parse response=%s, ContentType=%s, " + "CompressType=%s, request_size=%d", + cntl->response()->GetDescriptor()->full_name().c_str(), + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type), res_size); } } // else silently ignore the response. } while (0); @@ -966,13 +996,13 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, ContentType content_type = cntl->request_content_type(); CompressType compress_type = cntl->request_compress_type(); - std::string error; - if (!SerializeRpcMessage(*request, *cntl, content_type, - compress_type, request_buf, &error)) { - return cntl->SetFailed(EREQUEST, "Fail to compress request, " - "ContentType=%s, CompressType=%s", - ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type)); + if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type, request_buf)) { + return cntl->SetFailed( + EREQUEST, "Fail to compress request=%s, " + "ContentType=%s, CompressType=%s", + request->GetDescriptor()->full_name().c_str(), + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type)); } } diff --git a/src/brpc/policy/gzip_compress.cpp b/src/brpc/policy/gzip_compress.cpp index 95327750d8..e8c77a5563 100644 --- a/src/brpc/policy/gzip_compress.cpp +++ b/src/brpc/policy/gzip_compress.cpp @@ -21,180 +21,85 @@ #include "butil/logging.h" #include "brpc/policy/gzip_compress.h" #include "brpc/protocol.h" - +#include "brpc/compress.h" namespace brpc { namespace policy { -template -void LogError(const char* error_message1 = NULL, - const char* error_message2 = NULL) { - if (IsCompress) { - LOG(WARNING) << "Fail to compress. " - << (NULL == error_message1 ? "" : error_message1) << " " - << (NULL == error_message2 ? "" : error_message2); - } else { - LOG(WARNING) << "Fail to decompress." - << error_message1 << " " - << error_message2; +const char* Format2CStr(google::protobuf::io::GzipOutputStream::Format format) { + switch (format) { + case google::protobuf::io::GzipOutputStream::GZIP: + return "gzip"; + case google::protobuf::io::GzipOutputStream::ZLIB: + return "zlib"; + default: + return "unknown"; + } +} + +const char* Format2CStr(google::protobuf::io::GzipInputStream::Format format) { + switch (format) { + case google::protobuf::io::GzipInputStream::GZIP: + return "gzip"; + case google::protobuf::io::GzipInputStream::ZLIB: + return "zlib"; + default: + return "unknown"; } } static bool Compress(const google::protobuf::Message& msg, butil::IOBuf* buf, google::protobuf::io::GzipOutputStream::Format format) { butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options options; + GzipCompressOptions options; options.format = format; google::protobuf::io::GzipOutputStream gzip(&wrapper, options); - if (!msg.SerializeToZeroCopyStream(&gzip)) { - LogError(gzip.ZlibErrorMessage()); - return false; - } - return gzip.Close(); -} - -static bool Compress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options, - google::protobuf::io::GzipOutputStream::Format format) { - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options gzip_opt; - gzip_opt.format = format; - google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); - std::string error; - if (!json2pb::ProtoMessageToJson(msg, &gzip, options, &error)) { - LogError(error.c_str(), gzip.ZlibErrorMessage()); - return false; + bool ok; + if (msg.GetDescriptor() == Serializer::descriptor()) { + ok = ((const Serializer&)msg).SerializeTo(&gzip); + } else { + ok = msg.SerializeToZeroCopyStream(&gzip); } - return gzip.Close(); -} - -static bool Compress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options, - google::protobuf::io::GzipOutputStream::Format format) { - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options gzip_opt; - gzip_opt.format = format; - google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); - std::string error; - if (!json2pb::ProtoMessageToProtoJson(msg, &gzip, options, &error)) { - LogError(error.c_str(), gzip.ZlibErrorMessage()); - return false; + if (!ok) { + LOG(WARNING) << "Fail to serialize input message=" + << msg.GetDescriptor()->full_name() + << ", format=" << Format2CStr(format) << " : " + << (NULL == gzip.ZlibErrorMessage() ? "" : gzip.ZlibErrorMessage()); } - return gzip.Close(); -} - -static bool Compress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf, - google::protobuf::io::GzipOutputStream::Format format) { - butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options gzip_opt; - gzip_opt.format = format; - google::protobuf::io::GzipOutputStream gzip(&wrapper, gzip_opt); - if (!google::protobuf::TextFormat::Print(msg, &gzip)) { - LogError(gzip.ZlibErrorMessage()); - return false; - } - return gzip.Close(); + return ok && gzip.Close(); } static bool Decompress(const butil::IOBuf& data, google::protobuf::Message* msg, google::protobuf::io::GzipInputStream::Format format) { butil::IOBufAsZeroCopyInputStream wrapper(data); google::protobuf::io::GzipInputStream gzip(&wrapper, format); - if (!ParsePbFromZeroCopyStream(msg, &gzip)) { - LogError(gzip.ZlibErrorMessage(), gzip.ZlibErrorMessage()); - return false; - } - return true; -} - -static bool DecompressFromJson(const butil::IOBuf& data, - google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options, - google::protobuf::io::GzipInputStream::Format format) { - butil::IOBufAsZeroCopyInputStream wrapper(data); - google::protobuf::io::GzipInputStream gzip(&wrapper, format); - std::string error; - if (!json2pb::JsonToProtoMessage(&gzip, msg, options, &error)) { - LogError(error.c_str(), gzip.ZlibErrorMessage()); - return false; - } - return true; -} - -static bool DecompressFromProtoJson(const butil::IOBuf& data, - google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options, - google::protobuf::io::GzipInputStream::Format format) { - butil::IOBufAsZeroCopyInputStream wrapper(data); - google::protobuf::io::GzipInputStream gzip(&wrapper, format); - std::string error; - if (!json2pb::ProtoJsonToProtoMessage(&gzip, msg, options, &error)) { - LogError(error.c_str(), gzip.ZlibErrorMessage()); - return false; + bool ok; + if (msg->GetDescriptor() == Deserializer::descriptor()) { + ok = ((Deserializer*)msg)->DeserializeFrom(&gzip); + } else { + ok = msg->ParseFromZeroCopyStream(&gzip); } - return true; -} - -static bool DecompressFromProtoText(const butil::IOBuf& data, - google::protobuf::Message* msg, - google::protobuf::io::GzipInputStream::Format format) { - butil::IOBufAsZeroCopyInputStream wrapper(data); - google::protobuf::io::GzipInputStream gzip(&wrapper, format); - std::string error; - if (!google::protobuf::TextFormat::Parse(&gzip, msg)) { - LogError(error.c_str(), gzip.ZlibErrorMessage()); - return false; + if (!ok) { + LOG(WARNING) << "Fail to deserialize input message=" + << msg->GetDescriptor()->full_name() + << ", format=" << Format2CStr(format) << " : " + << (NULL == gzip.ZlibErrorMessage() ? "" : gzip.ZlibErrorMessage()); } - return true; + return ok; } bool GzipCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { return Compress(msg, buf, google::protobuf::io::GzipOutputStream::GZIP); } -bool GzipCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options) { - return Compress2Json( - msg, buf, options, google::protobuf::io::GzipOutputStream::GZIP); -} - -bool GzipCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options) { - return Compress2ProtoJson( - msg, buf, options, google::protobuf::io::GzipOutputStream::GZIP); -} - -bool GzipCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { - return Compress2ProtoText(msg, buf, google::protobuf::io::GzipOutputStream::GZIP); -} - bool GzipDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { return Decompress(data, msg, google::protobuf::io::GzipInputStream::GZIP); } -bool GzipDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options) { - return DecompressFromJson( - data, msg, options, google::protobuf::io::GzipInputStream::GZIP); -} - -bool GzipDecompressFromProtoJson(const butil::IOBuf& data, - google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options) { - return DecompressFromProtoJson( - data, msg, options, google::protobuf::io::GzipInputStream::GZIP); -} - -bool GzipDecompressFromProtoText(const butil::IOBuf& data, - google::protobuf::Message* msg) { - return DecompressFromProtoText( - data, msg, google::protobuf::io::GzipInputStream::GZIP); -} - bool GzipCompress(const butil::IOBuf& msg, butil::IOBuf* buf, const GzipCompressOptions* options_in) { butil::IOBufAsZeroCopyOutputStream wrapper(buf); - google::protobuf::io::GzipOutputStream::Options gzip_opt; + GzipCompressOptions gzip_opt; if (options_in) { gzip_opt = *options_in; } @@ -220,7 +125,8 @@ bool GzipCompress(const butil::IOBuf& msg, butil::IOBuf* buf, } if (size_in != 0 || (size_t)in.ByteCount() != msg.size()) { // If any stage is not fully consumed, something went wrong. - LogError(out.ZlibErrorMessage()); + LOG(WARNING) << "Fail to compress, format=" << Format2CStr(gzip_opt.format) + << " : " << out.ZlibErrorMessage(); return false; } if (size_out != 0) { @@ -259,7 +165,8 @@ inline bool GzipDecompressBase( // If any stage is not fully consumed, something went wrong. // Here we call in.Next addtitionally to make sure that the gzip // "blackbox" does not have buffer left. - LogError(in.ZlibErrorMessage()); + LOG(WARNING) << "Fail to decompress, format=" << Format2CStr(format) + << " : " << in.ZlibErrorMessage(); return false; } if (size_out != 0) { @@ -272,46 +179,11 @@ bool ZlibCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { return Compress(msg, buf, google::protobuf::io::GzipOutputStream::ZLIB); } -bool ZlibCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options) { - return Compress2Json( - msg, buf, options, google::protobuf::io::GzipOutputStream::ZLIB); -} - -bool ZlibCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options) { - return Compress2ProtoJson( - msg, buf, options, google::protobuf::io::GzipOutputStream::ZLIB); -} - -bool ZlibCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { - return Compress2ProtoText(msg, buf, google::protobuf::io::GzipOutputStream::ZLIB); -} - bool ZlibDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { return Decompress(data, msg, google::protobuf::io::GzipInputStream::ZLIB); } -bool ZlibDecompressFromJson(const butil::IOBuf& data, - google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options) { - return DecompressFromJson( - data, msg, options, google::protobuf::io::GzipInputStream::ZLIB); -} - -bool ZlibDecompressFromProtoJson(const butil::IOBuf& data, - google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options) { - return DecompressFromProtoJson( - data, msg, options, google::protobuf::io::GzipInputStream::ZLIB); -} - -bool ZlibDecompressFromProtoText(const butil::IOBuf& data, - google::protobuf::Message* msg) { - return DecompressFromProtoText(data, msg, google::protobuf::io::GzipInputStream::ZLIB); -} - bool GzipDecompress(const butil::IOBuf& data, butil::IOBuf* msg) { return GzipDecompressBase( data, msg, google::protobuf::io::GzipInputStream::GZIP); diff --git a/src/brpc/policy/gzip_compress.h b/src/brpc/policy/gzip_compress.h index fe1efb0250..426aaf504b 100644 --- a/src/brpc/policy/gzip_compress.h +++ b/src/brpc/policy/gzip_compress.h @@ -22,8 +22,6 @@ #include // Message #include #include "butil/iobuf.h" // butil::IOBuf -#include "json2pb/pb_to_json.h" -#include "json2pb/json_to_pb.h" namespace brpc { @@ -33,34 +31,11 @@ typedef google::protobuf::io::GzipOutputStream::Options GzipCompressOptions; // Compress serialized `msg' into `buf'. bool GzipCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); -bool GzipCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options); -bool GzipCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options); -bool GzipCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); - bool ZlibCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); -bool ZlibCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options); -bool ZlibCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options); -bool ZlibCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse `msg' from decompressed `buf'. bool GzipDecompress(const butil::IOBuf& buf, google::protobuf::Message* msg); -bool GzipDecompressFromJson(const butil::IOBuf& buf, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options); -bool GzipDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options); -bool GzipDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); - - bool ZlibDecompress(const butil::IOBuf& buf, google::protobuf::Message* msg); -bool ZlibDecompressFromJson(const butil::IOBuf& buf, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options); -bool ZlibDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options); -bool ZlibDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); // Put compressed `in' into `out'. bool GzipCompress(const butil::IOBuf& in, butil::IOBuf* out, diff --git a/src/brpc/policy/snappy_compress.cpp b/src/brpc/policy/snappy_compress.cpp index 1f731a6beb..8019b97b3c 100644 --- a/src/brpc/policy/snappy_compress.cpp +++ b/src/brpc/policy/snappy_compress.cpp @@ -16,12 +16,11 @@ // under the License. -#include #include "butil/logging.h" #include "butil/third_party/snappy/snappy.h" #include "brpc/policy/snappy_compress.h" #include "brpc/protocol.h" - +#include "brpc/compress.h" namespace brpc { namespace policy { @@ -29,108 +28,45 @@ namespace policy { bool SnappyCompress(const google::protobuf::Message& msg, butil::IOBuf* buf) { butil::IOBuf serialized_pb; butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); - if (msg.SerializeToZeroCopyStream(&wrapper)) { - return SnappyCompress(serialized_pb, buf); + bool ok; + if (msg.GetDescriptor() == Serializer::descriptor()) { + ok = ((const Serializer&)msg).SerializeTo(&wrapper); + } else { + ok = msg.SerializeToZeroCopyStream(&wrapper); } - - LOG(WARNING) << "Fail to serialize input pb=" << &msg; - return false; -} - -bool SnappyCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options) { - butil::IOBuf serialized_pb; - butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); - std::string error; - if (json2pb::ProtoMessageToJson(msg, &wrapper, options, &error)) { - return SnappyCompress(serialized_pb, buf); + if (!ok) { + LOG(WARNING) << "Fail to serialize input pb=" + << msg.GetDescriptor()->full_name(); + return false; } - LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; - return false; - -} - -bool SnappyCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options) { - butil::IOBuf serialized_pb; - butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); - std::string error; - if (json2pb::ProtoMessageToProtoJson(msg, &wrapper, options, &error)) { - return SnappyCompress(serialized_pb, buf); + ok = SnappyCompress(serialized_pb, buf); + if (!ok) { + LOG(WARNING) << "Fail to snappy::Compress, size=" + << serialized_pb.size(); } - - LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; - return false; -} - -bool SnappyCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf) { - butil::IOBuf serialized_pb; - butil::IOBufAsZeroCopyOutputStream wrapper(&serialized_pb); - if (google::protobuf::TextFormat::Print(msg, &wrapper)) { - return SnappyCompress(serialized_pb, buf); - } - - LOG(WARNING) << "Fail to serialize input pb=" << &msg; - return false; + return ok; } bool SnappyDecompress(const butil::IOBuf& data, google::protobuf::Message* msg) { butil::IOBuf binary_pb; - if (SnappyDecompress(data, &binary_pb)) { - return ParsePbFromIOBuf(msg, binary_pb); - } - LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); - return false; -} - -bool SnappyDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options) { - butil::IOBuf json; - if (!SnappyDecompress(data, &json)) { + if (!SnappyDecompress(data, &binary_pb)) { LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); return false; } - butil::IOBufAsZeroCopyInputStream wrapper(json); - std::string error; - if (json2pb::JsonToProtoMessage(&wrapper, msg, options, &error)) { - return true; - } - - LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; - return false; -} -bool SnappyDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options) { - butil::IOBuf json; - if (!SnappyDecompress(data, &json)) { - LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); - return false; + bool ok; + butil::IOBufAsZeroCopyInputStream stream(binary_pb); + if (msg->GetDescriptor() == Deserializer::descriptor()) { + ok = ((Deserializer*)msg)->DeserializeFrom(&stream); + } else { + ok = msg->ParseFromZeroCopyStream(&stream); } - - butil::IOBufAsZeroCopyInputStream wrapper(json); - std::string error; - if (json2pb::ProtoJsonToProtoMessage(&wrapper, msg, options, &error)) { - return true; - } - LOG(WARNING) << "Fail to serialize input pb=" << &msg << " : " << error; - return false; -} - -bool SnappyDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg) { - butil::IOBuf json; - if (!SnappyDecompress(data, &json)) { - LOG(WARNING) << "Fail to snappy::Uncompress, size=" << data.size(); - return false; - } - - butil::IOBufAsZeroCopyInputStream wrapper(json); - if (google::protobuf::TextFormat::Parse(&wrapper, msg)) { - return true; + if (!ok) { + LOG(WARNING) << "Fail to eserialize input message=" + << msg->GetDescriptor()->full_name(); } - LOG(WARNING) << "Fail to serialize input pb=" << &msg; - return false; + return ok; } bool SnappyCompress(const butil::IOBuf& in, butil::IOBuf* out) { diff --git a/src/brpc/policy/snappy_compress.h b/src/brpc/policy/snappy_compress.h index 85d61b3fcd..082aba90b9 100644 --- a/src/brpc/policy/snappy_compress.h +++ b/src/brpc/policy/snappy_compress.h @@ -21,8 +21,6 @@ #include // Message #include "butil/iobuf.h" // IOBuf -#include "json2pb/pb_to_json.h" -#include "json2pb/json_to_pb.h" namespace brpc { @@ -30,19 +28,9 @@ namespace policy { // Compress serialized `msg' into `buf'. bool SnappyCompress(const google::protobuf::Message& msg, butil::IOBuf* buf); -bool SnappyCompress2Json(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2JsonOptions& options); -bool SnappyCompress2ProtoJson(const google::protobuf::Message& msg, butil::IOBuf* buf, - const json2pb::Pb2ProtoJsonOptions& options); -bool SnappyCompress2ProtoText(const google::protobuf::Message& msg, butil::IOBuf* buf); // Parse `msg' from decompressed `buf' bool SnappyDecompress(const butil::IOBuf& data, google::protobuf::Message* msg); -bool SnappyDecompressFromJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::Json2PbOptions& options); -bool SnappyDecompressFromProtoJson(const butil::IOBuf& data, google::protobuf::Message* msg, - const json2pb::ProtoJson2PbOptions& options); -bool SnappyDecompressFromProtoText(const butil::IOBuf& data, google::protobuf::Message* msg); // Put compressed `in' into `out'. bool SnappyCompress(const butil::IOBuf& in, butil::IOBuf* out); diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto index 30033d49b6..b278ddb6bf 100644 --- a/src/brpc/proto_base.proto +++ b/src/brpc/proto_base.proto @@ -33,6 +33,9 @@ message NsheadMessageBase {} message SerializedRequestBase {} message SerializedResponseBase {} +message SerializerBase {} +message DeserializerBase {} + message ThriftFramedMessageBase {} service BaiduMasterServiceBase {} diff --git a/src/brpc/redis.cpp b/src/brpc/redis.cpp index f8870ae5c1..9af036f857 100644 --- a/src/brpc/redis.cpp +++ b/src/brpc/redis.cpp @@ -35,7 +35,7 @@ RedisRequest::RedisRequest() } RedisRequest::RedisRequest(const RedisRequest& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } @@ -200,7 +200,7 @@ RedisResponse::RedisResponse() SharedCtor(); } RedisResponse::RedisResponse(const RedisResponse& from) - : NonreflectableMessage() + : NonreflectableMessage(from) , _first_reply(&_arena) { SharedCtor(); MergeFrom(from); diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_request.cpp index f4cabad28e..ac55e31ed9 100644 --- a/src/brpc/serialized_request.cpp +++ b/src/brpc/serialized_request.cpp @@ -28,7 +28,7 @@ SerializedRequest::SerializedRequest() } SerializedRequest::SerializedRequest(const SerializedRequest& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } diff --git a/src/brpc/serialized_response.cpp b/src/brpc/serialized_response.cpp index 6d5d8fef92..c8466451c2 100644 --- a/src/brpc/serialized_response.cpp +++ b/src/brpc/serialized_response.cpp @@ -28,7 +28,7 @@ SerializedResponse::SerializedResponse() } SerializedResponse::SerializedResponse(const SerializedResponse& from) - : NonreflectableMessage() { + : NonreflectableMessage(from) { SharedCtor(); MergeFrom(from); } diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index f0359132b9..a51b931781 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -30,7 +30,6 @@ #include "butil/files/scoped_file.h" #include "brpc/socket.h" #include "butil/object_pool.h" -#include "brpc/policy/baidu_rpc_protocol.h" #include "brpc/builtin/version_service.h" #include "brpc/builtin/health_service.h" #include "brpc/builtin/list_service.h" @@ -72,12 +71,12 @@ DECLARE_bool(enable_dir_service); namespace policy { DECLARE_bool(use_http_error_code); -extern bool SerializeRpcMessage(const google::protobuf::Message& message, Controller& cntl, +extern bool SerializeRpcMessage(const google::protobuf::Message& serializer, Controller& cntl, ContentType content_type, CompressType compress_type, - butil::IOBuf* buf, std::string* error); -extern bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl, + butil::IOBuf* buf); +extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer, Controller& cntl, ContentType content_type, CompressType compress_type, - google::protobuf::Message* message, std::string* error); + google::protobuf::Message* message); } } @@ -1706,10 +1705,8 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { test::EchoRequest echo_request; test::EchoResponse echo_response; - std::string error; ASSERT_TRUE(brpc::policy::DeserializeRpcMessage( - request->serialized_data(), *cntl, content_type, - compress_type, &echo_request, &error)) << error; + request->serialized_data(), *cntl, content_type, compress_type, &echo_request)); ASSERT_EQ(EXP_REQUEST, echo_request.message()); ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string()); @@ -1733,8 +1730,7 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { cntl->response_attachment().append(EXP_RESPONSE); echo_response.set_message(EXP_RESPONSE); ASSERT_TRUE(brpc::policy::SerializeRpcMessage( - echo_response, *cntl, content_type, compress_type, - &response->serialized_data(), &error)) << error; + echo_response, *cntl, content_type, compress_type, &response->serialized_data())); } private: int _content_type_index = brpc::ContentType_MIN; @@ -1800,8 +1796,7 @@ void TestGenericCall(brpc::Channel& channel, std::string error; ASSERT_TRUE(brpc::policy::SerializeRpcMessage( - request, cntl, content_type, compress_type, - &serialized_request.serialized_data(), &error)) << error; + request, cntl, content_type, compress_type, &serialized_request.serialized_data())); auto sampled_request = new (std::nothrow) brpc::SampledRequest(); sampled_request->meta.set_service_name( test::EchoService::descriptor()->full_name()); @@ -1814,8 +1809,7 @@ void TestGenericCall(brpc::Channel& channel, ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(serialized_response.serialized_data(), cntl, cntl.response_content_type(), - cntl.response_compress_type(), - &response, &error)) << error; + cntl.response_compress_type(), &response)); ASSERT_EQ(EXP_RESPONSE, response.message()); ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); }