Skip to content

Commit ec5c3ba

Browse files
committed
Fix timeout handling and restore verbose logging
- Fix AbortController integration to properly abort SDK calls on timeout - Increase default timeout from 5 minutes to 1 hour - Restore verbose logging: chunk counts, streaming start/complete, retry attempts, error messages, client discard messages - Add timeout warning at 80% of timeout duration
1 parent a1dcfaf commit ec5c3ba

File tree

1 file changed

+116
-23
lines changed

1 file changed

+116
-23
lines changed

src/server.ts

Lines changed: 116 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ const PORT = process.env['PORT'] ?? 8765;
155155
const DEBUG = process.env['DEBUG'] === 'true' || process.env['DEBUG'] === '1';
156156
const DEFAULT_MODEL = modelsConfig.defaultModel;
157157
const POOL_SIZE = 5;
158-
const REQUEST_TIMEOUT_MS = parseInt(process.env['REQUEST_TIMEOUT_MS'] ?? '300000', 10); // 5 minutes default
158+
const REQUEST_TIMEOUT_MS = parseInt(process.env['REQUEST_TIMEOUT_MS'] ?? '3600000', 10); // 1 hour default
159159
const SHUTDOWN_TIMEOUT_MS = parseInt(process.env['SHUTDOWN_TIMEOUT_MS'] ?? '30000', 10); // 30 seconds default
160160

161161
// Server start time for uptime tracking
@@ -743,7 +743,7 @@ function releaseAuggieClient(modelId: string, client: AuggieClient): void {
743743
}
744744

745745
// Discard a client without returning it to the pool (used when client has errors)
746-
function discardAuggieClient(modelId: string, client: AuggieClient): void {
746+
function discardAuggieClient(modelId: string, client: AuggieClient, reason?: string): void {
747747
const modelConfig = MODEL_MAP[modelId] ?? MODEL_MAP[DEFAULT_MODEL];
748748
if (!modelConfig) return;
749749
const auggieModel = modelConfig.auggie;
@@ -755,7 +755,7 @@ function discardAuggieClient(modelId: string, client: AuggieClient): void {
755755
}
756756
// Close the client without returning to pool
757757
void client.close();
758-
console.log(`Discarded faulty client for ${auggieModel} (session/connection error)`);
758+
console.log(`Discarded faulty client for ${auggieModel} (${reason ?? 'unknown reason'})`);
759759
}
760760

761761
function getModels() {
@@ -1063,10 +1063,23 @@ function createStreamCallback(res: ServerResponse, model: string, requestId: str
10631063
toolCallIndices.clear();
10641064
toolCallCounter = 0;
10651065
1066+
// Track chunks received for diagnostics
1067+
let chunkCount = 0;
1068+
let lastChunkTime = Date.now();
1069+
10661070
return (notification: SessionNotification): void => {
10671071
const update = notification.update;
10681072
const sessionId = notification.sessionId ?? requestId;
10691073
const timestamp = Math.floor(Date.now() / 1000);
1074+
const now = Date.now();
1075+
const timeSinceLastChunk = now - lastChunkTime;
1076+
lastChunkTime = now;
1077+
chunkCount++;
1078+
1079+
// Log chunk receipt for diagnostics (only every 10 chunks to reduce noise)
1080+
if (chunkCount === 1 || chunkCount % 10 === 0) {
1081+
console.log(`[${requestId}] 📦 Chunk #${String(chunkCount)} (${update.sessionUpdate}) +${String(timeSinceLastChunk)}ms`);
1082+
}
10701083
10711084
debugLog(`Stream Update [${requestId}]`, {
10721085
sessionId,
@@ -1082,9 +1095,10 @@ function createStreamCallback(res: ServerResponse, model: string, requestId: str
10821095
10831096
switch (update.sessionUpdate) {
10841097
case 'user_message_chunk':
1085-
// Echo user message chunks (useful for multi-turn visibility)
1086-
if (update.content?.type === 'text' && update.content.text) {
1087-
console.log(`[${requestId}] 👤 User: ${update.content.text.substring(0, 50)}...`);
1098+
// SDK echoes user input back - only log in debug mode to reduce noise
1099+
// Do NOT stream this to client - it would cause prompt leakage
1100+
if (DEBUG && update.content?.type === 'text' && update.content.text) {
1101+
console.log(`[${requestId}] 👤 User echo (ignored): ${update.content.text.substring(0, 50)}...`);
10881102
}
10891103
break;
10901104
@@ -1240,14 +1254,39 @@ async function callAugmentAPIStreamingInternal(
12401254
modelId: string,
12411255
res: ServerResponse,
12421256
requestId: string,
1243-
model: string
1257+
model: string,
1258+
abortSignal?: AbortSignal
12441259
): Promise<void> {
1260+
const startTime = Date.now();
1261+
console.log(`[${requestId}] 🚀 Starting streaming call to ${modelId} (prompt: ${String(prompt.length)} chars)`);
1262+
12451263
const client = await getAuggieClient(modelId);
12461264
client.onSessionUpdate(createStreamCallback(res, model, requestId));
12471265
let hasError = false;
12481266
let caughtError: Error | null = null;
1267+
1268+
// Create a promise that rejects when abort signal fires
1269+
const abortPromise = abortSignal
1270+
? new Promise<never>((_, reject) => {
1271+
if (abortSignal.aborted) {
1272+
reject(new Error('Request aborted'));
1273+
return;
1274+
}
1275+
abortSignal.addEventListener('abort', () => {
1276+
reject(new Error('Request aborted'));
1277+
});
1278+
})
1279+
: null;
1280+
12491281
try {
1250-
const response = await client.prompt(prompt);
1282+
// Race between the actual prompt and abort signal
1283+
console.log(`[${requestId}] 📤 Sending prompt to SDK...`);
1284+
const promptPromise = client.prompt(prompt);
1285+
const response = abortPromise
1286+
? await Promise.race([promptPromise, abortPromise])
1287+
: await promptPromise;
1288+
console.log(`[${requestId}] SDK call completed in ${String(Date.now() - startTime)}ms`);
1289+
12511290
// Check if SDK returned an error as a response string (can happen even in streaming mode)
12521291
const errorCheck = isSDKErrorResponse(response);
12531292
if (errorCheck.isError) {
@@ -1259,9 +1298,16 @@ async function callAugmentAPIStreamingInternal(
12591298
caughtError = err as Error;
12601299
} finally {
12611300
client.onSessionUpdate(null);
1262-
// Discard client on session errors, otherwise return to pool
1263-
if (hasError && caughtError && isSessionError(caughtError)) {
1264-
discardAuggieClient(modelId, client);
1301+
// Discard client on session errors or aborts, otherwise return to pool
1302+
if (hasError && caughtError) {
1303+
if (caughtError.message === 'Request aborted') {
1304+
discardAuggieClient(modelId, client, 'request aborted/timeout');
1305+
} else if (isSessionError(caughtError)) {
1306+
discardAuggieClient(modelId, client, 'session/connection error');
1307+
} else {
1308+
// Other errors - still return client to pool
1309+
releaseAuggieClient(modelId, client);
1310+
}
12651311
} else {
12661312
releaseAuggieClient(modelId, client);
12671313
}
@@ -1276,22 +1322,46 @@ async function callAugmentAPIStreaming(
12761322
modelId: string,
12771323
res: ServerResponse,
12781324
requestId: string,
1279-
model: string
1325+
model: string,
1326+
abortSignal?: AbortSignal
12801327
): Promise<void> {
12811328
await withRetry(
1282-
() => callAugmentAPIStreamingInternal(prompt, modelId, res, requestId, model),
1329+
() => callAugmentAPIStreamingInternal(prompt, modelId, res, requestId, model, abortSignal),
12831330
'Augment API Streaming',
12841331
requestId
12851332
);
12861333
}
12871334
1288-
async function callAugmentAPIInternal(prompt: string, modelId: string): Promise<string> {
1335+
async function callAugmentAPIInternal(
1336+
prompt: string,
1337+
modelId: string,
1338+
abortSignal?: AbortSignal
1339+
): Promise<string> {
12891340
const client = await getAuggieClient(modelId);
12901341
let hasError = false;
12911342
let caughtError: Error | null = null;
12921343
let result = '';
1344+
1345+
// Create a promise that rejects when abort signal fires
1346+
const abortPromise = abortSignal
1347+
? new Promise<never>((_, reject) => {
1348+
if (abortSignal.aborted) {
1349+
reject(new Error('Request aborted'));
1350+
return;
1351+
}
1352+
abortSignal.addEventListener('abort', () => {
1353+
reject(new Error('Request aborted'));
1354+
});
1355+
})
1356+
: null;
1357+
12931358
try {
1294-
const response = await client.prompt(prompt);
1359+
// Race between the actual prompt and abort signal
1360+
const promptPromise = client.prompt(prompt);
1361+
const response = abortPromise
1362+
? await Promise.race([promptPromise, abortPromise])
1363+
: await promptPromise;
1364+
12951365
// Check if SDK returned an error as a response string
12961366
const errorCheck = isSDKErrorResponse(response);
12971367
if (errorCheck.isError) {
@@ -1304,9 +1374,16 @@ async function callAugmentAPIInternal(prompt: string, modelId: string): Promise<
13041374
hasError = true;
13051375
caughtError = err as Error;
13061376
} finally {
1307-
// Discard client on session errors, otherwise return to pool
1308-
if (hasError && caughtError && isSessionError(caughtError)) {
1309-
discardAuggieClient(modelId, client);
1377+
// Discard client on session errors or aborts, otherwise return to pool
1378+
if (hasError && caughtError) {
1379+
if (caughtError.message === 'Request aborted') {
1380+
discardAuggieClient(modelId, client, 'request aborted/timeout');
1381+
} else if (isSessionError(caughtError)) {
1382+
discardAuggieClient(modelId, client, 'session/connection error');
1383+
} else {
1384+
// Other errors - still return client to pool
1385+
releaseAuggieClient(modelId, client);
1386+
}
13101387
} else {
13111388
releaseAuggieClient(modelId, client);
13121389
}
@@ -1317,8 +1394,17 @@ async function callAugmentAPIInternal(prompt: string, modelId: string): Promise<
13171394
return result;
13181395
}
13191396
1320-
async function callAugmentAPI(prompt: string, modelId: string, requestId: string): Promise<string> {
1321-
return withRetry(() => callAugmentAPIInternal(prompt, modelId), 'Augment API', requestId);
1397+
async function callAugmentAPI(
1398+
prompt: string,
1399+
modelId: string,
1400+
requestId: string,
1401+
abortSignal?: AbortSignal
1402+
): Promise<string> {
1403+
return withRetry(
1404+
() => callAugmentAPIInternal(prompt, modelId, abortSignal),
1405+
'Augment API',
1406+
requestId
1407+
);
13221408
}
13231409
13241410
async function handleChatCompletions(req: IncomingMessage, res: ServerResponse): Promise<void> {
@@ -1341,7 +1427,13 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
13411427
const abortController = new AbortController();
13421428
activeRequests.set(requestId, abortController);
13431429
1344-
// Set up request timeout
1430+
// Set up request timeout with warning
1431+
const TIMEOUT_WARNING_MS = Math.min(REQUEST_TIMEOUT_MS * 0.8, REQUEST_TIMEOUT_MS - 30000); // Warn at 80% or 30s before
1432+
const warningTimeoutId = setTimeout(() => {
1433+
const elapsed = Date.now() - startTime;
1434+
console.warn(`[WARN] [${requestId}] Request approaching timeout (elapsed: ${String(elapsed)}ms, timeout: ${String(REQUEST_TIMEOUT_MS)}ms)`);
1435+
}, TIMEOUT_WARNING_MS);
1436+
13451437
const timeoutId = setTimeout(() => {
13461438
structuredLog('warn', 'Request', 'Request timeout', {
13471439
requestId,
@@ -1359,6 +1451,7 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
13591451
});
13601452
13611453
const cleanup = (success: boolean, errorType?: string) => {
1454+
clearTimeout(warningTimeoutId);
13621455
clearTimeout(timeoutId);
13631456
activeRequests.delete(requestId);
13641457
metrics.activeRequests--;
@@ -1440,7 +1533,7 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
14401533
res.flushHeaders();
14411534
14421535
try {
1443-
await callAugmentAPIStreaming(prompt, model, res, requestId, model);
1536+
await callAugmentAPIStreaming(prompt, model, res, requestId, model, abortController.signal);
14441537
res.write(createStreamChunk('', model, true));
14451538
res.write('data: [DONE]\n\n');
14461539
cleanup(true);
@@ -1454,7 +1547,7 @@ async function handleChatCompletions(req: IncomingMessage, res: ServerResponse):
14541547
}
14551548
res.end();
14561549
} else {
1457-
const response = await callAugmentAPI(prompt, model, requestId);
1550+
const response = await callAugmentAPI(prompt, model, requestId, abortController.signal);
14581551
res.writeHead(200, { 'Content-Type': 'application/json' });
14591552
res.end(JSON.stringify(createChatResponse(response, model, prompt)));
14601553
cleanup(true);

0 commit comments

Comments
 (0)