From c541f822eb2dcddb1548e8563206908e9345491d Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Mon, 12 May 2025 16:50:24 +0800 Subject: [PATCH] support checksum --- example/echo_c++/client.cpp | 6 ++ example/echo_c++/server.cpp | 6 ++ src/brpc/checksum.cpp | 100 ++++++++++++++++++ src/brpc/checksum.h | 63 +++++++++++ src/brpc/controller.cpp | 4 + src/brpc/controller.h | 12 +++ .../details/controller_private_accessor.h | 10 ++ src/brpc/global.cpp | 11 ++ src/brpc/options.proto | 5 + src/brpc/policy/baidu_rpc_meta.proto | 2 + src/brpc/policy/baidu_rpc_protocol.cpp | 85 ++++++++++----- src/brpc/policy/crc32c_checksum.cpp | 64 +++++++++++ src/brpc/policy/crc32c_checksum.h | 37 +++++++ src/brpc/server.cpp | 15 +++ test/brpc_server_unittest.cpp | 95 +++++++++++------ 15 files changed, 455 insertions(+), 60 deletions(-) create mode 100644 src/brpc/checksum.cpp create mode 100644 src/brpc/checksum.h create mode 100644 src/brpc/policy/crc32c_checksum.cpp create mode 100644 src/brpc/policy/crc32c_checksum.h diff --git a/example/echo_c++/client.cpp b/example/echo_c++/client.cpp index 3cc83f7203..4a3aee9c88 100644 --- a/example/echo_c++/client.cpp +++ b/example/echo_c++/client.cpp @@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); +DEFINE_bool(enable_checksum, false, "Enable checksum or not"); int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. @@ -71,6 +72,11 @@ int main(int argc, char* argv[]) { // being serialized into protobuf messages. cntl.request_attachment().append(FLAGS_attachment); + // Use checksum, only support CRC32C now. + if (FLAGS_enable_checksum) { + cntl.set_request_checksum_type(brpc::CHECKSUM_TYPE_CRC32C); + } + // Because `done'(last parameter) is NULL, this function waits until // the response comes back or error occurs(including timedout). stub.Echo(&cntl, &request, &response, NULL); diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp index cc0050aea5..4113114629 100644 --- a/example/echo_c++/server.cpp +++ b/example/echo_c++/server.cpp @@ -29,6 +29,7 @@ DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." " If this is set, the flag port will be ignored"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); +DEFINE_bool(enable_checksum, false, "Enable checksum or not"); // Your implementation of example::EchoService // Notice that implementing brpc::Describable grants the ability to put @@ -75,6 +76,11 @@ class EchoServiceImpl : public EchoService { // being serialized into protobuf messages. cntl->response_attachment().append(cntl->request_attachment()); } + + // Use checksum, only support CRC32C now. + if (FLAGS_enable_checksum) { + cntl->set_response_checksum_type(brpc::CHECKSUM_TYPE_CRC32C); + } } // optional diff --git a/src/brpc/checksum.cpp b/src/brpc/checksum.cpp new file mode 100644 index 0000000000..85d04d852a --- /dev/null +++ b/src/brpc/checksum.cpp @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/checksum.h" + +#include "brpc/protocol.h" +#include "butil/logging.h" + +namespace brpc { + +static const int MAX_HANDLER_SIZE = 1024; +static ChecksumHandler s_handler_map[MAX_HANDLER_SIZE] = {{NULL, NULL, NULL}}; + +int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler) { + if (NULL == handler.Compute) { + LOG(FATAL) << "Invalid parameter: handler function is NULL"; + return -1; + } + int index = type; + if (index < 0 || index >= MAX_HANDLER_SIZE) { + LOG(FATAL) << "ChecksumType=" << type << " is out of range"; + return -1; + } + if (s_handler_map[index].Compute != NULL) { + LOG(FATAL) << "ChecksumType=" << type << " was registered"; + return -1; + } + s_handler_map[index] = handler; + return 0; +} + +// Find ChecksumHandler by type. +// Returns NULL if not found +inline const ChecksumHandler* FindChecksumHandler(ChecksumType type) { + int index = type; + if (index < 0 || index >= MAX_HANDLER_SIZE) { + LOG(ERROR) << "ChecksumType=" << type << " is out of range"; + return NULL; + } + if (NULL == s_handler_map[index].Compute) { + return NULL; + } + return &s_handler_map[index]; +} + +const char* ChecksumTypeToCStr(ChecksumType type) { + if (type == CHECKSUM_TYPE_NONE) { + return "none"; + } + const ChecksumHandler* handler = FindChecksumHandler(type); + return (handler != NULL ? handler->name : "unknown"); +} + +void ListChecksumHandler(std::vector* vec) { + vec->clear(); + for (int i = 0; i < MAX_HANDLER_SIZE; ++i) { + if (s_handler_map[i].Compute != NULL) { + vec->push_back(s_handler_map[i]); + } + } +} + +// Compute `data' checksum +void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) { + if (checksum_type == CHECKSUM_TYPE_NONE) { + return; + } + const ChecksumHandler* handler = FindChecksumHandler(checksum_type); + if (NULL != handler) { + handler->Compute(in); + } +} + +// Verify `data' checksum Returns true on success, false otherwise +bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) { + if (checksum_type == CHECKSUM_TYPE_NONE) { + return true; + } + const ChecksumHandler* handler = FindChecksumHandler(checksum_type); + if (NULL != handler) { + return handler->Verify(in); + } + return true; +} + +} // namespace brpc diff --git a/src/brpc/checksum.h b/src/brpc/checksum.h new file mode 100644 index 0000000000..ff0a98a85b --- /dev/null +++ b/src/brpc/checksum.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_CHECKSUM_H +#define BRPC_CHECKSUM_H + +#include "brpc/controller.h" +#include "brpc/options.pb.h" // ChecksumType +#include "butil/iobuf.h" // butil::IOBuf + +namespace brpc { + +struct ChecksumIn { + const butil::IOBuf* buf; + Controller* cntl; +}; + +struct ChecksumHandler { + // checksum `buf'. + // Returns checksum value + void (*Compute)(const ChecksumIn& in); + + // verify buf checksum + // Rerturn true on success, false otherwise + bool (*Verify)(const ChecksumIn& in); + + // Name of the checksum algorithm, must be string constant. + const char* name; +}; + +// [NOT thread-safe] Register `handler' using key=`type' +// Returns 0 on success, -1 otherwise +int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler); + +// Returns the `name' of the checksumType if registered +const char* ChecksumTypeToCStr(ChecksumType type); + +// Put all registered handlers into `vec'. +void ListChecksumHandler(std::vector* vec); + +// Compute `data' checksum and set to controller +void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type); + +// Verify `data' checksum Returns true on success, false otherwise +bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type); + +} // namespace brpc + +#endif // BRPC_CHECKSUM_H diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 1362d32296..b16ab545e2 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -269,6 +269,8 @@ void Controller::ResetPods() { _preferred_index = -1; _request_compress_type = COMPRESS_TYPE_NONE; _response_compress_type = COMPRESS_TYPE_NONE; + _request_checksum_type = CHECKSUM_TYPE_NONE; + _response_checksum_type = CHECKSUM_TYPE_NONE; _fail_limit = UNSET_MAGIC_NUM; _pipelined_count = 0; _inheritable.Reset(); @@ -1346,6 +1348,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const { s->tos = _tos; s->connection_type = _connection_type; s->request_compress_type = _request_compress_type; + s->request_checksum_type = _request_checksum_type; s->log_id = log_id(); s->has_request_code = has_request_code(); s->request_code = _request_code; @@ -1359,6 +1362,7 @@ void Controller::ApplyClientSettings(const ClientSettings& s) { set_type_of_service(s.tos); set_connection_type(s.connection_type); set_request_compress_type(s.request_compress_type); + set_request_checksum_type(s.request_checksum_type); set_log_id(s.log_id); set_flag(FLAGS_REQUEST_CODE, s.has_request_code); _request_code = s.request_code; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 000aee2f50..6447441943 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -241,6 +241,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set compression method for request. void set_request_compress_type(CompressType t) { _request_compress_type = t; } + // Set checksum type for request. + void set_request_checksum_type(ChecksumType t) { _request_checksum_type = t; } + // Required by some load balancers. void set_request_code(uint64_t request_code) { add_flag(FLAGS_REQUEST_CODE); @@ -464,6 +467,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set compression method for response. void set_response_compress_type(CompressType t) { _response_compress_type = t; } + + // Set checksum type for response. + void set_response_checksum_type(ChecksumType t) { _response_checksum_type = t; } // Non-zero when this RPC call is traced (by rpcz or rig). // NOTE: Only valid at server-side, always zero at client-side. @@ -552,6 +558,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); const std::string& request_id() const { return _inheritable.request_id; } CompressType request_compress_type() const { return _request_compress_type; } CompressType response_compress_type() const { return _response_compress_type; } + ChecksumType request_checksum_type() const { return _request_checksum_type; } + ChecksumType response_checksum_type() const { return _response_checksum_type; } const HttpHeader& http_request() const { return _http_request != NULL ? *_http_request : DefaultHttpHeader(); } @@ -693,6 +701,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t tos; ConnectionType connection_type; CompressType request_compress_type; + ChecksumType request_checksum_type; uint64_t log_id; bool has_request_code; int64_t request_code; @@ -834,6 +843,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int _preferred_index; CompressType _request_compress_type; CompressType _response_compress_type; + ChecksumType _request_checksum_type; + ChecksumType _response_checksum_type; + std::string _checksum_value; Inheritable _inheritable; int _pchan_sub_count; google::protobuf::Message* _response; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index db40ca1541..1a9d7062af 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -152,6 +152,16 @@ class ControllerPrivateAccessor { return *this; } + void set_checksum_value(const char* c, size_t size) { + _cntl->_checksum_value.assign(c, size); + } + + void set_checksum_value(const std::string& c) { + _cntl->_checksum_value = c; + } + + const std::string& checksum_value() const { return _cntl->_checksum_value; } + private: Controller* _cntl; }; diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 6b3310ec7f..0196b6d008 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -60,6 +60,10 @@ #include "brpc/policy/gzip_compress.h" #include "brpc/policy/snappy_compress.h" +// Checksum handlers +#include "brpc/checksum.h" +#include "brpc/policy/crc32c_checksum.h" + // Protocols #include "brpc/protocol.h" #include "brpc/policy/baidu_rpc_protocol.h" @@ -401,6 +405,13 @@ static void GlobalInitializeOrDieImpl() { exit(1); } + // Checksum Handlers + const ChecksumHandler crc32c_checksum = {Crc32cCompute, Crc32cVerify, + "crc32c"}; + if (RegisterChecksumHandler(CHECKSUM_TYPE_CRC32C, crc32c_checksum) != 0) { + exit(1); + } + // Protocols Protocol baidu_protocol = { ParseRpcMessage, SerializeRpcRequest, PackRpcRequest, diff --git a/src/brpc/options.proto b/src/brpc/options.proto index e334c48ea0..34001d7bb8 100644 --- a/src/brpc/options.proto +++ b/src/brpc/options.proto @@ -74,6 +74,11 @@ enum CompressType { COMPRESS_TYPE_LZ4 = 4; } +enum ChecksumType { + CHECKSUM_TYPE_NONE = 0; + CHECKSUM_TYPE_CRC32C = 1; +} + enum ContentType { CONTENT_TYPE_PB = 0; CONTENT_TYPE_JSON = 1; diff --git a/src/brpc/policy/baidu_rpc_meta.proto b/src/brpc/policy/baidu_rpc_meta.proto index 591798310a..5591c5dab7 100644 --- a/src/brpc/policy/baidu_rpc_meta.proto +++ b/src/brpc/policy/baidu_rpc_meta.proto @@ -34,6 +34,8 @@ message RpcMeta { optional StreamSettings stream_settings = 8; map user_fields = 9; optional ContentType content_type = 10; + optional int32 checksum_type = 11; + optional bytes checksum_value = 12; } message RpcRequestMeta { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 8efff06546..fcc8b824c6 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -32,6 +32,7 @@ #include "brpc/server.h" // Server #include "brpc/span.h" #include "brpc/compress.h" // ParseFromCompressedData +#include "brpc/checksum.h" #include "brpc/stream_impl.h" #include "brpc/rpc_dump.h" // SampledRequest #include "brpc/rpc_pb_message_factory.h" @@ -143,7 +144,8 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool SerializeRpcMessage(const google::protobuf::Message& message, Controller& cntl, ContentType content_type, - CompressType compress_type, butil::IOBuf* buf) { + CompressType compress_type, ChecksumType checksum_type, + butil::IOBuf* buf) { auto serialize = [&](Serializer& serializer) -> bool { bool ok; if (COMPRESS_TYPE_NONE == compress_type) { @@ -156,6 +158,8 @@ bool SerializeRpcMessage(const google::protobuf::Message& message, } ok = handler->Compress(serializer, buf); } + ChecksumIn checksum_in{buf, &cntl}; + ComputeDataChecksum(checksum_in, checksum_type); return ok; }; @@ -223,13 +227,16 @@ static bool SerializeResponse(const google::protobuf::Message& res, ContentType content_type = cntl.response_content_type(); CompressType compress_type = cntl.response_compress_type(); - if (!SerializeRpcMessage(res, cntl, content_type, compress_type, &buf)) { - cntl.SetFailed( - ERESPONSE, "Fail to serialize response=%s, " - "ContentType=%s, CompressType=%s", - res.GetDescriptor()->full_name().c_str(), - ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type)); + ChecksumType checksum_type = cntl.response_checksum_type(); + if (!SerializeRpcMessage(res, cntl, content_type, compress_type, + checksum_type, &buf)) { + cntl.SetFailed(ERESPONSE, + "Fail to serialize response=%s, " + "ContentType=%s, CompressType=%s, ChecksumType=%s", + res.GetDescriptor()->full_name().c_str(), + ContentTypeToCStr(content_type), + CompressTypeToCStr(compress_type), + ChecksumTypeToCStr(checksum_type)); return false; } return true; @@ -308,7 +315,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, // `res' can be NULL here, in which case we don't serialize it // If user calls `SetFailed' on Controller, we don't serialize // response either - CompressType compress_type = cntl->response_compress_type(); if (res != NULL && !cntl->Failed()) { append_body = SerializeResponse(*res, *cntl, res_body); } @@ -336,8 +342,10 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, response_meta->set_error_text(cntl->ErrorText()); } meta.set_correlation_id(correlation_id); - meta.set_compress_type(compress_type); + meta.set_compress_type(cntl->response_compress_type()); meta.set_content_type(cntl->response_content_type()); + meta.set_checksum_type(cntl->response_checksum_type()); + meta.set_checksum_value(accessor.checksum_value()); if (attached_size > 0) { meta.set_attachment_size(attached_size); } @@ -486,9 +494,14 @@ void EndRunningCallMethodInPool( bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl, ContentType content_type, CompressType compress_type, + ChecksumType checksum_type, google::protobuf::Message* message) { auto deserialize = [&](Deserializer& deserializer) -> bool { - bool ok; + ChecksumIn checksum_in{&data, &cntl}; + bool ok = VerifyDataChecksum(checksum_in, checksum_type); + if (!ok) { + return ok; + } if (COMPRESS_TYPE_NONE == compress_type) { butil::IOBufAsZeroCopyInputStream stream(data); ok = deserializer.DeserializeFrom(&stream); @@ -601,6 +614,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { } cntl->set_request_content_type(meta.content_type()); cntl->set_request_compress_type((CompressType)meta.compress_type()); + cntl->set_request_checksum_type((ChecksumType)meta.checksum_type()); + accessor.set_checksum_value(meta.checksum_value()); accessor.set_server(server) .set_security_mode(security_mode) .set_peer_id(socket->id()) @@ -783,16 +798,23 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { } ContentType content_type = meta.content_type(); - auto compress_type = static_cast(meta.compress_type()); - messages = server->options().rpc_pb_message_factory->Get(*svc, *method); + auto compress_type = + static_cast(meta.compress_type()); + auto checksum_type = + static_cast(meta.checksum_type()); + messages = + server->options().rpc_pb_message_factory->Get(*svc, *method); if (!DeserializeRpcMessage(req_buf, *cntl, content_type, - compress_type, messages->Request())) { + compress_type, checksum_type, + messages->Request())) { cntl->SetFailed( - EREQUEST, "Fail to parse request=%s, ContentType=%s, " - "CompressType=%s, request_size=%d", + EREQUEST, + "Fail to parse request=%s, ContentType=%s, " + "CompressType=%s, ChecksumType=%s, request_size=%d", messages->Request()->GetDescriptor()->full_name().c_str(), ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type), req_size); + CompressTypeToCStr(compress_type), + ChecksumTypeToCStr(checksum_type), req_size); break; } req_buf.clear(); @@ -956,20 +978,26 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { ContentType content_type = meta.content_type(); auto compress_type = (CompressType)meta.compress_type(); + auto checksum_type = (ChecksumType)meta.checksum_type(); cntl->set_response_content_type(content_type); cntl->set_response_compress_type(compress_type); + cntl->set_response_checksum_type(checksum_type); + accessor.set_checksum_value(meta.checksum_value()); if (cntl->response()) { if (cntl->response()->GetDescriptor() == SerializedResponse::descriptor()) { ((SerializedResponse*)cntl->response())-> serialized_data().append(*res_buf_ptr); } else if (!DeserializeRpcMessage(*res_buf_ptr, *cntl, content_type, - compress_type, cntl->response())) { + compress_type, checksum_type, + cntl->response())) { cntl->SetFailed( - EREQUEST, "Fail to parse response=%s, ContentType=%s, " - "CompressType=%s, request_size=%d", + EREQUEST, + "Fail to parse response=%s, ContentType=%s, " + "CompressType=%s, ChecksumType=%s, request_size=%d", cntl->response()->GetDescriptor()->full_name().c_str(), ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type), res_size); + CompressTypeToCStr(compress_type), + ChecksumTypeToCStr(checksum_type), res_size); } } // else silently ignore the response. } while (0); @@ -996,13 +1024,16 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, ContentType content_type = cntl->request_content_type(); CompressType compress_type = cntl->request_compress_type(); - if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type, request_buf)) { + ChecksumType checksum_type = cntl->request_checksum_type(); + if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type, + checksum_type, request_buf)) { return cntl->SetFailed( - EREQUEST, "Fail to compress request=%s, " - "ContentType=%s, CompressType=%s", + EREQUEST, + "Fail to compress request=%s, " + "ContentType=%s, CompressType=%s, ChecksumType=%s", request->GetDescriptor()->full_name().c_str(), - ContentTypeToCStr(content_type), - CompressTypeToCStr(compress_type)); + ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type), + ChecksumTypeToCStr(checksum_type)); } } @@ -1027,6 +1058,8 @@ void PackRpcRequest(butil::IOBuf* req_buf, method->service()->name()); request_meta->set_method_name(method->name()); meta.set_compress_type(cntl->request_compress_type()); + meta.set_checksum_type(cntl->request_checksum_type()); + meta.set_checksum_value(accessor.checksum_value()); } else if (NULL != cntl->sampled_request()) { // Replaying. Keep service-name as the one seen by server. request_meta->set_service_name(cntl->sampled_request()->meta.service_name()); diff --git a/src/brpc/policy/crc32c_checksum.cpp b/src/brpc/policy/crc32c_checksum.cpp new file mode 100644 index 0000000000..28b6fab60b --- /dev/null +++ b/src/brpc/policy/crc32c_checksum.cpp @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/policy/crc32c_checksum.h" + +#include "brpc/details/controller_private_accessor.h" +#include "brpc/log.h" +#include "butil/crc32c.h" +#include "butil/sys_byteorder.h" + +namespace brpc { +namespace policy { + +void Crc32cCompute(const ChecksumIn& in) { + auto buf = in.buf; + auto cntl = in.cntl; + butil::IOBufAsZeroCopyInputStream wrapper(*buf); + const void* data; + int size; + uint32_t crc = 0; + while (wrapper.Next(&data, &size)) { + crc = butil::crc32c::Extend(crc, static_cast(data), size); + } + RPC_VLOG << "Crc32cCompute crc=" << crc; + crc = butil::HostToNet32(butil::crc32c::Mask(crc)); + ControllerPrivateAccessor(cntl).set_checksum_value( + reinterpret_cast(&crc), sizeof(crc)); +} + +bool Crc32cVerify(const ChecksumIn& in) { + auto buf = in.buf; + auto cntl = in.cntl; + butil::IOBufAsZeroCopyInputStream wrapper(*buf); + const void* data; + int size; + uint32_t crc = 0; + while (wrapper.Next(&data, &size)) { + crc = butil::crc32c::Extend(crc, static_cast(data), size); + } + auto& val = ControllerPrivateAccessor(const_cast(cntl)) + .checksum_value(); + CHECK_EQ(val.size(), sizeof(crc)); + auto expected = *reinterpret_cast(val.data()); + expected = butil::crc32c::Unmask(butil::NetToHost32(expected)); + RPC_VLOG << "Crc32cVerify crc=" << crc << " expected=" << expected; + return crc == expected; +} + +} // namespace policy +} // namespace brpc diff --git a/src/brpc/policy/crc32c_checksum.h b/src/brpc/policy/crc32c_checksum.h new file mode 100644 index 0000000000..cfdd724ca1 --- /dev/null +++ b/src/brpc/policy/crc32c_checksum.h @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_POLICY_CRC32C_CHECKSUM_H +#define BRPC_POLICY_CRC32C_CHECKSUM_H + +#include "brpc/checksum.h" +#include "brpc/controller.h" +#include "butil/iobuf.h" // butil::IOBuf + +namespace brpc { +namespace policy { + +// Compute checksum +void Crc32cCompute(const ChecksumIn& in); + +// Verify checksum +bool Crc32cVerify(const ChecksumIn& in); + +} // namespace policy +} // namespace brpc + +#endif // BRPC_POLICY_CRC32C_CHECKSUM_H diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index aa55c858aa..4132eac299 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -33,6 +33,7 @@ #include "butil/debug/leak_annotations.h" #include "brpc/log.h" #include "brpc/compress.h" +#include "brpc/checksum.h" #include "brpc/policy/nova_pbrpc_protocol.h" #include "brpc/global.h" #include "brpc/socket_map.h" // SocketMapList @@ -227,6 +228,17 @@ static void PrintSupportedCompressions(std::ostream& os, void*) { } } +static void PrintSupportedChecksums(std::ostream& os, void*) { + std::vector handlers; + ListChecksumHandler(&handlers); + for (size_t i = 0; i < handlers.size(); ++i) { + if (i != 0) { + os << ' '; + } + os << (handlers[i].name ? handlers[i].name : "(null)"); + } +} + static void PrintEnabledProfilers(std::ostream& os, void*) { if (cpu_profiler_enabled) { os << "cpu "; @@ -253,6 +265,9 @@ static bvar::PassiveStatus s_proto_st( static bvar::PassiveStatus s_comp_st( "rpc_compressions", PrintSupportedCompressions, NULL); +static bvar::PassiveStatus s_cksum_st( + "rpc_checksums", PrintSupportedChecksums, NULL); + static bvar::PassiveStatus s_prof_st( "rpc_profilers", PrintEnabledProfilers, NULL); diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index a51b931781..4a774fab2a 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -71,11 +71,14 @@ DECLARE_bool(enable_dir_service); namespace policy { DECLARE_bool(use_http_error_code); -extern bool SerializeRpcMessage(const google::protobuf::Message& serializer, Controller& cntl, - ContentType content_type, CompressType compress_type, - butil::IOBuf* buf); -extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer, Controller& cntl, - ContentType content_type, CompressType compress_type, +extern bool SerializeRpcMessage(const google::protobuf::Message& serializer, + Controller& cntl, ContentType content_type, + CompressType compress_type, + ChecksumType checksum_type, butil::IOBuf* buf); +extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer, + Controller& cntl, ContentType content_type, + CompressType compress_type, + ChecksumType checksum_type, google::protobuf::Message* message); } @@ -1702,11 +1705,13 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name()); brpc::ContentType content_type = cntl->request_content_type(); brpc::CompressType compress_type = cntl->request_compress_type(); + brpc::ChecksumType checksum_type = cntl->request_checksum_type(); test::EchoRequest echo_request; test::EchoResponse echo_response; ASSERT_TRUE(brpc::policy::DeserializeRpcMessage( - request->serialized_data(), *cntl, content_type, compress_type, &echo_request)); + request->serialized_data(), *cntl, content_type, compress_type, + checksum_type, &echo_request)); ASSERT_EQ(EXP_REQUEST, echo_request.message()); ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string()); @@ -1727,10 +1732,12 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { cntl->set_response_content_type(content_type); cntl->set_response_compress_type(compress_type); + cntl->set_response_checksum_type(checksum_type); cntl->response_attachment().append(EXP_RESPONSE); echo_response.set_message(EXP_RESPONSE); ASSERT_TRUE(brpc::policy::SerializeRpcMessage( - echo_response, *cntl, content_type, compress_type, &response->serialized_data())); + echo_response, *cntl, content_type, compress_type, checksum_type, + &response->serialized_data())); } private: int _content_type_index = brpc::ContentType_MIN; @@ -1777,11 +1784,12 @@ TEST_F(ServerTest, baidu_master_service) { ASSERT_EQ(0, server.Join()); } -void TestGenericCall(brpc::Channel& channel, - brpc::ContentType content_type, - brpc::CompressType compress_type) { +void TestGenericCall(brpc::Channel& channel, brpc::ContentType content_type, + brpc::CompressType compress_type, + brpc::ChecksumType checksum_type) { LOG(INFO) << "TestGenericCall: content_type=" << content_type - << ", compress_type=" << compress_type; + << ", compress_type=" << compress_type + << ", checksum_type=" << checksum_type; test::EchoRequest request; test::EchoResponse response; request.set_message(EXP_REQUEST); @@ -1792,11 +1800,13 @@ void TestGenericCall(brpc::Channel& channel, brpc::Controller cntl; cntl.set_request_content_type(content_type); cntl.set_request_compress_type(compress_type); + cntl.set_request_checksum_type(checksum_type); cntl.request_attachment().append(EXP_REQUEST); std::string error; ASSERT_TRUE(brpc::policy::SerializeRpcMessage( - request, cntl, content_type, compress_type, &serialized_request.serialized_data())); + request, cntl, content_type, compress_type, checksum_type, + &serialized_request.serialized_data())); auto sampled_request = new (std::nothrow) brpc::SampledRequest(); sampled_request->meta.set_service_name( test::EchoService::descriptor()->full_name()); @@ -1807,9 +1817,10 @@ void TestGenericCall(brpc::Channel& channel, channel.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); - ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(serialized_response.serialized_data(), - cntl, cntl.response_content_type(), - cntl.response_compress_type(), &response)); + ASSERT_TRUE(brpc::policy::DeserializeRpcMessage( + serialized_response.serialized_data(), cntl, + cntl.response_content_type(), cntl.response_compress_type(), + cntl.response_checksum_type(), &response)); ASSERT_EQ(EXP_RESPONSE, response.message()); ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); } @@ -1829,25 +1840,41 @@ TEST_F(ServerTest, generic_call) { channel_options.protocol = "baidu_std"; ASSERT_EQ(0, channel.Init(ep, &channel_options)); - TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB); - TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP); - TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY); - TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE); - - TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB); - TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP); - TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_SNAPPY); - TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE); - - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_ZLIB); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_GZIP); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_SNAPPY); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_NONE); - - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_ZLIB); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_GZIP); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_SNAPPY); - TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_NONE); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB, + brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP, + brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY, + brpc::CHECKSUM_TYPE_NONE); + TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE, + brpc::CHECKSUM_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB, + brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP, + brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, + brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE); + TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE, + brpc::CHECKSUM_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, + brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, + brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, + brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, + brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE); + + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, + brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, + brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, + brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE); + TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, + brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE); ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join());