Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/brpc/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,6 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
return Extension<const LoadBalancer>::instance();
}

// Returns a randomly selected entry of the offset table listed in
// bthread/offset_inl.list. The index is drawn with
// butil::fast_rand_less_than() over the table size.
inline uint32_t GenRandomStride() {
    // The table never changes, so keep one constant-initialized copy with
    // static storage instead of rebuilding the whole array on the stack on
    // every call.
    static const uint32_t prime_offset[] = {
#include "bthread/offset_inl.list"
    };
    return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
}

} // namespace brpc


Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "butil/macros.h"
#include "butil/fast_rand.h"
#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
Expand Down Expand Up @@ -118,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0;
}
if (stride == 0) {
stride = GenRandomStride();
stride = bthread::prime_offset();
}
// If `Address' failed, use `offset+stride' to retry so that
// this failed server won't be visited again inside for
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/round_robin_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "butil/macros.h"
#include "butil/fast_rand.h"
#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/round_robin_load_balancer.h"

Expand Down Expand Up @@ -108,7 +109,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
}
TLS tls = s.tls();
if (tls.stride == 0) {
tls.stride = GenRandomStride();
tls.stride = bthread::prime_offset();
// use random at first time, for the case of
// use rr lb every time in new thread
tls.offset = butil::fast_rand_less_than(n);
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/weighted_randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <algorithm>

#include "butil/fast_rand.h"
#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/weighted_randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
Expand Down Expand Up @@ -152,7 +153,7 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (random_traversed.size() == n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
uint32_t stride = GenRandomStride();
uint32_t stride = bthread::prime_offset();
for (size_t i = 0; i < n; ++i) {
offset = (offset + stride) % n;
SocketId id = s->server_list[offset].id;
Expand Down
39 changes: 39 additions & 0 deletions src/bthread/prime_offset.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BTHREAD_PRIME_OFFSET_H
#define BTHREAD_PRIME_OFFSET_H

#include "butil/fast_rand.h"
#include "butil/macros.h"

namespace bthread {

// Prime number offset for hash function.
// Returns the entry of the offset table (bthread/offset_inl.list) at index
// `seed' modulo the table size, widened to size_t.
// NOTE(review): the entries are presumably primes, as the name suggests;
// the list itself is not visible from this header.
inline size_t prime_offset(size_t seed) {
    // The table is immutable: a static const array is constant-initialized
    // once and shared by all callers, rather than being copied onto the
    // stack on every call as a plain local array would be.
    static const uint32_t offsets[] = {
#include "bthread/offset_inl.list"
    };
    return offsets[seed % ARRAY_SIZE(offsets)];
}

// Same as prime_offset(seed), with the seed taken from butil::fast_rand().
inline size_t prime_offset() {
    return prime_offset(butil::fast_rand());
}

} // namespace bthread


#endif // BTHREAD_PRIME_OFFSET_H
12 changes: 4 additions & 8 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
auto a = static_cast<CumulatedWithTagArgs*>(arg);
auto c = a->c;
auto t = a->t;
return c->get_cumulated_worker_time_with_tag(t);
return c->get_cumulated_worker_time(t);
}

static int64_t get_cumulated_switch_count_from_this(void *arg) {
Expand Down Expand Up @@ -526,22 +526,18 @@ double TaskControl::get_cumulated_worker_time() {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
for_each_task_group([&](TaskGroup* g) {
if (g) {
cputime_ns += g->_cumulated_cputime_ns;
}
cputime_ns += g->cumulated_cputime_ns();
});
return cputime_ns / 1000000000.0;
}

double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) {
double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i) {
if (groups[i]) {
cputime_ns += groups[i]->_cumulated_cputime_ns;
}
cputime_ns += groups[i]->cumulated_cputime_ns();
}
return cputime_ns / 1000000000.0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ friend bthread_t init_for_pthread_stack_trace();
void print_rq_sizes(std::ostream& os);

double get_cumulated_worker_time();
double get_cumulated_worker_time_with_tag(bthread_tag_t tag);
double get_cumulated_worker_time(bthread_tag_t tag);
int64_t get_cumulated_switch_count();
int64_t get_cumulated_signal_count();

Expand Down
113 changes: 72 additions & 41 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"

#ifdef __x86_64__
#include <x86intrin.h>
#endif // __x86_64__

#ifdef __ARM_NEON
#include <arm_neon.h>
#endif // __ARM_NEON

namespace bthread {

static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
Expand Down Expand Up @@ -69,10 +77,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);

const TaskStatistics EMPTY_STAT = { 0, 0, 0 };

const size_t OFFSET_TABLE[] = {
#include "bthread/offset_inl.list"
};

void* (*g_create_span_func)() = NULL;

void* run_create_span_func() {
Expand All @@ -82,6 +86,39 @@ void* run_create_span_func() {
return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
}

// Returns a copy of the stored 128-bit value.
//
// On x86-64 and ARM NEON builds the two 64-bit halves are fetched with one
// 16-byte vector load and _mutex is not taken; on every other target the
// copy is made while holding _mutex.
// NOTE(review): the vector paths rely on an aligned 16-byte load being
// performed as a single access on the target CPU, and _mm_load_si128
// requires _value to be 16-byte aligned -- confirm both against the class
// declaration and the supported hardware.
AtomicInteger128::Value AtomicInteger128::load() const {
#if __x86_64__
    // _mutex is only used by the fallback branch; reference it here to
    // suppress the compiler warning on this target.
    (void)_mutex;
    __m128i halves = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value));
    return {halves[0], halves[1]};
#elif __ARM_NEON
    // Same as above: suppress the compiler warning about _mutex.
    (void)_mutex;
    int64x2_t halves = vld1q_s64(reinterpret_cast<const int64_t*>(&_value));
    return {halves[0], halves[1]};
#else
    // No 128-bit vector load available: serialize with the mutex.
    BAIDU_SCOPED_LOCK(_mutex);
    return _value;
#endif // __x86_64__ / __ARM_NEON / fallback
}

// Replaces the stored 128-bit value with `value'.
//
// On x86-64 and ARM NEON builds the argument is loaded into a vector
// register and written back with one 16-byte vector store; on every other
// target the assignment happens while holding _mutex.
// NOTE(review): _mm_load_si128/_mm_store_si128 require 16-byte aligned
// addresses, so both the by-value parameter and _value are presumably
// aligned through the declaration of Value -- confirm.
void AtomicInteger128::store(Value value) {
#if __x86_64__
    __m128i incoming = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
    _mm_store_si128(reinterpret_cast<__m128i*>(&_value), incoming);
#elif __ARM_NEON
    int64x2_t incoming = vld1q_s64(reinterpret_cast<int64_t*>(&value));
    vst1q_s64(reinterpret_cast<int64_t*>(&_value), incoming);
#else
    // No 128-bit vector store available: serialize with the mutex.
    BAIDU_SCOPED_LOCK(_mutex);
    _value = value;
#endif // __x86_64__ / __ARM_NEON / fallback
}


int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
Expand Down Expand Up @@ -148,6 +185,16 @@ static double get_cumulated_cputime_from_this(void* arg) {
return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
}

int64_t TaskGroup::cumulated_cputime_ns() const {
CPUTimeStat cpu_time_stat = _cpu_time_stat.load();
// Add the elapsed time of running bthread.
int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns();
if (!cpu_time_stat.is_main_task()) {
cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns();
}
return cumulated_cputime_ns;
}

void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
Expand All @@ -156,11 +203,11 @@ void TaskGroup::run_main_task() {
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
task_runner(1/*skip remained*/);
}
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
char name[32];
Expand All @@ -176,31 +223,12 @@ void TaskGroup::run_main_task() {
}
}
// Don't forget to add elapse of last wait_task.
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
current_task()->stat.cputime_ns +=
butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns();
}

TaskGroup::TaskGroup(TaskControl* c)
: _cur_meta(NULL)
, _control(c)
, _num_nosignal(0)
, _nsignaled(0)
, _last_run_ns(butil::cpuwide_time_ns())
, _cumulated_cputime_ns(0)
, _nswitch(0)
, _last_context_remained(NULL)
, _last_context_remained_arg(NULL)
, _pl(NULL)
, _main_stack(NULL)
, _main_tid(0)
, _remote_num_nosignal(0)
, _remote_nsignaled(0)
#ifndef NDEBUG
, _sched_recursive_guard(0)
#endif
, _tag(BTHREAD_TAG_DEFAULT)
, _tid(-1) {
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
: _control(c) {
CHECK(c);
}

Expand Down Expand Up @@ -292,8 +320,12 @@ int TaskGroup::init(size_t runqueue_capacity) {
_cur_meta = m;
_main_tid = m->tid;
_main_stack = stk;
_last_run_ns = butil::cpuwide_time_ns();

CPUTimeStat cpu_time_stat;
cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true);
_cpu_time_stat.store(cpu_time_stat);
_last_cpu_clock_ns = 0;

return 0;
}

Expand Down Expand Up @@ -414,7 +446,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {

g->_control->_nbthreads << -1;
g->_control->tag_nbthreads(g->tag()) << -1;
g->set_remained(TaskGroup::_release_last_context, m);
g->set_remained(_release_last_context, m);
ending_sched(&g);

} while (g->_cur_meta->tid != g->_main_tid);
Expand Down Expand Up @@ -491,9 +523,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
g->tag(),
g->_cur_meta,
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
sched_to(pg, m->tid);
Expand Down Expand Up @@ -678,14 +708,18 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
}
#endif
// Save errno so that errno is bthread-specific.
const int saved_errno = errno;
int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;

TaskMeta* const cur_meta = g->_cur_meta;
const int64_t now = butil::cpuwide_time_ns();
const int64_t elp_ns = now - g->_last_run_ns;
g->_last_run_ns = now;
int64_t now = butil::cpuwide_time_ns();
CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe();
int64_t elp_ns = now - cpu_time_stat.last_run_ns();
cur_meta->stat.cputime_ns += elp_ns;
// Update cpu_time_stat.
cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid));
cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid));
g->_cpu_time_stat.store(cpu_time_stat);

if (FLAGS_bthread_enable_cpu_clock_stat) {
const int64_t cpu_thread_time = butil::cputhread_time_ns();
Expand All @@ -696,10 +730,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
} else {
g->_last_cpu_clock_ns = 0;
}

if (cur_meta->tid != g->main_tid()) {
g->_cumulated_cputime_ns += elp_ns;
}

++cur_meta->stat.nswitch;
++ g->_nswitch;
// Switch to the task
Expand Down
Loading
Loading