diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 35d8747751..a5f178e331 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -59,6 +59,21 @@ DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM, "Number of pthread workers of FLAGS_bthread_current_tag"); BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag); +DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag"); +BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) { + if (val < BTHREAD_MIN_PARKINGLOT) { + LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to " + << BTHREAD_MIN_PARKINGLOT; + return false; + } + if (val > BTHREAD_MAX_PARKINGLOT) { + LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to " + << BTHREAD_MAX_PARKINGLOT; + return false; + } + return true; +}); + static bool never_set_bthread_concurrency = true; BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic), atomic_size_match); @@ -216,7 +231,7 @@ static bool validate_bthread_current_tag(const char*, int32_t val) { return false; } BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto c = bthread::get_task_control(); + auto c = get_task_control(); if (c == NULL) { FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM; return true; diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h index 620e3c89df..315e9956ae 100644 --- a/src/bthread/parking_lot.h +++ b/src/bthread/parking_lot.h @@ -60,6 +60,10 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot { // Wait for tasks. // If the `expected_state' does not match, wait() may finish directly. void wait(const State& expected_state) { + if (get_state().val != expected_state.val) { + // Fast path, no need to futex_wait. + return; + } _waiter_num.fetch_add(1, butil::memory_order_relaxed); futex_wait_private(&_pending_signal, expected_state.val, NULL); _waiter_num.fetch_sub(1, butil::memory_order_relaxed); diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 66307d323e..0b93d6a0c8 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -46,6 +46,7 @@ namespace bthread { DECLARE_int32(bthread_concurrency); DECLARE_int32(bthread_min_concurrency); +DECLARE_int32(bthread_parking_lot_of_each_tag); extern pthread_mutex_t g_task_control_mutex; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; @@ -184,7 +185,8 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _priority_queues(FLAGS_task_group_ntags) - , _pl(FLAGS_task_group_ntags) + , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) + , _tagged_pl(FLAGS_task_group_ntags) {} int TaskControl::init(int concurrency) { @@ -324,7 +326,7 @@ void TaskControl::stop_and_join() { [](butil::atomic& index) { index.store(0, butil::memory_order_relaxed); }); } for (int i = 0; i < FLAGS_task_group_ntags; ++i) { - for (auto& pl : _pl[i]) { + for (auto& pl : _tagged_pl[i]) { pl.stop(); } } @@ -365,7 +367,7 @@ int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) { return -1; } g->set_tag(tag); - g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]); + g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]); size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed); if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) { _tagged_groups[tag][ngroup] = g; @@ -480,14 +482,11 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { num_task = 2; } auto& pl = tag_pl(tag); - int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; - num_task -= pl[start_index].signal(1); - if (num_task > 0) { - for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { - if (++start_index >= PARKING_LOT_NUM) { - start_index = 0; - } - num_task -= pl[start_index].signal(1); + size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag; + for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) { + num_task -= pl[start_index].signal(1); + if (++start_index >= _pl_num_of_each_tag) { + start_index = 0; } } if (num_task > 0 && diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 2a2b76d6f2..ecc2cd6a85 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -103,8 +103,7 @@ friend bthread_t init_for_pthread_stack_trace(); private: typedef std::array TaggedGroups; - static const int PARKING_LOT_NUM = 4; - typedef std::array TaggedParkingLot; + typedef std::array TaggedParkingLot; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. int _add_group(TaskGroup*, bthread_tag_t tag); @@ -117,7 +116,7 @@ friend bthread_t init_for_pthread_stack_trace(); butil::atomic& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } // Tag parking slot - TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; } + TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; } static void delete_task_group(void* arg); @@ -159,7 +158,8 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector*> _tagged_nbthreads; std::vector> _priority_queues; - std::vector _pl; + size_t _pl_num_of_each_tag; + std::vector _tagged_pl; #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; diff --git a/src/bthread/types.h b/src/bthread/types.h index c0f23f1c29..30368f68e9 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -154,6 +154,9 @@ static const bthread_t BTHREAD_ATOMIC_INIT = 0; // Min/Max number of work pthreads. static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM; static const int BTHREAD_MAX_CONCURRENCY = 1024; +// Min/max number of ParkingLot. +static const int BTHREAD_MIN_PARKINGLOT = 4; +static const int BTHREAD_MAX_PARKINGLOT = 1024; typedef struct { void* impl;