From 8fae316d0bb29a08ece1cd8111da944ff9a56a33 Mon Sep 17 00:00:00 2001 From: dongzhizhao Date: Sun, 29 May 2022 21:54:34 +0800 Subject: [PATCH] Fix brpc client cannot reconnect to server probabilistically when the server restarts Problem ------- Server restart will trigger Socket::SetFailed -> Socket check healthy -> Socket::WaitAndReset -> Reconnect to server -> Socket::Revive on the brpc client side Socket::Revive -> _versioned_ref.cas(vref, ) // step 1 -> _recycle_flag = false // step 2 After step 1 is successfully executed, the Socket can be acquired by Socket::Address and the gap between 1 and 2 will cause the ref count in _versioned_ref to not be decreased in Socket::ReleaseAdditionalReference since _recycle_flag is true Finally Socket::WaitAndReset can never wait for the expected ref count to mistakenly think that there are still another requests holding socket references and fall into an infinite loop Example: Session1(Thread1) Session2(Thread2) Socket::Revive T1: _versioned_ref.cas(vref, ) T2: Socker::Address success T3: Server restart or network error happened T4: Socket::SetFailed T5: Socket::ReleaseAdditionalReference // Now _recycle_flag is true T6: _recycle_flag.cas(false, true) -> Failed: No call Socket::Dereference T7: _recycle_flag = false T8: Socket::WaitAndReset -> nref always be three, not excepted two -> dead loop. cannot reconnected to server Solution -------- Add _recycle_mutex to avoid this race condition Socket::Revive and Socket::ReleaseAdditionalReference will only be called when some exceptions occur in the socket, so this mutex will not cause performance degradation --- src/brpc/socket.cpp | 6 ++++++ src/brpc/socket.h | 1 + 2 files changed, 7 insertions(+) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 59a570ac71..0841642c61 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -751,6 +751,8 @@ void Socket::Revive() { LOG(WARNING) << *this << " was abandoned during revival"; return; } + + std::unique_lock recycle_lock(_recycle_mutex); // +1 is the additional ref added in Create(). TODO(gejun): we should // remove this additional nref someday. if (_versioned_ref.compare_exchange_weak( @@ -759,6 +761,7 @@ void Socket::Revive() { butil::memory_order_relaxed)) { // Set this flag to true since we add additional ref again _recycle_flag.store(false, butil::memory_order_relaxed); + recycle_lock.unlock(); if (_user) { _user->AfterRevived(this); } else { @@ -771,11 +774,14 @@ void Socket::Revive() { int Socket::ReleaseAdditionalReference() { bool expect = false; + + std::unique_lock recycle_lock(_recycle_mutex); // Use `relaxed' fence here since `Dereference' has `released' fence if (_recycle_flag.compare_exchange_strong( expect, true, butil::memory_order_relaxed, butil::memory_order_relaxed)) { + recycle_lock.unlock(); return Dereference(); } return -1; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 4be6a73165..9034f0d64c 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -787,6 +787,7 @@ friend void DereferenceSocket(Socket*); // Flag used to mark whether additional reference has been decreased // by either `SetFailed' or `SetRecycle' butil::atomic _recycle_flag; + butil::Mutex _recycle_mutex; // Concrete error information from SetFailed() // Accesses to these 2 fields(especially _error_text) must be protected