From 99dec41ce5b4c0c5816593ae70637b598ceb3933 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 14 May 2024 10:35:23 +0800 Subject: [PATCH 1/4] [fix](move-memtable) change brpc connection type to single --- be/src/util/brpc_client_cache.h | 6 +++++- be/src/vec/sink/load_stream_stub.cpp | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 2e1d0508ba3a64..290f2cc3e04747 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -111,7 +111,8 @@ class BrpcClientCache { std::shared_ptr get_new_client_no_cache(const std::string& host_port, const std::string& protocol = "baidu_std", - const std::string& connect_type = "") { + const std::string& connect_type = "", + const std::string& connection_group = "") { brpc::ChannelOptions options; if constexpr (std::is_same_v) { options.protocol = config::function_service_protocol; @@ -121,6 +122,9 @@ class BrpcClientCache { if (connect_type != "") { options.connection_type = connect_type; } + if (connection_group != "") { + options.connection_group = connection_group; + } options.connect_timeout_ms = 2000; options.max_retry = 10; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 155ce2de349ab6..5ba15ba09eed61 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -175,9 +175,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, *request.add_tablets() = tablet; } POpenLoadStreamResponse response; - // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, - // see: https://github.com/apache/brpc/issues/392 - const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled"); + // set load_id as connection_group to distinguish different streaming connections + const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", + print_id(_load_id)); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); From 1c56f992af0788ad237a8974f61fd86bffbc1586 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 15 May 2024 13:28:34 +0800 Subject: [PATCH 2/4] connection_group = streaming --- be/src/vec/sink/load_stream_stub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 5ba15ba09eed61..d50a0788905c6f 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -177,7 +177,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, POpenLoadStreamResponse response; // set load_id as connection_group to distinguish different streaming connections const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", - print_id(_load_id)); + "streaming"); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); From 4e13e152f055c0ecf182e80a5731fed379620257 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 15 May 2024 13:29:52 +0800 Subject: [PATCH 3/4] style --- be/src/vec/sink/load_stream_stub.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index d50a0788905c6f..6a78d9e746bbd6 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -176,8 +176,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } POpenLoadStreamResponse response; // set load_id as connection_group to distinguish different streaming connections - const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", - "streaming"); + const auto& stub = + client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); From 572d6004dc7b31332df230ccb344494126a7a587 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 15 May 2024 14:28:21 +0800 Subject: [PATCH 4/4] fix comment --- be/src/vec/sink/load_stream_stub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 6a78d9e746bbd6..92670c1c930090 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -175,7 +175,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, *request.add_tablets() = tablet; } POpenLoadStreamResponse response; - // set load_id as connection_group to distinguish different streaming connections + // set connection_group "streaming" to distinguish with non-streaming connections const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); stub->open_load_stream(&cntl, &request, &response, nullptr);