Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ class ExecEnv {
BrpcClientCache<PBackendService_Stub>* brpc_internal_client_cache() const {
return _internal_client_cache;
}
BrpcClientCache<PBackendService_Stub>* brpc_streaming_client_cache() const {
return _streaming_client_cache;
}
BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const {
return _function_client_cache;
}
Expand Down Expand Up @@ -403,6 +406,7 @@ class ExecEnv {
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;

std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_streaming_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
_function_client_cache =
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
if (config::is_cloud_mode()) {
_stream_load_executor = std::make_shared<CloudStreamLoadExecutor>(this);
} else {
Expand Down Expand Up @@ -719,6 +722,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_routine_load_task_executor);
// _stream_load_executor
SAFE_DELETE(_function_client_cache);
SAFE_DELETE(_streaming_client_cache);
SAFE_DELETE(_internal_client_cache);

SAFE_DELETE(_bfd_parser);
Expand Down
22 changes: 19 additions & 3 deletions be/src/util/brpc_client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@

namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, MetricUnit::NOUNIT);

template <>
BrpcClientCache<PBackendService_Stub>::BrpcClientCache() {
REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); });
BrpcClientCache<PBackendService_Stub>::BrpcClientCache(std::string protocol,
std::string connection_type,
std::string connection_group)
: _protocol(protocol),
_connection_type(connection_type),
_connection_group(connection_group) {
if (connection_group == "streaming") {
REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count,
[this]() { return _stub_map.size(); });
} else {
REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); });
}
}

template <>
Expand All @@ -39,7 +50,12 @@ BrpcClientCache<PBackendService_Stub>::~BrpcClientCache() {
}

template <>
BrpcClientCache<PFunctionService_Stub>::BrpcClientCache() {
BrpcClientCache<PFunctionService_Stub>::BrpcClientCache(std::string protocol,
std::string connection_type,
std::string connection_group)
: _protocol(protocol),
_connection_type(connection_type),
_connection_group(connection_group) {
REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return _stub_map.size(); });
}

Expand Down
24 changes: 16 additions & 8 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ namespace doris {
template <class T>
class BrpcClientCache {
public:
BrpcClientCache();
BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "",
std::string connection_group = "");
virtual ~BrpcClientCache();

std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
Expand Down Expand Up @@ -110,20 +111,24 @@ class BrpcClientCache {
}

std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol = "baidu_std",
const std::string& connect_type = "",
const std::string& protocol = "",
const std::string& connection_type = "",
const std::string& connection_group = "") {
brpc::ChannelOptions options;
if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
options.protocol = config::function_service_protocol;
} else {
if (protocol != "") {
options.protocol = protocol;
} else if (_protocol != "") {
options.protocol = _protocol;
}
if (connect_type != "") {
options.connection_type = connect_type;
if (connection_type != "") {
options.connection_type = connection_type;
} else if (_connection_type != "") {
options.connection_type = _connection_type;
}
if (connection_group != "") {
options.connection_group = connection_group;
} else if (_connection_group != "") {
options.connection_group = _connection_group;
}
options.connect_timeout_ms = 2000;
options.timeout_ms = 2000;
Expand Down Expand Up @@ -204,6 +209,9 @@ class BrpcClientCache {

private:
StubMap<T> _stub_map;
const std::string _protocol;
const std::string _connection_type;
const std::string _connection_group;
};

using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
Expand Down
1 change: 1 addition & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class DorisMetrics {
UIntGauge* stream_load_pipe_count = nullptr;
UIntGauge* new_stream_load_pipe_count = nullptr;
UIntGauge* brpc_endpoint_stub_count = nullptr;
UIntGauge* brpc_stream_endpoint_stub_count = nullptr;
UIntGauge* brpc_function_endpoint_stub_count = nullptr;
UIntGauge* tablet_writer_count = nullptr;

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming connections
const auto& stub =
client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming");
const auto& stub = client_cache->get_client(host_port);
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream
continue;
}
});
auto st = stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
auto st = stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info,
_txn_id, *_schema, tablets_for_schema, _total_streams,
idle_timeout_ms, _state->enable_profile());
if (st.ok()) {
Expand Down