diff --git a/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch new file mode 100644 index 00000000000000..79527fe61e0c64 --- /dev/null +++ b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch @@ -0,0 +1,303 @@ +From 9bd2d3112a30d2059b7f2713c626b6c412b6e2fc Mon Sep 17 00:00:00 2001 +From: koarz <3577087577@qq.com> +Date: Thu, 13 Nov 2025 16:34:53 +0800 +Subject: [PATCH] Refactor Socket::SetFailed to handle error_code=0 properly + +--- + src/brpc/socket.cpp | 117 ++++++++++++++++++------------------ + src/brpc/socket.h | 17 ++++++ + src/butil/string_printf.cpp | 49 ++++++++++----- + src/butil/string_printf.h | 4 ++ + 4 files changed, 112 insertions(+), 75 deletions(-) + +diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp +index ae3bb76..da42442 100644 +--- a/src/brpc/socket.cpp ++++ b/src/brpc/socket.cpp +@@ -862,82 +862,81 @@ int Socket::isolated_times() const { + } + + int Socket::SetFailed(int error_code, const char* error_fmt, ...) { +- if (error_code == 0) { +- CHECK(false) << "error_code is 0"; +- error_code = EFAILEDSOCKET; +- } +- const uint32_t id_ver = VersionOfSocketId(_this_id); ++ std::string error_text; ++ if (error_fmt != NULL) { ++ va_list ap; ++ va_start(ap, error_fmt); ++ butil::string_vprintf(&error_text, error_fmt, ap); ++ va_end(ap); ++ } ++ const uint32_t id_ver = VersionOfVRefId(_this_id); + uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); +- for (;;) { // need iteration to retry compare_exchange_strong ++ for (;;) { + if (VersionOfVRef(vref) != id_ver) { + return -1; + } +- // Try to set version=id_ver+1 (to make later Address() return NULL), ++ // Try to set version=id_ver+1 (to make later address() return NULL), + // retry on fail. + if (_versioned_ref.compare_exchange_strong( + vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), + butil::memory_order_release, + butil::memory_order_relaxed)) { +- // Update _error_text +- std::string error_text; +- if (error_fmt != NULL) { +- va_list ap; +- va_start(ap, error_fmt); +- butil::string_vprintf(&error_text, error_fmt, ap); +- va_end(ap); +- } +- pthread_mutex_lock(&_id_wait_list_mutex); +- _error_code = error_code; +- _error_text = error_text; +- pthread_mutex_unlock(&_id_wait_list_mutex); +- +- // Do health-checking even if we're not connected before, needed +- // by Channel to revive never-connected socket when server side +- // comes online. +- if (_health_check_interval_s > 0) { +- bool expect = false; +- if (_hc_started.compare_exchange_strong(expect, +- true, +- butil::memory_order_relaxed, +- butil::memory_order_relaxed)) { +- GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); +- StartHealthCheck(id(), +- GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); +- } else { +- // No need to run 2 health checking at the same time. +- RPC_VLOG << "There is already a health checking running " +- "for SocketId=" << _this_id; +- } +- } +- // Wake up all threads waiting on EPOLLOUT when closing fd +- _epollout_butex->fetch_add(1, butil::memory_order_relaxed); +- bthread::butex_wake_all(_epollout_butex); +- +- // Wake up all unresponded RPC. +- CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( +- &_id_wait_list, error_code, error_text, +- &_id_wait_list_mutex)); +- +- ResetAllStreams(); +- // _app_connect shouldn't be set to NULL in SetFailed otherwise +- // HC is always not supported. +- // FIXME: Design a better interface for AppConnect +- // if (_app_connect) { +- // AppConnect* const saved_app_connect = _app_connect; +- // _app_connect = NULL; +- // saved_app_connect->StopConnect(this); +- // } +- ++ // Call OnFailed to handle the failure of Socket. ++ OnFailed(error_code, error_text); + // Deref additionally which is added at creation so that this +- // Socket's reference will hit 0(recycle) when no one addresses it. ++ // queue's reference will hit 0(recycle) when no one addresses it. + ReleaseAdditionalReference(); +- // NOTE: This Socket may be recycled at this point, don't ++ // NOTE: This object may be recycled at this point, don't + // touch anything. + return 0; + } + } + } + ++void Socket::OnFailed(int error_code, const std::string& error_text) { ++ // Update _error_text ++ pthread_mutex_lock(&_id_wait_list_mutex); ++ _error_code = error_code; ++ _error_text = error_text; ++ pthread_mutex_unlock(&_id_wait_list_mutex); ++ ++ // Do health-checking even if we're not connected before, needed ++ // by Channel to revive never-connected socket when server side ++ // comes online. ++ if (HCEnabled()) { ++ bool expect = false; ++ if (_hc_started.compare_exchange_strong(expect, ++ true, ++ butil::memory_order_relaxed, ++ butil::memory_order_relaxed)) { ++ GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); ++ StartHealthCheck(id(), ++ GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); ++ } else { ++ // No need to run 2 health checking at the same time. ++ RPC_VLOG << "There is already a health checking running " ++ "for SocketId=" << id(); ++ } ++ } ++ // Wake up all threads waiting on EPOLLOUT when closing fd ++ _epollout_butex->fetch_add(1, butil::memory_order_relaxed); ++ bthread::butex_wake_all(_epollout_butex); ++ ++ // Wake up all unresponded RPC. ++ CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( ++ &_id_wait_list, error_code, error_text, ++ &_id_wait_list_mutex)); ++ ResetAllStreams(); ++ // _app_connect shouldn't be set to NULL in SetFailed otherwise ++ // HC is always not supported. ++ // FIXME: Design a better interface for AppConnect ++ // if (_app_connect) { ++ // AppConnect* const saved_app_connect = _app_connect; ++ // _app_connect = NULL; ++ // saved_app_connect->StopConnect(this); ++ // } ++} ++ + int Socket::SetFailed() { + return SetFailed(EFAILEDSOCKET, NULL); + } +diff --git a/src/brpc/socket.h b/src/brpc/socket.h +index 58ccd2f..c535669 100644 +--- a/src/brpc/socket.h ++++ b/src/brpc/socket.h +@@ -202,6 +202,12 @@ struct SocketOptions { + Destroyable* initial_parsing_context; + }; + ++typedef uint64_t VRefId; ++ ++BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) { ++ return (uint32_t)(vref_id >> 32); ++} ++ + // Abstractions on reading from and writing into file descriptors. + // NOTE: accessed by multiple threads(frequently), align it by cacheline. + class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket { +@@ -305,6 +311,14 @@ public: + // Initialized by SocketOptions.health_check_interval_s. + int health_check_interval() const { return _health_check_interval_s; } + ++ // True if health checking is enabled. ++ bool HCEnabled() const { ++ // This fence makes sure that we see change of ++ // `_is_hc_related_ref_held' before changing `_versioned_ref. ++ butil::atomic_thread_fence(butil::memory_order_acquire); ++ return _health_check_interval_s > 0 && _is_hc_related_ref_held; ++ } ++ + // When someone holds a health-checking-related reference, + // this function need to be called to make health checking run normally. + void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } +@@ -559,6 +573,9 @@ public: + + bthread_keytable_pool_t* keytable_pool() const { return _keytable_pool; } + ++ // Called by SetFailed to handle failure logic ++ void OnFailed(int error_code, const std::string& error_text); ++ + private: + DISALLOW_COPY_AND_ASSIGN(Socket); + +diff --git a/src/butil/string_printf.cpp b/src/butil/string_printf.cpp +index 981420e..cebaf13 100644 +--- a/src/butil/string_printf.cpp ++++ b/src/butil/string_printf.cpp +@@ -67,24 +67,49 @@ inline int string_printf_impl(std::string& output, const char* format, + } + return 0; + } ++ ++inline int string_printf_impl(std::string& output, size_t hint, ++ const char* format, va_list args) { ++ if (hint > output.capacity()) { ++ output.reserve(hint); ++ } ++ return string_printf_impl(output, format, args); ++} + } // end anonymous namespace + + std::string string_printf(const char* format, ...) { + // snprintf will tell us how large the output buffer should be, but +- // we then have to call it a second time, which is costly. By ++ // we then have to call it a second time, which is costly. By + // guestimating the final size, we avoid the double snprintf in many +- // cases, resulting in a performance win. We use this constructor ++ // cases, resulting in a performance win. We use this constructor + // of std::string to avoid a double allocation, though it does pad +- // the resulting string with nul bytes. Our guestimation is twice ++ // the resulting string with nul bytes. Our guestimation is twice + // the format string size, or 32 bytes, whichever is larger. This +- // is a hueristic that doesn't affect correctness but attempts to be ++ // is a heuristic that doesn't affect correctness but attempts to be + // reasonably fast for the most common cases. + std::string ret; +- ret.reserve(std::max(32UL, strlen(format) * 2)); ++ va_list ap; ++ va_start(ap, format); ++ if (string_printf_impl(ret, std::max(32UL, strlen(format) * 2), ++ format, ap) != 0) { ++ ret.clear(); ++ } ++ va_end(ap); ++ return ret; ++} + ++std::string string_printf(size_t hint_size, const char* format, ...) { ++ // snprintf will tell us how large the output buffer should be, but ++ // we then have to call it a second time, which is costly. By ++ // passing the hint size of formatted string, we avoid the double ++ // snprintf in many cases, resulting in a performance win. We use ++ // this constructor of std::string to avoid a double allocation, ++ // though it does pad the resulting string with nul bytes. ++ std::string ret; + va_list ap; + va_start(ap, format); +- if (string_printf_impl(ret, format, ap) != 0) { ++ if (string_printf_impl(ret, std::max(hint_size, strlen(format) * 2), ++ format, ap) != 0) { + ret.clear(); + } + va_end(ap); +@@ -96,11 +121,7 @@ std::string string_printf(const char* format, ...) { + int string_appendf(std::string* output, const char* format, ...) { + va_list ap; + va_start(ap, format); +- const size_t old_size = output->size(); +- const int rc = string_printf_impl(*output, format, ap); +- if (rc != 0) { +- output->resize(old_size); +- } ++ const int rc = string_vappendf(output, format, ap); + va_end(ap); + return rc; + } +@@ -117,11 +138,7 @@ int string_vappendf(std::string* output, const char* format, va_list args) { + int string_printf(std::string* output, const char* format, ...) { + va_list ap; + va_start(ap, format); +- output->clear(); +- const int rc = string_printf_impl(*output, format, ap); +- if (rc != 0) { +- output->clear(); +- } ++ const int rc = string_vprintf(output, format, ap); + va_end(ap); + return rc; + }; +diff --git a/src/butil/string_printf.h b/src/butil/string_printf.h +index 64eae84..7f9e3dd 100644 +--- a/src/butil/string_printf.h ++++ b/src/butil/string_printf.h +@@ -27,6 +27,10 @@ namespace butil { + std::string string_printf(const char* format, ...) + __attribute__ ((format (printf, 1, 2))); + ++// Hint size of formatted string. ++std::string string_printf(size_t hint_size, const char* format, ...) ++ __attribute__ ((format (printf, 2, 3))); ++ + // Write |format| and associated arguments into |output| + // Returns 0 on success, -1 otherwise. + int string_printf(std::string* output, const char* fmt, ...) +-- +2.43.5 +