diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index e686df2dfd6a8f..5365ad42fafec5 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -222,6 +222,9 @@ class ExecEnv { BrpcClientCache* brpc_internal_client_cache() const { return _internal_client_cache; } + BrpcClientCache* brpc_streaming_client_cache() const { + return _streaming_client_cache; + } BrpcClientCache* brpc_function_client_cache() const { return _function_client_cache; } @@ -403,6 +406,7 @@ class ExecEnv { // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. std::shared_ptr _new_load_stream_mgr; BrpcClientCache* _internal_client_cache = nullptr; + BrpcClientCache* _streaming_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; std::shared_ptr _stream_load_executor; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index d160e9abdc2ff1..4ac0fd7d159753 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -298,7 +298,10 @@ Status ExecEnv::_init(const std::vector& store_paths, _load_stream_mgr = std::make_unique(num_flush_threads); _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); _internal_client_cache = new BrpcClientCache(); - _function_client_cache = new BrpcClientCache(); + _streaming_client_cache = + new BrpcClientCache("baidu_std", "single", "streaming"); + _function_client_cache = + new BrpcClientCache(config::function_service_protocol); if (config::is_cloud_mode()) { _stream_load_executor = std::make_shared(this); } else { @@ -719,6 +722,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_routine_load_task_executor); // _stream_load_executor SAFE_DELETE(_function_client_cache); + SAFE_DELETE(_streaming_client_cache); SAFE_DELETE(_internal_client_cache); SAFE_DELETE(_bfd_parser); diff --git a/be/src/util/brpc_client_cache.cpp b/be/src/util/brpc_client_cache.cpp index b9135e8014dc7d..c5a6488787879b 100644 --- a/be/src/util/brpc_client_cache.cpp +++ b/be/src/util/brpc_client_cache.cpp @@ -25,12 +25,23 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, MetricUnit::NOUNIT); template <> -BrpcClientCache::BrpcClientCache() { - REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { + if (connection_group == "streaming") { + REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count, + [this]() { return _stub_map.size(); }); + } else { + REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); + } } template <> @@ -39,7 +50,12 @@ BrpcClientCache::~BrpcClientCache() { } template <> -BrpcClientCache::BrpcClientCache() { +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return _stub_map.size(); }); } diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index ebef80f4a6bdfb..09c92fb398e085 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -59,7 +59,8 @@ namespace doris { template class BrpcClientCache { public: - BrpcClientCache(); + BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "", + std::string connection_group = ""); virtual ~BrpcClientCache(); std::shared_ptr get_client(const butil::EndPoint& endpoint) { @@ -110,20 +111,24 @@ class BrpcClientCache { } std::shared_ptr get_new_client_no_cache(const std::string& host_port, - const std::string& protocol = "baidu_std", - const std::string& connect_type = "", + const std::string& protocol = "", + const std::string& connection_type = "", const std::string& connection_group = "") { brpc::ChannelOptions options; - if constexpr (std::is_same_v) { - options.protocol = config::function_service_protocol; - } else { + if (protocol != "") { options.protocol = protocol; + } else if (_protocol != "") { + options.protocol = _protocol; } - if (connect_type != "") { - options.connection_type = connect_type; + if (connection_type != "") { + options.connection_type = connection_type; + } else if (_connection_type != "") { + options.connection_type = _connection_type; } if (connection_group != "") { options.connection_group = connection_group; + } else if (_connection_group != "") { + options.connection_group = _connection_group; } options.connect_timeout_ms = 2000; options.timeout_ms = 2000; @@ -204,6 +209,9 @@ class BrpcClientCache { private: StubMap _stub_map; + const std::string _protocol; + const std::string _connection_type; + const std::string _connection_group; }; using InternalServiceClientCache = BrpcClientCache; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index c568098195b61d..b201369454f926 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -178,6 +178,7 @@ class DorisMetrics { UIntGauge* stream_load_pipe_count = nullptr; UIntGauge* new_stream_load_pipe_count = nullptr; UIntGauge* brpc_endpoint_stub_count = nullptr; + UIntGauge* brpc_stream_endpoint_stub_count = nullptr; UIntGauge* brpc_function_endpoint_stub_count = nullptr; UIntGauge* tablet_writer_count = nullptr; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index eadcebc3bc888a..b53e1c778b8d34 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -185,8 +185,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } POpenLoadStreamResponse response; // set connection_group "streaming" to distinguish with non-streaming connections - const auto& stub = - client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); + const auto& stub = client_cache->get_client(host_port); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 84c8c94821ce45..057384ab9b9a25 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -297,7 +297,7 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream continue; } }); - auto st = stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + auto st = stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile()); if (st.ok()) {