Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codex-rs/codex-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ pub struct MemorySummarizeOutput {

#[derive(Debug)]
pub enum ResponseEvent {
Created,
Created {
response_id: Option<String>,
},
OutputItemDone(ResponseItem),
OutputItemAdded(ResponseItem),
/// Emitted when the server includes `OpenAI-Model` on the stream response.
Expand Down
33 changes: 32 additions & 1 deletion codex-rs/codex-api/src/endpoint/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::requests::headers::build_conversation_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use crate::sse::spawn_response_stream;
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
Expand Down Expand Up @@ -70,6 +71,16 @@ impl<T: HttpTransport> ResponsesClient<T> {
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
self.stream_request_with_lifecycle(request, options, /*lifecycle*/ None)
.await
}

pub async fn stream_request_with_lifecycle(
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let ResponsesOptions {
conversation_id,
Expand All @@ -94,7 +105,8 @@ impl<T: HttpTransport> ResponsesClient<T> {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}

self.stream(body, headers, compression, turn_state).await
self.stream_with_lifecycle(body, headers, compression, turn_state, lifecycle)
.await
}

fn path() -> &'static str {
Expand All @@ -118,6 +130,24 @@ impl<T: HttpTransport> ResponsesClient<T> {
extra_headers: HeaderMap,
compression: Compression,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Result<ResponseStream, ApiError> {
self.stream_with_lifecycle(
body,
extra_headers,
compression,
turn_state,
/*lifecycle*/ None,
)
.await
}

pub async fn stream_with_lifecycle(
&self,
body: Value,
extra_headers: HeaderMap,
compression: Compression,
turn_state: Option<Arc<OnceLock<String>>>,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let request_compression = match compression {
Compression::None => RequestCompression::None,
Expand Down Expand Up @@ -146,6 +176,7 @@ impl<T: HttpTransport> ResponsesClient<T> {
self.session.provider().stream_idle_timeout,
self.sse_telemetry.clone(),
turn_state,
lifecycle,
))
}
}
65 changes: 57 additions & 8 deletions codex-rs/codex-api/src/endpoint/responses_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use crate::provider::Provider;
use crate::rate_limits::parse_rate_limit_event;
use crate::sse::ResponsesStreamEvent;
use crate::sse::process_responses_event;
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
use crate::stream_lifecycle::ResponseStreamLifecycleRecorder;
use crate::stream_lifecycle::ResponseStreamTerminalState;
use crate::stream_lifecycle::finalize_lifecycle_error;
use crate::telemetry::WebsocketTelemetry;
use codex_client::TransportError;
use codex_client::maybe_build_rustls_client_config_with_custom_ca;
Expand Down Expand Up @@ -214,6 +218,16 @@ impl ResponsesWebsocketConnection {
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
self.stream_request_with_lifecycle(request, connection_reused, /*lifecycle*/ None)
.await
}

pub async fn stream_request_with_lifecycle(
&self,
request: ResponsesWsRequest,
connection_reused: bool,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
Expand Down Expand Up @@ -263,6 +277,7 @@ impl ResponsesWebsocketConnection {
idle_timeout,
telemetry,
connection_reused,
lifecycle,
)
.await
};
Expand Down Expand Up @@ -540,8 +555,10 @@ async fn run_websocket_response_stream(
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let mut lifecycle = lifecycle.map(ResponseStreamLifecycleRecorder::new);
let request_text = match serde_json::to_string(&request_body) {
Ok(text) => text,
Err(err) => {
Expand Down Expand Up @@ -579,15 +596,27 @@ async fn run_websocket_response_stream(
let message = match response {
Ok(Some(Ok(msg))) => msg,
Ok(Some(Err(err))) => {
return Err(ApiError::Stream(err.to_string()));
let error = ApiError::Stream(err.to_string());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
Ok(None) => {
return Err(ApiError::Stream(
"stream closed before response.completed".into(),
let error = ApiError::Stream("stream closed before response.completed".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::ClosedBeforeCompletion,
error,
));
}
Err(err) => {
return Err(err);
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::IdleTimeout,
err,
));
}
};

Expand All @@ -609,6 +638,9 @@ async fn run_websocket_response_stream(
}
};
let model_verifications = event.model_verifications();
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.observe_event(&event);
}
if event.kind() == "codex.rate_limits" {
if let Some(snapshot) = parse_rate_limit_event(&text) {
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
Expand Down Expand Up @@ -638,21 +670,38 @@ async fn run_websocket_response_stream(
let is_completed = matches!(event, ResponseEvent::Completed { .. });
let _ = tx_event.send(Ok(event)).await;
if is_completed {
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.finalize_completed();
}
break;
}
}
Ok(None) => {}
Err(error) => {
return Err(error.into_api_error());
let error = error.into_api_error();
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
}
}
Message::Binary(_) => {
return Err(ApiError::Stream("unexpected binary websocket event".into()));
let error = ApiError::Stream("unexpected binary websocket event".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
Message::Close(_) => {
return Err(ApiError::Stream(
"websocket closed by server before response.completed".into(),
let error =
ApiError::Stream("websocket closed by server before response.completed".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::ClosedBeforeCompletion,
error,
));
}
Message::Frame(_) => {}
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/codex-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) mod provider;
pub(crate) mod rate_limits;
pub(crate) mod requests;
pub(crate) mod sse;
mod stream_lifecycle;
pub(crate) mod telemetry;

pub use crate::requests::headers::build_conversation_headers;
Expand Down Expand Up @@ -63,6 +64,8 @@ pub use crate::provider::RetryConfig;
pub use crate::provider::is_azure_responses_provider;
pub use crate::requests::Compression;
pub use crate::sse::stream_from_fixture;
pub use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
pub use crate::stream_lifecycle::ResponseStreamTransport;
pub use crate::telemetry::SseTelemetry;
pub use crate::telemetry::WebsocketTelemetry;
pub use codex_protocol::protocol::RealtimeAudioFrame;
Expand Down
Loading
Loading