diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index 4c88604eb3..2800e6927f 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -217,9 +217,11 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { if (!FLAGS_health_check_path.empty()) { HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s); } + ptr->AfterHCCompleted(); return false; } else if (hc == ESTOP) { LOG(INFO) << "Cancel checking " << *ptr; + ptr->AfterHCCompleted(); return false; } ++ ptr->_hc_count; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 7ab4e91e4b..0f35c8ac91 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -435,6 +435,7 @@ Socket::Socket(Forbidden) , _correlation_id(0) , _health_check_interval_s(-1) , _is_hc_related_ref_held(false) + , _hc_started(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -446,7 +447,7 @@ Socket::Socket(Forbidden) , _overcrowded(false) , _fail_me_at_server_stop(false) , _logoff_flag(false) - , _recycle_flag(false) + , _additional_ref_status(REF_USING) , _error_code(0) , _pipeline_q(NULL) , _last_writetime_us(0) @@ -616,6 +617,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; m->_is_hc_related_ref_held = false; + m->_hc_started.store(false, butil::memory_order_relaxed); m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -634,7 +636,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { // May be non-zero for RTMP connections. 
m->_fail_me_at_server_stop = false; m->_logoff_flag.store(false, butil::memory_order_relaxed); - m->_recycle_flag.store(false, butil::memory_order_relaxed); + m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed); m->_error_code = 0; m->_error_text.clear(); m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); @@ -752,11 +754,14 @@ int Socket::WaitAndReset(int32_t expected_nref) { void Socket::Revive() { const uint32_t id_ver = VersionOfSocketId(_this_id); uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); + _additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed); while (1) { CHECK_EQ(id_ver + 1, VersionOfVRef(vref)); int32_t nref = NRefOfVRef(vref); if (nref <= 1) { + // Set status to REF_RECYCLED since no one uses this socket + _additional_ref_status.store(REF_RECYCLED, butil::memory_order_relaxed); CHECK_EQ(1, nref); LOG(WARNING) << *this << " was abandoned during revival"; return; @@ -767,8 +772,8 @@ void Socket::Revive() { vref, MakeVRef(id_ver, nref + 1/*note*/), butil::memory_order_release, butil::memory_order_relaxed)) { - // Set this flag to true since we add additional ref again - _recycle_flag.store(false, butil::memory_order_relaxed); + // Set status to REF_USING since we add additional ref again + _additional_ref_status.store(REF_USING, butil::memory_order_relaxed); if (_user) { _user->AfterRevived(this); } else { @@ -780,15 +785,22 @@ void Socket::Revive() { } int Socket::ReleaseAdditionalReference() { - bool expect = false; - // Use `relaxed' fence here since `Dereference' has `released' fence - if (_recycle_flag.compare_exchange_strong( - expect, true, + do { + AdditionalRefStatus expect = REF_USING; + if (_additional_ref_status.compare_exchange_strong( + expect, + REF_RECYCLED, butil::memory_order_relaxed, butil::memory_order_relaxed)) { - return Dereference(); - } - return -1; + return Dereference(); + } + + if (expect == REF_REVIVING) { // sched_yield to wait until status is not 
REF_REVIVING + sched_yield(); + } else { + return -1; // REF_RECYCLED + } + } while (1); } void Socket::AddRecentError() { @@ -848,9 +860,19 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // by Channel to revive never-connected socket when server side // comes online. if (_health_check_interval_s > 0) { - GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); - StartHealthCheck(id(), + bool expect = false; + if (_hc_started.compare_exchange_strong(expect, + true, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); + StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); + } else { + // No need to run 2 health checking at the same time. + RPC_VLOG << "There is already a health checking running " + "for SocketId=" << _this_id; + } } // Wake up all threads waiting on EPOLLOUT when closing fd _epollout_butex->fetch_add(1, butil::memory_order_relaxed); @@ -2119,12 +2141,14 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { const SSLState ssl_state = ptr->ssl_state(); os << "\npipeline_q=" << npipelined << "\nhc_interval_s=" << ptr->_health_check_interval_s + << "\nis_hc_related_ref_held=" << ptr->_is_hc_related_ref_held << "\nninprocess=" << ptr->_ninprocess.load(butil::memory_order_relaxed) << "\nauth_flag_error=" << ptr->_auth_flag_error.load(butil::memory_order_relaxed) << "\nauth_id=" << ptr->_auth_id.value << "\nauth_context=" << ptr->_auth_context << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed) - << "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed) + << "\n_additional_ref_status=" + << ptr->_additional_ref_status.load(butil::memory_order_relaxed) << "\nninflight_app_health_check=" << ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed) << "\nagent_socket_id="; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index e1b3f9ef18..ffc84e31fc 100644 --- a/src/brpc/socket.h 
+++ b/src/brpc/socket.h @@ -294,6 +294,9 @@ friend class policy::H2GlobalStreamCreator; void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } + // After health checking is complete, set _hc_started to false. + void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); } + // The unique identifier. SocketId id() const { return _this_id; } @@ -363,8 +366,9 @@ friend class policy::H2GlobalStreamCreator; bool Failed() const; - bool DidReleaseAdditionalRereference() const - { return _recycle_flag.load(butil::memory_order_relaxed); } + bool DidReleaseAdditionalRereference() const { + return _additional_ref_status.load(butil::memory_order_relaxed) == REF_RECYCLED; + } // Notify `id' object (by calling bthread_id_error) when this Socket // has been `SetFailed'. If it already has, notify `id' immediately @@ -760,6 +764,10 @@ friend void DereferenceSocket(Socket*); // synchronized via _versioned_ref atomic variable. bool _is_hc_related_ref_held; + // Default: false. + // true, if health checking is started. + butil::atomic<bool> _hc_started; + // +-1 bit-+---31 bit---+ // | flag | counter | // +-------+------------+ @@ -797,9 +805,20 @@ friend void DereferenceSocket(Socket*); // Set by SetLogOff butil::atomic<bool> _logoff_flag; - // Flag used to mark whether additional reference has been decreased - // by either `SetFailed' or `SetRecycle' - butil::atomic<bool> _recycle_flag; + // Status of the additional reference. + enum AdditionalRefStatus { + REF_USING, // additional reference has been increased + REF_REVIVING, // additional reference is increasing + REF_RECYCLED // additional reference has been decreased + }; + + // Indicates whether additional reference has increased, + // decreased, or is increasing. 
+ // additional ref status: + // `Socket', `Create': REF_USING + // `SetFailed': REF_USING -> REF_RECYCLED + // `Revive': REF_RECYCLED -> REF_REVIVING -> REF_USING + butil::atomic<AdditionalRefStatus> _additional_ref_status; // Concrete error information from SetFailed() // Accesses to these 2 fields(especially _error_text) must be protected