Skip to content

Commit cfa0ba0

Browse files
committed
🐛 fix(streaming): prevent request aborted errors on long sessions
- Add SSE keepalive pings every 15s to prevent TCP/proxy/client timeouts during long tool executions where no data flows to the client - Remove withRetry for streaming to prevent duplicate/corrupted output when retrying after partial data has already been sent - Guard all res.write/res.end calls with safeWrite helper to prevent errors when writing to a closed/destroyed connection - Disable socket and request timeouts on streaming connections (managed per-request via AbortController instead) - Enable TCP keepalive and setNoDelay on streaming sockets - Use { once: true } on abort signal listener to prevent memory leaks
1 parent 3ab2763 commit cfa0ba0

File tree

1 file changed

+50
-16
lines changed

1 file changed

+50
-16
lines changed

src/server.ts

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,14 @@ interface StreamCallbackResult {
10771077
callback: (notification: SessionNotification) => void;
10781078
}
10791079
1080+
// Safe write helper - guards against writing to a destroyed/closed response
1081+
function safeWrite(res: ServerResponse, data: string): boolean {
1082+
if (!res.destroyed && res.writable) {
1083+
return res.write(data);
1084+
}
1085+
return false;
1086+
}
1087+
10801088
function createStreamCallback(res: ServerResponse, model: string, requestId: string): StreamCallbackResult {
10811089
// Reset tool call tracking for this request
10821090
toolCallIndices.clear();
@@ -1142,7 +1150,7 @@ function createStreamCallback(res: ServerResponse, model: string, requestId: str
11421150
},
11431151
],
11441152
};
1145-
res.write(`data: ${JSON.stringify(textChunk)}\n\n`);
1153+
safeWrite(res, `data: ${JSON.stringify(textChunk)}\n\n`);
11461154
}
11471155
break;
11481156
@@ -1172,7 +1180,7 @@ function createStreamCallback(res: ServerResponse, model: string, requestId: str
11721180
},
11731181
],
11741182
};
1175-
res.write(`data: ${JSON.stringify(thoughtChunk)}\n\n`);
1183+
safeWrite(res, `data: ${JSON.stringify(thoughtChunk)}\n\n`);
11761184
}
11771185
break;
11781186
@@ -1310,16 +1318,23 @@ async function callAugmentAPIStreamingInternal(
13101318
let hasError = false;
13111319
let caughtError: Error | null = null;
13121320
1321+
// Send SSE keepalive comments every 15 seconds to prevent connection timeouts
1322+
// during long tool executions where no data is streamed to the client
1323+
const keepaliveInterval = setInterval(() => {
1324+
if (safeWrite(res, ':keepalive\n\n')) {
1325+
structuredLog('debug', 'Keepalive', 'Sent SSE keepalive ping', { requestId });
1326+
}
1327+
}, 15000);
1328+
13131329
// Create a promise that rejects when abort signal fires
13141330
const abortPromise = abortSignal
13151331
? new Promise<never>((_, reject) => {
13161332
if (abortSignal.aborted) {
13171333
reject(new Error('Request aborted'));
13181334
return;
13191335
}
1320-
abortSignal.addEventListener('abort', () => {
1321-
reject(new Error('Request aborted'));
1322-
});
1336+
const onAbort = () => { reject(new Error('Request aborted')); };
1337+
abortSignal.addEventListener('abort', onAbort, { once: true });
13231338
})
13241339
: null;
13251340
@@ -1342,6 +1357,7 @@ async function callAugmentAPIStreamingInternal(
13421357
hasError = true;
13431358
caughtError = err as Error;
13441359
} finally {
1360+
clearInterval(keepaliveInterval);
13451361
client.onSessionUpdate(null);
13461362
// Discard client on session errors or aborts, otherwise return to pool
13471363
if (hasError && caughtError) {
@@ -1371,11 +1387,9 @@ async function callAugmentAPIStreaming(
13711387
workspaceRoot?: string,
13721388
abortSignal?: AbortSignal
13731389
): Promise<void> {
1374-
await withRetry(
1375-
() => callAugmentAPIStreamingInternal(prompt, modelId, res, requestId, model, workspaceRoot, abortSignal),
1376-
'Augment API Streaming',
1377-
requestId
1378-
);
1390+
// Do NOT use withRetry for streaming - retrying after partial data has been
1391+
// sent to the client would cause duplicate/corrupted output
1392+
await callAugmentAPIStreamingInternal(prompt, modelId, res, requestId, model, workspaceRoot, abortSignal);
13791393
}
13801394
13811395
async function callAugmentAPIInternal(
@@ -1575,6 +1589,17 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
15751589
}
15761590
15771591
if (stream) {
1592+
// Disable socket timeout for streaming connections to prevent
1593+
// Node.js from closing long-running SSE connections
1594+
req.setTimeout(0);
1595+
res.setTimeout(0);
1596+
1597+
// Enable TCP keepalive to prevent OS/network-level connection drops
1598+
if (req.socket) {
1599+
req.socket.setKeepAlive(true, 30000);
1600+
req.socket.setNoDelay(true);
1601+
}
1602+
15781603
// Disable response buffering for real-time streaming
15791604
res.writeHead(200, {
15801605
'Content-Type': 'text/event-stream',
@@ -1597,18 +1622,22 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
15971622
system_fingerprint: SYSTEM_FINGERPRINT,
15981623
choices: [{ index: 0, delta: {}, finish_reason: 'stop', logprobs: null }],
15991624
};
1600-
res.write(`data: ${JSON.stringify(stopChunk)}\n\n`);
1601-
res.write('data: [DONE]\n\n');
1625+
safeWrite(res, `data: ${JSON.stringify(stopChunk)}\n\n`);
1626+
safeWrite(res, 'data: [DONE]\n\n');
16021627
cleanup(true);
16031628
} catch (err) {
16041629
const error = err as Error;
16051630
structuredLog('error', 'Request', 'Streaming error', { requestId, data: error.message });
1606-
// Send OpenAI-compatible error in stream format
1607-
const openAIError = createOpenAIError(error);
1608-
res.write(`data: ${JSON.stringify(openAIError)}\n\n`);
1631+
// Send OpenAI-compatible error in stream format (only if connection is still open)
1632+
if (!res.destroyed && res.writable) {
1633+
const openAIError = createOpenAIError(error);
1634+
safeWrite(res, `data: ${JSON.stringify(openAIError)}\n\n`);
1635+
}
16091636
cleanup(false, error.name);
16101637
}
1611-
res.end();
1638+
if (!res.destroyed) {
1639+
res.end();
1640+
}
16121641
} else {
16131642
const response = await callAugmentAPI(prompt, model, requestId, workspaceRoot ?? undefined, abortController.signal);
16141643
res.writeHead(200, { 'Content-Type': 'application/json' });
@@ -1878,6 +1907,11 @@ const server = http.createServer((req, res) => {
18781907
server.keepAliveTimeout = 60000;
18791908
// Ensure headers timeout is greater than keep-alive timeout
18801909
server.headersTimeout = 65000;
1910+
// Disable socket timeout entirely - streaming SSE connections can be very long-lived
1911+
// and we manage timeouts per-request via AbortController instead
1912+
server.timeout = 0;
1913+
// Disable request timeout to prevent Node.js from closing long-lived connections
1914+
server.requestTimeout = 0;
18811915

18821916
// Graceful shutdown handler
18831917
async function gracefulShutdown(signal: string): Promise<void> {

0 commit comments

Comments
 (0)