From fbc508d0051699635e240e274dab33a65bed99f1 Mon Sep 17 00:00:00 2001 From: chenBright Date: Sat, 28 Jun 2025 16:26:27 +0800 Subject: [PATCH] Bugfix: Socket without health check would be abnormally recycled --- src/brpc/selective_channel.cpp | 9 +++++++-- src/brpc/socket_map.cpp | 18 +++++++++++++++--- src/brpc/socket_map.h | 1 + test/brpc_socket_map_unittest.cpp | 27 +++++++++++++++++++++++++++ 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 8877994369..21b871af3f 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -197,8 +197,13 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, } SocketUniquePtr ptr; int rc = Socket::AddressFailedAsWell(sock_id, &ptr); - if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) { - LOG(FATAL) << "Fail to address SocketId=" << sock_id; + if (rc < 0) { + LOG(ERROR) << "Fail to address SocketId=" << sock_id; + return -1; + } + if (rc > 0 && !ptr->HCEnabled()) { + LOG(ERROR) << "Health check of SocketId=" + << sock_id << " is disabled"; return -1; } if (!AddServer(ServerId(sock_id))) { diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index d150301f97..c5c94bc747 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -239,7 +239,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, return 0; } // A socket w/o HC is failed (permanently), replace it. - sc->socket->ReleaseHCRelatedReference(); + ReleaseReference(sc->socket); _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches // below have to remove the entry before returning, which is @@ -268,7 +268,10 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Failed socket is not HC-enabled"; return -1; } - SingleConnection new_sc = { 1, ptr.get(), 0 }; + // If health check is enabled, a health-checking-related reference + // is hold in Socket::Create. + // If health check is disabled, hold a reference in SocketMap. + SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; mu.unlock(); @@ -306,11 +309,20 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _map.erase(key); mu.unlock(); s->ReleaseAdditionalReference(); // release extra ref - s->ReleaseHCRelatedReference(); + ReleaseReference(s); } } } +void SocketMap::ReleaseReference(Socket* s) { + if (s->HCEnabled()) { + s->ReleaseHCRelatedReference(); + } else { + // Release the extra ref hold in SocketMap::Insert. + SocketUniquePtr ptr(s); + } +} + int SocketMap::Find(const SocketMapKey& key, SocketId* id) { BAIDU_SCOPED_LOCK(_mutex); SingleConnection* sc = _map.seek(key); diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 37fd77e723..b0d542e78e 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -177,6 +177,7 @@ class SocketMap { private: void RemoveInternal(const SocketMapKey& key, SocketId id, bool remove_orphan); + static void ReleaseReference(Socket* s); void ListOrphans(int64_t defer_us, std::vector* out); void WatchConnections(); static void* RunWatchConnections(void*); diff --git a/test/brpc_socket_map_unittest.cpp b/test/brpc_socket_map_unittest.cpp index 5881445040..d40db0e723 100644 --- a/test/brpc_socket_map_unittest.cpp +++ b/test/brpc_socket_map_unittest.cpp @@ -26,6 +26,7 @@ #include "brpc/reloadable_flags.h" namespace brpc { +DECLARE_int32(health_check_interval); DECLARE_int32(idle_timeout_second); DECLARE_int32(defer_close_second); DECLARE_int32(max_connection_pool_size); @@ -59,6 +60,31 @@ class SocketMapTest : public ::testing::Test{ virtual void TearDown(){}; }; +TEST_F(SocketMapTest, disable_health_check) { + int32_t old_interval = brpc::FLAGS_health_check_interval; + brpc::FLAGS_health_check_interval = 0; + brpc::SocketId id; + ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id)); + ASSERT_EQ(0, brpc::SocketMapInsert(g_key, &id)); + ASSERT_EQ(0, brpc::SocketMapFind(g_key, &id)); + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + } + ASSERT_EQ(0, brpc::Socket::SetFailed(id)); + + brpc::SocketUniquePtr ptr; + // The socket should not be recycled, + // because SocketMap holds a reference to it. + ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(id, &ptr)); + ASSERT_EQ(2, ptr->nref()); + brpc::SocketMapRemove(g_key); + // After removing the socket, `ptr' holds the last reference. + ASSERT_EQ(1, ptr->nref()); + ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id)); + brpc::FLAGS_health_check_interval = old_interval; +} + TEST_F(SocketMapTest, idle_timeout) { const int TIMEOUT = 1; const int NTHREAD = 10; @@ -140,6 +166,7 @@ TEST_F(SocketMapTest, max_pool_size) { EXPECT_TRUE(ptrs[i]->Failed()); } } + } //namespace int main(int argc, char* argv[]) {