From 53587e418636e12c4330df7e206fd7853d7fd2d3 Mon Sep 17 00:00:00 2001 From: sunhao Date: Mon, 3 Nov 2025 17:41:25 +0800 Subject: [PATCH] Remove recursion of redis command parser --- src/brpc/redis_command.cpp | 88 ++++++++++++++++++++++++------------ src/brpc/redis_command.h | 14 ++++++ test/brpc_redis_unittest.cpp | 19 ++++++++ 3 files changed, 92 insertions(+), 29 deletions(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 5803aaa8b9..d5e76c39d8 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -377,27 +377,50 @@ size_t RedisCommandParser::ParsedArgsSize() { ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::vector* args, butil::Arena* arena) { + ParseError err = PARSE_OK; + do { + RedisCommandConsumeState state = ConsumeImpl(buf, arena, &err); + if (state == CONSUME_STATE_CONTINUE) { + continue; + } else if (state == CONSUME_STATE_DONE) { + break; + } else { + return err; + } + } while (true); + + args->swap(_args); + Reset(); + return PARSE_OK; +} + +RedisCommandConsumeState RedisCommandParser::ConsumeImpl(butil::IOBuf& buf, + butil::Arena* arena, + ParseError* err) { const auto pfc = static_cast(buf.fetch1()); if (pfc == NULL) { - return PARSE_ERROR_NOT_ENOUGH_DATA; + *err = PARSE_ERROR_NOT_ENOUGH_DATA; + return CONSUME_STATE_ERROR; } // '*' stands for array "*\r\n..." if (!_parsing_array && *pfc != '*') { if (!std::isalpha(static_cast(*pfc))) { - return PARSE_ERROR_TRY_OTHERS; + *err = PARSE_ERROR_TRY_OTHERS; + return CONSUME_STATE_ERROR; } const size_t buf_size = buf.size(); const auto copy_str = static_cast(arena->allocate(buf_size + 1)); buf.copy_to(copy_str, buf_size); if (*copy_str == ' ') { - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } copy_str[buf_size] = '\0'; const size_t crlf_pos = butil::StringPiece(copy_str, buf_size).find("\r\n"); if (crlf_pos == butil::StringPiece::npos) { // not enough data - return PARSE_ERROR_NOT_ENOUGH_DATA; + *err = PARSE_ERROR_NOT_ENOUGH_DATA; + return CONSUME_STATE_ERROR; } - args->clear(); size_t offset = 0; while (offset < crlf_pos && copy_str[offset] != ' ') { ++offset; @@ -407,11 +430,11 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, for (size_t i = 0; i < offset; ++i) { first_arg[i] = tolower(first_arg[i]); } - args->push_back(butil::StringPiece(first_arg, offset)); + _args.push_back(butil::StringPiece(first_arg, offset)); if (offset == crlf_pos) { // only one argument, directly return buf.pop_front(crlf_pos + 2); - return PARSE_OK; + return CONSUME_STATE_DONE; } size_t arg_start_pos = ++offset; @@ -422,7 +445,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, const auto arg_length = offset - arg_start_pos; const auto arg = static_cast(arena->allocate(arg_length)); memcpy(arg, copy_str + arg_start_pos, arg_length); - args->push_back(butil::StringPiece(arg, arg_length)); + _args.push_back(butil::StringPiece(arg, arg_length)); arg_start_pos = ++offset; } @@ -431,61 +454,69 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, const auto arg_length = crlf_pos - arg_start_pos; const auto arg = static_cast(arena->allocate(arg_length)); memcpy(arg, copy_str + arg_start_pos, arg_length); - args->push_back(butil::StringPiece(arg, arg_length)); + _args.push_back(butil::StringPiece(arg, arg_length)); } buf.pop_front(crlf_pos + 2); - return PARSE_OK; + return CONSUME_STATE_DONE; } // '$' stands for bulk string "$\r\n\r\n" if (_parsing_array && *pfc != '$') { - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } char intbuf[32]; // enough for fc + 64-bit decimal + \r\n const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1); intbuf[ncopied] = '\0'; const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n"); if (crlf_pos == butil::StringPiece::npos) { // not enough data - return PARSE_ERROR_NOT_ENOUGH_DATA; + *err = PARSE_ERROR_NOT_ENOUGH_DATA; + return CONSUME_STATE_ERROR; } char* endptr = NULL; int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10); if (endptr != intbuf + crlf_pos) { LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal"; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } if (value < 0) { LOG(ERROR) << "Invalid len=" << value << " in redis command"; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } if (!_parsing_array) { if (value > (int64_t)(FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))) { - LOG(ERROR) << "command array size exceeds limit! max=" - << (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece)) + LOG(ERROR) << "command array size exceeds limit! max=" + << (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece)) << ", actually=" << value; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } buf.pop_front(crlf_pos + 2/*CRLF*/); _parsing_array = true; _length = value; _index = 0; _args.resize(value); - return Consume(buf, args, arena); + return CONSUME_STATE_CONTINUE; } CHECK(_index < _length) << "a complete command has been parsed. " - "impl of RedisCommandParser::Parse is buggy"; + "impl of RedisCommandParser::Parse is buggy"; const int64_t len = value; // `value' is length of the string if (len < 0) { LOG(ERROR) << "string in command is nil!"; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } if (len > FLAGS_redis_max_allocation_size) { - LOG(ERROR) << "command string exceeds max allocation size! max=" + LOG(ERROR) << "command string exceeds max allocation size! max=" << FLAGS_redis_max_allocation_size << ", actually=" << len; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) { - return PARSE_ERROR_NOT_ENOUGH_DATA; + *err = PARSE_ERROR_NOT_ENOUGH_DATA; + return CONSUME_STATE_ERROR; } buf.pop_front(crlf_pos + 2/*CRLF*/); char* d = (char*)arena->allocate((len/8 + 1) * 8); @@ -502,14 +533,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, buf.cutn(crlf, sizeof(crlf)); if (crlf[0] != '\r' || crlf[1] != '\n') { LOG(ERROR) << "string in command is not ended with CRLF"; - return PARSE_ERROR_ABSOLUTELY_WRONG; + *err = PARSE_ERROR_ABSOLUTELY_WRONG; + return CONSUME_STATE_ERROR; } - if (++_index < _length) { - return Consume(buf, args, arena); + if (++_index == _length) { + return CONSUME_STATE_DONE; } - args->swap(_args); - Reset(); - return PARSE_OK; + return CONSUME_STATE_CONTINUE; } void RedisCommandParser::Reset() { diff --git a/src/brpc/redis_command.h b/src/brpc/redis_command.h index 5ddfb8e99a..25bd8859fd 100644 --- a/src/brpc/redis_command.h +++ b/src/brpc/redis_command.h @@ -43,6 +43,12 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf, const butil::StringPiece* components, size_t num_components); +enum RedisCommandConsumeState { + CONSUME_STATE_CONTINUE, + CONSUME_STATE_DONE, + CONSUME_STATE_ERROR, +}; + // A parser used to parse redis raw command. class RedisCommandParser { public: @@ -59,6 +65,14 @@ class RedisCommandParser { // Reset parser to the initial state. void Reset(); + // Consume one arg from `buf'. + // Return CONSUME_STATE_CONTINUE if the parser needs more data. + // Return CONSUME_STATE_DONE if the parser has parsed a complete command. + // Return CONSUME_STATE_ERROR if the parser meets an error. + RedisCommandConsumeState ConsumeImpl(butil::IOBuf& buf, + butil::Arena* arena, + ParseError* err); + bool _parsing_array; // if the parser has met array indicator '*' int _length; // array length int _index; // current parsing array index diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 5c8aef125b..4dffd55e9f 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -1423,6 +1423,25 @@ TEST_F(RedisTest, memory_allocation_limits) { brpc::ParseError err = parser.Consume(buf, &args, &arena); ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, err); } + + { + // Test large command array work + int32_t original_limit_tmp = brpc::FLAGS_redis_max_allocation_size; + brpc::FLAGS_redis_max_allocation_size = 1024 * 1024; + brpc::RedisCommandParser parser; + butil::IOBuf buf; + int32_t large_array_size = brpc::FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece); + std::string large_array_cmd = "*" + std::to_string(large_array_size) + "\r\n"; + for(int i = 0; i < large_array_size; i++){ + large_array_cmd.append("$1\r\n1\r\n"); + } + buf.append(large_array_cmd); + + std::vector args; + brpc::ParseError err = parser.Consume(buf, &args, &arena); + ASSERT_EQ(brpc::PARSE_OK, err); + brpc::FLAGS_redis_max_allocation_size = original_limit_tmp; + } // Test valid cases within limits {