Skip to content

Commit 6ca8808

Browse files
committed
feat: support parsing buffer config from params in AsyncClientConfig
Change-Id: Id2e57fb2599b27c7d6f79e58031c44d8ad4a0b12
1 parent f1e3f3b commit 6ca8808

File tree

3 files changed

+142
-6
lines changed

3 files changed

+142
-6
lines changed

envoy/redis/async_client.h

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#pragma once
22

33
#include <chrono>
4+
#include <limits>
5+
#include <map>
46
#include <memory>
57
#include <string>
68
#include <utility>
7-
#include <map>
89

910
namespace Envoy {
1011

@@ -20,17 +21,39 @@ struct AsyncClientConfig {
2021
AsyncClientConfig(std::string&& username, std::string&& password, int op_timeout_milliseconds,
2122
std::map<std::string, std::string>&& params)
2223
: auth_username_(std::move(username)), auth_password_(std::move(password)),
23-
op_timeout_(op_timeout_milliseconds), buffer_flush_timeout_(3), params_(std::move(params)) {
24+
op_timeout_(op_timeout_milliseconds),
25+
max_buffer_size_before_flush_(parseUint32FromParams(params, "max_buffer_size_before_flush", 1024)),
26+
buffer_flush_timeout_(parseUint32FromParams(params, "buffer_flush_timeout", 3)),
27+
params_(std::move(params)) {
2428
}
29+
2530
const std::string auth_username_;
2631
const std::string auth_password_;
2732

2833
const std::chrono::milliseconds op_timeout_;
29-
const uint32_t max_buffer_size_before_flush_{1024};
34+
const uint32_t max_buffer_size_before_flush_;
3035
const std::chrono::milliseconds buffer_flush_timeout_;
3136
const uint32_t max_upstream_unknown_connections_{100};
3237
const bool enable_command_stats_{false};
3338
const std::map<std::string, std::string> params_;
39+
40+
private:
41+
// Helper function to parse uint32 from params map with default value
42+
static uint32_t parseUint32FromParams(const std::map<std::string, std::string>& params,
43+
const std::string& key, uint32_t default_value) {
44+
auto it = params.find(key);
45+
if (it != params.end()) {
46+
try {
47+
unsigned long value = std::stoul(it->second);
48+
if (value <= std::numeric_limits<uint32_t>::max()) {
49+
return static_cast<uint32_t>(value);
50+
}
51+
} catch (const std::exception&) {
52+
// If parsing fails, return default value
53+
}
54+
}
55+
return default_value;
56+
}
3457
};
3558

3659
/**

test/extensions/filters/http/wasm/test_data/test_redis_call_cpp.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ static RegisterContextFactory register_RedisCallContext(CONTEXT_FACTORY(RedisCal
3030
"redis_call");
3131

3232
bool RedisCallRootContext::onConfigure(size_t) {
33-
redisInit("cluster?db=1", "admin", "123456", 1000);
33+
// Test with buffer configuration parameters
34+
redisInit("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", "admin", "123456", 1000);
3435
return true;
3536
}
3637

@@ -52,14 +53,16 @@ FilterHeadersStatus RedisCallContext::onRequestHeaders(uint32_t, bool) {
5253
auto query = "*3\r\n$3\r\nset\r\n$2\r\nid\r\n$1\r\n1\r\n";
5354
auto path = getRequestHeader(":path");
5455
if (path->view() == "/bad") {
55-
if (root()->redisCall("cluster?db=1", query, callback) != WasmResult::Ok) {
56+
// Test with different buffer params on runtime call
57+
if (root()->redisCall("cluster?db=1&buffer_flush_timeout=2", query, callback) != WasmResult::Ok) {
5658
logInfo("redis_call rejected");
5759
}
5860
} else {
5961
if (root()->redisCall("bogus cluster", query, callback) == WasmResult::Ok) {
6062
logError("bogus cluster found error");
6163
}
62-
root()->redisCall("cluster?db=1", query, callback);
64+
// Test with buffer params in query string
65+
root()->redisCall("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", query, callback);
6366
logInfo("onRequestHeaders");
6467
}
6568

test/extensions/filters/http/wasm/wasm_filter_test.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "envoy/grpc/async_client.h"
2+
#include "envoy/redis/async_client.h"
23

34
#include "source/common/http/message_impl.h"
45
#include "source/extensions/filters/http/wasm/wasm_filter.h"
@@ -795,6 +796,115 @@ TEST_P(WasmHttpFilterTest, RedisCall) {
795796
EXPECT_NE(callbacks, nullptr);
796797
}
797798

799+
#if defined(HIGRESS)
800+
// Unit test for AsyncClientConfig parameter parsing
801+
TEST(RedisAsyncClientConfigTest, ParseBufferParamsFromQueryString) {
802+
// Test with all parameters specified
803+
{
804+
std::map<std::string, std::string> params = {
805+
{"db", "1"},
806+
{"buffer_flush_timeout", "5"},
807+
{"max_buffer_size_before_flush", "2048"}
808+
};
809+
Redis::AsyncClientConfig config("testuser", "testpass", 1000, std::move(params));
810+
811+
EXPECT_EQ(config.auth_username_, "testuser");
812+
EXPECT_EQ(config.auth_password_, "testpass");
813+
EXPECT_EQ(config.op_timeout_.count(), 1000);
814+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 5);
815+
EXPECT_EQ(config.max_buffer_size_before_flush_, 2048);
816+
EXPECT_EQ(config.params_.at("db"), "1");
817+
}
818+
819+
// Test with only buffer_flush_timeout specified (max_buffer uses default)
820+
{
821+
std::map<std::string, std::string> params = {
822+
{"buffer_flush_timeout", "1"}
823+
};
824+
Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params));
825+
826+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 1);
827+
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default value
828+
}
829+
830+
// Test with only max_buffer_size_before_flush specified (timeout uses default)
831+
{
832+
std::map<std::string, std::string> params = {
833+
{"max_buffer_size_before_flush", "512"}
834+
};
835+
Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params));
836+
837+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default value
838+
EXPECT_EQ(config.max_buffer_size_before_flush_, 512);
839+
}
840+
841+
// Test with no buffer params (both use defaults)
842+
{
843+
std::map<std::string, std::string> params = {
844+
{"db", "0"}
845+
};
846+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
847+
848+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default 3ms
849+
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default 1024 bytes
850+
}
851+
852+
// Test with invalid buffer_flush_timeout (should use default)
853+
{
854+
std::map<std::string, std::string> params = {
855+
{"buffer_flush_timeout", "invalid_number"}
856+
};
857+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
858+
859+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default due to parse error
860+
}
861+
862+
// Test with invalid max_buffer_size_before_flush (should use default)
863+
{
864+
std::map<std::string, std::string> params = {
865+
{"max_buffer_size_before_flush", "not_a_number"}
866+
};
867+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
868+
869+
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to parse error
870+
}
871+
872+
// Test with zero values (edge case - disable buffering)
873+
{
874+
std::map<std::string, std::string> params = {
875+
{"buffer_flush_timeout", "0"},
876+
{"max_buffer_size_before_flush", "0"}
877+
};
878+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
879+
880+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 0);
881+
EXPECT_EQ(config.max_buffer_size_before_flush_, 0);
882+
}
883+
884+
// Test with very large values (within uint32 range)
885+
{
886+
std::map<std::string, std::string> params = {
887+
{"buffer_flush_timeout", "10000"},
888+
{"max_buffer_size_before_flush", "1048576"} // 1MB
889+
};
890+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
891+
892+
EXPECT_EQ(config.buffer_flush_timeout_.count(), 10000);
893+
EXPECT_EQ(config.max_buffer_size_before_flush_, 1048576);
894+
}
895+
896+
// Test with value exceeding uint32 max (should use default)
897+
{
898+
std::map<std::string, std::string> params = {
899+
{"max_buffer_size_before_flush", "99999999999999"} // exceeds uint32::max
900+
};
901+
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));
902+
903+
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to overflow
904+
}
905+
}
906+
#endif
907+
798908
TEST_P(WasmHttpFilterTest, DisableClearRouteCache) {
799909
if (std::get<1>(GetParam()) == "rust") {
800910
// This feature is not supported in rust test code

0 commit comments

Comments
 (0)