From 4217a8a36343010c7b876e48317eff2f907a3c0a Mon Sep 17 00:00:00 2001 From: huangjun Date: Thu, 25 Jun 2026 14:01:34 +0800 Subject: [PATCH] Fix selective_channel late SubDone retry after EndRPC (#3358) Ignore late selective_channel SubDone callbacks once the main RPC enters EndRPC, and let Sender hold the selective balancer while issuing sub-RPCs. Return early when SelectiveChannel is called before Init, preserving the intended EINVAL result. Add regression tests covering timeout plus delayed sub-call completion and uninitialized SelectiveChannel calls. --- src/brpc/controller.cpp | 12 +++++ src/brpc/controller.h | 3 ++ src/brpc/selective_channel.cpp | 23 ++++++++-- test/brpc_channel_unittest.cpp | 80 ++++++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 15c8c91887..135e2b2451 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -625,6 +625,16 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, return; } + if (is_ending_rpc()) { + // SelectiveChannel may still deliver late SubDone callbacks after the + // main RPC has entered EndRPC(). Ignore those callbacks instead of + // letting them re-enter retry/backup on partially torn-down state. + _error_code = saved_error; + response_attachment().clear(); + CHECK_EQ(0, bthread_id_unlock(info.id)); + return; + } + if ((!_error_code && _retry_policy == NULL) || _current_call.nretry >= _max_retry) { goto END_OF_RPC; @@ -881,6 +891,8 @@ void Controller::Call::OnComplete( } void Controller::EndRPC(const CompletionInfo& info) { + add_flag(FLAGS_ENDING_RPC); + if (_timeout_id != 0) { bthread_timer_del(_timeout_id); _timeout_id = 0; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 45f71b72f6..dbbb556072 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -152,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); + static const uint32_t FLAGS_ENDING_RPC = (1 << 23); public: struct Inheritable { @@ -796,6 +797,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER); } + bool is_ending_rpc() const { return has_flag(FLAGS_ENDING_RPC); } + std::string& protocol_param() { return _thrift_method_name; } const std::string& protocol_param() const { return _thrift_method_name; } diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index a59580e321..069bc724cb 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -135,6 +135,7 @@ friend class SubDone; Sender(Controller* cntl, const google::protobuf::Message* request, google::protobuf::Message* response, + SharedLoadBalancer* lb, google::protobuf::Closure* user_done); ~Sender() { Clear(); } int IssueRPC(int64_t start_realtime_us); @@ -148,6 +149,7 @@ friend class SubDone; Controller* _main_cntl; const google::protobuf::Message* _request; google::protobuf::Message* _response; + butil::intrusive_ptr _lb; google::protobuf::Closure* _user_done; short _nfree; short _nalloc; @@ -293,10 +295,12 @@ void ChannelBalancer::Describe(std::ostream& os, Sender::Sender(Controller* cntl, const google::protobuf::Message* request, google::protobuf::Message* response, + SharedLoadBalancer* lb, google::protobuf::Closure* user_done) : _main_cntl(cntl) , _request(request) , _response(response) + , _lb(lb) , _user_done(user_done) , _nfree(0) , _nalloc(0) @@ -306,14 +310,20 @@ Sender::Sender(Controller* cntl, int Sender::IssueRPC(int64_t start_realtime_us) { _main_cntl->_current_call.need_feedback = false; + ChannelBalancer* balancer = + static_cast(_lb.get()); + if (balancer == NULL) { + _main_cntl->SetFailed(ECANCELED, + "SelectiveChannel balancer is unavailable"); + return -1; + } LoadBalancer::SelectIn sel_in = { start_realtime_us, true, _main_cntl->has_request_code(), _main_cntl->_request_code, _main_cntl->_accessed }; ChannelBalancer::SelectOut sel_out; - const int rc = static_cast(_main_cntl->_lb.get()) - ->SelectChannel(sel_in, &sel_out); + const int rc = balancer->SelectChannel(sel_in, &sel_out); if (rc != 0) { _main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc)); return -1; @@ -570,8 +580,15 @@ void SelectiveChannel::CallMethod( if (!initialized()) { cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet", this); + // This is a branch only entered by wrongly-used RPC, just call done + // in-place. See comments in channel.cpp on deadlock concerns. + if (user_done) { + user_done->Run(); + } + return; } - schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done); + schan::Sender* sndr = + new schan::Sender(cntl, request, response, _chan._lb.get(), user_done); cntl->_sender = sndr; cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE); const CallId cid = cntl->call_id(); diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 2004767470..3a5153c5d8 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -74,6 +74,10 @@ void* RunClosure(void* arg) { return NULL; } +void MarkCalled(bool* called) { + *called = true; +} + class DeleteOnlyOnceChannel : public brpc::Channel { public: DeleteOnlyOnceChannel() : _c(1) { @@ -214,6 +218,21 @@ class MyEchoService : public ::test::EchoService { std::function mockfunc_; }; +class DelayedCloseEchoService : public ::test::EchoService { +public: + void Echo(google::protobuf::RpcController* cntl_base, + const ::test::EchoRequest* req, + ::test::EchoResponse*, + google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + if (req->sleep_us() > 0) { + bthread_usleep(req->sleep_us()); + } + static_cast(cntl_base)->CloseConnection( + "Close connection after delay"); + } +}; + pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT; class ChannelTest : public ::testing::Test{ @@ -2417,6 +2436,25 @@ TEST_F(ChannelTest, empty_parallel_channel) { EXPECT_EQ(EPERM, cntl.ErrorCode()) << cntl.ErrorText(); } +TEST_F(ChannelTest, uninitialized_selective_channel) { + brpc::SelectiveChannel channel; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + + brpc::Controller sync_cntl; + ::test::EchoService::Stub(&channel).Echo(&sync_cntl, &req, &res, NULL); + EXPECT_EQ(EINVAL, sync_cntl.ErrorCode()) << sync_cntl.ErrorText(); + + brpc::Controller async_cntl; + bool done_called = false; + google::protobuf::Closure* done = + brpc::NewCallback(&MarkCalled, &done_called); + ::test::EchoService::Stub(&channel).Echo(&async_cntl, &req, &res, done); + EXPECT_TRUE(done_called); + EXPECT_EQ(EINVAL, async_cntl.ErrorCode()) << async_cntl.ErrorText(); +} + TEST_F(ChannelTest, empty_selective_channel) { brpc::SelectiveChannel channel; ASSERT_EQ(0, channel.Init("rr", NULL)); @@ -2938,6 +2976,48 @@ TEST_F(ChannelTest, backup_request_policy) { } } +TEST_F(ChannelTest, selective_channel_ignores_late_subdone_after_timeout) { + DelayedCloseEchoService service; + brpc::Server server; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start("127.0.0.1:0", NULL)); + + brpc::SelectiveChannel channel; + ASSERT_EQ(0, channel.Init("rr", NULL)); + + brpc::ChannelOptions options; + options.timeout_ms = 100; + for (int i = 0; i < 2; ++i) { + brpc::Channel* sub_channel = new brpc::Channel; + ASSERT_EQ(0, sub_channel->Init(server.listen_address(), &options)); + ASSERT_EQ(0, channel.AddChannel(sub_channel, NULL)); + } + + brpc::Controller cntl; + cntl.set_max_retry(3); + cntl.set_backup_request_ms(1); + cntl.set_timeout_ms(10); + + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + req.set_sleep_us(50000); + CallMethod(&channel, &cntl, &req, &res, true); + + ASSERT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText(); + ASSERT_TRUE(cntl.has_backup_request()); + ASSERT_GE(cntl.retried_count(), 1); + + // Let the delayed sub-calls close their connections after the main RPC + // has already timed out. This used to re-enter retry/backup from + // SubDone::Run() on a partially torn-down controller. + bthread_usleep(120000); + + EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText(); + server.Stop(0); + server.Join(); +} + TEST_F(ChannelTest, multiple_threads_single_channel) { srand(time(NULL)); ASSERT_EQ(0, StartAccept(_ep));