Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bthread/task_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ struct TaskMeta {
TaskStatus status{TASK_STATUS_UNKNOWN};
// Whether bthread is traced?
bool traced{false};
// [Not Reset] guarantee tracing completion before jumping.
pthread_mutex_t trace_lock{};
// Worker thread id.
pthread_t worker_tid{};

Expand All @@ -122,9 +124,11 @@ struct TaskMeta {
pthread_spin_init(&version_lock, 0);
version_butex = butex_create_checked<uint32_t>();
*version_butex = 1;
pthread_mutex_init(&trace_lock, NULL);
}

~TaskMeta() {
pthread_mutex_destroy(&trace_lock);
butex_destroy(version_butex);
version_butex = NULL;
pthread_spin_destroy(&version_lock);
Expand Down
92 changes: 34 additions & 58 deletions src/bthread/task_tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void TaskTracer::Result::OutputToStream(std::ostream& os) const {

bool TaskTracer::Init() {
if (_trace_time.expose("bthread_trace_time") != 0) {
LOG(ERROR) << "Fail to expose bthread_trace_time";
return false;
}
if (!RegisterSignalHandler()) {
Expand All @@ -136,7 +137,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
CHECK_NE(TASK_STATUS_RUNNING, s) << "Use `set_running_status' instead";
CHECK_NE(TASK_STATUS_END, s) << "Use `set_end_status_unsafe' instead";

bool tracing;
bool tracing = false;
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (TASK_STATUS_UNKNOWN == m->status && TASK_STATUS_JUMPING == s) {
Expand Down Expand Up @@ -182,31 +183,8 @@ void TaskTracer::Trace(std::ostream& os, bthread_t tid) {
}

void TaskTracer::WaitForTracing(TaskMeta* m) {
BAIDU_SCOPED_LOCK(_mutex);
while (m->traced) {
_cond.Wait();
}
}

TaskStatus TaskTracer::WaitForJumping(TaskMeta* m) {
// Reasons for not using locks here:
// 1. It is necessary to lock before jump_stack, unlock after jump_stack,
// which involves two different bthread and is prone to errors.
// 2. jump_stack is fast.
int i = 0;
do {
// The bthread is jumping now, spin until it finishes.
if (i++ < 30) {
cpu_relax();
} else {
sched_yield();
}

BAIDU_SCOPED_LOCK(m->version_lock);
if (TASK_STATUS_JUMPING != m->status) {
return m->status;
}
} while (true);
BAIDU_SCOPED_LOCK(m->trace_lock);
// Acquiring trace_lock means tracing is done.
}

TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
Expand All @@ -224,25 +202,44 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
// Make sure only one bthread is traced at a time.
BAIDU_SCOPED_LOCK(_trace_request_mutex);

// The chance to remove unused SignalSyncs.
auto iter = std::remove_if(
_inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
[](butil::intrusive_ptr<SignalSync>& sync) {
return sync->ref_count() == 1;
});
_inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());

TaskMeta* m = TaskGroup::address_meta(tid);
if (NULL == m) {
return Result::MakeErrorResult("bthread=%d never existed", tid);
}

BAIDU_SCOPED_LOCK(_mutex);
BAIDU_SCOPED_LOCK(m->trace_lock);
TaskStatus status;
pthread_t worker_tid;
const uint32_t given_version = get_version(tid);
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_version == *m->version_butex) {
// Start tracing.
m->traced = true;
worker_tid = m->worker_tid;
status = m->status;
} else {
if (given_version != *m->version_butex) {
return Result::MakeErrorResult("bthread=%d not exist now", tid);
}

status = m->status;
if (TASK_STATUS_UNKNOWN == status) {
return Result::MakeErrorResult("bthread=%d not exist now", tid);
} else if (TASK_STATUS_CREATED == status) {
return Result::MakeErrorResult("bthread=%d has just been created", tid);
} else if (TASK_STATUS_FIRST_READY == status) {
return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
} else if (TASK_STATUS_END == status) {
return Result::MakeErrorResult("bthread=%d has ended", tid);
} else if (TASK_STATUS_JUMPING == status) {
return Result::MakeErrorResult("bthread=%d is jumping stack", tid);
}
// Start tracing.
m->traced = true;
worker_tid = m->worker_tid;
}

BRPC_SCOPE_EXIT {
Expand All @@ -252,31 +249,16 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
// tracing completion, so given_version != *m->version_butex is OK.
m->traced = false;
}
// Wake up the waiting worker thread to jump.
_cond.Signal();
};

if (TASK_STATUS_UNKNOWN == status) {
return Result::MakeErrorResult("bthread=%d not exist now", tid);
} else if (TASK_STATUS_CREATED == status) {
return Result::MakeErrorResult("bthread=%d has just been created", tid);
} else if (TASK_STATUS_FIRST_READY == status) {
return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
} else if (TASK_STATUS_END == status) {
return Result::MakeErrorResult("bthread=%d has ended", tid);
} else if (TASK_STATUS_JUMPING == status) {
// Wait for jumping completion.
status = WaitForJumping(m);
}

// After jumping, the status may be RUNNING, SUSPENDED, or READY, which is traceable.
// The status may be RUNNING, SUSPENDED, or READY, which is traceable.
if (TASK_STATUS_RUNNING == status) {
return SignalTrace(worker_tid);
} else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) {
return ContextTrace(m->stack->context);
}

return Result::MakeErrorResult("Invalid TaskStatus=%d", status);
return Result::MakeErrorResult("Invalid TaskStatus=%d of bthread=%d", status, tid);
}

// Instruct ASan to ignore this function.
Expand Down Expand Up @@ -408,14 +390,6 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
return Result::MakeErrorResult("Forbid to trace self");
}

// Remove unused SignalSyncs.
auto iter = std::remove_if(
_inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
[](butil::intrusive_ptr<SignalSync>& sync) {
return sync->ref_count() == 1;
});
_inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());

// Each signal trace has an independent SignalSync to
// prevent the previous SignalHandler from affecting the new SignalTrace.
butil::intrusive_ptr<SignalSync> signal_sync(new SignalSync());
Expand Down Expand Up @@ -465,6 +439,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
}
break;
}
// Remove the successful SignalSync.
_inuse_signal_syncs.pop_back();

return signal_sync->result;
}
Expand Down
8 changes: 1 addition & 7 deletions src/bthread/task_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TaskTracer {
void Trace(std::ostream& os, bthread_t tid);

// When the worker is jumping stack from a bthread to another,
void WaitForTracing(TaskMeta* m);
static void WaitForTracing(TaskMeta* m);

private:
// Error number guard used in signal handler.
Expand Down Expand Up @@ -94,7 +94,6 @@ class TaskTracer {
Result result;
};

static TaskStatus WaitForJumping(TaskMeta* m);
Result TraceImpl(bthread_t tid);

unw_cursor_t MakeCursor(bthread_fcontext_t fcontext);
Expand All @@ -108,11 +107,6 @@ class TaskTracer {
// Make sure only one bthread is traced at a time.
Mutex _trace_request_mutex;

// For signal trace.
// Make sure bthread does not jump stack when it is being traced.
butil::Mutex _mutex;
butil::ConditionVariable _cond{&_mutex};

// For context trace.
unw_context_t _context{};

Expand Down
4 changes: 2 additions & 2 deletions test/brpc_http_rpc_protocol_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1701,9 +1701,9 @@ TEST_F(HttpTest, spring_protobuf_content_type) {
res.Clear();
cntl2.http_request().set_content_type("application/x-protobuf");
stub.Echo(&cntl2, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_FALSE(cntl2.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
ASSERT_EQ("application/x-protobuf", cntl2.http_response().content_type());
}

TEST_F(HttpTest, dump_http_request) {
Expand Down
Loading