From 1ad900143c951e0ebc77c34b8112c79cc96054b8 Mon Sep 17 00:00:00 2001 From: Venkata Kaushik Chaganti Date: Tue, 2 Jun 2026 10:57:50 -0700 Subject: [PATCH 1/2] brpc 1.4.0 agent combiner thread safety patch --- ...4.0-fix-agent-combiner-thread-safety.patch | 434 ++++++++++++++++++ 1 file changed, 434 insertions(+) create mode 100644 thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch diff --git a/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch new file mode 100644 index 00000000000000..bac44588e8f429 --- /dev/null +++ b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch @@ -0,0 +1,434 @@ +diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h +index 6a6ab80..65ed9a1 100644 +--- a/src/bvar/detail/combiner.h ++++ b/src/bvar/detail/combiner.h +@@ -22,6 +22,7 @@ + + #include // std::string + #include // std::vector ++#include + #include "butil/atomicops.h" // butil::atomic + #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK + #include "butil/type_traits.h" // butil::add_cr_non_integral +@@ -42,7 +43,6 @@ public: + typedef typename Combiner::Agent agent_type; + + GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {} +- ~GlobalValue() {} + + // Call this method to unlock tls element and lock the combiner. + // Unlocking tls element avoids potential deadlock with +@@ -113,7 +113,7 @@ class ElementContainer< + T, typename butil::enable_if::value>::type> { + public: + // We don't need any memory fencing here, every op is relaxed. +- ++ + inline void load(T* out) { + *out = _value.load(butil::memory_order_relaxed); + } +@@ -153,24 +153,26 @@ private: + }; + + template +-class AgentCombiner { ++class AgentCombiner ++ : public std::enable_shared_from_this> { ++ + public: + typedef ResultTp result_type; + typedef ElementTp element_type; + typedef AgentCombiner self_type; ++ typedef std::shared_ptr self_shared_type; ++ typedef std::weak_ptr self_weak_type; + friend class GlobalValue; +- +- struct Agent : public butil::LinkNode { +- Agent() : combiner(NULL) {} + ++ struct Agent : public butil::LinkNode { + ~Agent() { +- if (combiner) { +- combiner->commit_and_erase(this); +- combiner = NULL; ++ self_shared_type c = combiner.lock(); ++ if (NULL != c) { ++ c->commit_and_erase(this); + } + } +- +- void reset(const ElementTp& val, self_type* c) { ++ ++ void reset(const ElementTp& val, self_shared_type c) { + combiner = c; + element.store(val); + } +@@ -181,11 +183,11 @@ friend class GlobalValue; + // void operator()(GlobalValue & global_value, + // ElementTp & local_value) const { + // if (test_for_merging(local_value)) { +- // ++ // + // // Unlock tls element and lock combiner. Obviously + // // tls element can be changed during lock(). + // ResultTp* g = global_value.lock(); +- // ++ // + // // *g and local_value are not changed provided + // // merge_global is called from the thread owning + // // the agent. +@@ -200,16 +202,23 @@ friend class GlobalValue; + // ... + // } + // }; +- // ++ // + // NOTE: Only available to non-atomic types. + template +- void merge_global(const Op &op) { +- GlobalValue g(this, combiner); +- element.merge_global(op, g); ++ void merge_global(const Op &op, self_shared_type c = NULL) { ++ if (NULL == c) { ++ c = combiner.lock(); ++ } ++ if (NULL != c) { ++ GlobalValue g(this, c.get()); ++ element.merge_global(op, g); ++ } + } + +- self_type *combiner; + ElementContainer element; ++ private: ++ friend class AgentCombiner; ++ self_weak_type combiner; + }; + + typedef detail::AgentGroup AgentGroup; +@@ -231,7 +240,7 @@ friend class GlobalValue; + _id = -1; + } + } +- ++ + // [Threadsafe] May be called from anywhere + ResultTp combine_agents() const { + ElementTp tls_value; +@@ -245,10 +254,10 @@ friend class GlobalValue; + return ret; + } + +- typename butil::add_cr_non_integral::type element_identity() const +- { return _element_identity; } +- typename butil::add_cr_non_integral::type result_identity() const +- { return _result_identity; } ++ typename butil::add_cr_non_integral::type ++ element_identity() const { return _element_identity; } ++ typename butil::add_cr_non_integral::type ++ result_identity() const { return _result_identity; } + + // [Threadsafe] May be called from anywhere. + ResultTp reset_all_agents() { +@@ -265,7 +274,7 @@ friend class GlobalValue; + } + + // Always called from the thread owning the agent. +- void commit_and_erase(Agent *agent) { ++ void commit_and_erase(Agent* agent) { + if (NULL == agent) { + return; + } +@@ -279,7 +288,7 @@ friend class GlobalValue; + } + + // Always called from the thread owning the agent +- void commit_and_clear(Agent *agent) { ++ void commit_and_clear(Agent* agent) { + if (NULL == agent) { + return; + } +@@ -290,7 +299,7 @@ friend class GlobalValue; + } + + // We need this function to be as fast as possible. +- inline Agent* get_or_create_tls_agent() { ++ Agent* get_or_create_tls_agent() { + Agent* agent = AgentGroup::get_tls_agent(_id); + if (!agent) { + // Create the agent +@@ -300,10 +309,10 @@ friend class GlobalValue; + return NULL; + } + } +- if (agent->combiner) { ++ if (!agent->combiner.expired()) { + return agent; + } +- agent->reset(_element_identity, this); ++ agent->reset(_element_identity, this->shared_from_this()); + // TODO: Is uniqueness-checking necessary here? + { + butil::AutoLock guard(_lock); +@@ -317,8 +326,7 @@ friend class GlobalValue; + // reseting agents is must because the agent object may be reused. + // Set element to be default-constructed so that if it's non-pod, + // internal allocations should be released. +- for (butil::LinkNode* +- node = _agents.head(); node != _agents.end();) { ++ for (butil::LinkNode* node = _agents.head(); node != _agents.end();) { + node->value()->reset(ElementTp(), NULL); + butil::LinkNode* const saved_next = node->next(); + node->RemoveFromList(); +diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp +index e0412cb..37181cc 100644 +--- a/src/bvar/detail/percentile.cpp ++++ b/src/bvar/detail/percentile.cpp +@@ -85,9 +85,8 @@ private: + int64_t _latency; + }; + +-Percentile::Percentile() : _combiner(NULL), _sampler(NULL) { +- _combiner = new combiner_type; +-} ++Percentile::Percentile() ++ : _combiner(std::make_shared()), _sampler(NULL) {} + + Percentile::~Percentile() { + // Have to destroy sampler first to avoid the race between destruction and +@@ -96,7 +95,6 @@ Percentile::~Percentile() { + _sampler->destroy(); + _sampler = NULL; + } +- delete _combiner; + } + + Percentile::value_type Percentile::reset() { +@@ -126,7 +124,7 @@ Percentile &Percentile::operator<<(int64_t latency) { + } + return *this; + } +- agent->merge_global(AddLatency(latency)); ++ agent->merge_global(AddLatency(latency), _combiner); + return *this; + } + +diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h +index f04268d..3680178 100644 +--- a/src/bvar/detail/percentile.h ++++ b/src/bvar/detail/percentile.h +@@ -462,6 +462,7 @@ public: + typedef AgentCombiner combiner_type; ++ typedef typename combiner_type::self_shared_type shared_combiner_type; + typedef combiner_type::Agent agent_type; + Percentile(); + ~Percentile(); +@@ -484,7 +485,7 @@ public: + + Percentile& operator<<(int64_t latency); + +- bool valid() const { return _combiner != NULL && _combiner->valid(); } ++ bool valid() const { return _combiner && _combiner->valid(); } + + // This name is useful for warning negative latencies in operator<< + void set_debug_name(const butil::StringPiece& name) { +@@ -494,8 +495,8 @@ public: + private: + DISALLOW_COPY_AND_ASSIGN(Percentile); + +- combiner_type* _combiner; +- sampler_type* _sampler; ++ shared_combiner_type _combiner; ++ sampler_type* _sampler; + std::string _debug_name; + }; + +diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h +index 9b73a19..c2c18bd 100644 +--- a/src/bvar/recorder.h ++++ b/src/bvar/recorder.h +@@ -113,9 +113,10 @@ public: + }; + + typedef detail::AgentCombiner combiner_type; ++ typedef typename combiner_type::self_shared_type shared_combiner_type; + typedef combiner_type::Agent agent_type; + +- IntRecorder() : _sampler(NULL) {} ++ IntRecorder() : _combiner(std::make_shared()), _sampler(NULL) {} + + explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) { + expose(name); +@@ -126,7 +127,7 @@ public: + expose_as(prefix, name); + } + +- ~IntRecorder() { ++ ~IntRecorder() override { + hide(); + if (_sampler) { + _sampler->destroy(); +@@ -138,19 +139,19 @@ public: + IntRecorder& operator<<(int64_t/*note*/ sample); + + int64_t average() const { +- return _combiner.combine_agents().get_average_int(); ++ return _combiner->combine_agents().get_average_int(); + } + + double average(double) const { +- return _combiner.combine_agents().get_average_double(); ++ return _combiner->combine_agents().get_average_double(); + } + + Stat get_value() const { +- return _combiner.combine_agents(); ++ return _combiner->combine_agents(); + } + + Stat reset() { +- return _combiner.reset_all_agents(); ++ return _combiner->reset_all_agents(); + } + + AddStat op() const { return AddStat(); } +@@ -160,7 +161,7 @@ public: + os << get_value(); + } + +- bool valid() const { return _combiner.valid(); } ++ bool valid() const { return _combiner->valid(); } + + sampler_type* get_sampler() { + if (NULL == _sampler) { +@@ -230,7 +231,7 @@ private: + } + + private: +- combiner_type _combiner; ++ shared_combiner_type _combiner; + sampler_type* _sampler; + std::string _debug_name; + }; +@@ -258,7 +259,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) { + << (void*)this << ") " << reason; + } + } +- agent_type* agent = _combiner.get_or_create_tls_agent(); ++ agent_type* agent = _combiner->get_or_create_tls_agent(); + if (BAIDU_UNLIKELY(!agent)) { + LOG(FATAL) << "Fail to create agent"; + return *this; +@@ -276,7 +277,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) { + // Although agent->element might have been cleared at this + // point, it is just OK because the very value is 0 in + // this case +- agent->combiner->commit_and_clear(agent); ++ _combiner->commit_and_clear(agent); + sum = 0; + num = 0; + n = 0; +diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h +index fbd4fa7..ccf7805 100644 +--- a/src/bvar/reducer.h ++++ b/src/bvar/reducer.h +@@ -69,13 +69,13 @@ template + class Reducer : public Variable { + public: + typedef typename detail::AgentCombiner combiner_type; ++ typedef typename combiner_type::self_shared_type shared_combiner_type; + typedef typename combiner_type::Agent agent_type; + typedef detail::ReducerSampler sampler_type; + class SeriesSampler : public detail::Sampler { + public: + SeriesSampler(Reducer* owner, const Op& op) + : _owner(owner), _series(op) {} +- ~SeriesSampler() {} + void take_sample() override { _series.append(_owner->get_value()); } + void describe(std::ostream& os) { _series.describe(os, NULL); } + private: +@@ -85,16 +85,12 @@ public: + + public: + // The `identify' must satisfy: identity Op a == a +- Reducer(typename butil::add_cr_non_integral::type identity = T(), +- const Op& op = Op(), +- const InvOp& inv_op = InvOp()) +- : _combiner(identity, identity, op) +- , _sampler(NULL) +- , _series_sampler(NULL) +- , _inv_op(inv_op) { +- } ++ explicit Reducer(typename butil::add_cr_non_integral::type identity = T(), ++ const Op& op = Op(), const InvOp& inv_op = InvOp()) ++ : _combiner(std::make_shared(identity, identity, op)) ++ , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {} + +- ~Reducer() { ++ ~Reducer() override { + // Calling hide() manually is a MUST required by Variable. + hide(); + if (_sampler) { +@@ -119,13 +115,13 @@ public: + << "You should not call Reducer<" << butil::class_name_str() + << ", " << butil::class_name_str() << ">::get_value() when a" + << " Window<> is used because the operator does not have inverse."; +- return _combiner.combine_agents(); ++ return _combiner->combine_agents(); + } + + + // Reset the reduced value to T(). + // Returns the reduced value before reset. +- T reset() { return _combiner.reset_all_agents(); } ++ T reset() { return _combiner->reset_all_agents(); } + + void describe(std::ostream& os, bool quote_string) const override { + if (butil::is_same::value && quote_string) { +@@ -140,10 +136,10 @@ public: + #endif + + // True if this reducer is constructed successfully. +- bool valid() const { return _combiner.valid(); } ++ bool valid() const { return _combiner->valid(); } + + // Get instance of Op. +- const Op& op() const { return _combiner.op(); } ++ const Op& op() const { return _combiner->op(); } + const InvOp& inv_op() const { return _inv_op; } + + sampler_type* get_sampler() { +@@ -174,14 +170,14 @@ protected: + !butil::is_same::value && + !butil::is_same::value && + FLAGS_save_series) { +- _series_sampler = new SeriesSampler(this, _combiner.op()); ++ _series_sampler = new SeriesSampler(this, _combiner->op()); + _series_sampler->schedule(); + } + return rc; + } + + private: +- combiner_type _combiner; ++ shared_combiner_type _combiner; + sampler_type* _sampler; + SeriesSampler* _series_sampler; + InvOp _inv_op; +@@ -191,12 +187,12 @@ template + inline Reducer& Reducer::operator<<( + typename butil::add_cr_non_integral::type value) { + // It's wait-free for most time +- agent_type* agent = _combiner.get_or_create_tls_agent(); ++ agent_type* agent = _combiner->get_or_create_tls_agent(); + if (__builtin_expect(!agent, 0)) { + LOG(FATAL) << "Fail to create agent"; + return *this; + } +- agent->element.modify(_combiner.op(), value); ++ agent->element.modify(_combiner->op(), value); + return *this; + } + From 2fcd368c5cb61249dfb7bafdd32a1d5cab57d422 Mon Sep 17 00:00:00 2001 From: Venkata Kaushik Chaganti Date: Thu, 25 Jun 2026 17:32:01 -0700 Subject: [PATCH 2/2] backporting apache/brpc#3066 --- ...4.0-fix-agent-combiner-thread-safety.patch | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch index bac44588e8f429..b7d0a9a93dfc79 100644 --- a/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch +++ b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch @@ -59,7 +59,7 @@ index 6a6ab80..65ed9a1 100644 - - void reset(const ElementTp& val, self_type* c) { + -+ void reset(const ElementTp& val, self_shared_type c) { ++ void reset(const ElementTp& val, const self_shared_type& c) { combiner = c; element.store(val); } @@ -249,22 +249,27 @@ diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h index 9b73a19..c2c18bd 100644 --- a/src/bvar/recorder.h +++ b/src/bvar/recorder.h -@@ -113,9 +113,10 @@ public: +@@ -113,20 +113,21 @@ public: }; - + typedef detail::AgentCombiner combiner_type; + typedef typename combiner_type::self_shared_type shared_combiner_type; typedef combiner_type::Agent agent_type; - + - IntRecorder() : _sampler(NULL) {} + IntRecorder() : _combiner(std::make_shared()), _sampler(NULL) {} - - explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) { + +- explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) { ++ explicit IntRecorder(const butil::StringPiece& name) : IntRecorder() { expose(name); -@@ -126,7 +127,7 @@ public: + } + + IntRecorder(const butil::StringPiece& prefix, const butil::StringPiece& name) +- : _sampler(NULL) { ++ : IntRecorder() { expose_as(prefix, name); } - + - ~IntRecorder() { + ~IntRecorder() override { hide();