From ece5026022036723b37144cd4044d22bea46e8f3 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Thu, 13 Mar 2025 22:01:58 +0800 Subject: [PATCH] Optimize zero copy of http body --- src/brpc/details/http_message.cpp | 30 +++++++++++++++++------------ src/brpc/details/http_message.h | 20 ++++++++++++------- test/brpc_http_message_unittest.cpp | 7 +++++-- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/brpc/details/http_message.cpp b/src/brpc/details/http_message.cpp index 3160afbfcd..0ffe5b1143 100644 --- a/src/brpc/details/http_message.cpp +++ b/src/brpc/details/http_message.cpp @@ -141,7 +141,7 @@ int HttpMessage::on_header_value(http_parser *parser, } int HttpMessage::on_headers_complete(http_parser *parser) { - HttpMessage *http_message = (HttpMessage *)parser->data; + HttpMessage* http_message = (HttpMessage *)parser->data; http_message->_stage = HTTP_ON_HEADERS_COMPLETE; if (parser->http_major > 1) { // NOTE: this checking is a MUST because ProcessHttpResponse relies @@ -282,9 +282,12 @@ int HttpMessage::OnBody(const char *at, const size_t length) { } if (!_read_body_progressively) { // Normal read. - // TODO: The input data is from IOBuf as well, possible to append - // data w/o copying. - _body.append(at, length); + if (NULL != _current_source_iobuf) { + _current_source_iobuf->append_to( + &_body, length, _parsed_block_size + (at - _current_block_base)); + } else { + _body.append(at, length); + } return 0; } // Progressive read. @@ -434,13 +437,8 @@ const http_parser_settings g_parser_settings = { HttpMessage::HttpMessage(bool read_body_progressively, HttpMethod request_method) - : _parsed_length(0) - , _stage(HTTP_ON_MESSAGE_BEGIN) - , _request_method(request_method) - , _read_body_progressively(read_body_progressively) - , _body_reader(NULL) - , _cur_value(NULL) - , _vbodylen(0) { + : _request_method(request_method) + , _read_body_progressively(read_body_progressively) { http_parser_init(&_parser, HTTP_BOTH); _parser.allow_chunked_length = 1; _parser.data = this; @@ -489,6 +487,11 @@ ssize_t HttpMessage::ParseFromIOBuf(const butil::IOBuf &buf) { << ") to already-completed message"; return -1; } + _parsed_block_size = 0; + _current_source_iobuf = &buf; + BRPC_SCOPE_EXIT { + _current_source_iobuf = NULL; + }; size_t nprocessed = 0; for (size_t i = 0; i < buf.backing_block_num(); ++i) { butil::StringPiece blk = buf.backing_block(i); @@ -496,8 +499,11 @@ ssize_t HttpMessage::ParseFromIOBuf(const butil::IOBuf &buf) { // length=0 will be treated as EOF by http_parser, must skip. continue; } - nprocessed += http_parser_execute( + _current_block_base = blk.data(); + size_t n = http_parser_execute( &_parser, &g_parser_settings, blk.data(), blk.size()); + nprocessed += n; + _parsed_block_size += n; if (_parser.http_errno != 0) { // May try HTTP on other formats, failure is norm. RPC_VLOG << "Fail to parse http message, parser=" << _parser diff --git a/src/brpc/details/http_message.h b/src/brpc/details/http_message.h index b86ffdd9b5..6ff2d03107 100644 --- a/src/brpc/details/http_message.h +++ b/src/brpc/details/http_message.h @@ -99,32 +99,38 @@ class HttpMessage { protected: int OnBody(const char* data, size_t size); int OnMessageComplete(); - size_t _parsed_length; + size_t _parsed_length{0}; private: DISALLOW_COPY_AND_ASSIGN(HttpMessage); int UnlockAndFlushToBodyReader(std::unique_lock& locked); - HttpParserStage _stage; + HttpParserStage _stage{HTTP_ON_MESSAGE_BEGIN}; std::string _url; - HttpMethod _request_method; + HttpMethod _request_method{HTTP_METHOD_GET}; HttpHeader _header; - bool _read_body_progressively; + bool _read_body_progressively{false}; // For mutual exclusion between on_body and SetBodyReader. butil::Mutex _body_mutex; // Read body progressively - ProgressiveReader* _body_reader; + ProgressiveReader* _body_reader{NULL}; butil::IOBuf _body; + // Store the IOBuf information in `ParseFromIOBuf' + // for later zero-copy usage in `OnBody'. + const butil::IOBuf* _current_source_iobuf{NULL}; + const char* _current_block_base{NULL}; + size_t _parsed_block_size{0}; + // Parser related members struct http_parser _parser; std::string _cur_header; - std::string *_cur_value; + std::string *_cur_value{NULL}; protected: // Only valid when -http_verbose is on std::unique_ptr _vmsgbuilder; - size_t _vbodylen; + size_t _vbodylen{0}; }; std::ostream& operator<<(std::ostream& os, const http_parser& parser); diff --git a/test/brpc_http_message_unittest.cpp b/test/brpc_http_message_unittest.cpp index d6f206cd07..9933a8d1bb 100644 --- a/test/brpc_http_message_unittest.cpp +++ b/test/brpc_http_message_unittest.cpp @@ -224,8 +224,10 @@ TEST(HttpMessageTest, parse_from_iobuf) { "Content-Length: %lu\r\n" "\r\n", content_length); - std::string content; - for (size_t i = 0; i < content_length; ++i) content.push_back('2'); + butil::IOBuf content; + for (size_t i = 0; i < content_length; ++i) { + content.push_back('2'); + } butil::IOBuf request; request.append(header); request.append(content); @@ -233,6 +235,7 @@ TEST(HttpMessageTest, parse_from_iobuf) { brpc::HttpMessage http_message; ASSERT_TRUE(http_message.ParseFromIOBuf(request) >= 0); ASSERT_TRUE(http_message.Completed()); + ASSERT_EQ(content, http_message.body()); ASSERT_EQ(content, http_message.body().to_string()); ASSERT_EQ("text/plain", http_message.header().content_type()); }