Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/cn/bthread_tagged_task_group.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_mi
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。

Q:如何动态改变分组线程的数量?
A:当前每个分组的线程数最少为4个,所以先设置FLAGS_bthread_concurrency=4*分组数,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。

A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。

Q:不同分组之间有什么关系吗?

A:不同分组是独立的线程池和事件驱动器,完全没有关系。

Q:可以在分组之间做bthread的同步操作吗?

A:可以的,每个bthread都有自己的tag标签,挂起后重新投入运行将继续在这个tag的线程池上执行。

Q:客户端发送和接收RPC消息是在哪个分组上执行的?

A:这取决于客户端的上下文,如果客户端不在任何tag分组上,那将使用tag0分组收发消息;否则将在当前所在的tag分组收发消息。

Q:如何将一个分组的线程绑定到指定的一些cpu上面。

A:int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t))这个函数用于在某个分组上做一些初始化的工作,比如:可以实现绑核的代码,根据tag入参来确定不同分组绑定不同的cpu。

# 监控
Expand Down
15 changes: 11 additions & 4 deletions example/bthread_tag_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

// A server to receive EchoRequest and send back EchoResponse.

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <bthread/unstable.h>
#include <butil/logging.h>
#include <gflags/gflags.h>
#include "echo.pb.h"

DEFINE_bool(echo_attachment, true, "Echo attachment as well");
Expand All @@ -29,6 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
DEFINE_int32(num_threads1, 4, "Thread number of server1");
DEFINE_int32(num_threads2, 4, "Thread number of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
Expand Down Expand Up @@ -63,8 +65,11 @@ static void my_tagged_worker_start_fn(bthread_tag_t tag) {
}

static void* my_background_task(void*) {
LOG(INFO) << "run background task tag=" << bthread_self_tag();
bthread_usleep(1000000UL);
while (true) {
LOG(INFO) << "run background task tag=" << bthread_self_tag();
bthread_usleep(1000000UL);
}
return nullptr;
}

int main(int argc, char* argv[]) {
Expand Down Expand Up @@ -102,6 +107,7 @@ int main(int argc, char* argv[]) {
options1.max_concurrency = FLAGS_max_concurrency;
options1.internal_port = FLAGS_internal_port1;
options1.bthread_tag = FLAGS_tag1;
options1.num_threads = FLAGS_num_threads1;
if (server1.Start(FLAGS_port1, &options1) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand All @@ -127,6 +133,7 @@ int main(int argc, char* argv[]) {
options2.max_concurrency = FLAGS_max_concurrency;
options2.internal_port = FLAGS_internal_port2;
options2.bthread_tag = FLAGS_tag2;
options2.num_threads = FLAGS_num_threads2;
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
25 changes: 17 additions & 8 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ServerOptions::ServerOptions()
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_DEFAULT) {
, bthread_tag(BTHREAD_TAG_INVALID) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -833,6 +833,17 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
#endif
}

auto original_bthread_tag = _options.bthread_tag;
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
_options.bthread_tag = BTHREAD_TAG_DEFAULT;
}
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}

if (_options.http_master_service) {
// Check requirements for http_master_service:
// has "default_method" & request/response have no fields
Expand Down Expand Up @@ -1020,7 +1031,11 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
bthread_setconcurrency(_options.num_threads);
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
bthread_setconcurrency(_options.num_threads);
} else {
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
}

for (MethodMap::iterator it = _method_map.begin();
Expand Down Expand Up @@ -1085,12 +1100,6 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
return -1;
}
_am->_use_rdma = _options.use_rdma;
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}
_am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
Expand Down
8 changes: 2 additions & 6 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,14 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) {
return EPERM;
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto c = bthread::get_task_control();
if (c == NULL) {
bthread::FLAGS_bthread_concurrency_by_tag = 0;
return 0;
}
auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
LOG(ERROR) << "Fail to set concurrency by tag " << tag
<< ", Whole concurrency larger than bthread_concurrency";
<< ", Total concurrency larger than bthread_concurrency";
return EPERM;
}
auto added = 0;
Expand Down