diff --git a/src/brpc/load_balancer.cpp b/src/brpc/load_balancer.cpp index 16e051d65e..2532d9efad 100644 --- a/src/brpc/load_balancer.cpp +++ b/src/brpc/load_balancer.cpp @@ -19,6 +19,7 @@ #include #include "brpc/reloadable_flags.h" #include "brpc/load_balancer.h" +#include "brpc/socket.h" namespace brpc { @@ -34,6 +35,15 @@ BRPC_VALIDATE_GFLAG(show_lb_in_vars, PassValidate); // For assigning unique names for lb. static butil::static_atomic g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0); +bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) { + SocketUniquePtr ptr; + bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable(); + if (res) { + *out = std::move(ptr); + } + return res; +} + void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) { (static_cast(arg))->Describe(os, DescribeOptions()); } diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h index cda0517e87..2a76fa4305 100644 --- a/src/brpc/load_balancer.h +++ b/src/brpc/load_balancer.h @@ -105,6 +105,10 @@ class LoadBalancer : public NonConstDescribable, public Destroyable { protected: virtual ~LoadBalancer() { } + + // Returns true and set `out' if the server is available (not failed, not logoff). + // Otherwise, returns false. + static bool IsServerAvailable(SocketId id, SocketUniquePtr* out); }; DECLARE_bool(show_lb_in_vars); diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp index 085ddf950e..d29ad55e3c 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.cpp +++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp @@ -323,8 +323,7 @@ int ConsistentHashingLoadBalancer::SelectServer( for (size_t i = 0; i < s->size(); ++i) { if (((i + 1) == s->size() // always take last chance || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id)) - && Socket::Address(choice->server_sock.id, out->ptr) == 0 - && (*out->ptr)->IsAvailable()) { + && IsServerAvailable(choice->server_sock.id, out->ptr)) { return 0; } else { if (++choice == s->end()) { diff --git a/src/brpc/policy/locality_aware_load_balancer.cpp b/src/brpc/policy/locality_aware_load_balancer.cpp index 68d85ad370..beea51690e 100644 --- a/src/brpc/policy/locality_aware_load_balancer.cpp +++ b/src/brpc/policy/locality_aware_load_balancer.cpp @@ -302,8 +302,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) if (index < n) { continue; } - } else if (Socket::Address(info.server_id, out->ptr) == 0 - && (*out->ptr)->IsAvailable()) { + } else if (IsServerAvailable(info.server_id, out->ptr)) { if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer // choosing the server again. || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) { diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp index 65cfdee975..4ff43d753f 100644 --- a/src/brpc/policy/randomized_load_balancer.cpp +++ b/src/brpc/policy/randomized_load_balancer.cpp @@ -113,8 +113,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { const SocketId id = s->server_list[offset].id; if (((i + 1) == n // always take last chance || !ExcludedServers::IsExcluded(in.excluded, id)) - && Socket::Address(id, out->ptr) == 0 - && (*out->ptr)->IsAvailable()) { + && IsServerAvailable(id, out->ptr)) { // We found an available server return 0; } diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp index fa69aa86c2..cf67624085 100644 --- a/src/brpc/policy/round_robin_load_balancer.cpp +++ b/src/brpc/policy/round_robin_load_balancer.cpp @@ -120,8 +120,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { const SocketId id = s->server_list[tls.offset].id; if (((i + 1) == n // always take last chance || !ExcludedServers::IsExcluded(in.excluded, id)) - && Socket::Address(id, out->ptr) == 0 - && (*out->ptr)->IsAvailable()) { + && IsServerAvailable(id, out->ptr)) { s.tls() = tls; return 0; } diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp index 819c550c3e..46923acb86 100644 --- a/src/brpc/policy/weighted_randomized_load_balancer.cpp +++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp @@ -117,10 +117,6 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch( return _db_servers.Modify(BatchRemove, servers); } -bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) { - return Socket::Address(id, out) == 0 && (*out)->IsAvailable(); -} - int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { butil::DoublyBufferedData::ScopedPtr s; if (_db_servers.Read(&s) != 0) { @@ -144,13 +140,13 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* continue; } random_traversed.insert(id); - if (0 == IsServerAvailable(id, out->ptr)) { + if (IsServerAvailable(id, out->ptr)) { // An available server is found. return 0; } } - if (random_traversed.size() == n) { + if (random_traversed.size() < n) { // Try to traverse the remaining servers to find an available server. uint32_t offset = butil::fast_rand_less_than(n); uint32_t stride = bthread::prime_offset(); @@ -161,19 +157,18 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* continue; } if (IsServerAvailable(id, out->ptr)) { - // An available server is found. - return 0; + if (!ExcludedServers::IsExcluded(in.excluded, id)) { + // Prioritize servers that are not excluded. + return 0; + } } } } - if (NULL != out->ptr) { - // Use the excluded but available server. - return 0; - } - - // After traversing the whole server list, no available server is found. - return EHOSTDOWN; + // Returns EHOSTDOWN, if no available server is found + // after traversing the whole server list. + // Otherwise, returns 0 with a available excluded server. + return NULL == out->ptr ? EHOSTDOWN : 0; } LoadBalancer* WeightedRandomizedLoadBalancer::New( diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h b/src/brpc/policy/weighted_randomized_load_balancer.h index 3842affa0a..9d7a705b15 100644 --- a/src/brpc/policy/weighted_randomized_load_balancer.h +++ b/src/brpc/policy/weighted_randomized_load_balancer.h @@ -41,7 +41,7 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer { void Describe(std::ostream& os, const DescribeOptions&) override; struct Server { - Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0) + explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0) : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {} SocketId id; uint32_t weight; @@ -61,7 +61,6 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer { static bool Remove(Servers& bg, const ServerId& id); static size_t BatchAdd(Servers& bg, const std::vector& servers); static size_t BatchRemove(Servers& bg, const std::vector& servers); - static bool IsServerAvailable(SocketId id, SocketUniquePtr* out); butil::DoublyBufferedData _db_servers; }; diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp index cca44a0753..07059484d7 100644 --- a/test/brpc_load_balancer_unittest.cpp +++ b/test/brpc_load_balancer_unittest.cpp @@ -431,7 +431,7 @@ struct SelectArg { }; void* select_server(void* arg) { - SelectArg *sa = (SelectArg *)arg; + SelectArg *sa = (SelectArg*)arg; brpc::LoadBalancer* c = sa->lb; brpc::SocketUniquePtr ptr; CountMap *selected_count = new CountMap; @@ -951,6 +951,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) { brpc::policy::WeightedRandomizedLoadBalancer wrlb; size_t valid_weight_num = 4; + std::vector ids; // Add server to selected list. The server with invalid weight will be skipped. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) { const char *addr = servers[i]; @@ -961,6 +962,7 @@ TEST_F(LoadBalancerTest, weighted_randomized) { options.remote_side = dummy; options.user = new SaveRecycle; ASSERT_EQ(0, brpc::Socket::Create(options, &id.id)); + ids.emplace_back(id.id); id.tag = weight[i]; if (i < valid_weight_num) { int weight_num = 0; @@ -1010,6 +1012,16 @@ TEST_F(LoadBalancerTest, weighted_randomized) { // actual_rate <= expect_rate * 2 ASSERT_LE(actual_rate, expect_rate * 2); } + + for (size_t i = 1; i < ids.size(); ++i) { + brpc::Socket::SetFailed(ids[i]); + } + select_result.clear(); + for (int i = 0; i < run_times; ++i) { + EXPECT_EQ(0, wrlb.SelectServer(in, &out)); + // The only choice is servers[0]. + ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(), servers[0]); + } } TEST_F(LoadBalancerTest, health_check_no_valid_server) {