Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "butil/fd_utility.h" // make_close_on_exec
#include "butil/logging.h" // LOG
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
#include "bthread/bthread.h" // bthread_start_background
#include "brpc/event_dispatcher.h"
#include "brpc/reloadable_flags.h"

DECLARE_int32(task_group_ntags);

Expand All @@ -37,6 +37,8 @@ DEFINE_bool(usercode_in_coroutine, false,
"User's callback are run in coroutine, no bthread or pthread blocking call");

static EventDispatcher* g_edisp = NULL;
static bvar::LatencyRecorder* g_edisp_read_lantency = NULL;
static bvar::LatencyRecorder* g_edisp_write_lantency = NULL;
static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;

static void StopAndJoinGlobalDispatchers() {
Expand All @@ -46,8 +48,13 @@ static void StopAndJoinGlobalDispatchers() {
g_edisp[i * FLAGS_event_dispatcher_num + j].Join();
}
}
delete g_edisp_read_lantency;
delete g_edisp_write_lantency;
}
void InitializeGlobalDispatchers() {
g_edisp_read_lantency = new bvar::LatencyRecorder("event_dispatcher_read_latency");
g_edisp_write_lantency = new bvar::LatencyRecorder("event_dispatcher_write_latency");

g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,18 @@ void EventDispatcher::Run() {
|| (e[i].events & has_epollrdhup)
#endif
) {
int64_t start_ns = butil::cpuwide_time_ns();
// We don't care about the return value.
CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
(*g_edisp_read_lantency) << (butil::cpuwide_time_ns() - start_ns);
}
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
int64_t start_ns = butil::cpuwide_time_ns();
// We don't care about the return value.
CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
(*g_edisp_write_lantency) << (butil::cpuwide_time_ns() - start_ns);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/event_dispatcher_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,20 @@ void EventDispatcher::Run() {
}
for (int i = 0; i < n; ++i) {
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
int64_t start_ns = butil::cpuwide_time_ns();
// We don't care about the return value.
CallInputEventCallback((IOEventDataId)e[i].udata,
e[i].filter, _thread_attr);
(*g_edisp_read_lantency) << (butil::cpuwide_time_ns() - start_ns);
}
}
for (int i = 0; i < n; ++i) {
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
int64_t start_ns = butil::cpuwide_time_ns();
// We don't care about the return value.
CallOutputEventCallback((IOEventDataId)e[i].udata,
e[i].filter, _thread_attr);
(*g_edisp_write_lantency) << (butil::cpuwide_time_ns() - start_ns);
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions test/bthread_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ TEST_F(BthreadTest, call_bthread_functions_before_tls_created) {
ASSERT_EQ(0UL, bthread_self());
}

butil::atomic<bool> start(false);
butil::atomic<bool> stop(false);

void* sleep_for_awhile(void* arg) {
Expand All @@ -128,6 +129,7 @@ void* just_exit(void* arg) {
}

void* repeated_sleep(void* arg) {
start = true;
for (size_t i = 0; !stop; ++i) {
LOG(INFO) << "repeated_sleep(" << arg << ") i=" << i;
bthread_usleep(1000000L);
Expand All @@ -136,6 +138,7 @@ void* repeated_sleep(void* arg) {
}

void* spin_and_log(void* arg) {
start = true;
// This thread never yields CPU.
butil::EveryManyUS every_1s(1000000L);
size_t i = 0;
Expand Down Expand Up @@ -620,10 +623,13 @@ TEST_F(BthreadTest, yield_single_thread) {

#ifdef BRPC_BTHREAD_TRACER
TEST_F(BthreadTest, trace) {
start = false;
stop = false;
bthread_t th;
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1));
usleep(100 * 1000);
while (!start) {
usleep(10 * 1000);
}
bthread::FLAGS_enable_fast_unwind = false;
std::string st = bthread::stack_trace(th);
LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st;
Expand All @@ -636,9 +642,12 @@ TEST_F(BthreadTest, trace) {
stop = true;
ASSERT_EQ(0, bthread_join(th, NULL));

start = false;
stop = false;
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1));
usleep(100 * 1000);
while (!start) {
usleep(10 * 1000);
}
bthread::FLAGS_enable_fast_unwind = false;
st = bthread::stack_trace(th);
LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st;
Expand Down