From 0acd2b79474f706c839a0bff4981513ec2f536a1 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Thu, 10 Jul 2025 13:14:20 +0800 Subject: [PATCH 1/7] add inline redis protocol support --- src/brpc/redis_command.cpp | 50 +++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 81b734befd..48bbdd4d62 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -379,7 +379,55 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, } // '*' stands for array "*\r\n..." if (!_parsing_array && *pfc != '*') { - return PARSE_ERROR_TRY_OTHERS; + const size_t buf_size = buf.size(); + const auto copy_str = static_cast(arena->allocate(buf_size)); + buf.copy_to(copy_str, buf_size); + if (*copy_str == ' ') { + return PARSE_ERROR_ABSOLUTELY_WRONG; + } + const int crlf_pos = find_crlf(copy_str, buf_size); + if (crlf_pos == -1) { + return PARSE_ERROR_NOT_ENOUGH_DATA; + } + args->clear(); + int offset = 0; + while (offset < crlf_pos && copy_str[offset] != ' ') { + ++offset; + } + const auto first_arg = static_cast(arena->allocate(offset)); + memcpy(first_arg, copy_str, offset); + for (int i = 0; i < offset; ++i) { + first_arg[i] = tolower(first_arg[i]); + } + args->push_back(butil::StringPiece(first_arg, offset)); + if (offset == crlf_pos) { + // only one argument, directly return + buf.pop_front(crlf_pos + 2); + return PARSE_OK; + } + int arg_start_pos = ++offset; + + for (; offset < crlf_pos; ++offset) { + if (copy_str[offset] != ' ') { + continue; + } + const auto arg_length = offset - arg_start_pos; + const auto arg = static_cast(arena->allocate(arg_length)); + memcpy(arg, copy_str + arg_start_pos, arg_length); + args->push_back(butil::StringPiece(arg, arg_length)); + arg_start_pos = ++offset; + } + + if (arg_start_pos < crlf_pos) { + // process the last argument + const auto arg_length = crlf_pos - arg_start_pos; + const auto arg = static_cast(arena->allocate(arg_length)); + memcpy(arg, copy_str + arg_start_pos, arg_length); + args->push_back(butil::StringPiece(arg, arg_length)); + } + + buf.pop_front(crlf_pos + 2); + return PARSE_OK; } // '$' stands for bulk string "$\r\n\r\n" if (_parsing_array && *pfc != '$') { From 80631f45febcc0521d13dce8119ee4cc995297ab Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Fri, 11 Jul 2025 12:53:19 +0800 Subject: [PATCH 2/7] complete code --- src/brpc/redis_command.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 48bbdd4d62..1e6972992d 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -370,10 +370,19 @@ size_t RedisCommandParser::ParsedArgsSize() { return _args.size(); } +int find_crlf(const char* pfc, size_t length) { + for (size_t i = 0; i < length - 1; ++i) { + if (pfc[i] == '\r' && pfc[i + 1] == '\n') { + return i; + } + } + return -1; +} + ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::vector* args, butil::Arena* arena) { - const char* pfc = (const char*)buf.fetch1(); + const auto pfc = static_cast(buf.fetch1()); if (pfc == NULL) { return PARSE_ERROR_NOT_ENOUGH_DATA; } From 75f7276f98f2f5a85eded199902acc8d7fa5a5f3 Mon Sep 17 00:00:00 2001 From: Jerry Zhao Date: Sat, 12 Jul 2025 16:08:51 +0800 Subject: [PATCH 3/7] add check --- src/brpc/redis_command.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 1e6972992d..2c80ad77ad 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include "butil/logging.h" @@ -388,6 +389,9 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, } // '*' stands for array "*\r\n..." if (!_parsing_array && *pfc != '*') { + if (!std::isalpha(static_cast(ch))) { + return PARSE_ERROR_TRY_OTHERS; + } const size_t buf_size = buf.size(); const auto copy_str = static_cast(arena->allocate(buf_size)); buf.copy_to(copy_str, buf_size); From 6b009bcecc800544817dae24fd35c2cda745e3c6 Mon Sep 17 00:00:00 2001 From: Jerry Zhao Date: Sat, 12 Jul 2025 18:05:28 +0800 Subject: [PATCH 4/7] fix --- src/brpc/redis_command.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 2c80ad77ad..6e2c841d3d 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -389,7 +389,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, } // '*' stands for array "*\r\n..." if (!_parsing_array && *pfc != '*') { - if (!std::isalpha(static_cast(ch))) { + if (!std::isalpha(static_cast(*pfc))) { return PARSE_ERROR_TRY_OTHERS; } const size_t buf_size = buf.size(); From f5b7bd2d13657a4b203407362c83f33ac09a2245 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Tue, 15 Jul 2025 10:26:50 +0800 Subject: [PATCH 5/7] add inline unitest --- test/brpc_redis_unittest.cpp | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 7a48d19a85..ac6872e0fb 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -580,7 +580,7 @@ TEST_F(RedisTest, command_parser) { ASSERT_EQ(command, GetCompleteCommand(command_out)); } { - // simulate parsing from network + // simulate parsing from network following RESP int t = 100; std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n"); int size = raw_string.size(); @@ -600,6 +600,27 @@ TEST_F(RedisTest, command_parser) { ASSERT_EQ(GetCompleteCommand(command_out), "set abc def"); } } + { + // simulate parsing from network under inline protocol + int t = 100; + std::string raw_string("set abc def\r\n"); + int size = raw_string.size(); + while (t--) { + for (int i = 0; i < size; ++i) { + buf.push_back(raw_string[i]); + if (i == size - 1) { + ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena)); + } else { + if (butil::fast_rand_less_than(2) == 0) { + ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, + parser.Consume(buf, &command_out, &arena)); + } + } + } + ASSERT_TRUE(buf.empty()); + ASSERT_EQ(GetCompleteCommand(command_out), "set abc def"); + } + } { // there is a non-string message in command and parse should fail buf.append("*3\r\n$3"); From 9cb1e3c861b4a545d6edc4ec11131838ebe0083d Mon Sep 17 00:00:00 2001 From: Jerry Zhao Date: Sat, 19 Jul 2025 14:11:21 +0800 Subject: [PATCH 6/7] use find --- src/brpc/redis_command.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 6e2c841d3d..91cf8be876 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -371,15 +371,6 @@ size_t RedisCommandParser::ParsedArgsSize() { return _args.size(); } -int find_crlf(const char* pfc, size_t length) { - for (size_t i = 0; i < length - 1; ++i) { - if (pfc[i] == '\r' && pfc[i + 1] == '\n') { - return i; - } - } - return -1; -} - ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::vector* args, butil::Arena* arena) { @@ -393,13 +384,14 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, return PARSE_ERROR_TRY_OTHERS; } const size_t buf_size = buf.size(); - const auto copy_str = static_cast(arena->allocate(buf_size)); + const auto copy_str = static_cast(arena->allocate(buf_size + 1)); buf.copy_to(copy_str, buf_size); if (*copy_str == ' ') { return PARSE_ERROR_ABSOLUTELY_WRONG; } - const int crlf_pos = find_crlf(copy_str, buf_size); - if (crlf_pos == -1) { + copy_str[buf_size] = '\0'; + const size_t crlf_pos = butil::StringPiece(copy_str, buf_size).find("\r\n"); + if (crlf_pos == butil::StringPiece::npos) { // not enough data return PARSE_ERROR_NOT_ENOUGH_DATA; } args->clear(); From 684355b734558d911dc4de3fa759cc12e0447a36 Mon Sep 17 00:00:00 2001 From: Jerry Zhao Date: Sat, 19 Jul 2025 15:03:44 +0800 Subject: [PATCH 7/7] fix --- src/brpc/redis_command.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 91cf8be876..d2620323bc 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -395,13 +395,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, return PARSE_ERROR_NOT_ENOUGH_DATA; } args->clear(); - int offset = 0; + size_t offset = 0; while (offset < crlf_pos && copy_str[offset] != ' ') { ++offset; } const auto first_arg = static_cast(arena->allocate(offset)); memcpy(first_arg, copy_str, offset); - for (int i = 0; i < offset; ++i) { + for (size_t i = 0; i < offset; ++i) { first_arg[i] = tolower(first_arg[i]); } args->push_back(butil::StringPiece(first_arg, offset)); @@ -410,7 +410,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, buf.pop_front(crlf_pos + 2); return PARSE_OK; } - int arg_start_pos = ++offset; + size_t arg_start_pos = ++offset; for (; offset < crlf_pos; ++offset) { if (copy_str[offset] != ' ') {