Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,16 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
return;
}

if (is_ending_rpc()) {
// SelectiveChannel may still deliver late SubDone callbacks after the
// main RPC has entered EndRPC(). Ignore those callbacks instead of
// letting them re-enter retry/backup on partially torn-down state.
_error_code = saved_error;
response_attachment().clear();
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}

if ((!_error_code && _retry_policy == NULL) ||
_current_call.nretry >= _max_retry) {
goto END_OF_RPC;
Expand Down Expand Up @@ -881,6 +891,8 @@ void Controller::Call::OnComplete(
}

void Controller::EndRPC(const CompletionInfo& info) {
add_flag(FLAGS_ENDING_RPC);

if (_timeout_id != 0) {
bthread_timer_del(_timeout_id);
_timeout_id = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
static const uint32_t FLAGS_ENDING_RPC = (1 << 23);

public:
struct Inheritable {
Expand Down Expand Up @@ -796,6 +797,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER);
}

bool is_ending_rpc() const { return has_flag(FLAGS_ENDING_RPC); }

std::string& protocol_param() { return _thrift_method_name; }
const std::string& protocol_param() const { return _thrift_method_name; }

Expand Down
23 changes: 20 additions & 3 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ friend class SubDone;
Sender(Controller* cntl,
const google::protobuf::Message* request,
google::protobuf::Message* response,
SharedLoadBalancer* lb,
google::protobuf::Closure* user_done);
~Sender() { Clear(); }
int IssueRPC(int64_t start_realtime_us);
Expand All @@ -148,6 +149,7 @@ friend class SubDone;
Controller* _main_cntl;
const google::protobuf::Message* _request;
google::protobuf::Message* _response;
butil::intrusive_ptr<SharedLoadBalancer> _lb;
google::protobuf::Closure* _user_done;
short _nfree;
short _nalloc;
Expand Down Expand Up @@ -293,10 +295,12 @@ void ChannelBalancer::Describe(std::ostream& os,
Sender::Sender(Controller* cntl,
const google::protobuf::Message* request,
google::protobuf::Message* response,
SharedLoadBalancer* lb,
google::protobuf::Closure* user_done)
: _main_cntl(cntl)
, _request(request)
, _response(response)
, _lb(lb)
, _user_done(user_done)
, _nfree(0)
, _nalloc(0)
Expand All @@ -306,14 +310,20 @@ Sender::Sender(Controller* cntl,

int Sender::IssueRPC(int64_t start_realtime_us) {
_main_cntl->_current_call.need_feedback = false;
ChannelBalancer* balancer =
static_cast<ChannelBalancer*>(_lb.get());
if (balancer == NULL) {
_main_cntl->SetFailed(ECANCELED,
"SelectiveChannel balancer is unavailable");
return -1;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem seems unresolved. The process still crashes even if the balancer is deleted by main_cntl after the if statement.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenBright thanks for review, The previous null check was not enough because it did not keep the balancer alive after _lb.get().

I updated the patch so Sender holds its own intrusive_ptr<SharedLoadBalancer> captured from SelectiveChannel at creation time, and IssueRPC() uses that retained reference. This prevents main_cntl->_lb.reset() in EndRPC() from freeing the balancer while IssueRPC() is still using it.

The FLAGS_ENDING_RPC guard is kept to prevent late SubDone from re-entering retry / backup after the main RPC starts ending.

LoadBalancer::SelectIn sel_in = { start_realtime_us,
true,
_main_cntl->has_request_code(),
_main_cntl->_request_code,
_main_cntl->_accessed };
ChannelBalancer::SelectOut sel_out;
const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get())
->SelectChannel(sel_in, &sel_out);
const int rc = balancer->SelectChannel(sel_in, &sel_out);
if (rc != 0) {
_main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc));
return -1;
Expand Down Expand Up @@ -570,8 +580,15 @@ void SelectiveChannel::CallMethod(
if (!initialized()) {
cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet",
this);
// This is a branch only entered by wrongly-used RPC, just call done
// in-place. See comments in channel.cpp on deadlock concerns.
if (user_done) {
user_done->Run();
}
return;
}
schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done);
schan::Sender* sndr =
new schan::Sender(cntl, request, response, _chan._lb.get(), user_done);
cntl->_sender = sndr;
cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE);
const CallId cid = cntl->call_id();
Expand Down
80 changes: 80 additions & 0 deletions test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ void* RunClosure(void* arg) {
return NULL;
}

void MarkCalled(bool* called) {
*called = true;
}

class DeleteOnlyOnceChannel : public brpc::Channel {
public:
DeleteOnlyOnceChannel() : _c(1) {
Expand Down Expand Up @@ -214,6 +218,21 @@ class MyEchoService : public ::test::EchoService {
std::function<MockFuncType> mockfunc_;
};

class DelayedCloseEchoService : public ::test::EchoService {
public:
void Echo(google::protobuf::RpcController* cntl_base,
const ::test::EchoRequest* req,
::test::EchoResponse*,
google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
if (req->sleep_us() > 0) {
bthread_usleep(req->sleep_us());
}
static_cast<brpc::Controller*>(cntl_base)->CloseConnection(
"Close connection after delay");
}
};

pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;

class ChannelTest : public ::testing::Test{
Expand Down Expand Up @@ -2417,6 +2436,25 @@ TEST_F(ChannelTest, empty_parallel_channel) {
EXPECT_EQ(EPERM, cntl.ErrorCode()) << cntl.ErrorText();
}

TEST_F(ChannelTest, uninitialized_selective_channel) {
brpc::SelectiveChannel channel;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);

brpc::Controller sync_cntl;
::test::EchoService::Stub(&channel).Echo(&sync_cntl, &req, &res, NULL);
EXPECT_EQ(EINVAL, sync_cntl.ErrorCode()) << sync_cntl.ErrorText();

brpc::Controller async_cntl;
bool done_called = false;
google::protobuf::Closure* done =
brpc::NewCallback(&MarkCalled, &done_called);
::test::EchoService::Stub(&channel).Echo(&async_cntl, &req, &res, done);
EXPECT_TRUE(done_called);
EXPECT_EQ(EINVAL, async_cntl.ErrorCode()) << async_cntl.ErrorText();
}

TEST_F(ChannelTest, empty_selective_channel) {
brpc::SelectiveChannel channel;
ASSERT_EQ(0, channel.Init("rr", NULL));
Expand Down Expand Up @@ -2938,6 +2976,48 @@ TEST_F(ChannelTest, backup_request_policy) {
}
}

TEST_F(ChannelTest, selective_channel_ignores_late_subdone_after_timeout) {
DelayedCloseEchoService service;
brpc::Server server;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start("127.0.0.1:0", NULL));

brpc::SelectiveChannel channel;
ASSERT_EQ(0, channel.Init("rr", NULL));

brpc::ChannelOptions options;
options.timeout_ms = 100;
for (int i = 0; i < 2; ++i) {
brpc::Channel* sub_channel = new brpc::Channel;
ASSERT_EQ(0, sub_channel->Init(server.listen_address(), &options));
ASSERT_EQ(0, channel.AddChannel(sub_channel, NULL));
}

brpc::Controller cntl;
cntl.set_max_retry(3);
cntl.set_backup_request_ms(1);
cntl.set_timeout_ms(10);

test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
req.set_sleep_us(50000);
CallMethod(&channel, &cntl, &req, &res, true);

ASSERT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_TRUE(cntl.has_backup_request());
ASSERT_GE(cntl.retried_count(), 1);

// Let the delayed sub-calls close their connections after the main RPC
// has already timed out. This used to re-enter retry/backup from
// SubDone::Run() on a partially torn-down controller.
bthread_usleep(120000);

EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
server.Stop(0);
server.Join();
}

TEST_F(ChannelTest, multiple_threads_single_channel) {
srand(time(NULL));
ASSERT_EQ(0, StartAccept(_ep));
Expand Down
Loading