diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 53495ea6d7..a8f1b9dc0e 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -21,9 +21,9 @@ #include "butil/fd_utility.h" // make_close_on_exec #include "butil/logging.h" // LOG #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 +#include "bvar/latency_recorder.h" // bvar::LatencyRecorder #include "bthread/bthread.h" // bthread_start_background #include "brpc/event_dispatcher.h" -#include "brpc/reloadable_flags.h" DECLARE_int32(task_group_ntags); @@ -37,6 +37,8 @@ DEFINE_bool(usercode_in_coroutine, false, "User's callback are run in coroutine, no bthread or pthread blocking call"); static EventDispatcher* g_edisp = NULL; +static bvar::LatencyRecorder* g_edisp_read_lantency = NULL; +static bvar::LatencyRecorder* g_edisp_write_lantency = NULL; static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; static void StopAndJoinGlobalDispatchers() { @@ -46,8 +48,13 @@ static void StopAndJoinGlobalDispatchers() { g_edisp[i * FLAGS_event_dispatcher_num + j].Join(); } } + delete g_edisp_read_lantency; + delete g_edisp_write_lantency; } void InitializeGlobalDispatchers() { + g_edisp_read_lantency = new bvar::LatencyRecorder("event_dispatcher_read_latency"); + g_edisp_write_lantency = new bvar::LatencyRecorder("event_dispatcher_write_latency"); + g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num]; for (int i = 0; i < FLAGS_task_group_ntags; ++i) { for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) { diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 64717b1623..0ea404fff6 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -222,14 +222,18 @@ void EventDispatcher::Run() { || (e[i].events & has_epollrdhup) #endif ) { + int64_t start_ns = butil::cpuwide_time_ns(); // We don't care about the return value. CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr); + (*g_edisp_read_lantency) << (butil::cpuwide_time_ns() - start_ns); } } for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { + int64_t start_ns = butil::cpuwide_time_ns(); // We don't care about the return value. CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr); + (*g_edisp_write_lantency) << (butil::cpuwide_time_ns() - start_ns); } } } diff --git a/src/brpc/event_dispatcher_kqueue.cpp b/src/brpc/event_dispatcher_kqueue.cpp index 97ad29bba6..a179048604 100644 --- a/src/brpc/event_dispatcher_kqueue.cpp +++ b/src/brpc/event_dispatcher_kqueue.cpp @@ -205,16 +205,20 @@ void EventDispatcher::Run() { } for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { + int64_t start_ns = butil::cpuwide_time_ns(); // We don't care about the return value. CallInputEventCallback((IOEventDataId)e[i].udata, e[i].filter, _thread_attr); + (*g_edisp_read_lantency) << (butil::cpuwide_time_ns() - start_ns); } } for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { + int64_t start_ns = butil::cpuwide_time_ns(); // We don't care about the return value. CallOutputEventCallback((IOEventDataId)e[i].udata, e[i].filter, _thread_attr); + (*g_edisp_write_lantency) << (butil::cpuwide_time_ns() - start_ns); } } } diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp index 0286db9904..57f4fc82bd 100644 --- a/test/bthread_unittest.cpp +++ b/test/bthread_unittest.cpp @@ -111,6 +111,7 @@ TEST_F(BthreadTest, call_bthread_functions_before_tls_created) { ASSERT_EQ(0UL, bthread_self()); } +butil::atomic start(false); butil::atomic stop(false); void* sleep_for_awhile(void* arg) { @@ -128,6 +129,7 @@ void* just_exit(void* arg) { } void* repeated_sleep(void* arg) { + start = true; for (size_t i = 0; !stop; ++i) { LOG(INFO) << "repeated_sleep(" << arg << ") i=" << i; bthread_usleep(1000000L); @@ -136,6 +138,7 @@ void* repeated_sleep(void* arg) { } void* spin_and_log(void* arg) { + start = true; // This thread never yields CPU. butil::EveryManyUS every_1s(1000000L); size_t i = 0; @@ -620,10 +623,13 @@ TEST_F(BthreadTest, yield_single_thread) { #ifdef BRPC_BTHREAD_TRACER TEST_F(BthreadTest, trace) { + start = false; stop = false; bthread_t th; ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1)); - usleep(100 * 1000); + while (!start) { + usleep(10 * 1000); + } bthread::FLAGS_enable_fast_unwind = false; std::string st = bthread::stack_trace(th); LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st; @@ -636,9 +642,12 @@ TEST_F(BthreadTest, trace) { stop = true; ASSERT_EQ(0, bthread_join(th, NULL)); + start = false; stop = false; ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1)); - usleep(100 * 1000); + while (!start) { + usleep(10 * 1000); + } bthread::FLAGS_enable_fast_unwind = false; st = bthread::stack_trace(th); LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st;