Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions codex-rs/exec-server/src/client/reqwest_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! - in a remote environment, that means the remote runtime after the
//! orchestrator has forwarded `http/request` over JSON-RPC

use std::error::Error as StdError;
use std::time::Duration;

use codex_app_server_protocol::JSONRPCErrorError;
Expand Down Expand Up @@ -135,15 +136,21 @@ impl ReqwestHttpRequestRunner {
}

let headers = Self::build_headers(params.headers)?;
let mut request = self.client.request(method, url).headers(headers);
let mut request = self.client.request(method.clone(), url).headers(headers);
if let Some(body) = params.body {
request = request.body(body.into_inner());
}

let response = request
.send()
.await
.map_err(|error| internal_error(format!("http/request failed: {error}")))?;
let response = match request.send().await {
Ok(response) => response,
Err(error) => {
let error_message = error.to_string();
log_send_error(&method, error);
return Err(internal_error(format!(
"http/request failed: {error_message}"
)));
}
};
let status = response.status().as_u16();
let headers = Self::response_headers(response.headers());

Expand Down Expand Up @@ -265,3 +272,26 @@ impl ReqwestHttpRequestRunner {
.collect()
}
}

fn log_send_error(method: &Method, error: reqwest::Error) {
let error = error.without_url();
let source_chain = error_source_chain(&error);
tracing::warn!(
http_method = method.as_str(),
error_is_timeout = error.is_timeout(),
error_is_connect = error.is_connect(),
error = %error,
error_sources = ?source_chain,
"http/request send failed"
);
}

fn error_source_chain(error: &reqwest::Error) -> Option<String> {
let mut sources = Vec::new();
let mut source = error.source();
while let Some(error) = source {
sources.push(error.to_string());
source = error.source();
}
(!sources.is_empty()).then(|| sources.join(": "))
}
69 changes: 65 additions & 4 deletions codex-rs/rmcp-client/src/http_client_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use reqwest::header::CONTENT_TYPE;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use rmcp::model::ClientJsonRpcMessage;
use rmcp::model::JsonRpcMessage;
use rmcp::model::ServerJsonRpcMessage;
use rmcp::transport::streamable_http_client::AuthRequiredError;
use rmcp::transport::streamable_http_client::InsufficientScopeError;
Expand Down Expand Up @@ -88,6 +89,8 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
auth_token: Option<String>,
custom_headers: HashMap<HeaderName, reqwest::header::HeaderValue>,
) -> std::result::Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>> {
let (mcp_method, mcp_request_id) = client_jsonrpc_message_fields(&message);
let has_session_id = session_id.is_some();
let mut headers = self.default_headers.clone();
headers.extend(custom_headers);
self.add_auth_headers(&mut headers);
Expand Down Expand Up @@ -121,7 +124,8 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
}

let body = serde_json::to_vec(&message).map_err(StreamableHttpError::Deserialize)?;
let (response, mut body_stream) = self
let has_authorization_header = headers.contains_key(AUTHORIZATION);
let response = self
.http_client
.http_request_stream(HttpRequestParams {
method: "POST".to_string(),
Expand All @@ -132,9 +136,22 @@ impl StreamableHttpClient for StreamableHttpClientAdapter {
request_id: "buffered-request".to_string(),
stream_response: true,
})
.await
.map_err(StreamableHttpClientAdapterError::from)
.map_err(StreamableHttpError::Client)?;
.await;
let (response, mut body_stream) = match response {
Ok(response) => response,
Err(error) => {
log_post_message_http_error(
&uri,
mcp_method.as_deref(),
mcp_request_id.as_deref(),
has_session_id,
has_authorization_header,
);
return Err(StreamableHttpError::Client(
StreamableHttpClientAdapterError::from(error),
));
}
};

if response.status == StatusCode::NOT_FOUND.as_u16() && session_id.is_some() {
return Err(StreamableHttpError::Client(
Expand Down Expand Up @@ -354,6 +371,50 @@ fn body_preview(body: impl Into<String>) -> String {
body_preview
}

fn client_jsonrpc_message_fields(
message: &ClientJsonRpcMessage,
) -> (Option<String>, Option<String>) {
match message {
JsonRpcMessage::Request(request) => (
Some(request.request.method().to_string()),
Some(request.id.to_string()),
),
JsonRpcMessage::Response(response) => (None, Some(response.id.to_string())),
JsonRpcMessage::Notification(_) => (None, None),
JsonRpcMessage::Error(error) => (None, error.id.as_ref().map(ToString::to_string)),
}
}

fn log_post_message_http_error(
uri: &str,
mcp_method: Option<&str>,
mcp_request_id: Option<&str>,
has_session_id: bool,
has_authorization_header: bool,
) {
let parsed_url = reqwest::Url::parse(uri).ok();
tracing::warn!(
endpoint_scheme = parsed_url
.as_ref()
.map(reqwest::Url::scheme)
.unwrap_or("<invalid>"),
endpoint_host = parsed_url
.as_ref()
.and_then(reqwest::Url::host_str)
.unwrap_or("<invalid>"),
endpoint_path = parsed_url
.as_ref()
.map(reqwest::Url::path)
.unwrap_or("<invalid>"),
endpoint_has_query = parsed_url.as_ref().is_some_and(|url| url.query().is_some()),
mcp_method = mcp_method.unwrap_or("<none>"),
mcp_request_id = mcp_request_id.unwrap_or("<none>"),
has_session_id = has_session_id,
has_authorization_header = has_authorization_header,
"streamable HTTP post_message failed"
);
}

fn insert_header<Error>(
headers: &mut HeaderMap,
name: HeaderName,
Expand Down
Loading