From 0d97dc03be5fa752292f550d4e830e320a64d269 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 10:41:21 +0000 Subject: [PATCH 01/16] =?UTF-8?q?=F0=9F=A4=96=20docs:=20add=20OpenAI=20web?= =?UTF-8?q?socket=20transport=20PRD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Document the shared glossary and PRD for adding an opt-in OpenAI WebSocket transport setting. --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `high` • Cost: `114722{MUX_COSTS_USD-0}`_ --- CONTEXT.md | 49 ++++++++++++++++++++ PRD.md | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 CONTEXT.md create mode 100644 PRD.md diff --git a/CONTEXT.md b/CONTEXT.md new file mode 100644 index 0000000000..f13b3e5a57 --- /dev/null +++ b/CONTEXT.md @@ -0,0 +1,49 @@ +# Mux Provider Configuration + +Mux provider configuration describes which model providers are available and how Mux routes model requests through them. + +## Language + +**Built-in OpenAI Provider**: +The first-party OpenAI provider settings surface in Mux, covering OpenAI API key and Codex OAuth configuration paths. +_Avoid_: OpenAI-compatible provider, GitHub Copilot provider + +**Direct OpenAI API Key Path**: +The **Built-in OpenAI Provider** path that talks to OpenAI's platform API with an OpenAI API key. +_Avoid_: Codex OAuth path, OpenAI-compatible provider + +**OpenAI WebSocket Transport**: +An optional transport for the **Built-in OpenAI Provider** that uses OpenAI's Responses WebSocket path for eligible streaming responses requests. +_Avoid_: WebSocket provider, OpenAI-compatible WebSocket transport + +**webSocketTransportEnabled**: +The persisted opt-in flag that enables the **OpenAI WebSocket Transport** for eligible **Built-in OpenAI Provider** requests. +_Avoid_: transport, websocket, useWebSocketTransport + +## Relationships + +- The **OpenAI WebSocket Transport** belongs to the built-in OpenAI provider settings surface. +- Codex OAuth routing is not a supported **OpenAI WebSocket Transport** scope, and Mux does not add a special guard solely to protect it from an opt-in WebSocket attempt. +- The **OpenAI WebSocket Transport** applies only to eligible streaming Responses API requests. +- The **OpenAI WebSocket Transport** is inactive when the **Built-in OpenAI Provider** uses Chat Completions wire format. +- Switching to Chat Completions wire format preserves `webSocketTransportEnabled` but disables the active WebSocket transport until Responses wire format is restored. +- A WebSocket connection lives for one streaming model run: it can be reused by internal tool-calling steps and is closed when the run completes, errors, or is cancelled. +- Eligible WebSocket request failures are surfaced to the user; Mux does not automatically retry them over HTTP. +- WebSocket transport behavior is verified with deterministic config, UI, provider factory, and lifecycle tests; live OpenAI dogfooding is optional when credentials are available. +- Mux uses the published `ai-sdk-openai-websocket-fetch` package for the **OpenAI WebSocket Transport** instead of owning the WebSocket protocol locally. +- Mux composes **OpenAI WebSocket Transport** through a small helper that preserves existing OpenAI fetch behavior and exposes a close hook for stream lifecycle cleanup. +- Mux carries WebSocket cleanup on a Mux-owned language-model cleanup symbol so stream owners can run it in their existing cleanup paths without changing the provider model factory API shape. +- Every `streamText` owner that uses `createModel()`-returned models, including main streams and workspace title generation, runs the model cleanup helper in its stream cleanup path. +- The **OpenAI WebSocket Transport** is a persisted **Built-in OpenAI Provider** setting, not a request-level override. +- The OpenAI provider settings UI exposes `webSocketTransportEnabled` near Wire Format and disables the control while Chat Completions wire format is selected. +- The OpenAI provider settings UI describes the **OpenAI WebSocket Transport** as experimental and warns that unsupported endpoints may fail. +- The **OpenAI WebSocket Transport** does not validate configured base URLs; if the selected endpoint does not support OpenAI's Responses WebSocket path, the first eligible request fails normally. + +## Example dialogue + +> **Dev:** "Should the **OpenAI WebSocket Transport** apply to OpenAI-compatible providers?" +> **Domain expert:** "No — it belongs only to the **Built-in OpenAI Provider** for the initial opt-in feature." + +## Flagged ambiguities + +- "OpenAI provider" can mean the **Built-in OpenAI Provider**, the **Direct OpenAI API Key Path**, or an OpenAI-compatible provider; resolved: this feature targets the **Built-in OpenAI Provider** settings surface only. diff --git a/PRD.md b/PRD.md new file mode 100644 index 0000000000..7fccae943e --- /dev/null +++ b/PRD.md @@ -0,0 +1,130 @@ +## Problem Statement + +Mux users who use the **Built-in OpenAI Provider** currently send OpenAI model requests over the existing HTTP transport. OpenAI's Responses WebSocket transport can reduce setup overhead for streaming Responses API workflows, especially multi-step tool-calling runs, but Mux has no first-class way to opt into it. Users should be able to enable the **OpenAI WebSocket Transport** without breaking existing OpenAI, OpenAI-compatible, Chat Completions, Codex OAuth, or custom endpoint configurations. + +## Solution + +Add an optional, non-breaking **Built-in OpenAI Provider** setting named `webSocketTransportEnabled`. When enabled and the provider is using Responses wire format, Mux will use OpenAI's Responses WebSocket transport for eligible streaming Responses API requests. The setting will be exposed in the OpenAI provider settings UI near Wire Format, persisted with the provider configuration, and disabled in the UI while Chat Completions wire format is selected. + +The implementation will use the published OpenAI WebSocket transport package rather than implementing the protocol in Mux. Mux will compose the package through a small, testable integration helper that preserves existing OpenAI fetch behavior, including Mux attribution, timeout behavior, DevTools header handling, and existing request normalization. WebSocket connections will live for one streaming model run, can be reused by internal AI SDK tool-calling steps, and will be explicitly closed when the run completes, errors, or is cancelled. + +## User Stories + +1. As a Mux user, I want to opt into OpenAI's WebSocket transport, so that eligible OpenAI Responses streams can use the lower-overhead transport. +2. As a Mux user, I want the WebSocket transport setting to be optional, so that existing OpenAI behavior remains unchanged unless I opt in. +3. As a Mux user, I want the WebSocket transport setting to persist in provider configuration, so that I do not need to re-enable it for every session. +4. As a Mux user, I want to configure WebSocket transport from the OpenAI provider settings UI, so that I do not need to edit configuration files manually. +5. As a Mux user, I want the setting to appear near Wire Format, so that I understand it is related to Responses versus Chat Completions behavior. +6. As a Mux user, I want the UI to explain that the WebSocket transport is experimental, so that I understand the risk of endpoint failures. +7. As a Mux user, I want the UI to warn that unsupported endpoints may fail, so that failures after enabling the setting are understandable. +8. As a Mux user, I want the WebSocket transport control to be disabled when Chat Completions wire format is selected, so that I do not expect Chat Completions requests to use WebSockets. +9. As a Mux user, I want switching to Chat Completions to preserve my saved WebSocket preference, so that switching back to Responses restores my intended WebSocket behavior. +10. As a Mux user, I want the setting to be non-breaking for existing configurations, so that upgrading Mux does not change transport behavior unexpectedly. +11. As a Mux user, I want Mux to avoid over-validating custom OpenAI base URLs, so that intentionally configured endpoints are not blocked by Mux-specific assumptions. +12. As a Mux user, I want unsupported WebSocket endpoints to fail normally, so that I can decide whether to disable the setting or fix the endpoint. +13. As a Mux user, I want Mux not to retry failed WebSocket requests over HTTP automatically, so that I do not accidentally duplicate provider-side work or tool-call flows. +14. As a Mux user, I want eligible multi-step tool-calling runs to reuse one WebSocket connection within the run, so that the transport can provide benefit across internal model steps. +15. As a Mux user, I want WebSocket connections to close when a stream completes, so that Mux does not leave unnecessary sockets open. +16. As a Mux user, I want WebSocket connections to close when a stream errors, so that failed streams do not leak resources. +17. As a Mux user, I want WebSocket connections to close when a stream is cancelled, so that interrupting a run cleans up transport resources. +18. As a Mux user, I want workspace title generation to clean up WebSocket resources too, so that background model uses do not leak sockets. +19. As a Mux user, I want Codex OAuth behavior not to receive special protective gating solely for WebSocket mode, so that the implementation stays simple and opt-in failures surface naturally. +20. As a Mux user, I want OpenAI-compatible providers to remain outside the WebSocket feature scope, so that provider-specific transport behavior does not accidentally affect unrelated providers. +21. As a Mux maintainer, I want the feature implemented through a small composition helper, so that WebSocket integration can be tested independently from the rest of provider construction. +22. As a Mux maintainer, I want the feature to use the published WebSocket transport package, so that Mux does not own protocol details already maintained upstream. +23. As a Mux maintainer, I want the provider model factory API shape to remain stable, so that adding WebSocket cleanup does not create broad call-site churn. +24. As a Mux maintainer, I want cleanup to be carried by a Mux-owned language-model cleanup symbol, so that stream owners can clean up resources without exposing package-specific details. +25. As a Mux maintainer, I want deterministic tests for config/schema behavior, so that the persisted opt-in field is accepted and invalid values are not surfaced. +26. As a Mux maintainer, I want deterministic tests for provider status behavior, so that the UI receives the WebSocket setting only when it is valid. +27. As a Mux maintainer, I want deterministic tests for provider construction behavior, so that WebSocket fetch is used only for enabled Responses-mode OpenAI requests. +28. As a Mux maintainer, I want deterministic tests for Chat Completions gating, so that the setting is inactive while Chat Completions wire format is selected. +29. As a Mux maintainer, I want lifecycle tests for cleanup, so that WebSocket close behavior is verified on completion, error, and cancellation paths. +30. As a Mux maintainer, I want UI tests for the OpenAI provider settings control, so that users can discover and correctly interpret the setting. +31. As a reviewer, I want optional live dogfooding instructions, so that the feature can be validated against a real OpenAI endpoint when credentials are available. +32. As a reviewer, I want screenshots and a recording from manual dogfooding when available, so that I can verify the setting's UI and runtime behavior without repeating every step. + +## Implementation Decisions + +- Add a persisted **Built-in OpenAI Provider** boolean setting named `webSocketTransportEnabled`. +- Treat absence of `webSocketTransportEnabled` as disabled so the change is non-breaking. +- Surface valid `webSocketTransportEnabled` values through the provider configuration information consumed by settings UI. +- Expose the setting in the OpenAI provider settings UI near Wire Format. +- Disable the UI control when Chat Completions wire format is selected. +- Preserve the saved WebSocket setting when Chat Completions wire format is selected; only make the transport inactive. +- Use risk-aware UI copy: the feature is experimental, uses OpenAI's Responses WebSocket transport for streaming Responses API requests, and unsupported endpoints may fail. +- Do not validate configured OpenAI base URLs before attempting WebSocket transport. +- Do not add automatic HTTP fallback for eligible WebSocket request failures. +- Do not broaden the feature to OpenAI-compatible providers in the initial implementation. +- Treat Codex OAuth as not a supported WebSocket scope, while avoiding a special guard solely to protect Codex OAuth from an opt-in attempt. +- Use the published OpenAI WebSocket transport package instead of implementing the WebSocket protocol locally. +- Add a small deep module for composing OpenAI provider fetch behavior with the WebSocket transport. Its interface should hide package details behind simple inputs such as whether WebSocket mode is active and a returned close hook. +- Preserve existing OpenAI fetch behavior when composing the WebSocket transport, including request header handling, Mux attribution, DevTools header stripping, timeout behavior, custom fetch compatibility, and existing request normalization. +- Keep the provider model factory return shape unchanged. +- Add a Mux-owned language-model cleanup helper that can attach cleanup to a model, run cleanup once, and make double cleanup harmless. +- Run language-model cleanup in every stream owner that uses models created by the provider model factory, including main chat/agent streams and workspace title generation. +- Use per-stream WebSocket lifecycle: create for the stream run, allow reuse across internal AI SDK tool-calling steps, and close on completion, error, or cancellation. +- Do not create an ADR for this iteration because the feature is optional, reversible, and sufficiently covered by the glossary plus implementation comments. + +## Testing Decisions + +- Tests should focus on externally observable behavior rather than implementation details. Good tests should prove what users, provider status consumers, stream owners, or provider constructors observe; they should not assert private helper internals unless testing a deliberately extracted deep module through its public interface. +- Test the provider configuration schema accepts `webSocketTransportEnabled` as an optional boolean on the **Built-in OpenAI Provider**. +- Test invalid `webSocketTransportEnabled` values are not surfaced as valid provider status. +- Test provider status includes `webSocketTransportEnabled` when it is configured with a valid boolean. +- Test provider construction uses the WebSocket transport only when `webSocketTransportEnabled` is true and the effective wire format is Responses. +- Test provider construction does not activate WebSocket transport when the effective wire format is Chat Completions. +- Test the OpenAI provider settings UI renders a WebSocket transport control near Wire Format. +- Test the WebSocket transport UI control persists changes to provider configuration. +- Test the WebSocket transport UI control is disabled when Chat Completions wire format is selected. +- Test switching to Chat Completions does not delete the saved `webSocketTransportEnabled` value. +- Test the cleanup helper runs an attached cleanup once and tolerates repeated cleanup calls. +- Test main stream cleanup runs the language-model cleanup helper on completion, error, and cancellation paths. +- Test workspace title generation runs the language-model cleanup helper after its stream attempt completes or fails. +- Mock the published WebSocket transport package in lifecycle tests so tests do not require network access. +- Use existing provider configuration tests, provider service tests, provider model factory tests, settings UI tests, and stream lifecycle tests as prior art. +- Avoid tautological tests that only assert exact UI prose. UI tests should assert behavior such as visibility, disabled state, persisted mutations, and relationship to Wire Format. +- Live OpenAI dogfooding is optional and should not be required in CI because it depends on credentials, network access, endpoint support, and provider billing. + +## Out of Scope + +- Supporting OpenAI-compatible providers with WebSocket transport. +- Implementing the WebSocket protocol directly in Mux. +- Process-wide or cross-stream WebSocket connection caching. +- Automatic HTTP fallback after WebSocket failures. +- Base URL validation or URL derivation for custom endpoints. +- A separate custom WebSocket URL setting. +- Request-level or workspace-level overrides for WebSocket transport. +- Guaranteeing Codex OAuth WebSocket support. +- Adding an ADR for this initial opt-in implementation. +- Making live OpenAI WebSocket tests part of required automated validation. + +## Further Notes + +### Acceptance Criteria + +- Existing OpenAI users see no transport behavior change unless `webSocketTransportEnabled` is explicitly enabled. +- The **Built-in OpenAI Provider** configuration accepts and persists `webSocketTransportEnabled` as an optional boolean. +- The OpenAI provider settings UI exposes the setting near Wire Format. +- The UI disables the setting while Chat Completions wire format is selected and preserves the saved value. +- Enabled Responses-mode OpenAI streams use the published WebSocket transport through Mux's composition layer. +- Chat Completions-mode OpenAI streams do not activate the WebSocket transport. +- WebSocket failures surface normally without automatic HTTP fallback. +- WebSocket resources are closed on stream completion, error, and cancellation for all stream owners that use provider-created models. +- Deterministic automated tests cover config, provider status, provider construction, UI gating, and cleanup lifecycle behavior. + +### Dogfooding Plan + +1. Start the Mux dev environment with an OpenAI-capable provider configuration. +2. Open Settings and navigate to Providers, then expand the OpenAI provider settings. +3. Verify the WebSocket transport control appears near Wire Format with experimental/risk-aware helper copy. +4. With Responses wire format selected, enable WebSocket transport and confirm the provider configuration persists the setting. +5. Send a short prompt using an OpenAI Responses model and verify the response streams successfully, or that an unsupported endpoint failure is surfaced clearly. +6. Switch Wire Format to Chat Completions and verify the WebSocket transport control becomes disabled while the saved preference is preserved. +7. Switch Wire Format back to Responses and verify the previously saved WebSocket preference is still reflected. +8. Interrupt or cancel a streaming response and verify the app remains stable and no follow-up stream is blocked by leaked transport state. +9. Capture screenshots of the settings UI in enabled and Chat Completions-disabled states. +10. Capture a short recording of enabling the setting, sending a prompt, and switching Wire Format to demonstrate the complete reviewer-visible flow. + +### Issue Tracker Note + +If this PRD is later published to the issue tracker, apply the `needs-triage` label so it enters normal triage. From 7b496deeb9384dc7f81b2d31f151b312f7f63a0a Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 11:41:20 +0000 Subject: [PATCH 02/16] Add OpenAI WebSocket transport opt-in --- CONTEXT.md | 6 +- PRD.md | 18 +-- bun.lock | 3 + package.json | 1 + .../Sections/ProvidersSection.test.tsx | 60 +++++++++ .../Settings/Sections/ProvidersSection.tsx | 36 +++++ .../config/schemas/providersConfig.test.ts | 21 +++ src/common/config/schemas/providersConfig.ts | 1 + src/common/orpc/schemas/api.test.ts | 2 + src/common/orpc/schemas/api.ts | 1 + .../services/languageModelCleanup.test.ts | 82 ++++++++++++ src/node/services/languageModelCleanup.ts | 53 ++++++++ .../openAIWebSocketTransportFetch.test.ts | 125 ++++++++++++++++++ .../services/openAIWebSocketTransportFetch.ts | 96 ++++++++++++++ .../services/providerModelFactory.test.ts | 61 +++++++++ src/node/services/providerModelFactory.ts | 21 ++- src/node/services/providerService.test.ts | 36 +++++ src/node/services/providerService.ts | 5 + src/node/services/streamManager.test.ts | 97 +++++++++++++- src/node/services/streamManager.ts | 3 + .../services/workspaceTitleGenerator.test.ts | 55 +++++++- src/node/services/workspaceTitleGenerator.ts | 3 + 22 files changed, 770 insertions(+), 16 deletions(-) create mode 100644 src/node/services/languageModelCleanup.test.ts create mode 100644 src/node/services/languageModelCleanup.ts create mode 100644 src/node/services/openAIWebSocketTransportFetch.test.ts create mode 100644 src/node/services/openAIWebSocketTransportFetch.ts diff --git a/CONTEXT.md b/CONTEXT.md index f13b3e5a57..e4efa65bb0 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -26,16 +26,16 @@ _Avoid_: transport, websocket, useWebSocketTransport - Codex OAuth routing is not a supported **OpenAI WebSocket Transport** scope, and Mux does not add a special guard solely to protect it from an opt-in WebSocket attempt. - The **OpenAI WebSocket Transport** applies only to eligible streaming Responses API requests. - The **OpenAI WebSocket Transport** is inactive when the **Built-in OpenAI Provider** uses Chat Completions wire format. -- Switching to Chat Completions wire format preserves `webSocketTransportEnabled` but disables the active WebSocket transport until Responses wire format is restored. +- Switching to Chat Completions wire format preserves `webSocketTransportEnabled` but hides the UI control and disables the active WebSocket transport until Responses wire format is restored. - A WebSocket connection lives for one streaming model run: it can be reused by internal tool-calling steps and is closed when the run completes, errors, or is cancelled. - Eligible WebSocket request failures are surfaced to the user; Mux does not automatically retry them over HTTP. - WebSocket transport behavior is verified with deterministic config, UI, provider factory, and lifecycle tests; live OpenAI dogfooding is optional when credentials are available. -- Mux uses the published `ai-sdk-openai-websocket-fetch` package for the **OpenAI WebSocket Transport** instead of owning the WebSocket protocol locally. +- Mux uses the published `@vercel/ai-sdk-openai-websocket-fetch` package for the **OpenAI WebSocket Transport** instead of owning the WebSocket protocol locally. - Mux composes **OpenAI WebSocket Transport** through a small helper that preserves existing OpenAI fetch behavior and exposes a close hook for stream lifecycle cleanup. - Mux carries WebSocket cleanup on a Mux-owned language-model cleanup symbol so stream owners can run it in their existing cleanup paths without changing the provider model factory API shape. - Every `streamText` owner that uses `createModel()`-returned models, including main streams and workspace title generation, runs the model cleanup helper in its stream cleanup path. - The **OpenAI WebSocket Transport** is a persisted **Built-in OpenAI Provider** setting, not a request-level override. -- The OpenAI provider settings UI exposes `webSocketTransportEnabled` near Wire Format and disables the control while Chat Completions wire format is selected. +- The OpenAI provider settings UI exposes `webSocketTransportEnabled` near Wire Format only while Responses wire format is selected. - The OpenAI provider settings UI describes the **OpenAI WebSocket Transport** as experimental and warns that unsupported endpoints may fail. - The **OpenAI WebSocket Transport** does not validate configured base URLs; if the selected endpoint does not support OpenAI's Responses WebSocket path, the first eligible request fails normally. diff --git a/PRD.md b/PRD.md index 7fccae943e..955b11aa28 100644 --- a/PRD.md +++ b/PRD.md @@ -4,9 +4,9 @@ Mux users who use the **Built-in OpenAI Provider** currently send OpenAI model r ## Solution -Add an optional, non-breaking **Built-in OpenAI Provider** setting named `webSocketTransportEnabled`. When enabled and the provider is using Responses wire format, Mux will use OpenAI's Responses WebSocket transport for eligible streaming Responses API requests. The setting will be exposed in the OpenAI provider settings UI near Wire Format, persisted with the provider configuration, and disabled in the UI while Chat Completions wire format is selected. +Add an optional, non-breaking **Built-in OpenAI Provider** setting named `webSocketTransportEnabled`. When enabled and the provider is using Responses wire format, Mux will use OpenAI's Responses WebSocket transport for eligible streaming Responses API requests. The setting will be exposed in the OpenAI provider settings UI near Wire Format, persisted with the provider configuration, and hidden in the UI while Chat Completions wire format is selected. -The implementation will use the published OpenAI WebSocket transport package rather than implementing the protocol in Mux. Mux will compose the package through a small, testable integration helper that preserves existing OpenAI fetch behavior, including Mux attribution, timeout behavior, DevTools header handling, and existing request normalization. WebSocket connections will live for one streaming model run, can be reused by internal AI SDK tool-calling steps, and will be explicitly closed when the run completes, errors, or is cancelled. +The implementation will use the published `@vercel/ai-sdk-openai-websocket-fetch` package rather than implementing the protocol in Mux. Mux will compose the package through a small, testable integration helper that preserves existing OpenAI fetch behavior, including Mux attribution, timeout behavior, DevTools header handling, and existing request normalization. WebSocket connections will live for one streaming model run, can be reused by internal AI SDK tool-calling steps, and will be explicitly closed when the run completes, errors, or is cancelled. ## User Stories @@ -17,7 +17,7 @@ The implementation will use the published OpenAI WebSocket transport package rat 5. As a Mux user, I want the setting to appear near Wire Format, so that I understand it is related to Responses versus Chat Completions behavior. 6. As a Mux user, I want the UI to explain that the WebSocket transport is experimental, so that I understand the risk of endpoint failures. 7. As a Mux user, I want the UI to warn that unsupported endpoints may fail, so that failures after enabling the setting are understandable. -8. As a Mux user, I want the WebSocket transport control to be disabled when Chat Completions wire format is selected, so that I do not expect Chat Completions requests to use WebSockets. +8. As a Mux user, I want the WebSocket transport control to be hidden when Chat Completions wire format is selected, so that I do not expect Chat Completions requests to use WebSockets. 9. As a Mux user, I want switching to Chat Completions to preserve my saved WebSocket preference, so that switching back to Responses restores my intended WebSocket behavior. 10. As a Mux user, I want the setting to be non-breaking for existing configurations, so that upgrading Mux does not change transport behavior unexpectedly. 11. As a Mux user, I want Mux to avoid over-validating custom OpenAI base URLs, so that intentionally configured endpoints are not blocked by Mux-specific assumptions. @@ -49,14 +49,14 @@ The implementation will use the published OpenAI WebSocket transport package rat - Treat absence of `webSocketTransportEnabled` as disabled so the change is non-breaking. - Surface valid `webSocketTransportEnabled` values through the provider configuration information consumed by settings UI. - Expose the setting in the OpenAI provider settings UI near Wire Format. -- Disable the UI control when Chat Completions wire format is selected. +- Hide the UI control when Chat Completions wire format is selected. - Preserve the saved WebSocket setting when Chat Completions wire format is selected; only make the transport inactive. - Use risk-aware UI copy: the feature is experimental, uses OpenAI's Responses WebSocket transport for streaming Responses API requests, and unsupported endpoints may fail. - Do not validate configured OpenAI base URLs before attempting WebSocket transport. - Do not add automatic HTTP fallback for eligible WebSocket request failures. - Do not broaden the feature to OpenAI-compatible providers in the initial implementation. - Treat Codex OAuth as not a supported WebSocket scope, while avoiding a special guard solely to protect Codex OAuth from an opt-in attempt. -- Use the published OpenAI WebSocket transport package instead of implementing the WebSocket protocol locally. +- Use the published `@vercel/ai-sdk-openai-websocket-fetch` package instead of implementing the WebSocket protocol locally. - Add a small deep module for composing OpenAI provider fetch behavior with the WebSocket transport. Its interface should hide package details behind simple inputs such as whether WebSocket mode is active and a returned close hook. - Preserve existing OpenAI fetch behavior when composing the WebSocket transport, including request header handling, Mux attribution, DevTools header stripping, timeout behavior, custom fetch compatibility, and existing request normalization. - Keep the provider model factory return shape unchanged. @@ -75,7 +75,7 @@ The implementation will use the published OpenAI WebSocket transport package rat - Test provider construction does not activate WebSocket transport when the effective wire format is Chat Completions. - Test the OpenAI provider settings UI renders a WebSocket transport control near Wire Format. - Test the WebSocket transport UI control persists changes to provider configuration. -- Test the WebSocket transport UI control is disabled when Chat Completions wire format is selected. +- Test the WebSocket transport UI control is hidden when Chat Completions wire format is selected. - Test switching to Chat Completions does not delete the saved `webSocketTransportEnabled` value. - Test the cleanup helper runs an attached cleanup once and tolerates repeated cleanup calls. - Test main stream cleanup runs the language-model cleanup helper on completion, error, and cancellation paths. @@ -105,7 +105,7 @@ The implementation will use the published OpenAI WebSocket transport package rat - Existing OpenAI users see no transport behavior change unless `webSocketTransportEnabled` is explicitly enabled. - The **Built-in OpenAI Provider** configuration accepts and persists `webSocketTransportEnabled` as an optional boolean. - The OpenAI provider settings UI exposes the setting near Wire Format. -- The UI disables the setting while Chat Completions wire format is selected and preserves the saved value. +- The UI hides the setting while Chat Completions wire format is selected and preserves the saved value. - Enabled Responses-mode OpenAI streams use the published WebSocket transport through Mux's composition layer. - Chat Completions-mode OpenAI streams do not activate the WebSocket transport. - WebSocket failures surface normally without automatic HTTP fallback. @@ -119,10 +119,10 @@ The implementation will use the published OpenAI WebSocket transport package rat 3. Verify the WebSocket transport control appears near Wire Format with experimental/risk-aware helper copy. 4. With Responses wire format selected, enable WebSocket transport and confirm the provider configuration persists the setting. 5. Send a short prompt using an OpenAI Responses model and verify the response streams successfully, or that an unsupported endpoint failure is surfaced clearly. -6. Switch Wire Format to Chat Completions and verify the WebSocket transport control becomes disabled while the saved preference is preserved. +6. Switch Wire Format to Chat Completions and verify the WebSocket transport control is hidden while the saved preference is preserved. 7. Switch Wire Format back to Responses and verify the previously saved WebSocket preference is still reflected. 8. Interrupt or cancel a streaming response and verify the app remains stable and no follow-up stream is blocked by leaked transport state. -9. Capture screenshots of the settings UI in enabled and Chat Completions-disabled states. +9. Capture screenshots of the settings UI in enabled and Chat Completions-hidden states. 10. Capture a short recording of enabling the setting, sending a prompt, and switching Wire Format to demonstrate the complete reviewer-visible flow. ### Issue Tracker Note diff --git a/bun.lock b/bun.lock index cf46ed14b7..ffb78df0d2 100644 --- a/bun.lock +++ b/bun.lock @@ -46,6 +46,7 @@ "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", "@radix-ui/react-visually-hidden": "^1.2.4", + "@vercel/ai-sdk-openai-websocket-fetch": "^1.0.0", "@xterm/addon-serialize": "^0.14.0", "@xterm/headless": "^6.0.0", "ai": "^6.0.72", @@ -1587,6 +1588,8 @@ "@unrs/resolver-binding-win32-x64-msvc": ["@unrs/resolver-binding-win32-x64-msvc@1.11.1", "", { "os": "win32", "cpu": "x64" }, "sha512-lrW200hZdbfRtztbygyaq/6jP6AKE8qQN2KvPcJ+x7wiD038YtnYtZ82IMNJ69GJibV7bwL3y9FgK+5w/pYt6g=="], + "@vercel/ai-sdk-openai-websocket-fetch": ["@vercel/ai-sdk-openai-websocket-fetch@1.0.0", "", { "dependencies": { "ws": "^8" }, "peerDependencies": { "@ai-sdk/openai": ">=3" }, "optionalPeers": ["@ai-sdk/openai"] }, "sha512-QTBex1bJogUTdh8lcmgxQhYkRZDVDMEdRyJjqy/Chnzv0i8UpLB2imFZvXORNUNdvu51aNPwgsclvx51F0Pc+A=="], + "@vercel/oidc": ["@vercel/oidc@3.1.0", "", {}, "sha512-Fw28YZpRnA3cAHHDlkt7xQHiJ0fcL+NRcIqsocZQUSmbzeIKRpwttJjik5ZGanXP+vlA4SbTg+AbA3bP363l+w=="], "@vitejs/plugin-react": ["@vitejs/plugin-react@4.7.0", "", { "dependencies": { "@babel/core": "^7.28.0", "@babel/plugin-transform-react-jsx-self": "^7.27.1", "@babel/plugin-transform-react-jsx-source": "^7.27.1", "@rolldown/pluginutils": "1.0.0-beta.27", "@types/babel__core": "^7.20.5", "react-refresh": "^0.17.0" }, "peerDependencies": { "vite": "^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0" } }, "sha512-gUu9hwfWvvEDBBmgtAowQCojwZmJ5mcLn3aufeCsitijs3+f2NsrPtlAWIR6OPiqljl96GVCUbLe0HyqIpVaoA=="], diff --git a/package.json b/package.json index e5fe833cba..e76a7a864f 100644 --- a/package.json +++ b/package.json @@ -88,6 +88,7 @@ "@radix-ui/react-toggle-group": "^1.1.11", "@radix-ui/react-tooltip": "^1.2.8", "@radix-ui/react-visually-hidden": "^1.2.4", + "@vercel/ai-sdk-openai-websocket-fetch": "^1.0.0", "@xterm/addon-serialize": "^0.14.0", "@xterm/headless": "^6.0.0", "ai": "^6.0.72", diff --git a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx index 8e31956d30..89ef4f7c83 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx @@ -150,12 +150,27 @@ function patchProviderMethods(client: APIClient, providersConfig: ProvidersConfi delete providersConfig[input.provider]; return Promise.resolve({ success: true as const, data: undefined }); }); + const setProviderConfig = mock((input) => { + const provider = providersConfig[input.provider]; + if (provider) { + const key = input.keyPath[0] as keyof ProviderConfigInfo | undefined; + if (key) { + if (input.value === "") { + delete provider[key]; + } else { + Object.assign(provider, { [key]: input.value }); + } + } + } + return Promise.resolve({ success: true as const, data: undefined }); + }); const onConfigChanged = mock(() => Promise.resolve(emptyConfigChangeIterator())); Object.assign(client.providers, { getConfig, addCustomOpenAICompatibleProvider, removeCustomProvider, + setProviderConfig, onConfigChanged, }); @@ -163,6 +178,7 @@ function patchProviderMethods(client: APIClient, providersConfig: ProvidersConfi addCustomOpenAICompatibleProvider, getConfig, removeCustomProvider, + setProviderConfig, }; } @@ -318,6 +334,50 @@ describe("ProvidersSection", () => { ).toBeTruthy(); }); + test("shows and persists the OpenAI WebSocket transport toggle", async () => { + const view = renderProvidersSection(); + const openAiButton = await view.findByRole("button", { name: /^OpenAI$/ }); + + fireEvent.click(openAiButton); + + const openAiCard = getProviderCard(openAiButton); + const webSocketToggle = within(openAiCard).getByRole("switch", { + name: /WebSocket transport/i, + }); + expect(webSocketToggle).toBeTruthy(); + + fireEvent.click(webSocketToggle); + + await waitFor(() => { + expect(view.setProviderConfig).toHaveBeenCalledWith({ + provider: "openai", + keyPath: ["webSocketTransportEnabled"], + value: true, + }); + }); + }); + + test("hides the OpenAI WebSocket transport toggle for Chat Completions without clearing it", async () => { + const view = renderProvidersSection(); + view.providersConfig.openai.wireFormat = "chatCompletions"; + view.providersConfig.openai.webSocketTransportEnabled = true; + const openAiButton = await view.findByRole("button", { name: /^OpenAI$/ }); + + fireEvent.click(openAiButton); + + const openAiCard = getProviderCard(openAiButton); + expect( + within(openAiCard).queryByRole("switch", { + name: /WebSocket transport/i, + }) + ).toBeNull(); + expect(within(openAiCard).queryByText("WebSocket transport")).toBeNull(); + expect(view.providersConfig.openai.webSocketTransportEnabled).toBe(true); + expect(view.setProviderConfig).not.toHaveBeenCalledWith( + expect.objectContaining({ keyPath: ["webSocketTransportEnabled"], value: "" }) + ); + }); + test("shows remove only for expanded custom provider cards", async () => { const view = renderProvidersSection(); const customButton = await view.findByRole("button", { name: /Acme OpenAI/ }); diff --git a/src/browser/features/Settings/Sections/ProvidersSection.tsx b/src/browser/features/Settings/Sections/ProvidersSection.tsx index b7c140a04c..ed20a643c1 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.tsx @@ -1636,6 +1636,8 @@ export function ProvidersSection() { const gatewayRouteTargets = providerDefinition?.kind === "gateway" ? (providerDefinition.routes ?? []) : []; const isCustomOpenAICompatible = isCustomOpenAICompatibleProviderInfo(providerInfo); + const openAIWireFormat = providerInfo?.wireFormat ?? "responses"; + const openAIWebSocketTransportVisible = openAIWireFormat === "responses"; const statusDotColor = !enabled ? "bg-warning" : configured @@ -2556,6 +2558,40 @@ export function ProvidersSection() { + {openAIWebSocketTransportVisible && ( +
+
+
+ + + Experimental: uses OpenAI's Responses WebSocket transport + for streaming Responses API requests. + +
+ { + if (!api) return; + + const webSocketTransportEnabled = nextChecked + ? true + : undefined; + updateOptimistically("openai", { webSocketTransportEnabled }); + void api.providers.setProviderConfig({ + provider: "openai", + keyPath: ["webSocketTransportEnabled"], + value: nextChecked ? true : "", + }); + }} + aria-label="WebSocket transport" + /> +
+
+ )} +
diff --git a/src/common/config/schemas/providersConfig.test.ts b/src/common/config/schemas/providersConfig.test.ts index d360127e95..a36f277212 100644 --- a/src/common/config/schemas/providersConfig.test.ts +++ b/src/common/config/schemas/providersConfig.test.ts @@ -62,6 +62,27 @@ describe("ProvidersConfigSchema", () => { expect(ProvidersConfigSchema.safeParse(invalid).success).toBe(false); }); + it("accepts OpenAI WebSocket transport opt-in as an optional boolean", () => { + const valid = { + openai: { apiKey: "sk-openai-123", webSocketTransportEnabled: true }, + }; + + const parsed = ProvidersConfigSchema.safeParse(valid); + + expect(parsed.success).toBe(true); + if (parsed.success) { + expect(parsed.data.openai?.webSocketTransportEnabled).toBe(true); + } + }); + + it("rejects non-boolean OpenAI WebSocket transport values", () => { + const invalid = { + openai: { webSocketTransportEnabled: "true" }, + }; + + expect(ProvidersConfigSchema.safeParse(invalid).success).toBe(false); + }); + describe("modelParameters", () => { it("accepts valid per-model and wildcard overrides", () => { const valid = { diff --git a/src/common/config/schemas/providersConfig.ts b/src/common/config/schemas/providersConfig.ts index 8748293e27..957e4331a4 100644 --- a/src/common/config/schemas/providersConfig.ts +++ b/src/common/config/schemas/providersConfig.ts @@ -34,6 +34,7 @@ export const OpenAIProviderConfigSchema = BaseProviderConfigSchema.extend({ codexOauth: z.record(z.string(), z.unknown()).optional(), defaultModel: z.string().optional(), apiVersion: z.string().optional(), + webSocketTransportEnabled: z.boolean().optional(), }); export const BedrockProviderConfigSchema = BaseProviderConfigSchema.extend({ diff --git a/src/common/orpc/schemas/api.test.ts b/src/common/orpc/schemas/api.test.ts index 0c9e9871d4..7e5c7a13b9 100644 --- a/src/common/orpc/schemas/api.test.ts +++ b/src/common/orpc/schemas/api.test.ts @@ -116,6 +116,7 @@ describe("ProviderConfigInfoSchema conformance", () => { models: ["claude-3-opus", "claude-3-sonnet"], serviceTier: "flex", store: false, + webSocketTransportEnabled: true, cacheTtl: "1h", disableBetaFeatures: true, codexOauthSet: true, @@ -150,6 +151,7 @@ describe("ProviderConfigInfoSchema conformance", () => { expect(parsed.models).toEqual(full.models); expect(parsed.serviceTier).toBe(full.serviceTier); expect(parsed.store).toBe(full.store); + expect(parsed.webSocketTransportEnabled).toBe(full.webSocketTransportEnabled); expect(parsed.cacheTtl).toBe(full.cacheTtl); expect(parsed.disableBetaFeatures).toBe(full.disableBetaFeatures); expect(parsed.codexOauthSet).toBe(full.codexOauthSet); diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index cf6669549c..630020a930 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -205,6 +205,7 @@ export const ProviderConfigInfoSchema = z.object({ serviceTier: ServiceTierSchema.optional(), wireFormat: z.enum(["responses", "chatCompletions"]).optional(), store: z.boolean().optional(), + webSocketTransportEnabled: z.boolean().optional(), /** Anthropic-specific fields */ cacheTtl: CacheTtlSchema.optional(), disableBetaFeatures: z.boolean().optional(), diff --git a/src/node/services/languageModelCleanup.test.ts b/src/node/services/languageModelCleanup.test.ts new file mode 100644 index 0000000000..22838b6876 --- /dev/null +++ b/src/node/services/languageModelCleanup.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, test } from "bun:test"; +import type { LanguageModel } from "ai"; + +import { + attachLanguageModelCleanup, + hasLanguageModelCleanup, + moveLanguageModelCleanup, + runLanguageModelCleanup, +} from "./languageModelCleanup"; + +function createModel(): LanguageModel { + return { + specificationVersion: "v3", + provider: "test", + modelId: "test-model", + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("doGenerate is unused in cleanup tests")), + doStream: () => Promise.reject(new Error("doStream is unused in cleanup tests")), + }; +} + +describe("language model cleanup", () => { + test("runs attached cleanup exactly once", () => { + const model = createModel(); + let cleanupCalls = 0; + + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + + runLanguageModelCleanup(model); + runLanguageModelCleanup(model); + + expect(cleanupCalls).toBe(1); + }); + + test("reports whether cleanup is attached", () => { + const model = createModel(); + + expect(hasLanguageModelCleanup(model)).toBe(false); + attachLanguageModelCleanup(model, () => undefined); + expect(hasLanguageModelCleanup(model)).toBe(true); + runLanguageModelCleanup(model); + expect(hasLanguageModelCleanup(model)).toBe(false); + }); + + test("moves cleanup to a wrapper model", () => { + const inner = createModel(); + const outer = createModel(); + let cleanupCalls = 0; + + attachLanguageModelCleanup(inner, () => { + cleanupCalls += 1; + }); + + moveLanguageModelCleanup(inner, outer); + + expect(hasLanguageModelCleanup(inner)).toBe(false); + expect(hasLanguageModelCleanup(outer)).toBe(true); + runLanguageModelCleanup(inner); + runLanguageModelCleanup(outer); + expect(cleanupCalls).toBe(1); + }); + + test("models without cleanup are safe", () => { + expect(() => runLanguageModelCleanup(createModel())).not.toThrow(); + }); + + test("cleanup errors are swallowed after the first attempt", () => { + const model = createModel(); + let cleanupCalls = 0; + + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + throw new Error("close failed"); + }); + + expect(() => runLanguageModelCleanup(model)).not.toThrow(); + expect(() => runLanguageModelCleanup(model)).not.toThrow(); + expect(cleanupCalls).toBe(1); + }); +}); diff --git a/src/node/services/languageModelCleanup.ts b/src/node/services/languageModelCleanup.ts new file mode 100644 index 0000000000..7e3f0d8405 --- /dev/null +++ b/src/node/services/languageModelCleanup.ts @@ -0,0 +1,53 @@ +import assert from "node:assert"; +import type { LanguageModel } from "ai"; + +import { log } from "./log"; + +const languageModelCleanupSymbol = Symbol("mux.languageModelCleanup"); + +type LanguageModelCleanup = () => void; +type LanguageModelWithCleanup = LanguageModel & { + [languageModelCleanupSymbol]?: LanguageModelCleanup; +}; + +export function attachLanguageModelCleanup( + model: LanguageModel, + cleanup: LanguageModelCleanup +): LanguageModel { + assert(typeof cleanup === "function", "language model cleanup must be a function"); + const modelWithCleanup = model as LanguageModelWithCleanup; + modelWithCleanup[languageModelCleanupSymbol] = cleanup; + return model; +} + +export function moveLanguageModelCleanup(source: LanguageModel, target: LanguageModel): void { + const sourceWithCleanup = source as LanguageModelWithCleanup; + const cleanup = sourceWithCleanup[languageModelCleanupSymbol]; + if (cleanup === undefined) { + return; + } + + delete sourceWithCleanup[languageModelCleanupSymbol]; + attachLanguageModelCleanup(target, cleanup); +} + +export function hasLanguageModelCleanup(model: LanguageModel): boolean { + const modelWithCleanup = model as LanguageModelWithCleanup; + return typeof modelWithCleanup[languageModelCleanupSymbol] === "function"; +} + +export function runLanguageModelCleanup(model: LanguageModel): void { + const modelWithCleanup = model as LanguageModelWithCleanup; + const cleanup = modelWithCleanup[languageModelCleanupSymbol]; + if (cleanup === undefined) { + return; + } + + delete modelWithCleanup[languageModelCleanupSymbol]; + + try { + cleanup(); + } catch (error) { + log.warn("Failed to clean up language model resources", { error }); + } +} diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts new file mode 100644 index 0000000000..c9932a4e3c --- /dev/null +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -0,0 +1,125 @@ +import { describe, expect, test } from "bun:test"; + +import { createOpenAIWebSocketTransportFetch } from "./openAIWebSocketTransportFetch"; + +function getFetchInputUrl(input: RequestInfo | URL): string { + if (input instanceof URL) { + return input.toString(); + } + if (typeof input === "string") { + return input; + } + return input.url; +} + +function createTestFetch( + handler: (input: RequestInfo | URL, init?: RequestInit) => Promise +): typeof fetch { + return Object.assign(handler, { preconnect: fetch.preconnect.bind(fetch) }) as typeof fetch; +} + +function createTestWebSocketFetch( + handler: (input: RequestInfo | URL, init?: RequestInit) => Promise, + close: () => void = () => undefined +): typeof fetch & { close: () => void } { + return Object.assign(createTestFetch(handler), { close }); +} + +describe("createOpenAIWebSocketTransportFetch", () => { + test("disabled transport keeps using the base fetch and exposes inactive cleanup", async () => { + const baseCalls: string[] = []; + const baseFetch = createTestFetch((input: RequestInfo | URL, _init?: RequestInit) => { + baseCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("base")); + }); + + const transport = createOpenAIWebSocketTransportFetch({ + enabled: false, + baseFetch, + createWebSocketFetch: () => { + throw new Error("WebSocket fetch should not be created when disabled"); + }, + }); + + const response = await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + + expect(await response.text()).toBe("base"); + expect(baseCalls).toEqual(["https://api.openai.com/v1/responses"]); + expect(transport.active).toBe(false); + expect(() => transport.close()).not.toThrow(); + }); + + test("enabled transport sends streaming Responses API posts through WebSocket fetch", async () => { + const wsCalls: string[] = []; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => { + return createTestWebSocketFetch((input: RequestInfo | URL, _init?: RequestInit) => { + wsCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("ws")); + }); + }, + }); + + const response = await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + + expect(await response.text()).toBe("ws"); + expect(wsCalls).toEqual(["https://api.openai.com/v1/responses"]); + expect(transport.active).toBe(true); + }); + + test("enabled transport keeps non-eligible requests on the base fetch", async () => { + const baseCalls: string[] = []; + const wsCalls: string[] = []; + const baseFetch = createTestFetch((input: RequestInfo | URL, _init?: RequestInit) => { + baseCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("base")); + }); + + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch, + createWebSocketFetch: () => { + return createTestWebSocketFetch((input: RequestInfo | URL, _init?: RequestInit) => { + wsCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("ws")); + }); + }, + }); + + const response = await transport.fetch("https://api.openai.com/v1/models", { + method: "GET", + }); + + expect(await response.text()).toBe("base"); + expect(baseCalls).toEqual(["https://api.openai.com/v1/models"]); + expect(wsCalls).toEqual([]); + }); + + test("close is idempotent", () => { + let closeCalls = 0; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch( + () => Promise.resolve(new Response("ws")), + () => { + closeCalls += 1; + } + ), + }); + + transport.close(); + transport.close(); + + expect(closeCalls).toBe(1); + }); +}); diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts new file mode 100644 index 0000000000..3731dd8ac7 --- /dev/null +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -0,0 +1,96 @@ +import assert from "node:assert"; +import { createWebSocketFetch as createOpenAIWebSocketFetch } from "@vercel/ai-sdk-openai-websocket-fetch"; + +type WebSocketFetch = typeof fetch & { close: () => void }; +type WebSocketFetchFactory = () => WebSocketFetch; + +interface CreateOpenAIWebSocketTransportFetchOptions { + enabled: boolean; + baseFetch: typeof fetch; + createWebSocketFetch?: WebSocketFetchFactory; +} + +interface OpenAIWebSocketTransportFetch { + fetch: typeof fetch; + close: () => void; + active: boolean; +} + +function getRequestUrl(input: RequestInfo | URL): string { + if (input instanceof URL) { + return input.toString(); + } + if (typeof input === "string") { + return input; + } + return input.url; +} + +function isStreamingResponsesRequest(input: RequestInfo | URL, init?: RequestInit): boolean { + if (init?.method?.toUpperCase() !== "POST") { + return false; + } + + if (!getRequestUrl(input).endsWith("/responses")) { + return false; + } + + if (typeof init?.body !== "string") { + return false; + } + + try { + const body = JSON.parse(init.body) as { stream?: unknown }; + return body.stream === true; + } catch { + return false; + } +} + +export function createOpenAIWebSocketTransportFetch( + options: CreateOpenAIWebSocketTransportFetchOptions +): OpenAIWebSocketTransportFetch { + if (!options.enabled) { + return { + fetch: options.baseFetch, + close: () => undefined, + active: false, + }; + } + + const webSocketFetchFactory = options.createWebSocketFetch ?? createOpenAIWebSocketFetch; + const webSocketFetch = webSocketFetchFactory(); + assert(typeof webSocketFetch.close === "function", "OpenAI WebSocket fetch must expose close()"); + + let closed = false; + const close = (): void => { + if (closed) { + return; + } + closed = true; + webSocketFetch.close(); + }; + + const baseFetchWithPreconnect = options.baseFetch as typeof fetch & { + preconnect?: typeof fetch.preconnect; + }; + const fetchExtras = + typeof baseFetchWithPreconnect.preconnect === "function" + ? { preconnect: baseFetchWithPreconnect.preconnect.bind(baseFetchWithPreconnect) } + : {}; + const transportFetch = Object.assign(async (input: RequestInfo | URL, init?: RequestInit) => { + // The upstream package falls through to globalThis.fetch for non-WebSocket requests. + // Pre-filter here so Mux's existing fetch wrappers keep handling those HTTP paths. + if (!isStreamingResponsesRequest(input, init)) { + return options.baseFetch(input, init); + } + + return webSocketFetch(input, init); + }, fetchExtras) as typeof fetch; + + return { + fetch: transportFetch, + close, + active: true, + }; +} diff --git a/src/node/services/providerModelFactory.test.ts b/src/node/services/providerModelFactory.test.ts index cff450fe36..7076cb4584 100644 --- a/src/node/services/providerModelFactory.test.ts +++ b/src/node/services/providerModelFactory.test.ts @@ -21,6 +21,7 @@ import { wrapFetchWithAnthropicCacheControl, } from "./providerModelFactory"; import { MUX_ANTHROPIC_EFFORT_OVERRIDE_HEADER } from "@/common/utils/ai/providerOptions"; +import { hasLanguageModelCleanup } from "./languageModelCleanup"; import { CodexOauthService } from "./codexOauthService"; import { PolicyService } from "./policyService"; import { ProviderService } from "./providerService"; @@ -781,6 +782,66 @@ describe("ProviderModelFactory GitHub Copilot", () => { }); }); +describe("ProviderModelFactory OpenAI WebSocket transport", () => { + it("attaches cleanup when enabled for Responses models", async () => { + await withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: true, + }, + }); + + const result = await factory.createModel("openai:gpt-4.1-mini"); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(true); + }); + }); + + it("does not attach cleanup when Chat Completions is selected", async () => { + await withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + wireFormat: "chatCompletions", + webSocketTransportEnabled: true, + }, + }); + + const result = await factory.createModel("openai:gpt-4.1-mini"); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(false); + }); + }); + + it("ignores invalid persisted WebSocket transport values", async () => { + await withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: "true", + }, + } as unknown as Parameters[0]); + + const result = await factory.createModel("openai:gpt-4.1-mini"); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(false); + }); + }); +}); + describe("ProviderModelFactory modelCostsIncluded", () => { it("marks gpt-5.3-codex as subscription-covered when routed through Codex OAuth", async () => { await withTempConfig(async (config, factory) => { diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index fea730ba1f..e38bbe58d5 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -41,6 +41,11 @@ import type { CodexOauthService } from "@/node/services/codexOauthService"; import type { DevToolsService } from "@/node/services/devToolsService"; import { captureAndStripDevToolsHeader } from "@/node/services/devToolsHeaderCapture"; import { createDevToolsMiddleware } from "@/node/services/devToolsMiddleware"; +import { + attachLanguageModelCleanup, + moveLanguageModelCleanup, +} from "@/node/services/languageModelCleanup"; +import { createOpenAIWebSocketTransportFetch } from "@/node/services/openAIWebSocketTransportFetch"; import { log } from "@/node/services/log"; import { MUX_ANTHROPIC_EFFORT_OVERRIDE_HEADER, @@ -958,10 +963,12 @@ export class ProviderModelFactory { const workspaceId = opts?.workspaceId; const devToolsService = this.devToolsService; if (workspaceId != null && devToolsService?.enabled) { + const innerModel = model; model = wrapLanguageModel({ model, middleware: createDevToolsMiddleware(workspaceId, devToolsService), }); + moveLanguageModelCleanup(innerModel, model); } return Ok(model); @@ -1296,6 +1303,9 @@ export class ProviderModelFactory { const baseFetch = getProviderFetch(providerConfig); const codexOauthService = this.codexOauthService; + const webSocketTransportEnabled = + (providerConfig as { webSocketTransportEnabled?: unknown }).webSocketTransportEnabled === + true; // Wrap fetch so Codex OAuth Responses requests are normalized before // they are rerouted from api.openai.com to chatgpt.com's Codex backend. @@ -1378,9 +1388,14 @@ export class ProviderModelFactory { : {} ); + const webSocketTransport = createOpenAIWebSocketTransportFetch({ + enabled: webSocketTransportEnabled && effectiveWireFormat === "responses", + baseFetch: fetchWithOpenAICodexNormalization as typeof fetch, + }); + // Lazy-load OpenAI provider to reduce startup time const { createOpenAI } = await PROVIDER_REGISTRY.openai(); - const providerFetch = fetchWithOpenAICodexNormalization as typeof fetch; + const providerFetch = webSocketTransport.fetch; const provider = createOpenAI({ ...configWithCreds, // Cast is safe: our fetch implementation is compatible with the SDK's fetch type. @@ -1393,6 +1408,10 @@ export class ProviderModelFactory { effectiveWireFormat === "chatCompletions" ? provider.chat(modelId) : provider.responses(modelId); + if (webSocketTransport.active) { + attachLanguageModelCleanup(model, webSocketTransport.close); + } + const injectModelOpenAIStore = (storeValue: unknown, mode: "default" | "force"): void => { assert(typeof storeValue === "boolean", "OpenAI store override must be boolean"); const store = storeValue; diff --git a/src/node/services/providerService.test.ts b/src/node/services/providerService.test.ts index bc2ec9bdef..cdf62757e0 100644 --- a/src/node/services/providerService.test.ts +++ b/src/node/services/providerService.test.ts @@ -204,6 +204,42 @@ describe("ProviderService.getConfig", () => { }); }); + it("surfaces valid OpenAI WebSocket transport preference", () => { + withTempConfig((config, service) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: true, + }, + }); + + const cfg = service.getConfig(); + + expect(cfg.openai.webSocketTransportEnabled).toBe(true); + expect(Object.prototype.hasOwnProperty.call(cfg.openai, "webSocketTransportEnabled")).toBe( + true + ); + }); + }); + + it("omits invalid OpenAI WebSocket transport preference", () => { + withTempConfig((config, service) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: "true", + }, + }); + + const cfg = service.getConfig(); + + expect(cfg.openai.webSocketTransportEnabled).toBeUndefined(); + expect(Object.prototype.hasOwnProperty.call(cfg.openai, "webSocketTransportEnabled")).toBe( + false + ); + }); + }); + it("surfaces non-secret op:// API key references", () => { withTempConfig((config, service) => { const opRef = "op://Personal/Anthropic/credential"; diff --git a/src/node/services/providerService.ts b/src/node/services/providerService.ts index 0dd93364e9..5ed17ba8de 100644 --- a/src/node/services/providerService.ts +++ b/src/node/services/providerService.ts @@ -255,6 +255,7 @@ export class ProviderService { serviceTier?: string; wireFormat?: string; store?: unknown; + webSocketTransportEnabled?: unknown; cacheTtl?: unknown; disableBetaFeatures?: unknown; /** OpenAI-only: default auth precedence for Codex-OAuth-allowed models. */ @@ -333,6 +334,10 @@ export class ProviderService { providerInfo.store = config.store; } + if (provider === "openai" && typeof config.webSocketTransportEnabled === "boolean") { + providerInfo.webSocketTransportEnabled = config.webSocketTransportEnabled; + } + // Anthropic-specific fields const cacheTtl = config.cacheTtl; if (provider === "anthropic" && (cacheTtl === "5m" || cacheTtl === "1h")) { diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index c4f079d2f2..8c60dec1f7 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -5,7 +5,14 @@ import { KNOWN_MODELS } from "@/common/constants/knownModels"; import type { ToolPolicy } from "@/common/utils/tools/toolPolicy"; import { StreamManager, stripEncryptedContent } from "./streamManager"; import * as aiSdk from "ai"; -import { APICallError, RetryError, tool, type ModelMessage, type Tool } from "ai"; +import { + APICallError, + RetryError, + tool, + type LanguageModel, + type ModelMessage, + type Tool, +} from "ai"; import { z } from "zod"; import * as modelStatsModule from "@/common/utils/tokens/modelStats"; import type { HistoryService } from "./historyService"; @@ -17,6 +24,7 @@ import { shouldRunIntegrationTests, validateApiKeys } from "../../../tests/testU import { DisposableTempDir } from "@/node/services/tempDir"; import type { ExecOptions, ExecStream, Runtime } from "@/node/runtime/Runtime"; import { createRuntime } from "@/node/runtime/runtimeFactory"; +import { attachLanguageModelCleanup } from "./languageModelCleanup"; import { shellQuote } from "@/common/utils/shell"; // Skip integration tests if TEST_INTEGRATION is not set @@ -918,6 +926,93 @@ describe("StreamManager - call settings overrides", () => { }); }); +describe("StreamManager - language model cleanup", () => { + const runtime = createRuntime({ type: "local", srcBaseDir: "/tmp" }); + + test("runs model cleanup when stream processing finishes", async () => { + const streamManager = new StreamManager(historyService); + streamManager.on("error", () => undefined); + const workspaceId = "cleanup-workspace"; + const messageId = "cleanup-message"; + const historySequence = 1; + let cleanupCalls = 0; + const model: LanguageModel = { + specificationVersion: "v3", + provider: "test", + modelId: "cleanup-model", + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("doGenerate is unused in cleanup tests")), + doStream: () => Promise.reject(new Error("doStream is unused in cleanup tests")), + }; + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + + const appendResult = await historyService.appendToHistory(workspaceId, { + id: messageId, + role: "assistant", + metadata: { historySequence, partial: true }, + parts: [], + }); + expect(appendResult.success).toBe(true); + + const processStreamWithCleanup = Reflect.get(streamManager, "processStreamWithCleanup") as ( + workspaceId: string, + streamInfo: unknown, + historySequence: number + ) => Promise; + expect(typeof processStreamWithCleanup).toBe("function"); + + const streamInfo = { + state: "streaming", + streamResult: { + fullStream: (async function* () { + await Promise.resolve(); + yield* [] as unknown[]; + })(), + totalUsage: Promise.resolve({ inputTokens: 1, outputTokens: 1, totalTokens: 2 }), + usage: Promise.resolve({ inputTokens: 1, outputTokens: 1, totalTokens: 2 }), + providerMetadata: Promise.resolve(undefined), + steps: Promise.resolve([]), + }, + abortController: new AbortController(), + messageId, + token: "cleanup-token", + startTime: Date.now(), + lastPartTimestamp: Date.now(), + toolCompletionTimestamps: new Map(), + model: "openai:gpt-4.1-mini", + metadataModel: "openai:gpt-4.1-mini", + historySequence, + request: { model, messages: [], providerOptions: undefined }, + toolModelUsages: [], + parts: [{ type: "text" as const, text: "done", timestamp: Date.now() }], + lastPartialWriteTime: 0, + partialWritePromise: undefined, + processingPromise: Promise.resolve(), + softInterrupt: { pending: false as const }, + runtimeTempDir: "", + runtime, + cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + cumulativeProviderMetadata: undefined, + didRetryPreviousResponseIdAtStep: false, + currentStepStartIndex: 0, + stepTracker: {}, + }; + + const workspaceStreamsValue: unknown = Reflect.get(streamManager, "workspaceStreams"); + expect(workspaceStreamsValue instanceof Map).toBe(true); + if (!(workspaceStreamsValue instanceof Map)) { + throw new Error("Expected StreamManager.workspaceStreams to be a Map"); + } + workspaceStreamsValue.set(workspaceId, streamInfo); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(cleanupCalls).toBe(1); + }); +}); + describe("StreamManager - stripEncryptedContent", () => { test("strips encryptedContent from array output shape", () => { const output = [ diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index d296647ce8..5b21d6831b 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -60,6 +60,7 @@ import { withSequentialExecution } from "@/node/services/tools/withSequentialExe import type { ResolvedCallSettingsOverrides } from "@/common/config/schemas/modelParameters"; import { resolveModelForMetadata } from "@/common/utils/providers/modelEntries"; import { getErrorMessage } from "@/common/utils/errors"; +import { runLanguageModelCleanup } from "./languageModelCleanup"; import { shellQuote } from "@/common/utils/shell"; import { classify429Capacity } from "@/common/utils/errors/classify429Capacity"; import { normalizeLiteralRequiredToolPattern } from "@/common/utils/agentTools"; @@ -2334,6 +2335,8 @@ export class StreamManager extends EventEmitter { streamInfo.partialWriteTimer = undefined; } + runLanguageModelCleanup(streamInfo.request.model); + streamInfo.unlinkAbortSignal?.(); streamInfo.unlinkAbortSignal = undefined; diff --git a/src/node/services/workspaceTitleGenerator.test.ts b/src/node/services/workspaceTitleGenerator.test.ts index 71b75bcf08..b7fcd65c45 100644 --- a/src/node/services/workspaceTitleGenerator.test.ts +++ b/src/node/services/workspaceTitleGenerator.test.ts @@ -1,10 +1,19 @@ -import { APICallError, NoOutputGeneratedError, RetryError } from "ai"; -import { describe, expect, test } from "bun:test"; +import * as aiSdk from "ai"; +import { APICallError, NoOutputGeneratedError, RetryError, type LanguageModel } from "ai"; +import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"; import { buildWorkspaceIdentityPrompt, + generateWorkspaceIdentity, mapModelCreationError, mapNameGenerationError, } from "./workspaceTitleGenerator"; +import { Ok } from "@/common/types/result"; +import type { AIService } from "./aiService"; +import { attachLanguageModelCleanup } from "./languageModelCleanup"; + +afterEach(() => { + mock.restore(); +}); describe("buildWorkspaceIdentityPrompt", () => { test("includes overall-scope guidance, conversation turns, and latest-user context without precedence", () => { @@ -58,6 +67,48 @@ const createApiCallError = ( responseBody: overrides?.responseBody, }); +describe("generateWorkspaceIdentity cleanup", () => { + test("cleans up the model after a successful title stream", async () => { + let cleanupCalls = 0; + const model: LanguageModel = { + specificationVersion: "v3", + provider: "test", + modelId: "title-model", + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("doGenerate is unused in cleanup tests")), + doStream: () => Promise.reject(new Error("doStream is unused in cleanup tests")), + }; + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + const aiService = { + createModel: () => Promise.resolve(Ok(model)), + }; + + // AIService is a class with many runtime dependencies; this test only needs its public createModel seam. + const titleAiService = aiService as unknown as AIService; + + spyOn(aiSdk, "streamText").mockReturnValue({ + toolResults: Promise.resolve([ + { + dynamic: false, + toolName: "propose_name", + output: { name: "settings", title: "Add setting" }, + }, + ]), + } as unknown as ReturnType); + + const result = await generateWorkspaceIdentity( + "Add setting", + ["openai:gpt-4.1-mini"], + titleAiService + ); + + expect(result.success).toBe(true); + expect(cleanupCalls).toBe(1); + }); +}); + describe("workspaceTitleGenerator error mappers", () => { describe("mapNameGenerationError", () => { test("preserves provider context for auth and permission API failures", () => { diff --git a/src/node/services/workspaceTitleGenerator.ts b/src/node/services/workspaceTitleGenerator.ts index 35b933f7fb..0cea63887d 100644 --- a/src/node/services/workspaceTitleGenerator.ts +++ b/src/node/services/workspaceTitleGenerator.ts @@ -7,6 +7,7 @@ import type { NameGenerationError, SendMessageError } from "@/common/types/error import { getErrorMessage } from "@/common/utils/errors"; import { classify429Capacity } from "@/common/utils/errors/classify429Capacity"; import { TOOL_DEFINITIONS, ProposeNameToolArgsSchema } from "@/common/utils/tools/toolDefinitions"; +import { runLanguageModelCleanup } from "./languageModelCleanup"; import crypto from "crypto"; export interface WorkspaceIdentity { @@ -272,6 +273,8 @@ export async function generateWorkspaceIdentity( lastError = mapNameGenerationError(error, modelString); log.warn("Name generation failed, trying next candidate", { modelString, error: lastError }); continue; + } finally { + runLanguageModelCleanup(modelResult.data); } } From 1c121a726c63061af1c36793b060099be58fd0dc Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:15:34 +0000 Subject: [PATCH 03/16] Preserve Codex OAuth routing for WebSocket opt-in --- CONTEXT.md | 2 +- PRD.md | 4 +-- flake.nix | 2 +- .../services/providerModelFactory.test.ts | 26 +++++++++++++++++++ src/node/services/providerModelFactory.ts | 5 +++- 5 files changed, 34 insertions(+), 5 deletions(-) diff --git a/CONTEXT.md b/CONTEXT.md index e4efa65bb0..4989a78ea5 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -23,7 +23,7 @@ _Avoid_: transport, websocket, useWebSocketTransport ## Relationships - The **OpenAI WebSocket Transport** belongs to the built-in OpenAI provider settings surface. -- Codex OAuth routing is not a supported **OpenAI WebSocket Transport** scope, and Mux does not add a special guard solely to protect it from an opt-in WebSocket attempt. +- Codex OAuth routing is not a supported **OpenAI WebSocket Transport** scope; Mux keeps Codex OAuth on the existing HTTP routing path so OAuth token injection, endpoint rewriting, and request normalization still run. - The **OpenAI WebSocket Transport** applies only to eligible streaming Responses API requests. - The **OpenAI WebSocket Transport** is inactive when the **Built-in OpenAI Provider** uses Chat Completions wire format. - Switching to Chat Completions wire format preserves `webSocketTransportEnabled` but hides the UI control and disables the active WebSocket transport until Responses wire format is restored. diff --git a/PRD.md b/PRD.md index 955b11aa28..aebc5e7423 100644 --- a/PRD.md +++ b/PRD.md @@ -28,7 +28,7 @@ The implementation will use the published `@vercel/ai-sdk-openai-websocket-fetch 16. As a Mux user, I want WebSocket connections to close when a stream errors, so that failed streams do not leak resources. 17. As a Mux user, I want WebSocket connections to close when a stream is cancelled, so that interrupting a run cleans up transport resources. 18. As a Mux user, I want workspace title generation to clean up WebSocket resources too, so that background model uses do not leak sockets. -19. As a Mux user, I want Codex OAuth behavior not to receive special protective gating solely for WebSocket mode, so that the implementation stays simple and opt-in failures surface naturally. +19. As a Mux user, I want Codex OAuth-routed requests to keep using the existing HTTP routing path, so that OAuth token injection, endpoint rewriting, and request normalization still run correctly. 20. As a Mux user, I want OpenAI-compatible providers to remain outside the WebSocket feature scope, so that provider-specific transport behavior does not accidentally affect unrelated providers. 21. As a Mux maintainer, I want the feature implemented through a small composition helper, so that WebSocket integration can be tested independently from the rest of provider construction. 22. As a Mux maintainer, I want the feature to use the published WebSocket transport package, so that Mux does not own protocol details already maintained upstream. @@ -55,7 +55,7 @@ The implementation will use the published `@vercel/ai-sdk-openai-websocket-fetch - Do not validate configured OpenAI base URLs before attempting WebSocket transport. - Do not add automatic HTTP fallback for eligible WebSocket request failures. - Do not broaden the feature to OpenAI-compatible providers in the initial implementation. -- Treat Codex OAuth as not a supported WebSocket scope, while avoiding a special guard solely to protect Codex OAuth from an opt-in attempt. +- Treat Codex OAuth as not a supported WebSocket scope; keep Codex OAuth-routed models on the existing HTTP routing path so OAuth token injection, endpoint rewriting, and request normalization still run. - Use the published `@vercel/ai-sdk-openai-websocket-fetch` package instead of implementing the WebSocket protocol locally. - Add a small deep module for composing OpenAI provider fetch behavior with the WebSocket transport. Its interface should hide package details behind simple inputs such as whether WebSocket mode is active and a returned close hook. - Preserve existing OpenAI fetch behavior when composing the WebSocket transport, including request header handling, Mux attribution, DevTools header stripping, timeout behavior, custom fetch compatibility, and existing request normalization. diff --git a/flake.nix b/flake.nix index 717b38e3b0..d54949a7d0 100644 --- a/flake.nix +++ b/flake.nix @@ -83,7 +83,7 @@ outputHashMode = "recursive"; # Marker used by scripts/update_flake_hash.sh to update this hash in place. - outputHash = "sha256-nSkVmS55SWfLbUIscBGMzgR2su6vIlE9GcSRDLrn4eI="; # mux-offline-cache-hash + outputHash = "sha256-jHp/RsmtwHKsbrD0b86+nb+XQ75pT5y1tEeltT6hDVQ="; # mux-offline-cache-hash }; configurePhase = '' diff --git a/src/node/services/providerModelFactory.test.ts b/src/node/services/providerModelFactory.test.ts index 7076cb4584..d86fc18b42 100644 --- a/src/node/services/providerModelFactory.test.ts +++ b/src/node/services/providerModelFactory.test.ts @@ -802,6 +802,32 @@ describe("ProviderModelFactory OpenAI WebSocket transport", () => { }); }); + it("does not attach cleanup for Codex OAuth routed models", async () => { + await withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + webSocketTransportEnabled: true, + codexOauth: { + type: "oauth", + access: "test-access-token", + refresh: "test-refresh-token", + expires: Date.now() + 60_000, + accountId: "test-account-id", + }, + }, + }); + + const result = await factory.createModel(KNOWN_MODELS.GPT_53_CODEX.id); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(false); + expect(modelCostsIncluded(result.data)).toBe(true); + }); + }); + it("does not attach cleanup when Chat Completions is selected", async () => { await withTempConfig(async (config, factory) => { config.saveProvidersConfig({ diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index e38bbe58d5..f88d376c63 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -1389,7 +1389,10 @@ export class ProviderModelFactory { ); const webSocketTransport = createOpenAIWebSocketTransportFetch({ - enabled: webSocketTransportEnabled && effectiveWireFormat === "responses", + enabled: + webSocketTransportEnabled && + effectiveWireFormat === "responses" && + !shouldRouteThroughCodexOauth, baseFetch: fetchWithOpenAICodexNormalization as typeof fetch, }); From 37db4a4259f09f17f02ff8ef27b2361d968df028 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:21:36 +0000 Subject: [PATCH 04/16] Remove root WebSocket planning docs --- CONTEXT.md | 49 -------------------- PRD.md | 130 ----------------------------------------------------- 2 files changed, 179 deletions(-) delete mode 100644 CONTEXT.md delete mode 100644 PRD.md diff --git a/CONTEXT.md b/CONTEXT.md deleted file mode 100644 index 4989a78ea5..0000000000 --- a/CONTEXT.md +++ /dev/null @@ -1,49 +0,0 @@ -# Mux Provider Configuration - -Mux provider configuration describes which model providers are available and how Mux routes model requests through them. - -## Language - -**Built-in OpenAI Provider**: -The first-party OpenAI provider settings surface in Mux, covering OpenAI API key and Codex OAuth configuration paths. -_Avoid_: OpenAI-compatible provider, GitHub Copilot provider - -**Direct OpenAI API Key Path**: -The **Built-in OpenAI Provider** path that talks to OpenAI's platform API with an OpenAI API key. -_Avoid_: Codex OAuth path, OpenAI-compatible provider - -**OpenAI WebSocket Transport**: -An optional transport for the **Built-in OpenAI Provider** that uses OpenAI's Responses WebSocket path for eligible streaming responses requests. -_Avoid_: WebSocket provider, OpenAI-compatible WebSocket transport - -**webSocketTransportEnabled**: -The persisted opt-in flag that enables the **OpenAI WebSocket Transport** for eligible **Built-in OpenAI Provider** requests. -_Avoid_: transport, websocket, useWebSocketTransport - -## Relationships - -- The **OpenAI WebSocket Transport** belongs to the built-in OpenAI provider settings surface. -- Codex OAuth routing is not a supported **OpenAI WebSocket Transport** scope; Mux keeps Codex OAuth on the existing HTTP routing path so OAuth token injection, endpoint rewriting, and request normalization still run. -- The **OpenAI WebSocket Transport** applies only to eligible streaming Responses API requests. -- The **OpenAI WebSocket Transport** is inactive when the **Built-in OpenAI Provider** uses Chat Completions wire format. -- Switching to Chat Completions wire format preserves `webSocketTransportEnabled` but hides the UI control and disables the active WebSocket transport until Responses wire format is restored. -- A WebSocket connection lives for one streaming model run: it can be reused by internal tool-calling steps and is closed when the run completes, errors, or is cancelled. -- Eligible WebSocket request failures are surfaced to the user; Mux does not automatically retry them over HTTP. -- WebSocket transport behavior is verified with deterministic config, UI, provider factory, and lifecycle tests; live OpenAI dogfooding is optional when credentials are available. -- Mux uses the published `@vercel/ai-sdk-openai-websocket-fetch` package for the **OpenAI WebSocket Transport** instead of owning the WebSocket protocol locally. -- Mux composes **OpenAI WebSocket Transport** through a small helper that preserves existing OpenAI fetch behavior and exposes a close hook for stream lifecycle cleanup. -- Mux carries WebSocket cleanup on a Mux-owned language-model cleanup symbol so stream owners can run it in their existing cleanup paths without changing the provider model factory API shape. -- Every `streamText` owner that uses `createModel()`-returned models, including main streams and workspace title generation, runs the model cleanup helper in its stream cleanup path. -- The **OpenAI WebSocket Transport** is a persisted **Built-in OpenAI Provider** setting, not a request-level override. -- The OpenAI provider settings UI exposes `webSocketTransportEnabled` near Wire Format only while Responses wire format is selected. -- The OpenAI provider settings UI describes the **OpenAI WebSocket Transport** as experimental and warns that unsupported endpoints may fail. -- The **OpenAI WebSocket Transport** does not validate configured base URLs; if the selected endpoint does not support OpenAI's Responses WebSocket path, the first eligible request fails normally. - -## Example dialogue - -> **Dev:** "Should the **OpenAI WebSocket Transport** apply to OpenAI-compatible providers?" -> **Domain expert:** "No — it belongs only to the **Built-in OpenAI Provider** for the initial opt-in feature." - -## Flagged ambiguities - -- "OpenAI provider" can mean the **Built-in OpenAI Provider**, the **Direct OpenAI API Key Path**, or an OpenAI-compatible provider; resolved: this feature targets the **Built-in OpenAI Provider** settings surface only. diff --git a/PRD.md b/PRD.md deleted file mode 100644 index aebc5e7423..0000000000 --- a/PRD.md +++ /dev/null @@ -1,130 +0,0 @@ -## Problem Statement - -Mux users who use the **Built-in OpenAI Provider** currently send OpenAI model requests over the existing HTTP transport. OpenAI's Responses WebSocket transport can reduce setup overhead for streaming Responses API workflows, especially multi-step tool-calling runs, but Mux has no first-class way to opt into it. Users should be able to enable the **OpenAI WebSocket Transport** without breaking existing OpenAI, OpenAI-compatible, Chat Completions, Codex OAuth, or custom endpoint configurations. - -## Solution - -Add an optional, non-breaking **Built-in OpenAI Provider** setting named `webSocketTransportEnabled`. When enabled and the provider is using Responses wire format, Mux will use OpenAI's Responses WebSocket transport for eligible streaming Responses API requests. The setting will be exposed in the OpenAI provider settings UI near Wire Format, persisted with the provider configuration, and hidden in the UI while Chat Completions wire format is selected. - -The implementation will use the published `@vercel/ai-sdk-openai-websocket-fetch` package rather than implementing the protocol in Mux. Mux will compose the package through a small, testable integration helper that preserves existing OpenAI fetch behavior, including Mux attribution, timeout behavior, DevTools header handling, and existing request normalization. WebSocket connections will live for one streaming model run, can be reused by internal AI SDK tool-calling steps, and will be explicitly closed when the run completes, errors, or is cancelled. - -## User Stories - -1. As a Mux user, I want to opt into OpenAI's WebSocket transport, so that eligible OpenAI Responses streams can use the lower-overhead transport. -2. As a Mux user, I want the WebSocket transport setting to be optional, so that existing OpenAI behavior remains unchanged unless I opt in. -3. As a Mux user, I want the WebSocket transport setting to persist in provider configuration, so that I do not need to re-enable it for every session. -4. As a Mux user, I want to configure WebSocket transport from the OpenAI provider settings UI, so that I do not need to edit configuration files manually. -5. As a Mux user, I want the setting to appear near Wire Format, so that I understand it is related to Responses versus Chat Completions behavior. -6. As a Mux user, I want the UI to explain that the WebSocket transport is experimental, so that I understand the risk of endpoint failures. -7. As a Mux user, I want the UI to warn that unsupported endpoints may fail, so that failures after enabling the setting are understandable. -8. As a Mux user, I want the WebSocket transport control to be hidden when Chat Completions wire format is selected, so that I do not expect Chat Completions requests to use WebSockets. -9. As a Mux user, I want switching to Chat Completions to preserve my saved WebSocket preference, so that switching back to Responses restores my intended WebSocket behavior. -10. As a Mux user, I want the setting to be non-breaking for existing configurations, so that upgrading Mux does not change transport behavior unexpectedly. -11. As a Mux user, I want Mux to avoid over-validating custom OpenAI base URLs, so that intentionally configured endpoints are not blocked by Mux-specific assumptions. -12. As a Mux user, I want unsupported WebSocket endpoints to fail normally, so that I can decide whether to disable the setting or fix the endpoint. -13. As a Mux user, I want Mux not to retry failed WebSocket requests over HTTP automatically, so that I do not accidentally duplicate provider-side work or tool-call flows. -14. As a Mux user, I want eligible multi-step tool-calling runs to reuse one WebSocket connection within the run, so that the transport can provide benefit across internal model steps. -15. As a Mux user, I want WebSocket connections to close when a stream completes, so that Mux does not leave unnecessary sockets open. -16. As a Mux user, I want WebSocket connections to close when a stream errors, so that failed streams do not leak resources. -17. As a Mux user, I want WebSocket connections to close when a stream is cancelled, so that interrupting a run cleans up transport resources. -18. As a Mux user, I want workspace title generation to clean up WebSocket resources too, so that background model uses do not leak sockets. -19. As a Mux user, I want Codex OAuth-routed requests to keep using the existing HTTP routing path, so that OAuth token injection, endpoint rewriting, and request normalization still run correctly. -20. As a Mux user, I want OpenAI-compatible providers to remain outside the WebSocket feature scope, so that provider-specific transport behavior does not accidentally affect unrelated providers. -21. As a Mux maintainer, I want the feature implemented through a small composition helper, so that WebSocket integration can be tested independently from the rest of provider construction. -22. As a Mux maintainer, I want the feature to use the published WebSocket transport package, so that Mux does not own protocol details already maintained upstream. -23. As a Mux maintainer, I want the provider model factory API shape to remain stable, so that adding WebSocket cleanup does not create broad call-site churn. -24. As a Mux maintainer, I want cleanup to be carried by a Mux-owned language-model cleanup symbol, so that stream owners can clean up resources without exposing package-specific details. -25. As a Mux maintainer, I want deterministic tests for config/schema behavior, so that the persisted opt-in field is accepted and invalid values are not surfaced. -26. As a Mux maintainer, I want deterministic tests for provider status behavior, so that the UI receives the WebSocket setting only when it is valid. -27. As a Mux maintainer, I want deterministic tests for provider construction behavior, so that WebSocket fetch is used only for enabled Responses-mode OpenAI requests. -28. As a Mux maintainer, I want deterministic tests for Chat Completions gating, so that the setting is inactive while Chat Completions wire format is selected. -29. As a Mux maintainer, I want lifecycle tests for cleanup, so that WebSocket close behavior is verified on completion, error, and cancellation paths. -30. As a Mux maintainer, I want UI tests for the OpenAI provider settings control, so that users can discover and correctly interpret the setting. -31. As a reviewer, I want optional live dogfooding instructions, so that the feature can be validated against a real OpenAI endpoint when credentials are available. -32. As a reviewer, I want screenshots and a recording from manual dogfooding when available, so that I can verify the setting's UI and runtime behavior without repeating every step. - -## Implementation Decisions - -- Add a persisted **Built-in OpenAI Provider** boolean setting named `webSocketTransportEnabled`. -- Treat absence of `webSocketTransportEnabled` as disabled so the change is non-breaking. -- Surface valid `webSocketTransportEnabled` values through the provider configuration information consumed by settings UI. -- Expose the setting in the OpenAI provider settings UI near Wire Format. -- Hide the UI control when Chat Completions wire format is selected. -- Preserve the saved WebSocket setting when Chat Completions wire format is selected; only make the transport inactive. -- Use risk-aware UI copy: the feature is experimental, uses OpenAI's Responses WebSocket transport for streaming Responses API requests, and unsupported endpoints may fail. -- Do not validate configured OpenAI base URLs before attempting WebSocket transport. -- Do not add automatic HTTP fallback for eligible WebSocket request failures. -- Do not broaden the feature to OpenAI-compatible providers in the initial implementation. -- Treat Codex OAuth as not a supported WebSocket scope; keep Codex OAuth-routed models on the existing HTTP routing path so OAuth token injection, endpoint rewriting, and request normalization still run. -- Use the published `@vercel/ai-sdk-openai-websocket-fetch` package instead of implementing the WebSocket protocol locally. -- Add a small deep module for composing OpenAI provider fetch behavior with the WebSocket transport. Its interface should hide package details behind simple inputs such as whether WebSocket mode is active and a returned close hook. -- Preserve existing OpenAI fetch behavior when composing the WebSocket transport, including request header handling, Mux attribution, DevTools header stripping, timeout behavior, custom fetch compatibility, and existing request normalization. -- Keep the provider model factory return shape unchanged. -- Add a Mux-owned language-model cleanup helper that can attach cleanup to a model, run cleanup once, and make double cleanup harmless. -- Run language-model cleanup in every stream owner that uses models created by the provider model factory, including main chat/agent streams and workspace title generation. -- Use per-stream WebSocket lifecycle: create for the stream run, allow reuse across internal AI SDK tool-calling steps, and close on completion, error, or cancellation. -- Do not create an ADR for this iteration because the feature is optional, reversible, and sufficiently covered by the glossary plus implementation comments. - -## Testing Decisions - -- Tests should focus on externally observable behavior rather than implementation details. Good tests should prove what users, provider status consumers, stream owners, or provider constructors observe; they should not assert private helper internals unless testing a deliberately extracted deep module through its public interface. -- Test the provider configuration schema accepts `webSocketTransportEnabled` as an optional boolean on the **Built-in OpenAI Provider**. -- Test invalid `webSocketTransportEnabled` values are not surfaced as valid provider status. -- Test provider status includes `webSocketTransportEnabled` when it is configured with a valid boolean. -- Test provider construction uses the WebSocket transport only when `webSocketTransportEnabled` is true and the effective wire format is Responses. -- Test provider construction does not activate WebSocket transport when the effective wire format is Chat Completions. -- Test the OpenAI provider settings UI renders a WebSocket transport control near Wire Format. -- Test the WebSocket transport UI control persists changes to provider configuration. -- Test the WebSocket transport UI control is hidden when Chat Completions wire format is selected. -- Test switching to Chat Completions does not delete the saved `webSocketTransportEnabled` value. -- Test the cleanup helper runs an attached cleanup once and tolerates repeated cleanup calls. -- Test main stream cleanup runs the language-model cleanup helper on completion, error, and cancellation paths. -- Test workspace title generation runs the language-model cleanup helper after its stream attempt completes or fails. -- Mock the published WebSocket transport package in lifecycle tests so tests do not require network access. -- Use existing provider configuration tests, provider service tests, provider model factory tests, settings UI tests, and stream lifecycle tests as prior art. -- Avoid tautological tests that only assert exact UI prose. UI tests should assert behavior such as visibility, disabled state, persisted mutations, and relationship to Wire Format. -- Live OpenAI dogfooding is optional and should not be required in CI because it depends on credentials, network access, endpoint support, and provider billing. - -## Out of Scope - -- Supporting OpenAI-compatible providers with WebSocket transport. -- Implementing the WebSocket protocol directly in Mux. -- Process-wide or cross-stream WebSocket connection caching. -- Automatic HTTP fallback after WebSocket failures. -- Base URL validation or URL derivation for custom endpoints. -- A separate custom WebSocket URL setting. -- Request-level or workspace-level overrides for WebSocket transport. -- Guaranteeing Codex OAuth WebSocket support. -- Adding an ADR for this initial opt-in implementation. -- Making live OpenAI WebSocket tests part of required automated validation. - -## Further Notes - -### Acceptance Criteria - -- Existing OpenAI users see no transport behavior change unless `webSocketTransportEnabled` is explicitly enabled. -- The **Built-in OpenAI Provider** configuration accepts and persists `webSocketTransportEnabled` as an optional boolean. -- The OpenAI provider settings UI exposes the setting near Wire Format. -- The UI hides the setting while Chat Completions wire format is selected and preserves the saved value. -- Enabled Responses-mode OpenAI streams use the published WebSocket transport through Mux's composition layer. -- Chat Completions-mode OpenAI streams do not activate the WebSocket transport. -- WebSocket failures surface normally without automatic HTTP fallback. -- WebSocket resources are closed on stream completion, error, and cancellation for all stream owners that use provider-created models. -- Deterministic automated tests cover config, provider status, provider construction, UI gating, and cleanup lifecycle behavior. - -### Dogfooding Plan - -1. Start the Mux dev environment with an OpenAI-capable provider configuration. -2. Open Settings and navigate to Providers, then expand the OpenAI provider settings. -3. Verify the WebSocket transport control appears near Wire Format with experimental/risk-aware helper copy. -4. With Responses wire format selected, enable WebSocket transport and confirm the provider configuration persists the setting. -5. Send a short prompt using an OpenAI Responses model and verify the response streams successfully, or that an unsupported endpoint failure is surfaced clearly. -6. Switch Wire Format to Chat Completions and verify the WebSocket transport control is hidden while the saved preference is preserved. -7. Switch Wire Format back to Responses and verify the previously saved WebSocket preference is still reflected. -8. Interrupt or cancel a streaming response and verify the app remains stable and no follow-up stream is blocked by leaked transport state. -9. Capture screenshots of the settings UI in enabled and Chat Completions-hidden states. -10. Capture a short recording of enabling the setting, sending a prompt, and switching Wire Format to demonstrate the complete reviewer-visible flow. - -### Issue Tracker Note - -If this PRD is later published to the issue tracker, apply the `needs-triage` label so it enters normal triage. From d9567fedc89c4cd1fedce24184cf3d5b53cc5355 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:30:49 +0000 Subject: [PATCH 05/16] Strip DevTools headers for OpenAI WebSocket transport --- .../openAIWebSocketTransportFetch.test.ts | 33 +++++++++++++++++++ .../services/openAIWebSocketTransportFetch.ts | 5 ++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index c9932a4e3c..2c54831a87 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -1,5 +1,10 @@ import { describe, expect, test } from "bun:test"; +import { + consumeCapturedRequestHeaders, + DEVTOOLS_RUN_METADATA_ID_HEADER, + DEVTOOLS_STEP_ID_HEADER, +} from "./devToolsHeaderCapture"; import { createOpenAIWebSocketTransportFetch } from "./openAIWebSocketTransportFetch"; function getFetchInputUrl(input: RequestInfo | URL): string { @@ -103,6 +108,34 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(wsCalls).toEqual([]); }); + test("enabled transport strips DevTools headers before WebSocket dispatch", async () => { + let webSocketHeaders: Headers | null = null; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch((_input: RequestInfo | URL, init?: RequestInit) => { + webSocketHeaders = new Headers(init?.headers); + return Promise.resolve(new Response("ws")); + }), + }); + + await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + headers: { + Authorization: "Bearer test-key", + [DEVTOOLS_STEP_ID_HEADER]: "step-ws-1", + [DEVTOOLS_RUN_METADATA_ID_HEADER]: "run-metadata-1", + }, + body: JSON.stringify({ stream: true }), + }); + + expect(webSocketHeaders?.get(DEVTOOLS_STEP_ID_HEADER)).toBeNull(); + expect(webSocketHeaders?.get(DEVTOOLS_RUN_METADATA_ID_HEADER)).toBeNull(); + const captured = consumeCapturedRequestHeaders("step-ws-1"); + expect(captured).toEqual({ authorization: "[REDACTED]" }); + }); + test("close is idempotent", () => { let closeCalls = 0; const transport = createOpenAIWebSocketTransportFetch({ diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index 3731dd8ac7..64a0c83ce2 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -1,4 +1,5 @@ import assert from "node:assert"; +import { captureAndStripDevToolsHeader } from "./devToolsHeaderCapture"; import { createWebSocketFetch as createOpenAIWebSocketFetch } from "@vercel/ai-sdk-openai-websocket-fetch"; type WebSocketFetch = typeof fetch & { close: () => void }; @@ -85,7 +86,9 @@ export function createOpenAIWebSocketTransportFetch( return options.baseFetch(input, init); } - return webSocketFetch(input, init); + const headers = new Headers(init?.headers); + captureAndStripDevToolsHeader(headers); + return webSocketFetch(input, { ...(init ?? {}), headers }); }, fetchExtras) as typeof fetch; return { From 33d3cba833e6e23cc5d522829e826f83fb1c6d8e Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:40:56 +0000 Subject: [PATCH 06/16] Strip DevTools headers for OpenAI WebSocket transport --- .../services/openAIWebSocketTransportFetch.test.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index 2c54831a87..dc7f2d2b64 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -109,7 +109,7 @@ describe("createOpenAIWebSocketTransportFetch", () => { }); test("enabled transport strips DevTools headers before WebSocket dispatch", async () => { - let webSocketHeaders: Headers | null = null; + let webSocketHeaders: Headers | undefined; const transport = createOpenAIWebSocketTransportFetch({ enabled: true, baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), @@ -130,8 +130,12 @@ describe("createOpenAIWebSocketTransportFetch", () => { body: JSON.stringify({ stream: true }), }); - expect(webSocketHeaders?.get(DEVTOOLS_STEP_ID_HEADER)).toBeNull(); - expect(webSocketHeaders?.get(DEVTOOLS_RUN_METADATA_ID_HEADER)).toBeNull(); + expect(webSocketHeaders).toBeDefined(); + if (!webSocketHeaders) { + throw new Error("Expected WebSocket fetch to receive request headers"); + } + expect(webSocketHeaders.get(DEVTOOLS_STEP_ID_HEADER)).toBeNull(); + expect(webSocketHeaders.get(DEVTOOLS_RUN_METADATA_ID_HEADER)).toBeNull(); const captured = consumeCapturedRequestHeaders("step-ws-1"); expect(captured).toEqual({ authorization: "[REDACTED]" }); }); From 7095916393597c3e3bc3a79058ab71e6737cf8a4 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:49:47 +0000 Subject: [PATCH 07/16] Address WebSocket cleanup and routing review feedback --- .../Settings/Sections/ProvidersSection.tsx | 740 +++++++++--------- .../services/languageModelCleanup.test.ts | 9 + src/node/services/languageModelCleanup.ts | 13 +- .../openAIWebSocketTransportFetch.test.ts | 85 ++ .../services/openAIWebSocketTransportFetch.ts | 14 +- .../services/providerModelFactory.test.ts | 51 ++ src/node/services/providerModelFactory.ts | 4 +- src/node/services/streamManager.test.ts | 212 ++++- src/node/services/streamManager.ts | 3 +- .../services/workspaceTitleGenerator.test.ts | 67 +- 10 files changed, 806 insertions(+), 392 deletions(-) diff --git a/src/browser/features/Settings/Sections/ProvidersSection.tsx b/src/browser/features/Settings/Sections/ProvidersSection.tsx index ed20a643c1..f5ccc4b671 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.tsx @@ -1636,8 +1636,6 @@ export function ProvidersSection() { const gatewayRouteTargets = providerDefinition?.kind === "gateway" ? (providerDefinition.routes ?? []) : []; const isCustomOpenAICompatible = isCustomOpenAICompatibleProviderInfo(providerInfo); - const openAIWireFormat = providerInfo?.wireFormat ?? "responses"; - const openAIWebSocketTransportVisible = openAIWireFormat === "responses"; const statusDotColor = !enabled ? "bg-warning" : configured @@ -2247,404 +2245,418 @@ export function ProvidersSection() { )} {/* OpenAI: ChatGPT OAuth + service tier */} - {provider === "openai" && ( -
-
- - - {codexOauthStatus === "starting" - ? "Starting..." - : codexOauthStatus === "waiting" - ? "Waiting for login..." - : codexOauthIsConnected - ? "Connected" - : "Not connected"} - -
- -
- {!isRemoteServer && ( - - )} - + {provider === "openai" && + (() => { + const openAIWireFormat = providerInfo?.wireFormat ?? "responses"; + const openAIWebSocketTransportVisible = openAIWireFormat === "responses"; + return ( +
+
+ + + {codexOauthStatus === "starting" + ? "Starting..." + : codexOauthStatus === "waiting" + ? "Waiting for login..." + : codexOauthIsConnected + ? "Connected" + : "Not connected"} + +
- {codexOauthStatus === "waiting" && - !codexOauthDeviceFlow && - codexOauthAuthorizeUrl && ( +
+ {!isRemoteServer && ( + + )} - )} - {codexOauthLoginInProgress && ( - - )} + {codexOauthStatus === "waiting" && + !codexOauthDeviceFlow && + codexOauthAuthorizeUrl && ( + + )} - {codexOauthIsConnected && ( - - )} -
+ {codexOauthLoginInProgress && ( + + )} - {codexOauthDeviceFlow && ( -
-

- Enter this code on the OpenAI verification page: -

-
- - {codexOauthDeviceFlow.userCode} - - + {codexOauthIsConnected && ( + + )}
-

- - Waiting for authorization... -

-
- )} - {codexOauthStatus === "waiting" && !codexOauthDeviceFlow && ( -

- - Waiting for authorization... -

- )} + {codexOauthDeviceFlow && ( +
+

+ Enter this code on the OpenAI verification page: +

+
+ + {codexOauthDeviceFlow.userCode} + + +
+

+ + Waiting for authorization... +

+
+ )} - {codexOauthStatus === "error" && codexOauthError && ( -

{codexOauthError}

- )} + {codexOauthStatus === "waiting" && !codexOauthDeviceFlow && ( +

+ + Waiting for authorization... +

+ )} -
-
- -

- Applies to models that support both ChatGPT OAuth and API keys (e.g.{" "} - gpt-5.5). -

-
+ {codexOauthStatus === "error" && codexOauthError && ( +

{codexOauthError}

+ )} - { - if (!api) return; - if (next !== "oauth" && next !== "apiKey") { - return; - } +
+
+ +

+ Applies to models that support both ChatGPT OAuth and API keys + (e.g. gpt-5.5). +

+
- updateOptimistically("openai", { codexOauthDefaultAuth: next }); - void api.providers.setProviderConfig({ - provider: "openai", - keyPath: ["codexOauthDefaultAuth"], - value: next, - }); - }} - size="sm" - className="h-9" - disabled={!api || !codexOauthDefaultAuthIsEditable} - > - - Use ChatGPT OAuth by default - - - Use OpenAI API key by default - - + { + if (!api) return; + if (next !== "oauth" && next !== "apiKey") { + return; + } -

- ChatGPT OAuth uses subscription billing (costs included). API key uses - OpenAI platform billing. -

+ updateOptimistically("openai", { codexOauthDefaultAuth: next }); + void api.providers.setProviderConfig({ + provider: "openai", + keyPath: ["codexOauthDefaultAuth"], + value: next, + }); + }} + size="sm" + className="h-9" + disabled={!api || !codexOauthDefaultAuthIsEditable} + > + + Use ChatGPT OAuth by default + + + Use OpenAI API key by default + +
- {!codexOauthDefaultAuthIsEditable && ( -

- Connect ChatGPT OAuth and set an OpenAI API key to change this - setting. -

- )} -
+

+ ChatGPT OAuth uses subscription billing (costs included). API key + uses OpenAI platform billing. +

-
-
- - - - - - ? - - - -
-
OpenAI service tier
-
- auto: standard - behavior. -
-
- priority: lower - latency, higher cost. -
-
- flex: lower cost, - higher latency. -
-
-
-
-
-
- { + if (!api) return; - if (next === OPENAI_SERVICE_TIER_UNSET) { - setOpenaiServiceTierSelectOverride(OPENAI_SERVICE_TIER_UNSET); - void api.providers - .setProviderConfig({ + if (next === OPENAI_SERVICE_TIER_UNSET) { + setOpenaiServiceTierSelectOverride(OPENAI_SERVICE_TIER_UNSET); + void api.providers + .setProviderConfig({ + provider: "openai", + keyPath: ["serviceTier"], + value: "", + }) + .then(() => refresh()) + .finally(() => setOpenaiServiceTierSelectOverride(null)); + return; + } + + if (!isOpenAIServiceTier(next)) { + return; + } + + setOpenaiServiceTierSelectOverride(null); + updateOptimistically("openai", { serviceTier: next }); + void api.providers.setProviderConfig({ provider: "openai", keyPath: ["serviceTier"], - value: "", - }) - .then(() => refresh()) - .finally(() => setOpenaiServiceTierSelectOverride(null)); - return; - } - - if (!isOpenAIServiceTier(next)) { - return; - } + value: next, + }); + }} + > + + + + + + Not configured (omit service_tier) + + auto + default + flex + priority + + +
- setOpenaiServiceTierSelectOverride(null); - updateOptimistically("openai", { serviceTier: next }); - void api.providers.setProviderConfig({ - provider: "openai", - keyPath: ["serviceTier"], - value: next, - }); - }} - > - - - - - - Not configured (omit service_tier) - - auto - default - flex - priority - - -
+
+
+ + + + + + ? + + + +
+
OpenAI wire format
+
+ responses: modern + API with persistence and built-in tools (default). +
+
+ chat completions: + legacy /chat/completions endpoint. Use if your provider + doesn't support the Responses API (e.g. Azure Gov). +
+
+
+
+
+
+ +
+ {openAIWebSocketTransportVisible && ( +
+
+
+ + + Experimental: uses OpenAI's Responses WebSocket + transport for streaming Responses API requests. Unsupported + endpoints may fail. +
- - - -
- -
- {openAIWebSocketTransportVisible && ( -
-
-
-
- )} - -
-
- - - - - - ? - - - -
-
OpenAI response storage
-
- enabled: OpenAI - stores responses for retrieval and context (default). -
-
- disabled: responses - are not stored. Required for zero data retention (ZDR) - endpoints. -
-
-
-
-
-
- -
-
- )} + ); + })()} {isCustomOpenAICompatible && (
diff --git a/src/node/services/languageModelCleanup.test.ts b/src/node/services/languageModelCleanup.test.ts index 22838b6876..45efab5d9b 100644 --- a/src/node/services/languageModelCleanup.test.ts +++ b/src/node/services/languageModelCleanup.test.ts @@ -62,6 +62,15 @@ describe("language model cleanup", () => { expect(cleanupCalls).toBe(1); }); + test("rejects double attach before cleanup is moved or run", () => { + const model = createModel(); + attachLanguageModelCleanup(model, () => undefined); + + expect(() => attachLanguageModelCleanup(model, () => undefined)).toThrow( + "language model already has cleanup attached" + ); + }); + test("models without cleanup are safe", () => { expect(() => runLanguageModelCleanup(createModel())).not.toThrow(); }); diff --git a/src/node/services/languageModelCleanup.ts b/src/node/services/languageModelCleanup.ts index 7e3f0d8405..1b96c9138e 100644 --- a/src/node/services/languageModelCleanup.ts +++ b/src/node/services/languageModelCleanup.ts @@ -13,11 +13,14 @@ type LanguageModelWithCleanup = LanguageModel & { export function attachLanguageModelCleanup( model: LanguageModel, cleanup: LanguageModelCleanup -): LanguageModel { +): void { assert(typeof cleanup === "function", "language model cleanup must be a function"); + assert( + !hasLanguageModelCleanup(model), + "language model already has cleanup attached; call moveLanguageModelCleanup instead" + ); const modelWithCleanup = model as LanguageModelWithCleanup; modelWithCleanup[languageModelCleanupSymbol] = cleanup; - return model; } export function moveLanguageModelCleanup(source: LanguageModel, target: LanguageModel): void { @@ -36,7 +39,11 @@ export function hasLanguageModelCleanup(model: LanguageModel): boolean { return typeof modelWithCleanup[languageModelCleanupSymbol] === "function"; } -export function runLanguageModelCleanup(model: LanguageModel): void { +export function runLanguageModelCleanup(model: LanguageModel | undefined): void { + if (model === undefined) { + return; + } + const modelWithCleanup = model as LanguageModelWithCleanup; const cleanup = modelWithCleanup[languageModelCleanupSymbol]; if (cleanup === undefined) { diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index dc7f2d2b64..fc8fe4f61f 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -140,6 +140,91 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(captured).toEqual({ authorization: "[REDACTED]" }); }); + test("enabled transport keeps non-streaming Responses posts on the base fetch", async () => { + const baseBodies: string[] = []; + const wsCalls: string[] = []; + const baseFetch = createTestFetch((_input: RequestInfo | URL, init?: RequestInit) => { + baseBodies.push(String(init?.body ?? "")); + return Promise.resolve(new Response("base")); + }); + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch, + createWebSocketFetch: () => + createTestWebSocketFetch((input: RequestInfo | URL) => { + wsCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("ws")); + }), + }); + + const streamFalse = await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: false }), + }); + const streamAbsent = await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({}), + }); + + expect(await streamFalse.text()).toBe("base"); + expect(await streamAbsent.text()).toBe("base"); + expect(baseBodies).toEqual([JSON.stringify({ stream: false }), JSON.stringify({})]); + expect(wsCalls).toEqual([]); + }); + + test("enabled transport recognizes Responses URLs with query parameters", async () => { + const wsCalls: string[] = []; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch((input: RequestInfo | URL) => { + wsCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("ws")); + }), + }); + + const response = await transport.fetch("https://api.openai.com/v1/responses?beta=2", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + + expect(await response.text()).toBe("ws"); + expect(wsCalls).toEqual(["https://api.openai.com/v1/responses?beta=2"]); + }); + + test("close retries after a connection-establishment race", async () => { + let closeCalls = 0; + let resolveWebSocketFetch: ((response: Response) => void) | undefined; + const webSocketFetchPromise = new Promise((resolve) => { + resolveWebSocketFetch = resolve; + }); + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch( + () => webSocketFetchPromise, + () => { + closeCalls += 1; + } + ), + }); + + const responsePromise = transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + transport.close(); + if (!resolveWebSocketFetch) { + throw new Error("Expected test WebSocket fetch resolver to be initialized"); + } + resolveWebSocketFetch(new Response("ws")); + + expect(await (await responsePromise).text()).toBe("ws"); + expect(closeCalls).toBe(2); + }); + test("close is idempotent", () => { let closeCalls = 0; const transport = createOpenAIWebSocketTransportFetch({ diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index 64a0c83ce2..3df77ce56b 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -32,7 +32,7 @@ function isStreamingResponsesRequest(input: RequestInfo | URL, init?: RequestIni return false; } - if (!getRequestUrl(input).endsWith("/responses")) { + if (!/\/v1\/responses(\?|$)/.test(getRequestUrl(input))) { return false; } @@ -63,12 +63,12 @@ export function createOpenAIWebSocketTransportFetch( const webSocketFetch = webSocketFetchFactory(); assert(typeof webSocketFetch.close === "function", "OpenAI WebSocket fetch must expose close()"); - let closed = false; + let closeRequested = false; const close = (): void => { - if (closed) { + if (closeRequested) { return; } - closed = true; + closeRequested = true; webSocketFetch.close(); }; @@ -88,7 +88,11 @@ export function createOpenAIWebSocketTransportFetch( const headers = new Headers(init?.headers); captureAndStripDevToolsHeader(headers); - return webSocketFetch(input, { ...(init ?? {}), headers }); + const response = await webSocketFetch(input, { ...(init ?? {}), headers }); + if (closeRequested) { + webSocketFetch.close(); + } + return response; }, fetchExtras) as typeof fetch; return { diff --git a/src/node/services/providerModelFactory.test.ts b/src/node/services/providerModelFactory.test.ts index d86fc18b42..6800ce8dcc 100644 --- a/src/node/services/providerModelFactory.test.ts +++ b/src/node/services/providerModelFactory.test.ts @@ -22,6 +22,7 @@ import { } from "./providerModelFactory"; import { MUX_ANTHROPIC_EFFORT_OVERRIDE_HEADER } from "@/common/utils/ai/providerOptions"; import { hasLanguageModelCleanup } from "./languageModelCleanup"; +import type { DevToolsService } from "./devToolsService"; import { CodexOauthService } from "./codexOauthService"; import { PolicyService } from "./policyService"; import { ProviderService } from "./providerService"; @@ -828,6 +829,56 @@ describe("ProviderModelFactory OpenAI WebSocket transport", () => { }); }); + it("does not attach cleanup when a custom OpenAI base URL is configured", async () => { + await withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + baseURL: "https://proxy.openai.test/v1", + webSocketTransportEnabled: true, + }, + }); + + const result = await factory.createModel("openai:gpt-4.1-mini"); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(false); + }); + }); + + it("preserves cleanup when DevTools wraps an OpenAI WebSocket model", async () => { + await withTempConfig(async (config) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: true, + }, + }); + const providerService = new ProviderService(config); + const devToolsService = { enabled: true } as unknown as DevToolsService; + const factory = new ProviderModelFactory( + config, + providerService, + undefined, + undefined, + devToolsService + ); + + const result = await factory.createModel("openai:gpt-4.1-mini", undefined, { + workspaceId: "devtools-workspace", + }); + + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(true); + }); + }); + it("does not attach cleanup when Chat Completions is selected", async () => { await withTempConfig(async (config, factory) => { config.saveProvidersConfig({ diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index f88d376c63..c11087a04b 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -1303,9 +1303,11 @@ export class ProviderModelFactory { const baseFetch = getProviderFetch(providerConfig); const codexOauthService = this.codexOauthService; + const openAIHasCustomBaseURL = + providerConfig.baseURL != null || providerConfig.baseUrl != null || creds.baseUrl != null; const webSocketTransportEnabled = (providerConfig as { webSocketTransportEnabled?: unknown }).webSocketTransportEnabled === - true; + true && !openAIHasCustomBaseURL; // Wrap fetch so Codex OAuth Responses requests are normalized before // they are rerouted from api.openai.com to chatgpt.com's Codex backend. diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 8c60dec1f7..e5ac4d3390 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -27,6 +27,17 @@ import { createRuntime } from "@/node/runtime/runtimeFactory"; import { attachLanguageModelCleanup } from "./languageModelCleanup"; import { shellQuote } from "@/common/utils/shell"; +function createTestLanguageModel(modelId = "cleanup-model"): LanguageModel { + return { + specificationVersion: "v3", + provider: "test", + modelId, + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("doGenerate is unused in StreamManager tests")), + doStream: () => Promise.reject(new Error("doStream is unused in StreamManager tests")), + }; +} + // Skip integration tests if TEST_INTEGRATION is not set const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; @@ -936,14 +947,7 @@ describe("StreamManager - language model cleanup", () => { const messageId = "cleanup-message"; const historySequence = 1; let cleanupCalls = 0; - const model: LanguageModel = { - specificationVersion: "v3", - provider: "test", - modelId: "cleanup-model", - supportedUrls: {}, - doGenerate: () => Promise.reject(new Error("doGenerate is unused in cleanup tests")), - doStream: () => Promise.reject(new Error("doStream is unused in cleanup tests")), - }; + const model = createTestLanguageModel(); attachLanguageModelCleanup(model, () => { cleanupCalls += 1; }); @@ -1009,6 +1013,197 @@ describe("StreamManager - language model cleanup", () => { await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + expect(cleanupCalls).toBe(1); + }); + test("runs model cleanup when startStream exits before processing after abort", async () => { + const streamManager = new StreamManager(historyService); + let cleanupCalls = 0; + const model = createTestLanguageModel("cleanup-preabort-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + const abortController = new AbortController(); + abortController.abort(new Error("pre-abort")); + + const result = await streamManager.startStream( + "cleanup-preabort-workspace", + [{ role: "user", content: "hello" }], + model, + "openai:gpt-4.1-mini", + 1, + "system", + runtime, + "cleanup-preabort-message", + abortController.signal + ); + + expect(result.success).toBe(true); + expect(cleanupCalls).toBe(1); + }); + + test("runs model cleanup when stream creation throws before processing", async () => { + const streamManager = new StreamManager(historyService); + let cleanupCalls = 0; + const model = createTestLanguageModel("cleanup-create-throw-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + const replaceCreateStreamResult = Reflect.set(streamManager, "createStreamResult", () => { + throw new Error("create stream failed"); + }); + expect(replaceCreateStreamResult).toBe(true); + + const result = await streamManager.startStream( + "cleanup-create-throw-workspace", + [{ role: "user", content: "hello" }], + model, + "openai:gpt-4.1-mini", + 1, + "system", + runtime, + "cleanup-create-throw-message" + ); + + expect(result.success).toBe(false); + expect(cleanupCalls).toBe(1); + }); + + test("runs model cleanup when stream processing fails", async () => { + const streamManager = new StreamManager(historyService); + streamManager.on("error", () => undefined); + const workspaceId = "cleanup-error-workspace"; + const messageId = "cleanup-error-message"; + const historySequence = 1; + let cleanupCalls = 0; + const model = createTestLanguageModel("cleanup-error-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + + const appendResult = await historyService.appendToHistory(workspaceId, { + id: messageId, + role: "assistant", + metadata: { historySequence, partial: true }, + parts: [], + }); + expect(appendResult.success).toBe(true); + + const processStreamWithCleanup = Reflect.get(streamManager, "processStreamWithCleanup") as ( + workspaceId: string, + streamInfo: unknown, + historySequence: number + ) => Promise; + expect(typeof processStreamWithCleanup).toBe("function"); + + const streamInfo = { + state: "streaming", + streamResult: { + fullStream: (async function* () { + throw new Error("stream failed before output"); + })(), + totalUsage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), + usage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), + providerMetadata: Promise.resolve(undefined), + steps: Promise.resolve([]), + }, + abortController: new AbortController(), + messageId, + token: "cleanup-error-token", + startTime: Date.now(), + lastPartTimestamp: Date.now(), + toolCompletionTimestamps: new Map(), + model: "openai:gpt-4.1-mini", + metadataModel: "openai:gpt-4.1-mini", + historySequence, + request: { model, messages: [], providerOptions: undefined }, + toolModelUsages: [], + parts: [], + lastPartialWriteTime: 0, + partialWritePromise: undefined, + processingPromise: Promise.resolve(), + softInterrupt: { pending: false as const }, + runtimeTempDir: "", + runtime, + cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + cumulativeProviderMetadata: undefined, + didRetryPreviousResponseIdAtStep: false, + currentStepStartIndex: 0, + stepTracker: {}, + }; + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(cleanupCalls).toBe(1); + }); + + test("runs model cleanup when stream processing is aborted", async () => { + const streamManager = new StreamManager(historyService); + streamManager.on("error", () => undefined); + const workspaceId = "cleanup-abort-workspace"; + const messageId = "cleanup-abort-message"; + const historySequence = 1; + let cleanupCalls = 0; + const model = createTestLanguageModel("cleanup-abort-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + + const appendResult = await historyService.appendToHistory(workspaceId, { + id: messageId, + role: "assistant", + metadata: { historySequence, partial: true }, + parts: [], + }); + expect(appendResult.success).toBe(true); + + const processStreamWithCleanup = Reflect.get(streamManager, "processStreamWithCleanup") as ( + workspaceId: string, + streamInfo: unknown, + historySequence: number + ) => Promise; + expect(typeof processStreamWithCleanup).toBe("function"); + const abortController = new AbortController(); + abortController.abort(new Error("test abort")); + + const streamInfo = { + state: "streaming", + streamResult: { + fullStream: (async function* () { + await Promise.resolve(); + yield* [] as unknown[]; + })(), + totalUsage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), + usage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), + providerMetadata: Promise.resolve(undefined), + steps: Promise.resolve([]), + }, + abortController, + messageId, + token: "cleanup-abort-token", + startTime: Date.now(), + lastPartTimestamp: Date.now(), + toolCompletionTimestamps: new Map(), + model: "openai:gpt-4.1-mini", + metadataModel: "openai:gpt-4.1-mini", + historySequence, + request: { model, messages: [], providerOptions: undefined }, + toolModelUsages: [], + parts: [], + lastPartialWriteTime: 0, + partialWritePromise: undefined, + processingPromise: Promise.resolve(), + softInterrupt: { pending: false as const }, + runtimeTempDir: "", + runtime, + cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + cumulativeProviderMetadata: undefined, + didRetryPreviousResponseIdAtStep: false, + currentStepStartIndex: 0, + stepTracker: {}, + }; + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + expect(cleanupCalls).toBe(1); }); }); @@ -1828,6 +2023,7 @@ describe("StreamManager - TTFT metadata persistence", () => { historySequence: params.historySequence, initialMetadata: params.initialMetadata, toolModelUsages: [], + request: { model: createTestLanguageModel(), messages: [], providerOptions: undefined }, parts: params.parts, lastPartialWriteTime: 0, partialWriteTimer: undefined, diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 5b21d6831b..8ed4312190 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2335,7 +2335,7 @@ export class StreamManager extends EventEmitter { streamInfo.partialWriteTimer = undefined; } - runLanguageModelCleanup(streamInfo.request.model); + runLanguageModelCleanup(streamInfo.request?.model); streamInfo.unlinkAbortSignal?.(); streamInfo.unlinkAbortSignal = undefined; @@ -3016,6 +3016,7 @@ export class StreamManager extends EventEmitter { return Ok(streamToken); } finally { if (!streamRegistered) { + runLanguageModelCleanup(model); unlinkAbortSignal(); if (runtimeTempDir) { this.cleanupStreamTempDir(runtime, runtimeTempDir); diff --git a/src/node/services/workspaceTitleGenerator.test.ts b/src/node/services/workspaceTitleGenerator.test.ts index b7fcd65c45..91fd24afa0 100644 --- a/src/node/services/workspaceTitleGenerator.test.ts +++ b/src/node/services/workspaceTitleGenerator.test.ts @@ -68,25 +68,28 @@ const createApiCallError = ( }); describe("generateWorkspaceIdentity cleanup", () => { - test("cleans up the model after a successful title stream", async () => { - let cleanupCalls = 0; - const model: LanguageModel = { + function createTitleModel(modelId = "title-model"): LanguageModel { + return { specificationVersion: "v3", provider: "test", - modelId: "title-model", + modelId, supportedUrls: {}, doGenerate: () => Promise.reject(new Error("doGenerate is unused in cleanup tests")), doStream: () => Promise.reject(new Error("doStream is unused in cleanup tests")), }; + } + + function createTitleAIService(model: LanguageModel): AIService { + return { createModel: () => Promise.resolve(Ok(model)) } as unknown as AIService; + } + + test("cleans up the model after a successful title stream", async () => { + let cleanupCalls = 0; + const model = createTitleModel(); attachLanguageModelCleanup(model, () => { cleanupCalls += 1; }); - const aiService = { - createModel: () => Promise.resolve(Ok(model)), - }; - - // AIService is a class with many runtime dependencies; this test only needs its public createModel seam. - const titleAiService = aiService as unknown as AIService; + const titleAiService = createTitleAIService(model); spyOn(aiSdk, "streamText").mockReturnValue({ toolResults: Promise.resolve([ @@ -107,6 +110,50 @@ describe("generateWorkspaceIdentity cleanup", () => { expect(result.success).toBe(true); expect(cleanupCalls).toBe(1); }); + + test("cleans up when title stream throws before trying the next candidate", async () => { + let cleanupCalls = 0; + const failingModel = createTitleModel("title-failing-model"); + attachLanguageModelCleanup(failingModel, () => { + cleanupCalls += 1; + }); + const aiService = createTitleAIService(failingModel); + + spyOn(aiSdk, "streamText").mockImplementation(() => { + throw new Error("title stream failed"); + }); + + const result = await generateWorkspaceIdentity( + "Add setting", + ["openai:gpt-4.1-mini"], + aiService + ); + + expect(result.success).toBe(false); + expect(cleanupCalls).toBe(1); + }); + + test("cleans up when title stream returns no propose_name result", async () => { + let cleanupCalls = 0; + const model = createTitleModel("title-no-tool-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + const aiService = createTitleAIService(model); + + spyOn(aiSdk, "streamText").mockReturnValue({ + toolResults: Promise.resolve([]), + } as unknown as ReturnType); + + const result = await generateWorkspaceIdentity( + "Add setting", + ["openai:gpt-4.1-mini"], + aiService + ); + + expect(result.success).toBe(false); + expect(cleanupCalls).toBe(1); + }); }); describe("workspaceTitleGenerator error mappers", () => { From 4c1e847173102bc46ea55abed473b8e4351ebe48 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:56:39 +0000 Subject: [PATCH 08/16] Close WebSocket transports across edge paths --- .../openAIWebSocketTransportFetch.test.ts | 27 ++++++++++++++++++- .../services/openAIWebSocketTransportFetch.ts | 25 ++++++++++++----- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index fc8fe4f61f..02c5f72e25 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -57,6 +57,27 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(() => transport.close()).not.toThrow(); }); + test("enabled transport creates the WebSocket fetch lazily", async () => { + let created = false; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => { + created = true; + return createTestWebSocketFetch(() => Promise.resolve(new Response("ws"))); + }, + }); + + expect(created).toBe(false); + await transport.fetch("https://api.openai.com/v1/models", { method: "GET" }); + expect(created).toBe(false); + await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + expect(created).toBe(true); + }); + test("enabled transport sends streaming Responses API posts through WebSocket fetch", async () => { const wsCalls: string[] = []; const transport = createOpenAIWebSocketTransportFetch({ @@ -225,7 +246,7 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(closeCalls).toBe(2); }); - test("close is idempotent", () => { + test("close is idempotent after WebSocket fetch creation", async () => { let closeCalls = 0; const transport = createOpenAIWebSocketTransportFetch({ enabled: true, @@ -239,6 +260,10 @@ describe("createOpenAIWebSocketTransportFetch", () => { ), }); + await transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); transport.close(); transport.close(); diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index 3df77ce56b..b56b04a2d7 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -2,7 +2,9 @@ import assert from "node:assert"; import { captureAndStripDevToolsHeader } from "./devToolsHeaderCapture"; import { createWebSocketFetch as createOpenAIWebSocketFetch } from "@vercel/ai-sdk-openai-websocket-fetch"; -type WebSocketFetch = typeof fetch & { close: () => void }; +type WebSocketFetch = ((input: RequestInfo | URL, init?: RequestInit) => Promise) & { + close: () => void; +}; type WebSocketFetchFactory = () => WebSocketFetch; interface CreateOpenAIWebSocketTransportFetchOptions { @@ -60,8 +62,18 @@ export function createOpenAIWebSocketTransportFetch( } const webSocketFetchFactory = options.createWebSocketFetch ?? createOpenAIWebSocketFetch; - const webSocketFetch = webSocketFetchFactory(); - assert(typeof webSocketFetch.close === "function", "OpenAI WebSocket fetch must expose close()"); + let webSocketFetch: WebSocketFetch | null = null; + + const getWebSocketFetch = (): WebSocketFetch => { + if (webSocketFetch === null) { + webSocketFetch = webSocketFetchFactory(); + } + assert( + typeof webSocketFetch.close === "function", + "OpenAI WebSocket fetch must expose close()" + ); + return webSocketFetch; + }; let closeRequested = false; const close = (): void => { @@ -69,7 +81,7 @@ export function createOpenAIWebSocketTransportFetch( return; } closeRequested = true; - webSocketFetch.close(); + webSocketFetch?.close(); }; const baseFetchWithPreconnect = options.baseFetch as typeof fetch & { @@ -86,11 +98,12 @@ export function createOpenAIWebSocketTransportFetch( return options.baseFetch(input, init); } + const activeWebSocketFetch = getWebSocketFetch(); const headers = new Headers(init?.headers); captureAndStripDevToolsHeader(headers); - const response = await webSocketFetch(input, { ...(init ?? {}), headers }); + const response = await activeWebSocketFetch(input, { ...(init ?? {}), headers }); if (closeRequested) { - webSocketFetch.close(); + activeWebSocketFetch.close(); } return response; }, fetchExtras) as typeof fetch; From 2cc3d1a6f790bed5b521467428eb08c79c7cf588 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 12:58:56 +0000 Subject: [PATCH 09/16] Hide WebSocket toggle for custom OpenAI endpoints --- .../Settings/Sections/ProvidersSection.test.tsx | 17 +++++++++++++++++ .../Settings/Sections/ProvidersSection.tsx | 13 ++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx index 89ef4f7c83..8beff1a873 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx @@ -357,6 +357,23 @@ describe("ProvidersSection", () => { }); }); + test("hides the OpenAI WebSocket transport toggle when OpenAI uses a custom base URL", async () => { + const view = renderProvidersSection(); + view.providersConfig.openai.baseUrl = "https://proxy.openai.test/v1"; + view.providersConfig.openai.webSocketTransportEnabled = true; + const openAiButton = await view.findByRole("button", { name: /^OpenAI$/ }); + + fireEvent.click(openAiButton); + + const openAiCard = getProviderCard(openAiButton); + expect( + within(openAiCard).queryByRole("switch", { + name: /WebSocket transport/i, + }) + ).toBeNull(); + expect(view.providersConfig.openai.webSocketTransportEnabled).toBe(true); + }); + test("hides the OpenAI WebSocket transport toggle for Chat Completions without clearing it", async () => { const view = renderProvidersSection(); view.providersConfig.openai.wireFormat = "chatCompletions"; diff --git a/src/browser/features/Settings/Sections/ProvidersSection.tsx b/src/browser/features/Settings/Sections/ProvidersSection.tsx index f5ccc4b671..3c22d9d994 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.tsx @@ -2248,7 +2248,18 @@ export function ProvidersSection() { {provider === "openai" && (() => { const openAIWireFormat = providerInfo?.wireFormat ?? "responses"; - const openAIWebSocketTransportVisible = openAIWireFormat === "responses"; + const openAIBaseUrl = + typeof providerInfo?.baseUrl === "string" + ? providerInfo.baseUrl.trim() + : ""; + const openAIResolvedBaseUrl = + typeof providerInfo?.baseUrlResolved === "string" + ? providerInfo.baseUrlResolved.trim() + : ""; + const openAIWebSocketTransportVisible = + openAIWireFormat === "responses" && + openAIBaseUrl.length === 0 && + openAIResolvedBaseUrl.length === 0; return (
From 74ed0c4457854b828ff6a7c34614761ef8a365f5 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 13:04:10 +0000 Subject: [PATCH 10/16] Fix WebSocket review test lint --- src/node/services/openAIWebSocketTransportFetch.test.ts | 2 +- src/node/services/openAIWebSocketTransportFetch.ts | 4 +--- src/node/services/streamManager.test.ts | 2 ++ 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index 02c5f72e25..b9c1dcc2e9 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -165,7 +165,7 @@ describe("createOpenAIWebSocketTransportFetch", () => { const baseBodies: string[] = []; const wsCalls: string[] = []; const baseFetch = createTestFetch((_input: RequestInfo | URL, init?: RequestInit) => { - baseBodies.push(String(init?.body ?? "")); + baseBodies.push(typeof init?.body === "string" ? init.body : ""); return Promise.resolve(new Response("base")); }); const transport = createOpenAIWebSocketTransportFetch({ diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index b56b04a2d7..62bc69c2e9 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -65,9 +65,7 @@ export function createOpenAIWebSocketTransportFetch( let webSocketFetch: WebSocketFetch | null = null; const getWebSocketFetch = (): WebSocketFetch => { - if (webSocketFetch === null) { - webSocketFetch = webSocketFetchFactory(); - } + webSocketFetch ??= webSocketFetchFactory(); assert( typeof webSocketFetch.close === "function", "OpenAI WebSocket fetch must expose close()" diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index e5ac4d3390..2da5ebfd3f 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1099,7 +1099,9 @@ describe("StreamManager - language model cleanup", () => { state: "streaming", streamResult: { fullStream: (async function* () { + await Promise.resolve(); throw new Error("stream failed before output"); + yield* [] as unknown[]; })(), totalUsage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), usage: Promise.resolve({ inputTokens: 1, outputTokens: 0, totalTokens: 1 }), From 1a7dccf0da32cf0102a6b62e5825655c24e9a00d Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 13:13:29 +0000 Subject: [PATCH 11/16] Handle Request-form OpenAI WebSocket calls --- .../openAIWebSocketTransportFetch.test.ts | 23 +++++++++++++++++++ .../services/openAIWebSocketTransportFetch.ts | 20 ++++++++++++---- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index b9c1dcc2e9..b169f3860c 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -193,6 +193,28 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(wsCalls).toEqual([]); }); + test("enabled transport recognizes streaming Responses Request objects", async () => { + const wsCalls: string[] = []; + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch((input: RequestInfo | URL) => { + wsCalls.push(getFetchInputUrl(input)); + return Promise.resolve(new Response("ws")); + }), + }); + const request = new Request("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + + const response = await transport.fetch(request); + + expect(await response.text()).toBe("ws"); + expect(wsCalls).toEqual(["https://api.openai.com/v1/responses"]); + }); + test("enabled transport recognizes Responses URLs with query parameters", async () => { const wsCalls: string[] = []; const transport = createOpenAIWebSocketTransportFetch({ @@ -236,6 +258,7 @@ describe("createOpenAIWebSocketTransportFetch", () => { method: "POST", body: JSON.stringify({ stream: true }), }); + await Promise.resolve(); transport.close(); if (!resolveWebSocketFetch) { throw new Error("Expected test WebSocket fetch resolver to be initialized"); diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index 62bc69c2e9..3184a554b4 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -29,8 +29,12 @@ function getRequestUrl(input: RequestInfo | URL): string { return input.url; } -function isStreamingResponsesRequest(input: RequestInfo | URL, init?: RequestInit): boolean { - if (init?.method?.toUpperCase() !== "POST") { +async function isStreamingResponsesRequest( + input: RequestInfo | URL, + init?: RequestInit +): Promise { + const method = init?.method ?? (input instanceof Request ? input.method : "GET"); + if (method.toUpperCase() !== "POST") { return false; } @@ -38,12 +42,18 @@ function isStreamingResponsesRequest(input: RequestInfo | URL, init?: RequestIni return false; } - if (typeof init?.body !== "string") { + const bodyText = + typeof init?.body === "string" + ? init.body + : init?.body == null && input instanceof Request + ? await input.clone().text() + : undefined; + if (bodyText === undefined) { return false; } try { - const body = JSON.parse(init.body) as { stream?: unknown }; + const body = JSON.parse(bodyText) as { stream?: unknown }; return body.stream === true; } catch { return false; @@ -92,7 +102,7 @@ export function createOpenAIWebSocketTransportFetch( const transportFetch = Object.assign(async (input: RequestInfo | URL, init?: RequestInit) => { // The upstream package falls through to globalThis.fetch for non-WebSocket requests. // Pre-filter here so Mux's existing fetch wrappers keep handling those HTTP paths. - if (!isStreamingResponsesRequest(input, init)) { + if (!(await isStreamingResponsesRequest(input, init))) { return options.baseFetch(input, init); } From 5bb8480111e89715a4a65adbc5813388c6698d04 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 13:21:14 +0000 Subject: [PATCH 12/16] Preserve Request headers for OpenAI WebSocket --- src/node/services/openAIWebSocketTransportFetch.test.ts | 6 +++++- src/node/services/openAIWebSocketTransportFetch.ts | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index b169f3860c..0616e501fb 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -195,17 +195,20 @@ describe("createOpenAIWebSocketTransportFetch", () => { test("enabled transport recognizes streaming Responses Request objects", async () => { const wsCalls: string[] = []; + let webSocketHeaders: Headers | undefined; const transport = createOpenAIWebSocketTransportFetch({ enabled: true, baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), createWebSocketFetch: () => - createTestWebSocketFetch((input: RequestInfo | URL) => { + createTestWebSocketFetch((input: RequestInfo | URL, init?: RequestInit) => { wsCalls.push(getFetchInputUrl(input)); + webSocketHeaders = new Headers(init?.headers); return Promise.resolve(new Response("ws")); }), }); const request = new Request("https://api.openai.com/v1/responses", { method: "POST", + headers: { Authorization: "Bearer request-key" }, body: JSON.stringify({ stream: true }), }); @@ -213,6 +216,7 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(await response.text()).toBe("ws"); expect(wsCalls).toEqual(["https://api.openai.com/v1/responses"]); + expect(webSocketHeaders?.get("authorization")).toBe("Bearer request-key"); }); test("enabled transport recognizes Responses URLs with query parameters", async () => { diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index 3184a554b4..e1f92472ef 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -107,7 +107,9 @@ export function createOpenAIWebSocketTransportFetch( } const activeWebSocketFetch = getWebSocketFetch(); - const headers = new Headers(init?.headers); + const headers = new Headers( + init?.headers ?? (input instanceof Request ? input.headers : undefined) + ); captureAndStripDevToolsHeader(headers); const response = await activeWebSocketFetch(input, { ...(init ?? {}), headers }); if (closeRequested) { From db2f54f479f80f0229facee50c7e4728d15a864f Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 14:04:56 +0000 Subject: [PATCH 13/16] Stabilize WebSocket tests with OpenAI base URL env --- .../services/providerModelFactory.test.ts | 103 +++++++++++------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/src/node/services/providerModelFactory.test.ts b/src/node/services/providerModelFactory.test.ts index 6800ce8dcc..de9c0e54b6 100644 --- a/src/node/services/providerModelFactory.test.ts +++ b/src/node/services/providerModelFactory.test.ts @@ -42,6 +42,27 @@ async function withTempConfig( } } +async function withOpenAIBaseUrlEnvUnset(run: () => Promise): Promise { + const savedBaseUrl = process.env.OPENAI_BASE_URL; + const savedApiBase = process.env.OPENAI_API_BASE; + delete process.env.OPENAI_BASE_URL; + delete process.env.OPENAI_API_BASE; + try { + await run(); + } finally { + if (savedBaseUrl === undefined) { + delete process.env.OPENAI_BASE_URL; + } else { + process.env.OPENAI_BASE_URL = savedBaseUrl; + } + if (savedApiBase === undefined) { + delete process.env.OPENAI_API_BASE; + } else { + process.env.OPENAI_API_BASE = savedApiBase; + } + } +} + async function withTempPolicyProviderFactory( policy: unknown, run: ( @@ -785,22 +806,24 @@ describe("ProviderModelFactory GitHub Copilot", () => { describe("ProviderModelFactory OpenAI WebSocket transport", () => { it("attaches cleanup when enabled for Responses models", async () => { - await withTempConfig(async (config, factory) => { - config.saveProvidersConfig({ - openai: { - apiKey: "sk-test", - webSocketTransportEnabled: true, - }, - }); + await withOpenAIBaseUrlEnvUnset(async () => + withTempConfig(async (config, factory) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: true, + }, + }); - const result = await factory.createModel("openai:gpt-4.1-mini"); + const result = await factory.createModel("openai:gpt-4.1-mini"); - expect(result.success).toBe(true); - if (!result.success) { - return; - } - expect(hasLanguageModelCleanup(result.data)).toBe(true); - }); + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(true); + }) + ); }); it("does not attach cleanup for Codex OAuth routed models", async () => { @@ -850,33 +873,35 @@ describe("ProviderModelFactory OpenAI WebSocket transport", () => { }); it("preserves cleanup when DevTools wraps an OpenAI WebSocket model", async () => { - await withTempConfig(async (config) => { - config.saveProvidersConfig({ - openai: { - apiKey: "sk-test", - webSocketTransportEnabled: true, - }, - }); - const providerService = new ProviderService(config); - const devToolsService = { enabled: true } as unknown as DevToolsService; - const factory = new ProviderModelFactory( - config, - providerService, - undefined, - undefined, - devToolsService - ); + await withOpenAIBaseUrlEnvUnset(async () => + withTempConfig(async (config) => { + config.saveProvidersConfig({ + openai: { + apiKey: "sk-test", + webSocketTransportEnabled: true, + }, + }); + const providerService = new ProviderService(config); + const devToolsService = { enabled: true } as unknown as DevToolsService; + const factory = new ProviderModelFactory( + config, + providerService, + undefined, + undefined, + devToolsService + ); - const result = await factory.createModel("openai:gpt-4.1-mini", undefined, { - workspaceId: "devtools-workspace", - }); + const result = await factory.createModel("openai:gpt-4.1-mini", undefined, { + workspaceId: "devtools-workspace", + }); - expect(result.success).toBe(true); - if (!result.success) { - return; - } - expect(hasLanguageModelCleanup(result.data)).toBe(true); - }); + expect(result.success).toBe(true); + if (!result.success) { + return; + } + expect(hasLanguageModelCleanup(result.data)).toBe(true); + }) + ); }); it("does not attach cleanup when Chat Completions is selected", async () => { From 281f29224cce12aa89ba03a17c93ff1366d1be5d Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 14:07:22 +0000 Subject: [PATCH 14/16] Avoid masking WebSocket response on close retry --- .../openAIWebSocketTransportFetch.test.ts | 23 +++++++++++++++++++ .../services/openAIWebSocketTransportFetch.ts | 6 ++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/node/services/openAIWebSocketTransportFetch.test.ts b/src/node/services/openAIWebSocketTransportFetch.test.ts index 0616e501fb..b7e98e02e4 100644 --- a/src/node/services/openAIWebSocketTransportFetch.test.ts +++ b/src/node/services/openAIWebSocketTransportFetch.test.ts @@ -273,6 +273,29 @@ describe("createOpenAIWebSocketTransportFetch", () => { expect(closeCalls).toBe(2); }); + test("close retry failure does not mask a resolved WebSocket response", async () => { + const transport = createOpenAIWebSocketTransportFetch({ + enabled: true, + baseFetch: createTestFetch(() => Promise.resolve(new Response("base"))), + createWebSocketFetch: () => + createTestWebSocketFetch( + () => Promise.resolve(new Response("ws")), + () => { + throw new Error("close failed"); + } + ), + }); + + const responsePromise = transport.fetch("https://api.openai.com/v1/responses", { + method: "POST", + body: JSON.stringify({ stream: true }), + }); + await Promise.resolve(); + expect(() => transport.close()).toThrow("close failed"); + + expect(await (await responsePromise).text()).toBe("ws"); + }); + test("close is idempotent after WebSocket fetch creation", async () => { let closeCalls = 0; const transport = createOpenAIWebSocketTransportFetch({ diff --git a/src/node/services/openAIWebSocketTransportFetch.ts b/src/node/services/openAIWebSocketTransportFetch.ts index e1f92472ef..5ad9d800d8 100644 --- a/src/node/services/openAIWebSocketTransportFetch.ts +++ b/src/node/services/openAIWebSocketTransportFetch.ts @@ -113,7 +113,11 @@ export function createOpenAIWebSocketTransportFetch( captureAndStripDevToolsHeader(headers); const response = await activeWebSocketFetch(input, { ...(init ?? {}), headers }); if (closeRequested) { - activeWebSocketFetch.close(); + try { + activeWebSocketFetch.close(); + } catch { + // Cleanup after a cancellation race must not mask the successful fetch response. + } } return response; }, fetchExtras) as typeof fetch; From 9aa35596d3598c8f881fa5fbca10b4fc0033a990 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 14:51:35 +0000 Subject: [PATCH 15/16] Address remaining WebSocket review coverage --- .../Sections/ProvidersSection.test.tsx | 41 +++++++++ .../Settings/Sections/ProvidersSection.tsx | 7 +- src/node/services/streamManager.test.ts | 84 +++++++++++++++++++ .../services/workspaceTitleGenerator.test.ts | 44 ++++++++++ 4 files changed, 175 insertions(+), 1 deletion(-) diff --git a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx index 8beff1a873..53255a7a0c 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.test.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.test.tsx @@ -357,6 +357,47 @@ describe("ProvidersSection", () => { }); }); + test("clears the OpenAI WebSocket transport preference when toggled off", async () => { + const view = renderProvidersSection(); + view.providersConfig.openai.webSocketTransportEnabled = true; + const openAiButton = await view.findByRole("button", { name: /^OpenAI$/ }); + + fireEvent.click(openAiButton); + + const openAiCard = getProviderCard(openAiButton); + const webSocketToggle = within(openAiCard).getByRole("switch", { + name: /WebSocket transport/i, + }); + + fireEvent.click(webSocketToggle); + + await waitFor(() => { + expect(view.setProviderConfig).toHaveBeenCalledWith({ + provider: "openai", + keyPath: ["webSocketTransportEnabled"], + value: "", + }); + }); + }); + + test("hides the OpenAI WebSocket transport toggle when Codex OAuth is the active default", async () => { + const view = renderProvidersSection(); + view.providersConfig.openai.codexOauthSet = true; + view.providersConfig.openai.apiKeySet = false; + view.providersConfig.openai.webSocketTransportEnabled = true; + const openAiButton = await view.findByRole("button", { name: /^OpenAI$/ }); + + fireEvent.click(openAiButton); + + const openAiCard = getProviderCard(openAiButton); + expect( + within(openAiCard).queryByRole("switch", { + name: /WebSocket transport/i, + }) + ).toBeNull(); + expect(view.providersConfig.openai.webSocketTransportEnabled).toBe(true); + }); + test("hides the OpenAI WebSocket transport toggle when OpenAI uses a custom base URL", async () => { const view = renderProvidersSection(); view.providersConfig.openai.baseUrl = "https://proxy.openai.test/v1"; diff --git a/src/browser/features/Settings/Sections/ProvidersSection.tsx b/src/browser/features/Settings/Sections/ProvidersSection.tsx index 3c22d9d994..3d2c871801 100644 --- a/src/browser/features/Settings/Sections/ProvidersSection.tsx +++ b/src/browser/features/Settings/Sections/ProvidersSection.tsx @@ -2256,10 +2256,15 @@ export function ProvidersSection() { typeof providerInfo?.baseUrlResolved === "string" ? providerInfo.baseUrlResolved.trim() : ""; + const openAICodexOAuthIsDefault = + providerInfo?.codexOauthSet === true && + (providerInfo.apiKeySet !== true || + providerInfo.codexOauthDefaultAuth !== "apiKey"); const openAIWebSocketTransportVisible = openAIWireFormat === "responses" && openAIBaseUrl.length === 0 && - openAIResolvedBaseUrl.length === 0; + openAIResolvedBaseUrl.length === 0 && + !openAICodexOAuthIsDefault; return (
diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 2da5ebfd3f..bfff10603d 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1068,6 +1068,90 @@ describe("StreamManager - language model cleanup", () => { expect(cleanupCalls).toBe(1); }); + test("keeps model cleanup until a multi-step tool stream finishes", async () => { + const streamManager = new StreamManager(historyService); + streamManager.on("error", () => undefined); + const workspaceId = "cleanup-multistep-workspace"; + const messageId = "cleanup-multistep-message"; + const historySequence = 1; + let cleanupCalls = 0; + const model = createTestLanguageModel("cleanup-multistep-model"); + attachLanguageModelCleanup(model, () => { + cleanupCalls += 1; + }); + + const appendResult = await historyService.appendToHistory(workspaceId, { + id: messageId, + role: "assistant", + metadata: { historySequence, partial: true }, + parts: [], + }); + expect(appendResult.success).toBe(true); + + const processStreamWithCleanup = Reflect.get(streamManager, "processStreamWithCleanup") as ( + workspaceId: string, + streamInfo: unknown, + historySequence: number + ) => Promise; + expect(typeof processStreamWithCleanup).toBe("function"); + + const streamInfo = { + state: "streaming", + streamResult: { + fullStream: (async function* () { + yield { + type: "tool-call", + toolCallId: "call-1", + toolName: "test_tool", + input: { value: 1 }, + }; + expect(cleanupCalls).toBe(0); + yield { + type: "tool-result", + toolCallId: "call-1", + toolName: "test_tool", + output: { ok: true }, + }; + expect(cleanupCalls).toBe(0); + yield { type: "text-delta", text: "done" }; + expect(cleanupCalls).toBe(0); + yield { type: "finish", finishReason: "stop" }; + })(), + totalUsage: Promise.resolve({ inputTokens: 1, outputTokens: 1, totalTokens: 2 }), + usage: Promise.resolve({ inputTokens: 1, outputTokens: 1, totalTokens: 2 }), + providerMetadata: Promise.resolve(undefined), + steps: Promise.resolve([]), + }, + abortController: new AbortController(), + messageId, + token: "cleanup-multistep-token", + startTime: Date.now(), + lastPartTimestamp: Date.now(), + toolCompletionTimestamps: new Map(), + model: "openai:gpt-4.1-mini", + metadataModel: "openai:gpt-4.1-mini", + historySequence, + request: { model, messages: [], providerOptions: undefined }, + toolModelUsages: [], + parts: [], + lastPartialWriteTime: 0, + partialWritePromise: undefined, + processingPromise: Promise.resolve(), + softInterrupt: { pending: false as const }, + runtimeTempDir: "", + runtime, + cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + cumulativeProviderMetadata: undefined, + didRetryPreviousResponseIdAtStep: false, + currentStepStartIndex: 0, + stepTracker: {}, + }; + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(cleanupCalls).toBe(1); + }); + test("runs model cleanup when stream processing fails", async () => { const streamManager = new StreamManager(historyService); streamManager.on("error", () => undefined); diff --git a/src/node/services/workspaceTitleGenerator.test.ts b/src/node/services/workspaceTitleGenerator.test.ts index 91fd24afa0..089a6b08b2 100644 --- a/src/node/services/workspaceTitleGenerator.test.ts +++ b/src/node/services/workspaceTitleGenerator.test.ts @@ -133,6 +133,50 @@ describe("generateWorkspaceIdentity cleanup", () => { expect(cleanupCalls).toBe(1); }); + test("cleans up each candidate when title generation retries", async () => { + let firstCleanupCalls = 0; + let secondCleanupCalls = 0; + const firstModel = createTitleModel("title-first-model"); + const secondModel = createTitleModel("title-second-model"); + attachLanguageModelCleanup(firstModel, () => { + firstCleanupCalls += 1; + }); + attachLanguageModelCleanup(secondModel, () => { + secondCleanupCalls += 1; + }); + const aiService = { + createModel: mock((modelString: string) => + Promise.resolve(Ok(modelString.includes("first") ? firstModel : secondModel)) + ), + } as unknown as AIService; + let streamTextCalls = 0; + spyOn(aiSdk, "streamText").mockImplementation((() => { + streamTextCalls += 1; + if (streamTextCalls === 1) { + throw new Error("first candidate failed"); + } + return { + toolResults: Promise.resolve([ + { + dynamic: false, + toolName: "propose_name", + output: { name: "settings", title: "Add setting" }, + }, + ]), + } as unknown as ReturnType; + }) as unknown as typeof aiSdk.streamText); + + const result = await generateWorkspaceIdentity( + "Add setting", + ["openai:first", "openai:second"], + aiService + ); + + expect(result.success).toBe(true); + expect(firstCleanupCalls).toBe(1); + expect(secondCleanupCalls).toBe(1); + }); + test("cleans up when title stream returns no propose_name result", async () => { let cleanupCalls = 0; const model = createTitleModel("title-no-tool-model"); From f81c3e08bac0296cea3b2422c4d038bd2a5c937c Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 6 May 2026 14:55:13 +0000 Subject: [PATCH 16/16] Fix multi-step cleanup test lint --- src/node/services/streamManager.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index bfff10603d..75d562229a 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1099,6 +1099,7 @@ describe("StreamManager - language model cleanup", () => { state: "streaming", streamResult: { fullStream: (async function* () { + await Promise.resolve(); yield { type: "tool-call", toolCallId: "call-1",