Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
"Number of pthread workers of FLAGS_bthread_current_tag");
BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag);

DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag");
BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) {
if (val < BTHREAD_MIN_PARKINGLOT) {
LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to "
<< BTHREAD_MIN_PARKINGLOT;
return false;
}
if (val > BTHREAD_MAX_PARKINGLOT) {
LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to "
<< BTHREAD_MAX_PARKINGLOT;
return false;
}
return true;
});

static bool never_set_bthread_concurrency = true;

BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
Expand Down Expand Up @@ -216,7 +231,7 @@ static bool validate_bthread_current_tag(const char*, int32_t val) {
return false;
}
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto c = bthread::get_task_control();
auto c = get_task_control();
Comment thread
chenBright marked this conversation as resolved.
if (c == NULL) {
FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
return true;
Expand Down
4 changes: 4 additions & 0 deletions src/bthread/parking_lot.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
void wait(const State& expected_state) {
if (get_state().val != expected_state.val) {
// Fast path, no need to futex_wait.
return;
}
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
futex_wait_private(&_pending_signal, expected_state.val, NULL);
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
Expand Down
21 changes: 10 additions & 11 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace bthread {

DECLARE_int32(bthread_concurrency);
DECLARE_int32(bthread_min_concurrency);
DECLARE_int32(bthread_parking_lot_of_each_tag);

extern pthread_mutex_t g_task_control_mutex;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
Expand Down Expand Up @@ -184,7 +185,8 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _priority_queues(FLAGS_task_group_ntags)
, _pl(FLAGS_task_group_ntags)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
{}

int TaskControl::init(int concurrency) {
Expand Down Expand Up @@ -324,7 +326,7 @@ void TaskControl::stop_and_join() {
[](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); });
}
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (auto& pl : _pl[i]) {
for (auto& pl : _tagged_pl[i]) {
pl.stop();
}
}
Expand Down Expand Up @@ -365,7 +367,7 @@ int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
return -1;
}
g->set_tag(tag);
g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]);
g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]);
size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
_tagged_groups[tag][ngroup] = g;
Expand Down Expand Up @@ -480,14 +482,11 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
num_task = 2;
}
auto& pl = tag_pl(tag);
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= pl[start_index].signal(1);
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= pl[start_index].signal(1);
size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag;
for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) {
num_task -= pl[start_index].signal(1);
if (++start_index >= _pl_num_of_each_tag) {
start_index = 0;
Comment thread
chenBright marked this conversation as resolved.
}
}
if (num_task > 0 &&
Expand Down
8 changes: 4 additions & 4 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ friend bthread_t init_for_pthread_stack_trace();

private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
static const int PARKING_LOT_NUM = 4;
typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
// Add/Remove a TaskGroup.
// Returns 0 on success, -1 otherwise.
int _add_group(TaskGroup*, bthread_tag_t tag);
Expand All @@ -117,7 +116,7 @@ friend bthread_t init_for_pthread_stack_trace();
butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }

// Tag parking slot
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }

static void delete_task_group(void* arg);

Expand Down Expand Up @@ -159,7 +158,8 @@ friend bthread_t init_for_pthread_stack_trace();
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;

std::vector<TaggedParkingLot> _pl;
size_t _pl_num_of_each_tag;
std::vector<TaggedParkingLot> _tagged_pl;

#ifdef BRPC_BTHREAD_TRACER
TaskTracer _task_tracer;
Expand Down
3 changes: 3 additions & 0 deletions src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ static const bthread_t BTHREAD_ATOMIC_INIT = 0;
// Min/Max number of work pthreads.
static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM;
static const int BTHREAD_MAX_CONCURRENCY = 1024;
// Min/max number of ParkingLot.
static const int BTHREAD_MIN_PARKINGLOT = 4;
static const int BTHREAD_MAX_PARKINGLOT = 1024;

typedef struct {
void* impl;
Expand Down
Loading