Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/cn/bthread_tagged_task_group.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@

```c++
服务端启动
./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 1
./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 8 -event_dispatcher_num 1

客户端启动
./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true
./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true
```

FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。

Q:如何动态改变分组线程的数量?

A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数
A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了 bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server 来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和 FLAGS_bthread_concurrency_by_tag 来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的 worker 总数

Q:不同分组之间有什么关系吗?

Expand Down
4 changes: 2 additions & 2 deletions example/bthread_tag_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
DEFINE_int32(num_threads1, 4, "Thread number of server1");
DEFINE_int32(num_threads2, 4, "Thread number of server2");
DEFINE_int32(num_threads1, 6, "Thread number of server1");
DEFINE_int32(num_threads2, 16, "Thread number of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
Expand Down
6 changes: 1 addition & 5 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
bthread_setconcurrency(_options.num_threads);
} else {
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}

for (MethodMap::iterator it = _method_map.begin();
Expand Down
20 changes: 11 additions & 9 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
LOG(ERROR) << "Fail to set concurrency by tag " << tag
<< ", Total concurrency larger than bthread_concurrency";
return EPERM;
}
auto added = 0;

if (add > 0) {
added = c->add_workers(add, tag);
auto added = c->add_workers(add, tag);
bthread::FLAGS_bthread_concurrency += added;
return (add == added ? 0 : EPERM);

} else if (add < 0){
LOG(WARNING) << "Fail to set concurrency by tag: " << tag
<< ", tag concurrency must larger than old oncurrency. old concurrency: "
<< tag_ngroup << ", new concurrency: " << num;
return EPERM;
} else {
return 0;
}
return (num == tag_ngroup ? 0 : EPERM);
}

int bthread_about_to_quit() {
Expand Down
6 changes: 4 additions & 2 deletions test/bthread_setconcurrency_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ int concurrency_by_tag(int num) {

TEST(BthreadTest, concurrency_by_tag) {
ASSERT_EQ(concurrency_by_tag(1), false);
auto con = bthread_getconcurrency_by_tag(0);
auto tag_con = bthread_getconcurrency_by_tag(0);
auto con = bthread_getconcurrency();
ASSERT_EQ(concurrency_by_tag(con), true);
ASSERT_EQ(concurrency_by_tag(con + 1), false);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
ASSERT_EQ(bthread_getconcurrency(), con+1);
bthread_setconcurrency(con + 1);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
}
Expand Down