diff --git a/src/brpc/details/method_status.cpp b/src/brpc/details/method_status.cpp index faf6449c3c..3bed6bf209 100644 --- a/src/brpc/details/method_status.cpp +++ b/src/brpc/details/method_status.cpp @@ -149,6 +149,13 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl.reset(cl); } +int HandleResponseWritten(bthread_id_t id, void* data, int /*error_code*/) { + auto args = static_cast(data); + args->sent_us = butil::cpuwide_time_us(); + CHECK_EQ(0, bthread_id_unlock_and_destroy(id)); + return 0; +} + ConcurrencyRemover::~ConcurrencyRemover() { if (_status) { _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us); diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h index b49b6754d0..9b7f070991 100644 --- a/src/brpc/details/method_status.h +++ b/src/brpc/details/method_status.h @@ -75,18 +75,23 @@ friend class Server; bvar::PassiveStatus _max_concurrency_bvar; }; +struct ResponseWriteInfo { + int64_t sent_us{0}; +}; + +int HandleResponseWritten(bthread_id_t id, void* data, int error_code); + class ConcurrencyRemover { public: ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us) - : _status(status) - , _c(c) - , _received_us(received_us) {} + : _status(status) , _c(c) , _received_us(received_us) {} ~ConcurrencyRemover(); + private: DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover); MethodStatus* _status; Controller* _c; - uint64_t _received_us; + int64_t _received_us; }; inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index f8ea1f6361..a0ae5dd86d 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -184,12 +184,9 @@ struct BaiduProxyPBMessages : public RpcPBMessages { } // Used by UT, can't be static. -void SendRpcResponse(int64_t correlation_id, - Controller* cntl, - RpcPBMessages* messages, - const Server* server, - MethodStatus* method_status, - int64_t received_us) { +void SendRpcResponse(int64_t correlation_id, Controller* cntl, + RpcPBMessages* messages, const Server* server, + MethodStatus* method_status, int64_t received_us) { ControllerPrivateAccessor accessor(cntl); Span* span = accessor.span(); if (span) { @@ -197,24 +194,29 @@ void SendRpcResponse(int64_t correlation_id, } Socket* sock = accessor.get_sending_socket(); - std::unique_ptr recycle_cntl(cntl); - ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request(); + const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response(); + + // Recycle resources at the end of this function. + BRPC_SCOPE_EXIT { + { + // Remove concurrency and record latency at first. + ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + } + + std::unique_ptr recycle_cntl(cntl); - auto messages_guard = butil::MakeScopeGuard([server, messages] { if (NULL == messages) { return; } - if (NULL != server->options().baidu_master_service) { - BaiduProxyPBMessages::Return(static_cast(messages)); - } else { + + cntl->CallAfterRpcResp(req, res); + if (NULL == server->options().baidu_master_service) { server->options().rpc_pb_message_factory->Return(messages); + } else { + BaiduProxyPBMessages::Return(static_cast(messages)); } - }); - - const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request(); - const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response(); - ClosureGuard guard(brpc::NewCallback( - cntl, &Controller::CallAfterRpcResp, req, res)); + }; StreamIds response_stream_ids = accessor.response_streams(); @@ -299,32 +301,37 @@ void SendRpcResponse(int64_t correlation_id, } } + ResponseWriteInfo args; + bthread_id_t response_id = INVALID_BTHREAD_ID; if (span) { span->set_response_size(res_buf.size()); + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); } + // Send rpc response over stream even if server side failed to create // stream for some reason. - if(cntl->has_remote_stream()){ + if (cntl->has_remote_stream()) { // Send the response over stream to notify that this stream connection // is successfully built. // Response_stream can be INVALID_STREAM_ID when error occurs. if (SendStreamData(sock, &res_buf, accessor.remote_stream_settings()->stream_id(), - response_stream_id) != 0) { - const int errcode = errno; - std::string error_text = butil::string_printf(64, "Fail to write into %s", - sock->description().c_str()); - PLOG_IF(WARNING, errcode != EPIPE) << error_text; - cntl->SetFailed(errcode, "%s", error_text.c_str()); - Stream::SetFailed(response_stream_ids, errcode, "%s", - error_text.c_str()); + response_stream_id, response_id) != 0) { + error_code = errno; + PLOG_IF(WARNING, error_code != EPIPE) + << "Fail to write into " << sock->description(); + cntl->SetFailed(error_code, "Fail to write into %s", + sock->description().c_str()); + Stream::SetFailed(response_stream_ids, error_code, + "Fail to write into %s", + sock->description().c_str()); return; } // Now it's ok the mark these server-side streams as connected as all the // written user data would follower the RPC response. // Reuse stream_ptr to avoid address first stream id again - if(stream_ptr) { + if (stream_ptr) { ((Stream*)stream_ptr->conn())->SetConnected(); } for (size_t i = 1; i < response_stream_ids.size(); ++i) { @@ -344,6 +351,10 @@ void SendRpcResponse(int64_t correlation_id, // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + if (INVALID_BTHREAD_ID != response_id) { + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -354,8 +365,10 @@ void SendRpcResponse(int64_t correlation_id, } if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 444be39708..07fc7da88f 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -934,8 +934,15 @@ HttpResponseSender::~HttpResponseSender() { int rc = -1; // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (is_http2) { if (is_grpc) { // Append compressed and length before body @@ -980,9 +987,12 @@ HttpResponseSender::~HttpResponseSender() { cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str()); return; } + if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index bf4bd86f5c..02ec8efcad 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -304,8 +304,15 @@ static void SendHuluResponse(int64_t correlation_id, // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -313,9 +320,12 @@ static void SendHuluResponse(int64_t correlation_id, sock->description().c_str()); return; } + if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index 4288085d68..a26dc96857 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -95,6 +95,7 @@ void NsheadClosure::Run() { return; } + int64_t sent_us = 0; if (_do_respond) { // response uses request's head as default. // Notice that the response use request.head.log_id directly rather @@ -112,8 +113,15 @@ void NsheadClosure::Run() { write_buf.append(_response.body.movable()); // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&write_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -121,10 +129,16 @@ void NsheadClosure::Run() { sock->description().c_str()); return; } + + if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. + sent_us = args.sent_us; + } } if (span) { // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(0 == sent_us ? butil::cpuwide_time_us() : sent_us); } } diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index ae128b4051..9ee772dcff 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -281,8 +281,15 @@ static void SendSofaResponse(int64_t correlation_id, } // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. + ResponseWriteInfo args; Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + bthread_id_t response_id = INVALID_BTHREAD_ID; + if (span) { + CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; @@ -290,9 +297,12 @@ static void SendSofaResponse(int64_t correlation_id, sock->description().c_str()); return; } + if (span) { + bthread_id_join(response_id); + // Do not care about the result of background writing. // TODO: this is not sent - span->set_sent_us(butil::cpuwide_time_us()); + span->set_sent_us(args.sent_us); } } diff --git a/src/brpc/policy/streaming_rpc_protocol.cpp b/src/brpc/policy/streaming_rpc_protocol.cpp index 85d2a7d536..0921d005e7 100644 --- a/src/brpc/policy/streaming_rpc_protocol.cpp +++ b/src/brpc/policy/streaming_rpc_protocol.cpp @@ -154,7 +154,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, } int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, int64_t source_stream_id) { + int64_t remote_stream_id, int64_t source_stream_id, + bthread_id_t response_id) { CHECK(sock != NULL); StreamFrameMeta fm; fm.set_stream_id(remote_stream_id); @@ -164,6 +165,10 @@ int SendStreamData(Socket* sock, const butil::IOBuf* data, butil::IOBuf out; PackStreamMessage(&out, fm, data); Socket::WriteOptions wopt; + if (INVALID_BTHREAD_ID != response_id) { + wopt.id_wait = response_id; + wopt.notify_on_success = true; + } wopt.ignore_eovercrowded = true; return sock->Write(&out, &wopt); } diff --git a/src/brpc/policy/streaming_rpc_protocol.h b/src/brpc/policy/streaming_rpc_protocol.h index ff9e207a19..22023e7c3f 100644 --- a/src/brpc/policy/streaming_rpc_protocol.h +++ b/src/brpc/policy/streaming_rpc_protocol.h @@ -19,10 +19,10 @@ #ifndef BRPC_STREAMING_RPC_PROTOCOL_H #define BRPC_STREAMING_RPC_PROTOCOL_H +#include "bthread/types.h" #include "brpc/protocol.h" #include "brpc/streaming_rpc_meta.pb.h" - namespace brpc { namespace policy { @@ -41,7 +41,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id, int64_t source_stream_id); int SendStreamData(Socket* sock, const butil::IOBuf* data, - int64_t remote_stream_id, int64_t source_stream_id); + int64_t remote_stream_id, int64_t source_stream_id, + bthread_id_t); } // namespace policy } // namespace brpc