diff --git a/super-legal-mcp-refactored/CHANGELOG.md b/super-legal-mcp-refactored/CHANGELOG.md index e9e801a7a..0ecabf63d 100644 --- a/super-legal-mcp-refactored/CHANGELOG.md +++ b/super-legal-mcp-refactored/CHANGELOG.md @@ -2,21 +2,52 @@ All notable changes to the Super Legal MCP Server are documented in this file. -## [4.7.0] - 2026-03-12 +## [4.8.0] - 2026-03-12 -### Added — Exa-Powered Web Tool Replacement +### Hardened — SSE Streaming Resilience (deploy-week-4) -Replaces Anthropic's `WebFetch` + `WebSearch` with Exa-powered MCP tools (`fetch_document`, `exa_web_search`). Gated by `EXA_WEB_TOOLS=false` (default OFF). Zero behavior change when disabled. 150/150 tests pass in both flag states. +Production-readiness hardening for the SSE streaming path in `claude-sdk-server.js` and frontend reconnection in `app.js`. All changes are failure-path guards — zero behavioral change on happy path. -- **`fetch_document`**: Two-phase MCP tool — direct HTTP first, Exa `/contents` fallback on 403/timeout. Preview mode (`mode: 'preview'`) returns highlights + AI summary (~2-4K chars); full mode returns complete text with verbosity control (`compact`/`standard`/`full`). -- **`exa_web_search`**: MCP tool wrapping Exa deep search with highlights, date filtering, category support. -- **`DirectFetchHybridClient`**: Cache isolation (preview vs full keys), metrics (preview/direct/exa hit rates), rate limiter integration, 15s/30s/60s tiered timeouts. -- **STANDARD_TOOLS**: `WebFetch`/`WebSearch` conditionally removed from all 5 tool arrays when flag enabled. -- **Domain MCP servers**: `direct-fetch` + `exa-search` domains appended to all 21 research subagent domain maps. -- **Agent prompts**: 63+ tool name references use dynamic `${FETCH_TOOL_NAME}`/`${SEARCH_TOOL_NAME}` — zero hardcoded leaks verified by audit. -- **Observability**: `exaWebSearches` + `fetchDocumentCalls` counters in sdkHooks → hookSSEBridge → frontend. 
Math.max guards on 4 `streamStats.webSearches` update sites prevent orchestrator overwrite. -- **Frontend**: Combined web search counter, per-agent card chips, Doc Preview/Document Fetch timeline events with source tags. -- **Tests**: 4 new test files (direct-fetch-hybrid unit + live, exa-web-search unit). 5 existing test files updated with flag-conditional assertions. +**`send()` rewrite (Tasks 1 + 2 + 5):** +- `res.destroyed` / `ended` guard prevents writes to closed connections +- try/catch wrapper returns `false` instead of throwing on write failure +- Backpressure: non-critical events silently dropped when `res.writableLength > 1MB` +- `{ critical: true }` option on 5 terminal sends (`final`, `error`, `session_timeout`) — always attempt delivery +- Heartbeat `res.write(':\n\n')` independently guarded with same checks + +**Client disconnect + session timeout (Tasks 3 + 4):** +- `clientDisconnected` flag set on `res.on('close')` — checked at top of all 3 `for await` loops (P0, main, legacy) +- 4-hour session timeout (configurable via `SDK_MAX_SESSION_DURATION_MS`) — matches GCE HTTPS LB backend timeout +- `end()` clears both heartbeat interval and session timeout; `activeStreamCount` decremented exactly once + +**Frontend reconnection (Task 6):** +- EventSource `onerror` handler: linear backoff reconnection (3s, 6s, 9s), max 3 attempts +- Timeline events for connection status (`warning` on retry, `error` on exhaustion) +- Counter reset on successful `final` event + +**Health check enhancement (Task 7):** +- DB probe: `SELECT 1` with 3-second `Promise.race` timeout — prevents hung connections from blocking health checks +- Memory metrics: `heap_used_mb`, `heap_total_mb`, `rss_mb` from `process.memoryUsage()` +- `active_streams` count, `uptime_seconds` +- 503 triggers: circuit breaker OPEN, DB unreachable, RSS > 6400 MB (80% of 8GB GCE e2-standard-2) + +**Express error handlers (Task 8):** +- 404 handler: `{ error: "Not found: GET /nonexistent" }` — placed after 
all routes, before `app.listen()` +- Global error handler: multer `LIMIT_FILE_SIZE` → 413, `MulterError` → 400, generic → 500 + +| File | Changes | +|------|---------| +| `src/server/claude-sdk-server.js` | send() rewrite, heartbeat guard, clientDisconnected, sessionTimeout, end() update, 3x for-await guards, activeStreamCount, /health rewrite, 404+error handlers, 5 critical send() flags | +| `test/react-frontend/app.js` | reconnectAttempts vars, EventSource onerror reconnection, counter reset on final | + +**Zero new files. Zero dependency changes. Zero config changes.** + +**Deployment status (Weeks 1-4):** +- Week 1: Runtime fixes, AsyncLocalStorage, error taxonomy — merged +- Week 2: Env validation, Dockerfile, .dockerignore, pool config — merged +- Week 3: Authentication, Cloud Run→GCE infrastructure — merged +- Week 4: SSE hardening — **this release** +- Next: Staging deployment validation (smoke tests, 24h health monitoring) --- diff --git a/super-legal-mcp-refactored/README.md b/super-legal-mcp-refactored/README.md index 755c66450..1d3065f17 100644 --- a/super-legal-mcp-refactored/README.md +++ b/super-legal-mcp-refactored/README.md @@ -150,6 +150,22 @@ Key design decisions: - **Slim shared context**: Specialists receive only STATUS CODE REFERENCE + TOOL CAPABILITY MATRIX (~62 lines vs ~793 in full shared) - **Rollback**: `USE_SPLIT_PROMPTS=false` reverts all agents to the legacy monolithic `memorandum.md` +### SSE Streaming Resilience (v4.8.0) + +Production hardening for the SSE streaming path. All guards activate only under failure — zero happy-path behavioral change. 
+ +| Protection | Mechanism | Config | +|-----------|-----------|--------| +| **Write guard** | `res.destroyed` / `ended` check + try/catch on every `send()` | — | +| **Backpressure** | Non-critical events dropped when `res.writableLength > 1MB` | — | +| **Critical events** | `final`, `error`, `session_timeout` always attempt delivery | — | +| **Session timeout** | Graceful close after max duration | `SDK_MAX_SESSION_DURATION_MS` (default: 4h) | +| **Disconnect detection** | `clientDisconnected` flag breaks all 3 `for await` loops | — | +| **Frontend reconnect** | EventSource linear backoff (3s, 6s, 9s), max 3 attempts | — | +| **Health probes** | DB `SELECT 1` (3s timeout), RSS memory, active stream count | — | +| **503 auto-heal** | Circuit breaker OPEN, DB down, or RSS > 6.4GB → MIG restart | — | +| **Error handlers** | 404 JSON + global Express handler (multer 413, generic 500) | — | + ### Phase 4 SDK Cutover (Production) - Default feature flags now route 100% traffic to the SDK Tool Runner (`SDK_TOOL_RUNNER=true`, `CANARY_PCT=100`, `STRUCTURED_OUTPUTS=true`). - For staged rollout, adjust `CANARY_PCT` (50 → 75 → 100) and monitor parity/latency before advancing. diff --git a/super-legal-mcp-refactored/src/server/claude-sdk-server.js b/super-legal-mcp-refactored/src/server/claude-sdk-server.js index 69f98c95e..793874aff 100644 --- a/super-legal-mcp-refactored/src/server/claude-sdk-server.js +++ b/super-legal-mcp-refactored/src/server/claude-sdk-server.js @@ -332,6 +332,8 @@ const anthropicBreaker = new CircuitBreaker({ timeoutMs: Number(process.env.SDK_BREAKER_TIMEOUT_MS || 60000) }); +let activeStreamCount = 0; + // Build rate limiters for API clients function createRateLimiters() { const limiters = new Map(); @@ -490,10 +492,34 @@ app.get('/health', async (req, res) => { threshold: anthropicBreaker?.failureThreshold ?? null }; - // Anthropic reachability probe is skipped in sandboxed/offline environments - const anthropicStatus = process.env.ANTHROPIC_API_KEY ? 
'not_checked' : 'missing_api_key'; + // DB probe + let dbStatus; + try { + const pool = getPool(); + if (pool) { + const start = Date.now(); + const dbTimeout = new Promise((_, reject) => setTimeout(() => reject(new Error('DB probe timeout (3s)')), 3000)); + await Promise.race([pool.query('SELECT 1'), dbTimeout]); + dbStatus = { status: 'ok', latency_ms: Date.now() - start }; + } else { + dbStatus = { status: 'not_configured' }; + } + } catch (err) { + dbStatus = { status: 'error', error: err.message }; + } - const healthy = breakerStatus.state !== 'OPEN'; + // Memory metrics + const mem = process.memoryUsage(); + const memory = { + heap_used_mb: Math.round(mem.heapUsed / 1024 / 1024), + heap_total_mb: Math.round(mem.heapTotal / 1024 / 1024), + rss_mb: Math.round(mem.rss / 1024 / 1024), + }; + + const dbDown = dbStatus.status === 'error'; + const memoryPressure = memory.rss_mb > 6400; // 80% of 8GB (GCE e2-standard-2) + const breakerOpen = breakerStatus.state === 'OPEN'; + const healthy = !breakerOpen && !dbDown && !memoryPressure; res.status(healthy ? 200 : 503).json({ ok: healthy, @@ -502,10 +528,14 @@ app.get('/health', async (req, res) => { sdk_version: SDK_VERSION, model: MODEL, timestamp: now, + uptime_seconds: Math.round(process.uptime()), + active_streams: activeStreamCount, + memory, build, feature_flags: flags, dependencies: { - anthropic_api: anthropicStatus, + anthropic_api: process.env.ANTHROPIC_API_KEY ? 
'not_checked' : 'missing_api_key', + database: dbStatus, rate_limiter: rateLimiterStatus, circuit_breaker: breakerStatus } @@ -963,21 +993,50 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { }); res.flushHeaders?.(); - const send = (obj) => res.write(`data: ${JSON.stringify(obj)}\n\n`); - const heartbeat = setInterval(() => res.write(':\n\n'), 10000); + const send = (obj, { critical = false } = {}) => { + if (res.destroyed || ended) return false; + // Backpressure: drop non-critical events when buffer > 1MB + if (!critical && res.writableLength > 1_048_576) return false; + try { + return res.write(`data: ${JSON.stringify(obj)}\n\n`); + } catch { + return false; + } + }; + const heartbeat = setInterval(() => { + if (!res.destroyed && !ended) { + try { res.write(':\n\n'); } catch {} + } + }, 10000); + + let clientDisconnected = false; + const MAX_SESSION_DURATION_MS = Number(process.env.SDK_MAX_SESSION_DURATION_MS || 4 * 60 * 60 * 1000); // 4 hours (matches GCE HTTPS LB backend timeout) + const sessionTimeout = setTimeout(() => { + console.warn('[Stream] Session duration limit reached'); + send({ type: 'session_timeout', message: 'Maximum session duration reached. Session closing gracefully.' 
}, { critical: true }); + clientDisconnected = true; + end(); + }, MAX_SESSION_DURATION_MS); let ended = false; + activeStreamCount++; const end = () => { if (ended) return; ended = true; + activeStreamCount--; clearInterval(heartbeat); + clearTimeout(sessionTimeout); try { res.end(); } catch {} }; // Only listen to response close, NOT request close // For POST requests, req 'close' fires after body is consumed (before streaming completes) // res 'close' correctly fires when client disconnects - res.on('close', end); + res.on('close', () => { + clientDisconnected = true; + console.log('[Stream] Client disconnected for session'); + end(); + }); send({ type: 'system_info', @@ -1159,6 +1218,10 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { settingSources: [], } })) { + if (clientDisconnected) { + console.log('[Stream] Client disconnected, breaking P0 loop'); + break; + } // Forward P0 stream events to SSE — tool visibility + text deltas (thinking suppressed) if (message.type === 'stream_event') { if (message.event?.type === 'content_block_start') { @@ -1306,6 +1369,11 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { allowedTools: ORCHESTRATOR_ALLOWED_TOOLS, } })) { + if (clientDisconnected) { + console.log('[Stream] Client disconnected, breaking main loop'); + break; + } + // Debug logging (reduced verbosity for content_block_delta) if (message.type !== 'stream_event' || (message.event?.type !== 'content_block_delta')) { @@ -1500,7 +1568,6 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { }); } - clearInterval(heartbeat); recordStreamDuration({ path: '/api/stream', model: MODEL, status: resultMessage.is_error ? 
'error' : 'ok' }, Date.now() - streamStart); // Use manifest's actual session dir (may differ from server-computed if model chose a different directory) @@ -1559,7 +1626,7 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { } manifest.finalize(finalPayload); - send(finalPayload); + send(finalPayload, { critical: true }); // Persist final event metadata & agent audit to DB (non-fatal) if (featureFlags.HOOK_DB_PERSISTENCE) { @@ -1627,7 +1694,7 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { const resp = toErrorResponse(code, error?.message || 'Unknown error', {}, req.requestId); recordError(resp.body.error.code, '/api/stream'); console.error('Agent SDK stream error:', error); - send({ type: 'error', error: resp.body.error }); + send({ type: 'error', error: resp.body.error }, { critical: true }); endSpan(span, { error }); } finally { end(); @@ -1707,6 +1774,10 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { let currentTurnText = ''; for await (const event of stream) { + if (clientDisconnected) { + console.log('[Stream] Client disconnected, breaking legacy loop'); + break; + } if (event.type === 'content_block_start') { currentBlockType = event.content_block?.type; if (event.content_block?.type === 'tool_use') { @@ -1850,7 +1921,7 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { globalRateLimiter.tokenBucket = Math.max(0, globalRateLimiter.tokenBucket - correctionLegacy); } - send(finalPayload); + send(finalPayload, { critical: true }); }); endSpan(span); } catch (error) { @@ -1858,13 +1929,30 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => { const resp = toErrorResponse(code, error?.message || 'Unknown error', {}, req.requestId); recordError(resp.body.error.code, '/api/stream'); console.error('SDK stream error:', error); - send({ type: 'error', error: resp.body.error }); + send({ type: 'error', error: resp.body.error }, { 
critical: true }); endSpan(span, { error }); } finally { end(); } }); +// 404 handler (must be after all routes) +app.use((req, res) => { + res.status(404).json({ error: `Not found: ${req.method} ${req.path}` }); +}); + +// Global Express error handler (4-arg signature required) +app.use((err, req, res, next) => { + if (err.code === 'LIMIT_FILE_SIZE') { + return res.status(413).json({ error: `File too large: ${err.message}`, code: err.code, limit: '50MB' }); + } + if (err.name === 'MulterError') { + return res.status(400).json({ error: `Upload error: ${err.message}`, code: err.code }); + } + console.error('[Server] Unhandled express error:', err); + res.status(500).json({ error: 'Internal server error' }); +}); + // Global exception handlers — catch unhandled errors that would silently crash the process process.on('unhandledRejection', (reason) => { console.error('[FATAL] Unhandled rejection:', reason); diff --git a/super-legal-mcp-refactored/test/react-frontend/app.js b/super-legal-mcp-refactored/test/react-frontend/app.js index d4839b572..4aa4508e1 100644 --- a/super-legal-mcp-refactored/test/react-frontend/app.js +++ b/super-legal-mcp-refactored/test/react-frontend/app.js @@ -2786,6 +2786,7 @@ body: renderKV(kvData) }); + reconnectAttempts = 0; setStreaming(false); if (!docConvertExpected) { closeStream('final'); @@ -2808,6 +2809,10 @@ }); } + let reconnectAttempts = 0; + const MAX_RECONNECT_ATTEMPTS = 3; + const RECONNECT_BASE_DELAY_MS = 3000; + function openStream() { const q = queryInput.value.trim(); if (!q) return; @@ -2925,9 +2930,28 @@ closeStream('doc_convert_server_close'); return; } - addTimelineEvent({ kind: 'error', title: 'SSE error', tags: [{ text: 'error', style: 'err' }] }); - setStreaming(false); - closeStream('error'); + eventSource.close(); + eventSource = null; + + if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { + reconnectAttempts++; + addTimelineEvent({ + kind: 'warning', + title: `Connection lost — reconnecting 
(${reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS})`, + tags: [{ text: 'SSE' }], + }); + setTimeout(() => { + openStream(); + }, RECONNECT_BASE_DELAY_MS * reconnectAttempts); + } else { + addTimelineEvent({ + kind: 'error', + title: 'Connection lost — unable to reconnect', + tags: [{ text: 'SSE' }], + body: 'Please refresh the page to start a new session.', + }); + closeStream('reconnect_failed'); + } }; } }
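The guarded `send()` introduced in `claude-sdk-server.js` can be exercised in isolation. The following is a minimal sketch, assuming the guard is factored into a standalone helper: `createSseSender`, `fakeResponse`, and `MAX_BUFFERED_BYTES` are hypothetical names used only for illustration and do not appear in the diff, where `send` is an inline closure over `res` and `ended`. The checks themselves (closed-connection guard, 1 MB backpressure threshold, `critical` bypass, try/catch returning `false`) mirror the hunk above.

```javascript
// Hypothetical helper for illustration; the diff defines send() inline instead.
const MAX_BUFFERED_BYTES = 1_048_576; // same 1 MB threshold as in the diff

function createSseSender(res, isEnded) {
  return function send(obj, { critical = false } = {}) {
    // Never write to a destroyed or already-ended response.
    if (res.destroyed || isEnded()) return false;
    // Backpressure: drop non-critical events while too much data is buffered.
    if (!critical && res.writableLength > MAX_BUFFERED_BYTES) return false;
    try {
      return res.write(`data: ${JSON.stringify(obj)}\n\n`);
    } catch {
      // A failed write is reported to the caller instead of thrown.
      return false;
    }
  };
}

// Minimal stand-in for http.ServerResponse, for demonstration only.
function fakeResponse({ destroyed = false, writableLength = 0 } = {}) {
  const chunks = [];
  return {
    destroyed,
    writableLength,
    chunks,
    write(chunk) {
      chunks.push(chunk);
      return true;
    },
  };
}

const congested = fakeResponse({ writableLength: 2 * MAX_BUFFERED_BYTES });
const send = createSseSender(congested, () => false);

send({ type: 'progress' });                  // dropped: buffer is over the threshold
send({ type: 'final' }, { critical: true }); // written: critical events bypass the check
console.log(congested.chunks.length);        // 1
```

One caveat when reading the return value: on a real response, `res.write()` also returns `false` when the chunk was accepted but the internal buffer is above its high-water mark, so `false` from `send()` means "dropped, failed, or buffered under pressure" rather than strictly "not delivered".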