From b5fab0becc382c744837f5fb732d1d500c36793d Mon Sep 17 00:00:00 2001 From: chenBright Date: Sun, 12 Oct 2025 16:15:38 +0800 Subject: [PATCH] Bugfix: TaskTracer deadlocks due to ABA problem --- src/bthread/task_meta.h | 4 ++ src/bthread/task_tracer.cpp | 92 +++++++++--------------- src/bthread/task_tracer.h | 8 +-- test/brpc_http_rpc_protocol_unittest.cpp | 4 +- 4 files changed, 41 insertions(+), 67 deletions(-) diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h index a4ed42bf97..1b77c0b601 100644 --- a/src/bthread/task_meta.h +++ b/src/bthread/task_meta.h @@ -112,6 +112,8 @@ struct TaskMeta { TaskStatus status{TASK_STATUS_UNKNOWN}; // Whether bthread is traced? bool traced{false}; + // [Not Reset] guarantee tracing completion before jumping. + pthread_mutex_t trace_lock{}; // Worker thread id. pthread_t worker_tid{}; @@ -122,9 +124,11 @@ struct TaskMeta { pthread_spin_init(&version_lock, 0); version_butex = butex_create_checked(); *version_butex = 1; + pthread_mutex_init(&trace_lock, NULL); } ~TaskMeta() { + pthread_mutex_destroy(&trace_lock); butex_destroy(version_butex); version_butex = NULL; pthread_spin_destroy(&version_lock); diff --git a/src/bthread/task_tracer.cpp b/src/bthread/task_tracer.cpp index 415670aa71..2602fe3113 100644 --- a/src/bthread/task_tracer.cpp +++ b/src/bthread/task_tracer.cpp @@ -119,6 +119,7 @@ void TaskTracer::Result::OutputToStream(std::ostream& os) const { bool TaskTracer::Init() { if (_trace_time.expose("bthread_trace_time") != 0) { + LOG(ERROR) << "Fail to expose bthread_trace_time"; return false; } if (!RegisterSignalHandler()) { @@ -136,7 +137,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) { CHECK_NE(TASK_STATUS_RUNNING, s) << "Use `set_running_status' instead"; CHECK_NE(TASK_STATUS_END, s) << "Use `set_end_status_unsafe' instead"; - bool tracing; + bool tracing = false; { BAIDU_SCOPED_LOCK(m->version_lock); if (TASK_STATUS_UNKNOWN == m->status && TASK_STATUS_JUMPING == s) { @@ -182,31 +183,8 @@ void 
TaskTracer::Trace(std::ostream& os, bthread_t tid) { } void TaskTracer::WaitForTracing(TaskMeta* m) { - BAIDU_SCOPED_LOCK(_mutex); - while (m->traced) { - _cond.Wait(); - } -} - -TaskStatus TaskTracer::WaitForJumping(TaskMeta* m) { - // Reasons for not using locks here: - // 1. It is necessary to lock before jump_stack, unlock after jump_stack, - // which involves two different bthread and is prone to errors. - // 2. jump_stack is fast. - int i = 0; - do { - // The bthread is jumping now, spin until it finishes. - if (i++ < 30) { - cpu_relax(); - } else { - sched_yield(); - } - - BAIDU_SCOPED_LOCK(m->version_lock); - if (TASK_STATUS_JUMPING != m->status) { - return m->status; - } - } while (true); + BAIDU_SCOPED_LOCK(m->trace_lock); + // Acquiring trace_lock means tracing is done. } TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) { @@ -224,25 +202,44 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) { // Make sure only one bthread is traced at a time. BAIDU_SCOPED_LOCK(_trace_request_mutex); + // The chance to remove unused SignalSyncs. + auto iter = std::remove_if( + _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(), + [](butil::intrusive_ptr<SignalSync>& sync) { + return sync->ref_count() == 1; + }); + _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end()); + TaskMeta* m = TaskGroup::address_meta(tid); if (NULL == m) { return Result::MakeErrorResult("bthread=%d never existed", tid); } - BAIDU_SCOPED_LOCK(_mutex); + BAIDU_SCOPED_LOCK(m->trace_lock); TaskStatus status; pthread_t worker_tid; const uint32_t given_version = get_version(tid); { BAIDU_SCOPED_LOCK(m->version_lock); - if (given_version == *m->version_butex) { - // Start tracing. 
- m->traced = true; - worker_tid = m->worker_tid; - status = m->status; - } else { + if (given_version != *m->version_butex) { return Result::MakeErrorResult("bthread=%d not exist now", tid); } + + status = m->status; + if (TASK_STATUS_UNKNOWN == status) { + return Result::MakeErrorResult("bthread=%d not exist now", tid); + } else if (TASK_STATUS_CREATED == status) { + return Result::MakeErrorResult("bthread=%d has just been created", tid); + } else if (TASK_STATUS_FIRST_READY == status) { + return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid); + } else if (TASK_STATUS_END == status) { + return Result::MakeErrorResult("bthread=%d has ended", tid); + } else if (TASK_STATUS_JUMPING == status) { + return Result::MakeErrorResult("bthread=%d is jumping stack", tid); + } + // Start tracing. + m->traced = true; + worker_tid = m->worker_tid; } BRPC_SCOPE_EXIT { @@ -252,31 +249,16 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) { // tracing completion, so given_version != *m->version_butex is OK. m->traced = false; } - // Wake up the waiting worker thread to jump. - _cond.Signal(); }; - if (TASK_STATUS_UNKNOWN == status) { - return Result::MakeErrorResult("bthread=%d not exist now", tid); - } else if (TASK_STATUS_CREATED == status) { - return Result::MakeErrorResult("bthread=%d has just been created", tid); - } else if (TASK_STATUS_FIRST_READY == status) { - return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid); - } else if (TASK_STATUS_END == status) { - return Result::MakeErrorResult("bthread=%d has ended", tid); - } else if (TASK_STATUS_JUMPING == status) { - // Wait for jumping completion. - status = WaitForJumping(m); - } - - // After jumping, the status may be RUNNING, SUSPENDED, or READY, which is traceable. + // The status may be RUNNING, SUSPENDED, or READY, which is traceable. 
if (TASK_STATUS_RUNNING == status) { return SignalTrace(worker_tid); } else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) { return ContextTrace(m->stack->context); } - return Result::MakeErrorResult("Invalid TaskStatus=%d", status); + return Result::MakeErrorResult("Invalid TaskStatus=%d of bthread=%d", status, tid); } // Instruct ASan to ignore this function. @@ -408,14 +390,6 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) { return Result::MakeErrorResult("Forbid to trace self"); } - // Remove unused SignalSyncs. - auto iter = std::remove_if( - _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(), - [](butil::intrusive_ptr<SignalSync>& sync) { - return sync->ref_count() == 1; - }); - _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end()); - // Each signal trace has an independent SignalSync to // prevent the previous SignalHandler from affecting the new SignalTrace. butil::intrusive_ptr<SignalSync> signal_sync(new SignalSync()); @@ -465,6 +439,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) { } break; } + // Remove the successful SignalSync. + _inuse_signal_syncs.pop_back(); return signal_sync->result; } diff --git a/src/bthread/task_tracer.h b/src/bthread/task_tracer.h index 0888c658f0..be95f3ac80 100644 --- a/src/bthread/task_tracer.h +++ b/src/bthread/task_tracer.h @@ -47,7 +47,7 @@ class TaskTracer { void Trace(std::ostream& os, bthread_t tid); // When the worker is jumping stack from a bthread to another, - void WaitForTracing(TaskMeta* m); + static void WaitForTracing(TaskMeta* m); private: // Error number guard used in signal handler. @@ -94,7 +94,6 @@ class TaskTracer { Result result; }; - static TaskStatus WaitForJumping(TaskMeta* m); Result TraceImpl(bthread_t tid); unw_cursor_t MakeCursor(bthread_fcontext_t fcontext); @@ -108,11 +107,6 @@ class TaskTracer { // Make sure only one bthread is traced at a time. Mutex _trace_request_mutex; - // For signal trace. 
- // Make sure bthread does not jump stack when it is being traced. - butil::Mutex _mutex; - butil::ConditionVariable _cond{&_mutex}; - // For context trace. unw_context_t _context{}; diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index 5a6839c86f..f13c6877f7 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -1701,9 +1701,9 @@ TEST_F(HttpTest, spring_protobuf_content_type) { res.Clear(); cntl2.http_request().set_content_type("application/x-protobuf"); stub.Echo(&cntl2, &req, &res, nullptr); - ASSERT_FALSE(cntl.Failed()); + ASSERT_FALSE(cntl2.Failed()); ASSERT_EQ(EXP_RESPONSE, res.message()); - ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type()); + ASSERT_EQ("application/x-protobuf", cntl2.http_response().content_type()); } TEST_F(HttpTest, dump_http_request) {