From 15940100fd697241ff40e1a40d15787464551660 Mon Sep 17 00:00:00 2001 From: Zhengwei Zhu Date: Fri, 5 Sep 2025 16:57:02 +0800 Subject: [PATCH] Support naming bthread to help debug The bthread name is shown when checking bthread status by curl ip:port/bthreads/xxx, which helps to debug when bthread trace is not enabled. --- src/brpc/acceptor.cpp | 1 + src/brpc/controller.cpp | 1 + src/brpc/event_dispatcher_epoll.cpp | 1 + src/brpc/event_dispatcher_kqueue.cpp | 1 + src/brpc/global.cpp | 4 +++- src/brpc/input_messenger.cpp | 17 +++++++++-------- src/brpc/periodic_task.cpp | 4 +++- src/brpc/rdma/rdma_endpoint.cpp | 9 +++++++-- src/brpc/server.cpp | 1 + src/brpc/socket.cpp | 13 ++++++++++--- src/brpc/socket_map.cpp | 4 +++- src/bthread/bthread.cpp | 7 +++++++ src/bthread/fd.cpp | 4 +++- src/bthread/task_group.cpp | 3 ++- src/bthread/types.h | 14 +++++++++----- 15 files changed, 61 insertions(+), 23 deletions(-) diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index 616c1a3044..fd6564c987 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -78,6 +78,7 @@ int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec, if (idle_timeout_sec > 0) { bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; tmp.tag = _bthread_tag; + bthread_attr_set_name(&tmp, "CloseIdleConnections"); if (bthread_start_background(&_close_idle_tid, &tmp, CloseIdleConnections, this) != 0) { LOG(FATAL) << "Fail to start bthread"; return -1; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d4dbab951b..b30a13476e 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -743,6 +743,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, bthread_t bt; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + bthread_attr_set_name(&attr, "RunEndRPC"); _tmp_completion_info = info; if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) { LOG(FATAL) << "Fail to start bthread"; diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 5a4a6370f7..5a6c23b0e5 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -78,6 +78,7 @@ int EventDispatcher::Start(const bthread_attr_t* thread_attr) { // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY. bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY; + bthread_attr_set_name(&epoll_thread_attr, "EventDispatcher::RunThis"); // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed diff --git a/src/brpc/event_dispatcher_kqueue.cpp b/src/brpc/event_dispatcher_kqueue.cpp index 48b2814798..f73e62004f 100644 --- a/src/brpc/event_dispatcher_kqueue.cpp +++ b/src/brpc/event_dispatcher_kqueue.cpp @@ -78,6 +78,7 @@ int EventDispatcher::Start(const bthread_attr_t* thread_attr) { // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY. bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY; + bthread_attr_set_name(&kqueue_thread_attr, "EventDispatcher::RunThis"); // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 0196b6d008..af6b425289 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -635,7 +635,9 @@ static void GlobalInitializeOrDieImpl() { // We never join GlobalUpdate, let it quit with the process. bthread_t th; - CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0) + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "GlobalUpdate"); + CHECK(bthread_start_background(&th, &attr, GlobalUpdate, NULL) == 0) << "Fail to start GlobalUpdate"; } diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 45ececbc66..1b8a86f2c6 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -197,6 +197,13 @@ static void QueueMessage(InputMessageBase* to_run_msg, if (!to_run_msg) { return; } + +#if BRPC_WITH_RDMA + if (rdma::FLAGS_rdma_disable_bthread) { + ProcessInputMessage(to_run_msg); + return; + } +#endif // Create bthread for last_msg. The bthread is not scheduled // until bthread_flush() is called (in the worse case). @@ -207,14 +214,8 @@ static void QueueMessage(InputMessageBase* to_run_msg, BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; tmp.keytable_pool = keytable_pool; tmp.tag = bthread_self_tag(); - -#if BRPC_WITH_RDMA - if (rdma::FLAGS_rdma_disable_bthread) { - ProcessInputMessage(to_run_msg); - return; - } -#endif - + bthread_attr_set_name(&tmp, "ProcessInputMessage"); + if (!FLAGS_usercode_in_coroutine && bthread_start_background( &th, &tmp, ProcessInputMessage, to_run_msg) == 0) { ++*num_bthread_created; diff --git a/src/brpc/periodic_task.cpp b/src/brpc/periodic_task.cpp index 27ea3ec310..3ba9c0ec59 100644 --- a/src/brpc/periodic_task.cpp +++ b/src/brpc/periodic_task.cpp @@ -38,8 +38,10 @@ static void* PeriodicTaskThread(void* arg) { static void RunPeriodicTaskThread(void* arg) { bthread_t th = 0; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "PeriodicTaskThread"); int rc = bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg); + &th, &attr, PeriodicTaskThread, arg); if (rc != 0) { LOG(ERROR) << "Fail to start PeriodicTaskThread"; static_cast(arg)->OnDestroyingTask(); diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 1d502a98f7..5176756510 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -250,7 +250,9 @@ void RdmaConnect::StartConnect(const Socket* socket, _done = done; _data = data; bthread_t tid; - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtClient"); + if (bthread_start_background(&tid, &attr, RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) { LOG(FATAL) << "Fail to start handshake bthread"; } else { @@ -309,7 +311,9 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) { ep->_state = S_HELLO_WAIT; SocketUniquePtr s; m->ReAddress(&s); - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtServer"); + if (bthread_start_background(&tid, &attr, ProcessHandshakeAtServer, ep) < 0) { ep->_state = UNINIT; LOG(FATAL) << "Fail to start handshake bthread"; @@ -1616,6 +1620,7 @@ int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag, auto attr = FLAGS_rdma_disable_bthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; attr.tag = tag; + bthread_attr_set_name(&attr, "RdmaPolling"); pollers[i].callback = callback; pollers[i].init_fn = init_fn; pollers[i].release_fn = release_fn; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index cd83053a42..a82817be8b 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -1236,6 +1236,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, CHECK_EQ(INVALID_BTHREAD, _derivative_thread); bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; tmp.tag = _options.bthread_tag; + bthread_attr_set_name(&tmp, "UpdateDerivedVars"); if (bthread_start_background(&_derivative_thread, &tmp, UpdateDerivedVars, this) != 0) { LOG(ERROR) << "Fail to create _derivative_thread"; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 73ea309a71..ec5300987c 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1491,8 +1491,10 @@ void Socket::AfterAppConnected(int err, void* data) { // requests are not setup yet. check the comment on Setup() in Write() req->Setup(s); bthread_t th; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "KeepWrite"); if (bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) { + &th, &attr, KeepWrite, req) != 0) { PLOG(WARNING) << "Fail to start KeepWrite"; KeepWrite(req); } @@ -1532,7 +1534,9 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) { bthread_t th; std::unique_ptr thrd_func(brpc::NewCallback( Socket::CheckConnectedAndKeepWrite, fd, err, data)); - if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "CheckConnectedAndKeepWrite"); + if ((err = bthread_start_background(&th, &attr, RunClosure, thrd_func.get())) == 0) { thrd_func.release(); return 0; @@ -1705,6 +1709,8 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { int saved_errno = 0; bthread_t th; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "KeepWrite"); SocketUniquePtr ptr_for_keep_write; ssize_t nw = 0; int ret = 0; @@ -1779,7 +1785,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { KEEPWRITE_IN_BACKGROUND: ReAddress(&ptr_for_keep_write); req->set_socket(ptr_for_keep_write.release()); - if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, + if (bthread_start_background(&th, &attr, KeepWrite, req) != 0) { LOG(FATAL) << "Fail to start KeepWrite"; KeepWrite(req); @@ -2266,6 +2272,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events, bthread_attr_t attr = thread_attr; attr.keytable_pool = p->_keytable_pool; attr.tag = bthread_self_tag(); + bthread_attr_set_name(&attr, "ProcessEvent"); if (FLAGS_usercode_in_coroutine) { ProcessEvent(p); #if BRPC_WITH_RDMA diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index c5c94bc747..14bea71db5 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -190,7 +190,9 @@ int SocketMap::Init(const SocketMapOptions& options) { } if (_options.idle_timeout_second_dynamic != NULL || _options.idle_timeout_second > 0) { - if (bthread_start_background(&_close_idle_thread, NULL, + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "RunWatchConnections"); + if (bthread_start_background(&_close_idle_thread, &attr, RunWatchConnections, this) != 0) { LOG(FATAL) << "Fail to start bthread"; return -1; diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 085d814df6..ac49f269d9 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -669,3 +669,10 @@ uint64_t bthread_cpu_clock_ns(void) { } } // extern "C" + +void bthread_attr_set_name(bthread_attr_t* attr, const char* name) { + if (attr) { + strncpy(attr->name, name, BTHREAD_NAME_MAX_LENGTH); + attr->name[BTHREAD_NAME_MAX_LENGTH] = '\0'; + } +} diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp index b65dca4838..17ca63dcfe 100644 --- a/src/bthread/fd.cpp +++ b/src/bthread/fd.cpp @@ -141,8 +141,10 @@ class EpollThread { PLOG(FATAL) << "Fail to epoll_create/kqueue"; return -1; } + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "EpollThread::run_this"); if (bthread_start_background( - &_tid, NULL, EpollThread::run_this, this) != 0) { + &_tid, &attr, EpollThread::run_this, this) != 0) { close(_epfd); _epfd = -1; LOG(FATAL) << "Fail to create epoll bthread"; diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 40daaa1ace..c577b64b16 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -49,7 +49,7 @@ namespace bthread { static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = { - BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID }; + BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID, {0} }; DEFINE_bool(show_bthread_creation_in_vars, false, "When this flags is on, The time " "from bthread creation to first run will be recorded and shown in /vars"); @@ -1141,6 +1141,7 @@ void print_task(std::ostream& os, bthread_t tid, bool enable_trace, << "\nattr={stack_type=" << attr.stack_type << " flags=" << attr.flags << " specified_tag=" << attr.tag + << " name=" << attr.name << " keytable_pool=" << attr.keytable_pool << "}\nhas_tls=" << has_tls << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns diff --git a/src/bthread/types.h b/src/bthread/types.h index a09c2e3817..86148c938b 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -97,12 +97,14 @@ typedef struct { size_t nfree; } bthread_keytable_pool_stat_t; +static const size_t BTHREAD_NAME_MAX_LENGTH = 31; // Attributes for thread creation. typedef struct bthread_attr_t { bthread_stacktype_t stack_type; bthread_attrflags_t flags; bthread_keytable_pool_t* keytable_pool; bthread_tag_t tag; + char name[BTHREAD_NAME_MAX_LENGTH + 1]; // do not use std::string to keep POD #if defined(__cplusplus) void operator=(unsigned stacktype_and_flags) { @@ -120,6 +122,8 @@ typedef struct bthread_attr_t { #endif // __cplusplus } bthread_attr_t; +void bthread_attr_set_name(bthread_attr_t* attr, const char* name); + // bthreads started with this attribute will run on stack of worker pthread and // all bthread functions that would block the bthread will block the pthread. // The bthread will not allocate its own stack, simply occupying a little meta @@ -127,22 +131,22 @@ typedef struct bthread_attr_t { // obvious drawback is that you need more worker pthreads when you have a lot // of such bthreads. static const bthread_attr_t BTHREAD_ATTR_PTHREAD = -{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_INVALID }; +{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_INVALID, {0} }; // bthreads created with following attributes will have different size of // stacks. Default is BTHREAD_ATTR_NORMAL. static const bthread_attr_t BTHREAD_ATTR_SMALL = {BTHREAD_STACKTYPE_SMALL, 0, NULL, - BTHREAD_TAG_INVALID}; + BTHREAD_TAG_INVALID, {0}}; static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, NULL, - BTHREAD_TAG_INVALID}; + BTHREAD_TAG_INVALID, {0}}; static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL, - BTHREAD_TAG_INVALID}; + BTHREAD_TAG_INVALID, {0}}; // bthreads created with this attribute will print log when it's started, // context-switched, finished. static const bthread_attr_t BTHREAD_ATTR_DEBUG = { BTHREAD_STACKTYPE_NORMAL, BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH, NULL, - BTHREAD_TAG_INVALID}; + BTHREAD_TAG_INVALID, {0}}; static const size_t BTHREAD_EPOLL_THREAD_NUM = 1; static const bthread_t BTHREAD_ATOMIC_INIT = 0;