From 787e88e2c784ac035a784dbcd582fe40726d603e Mon Sep 17 00:00:00 2001 From: chenBright Date: Sat, 28 Jun 2025 13:25:52 +0800 Subject: [PATCH 1/4] Bugfix: bthread_worker_usage would be greater than bthread_worker_count --- src/brpc/load_balancer.h | 7 --- src/brpc/policy/randomized_load_balancer.cpp | 3 +- src/brpc/policy/round_robin_load_balancer.cpp | 3 +- .../weighted_randomized_load_balancer.cpp | 3 +- src/bthread/prime_offset.h | 39 +++++++++++++++ src/bthread/task_control.cpp | 20 +++++++- src/bthread/task_control.h | 3 ++ src/bthread/task_group.cpp | 38 ++++----------- src/bthread/task_group.h | 47 ++++++++++--------- 9 files changed, 101 insertions(+), 62 deletions(-) create mode 100644 src/bthread/prime_offset.h diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h index a32b298d1f..cda0517e87 100644 --- a/src/brpc/load_balancer.h +++ b/src/brpc/load_balancer.h @@ -184,13 +184,6 @@ inline Extension* LoadBalancerExtension() { return Extension::instance(); } -inline uint32_t GenRandomStride() { - uint32_t prime_offset[] = { - #include "bthread/offset_inl.list" - }; - return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))]; -} - } // namespace brpc diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp index 5c4ba447d7..65cfdee975 100644 --- a/src/brpc/policy/randomized_load_balancer.cpp +++ b/src/brpc/policy/randomized_load_balancer.cpp @@ -18,6 +18,7 @@ #include "butil/macros.h" #include "butil/fast_rand.h" +#include "bthread/prime_offset.h" #include "brpc/socket.h" #include "brpc/policy/randomized_load_balancer.h" #include "butil/strings/string_number_conversions.h" @@ -118,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { return 0; } if (stride == 0) { - stride = GenRandomStride(); + stride = bthread::prime_offset(); } // If `Address' failed, use `offset+stride' to retry so that // this failed server won't be visited again inside for diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp index 1d16131aeb..fa69aa86c2 100644 --- a/src/brpc/policy/round_robin_load_balancer.cpp +++ b/src/brpc/policy/round_robin_load_balancer.cpp @@ -18,6 +18,7 @@ #include "butil/macros.h" #include "butil/fast_rand.h" +#include "bthread/prime_offset.h" #include "brpc/socket.h" #include "brpc/policy/round_robin_load_balancer.h" @@ -108,7 +109,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { } TLS tls = s.tls(); if (tls.stride == 0) { - tls.stride = GenRandomStride(); + tls.stride = bthread::prime_offset(); // use random at first time, for the case of // use rr lb every time in new thread tls.offset = butil::fast_rand_less_than(n); diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp index 28cd7e3f17..819c550c3e 100644 --- a/src/brpc/policy/weighted_randomized_load_balancer.cpp +++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp @@ -19,6 +19,7 @@ #include #include "butil/fast_rand.h" +#include "bthread/prime_offset.h" #include "brpc/socket.h" #include "brpc/policy/weighted_randomized_load_balancer.h" #include "butil/strings/string_number_conversions.h" @@ -152,7 +153,7 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* if (random_traversed.size() == n) { // Try to traverse the remaining servers to find an available server. uint32_t offset = butil::fast_rand_less_than(n); - uint32_t stride = GenRandomStride(); + uint32_t stride = bthread::prime_offset(); for (size_t i = 0; i < n; ++i) { offset = (offset + stride) % n; SocketId id = s->server_list[offset].id; diff --git a/src/bthread/prime_offset.h b/src/bthread/prime_offset.h new file mode 100644 index 0000000000..7137a68cdb --- /dev/null +++ b/src/bthread/prime_offset.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BTHREAD_PRIME_OFFSET_H +#define BTHREAD_PRIME_OFFSET_H + +#include "butil/fast_rand.h" +#include "butil/macros.h" + +namespace bthread { +// Prime number offset for hash function. +inline size_t prime_offset(size_t seed) { + uint32_t offsets[] = { + #include "bthread/offset_inl.list" + }; + return offsets[seed % ARRAY_SIZE(offsets)]; +} + +inline size_t prime_offset() { + return prime_offset(butil::fast_rand()); +} +} + + +#endif // BTHREAD_PRIME_OFFSET_H \ No newline at end of file diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 66307d323e..007c741abe 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -185,6 +185,7 @@ TaskControl::TaskControl() , _nbthreads("bthread_count") , _priority_queues(FLAGS_task_group_ntags) , _pl(FLAGS_task_group_ntags) + , _last_get_cumulated_time_ns(0) {} int TaskControl::init(int concurrency) { @@ -524,12 +525,29 @@ void TaskControl::print_rq_sizes(std::ostream& os) { double TaskControl::get_cumulated_worker_time() { int64_t cputime_ns = 0; + int64_t now = butil::cpuwide_time_ns(); BAIDU_SCOPED_LOCK(_modify_group_mutex); for_each_task_group([&](TaskGroup* g) { if (g) { - cputime_ns += g->_cumulated_cputime_ns; + // With the acquire-release atomic operation, the CPU time of the bthread is + // only calculated once through `_cumulated_cputime_ns' or `_last_run_ns'. + cputime_ns += g->_cumulated_cputime_ns.load(butil::memory_order_acquire); + // The bthread is still running on the worker, + // so we need to add the elapsed time since it started. + // In extreme cases, before getting `_last_run_ns_in_tc', + // `_last_run_ns_in_tc' may have been updated multiple times, + // and `cputime_ns' will miss some cpu time, which is ok. + int64_t last_run_ns = g->_last_run_ns; + if (last_run_ns > _last_get_cumulated_time_ns) { + g->_last_run_ns_in_tc = last_run_ns; + cputime_ns += now - last_run_ns; + } else if (last_run_ns == g->_last_run_ns_in_tc) { + // The bthread is still running on the same worker. + cputime_ns += now - last_run_ns; + } } }); + _last_get_cumulated_time_ns = now; return cputime_ns / 1000000000.0; } diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 2a2b76d6f2..74ed882a66 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -161,6 +161,9 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector _pl; + // The last time of getting cumulated time. + int64_t _last_get_cumulated_time_ns; + #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; #endif // BRPC_BTHREAD_TRACER diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 0d3e473e3b..cb01f458ec 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -69,10 +69,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL); const TaskStatistics EMPTY_STAT = { 0, 0, 0 }; -const size_t OFFSET_TABLE[] = { -#include "bthread/offset_inl.list" -}; - void* (*g_create_span_func)() = NULL; void* run_create_span_func() { @@ -180,27 +176,7 @@ void TaskGroup::run_main_task() { } TaskGroup::TaskGroup(TaskControl* c) - : _cur_meta(NULL) - , _control(c) - , _num_nosignal(0) - , _nsignaled(0) - , _last_run_ns(butil::cpuwide_time_ns()) - , _cumulated_cputime_ns(0) - , _nswitch(0) - , _last_context_remained(NULL) - , _last_context_remained_arg(NULL) - , _pl(NULL) - , _main_stack(NULL) - , _main_tid(0) - , _remote_num_nosignal(0) - , _remote_nsignaled(0) -#ifndef NDEBUG - , _sched_recursive_guard(0) -#endif - , _tag(BTHREAD_TAG_DEFAULT) - , _tid(-1) { - _steal_seed = butil::fast_rand(); - _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)]; + : _control(c) { CHECK(c); } @@ -292,7 +268,7 @@ int TaskGroup::init(size_t runqueue_capacity) { _cur_meta = m; _main_tid = m->tid; _main_stack = stk; - _last_run_ns = butil::cpuwide_time_ns(); + _last_run_ns = -m->cpuwide_start_ns; _last_cpu_clock_ns = 0; return 0; } @@ -683,8 +659,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { TaskMeta* const cur_meta = g->_cur_meta; const int64_t now = butil::cpuwide_time_ns(); - const int64_t elp_ns = now - g->_last_run_ns; - g->_last_run_ns = now; + const int64_t elp_ns = now - std::abs(g->_last_run_ns); cur_meta->stat.cputime_ns += elp_ns; if (FLAGS_bthread_enable_cpu_clock_stat) { @@ -696,9 +671,12 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { } else { g->_last_cpu_clock_ns = 0; } - + + g->_last_run_ns = next_meta->tid != g->main_tid() ? now : -now; if (cur_meta->tid != g->main_tid()) { - g->_cumulated_cputime_ns += elp_ns; + // Makes sure that we see the change of `_cur_run_start_ns' + // before changing `_cumulated_cputime_ns'. + g->_cumulated_cputime_ns.fetch_add(elp_ns, butil::memory_order_release); } ++cur_meta->stat.nswitch; ++ g->_nswitch; diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index ccba9ba3c7..bbd04dd474 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -29,6 +29,7 @@ #include "bthread/remote_task_queue.h" // RemoteTaskQueue #include "butil/resource_pool.h" // ResourceId #include "bthread/parking_lot.h" +#include "bthread/prime_offset.h" namespace bthread { @@ -248,41 +249,45 @@ friend class TaskControl; void set_pl(ParkingLot* pl) { _pl = pl; } - TaskMeta* _cur_meta; + TaskMeta* _cur_meta{NULL}; // the control that this group belongs to - TaskControl* _control; - int _num_nosignal; - int _nsignaled; - // last scheduling time - int64_t _last_run_ns; - int64_t _cumulated_cputime_ns; + TaskControl* _control{NULL}; + int _num_nosignal{0}; + int _nsignaled{0}; + // Last scheduling time. If this value is negative, + // it means that it is the main task. + int64_t _last_run_ns{0}; + // Last scheduling time observed in + // TaskControl::get_cumulated_worker_time(). + int64_t _last_run_ns_in_tc{0}; + butil::atomic _cumulated_cputime_ns{0}; // last thread cpu clock - int64_t _last_cpu_clock_ns; + int64_t _last_cpu_clock_ns{0}; - size_t _nswitch; - RemainedFn _last_context_remained; - void* _last_context_remained_arg; + size_t _nswitch{0}; + RemainedFn _last_context_remained{NULL}; + void* _last_context_remained_arg{NULL}; - ParkingLot* _pl; + ParkingLot* _pl{NULL}; #ifndef BTHREAD_DONT_SAVE_PARKING_STATE ParkingLot::State _last_pl_state; #endif - size_t _steal_seed; - size_t _steal_offset; - ContextualStack* _main_stack; - bthread_t _main_tid; + size_t _steal_seed{butil::fast_rand()}; + size_t _steal_offset{prime_offset(_steal_seed)}; + ContextualStack* _main_stack{NULL}; + bthread_t _main_tid{INVALID_BTHREAD}; WorkStealingQueue _rq; RemoteTaskQueue _remote_rq; - int _remote_num_nosignal; - int _remote_nsignaled; + int _remote_num_nosignal{0}; + int _remote_nsignaled{0}; - int _sched_recursive_guard; + int _sched_recursive_guard{0}; // tag of this taskgroup - bthread_tag_t _tag; + bthread_tag_t _tag{BTHREAD_TAG_DEFAULT}; // Worker thread id. - pid_t _tid; + pid_t _tid{-1}; }; } // namespace bthread From 4b34d6265c7606c9dbbb8099fed68380103a9e80 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 30 Jun 2025 00:08:19 +0800 Subject: [PATCH 2/4] Use CPU atomic 128-bit aligned loads and stores --- src/bthread/task_control.cpp | 66 ++++++++++++++++++++++-------------- src/bthread/task_control.h | 6 ++-- src/bthread/task_group.cpp | 52 ++++++++++++++++++++++++---- src/bthread/task_group.h | 22 +++++++----- 4 files changed, 101 insertions(+), 45 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 007c741abe..e8805159a0 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -33,6 +33,13 @@ #include "bthread/timer_thread.h" // global_timer_thread #include #include "bthread/log.h" +#ifdef __x86_64__ +#include +#endif // __x86_64__ + +#ifdef __ARM_NEON +#include +#endif // __ARM_NEON DEFINE_int32(task_group_delete_delay, 1, "delay deletion of TaskGroup for so many seconds"); @@ -152,7 +159,7 @@ static double get_cumulated_worker_time_from_this_with_tag(void* arg) { auto a = static_cast(arg); auto c = a->c; auto t = a->t; - return c->get_cumulated_worker_time_with_tag(t); + return c->get_cumulated_worker_time(t); } static int64_t get_cumulated_switch_count_from_this(void *arg) { @@ -185,7 +192,6 @@ TaskControl::TaskControl() , _nbthreads("bthread_count") , _priority_queues(FLAGS_task_group_ntags) , _pl(FLAGS_task_group_ntags) - , _last_get_cumulated_time_ns(0) {} int TaskControl::init(int concurrency) { @@ -525,45 +531,53 @@ void TaskControl::print_rq_sizes(std::ostream& os) { double TaskControl::get_cumulated_worker_time() { int64_t cputime_ns = 0; - int64_t now = butil::cpuwide_time_ns(); BAIDU_SCOPED_LOCK(_modify_group_mutex); for_each_task_group([&](TaskGroup* g) { - if (g) { - // With the acquire-release atomic operation, the CPU time of the bthread is - // only calculated once through `_cumulated_cputime_ns' or `_last_run_ns'. - cputime_ns += g->_cumulated_cputime_ns.load(butil::memory_order_acquire); - // The bthread is still running on the worker, - // so we need to add the elapsed time since it started. - // In extreme cases, before getting `_last_run_ns_in_tc', - // `_last_run_ns_in_tc' may have been updated multiple times, - // and `cputime_ns' will miss some cpu time, which is ok. - int64_t last_run_ns = g->_last_run_ns; - if (last_run_ns > _last_get_cumulated_time_ns) { - g->_last_run_ns_in_tc = last_run_ns; - cputime_ns += now - last_run_ns; - } else if (last_run_ns == g->_last_run_ns_in_tc) { - // The bthread is still running on the same worker. - cputime_ns += now - last_run_ns; - } - } + cputime_ns += get_cumulated_worker_time(g); }); - _last_get_cumulated_time_ns = now; return cputime_ns / 1000000000.0; } -double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) { +double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) { int64_t cputime_ns = 0; BAIDU_SCOPED_LOCK(_modify_group_mutex); const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); auto& groups = tag_group(tag); for (size_t i = 0; i < ngroup; ++i) { - if (groups[i]) { - cputime_ns += groups[i]->_cumulated_cputime_ns; - } + cputime_ns += get_cumulated_worker_time(groups[i]); } return cputime_ns / 1000000000.0; } +double TaskControl::get_cumulated_worker_time(TaskGroup* g) { + if (NULL == g) { + return 0.0; + } + +#if __x86_64__ || __ARM_NEON +#ifdef __x86_64__ + __m128i cpu_time_stat = _mm_load_si128(reinterpret_cast<__m128i*>(&g->_cpu_time_stat)); +#else // __ARM_NEON + int64x2_t cpu_time_stat = vld1q_s64(reinterpret_cast(&g->_cpu_time_stat)); +#endif // __x86_64__ + int64_t last_run_ns = cpu_time_stat[0]; + int64_t cputime_ns = cpu_time_stat[1]; +#else // __x86_64__ || __ARM_NEON + int64_t last_run_ns = 0; + int64_t cputime_ns = 0; + { + BAIDU_SCOPED_LOCK(g->_cpu_time_stat_mutex); + last_run_ns = g->_cpu_time_stat.last_run_ns; + cputime_ns = g->_cpu_time_stat.cumulated_cputime_ns; + } +#endif // __x86_64__ || __ARM_NEON + // Add the elapsed time of running bthread. + if (last_run_ns > 0) { + cputime_ns += butil::cpuwide_time_ns() - last_run_ns; + } + return cputime_ns; +} + int64_t TaskControl::get_cumulated_switch_count() { int64_t c = 0; BAIDU_SCOPED_LOCK(_modify_group_mutex); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 74ed882a66..7cdd756a65 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -79,7 +79,8 @@ friend bthread_t init_for_pthread_stack_trace(); void print_rq_sizes(std::ostream& os); double get_cumulated_worker_time(); - double get_cumulated_worker_time_with_tag(bthread_tag_t tag); + double get_cumulated_worker_time(bthread_tag_t tag); + double get_cumulated_worker_time(TaskGroup* g); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); @@ -161,9 +162,6 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector _pl; - // The last time of getting cumulated time. - int64_t _last_get_cumulated_time_ns; - #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; #endif // BRPC_BTHREAD_TRACER diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index cb01f458ec..06de284516 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -37,6 +37,14 @@ #include "bthread/task_group.h" #include "bthread/timer_thread.h" +#ifdef __x86_64__ +#include +#endif // __x86_64__ + +#ifdef __ARM_NEON +#include +#endif // __ARM_NEON + namespace bthread { static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = { @@ -172,12 +180,18 @@ void TaskGroup::run_main_task() { } } // Don't forget to add elapse of last wait_task. - current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns; + current_task()->stat.cputime_ns += + butil::cpuwide_time_ns() - std::abs(_cpu_time_stat.last_run_ns); } TaskGroup::TaskGroup(TaskControl* c) : _control(c) { CHECK(c); +#if __x86_64__ || __ARM_NEON + // Supress compiler warning. + (void)_cpu_time_stat_mutex; +#endif // __x86_64__ || __ARM_NEON + } TaskGroup::~TaskGroup() { @@ -268,7 +282,7 @@ int TaskGroup::init(size_t runqueue_capacity) { _cur_meta = m; _main_tid = m->tid; _main_stack = stk; - _last_run_ns = -m->cpuwide_start_ns; + _cpu_time_stat.last_run_ns = -m->cpuwide_start_ns; _last_cpu_clock_ns = 0; return 0; } @@ -659,7 +673,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { TaskMeta* const cur_meta = g->_cur_meta; const int64_t now = butil::cpuwide_time_ns(); - const int64_t elp_ns = now - std::abs(g->_last_run_ns); + const int64_t elp_ns = now - std::abs(g->_cpu_time_stat.last_run_ns); cur_meta->stat.cputime_ns += elp_ns; if (FLAGS_bthread_enable_cpu_clock_stat) { @@ -672,12 +686,36 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { g->_last_cpu_clock_ns = 0; } - g->_last_run_ns = next_meta->tid != g->main_tid() ? now : -now; +#if __x86_64__ || __ARM_NEON + // Refer to https://rigtorp.se/isatomic/, On the modern CPU microarchitectures + // (Skylake and Zen 2) AVX/AVX2 128b/256b aligned loads and stores are atomic + // even though Intel and AMD officially doesn’t guarantee this. + CPUTimeStat cpu_time_stat{ + next_meta->tid != g->main_tid() ? now : -now, + g->_cpu_time_stat.cumulated_cputime_ns + }; if (cur_meta->tid != g->main_tid()) { - // Makes sure that we see the change of `_cur_run_start_ns' - // before changing `_cumulated_cputime_ns'. - g->_cumulated_cputime_ns.fetch_add(elp_ns, butil::memory_order_release); + cpu_time_stat.cumulated_cputime_ns += elp_ns; } +#if __x86_64__ + // On X86, SSE instructions can ensure atomic loads and stores. + __m128i value = _mm_load_si128(reinterpret_cast<__m128i*>(&cpu_time_stat)); + _mm_store_si128(reinterpret_cast<__m128i*>(&g->_cpu_time_stat), value); +#else // __ARM_NEON + // Starting from Armv8.4-A, neon can ensure atomic loads and stores. + int64x2_t value = vld1q_s64(reinterpret_cast(&cpu_time_stat)); + vst1q_s64(reinterpret_cast(&g->_cpu_time_stat), value); +#endif // __x86_64__ +#else // __x86_64__ || __ARM_NEON + { + BAIDU_SCOPED_LOCK(g->_cpu_time_stat_mutex); + g->_cpu_time_stat.last_run_ns = next_meta->tid != g->main_tid() ? now : -now; + if (cur_meta->tid != g->main_tid()) { + g->_cpu_time_stat.cumulated_cputime_ns += elp_ns; + } + } +#endif // __x86_64__ || __ARM_NEON + ++cur_meta->stat.nswitch; ++ g->_nswitch; // Switch to the task diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index bbd04dd474..fb81d10a5c 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -149,7 +149,9 @@ class TaskGroup { { return _cur_meta->stack == _main_stack; } // Active time in nanoseconds spent by this TaskGroup. - int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; } + int64_t cumulated_cputime_ns() const { + return _cpu_time_stat.cumulated_cputime_ns; + } // Push a bthread into the runqueue void ready_to_run(TaskMeta* meta, bool nosignal = false); @@ -204,6 +206,14 @@ class TaskGroup { private: friend class TaskControl; + struct CPUTimeStat { + // Last scheduling time. + // If this value is negative, + // it means that it is the main task. + int64_t last_run_ns; + int64_t cumulated_cputime_ns; + }; + // You shall use TaskControl::create_group to create new instance. explicit TaskGroup(TaskControl* c); @@ -255,13 +265,9 @@ friend class TaskControl; TaskControl* _control{NULL}; int _num_nosignal{0}; int _nsignaled{0}; - // Last scheduling time. If this value is negative, - // it means that it is the main task. - int64_t _last_run_ns{0}; - // Last scheduling time observed in - // TaskControl::get_cumulated_worker_time(). - int64_t _last_run_ns_in_tc{0}; - butil::atomic _cumulated_cputime_ns{0}; + // Used to protect `_cpu_time_stat' when __x86_64__ and __ARM_NEON is not defined. + FastPthreadMutex _cpu_time_stat_mutex; + BAIDU_CACHELINE_ALIGNMENT CPUTimeStat _cpu_time_stat{0, 0}; // last thread cpu clock int64_t _last_cpu_clock_ns{0}; From 9ca6a4b98ce9043de9c7362457e35d9596ceebf5 Mon Sep 17 00:00:00 2001 From: chenBright Date: Fri, 18 Jul 2025 14:49:44 +0800 Subject: [PATCH 3/4] Encapsulating AtomicInteger128 class --- src/bthread/task_control.cpp | 33 +---------- src/bthread/task_control.h | 1 - src/bthread/task_group.cpp | 107 +++++++++++++++++++--------------- src/bthread/task_group.h | 109 +++++++++++++++++++++++++++++++---- 4 files changed, 160 insertions(+), 90 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index e8805159a0..a30389962c 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -533,7 +533,7 @@ double TaskControl::get_cumulated_worker_time() { int64_t cputime_ns = 0; BAIDU_SCOPED_LOCK(_modify_group_mutex); for_each_task_group([&](TaskGroup* g) { - cputime_ns += get_cumulated_worker_time(g); + cputime_ns += g->cumulated_cputime_ns(); }); return cputime_ns / 1000000000.0; } @@ -544,40 +544,11 @@ double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) { const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); auto& groups = tag_group(tag); for (size_t i = 0; i < ngroup; ++i) { - cputime_ns += get_cumulated_worker_time(groups[i]); + cputime_ns += groups[i]->cumulated_cputime_ns(); } return cputime_ns / 1000000000.0; } -double TaskControl::get_cumulated_worker_time(TaskGroup* g) { - if (NULL == g) { - return 0.0; - } - -#if __x86_64__ || __ARM_NEON -#ifdef __x86_64__ - __m128i cpu_time_stat = _mm_load_si128(reinterpret_cast<__m128i*>(&g->_cpu_time_stat)); -#else // __ARM_NEON - int64x2_t cpu_time_stat = vld1q_s64(reinterpret_cast(&g->_cpu_time_stat)); -#endif // __x86_64__ - int64_t last_run_ns = cpu_time_stat[0]; - int64_t cputime_ns = cpu_time_stat[1]; -#else // __x86_64__ || __ARM_NEON - int64_t last_run_ns = 0; - int64_t cputime_ns = 0; - { - BAIDU_SCOPED_LOCK(g->_cpu_time_stat_mutex); - last_run_ns = g->_cpu_time_stat.last_run_ns; - cputime_ns = g->_cpu_time_stat.cumulated_cputime_ns; - } -#endif // __x86_64__ || __ARM_NEON - // Add the elapsed time of running bthread. - if (last_run_ns > 0) { - cputime_ns += butil::cpuwide_time_ns() - last_run_ns; - } - return cputime_ns; -} - int64_t TaskControl::get_cumulated_switch_count() { int64_t c = 0; BAIDU_SCOPED_LOCK(_modify_group_mutex); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 7cdd756a65..11587b295f 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -80,7 +80,6 @@ friend bthread_t init_for_pthread_stack_trace(); double get_cumulated_worker_time(); double get_cumulated_worker_time(bthread_tag_t tag); - double get_cumulated_worker_time(TaskGroup* g); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 06de284516..d4cb81f69a 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -86,6 +86,39 @@ void* run_create_span_func() { return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span; } +AtomicInteger128::Value AtomicInteger128::load() const { +#if __x86_64__ || __ARM_NEON + // Supress compiler warning. + (void)_mutex; +#endif // __x86_64__ || __ARM_NEON + +#if __x86_64__ || __ARM_NEON +#ifdef __x86_64__ + __m128i value = _mm_load_si128(reinterpret_cast(&_value)); +#else // __ARM_NEON + int64x2_t value = vld1q_s64(reinterpret_cast(&_value)); +#endif // __x86_64__ + return {value[0], value[1]}; +#else // __x86_64__ || __ARM_NEON + BAIDU_SCOPED_LOCK(_mutex); + return _value; +#endif // __x86_64__ || __ARM_NEON +} + +void AtomicInteger128::store(Value value) { +#if __x86_64__ + __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value)); + _mm_store_si128(reinterpret_cast<__m128i*>(&_value), v); +#elif __ARM_NEON + int64x2_t v = vld1q_s64(reinterpret_cast(&value)); + vst1q_s64(reinterpret_cast(&_value), v); +#else + BAIDU_SCOPED_LOCK(_mutex); + _value = value; +#endif // __x86_64__ || __ARM_NEON +} + + int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) { TaskMeta* const m = address_meta(tid); if (m != NULL) { @@ -152,6 +185,16 @@ static double get_cumulated_cputime_from_this(void* arg) { return static_cast(arg)->cumulated_cputime_ns() / 1000000000.0; } +int64_t TaskGroup::cumulated_cputime_ns() const { + CPUTimeStat cpu_time_stat = _cpu_time_stat.load(); + // Add the elapsed time of running bthread. + int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns(); + if (!cpu_time_stat.is_main_task()) { + cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns(); + } + return cumulated_cputime_ns; +} + void TaskGroup::run_main_task() { bvar::PassiveStatus cumulated_cputime( get_cumulated_cputime_from_this, this); @@ -160,11 +203,11 @@ void TaskGroup::run_main_task() { TaskGroup* dummy = this; bthread_t tid; while (wait_task(&tid)) { - TaskGroup::sched_to(&dummy, tid); + sched_to(&dummy, tid); DCHECK_EQ(this, dummy); DCHECK_EQ(_cur_meta->stack, _main_stack); if (_cur_meta->tid != _main_tid) { - TaskGroup::task_runner(1/*skip remained*/); + task_runner(1/*skip remained*/); } if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) { char name[32]; @@ -181,17 +224,12 @@ void TaskGroup::run_main_task() { } // Don't forget to add elapse of last wait_task. current_task()->stat.cputime_ns += - butil::cpuwide_time_ns() - std::abs(_cpu_time_stat.last_run_ns); + butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns(); } TaskGroup::TaskGroup(TaskControl* c) : _control(c) { CHECK(c); -#if __x86_64__ || __ARM_NEON - // Supress compiler warning. - (void)_cpu_time_stat_mutex; -#endif // __x86_64__ || __ARM_NEON - } TaskGroup::~TaskGroup() { @@ -282,8 +320,12 @@ int TaskGroup::init(size_t runqueue_capacity) { _cur_meta = m; _main_tid = m->tid; _main_stack = stk; - _cpu_time_stat.last_run_ns = -m->cpuwide_start_ns; + + CPUTimeStat cpu_time_stat; + cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true); + _cpu_time_stat.store(cpu_time_stat); _last_cpu_clock_ns = 0; + return 0; } @@ -404,7 +446,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) { g->_control->_nbthreads << -1; g->_control->tag_nbthreads(g->tag()) << -1; - g->set_remained(TaskGroup::_release_last_context, m); + g->set_remained(_release_last_context, m); ending_sched(&g); } while (g->_cur_meta->tid != g->_main_tid); @@ -481,9 +523,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, fn = ready_to_run_in_worker; } ReadyToRunArgs args = { - g->tag(), - g->_cur_meta, - (bool)(using_attr.flags & BTHREAD_NOSIGNAL) + g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) }; g->set_remained(fn, &args); sched_to(pg, m->tid); @@ -668,13 +708,18 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { } #endif // Save errno so that errno is bthread-specific. - const int saved_errno = errno; + int saved_errno = errno; void* saved_unique_user_ptr = tls_unique_user_ptr; TaskMeta* const cur_meta = g->_cur_meta; - const int64_t now = butil::cpuwide_time_ns(); - const int64_t elp_ns = now - std::abs(g->_cpu_time_stat.last_run_ns); + int64_t now = butil::cpuwide_time_ns(); + CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe(); + int64_t elp_ns = now - cpu_time_stat.last_run_ns(); cur_meta->stat.cputime_ns += elp_ns; + // Update cpu_time_stat. + cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid)); + cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid)); + g->_cpu_time_stat.store(cpu_time_stat); if (FLAGS_bthread_enable_cpu_clock_stat) { const int64_t cpu_thread_time = butil::cputhread_time_ns(); @@ -686,36 +731,6 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) { g->_last_cpu_clock_ns = 0; } -#if __x86_64__ || __ARM_NEON - // Refer to https://rigtorp.se/isatomic/, On the modern CPU microarchitectures - // (Skylake and Zen 2) AVX/AVX2 128b/256b aligned loads and stores are atomic - // even though Intel and AMD officially doesn’t guarantee this. - CPUTimeStat cpu_time_stat{ - next_meta->tid != g->main_tid() ? now : -now, - g->_cpu_time_stat.cumulated_cputime_ns - }; - if (cur_meta->tid != g->main_tid()) { - cpu_time_stat.cumulated_cputime_ns += elp_ns; - } -#if __x86_64__ - // On X86, SSE instructions can ensure atomic loads and stores. - __m128i value = _mm_load_si128(reinterpret_cast<__m128i*>(&cpu_time_stat)); - _mm_store_si128(reinterpret_cast<__m128i*>(&g->_cpu_time_stat), value); -#else // __ARM_NEON - // Starting from Armv8.4-A, neon can ensure atomic loads and stores. - int64x2_t value = vld1q_s64(reinterpret_cast(&cpu_time_stat)); - vst1q_s64(reinterpret_cast(&g->_cpu_time_stat), value); -#endif // __x86_64__ -#else // __x86_64__ || __ARM_NEON - { - BAIDU_SCOPED_LOCK(g->_cpu_time_stat_mutex); - g->_cpu_time_stat.last_run_ns = next_meta->tid != g->main_tid() ? now : -now; - if (cur_meta->tid != g->main_tid()) { - g->_cpu_time_stat.cumulated_cputime_ns += elp_ns; - } - } -#endif // __x86_64__ || __ARM_NEON - ++cur_meta->stat.nswitch; ++ g->_nswitch; // Switch to the task diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index fb81d10a5c..b79e69d8b2 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -48,6 +48,35 @@ class ExitException : public std::exception { void* _value; }; +// Refer to https://rigtorp.se/isatomic/, On the modern CPU microarchitectures +// (Skylake and Zen 2) AVX/AVX2 128b/256b aligned loads and stores are atomic +// even though Intel and AMD officially doesn’t guarantee this. +// On X86, SSE instructions can ensure atomic loads and stores. +// Starting from Armv8.4-A, neon can ensure atomic loads and stores. +// Otherwise, use mutex to guarantee atomicity. +class AtomicInteger128 { +public: + struct Value { + int64_t v1; + int64_t v2; + }; + + AtomicInteger128() = default; + explicit AtomicInteger128(Value value) : _value(value) {} + + Value load() const; + Value load_unsafe() const { + return _value; + } + + void store(Value value); + +private: + Value BAIDU_CACHELINE_ALIGNMENT _value{}; + // Used to protect `_cpu_time_stat' when __x86_64__ and __ARM_NEON is not defined. + FastPthreadMutex _mutex; +}; + // Thread-local group of tasks. // Notice that most methods involving context switching are static otherwise // pointer `this' may change after wakeup. The **pg parameters in following @@ -149,9 +178,7 @@ class TaskGroup { { return _cur_meta->stack == _main_stack; } // Active time in nanoseconds spent by this TaskGroup. - int64_t cumulated_cputime_ns() const { - return _cpu_time_stat.cumulated_cputime_ns; - } + int64_t cumulated_cputime_ns() const; // Push a bthread into the runqueue void ready_to_run(TaskMeta* meta, bool nosignal = false); @@ -206,12 +233,68 @@ class TaskGroup { private: friend class TaskControl; - struct CPUTimeStat { - // Last scheduling time. - // If this value is negative, - // it means that it is the main task. - int64_t last_run_ns; - int64_t cumulated_cputime_ns; + // Last scheduling time, task type and cumulated CPU time. + class CPUTimeStat { + static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL; + static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL; + public: + CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {} + CPUTimeStat(AtomicInteger128::Value value) + : _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {} + + // Convert to AtomicInteger128::Value for atomic operations. + explicit operator AtomicInteger128::Value() const { + return {_last_run_ns_and_type, _cumulated_cputime_ns}; + } + + void set_last_run_ns(int64_t last_run_ns, bool main_task) { + _last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) | + (static_cast(main_task) << 63); + } + int64_t last_run_ns() const { + return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK; + } + int64_t last_run_ns_and_type() const { + return _last_run_ns_and_type; + } + + bool is_main_task() const { + return _last_run_ns_and_type & TASK_TYPE_MASK; + } + + void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) { + if (main_task) { + return; + } + _cumulated_cputime_ns += cputime_ns; + } + int64_t cumulated_cputime_ns() const { + return _cumulated_cputime_ns; + } + + private: + // The higher bit for task type, main task is 1, otherwise 0. + // Lowest 63 bits for last scheduling time. + int64_t _last_run_ns_and_type; + // Cumulated CPU time in nanoseconds. + int64_t _cumulated_cputime_ns; + }; + + class AtomicCPUTimeStat { + public: + CPUTimeStat load() const { + return _cpu_time_stat.load(); + } + CPUTimeStat load_unsafe() const { + return _cpu_time_stat.load_unsafe(); + } + + void store(CPUTimeStat cpu_time_stat) { + _cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat)); + } + + private: + AtomicInteger128 _cpu_time_stat; }; // You shall use TaskControl::create_group to create new instance. @@ -259,15 +342,17 @@ friend class TaskControl; void set_pl(ParkingLot* pl) { _pl = pl; } + static bool is_main_task(TaskGroup* g, bthread_t tid) { + return g->_main_tid == tid; + } + TaskMeta* _cur_meta{NULL}; // the control that this group belongs to TaskControl* _control{NULL}; int _num_nosignal{0}; int _nsignaled{0}; - // Used to protect `_cpu_time_stat' when __x86_64__ and __ARM_NEON is not defined. - FastPthreadMutex _cpu_time_stat_mutex; - BAIDU_CACHELINE_ALIGNMENT CPUTimeStat _cpu_time_stat{0, 0}; + AtomicCPUTimeStat _cpu_time_stat; // last thread cpu clock int64_t _last_cpu_clock_ns{0}; From ff9e9031dd2fa82610e5440a8dccba31bf56e023 Mon Sep 17 00:00:00 2001 From: chenBright Date: Sun, 20 Jul 2025 22:27:00 +0800 Subject: [PATCH 4/4] Remove unused header files --- src/bthread/task_control.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index a30389962c..94be504f1b 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -33,13 +33,6 @@ #include "bthread/timer_thread.h" // global_timer_thread #include #include "bthread/log.h" -#ifdef __x86_64__ -#include -#endif // __x86_64__ - -#ifdef __ARM_NEON -#include -#endif // __ARM_NEON DEFINE_int32(task_group_delete_delay, 1, "delay deletion of TaskGroup for so many seconds");