From 1be836c191c983a7f2d6ad017dbe8708842ec038 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Tue, 29 Oct 2024 19:54:25 +0800 Subject: [PATCH 1/2] set tags workers unlimitedly --- docs/cn/bthread_tagged_task_group.md | 6 +++--- example/bthread_tag_echo_c++/server.cpp | 4 ++-- src/brpc/server.cpp | 6 +----- src/bthread/bthread.cpp | 20 +++++++++++--------- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/docs/cn/bthread_tagged_task_group.md b/docs/cn/bthread_tagged_task_group.md index bfdafd76e7..027bd4eb9e 100644 --- a/docs/cn/bthread_tagged_task_group.md +++ b/docs/cn/bthread_tagged_task_group.md @@ -11,19 +11,19 @@ ```c++ 服务端启动 -./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 1 +./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 8 -event_dispatcher_num 1 客户端启动 ./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true ./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true ``` -FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。 +FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。 一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。 Q:如何动态改变分组线程的数量? -A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。 +A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了 bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server 来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和 FLAGS_bthread_concurrency_by_tag 来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的 worker 总数。 Q:不同分组之间有什么关系吗? diff --git a/example/bthread_tag_echo_c++/server.cpp b/example/bthread_tag_echo_c++/server.cpp index bc717e2544..ed4ba4d6c3 100644 --- a/example/bthread_tag_echo_c++/server.cpp +++ b/example/bthread_tag_echo_c++/server.cpp @@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server"); DEFINE_int32(tag1, 0, "Server1 tag"); DEFINE_int32(tag2, 1, "Server2 tag"); DEFINE_int32(tag3, 2, "Background task tag"); -DEFINE_int32(num_threads1, 4, "Thread number of server1"); -DEFINE_int32(num_threads2, 4, "Thread number of server2"); +DEFINE_int32(num_threads1, 6, "Thread number of server1"); +DEFINE_int32(num_threads2, 16, "Thread number of server2"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index fa3ab7d7ed..0110761af7 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) { _options.num_threads = BTHREAD_MIN_CONCURRENCY; } - if (original_bthread_tag == BTHREAD_TAG_INVALID) { - bthread_setconcurrency(_options.num_threads); - } else { - bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); - } + bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); } for (MethodMap::iterator it = _method_map.begin(); diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index dcd29c43dc..7bd459a5da 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { } auto c = bthread::get_or_new_task_control(); BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto ngroup = c->concurrency(); auto tag_ngroup = c->concurrency(tag); auto add = num - tag_ngroup; - if (ngroup + add > bthread::FLAGS_bthread_concurrency) { - LOG(ERROR) << "Fail to set concurrency by tag " << tag - << ", Total concurrency larger than bthread_concurrency"; - return EPERM; - } - auto added = 0; + if (add > 0) { - added = c->add_workers(add, tag); + bthread::FLAGS_bthread_concurrency += add; + auto added = c->add_workers(add, tag); return (add == added ? 0 : EPERM); + + } else if (add < 0){ + LOG(WARNING) << "Fail to set concurrency by tag: " << tag + << ", tag concurrency must larger than old oncurrency. old concurrency: " + << tag_ngroup << ", new concurrency: " << num; + return EPERM; + } else { + return 0; } - return (num == tag_ngroup ? 0 : EPERM); } int bthread_about_to_quit() { From ad750160a63b1ddac6cdda628017255f09eee420 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Wed, 30 Oct 2024 15:09:58 +0800 Subject: [PATCH 2/2] fix set concurrency test --- src/bthread/bthread.cpp | 2 +- test/bthread_setconcurrency_unittest.cpp | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 7bd459a5da..f963c4a63c 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -401,8 +401,8 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { auto add = num - tag_ngroup; if (add > 0) { - bthread::FLAGS_bthread_concurrency += add; auto added = c->add_workers(add, tag); + bthread::FLAGS_bthread_concurrency += added; return (add == added ? 0 : EPERM); } else if (add < 0){ diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp index 7c8faf401c..aa1d674c22 100644 --- a/test/bthread_setconcurrency_unittest.cpp +++ b/test/bthread_setconcurrency_unittest.cpp @@ -214,9 +214,11 @@ int concurrency_by_tag(int num) { TEST(BthreadTest, concurrency_by_tag) { ASSERT_EQ(concurrency_by_tag(1), false); - auto con = bthread_getconcurrency_by_tag(0); + auto tag_con = bthread_getconcurrency_by_tag(0); + auto con = bthread_getconcurrency(); ASSERT_EQ(concurrency_by_tag(con), true); - ASSERT_EQ(concurrency_by_tag(con + 1), false); + ASSERT_EQ(concurrency_by_tag(con + 1), true); + ASSERT_EQ(bthread_getconcurrency(), con+1); bthread_setconcurrency(con + 1); ASSERT_EQ(concurrency_by_tag(con + 1), true); }