diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index e43ce1ece9..99e4af5006 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -20,6 +20,8 @@ // Date: Tue Jul 10 17:40:58 CST 2012 #include +#include +#include #include // SYS_gettid #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK #include "butil/errno.h" // berror @@ -34,6 +36,9 @@ #include "bthread/timer_thread.h" // global_timer_thread #include #include "bthread/log.h" +#if defined(OS_MACOSX) +#include +#endif DEFINE_int32(task_group_delete_delay, 1, "delay deletion of TaskGroup for so many seconds"); @@ -42,6 +47,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096, DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); DEFINE_bool(task_group_set_worker_name, true, "Whether to set the name of the worker thread"); +DEFINE_string(cpu_set, "", + "Set of CPUs to which cores are bound. " + "for example, 0-3,5,7; default: disable"); namespace bthread { @@ -99,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) { g->_tid = pthread_self(); + int worker_id = c->_next_worker_id.fetch_add( + 1, butil::memory_order_relaxed); + if (!c->_cpus.empty()) { + bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]); + } if (FLAGS_task_group_set_worker_name) { std::string worker_thread_name = butil::string_printf( - "brpc_wkr:%d-%d", g->tag(), - c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed)); + "brpc_wkr:%d-%d", g->tag(), worker_id); butil::PlatformThread::SetNameSimple(worker_thread_name.c_str()); } BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid @@ -209,6 +221,13 @@ int TaskControl::init(int concurrency) { } _concurrency = concurrency; + if (!FLAGS_cpu_set.empty()) { + if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) { + LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set; + return -1; + } + } + // task group group by tags for (int i = 0; i < FLAGS_task_group_ntags; ++i) { _tagged_ngroup[i].store(0, std::memory_order_relaxed); @@ -309,6 +328,70 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) { return NULL; } +int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { + static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*"); + std::smatch match; + std::set cpuset; + if (value.empty()) { + return -1; + } + if (std::regex_match(value, match, r)) { + for (butil::StringSplitter split(value.data(), ','); split; ++split) { + butil::StringPiece cpu_ids(split.field(), split.length()); + cpu_ids.trim_spaces(); + butil::StringPiece begin = cpu_ids; + butil::StringPiece end = cpu_ids; + auto dash = cpu_ids.find('-'); + if (dash != cpu_ids.npos) { + begin = cpu_ids.substr(0, dash); + end = cpu_ids.substr(dash + 1); + } + unsigned first = UINT_MAX; + unsigned last = 0; + int ret; + ret = butil::StringSplitter(begin, '\t').to_uint(&first); + ret = ret | butil::StringSplitter(end, '\t').to_uint(&last); + if (ret != 0 || first > last) { + return -1; + } + for (auto i = first; i <= last; ++i) { + cpuset.insert(i); + } + } + cpus.assign(cpuset.begin(), cpuset.end()); + return 0; + } + return -1; +} + +void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) { +#if defined(OS_LINUX) + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs); + if (r != 0) { + LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; + } + (void)r; +#elif defined(OS_MACOSX) + thread_port_t mach_thread = pthread_mach_thread_np(pthread); + if (mach_thread != MACH_PORT_NULL) { + LOG(WARNING) << "mach_thread is null" + << "Failed to bind thread to cpu: " << cpu_id; + return; + } + thread_affinity_policy_data_t policy; + policy.affinity_tag = cpu_id; + if (thread_policy_set(mach_thread, + THREAD_AFFINITY_POLICY, + (thread_policy_t)&policy, + THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) { + LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; + } +#endif +} + #ifdef BRPC_BTHREAD_TRACER void TaskControl::stack_trace(std::ostream& os, bthread_t tid) { _task_tracer.Trace(os, tid); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 439c96db8d..4480daa677 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -91,6 +91,10 @@ friend bthread_t init_for_pthread_stack_trace(); // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(bthread_tag_t tag); + static int parse_cpuset(std::string value, std::vector& cpus); + + static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id); + #ifdef BRPC_BTHREAD_TRACER // A stacktrace of bthread can be helpful in debugging. void stack_trace(std::ostream& os, bthread_t tid); @@ -139,6 +143,7 @@ friend bthread_t init_for_pthread_stack_trace(); bool _stop; butil::atomic _concurrency; std::vector _workers; + std::vector _cpus; butil::atomic _next_worker_id; bvar::Adder _nworkers;