From 1f35b54682e87b1df9d2e4ba9dd8bef7e6fae6f5 Mon Sep 17 00:00:00 2001 From: egolearner <45122959+egolearner@users.noreply.github.com> Date: Thu, 12 Jun 2025 12:21:17 +0800 Subject: [PATCH 1/2] fix: consistent hashing support server tag --- src/brpc/policy/consistent_hashing_load_balancer.cpp | 8 ++++---- src/brpc/policy/consistent_hashing_load_balancer.h | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp index 2560d8f29b..5b2f6f9005 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.cpp +++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp @@ -71,8 +71,8 @@ bool DefaultReplicaPolicy::Build(ServerId server, replicas->clear(); for (size_t i = 0; i < num_replicas; ++i) { char host[256]; - int len = snprintf(host, sizeof(host), "%s-%lu", - endpoint2str(ptr->remote_side()).c_str(), i); + int len = snprintf(host, sizeof(host), "%s-%lu-%s", + endpoint2str(ptr->remote_side()).c_str(), i, server.tag.c_str()); ConsistentHashingLoadBalancer::Node node; node.hash = _hash_func(host, len); node.server_sock = server; @@ -104,8 +104,8 @@ bool KetamaReplicaPolicy::Build(ServerId server, << "Ketam hash replicas number(" << num_replicas << ") should be n*4"; for (size_t i = 0; i < num_replicas / points_per_hash; ++i) { char host[256]; - int len = snprintf(host, sizeof(host), "%s-%lu", - endpoint2str(ptr->remote_side()).c_str(), i); + int len = snprintf(host, sizeof(host), "%s-%lu-%s", + endpoint2str(ptr->remote_side()).c_str(), i, server.tag.c_str()); unsigned char digest[MD5_DIGEST_LENGTH]; MD5HashSignature(host, len, digest); for (size_t j = 0; j < points_per_hash; ++j) { diff --git a/src/brpc/policy/consistent_hashing_load_balancer.h b/src/brpc/policy/consistent_hashing_load_balancer.h index 5da7548a63..a4808c1a70 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.h +++ b/src/brpc/policy/consistent_hashing_load_balancer.h @@ -50,7 +50,10 @@ class ConsistentHashingLoadBalancer : public LoadBalancer { bool operator<(const Node &rhs) const { if (hash < rhs.hash) { return true; } if (hash > rhs.hash) { return false; } - return server_addr < rhs.server_addr; + if (server_addr < rhs.server_addr) { return true; } + if (server_addr > rhs.server_addr) { return false; } + // compare by tag if has the same ip-port + return server_sock.tag < rhs.server_sock.tag; } bool operator<(const uint32_t code) const { return hash < code; From 451220571d4bca3303bd30cac8504bc05c7939b5 Mon Sep 17 00:00:00 2001 From: egolearner <45122959+egolearner@users.noreply.github.com> Date: Mon, 16 Jun 2025 16:01:43 +0800 Subject: [PATCH 2/2] add flag --- .../consistent_hashing_load_balancer.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp index 5b2f6f9005..085ddf950e 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.cpp +++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp @@ -33,6 +33,8 @@ namespace policy { // TODO: or 160? DEFINE_int32(chash_num_replicas, 100, "default number of replicas per server in chash"); +DEFINE_bool(consistent_hashing_enable_server_tag, false, + "if consistent hashing enable server with tag"); // Defined in hasher.cpp. const char* GetHashName(HashFunc hasher); @@ -71,8 +73,14 @@ bool DefaultReplicaPolicy::Build(ServerId server, replicas->clear(); for (size_t i = 0; i < num_replicas; ++i) { char host[256]; - int len = snprintf(host, sizeof(host), "%s-%lu-%s", + int len = 0; + if (!FLAGS_consistent_hashing_enable_server_tag) { + len = snprintf(host, sizeof(host), "%s-%lu", + endpoint2str(ptr->remote_side()).c_str(), i); + } else { + len = snprintf(host, sizeof(host), "%s-%lu-%s", endpoint2str(ptr->remote_side()).c_str(), i, server.tag.c_str()); + } ConsistentHashingLoadBalancer::Node node; node.hash = _hash_func(host, len); node.server_sock = server; @@ -104,8 +112,14 @@ bool KetamaReplicaPolicy::Build(ServerId server, << "Ketam hash replicas number(" << num_replicas << ") should be n*4"; for (size_t i = 0; i < num_replicas / points_per_hash; ++i) { char host[256]; - int len = snprintf(host, sizeof(host), "%s-%lu-%s", + int len = 0; + if (!FLAGS_consistent_hashing_enable_server_tag) { + len = snprintf(host, sizeof(host), "%s-%lu", + endpoint2str(ptr->remote_side()).c_str(), i); + } else { + len = snprintf(host, sizeof(host), "%s-%lu-%s", endpoint2str(ptr->remote_side()).c_str(), i, server.tag.c_str()); + } unsigned char digest[MD5_DIGEST_LENGTH]; MD5HashSignature(host, len, digest); for (size_t j = 0; j < points_per_hash; ++j) {