From bee2323dc3fe9fee6b9d61259d21c93374a93178 Mon Sep 17 00:00:00 2001 From: m30070657 Date: Thu, 30 Oct 2025 19:53:54 +0800 Subject: [PATCH 1/3] Add pthread CPU affinity support --- src/bthread/task_control.cpp | 62 ++++++++++++++++++++++++++++++++++++ src/bthread/task_control.h | 33 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index e43ce1ece9..c6b81e8b53 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -42,6 +42,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096, DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); DEFINE_bool(task_group_set_worker_name, true, "Whether to set the name of the worker thread"); +DEFINE_bool(thread_affinity, false, "Whether to Bind Cores"); +DEFINE_string(cpu_set, "", + "Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all"); namespace bthread { @@ -58,6 +61,7 @@ extern pthread_mutex_t g_task_control_mutex; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; void (*g_worker_startfn)() = NULL; void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL; +std::vector TaskControl::_cpus; // May be called in other modules to run startfn in non-worker pthreads. void run_worker_startfn() { @@ -74,8 +78,17 @@ void run_tagged_worker_startfn(bthread_tag_t tag) { struct WorkerThreadArgs { WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} + + WorkerThreadArgs* set_cpuId(unsigned _cpuId) { + if (FLAGS_thread_affinity) { + cpuId = _cpuId; + } + return this; + } + TaskControl* c; bthread_tag_t tag; + unsigned cpuId; }; void* TaskControl::worker_thread(void* arg) { @@ -87,6 +100,9 @@ void* TaskControl::worker_thread(void* arg) { auto dummy = static_cast(arg); auto c = dummy->c; auto tag = dummy->tag; + if (FLAGS_thread_affinity) { + bind_thread(pthread_self(), _cpus[dummy->cpuId]); + } delete dummy; run_tagged_worker_startfn(tag); @@ -209,6 +225,13 @@ int TaskControl::init(int concurrency) { } _concurrency = concurrency; + if (FLAGS_thread_affinity) { + if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1 || _cpus.empty()) { + LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set; + return -1; + } + } + // task group group by tags for (int i = 0; i < FLAGS_task_group_ntags; ++i) { _tagged_ngroup[i].store(0, std::memory_order_relaxed); @@ -241,6 +264,7 @@ int TaskControl::init(int concurrency) { _workers.resize(_concurrency); for (int i = 0; i < _concurrency; ++i) { auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); + arg->set_cpuId(i % _cpus.size()); const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); if (rc) { delete arg; @@ -284,6 +308,7 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) { // _concurrency before create a worker. _concurrency.fetch_add(1); auto arg = new WorkerThreadArgs(this, tag); + arg->set_cpuId((i + old_concurency) % _cpus.size()); const int rc = pthread_create( &_workers[i + old_concurency], NULL, worker_thread, arg); if (rc) { @@ -309,6 +334,43 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) { return NULL; } +int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { + static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*"); + std::smatch match; + std::set cpuset; + if (value.empty()) { + cpus = get_current_cpus(); + return 0; + } + if (std::regex_match(value, match, r)) { + for (butil::StringSplitter split(value.data(), ','); split; ++split) { + butil::StringPiece cpuIds(split.field(), split.length()); + cpuIds.trim_spaces(); + butil::StringPiece begin = cpuIds; + butil::StringPiece end = cpuIds; + auto dash = cpuIds.find('-'); + if (dash != cpuIds.npos) { + begin = cpuIds.substr(0, dash); + end = cpuIds.substr(dash + 1); + } + unsigned first = UINT_MAX; + unsigned last = 0; + int ret; + ret = butil::StringSplitter(begin, '\t').to_uint(&first); + ret = ret | butil::StringSplitter(end, '\t').to_uint(&last); + if (ret != 0 || first > last) { + return -1; + } + for (auto i = first; i <= last; ++i) { + cpuset.insert(i); + } + } + cpus.assign(cpuset.begin(), cpuset.end()); + return 0; + } + return -1; +} + #ifdef BRPC_BTHREAD_TRACER void TaskControl::stack_trace(std::ostream& os, bthread_t tid) { _task_tracer.Trace(os, tid); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 439c96db8d..92b040dc6e 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_tracer.h" @@ -91,6 +93,36 @@ friend bthread_t init_for_pthread_stack_trace(); // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(bthread_tag_t tag); + static int parse_cpuset(std::string value, std::vector& cpus); + + static inline void bind_thread(pthread_t pthread, unsigned cpuId) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuId, &cs); + auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs); + if (r != 0) { + LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId; + } + (void)r; + } + + static inline std::vector get_current_cpus() { + cpu_set_t cs; + auto r = pthread_getaffinity_np(pthread_self(), sizeof(cs), &cs); + if (r != 0) { + LOG(ERROR) << "get thread affinity failed"; + exit(1); + } + std::vector cpus; + unsigned nr = CPU_COUNT(&cs); + for (int cpu = 0; cpu < CPU_SETSIZE && cpus.size() < nr; cpu++) { + if (CPU_ISSET(cpu, &cs)) { + cpus.push_back(cpu); + } + } + return cpus; + } + #ifdef BRPC_BTHREAD_TRACER // A stacktrace of bthread can be helpful in debugging. void stack_trace(std::ostream& os, bthread_t tid); @@ -139,6 +171,7 @@ friend bthread_t init_for_pthread_stack_trace(); bool _stop; butil::atomic _concurrency; std::vector _workers; + static std::vector _cpus; butil::atomic _next_worker_id; bvar::Adder _nworkers; From 4116fd765da8c5d8b9ca53204445569ea68e7bf0 Mon Sep 17 00:00:00 2001 From: m30070657 Date: Thu, 30 Oct 2025 19:53:55 +0800 Subject: [PATCH 2/3] combine thread_affinity and cpu_set into one flag --- src/bthread/task_control.cpp | 33 +++++++++++++++++---------------- src/bthread/task_control.h | 17 ----------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index c6b81e8b53..44c6b5036a 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -42,7 +42,6 @@ DEFINE_int32(task_group_runqueue_capacity, 4096, DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); DEFINE_bool(task_group_set_worker_name, true, "Whether to set the name of the worker thread"); -DEFINE_bool(thread_affinity, false, "Whether to Bind Cores"); DEFINE_string(cpu_set, "", "Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all"); @@ -79,13 +78,6 @@ void run_tagged_worker_startfn(bthread_tag_t tag) { struct WorkerThreadArgs { WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} - WorkerThreadArgs* set_cpuId(unsigned _cpuId) { - if (FLAGS_thread_affinity) { - cpuId = _cpuId; - } - return this; - } - TaskControl* c; bthread_tag_t tag; unsigned cpuId; @@ -100,8 +92,14 @@ void* TaskControl::worker_thread(void* arg) { auto dummy = static_cast(arg); auto c = dummy->c; auto tag = dummy->tag; - if (FLAGS_thread_affinity) { - bind_thread(pthread_self(), _cpus[dummy->cpuId]); + if (!_cpus.empty()) { + if (dummy->cpuId < _cpus.size()) { + bind_thread(pthread_self(), _cpus[dummy->cpuId]); + } else { + LOG(ERROR) << "Failed to bind cpuId=" << dummy->cpuId + << " is out of bounds for _cpus (size=" + << _cpus.size() << ")"; + } } delete dummy; run_tagged_worker_startfn(tag); @@ -225,8 +223,8 @@ int TaskControl::init(int concurrency) { } _concurrency = concurrency; - if (FLAGS_thread_affinity) { - if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1 || _cpus.empty()) { + if (!FLAGS_cpu_set.empty()) { + if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) { LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set; return -1; } @@ -264,7 +262,9 @@ int TaskControl::init(int concurrency) { _workers.resize(_concurrency); for (int i = 0; i < _concurrency; ++i) { auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); - arg->set_cpuId(i % _cpus.size()); + if (!_cpus.empty()) { + arg->cpuId = i % _cpus.size(); + } const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); if (rc) { delete arg; @@ -308,7 +308,9 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) { // _concurrency before create a worker. _concurrency.fetch_add(1); auto arg = new WorkerThreadArgs(this, tag); - arg->set_cpuId((i + old_concurency) % _cpus.size()); + if (!_cpus.empty()) { + arg->cpuId = (i + old_concurency) % _cpus.size(); + } const int rc = pthread_create( &_workers[i + old_concurency], NULL, worker_thread, arg); if (rc) { @@ -339,8 +341,7 @@ int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { std::smatch match; std::set cpuset; if (value.empty()) { - cpus = get_current_cpus(); - return 0; + return -1; } if (std::regex_match(value, match, r)) { for (butil::StringSplitter split(value.data(), ','); split; ++split) { diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 92b040dc6e..4eb52c5444 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -106,23 +106,6 @@ friend bthread_t init_for_pthread_stack_trace(); (void)r; } - static inline std::vector get_current_cpus() { - cpu_set_t cs; - auto r = pthread_getaffinity_np(pthread_self(), sizeof(cs), &cs); - if (r != 0) { - LOG(ERROR) << "get thread affinity failed"; - exit(1); - } - std::vector cpus; - unsigned nr = CPU_COUNT(&cs); - for (int cpu = 0; cpu < CPU_SETSIZE && cpus.size() < nr; cpu++) { - if (CPU_ISSET(cpu, &cs)) { - cpus.push_back(cpu); - } - } - return cpus; - } - #ifdef BRPC_BTHREAD_TRACER // A stacktrace of bthread can be helpful in debugging. void stack_trace(std::ostream& os, bthread_t tid); From ffebd510ddca391bb97f50ef571edfa0f52af00f Mon Sep 17 00:00:00 2001 From: maowenjie <3252896864@qq.com> Date: Fri, 31 Oct 2025 12:30:30 +0800 Subject: [PATCH 3/3] review & Support MACOS thread affinity --- src/bthread/task_control.cpp | 78 ++++++++++++++++++++++-------------- src/bthread/task_control.h | 15 +------ 2 files changed, 51 insertions(+), 42 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 44c6b5036a..99e4af5006 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -20,6 +20,8 @@ // Date: Tue Jul 10 17:40:58 CST 2012 #include +#include +#include #include // SYS_gettid #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK #include "butil/errno.h" // berror @@ -34,6 +36,9 @@ #include "bthread/timer_thread.h" // global_timer_thread #include #include "bthread/log.h" +#if defined(OS_MACOSX) +#include +#endif DEFINE_int32(task_group_delete_delay, 1, "delay deletion of TaskGroup for so many seconds"); @@ -43,7 +48,8 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); DEFINE_bool(task_group_set_worker_name, true, "Whether to set the name of the worker thread"); DEFINE_string(cpu_set, "", - "Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all"); + "Set of CPUs to which cores are bound. " + "for example, 0-3,5,7; default: disable"); namespace bthread { @@ -60,7 +66,6 @@ extern pthread_mutex_t g_task_control_mutex; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; void (*g_worker_startfn)() = NULL; void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL; -std::vector TaskControl::_cpus; // May be called in other modules to run startfn in non-worker pthreads. void run_worker_startfn() { @@ -77,10 +82,8 @@ void run_tagged_worker_startfn(bthread_tag_t tag) { struct WorkerThreadArgs { WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} - TaskControl* c; bthread_tag_t tag; - unsigned cpuId; }; void* TaskControl::worker_thread(void* arg) { @@ -92,15 +95,6 @@ void* TaskControl::worker_thread(void* arg) { auto dummy = static_cast(arg); auto c = dummy->c; auto tag = dummy->tag; - if (!_cpus.empty()) { - if (dummy->cpuId < _cpus.size()) { - bind_thread(pthread_self(), _cpus[dummy->cpuId]); - } else { - LOG(ERROR) << "Failed to bind cpuId=" << dummy->cpuId - << " is out of bounds for _cpus (size=" - << _cpus.size() << ")"; - } - } delete dummy; run_tagged_worker_startfn(tag); @@ -113,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) { g->_tid = pthread_self(); + int worker_id = c->_next_worker_id.fetch_add( + 1, butil::memory_order_relaxed); + if (!c->_cpus.empty()) { + bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]); + } if (FLAGS_task_group_set_worker_name) { std::string worker_thread_name = butil::string_printf( - "brpc_wkr:%d-%d", g->tag(), - c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed)); + "brpc_wkr:%d-%d", g->tag(), worker_id); butil::PlatformThread::SetNameSimple(worker_thread_name.c_str()); } BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid @@ -262,9 +260,6 @@ int TaskControl::init(int concurrency) { _workers.resize(_concurrency); for (int i = 0; i < _concurrency; ++i) { auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); - if (!_cpus.empty()) { - arg->cpuId = i % _cpus.size(); - } const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); if (rc) { delete arg; @@ -308,9 +303,6 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) { // _concurrency before create a worker. _concurrency.fetch_add(1); auto arg = new WorkerThreadArgs(this, tag); - if (!_cpus.empty()) { - arg->cpuId = (i + old_concurency) % _cpus.size(); - } const int rc = pthread_create( &_workers[i + old_concurency], NULL, worker_thread, arg); if (rc) { @@ -345,14 +337,14 @@ int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { } if (std::regex_match(value, match, r)) { for (butil::StringSplitter split(value.data(), ','); split; ++split) { - butil::StringPiece cpuIds(split.field(), split.length()); - cpuIds.trim_spaces(); - butil::StringPiece begin = cpuIds; - butil::StringPiece end = cpuIds; - auto dash = cpuIds.find('-'); - if (dash != cpuIds.npos) { - begin = cpuIds.substr(0, dash); - end = cpuIds.substr(dash + 1); + butil::StringPiece cpu_ids(split.field(), split.length()); + cpu_ids.trim_spaces(); + butil::StringPiece begin = cpu_ids; + butil::StringPiece end = cpu_ids; + auto dash = cpu_ids.find('-'); + if (dash != cpu_ids.npos) { + begin = cpu_ids.substr(0, dash); + end = cpu_ids.substr(dash + 1); } unsigned first = UINT_MAX; unsigned last = 0; @@ -372,6 +364,34 @@ int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { return -1; } +void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) { +#if defined(OS_LINUX) + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs); + if (r != 0) { + LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; + } + (void)r; +#elif defined(OS_MACOSX) + thread_port_t mach_thread = pthread_mach_thread_np(pthread); + if (mach_thread != MACH_PORT_NULL) { + LOG(WARNING) << "mach_thread is null" + << "Failed to bind thread to cpu: " << cpu_id; + return; + } + thread_affinity_policy_data_t policy; + policy.affinity_tag = cpu_id; + if (thread_policy_set(mach_thread, + THREAD_AFFINITY_POLICY, + (thread_policy_t)&policy, + THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) { + LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; + } +#endif +} + #ifdef BRPC_BTHREAD_TRACER void TaskControl::stack_trace(std::ostream& os, bthread_t tid) { _task_tracer.Trace(os, tid); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 4eb52c5444..4480daa677 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -30,8 +30,6 @@ #include #include #include -#include -#include #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_tracer.h" @@ -95,16 +93,7 @@ friend bthread_t init_for_pthread_stack_trace(); static int parse_cpuset(std::string value, std::vector& cpus); - static inline void bind_thread(pthread_t pthread, unsigned cpuId) { - cpu_set_t cs; - CPU_ZERO(&cs); - CPU_SET(cpuId, &cs); - auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs); - if (r != 0) { - LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId; - } - (void)r; - } + static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id); #ifdef BRPC_BTHREAD_TRACER // A stacktrace of bthread can be helpful in debugging. @@ -154,7 +143,7 @@ friend bthread_t init_for_pthread_stack_trace(); bool _stop; butil::atomic _concurrency; std::vector _workers; - static std::vector _cpus; + std::vector _cpus; butil::atomic _next_worker_id; bvar::Adder _nworkers;