diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index b16ab545e2..d4dbab951b 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -298,6 +298,7 @@ void Controller::ResetPods() { _response_streams.clear(); _remote_stream_settings = NULL; _auth_flags = 0; + _rpc_received_us = 0; } Controller::Call::Call(Controller::Call* rhs) diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 6447441943..69d859ea8f 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -638,6 +638,17 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); return _response_content_type; } + // If brpc acts as a server, this interface exposes the time when the RPC was received from the + // socket. This function can be used in scenarios where the user code needs to understand the RPC + // reception time, such as for precise control of timeouts. Users will require timing to start + // from the receipt of the RPC. When the user processing function starts to handle the RPC, if + // it is found that the RPC has timed out, it will be directly discarded + void set_rpc_received_us(int64_t received_us) { _rpc_received_us = received_us; } + + // Get the received time of RPC (in microseconds), if the returned value is 0, it means that + // the received time of RPC is not recorded in the controller. + int64_t get_rpc_received_us() const { return _rpc_received_us; } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -909,6 +920,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); uint32_t _auth_flags; AfterRpcRespFnType _after_rpc_resp_fn; + + // The point in time when the rpc is read from the socket + int64_t _rpc_received_us; }; // Advises the RPC system that the caller desires that the RPC call be diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index fcc8b824c6..5adf77b2c5 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -615,6 +615,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { cntl->set_request_content_type(meta.content_type()); cntl->set_request_compress_type((CompressType)meta.compress_type()); cntl->set_request_checksum_type((ChecksumType)meta.checksum_type()); + cntl->set_rpc_received_us(msg->received_us()); accessor.set_checksum_value(meta.checksum_value()); accessor.set_server(server) .set_security_mode(security_mode) @@ -943,6 +944,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { } } + cntl->set_rpc_received_us(msg->received_us()); Span* span = accessor.span(); if (span) { span->set_base_real_us(msg->base_real_us());