Skip to content

Commit 9ca890d

Browse files
committed
fix: make passthrough stream format-aware for Responses SSE (#186)
Passthrough mode now detects Responses SSE payloads (parsed.type starts with 'response.') and skips Chat Completions-specific sanitization: - sanitizeStreamingChunk() only runs on Chat Completions payloads - fixInvalidId() and hasValuableContent() checks skipped for Responses - Usage extraction still runs for both formats - Content length tracking adapted for Responses delta format This prevents potential stream corruption when Responses SSE data triggers idFixed or other Chat Completions-specific rewrite conditions.
1 parent f350f9d commit 9ca890d

File tree

1 file changed

+57
-37
lines changed

1 file changed

+57
-37
lines changed

open-sse/utils/stream.ts

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -136,51 +136,71 @@ export function createSSEStream(options: any = {}) {
136136
try {
137137
let parsed = JSON.parse(trimmed.slice(5).trim());
138138

139-
// Sanitize: strip non-standard fields for OpenAI SDK compatibility
140-
parsed = sanitizeStreamingChunk(parsed);
139+
// Detect Responses SSE payloads (have a `type` field like "response.created",
140+
// "response.output_item.added", etc.) and skip Chat Completions-specific
141+
// sanitization to avoid corrupting the stream for Responses-native clients.
142+
const isResponsesSSE =
143+
parsed.type &&
144+
typeof parsed.type === "string" &&
145+
parsed.type.startsWith("response.");
146+
147+
if (isResponsesSSE) {
148+
// Responses SSE: only extract usage, forward payload as-is
149+
const extracted = extractUsage(parsed);
150+
if (extracted) {
151+
usage = extracted;
152+
}
153+
// Track content length from Responses format
154+
if (parsed.delta && typeof parsed.delta === "string") {
155+
totalContentLength += parsed.delta.length;
156+
}
157+
} else {
158+
// Chat Completions: full sanitization pipeline
159+
parsed = sanitizeStreamingChunk(parsed);
141160

142-
const idFixed = fixInvalidId(parsed);
161+
const idFixed = fixInvalidId(parsed);
143162

144-
if (!hasValuableContent(parsed, FORMATS.OPENAI)) {
145-
continue;
146-
}
163+
if (!hasValuableContent(parsed, FORMATS.OPENAI)) {
164+
continue;
165+
}
147166

148-
const delta = parsed.choices?.[0]?.delta;
167+
const delta = parsed.choices?.[0]?.delta;
149168

150-
// Extract <think> tags from streaming content
151-
if (delta?.content && typeof delta.content === "string") {
152-
const { content, thinking } = extractThinkingFromContent(delta.content);
153-
delta.content = content;
154-
if (thinking && !delta.reasoning_content) {
155-
delta.reasoning_content = thinking;
169+
// Extract <think> tags from streaming content
170+
if (delta?.content && typeof delta.content === "string") {
171+
const { content, thinking } = extractThinkingFromContent(delta.content);
172+
delta.content = content;
173+
if (thinking && !delta.reasoning_content) {
174+
delta.reasoning_content = thinking;
175+
}
156176
}
157-
}
158177

159-
const content = delta?.content || delta?.reasoning_content;
160-
if (content && typeof content === "string") {
161-
totalContentLength += content.length;
162-
}
178+
const content = delta?.content || delta?.reasoning_content;
179+
if (content && typeof content === "string") {
180+
totalContentLength += content.length;
181+
}
163182

164-
const extracted = extractUsage(parsed);
165-
if (extracted) {
166-
usage = extracted;
167-
}
183+
const extracted = extractUsage(parsed);
184+
if (extracted) {
185+
usage = extracted;
186+
}
168187

169-
const isFinishChunk = parsed.choices?.[0]?.finish_reason;
170-
if (isFinishChunk && !hasValidUsage(parsed.usage)) {
171-
const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
172-
parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
173-
output = `data: ${JSON.stringify(parsed)}\n`;
174-
usage = estimated;
175-
injectedUsage = true;
176-
} else if (isFinishChunk && usage) {
177-
const buffered = addBufferToUsage(usage);
178-
parsed.usage = filterUsageForFormat(buffered, FORMATS.OPENAI);
179-
output = `data: ${JSON.stringify(parsed)}\n`;
180-
injectedUsage = true;
181-
} else if (idFixed) {
182-
output = `data: ${JSON.stringify(parsed)}\n`;
183-
injectedUsage = true;
188+
const isFinishChunk = parsed.choices?.[0]?.finish_reason;
189+
if (isFinishChunk && !hasValidUsage(parsed.usage)) {
190+
const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
191+
parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
192+
output = `data: ${JSON.stringify(parsed)}\n`;
193+
usage = estimated;
194+
injectedUsage = true;
195+
} else if (isFinishChunk && usage) {
196+
const buffered = addBufferToUsage(usage);
197+
parsed.usage = filterUsageForFormat(buffered, FORMATS.OPENAI);
198+
output = `data: ${JSON.stringify(parsed)}\n`;
199+
injectedUsage = true;
200+
} else if (idFixed) {
201+
output = `data: ${JSON.stringify(parsed)}\n`;
202+
injectedUsage = true;
203+
}
184204
}
185205
} catch {}
186206
}

0 commit comments

Comments
 (0)