From 0c324f03f98055998399404a7a1669e5f434c95b Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Wed, 29 Jan 2025 23:03:32 +0800 Subject: [PATCH 1/2] Optimize server latency --- src/brpc/details/method_status.cpp | 15 ++- src/brpc/details/method_status.h | 18 +++- src/brpc/policy/baidu_rpc_protocol.cpp | 111 ++++++++++++++------- src/brpc/policy/http_rpc_protocol.cpp | 17 +++- src/brpc/policy/hulu_pbrpc_protocol.cpp | 17 +++- src/brpc/policy/mongo_protocol.cpp | 36 +++++-- src/brpc/policy/nshead_protocol.cpp | 19 +++- src/brpc/policy/sofa_pbrpc_protocol.cpp | 18 +++- src/brpc/policy/streaming_rpc_protocol.cpp | 6 +- src/brpc/policy/streaming_rpc_protocol.h | 5 +- 10 files changed, 206 insertions(+), 56 deletions(-) diff --git a/src/brpc/details/method_status.cpp b/src/brpc/details/method_status.cpp index faf6449c3c..4360e7ae06 100644 --- a/src/brpc/details/method_status.cpp +++ b/src/brpc/details/method_status.cpp @@ -149,9 +149,22 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl.reset(cl); } +int HandleResponseWritten(bthread_id_t id, void* data, int error_code, + const std::string& error_text) { + auto args = static_cast(data); + args->error_code = error_code; + args->error_text = error_text; + args->sent_us = butil::cpuwide_time_us(); + CHECK_EQ(0, bthread_id_unlock_and_destroy(id)); + return 0; +} + ConcurrencyRemover::~ConcurrencyRemover() { if (_status) { - _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us); + if (_sent_us < _received_us) { + _sent_us = butil::cpuwide_time_us(); + } + _status->OnResponded(_c->ErrorCode(), _sent_us - _received_us); _status = NULL; } ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c); diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h index b49b6754d0..7f42076aee 100644 --- a/src/brpc/details/method_status.h +++ b/src/brpc/details/method_status.h @@ -75,18 +75,28 @@ friend class Server; bvar::PassiveStatus _max_concurrency_bvar; }; +struct ResponseWriteInfo { + int error_code{0}; + std::string error_text; + int64_t sent_us{0}; +}; + +int HandleResponseWritten(bthread_id_t id, void* data, int error_code, + const std::string& error_text); + class ConcurrencyRemover { public: ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us) - : _status(status) - , _c(c) - , _received_us(received_us) {} + : _status(status) , _c(c) , _received_us(received_us) {} ~ConcurrencyRemover(); + + void set_sent_us(int64_t sent_us) { _sent_us = sent_us; } private: DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover); MethodStatus* _status; Controller* _c; - uint64_t _received_us; + int64_t _received_us; + int64_t _sent_us{0}; }; inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index f8ea1f6361..ebeb119516 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -184,12 +184,9 @@ struct BaiduProxyPBMessages : public RpcPBMessages { } // Used by UT, can't be static. -void SendRpcResponse(int64_t correlation_id, - Controller* cntl, - RpcPBMessages* messages, - const Server* server, - MethodStatus* method_status, - int64_t received_us) { +void SendRpcResponse(int64_t correlation_id, Controller* cntl, + RpcPBMessages* messages, const Server* server, + MethodStatus* method_status, int64_t received_us) { ControllerPrivateAccessor accessor(cntl); Span* span = accessor.span(); if (span) { @@ -197,24 +194,33 @@ void SendRpcResponse(int64_t correlation_id, } Socket* sock = accessor.get_sending_socket(); - std::unique_ptr recycle_cntl(cntl); - ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request(); + const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response(); + + ResponseWriteInfo args; + + // Recycle resources at the end of this function. + BRPC_SCOPE_EXIT { + { + // Remove concurrency and record latency at first. + ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + concurrency_remover.set_sent_us(args.sent_us); + } + + std::unique_ptr recycle_cntl(cntl); + - auto messages_guard = butil::MakeScopeGuard([server, messages] { if (NULL == messages) { return; } - if (NULL != server->options().baidu_master_service) { - BaiduProxyPBMessages::Return(static_cast(messages)); - } else { + + cntl->CallAfterRpcResp(req, res); + if (NULL == server->options().baidu_master_service) { server->options().rpc_pb_message_factory->Return(messages); + } else { + BaiduProxyPBMessages::Return(static_cast(messages)); } - }); - - const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request(); - const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response(); - ClosureGuard guard(brpc::NewCallback( - cntl, &Controller::CallAfterRpcResp, req, res)); + }; StreamIds response_stream_ids = accessor.response_streams(); @@ -302,29 +308,65 @@ void SendRpcResponse(int64_t correlation_id, if (span) { span->set_response_size(res_buf.size()); } + + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); // Send rpc response over stream even if server side failed to create // stream for some reason. - if(cntl->has_remote_stream()){ + if (cntl->has_remote_stream()) { // Send the response over stream to notify that this stream connection // is successfully built. // Response_stream can be INVALID_STREAM_ID when error occurs. if (SendStreamData(sock, &res_buf, accessor.remote_stream_settings()->stream_id(), - response_stream_id) != 0) { + response_stream_id, response_id) != 0) { + error_code = errno; + PLOG_IF(WARNING, error_code != EPIPE) + << "Fail to write into " << sock->description(); + cntl->SetFailed(error_code, "Fail to write into %s", + sock->description().c_str()); + Stream::SetFailed(response_stream_ids, error_code, + "Fail to write into %s", + sock->description().c_str()); + } + } else{ + // Have the risk of unlimited pending responses, in which case, tell + // users to set max_concurrency. + Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; + wopt.ignore_eovercrowded = true; + if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; - std::string error_text = butil::string_printf(64, "Fail to write into %s", - sock->description().c_str()); - PLOG_IF(WARNING, errcode != EPIPE) << error_text; - cntl->SetFailed(errcode, "%s", error_text.c_str()); - Stream::SetFailed(response_stream_ids, errcode, "%s", - error_text.c_str()); + PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); + return; + } + } + + bthread_id_join(response_id); + + error_code = args.error_code; + if (cntl->has_remote_stream()) { + if (0 != error_code) { + LOG_IF(WARNING, error_code != EPIPE) + << "Fail to write into " << *sock + << ", error text= " << args.error_text + << ": " << berror(error_code); + cntl->SetFailed(error_code, "Fail to write into %s: %s", + sock->description().c_str(), + args.error_text.c_str()); + Stream::SetFailed(response_stream_ids, error_code, + "Fail to write into %s", + args.error_text.c_str()); return; } // Now it's ok the mark these server-side streams as connected as all the // written user data would follower the RPC response. // Reuse stream_ptr to avoid address first stream id again - if(stream_ptr) { + if (NULL != stream_ptr) { ((Stream*)stream_ptr->conn())->SetConnected(); } for (size_t i = 1; i < response_stream_ids.size(); ++i) { @@ -342,20 +384,19 @@ void SendRpcResponse(int64_t correlation_id, } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. - Socket::WriteOptions wopt; - wopt.ignore_eovercrowded = true; - if (sock->Write(&res_buf, &wopt) != 0) { - const int errcode = errno; - PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); + if (0 != error_code) { + LOG_IF(WARNING, error_code != EPIPE) << "Fail to write into " << *sock + << ", error text= " << args.error_text + << ": " << strerror(error_code); + cntl->SetFailed(error_code, "Fail to write into %s: %s", + sock->description().c_str(), args.error_text.c_str()); return; } } if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 444be39708..7b0b28a60d 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -934,7 +934,12 @@ HttpResponseSender::~HttpResponseSender() { int rc = -1; // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; if (is_http2) { if (is_grpc) { @@ -980,9 +985,19 @@ HttpResponseSender::~HttpResponseSender() { cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); return; } + + bthread_id_join(response_id); + concurrency_remover.set_sent_us(args.sent_us); + const int errcode = args.error_code; + if (0 != errcode) { + LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket; + cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); + return; + } + if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index bf4bd86f5c..8858f10b7a 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -304,7 +304,12 @@ static void SendHuluResponse(int64_t correlation_id, // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; @@ -313,9 +318,19 @@ static void SendHuluResponse(int64_t correlation_id, sock->description().c_str()); return; } + + bthread_id_join(response_id); + concurrency_remover.set_sent_us(args.sent_us); + const int errcode = args.error_code; + if (0 != errcode) { + LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", sock->description().c_str()); + return; + } + if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/mongo_protocol.cpp b/src/brpc/policy/mongo_protocol.cpp index 82bb3e0b36..29fc002181 100644 --- a/src/brpc/policy/mongo_protocol.cpp +++ b/src/brpc/policy/mongo_protocol.cpp @@ -95,16 +95,34 @@ void SendMongoResponse::Run() { res_buf.append(res.message()); } - if (!res_buf.empty()) { - // Have the risk of unlimited pending responses, in which case, tell - // users to set max_concurrency. - Socket::WriteOptions wopt; - wopt.ignore_eovercrowded = true; - if (socket->Write(&res_buf, &wopt) != 0) { - PLOG(WARNING) << "Fail to write into " << *socket; - return; - } + if (res_buf.empty()) { + return; + } + + // Have the risk of unlimited pending responses, in which case, tell + // users to set max_concurrency. + ResponseWriteInfo args; + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); + Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; + wopt.ignore_eovercrowded = true; + wopt.ignore_eovercrowded = true; + if (socket->Write(&res_buf, &wopt) != 0) { + PLOG(WARNING) << "Fail to write into " << *socket; + return; } + + bthread_id_join(response_id); + concurrency_remover.set_sent_us(args.sent_us); + const int errcode = args.error_code; + if (0 != errcode) { + LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket; + cntl.SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); + return; + } + } ParseResult ParseMongoMessage(butil::IOBuf* source, diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index 4288085d68..75ecccb315 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -95,6 +95,7 @@ void NsheadClosure::Run() { return; } + int64_t sent_us = 0; if (_do_respond) { // response uses request's head as default. // Notice that the response use request.head.log_id directly rather @@ -112,7 +113,12 @@ void NsheadClosure::Run() { write_buf.append(_response.body.movable()); // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; if (sock->Write(&write_buf, &wopt) != 0) { const int errcode = errno; @@ -121,10 +127,21 @@ void NsheadClosure::Run() { sock->description().c_str()); return; } + + bthread_id_join(response_id); + concurrency_remover.set_sent_us(args.sent_us); + const int errcode = args.error_code; + if (0 != errcode) { + LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + _controller.SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); + return; + } + sent_us = args.sent_us; } if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(0 == sent_us ? butil::cpuwide_time_us() : sent_us); } } diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index ae128b4051..50c8d476c2 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -281,7 +281,12 @@ static void SendSofaResponse(int64_t correlation_id, } // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; + bthread_id_t response_id; + CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; @@ -290,9 +295,20 @@ static void SendSofaResponse(int64_t correlation_id, sock->description().c_str()); return; } + + bthread_id_join(response_id); + concurrency_remover.set_sent_us(args.sent_us); + const int errcode = args.error_code; + if (0 != errcode) { + LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); + return; + } + if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/streaming_rpc_protocol.cpp b/src/brpc/policy/streaming_rpc_protocol.cpp index 85d2a7d536..483eb30ff4 100644 --- a/src/brpc/policy/streaming_rpc_protocol.cpp +++ b/src/brpc/policy/streaming_rpc_protocol.cpp @@ -154,7 +154,9 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, } int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, int64_t source_stream_id) { + int64_t remote_stream_id, + int64_t source_stream_id, + bthread_id_t response_id) { CHECK(sock != NULL); StreamFrameMeta fm; fm.set_stream_id(remote_stream_id); @@ -164,6 +166,8 @@ int SendStreamData(Socket* sock, const butil::IOBuf* data, butil::IOBuf out; PackStreamMessage(&out, fm, data); Socket::WriteOptions wopt; + wopt.id_wait = response_id; + wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; return sock->Write(&out, &wopt); } diff --git a/src/brpc/policy/streaming_rpc_protocol.h b/src/brpc/policy/streaming_rpc_protocol.h index ff9e207a19..7b29ce9063 100644 --- a/src/brpc/policy/streaming_rpc_protocol.h +++ b/src/brpc/policy/streaming_rpc_protocol.h @@ -19,10 +19,10 @@ #ifndef BRPC_STREAMING_RPC_PROTOCOL_H #define BRPC_STREAMING_RPC_PROTOCOL_H +#include "bthread/types.h" #include "brpc/protocol.h" #include "brpc/streaming_rpc_meta.pb.h" - namespace brpc { namespace policy { @@ -41,7 +41,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, int64_t source_stream_id); int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, int64_t source_stream_id); + int64_t remote_stream_id, + int64_t source_stream_id, bthread_id_t); } // namespace policy } // namespace brpc From 6a0a761fdf8bd022b690e78ce571d082eb3b719a Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Fri, 21 Mar 2025 22:01:05 +0800 Subject: [PATCH 2/2] Only update real sent time to span --- src/brpc/details/method_status.cpp | 10 +--- src/brpc/details/method_status.h | 7 +-- src/brpc/policy/baidu_rpc_protocol.cpp | 62 ++++++---------------- src/brpc/policy/http_rpc_protocol.cpp | 21 +++----- src/brpc/policy/hulu_pbrpc_protocol.cpp | 21 +++----- src/brpc/policy/mongo_protocol.cpp | 36 ++++--------- src/brpc/policy/nshead_protocol.cpp | 23 ++++---- src/brpc/policy/sofa_pbrpc_protocol.cpp | 22 +++----- src/brpc/policy/streaming_rpc_protocol.cpp | 9 ++-- src/brpc/policy/streaming_rpc_protocol.h | 4 +- 10 files changed, 70 insertions(+), 145 deletions(-) diff --git a/src/brpc/details/method_status.cpp b/src/brpc/details/method_status.cpp index 4360e7ae06..3bed6bf209 100644 --- a/src/brpc/details/method_status.cpp +++ b/src/brpc/details/method_status.cpp @@ -149,11 +149,8 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl.reset(cl); } -int HandleResponseWritten(bthread_id_t id, void* data, int error_code, - const std::string& error_text) { +int HandleResponseWritten(bthread_id_t id, void* data, int /*error_code*/) { auto args = static_cast(data); - args->error_code = error_code; - args->error_text = error_text; args->sent_us = butil::cpuwide_time_us(); CHECK_EQ(0, bthread_id_unlock_and_destroy(id)); return 0; @@ -161,10 +158,7 @@ int HandleResponseWritten(bthread_id_t id, void* data, int error_code, ConcurrencyRemover::~ConcurrencyRemover() { if (_status) { - if (_sent_us < _received_us) { - _sent_us = butil::cpuwide_time_us(); - } - _status->OnResponded(_c->ErrorCode(), _sent_us - _received_us); + _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us); _status = NULL; } ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c); diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h index 7f42076aee..9b7f070991 100644 --- a/src/brpc/details/method_status.h +++ b/src/brpc/details/method_status.h @@ -76,13 +76,10 @@ friend class Server; }; struct ResponseWriteInfo { - int error_code{0}; - std::string error_text; int64_t sent_us{0}; }; -int HandleResponseWritten(bthread_id_t id, void* data, int error_code, - const std::string& error_text); +int HandleResponseWritten(bthread_id_t id, void* data, int error_code); class ConcurrencyRemover { public: @@ -90,13 +87,11 @@ class ConcurrencyRemover { : _status(status) , _c(c) , _received_us(received_us) {} ~ConcurrencyRemover(); - void set_sent_us(int64_t sent_us) { _sent_us = sent_us; } private: DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover); MethodStatus* _status; Controller* _c; int64_t _received_us; - int64_t _sent_us{0}; }; inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index ebeb119516..a0ae5dd86d 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -197,19 +197,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request(); const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response(); - ResponseWriteInfo args; - // Recycle resources at the end of this function. BRPC_SCOPE_EXIT { { // Remove concurrency and record latency at first. ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); - concurrency_remover.set_sent_us(args.sent_us); } std::unique_ptr recycle_cntl(cntl); - if (NULL == messages) { return; } @@ -305,12 +301,13 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, } } + ResponseWriteInfo args; + bthread_id_t response_id = INVALID_BTHREAD_ID; if (span) { span->set_response_size(res_buf.size()); + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); } - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); // Send rpc response over stream even if server side failed to create // stream for some reason. if (cntl->has_remote_stream()) { @@ -328,45 +325,13 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, Stream::SetFailed(response_stream_ids, error_code, "Fail to write into %s", sock->description().c_str()); - } - } else{ - // Have the risk of unlimited pending responses, in which case, tell - // users to set max_concurrency. - Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; - wopt.ignore_eovercrowded = true; - if (sock->Write(&res_buf, &wopt) != 0) { - const int errcode = errno; - PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); - return; - } - } - - bthread_id_join(response_id); - - error_code = args.error_code; - if (cntl->has_remote_stream()) { - if (0 != error_code) { - LOG_IF(WARNING, error_code != EPIPE) - << "Fail to write into " << *sock - << ", error text= " << args.error_text - << ": " << berror(error_code); - cntl->SetFailed(error_code, "Fail to write into %s: %s", - sock->description().c_str(), - args.error_text.c_str()); - Stream::SetFailed(response_stream_ids, error_code, - "Fail to write into %s", - args.error_text.c_str()); return; } // Now it's ok the mark these server-side streams as connected as all the // written user data would follower the RPC response. // Reuse stream_ptr to avoid address first stream id again - if (NULL != stream_ptr) { + if (stream_ptr) { ((Stream*)stream_ptr->conn())->SetConnected(); } for (size_t i = 1; i < response_stream_ids.size(); ++i) { @@ -384,17 +349,24 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. - if (0 != error_code) { - LOG_IF(WARNING, error_code != EPIPE) << "Fail to write into " << *sock - << ", error text= " << args.error_text - << ": " << strerror(error_code); - cntl->SetFailed(error_code, "Fail to write into %s: %s", - sock->description().c_str(), args.error_text.c_str()); + Socket::WriteOptions wopt; + wopt.ignore_eovercrowded = true; + if (INVALID_BTHREAD_ID != response_id) { + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } + if (sock->Write(&res_buf, &wopt) != 0) { + const int errcode = errno; + PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); return; } } if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent span->set_sent_us(args.sent_us); } diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 7b0b28a60d..07fc7da88f 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -935,12 +935,14 @@ HttpResponseSender::~HttpResponseSender() { // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. ResponseWriteInfo args; - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (is_http2) { if (is_grpc) { // Append compressed and length before body @@ -986,16 +988,9 @@ HttpResponseSender::~HttpResponseSender() { return; } - bthread_id_join(response_id); - concurrency_remover.set_sent_us(args.sent_us); - const int errcode = args.error_code; - if (0 != errcode) { - LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket; - cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); - return; - } - if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent span->set_sent_us(args.sent_us); } diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index 8858f10b7a..02ec8efcad 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -305,12 +305,14 @@ static void SendHuluResponse(int64_t correlation_id, // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. ResponseWriteInfo args; - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -319,16 +321,9 @@ static void SendHuluResponse(int64_t correlation_id, return; } - bthread_id_join(response_id); - concurrency_remover.set_sent_us(args.sent_us); - const int errcode = args.error_code; - if (0 != errcode) { - LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", sock->description().c_str()); - return; - } - if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent span->set_sent_us(args.sent_us); } diff --git a/src/brpc/policy/mongo_protocol.cpp b/src/brpc/policy/mongo_protocol.cpp index 29fc002181..82bb3e0b36 100644 --- a/src/brpc/policy/mongo_protocol.cpp +++ b/src/brpc/policy/mongo_protocol.cpp @@ -95,34 +95,16 @@ void SendMongoResponse::Run() { res_buf.append(res.message()); } - if (res_buf.empty()) { - return; - } - - // Have the risk of unlimited pending responses, in which case, tell - // users to set max_concurrency. - ResponseWriteInfo args; - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); - Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; - wopt.ignore_eovercrowded = true; - wopt.ignore_eovercrowded = true; - if (socket->Write(&res_buf, &wopt) != 0) { - PLOG(WARNING) << "Fail to write into " << *socket; - return; - } - - bthread_id_join(response_id); - concurrency_remover.set_sent_us(args.sent_us); - const int errcode = args.error_code; - if (0 != errcode) { - LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket; - cntl.SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); - return; + if (!res_buf.empty()) { + // Have the risk of unlimited pending responses, in which case, tell + // users to set max_concurrency. + Socket::WriteOptions wopt; + wopt.ignore_eovercrowded = true; + if (socket->Write(&res_buf, &wopt) != 0) { + PLOG(WARNING) << "Fail to write into " << *socket; + return; + } } - } ParseResult ParseMongoMessage(butil::IOBuf* source, diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index 75ecccb315..a26dc96857 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -114,12 +114,14 @@ void NsheadClosure::Run() { // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. ResponseWriteInfo args; - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&write_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -128,16 +130,11 @@ void NsheadClosure::Run() { return; } - bthread_id_join(response_id); - concurrency_remover.set_sent_us(args.sent_us); - const int errcode = args.error_code; - if (0 != errcode) { - LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - _controller.SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); - return; + if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. + sent_us = args.sent_us; } - sent_us = args.sent_us; } if (span) { // TODO: this is not sent diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 50c8d476c2..9ee772dcff 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -282,12 +282,14 @@ static void SendSofaResponse(int64_t correlation_id, // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. ResponseWriteInfo args; - bthread_id_t response_id; - CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten)); Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -296,17 +298,9 @@ static void SendSofaResponse(int64_t correlation_id, return; } - bthread_id_join(response_id); - concurrency_remover.set_sent_us(args.sent_us); - const int errcode = args.error_code; - if (0 != errcode) { - LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); - return; - } - if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent span->set_sent_us(args.sent_us); } diff --git a/src/brpc/policy/streaming_rpc_protocol.cpp b/src/brpc/policy/streaming_rpc_protocol.cpp index 483eb30ff4..0921d005e7 100644 --- a/src/brpc/policy/streaming_rpc_protocol.cpp +++ b/src/brpc/policy/streaming_rpc_protocol.cpp @@ -154,8 +154,7 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, } int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, - int64_t source_stream_id, + int64_t remote_stream_id, int64_t source_stream_id, bthread_id_t response_id) { CHECK(sock != NULL); StreamFrameMeta fm; @@ -166,8 +165,10 @@ int SendStreamData(Socket* sock, const butil::IOBuf* data, butil::IOBuf out; PackStreamMessage(&out, fm, data); Socket::WriteOptions wopt; - wopt.id_wait = response_id; - wopt.notify_on_success = true; + if (INVALID_BTHREAD_ID != response_id) { + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } wopt.ignore_eovercrowded = true; return sock->Write(&out, &wopt); } diff --git a/src/brpc/policy/streaming_rpc_protocol.h b/src/brpc/policy/streaming_rpc_protocol.h index 7b29ce9063..22023e7c3f 100644 --- a/src/brpc/policy/streaming_rpc_protocol.h +++ b/src/brpc/policy/streaming_rpc_protocol.h @@ -41,8 +41,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, int64_t source_stream_id); int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, - int64_t source_stream_id, bthread_id_t); + int64_t remote_stream_id, int64_t source_stream_id, + bthread_id_t); } // namespace policy } // namespace brpc