Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions super-legal-mcp-refactored/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,52 @@

All notable changes to the Super Legal MCP Server are documented in this file.

## [4.7.0] - 2026-03-12
## [4.8.0] - 2026-03-12

### AddedExa-Powered Web Tool Replacement
### HardenedSSE Streaming Resilience (deploy-week-4)

Replaces Anthropic's `WebFetch` + `WebSearch` with Exa-powered MCP tools (`fetch_document`, `exa_web_search`). Gated by `EXA_WEB_TOOLS=false` (default OFF). Zero behavior change when disabled. 150/150 tests pass in both flag states.
Production-readiness hardening for the SSE streaming path in `claude-sdk-server.js` and frontend reconnection in `app.js`. All changes are failure-path guards — zero behavioral change on happy path.

- **`fetch_document`**: Two-phase MCP tool — direct HTTP first, Exa `/contents` fallback on 403/timeout. Preview mode (`mode: 'preview'`) returns highlights + AI summary (~2-4K chars); full mode returns complete text with verbosity control (`compact`/`standard`/`full`).
- **`exa_web_search`**: MCP tool wrapping Exa deep search with highlights, date filtering, category support.
- **`DirectFetchHybridClient`**: Cache isolation (preview vs full keys), metrics (preview/direct/exa hit rates), rate limiter integration, 15s/30s/60s tiered timeouts.
- **STANDARD_TOOLS**: `WebFetch`/`WebSearch` conditionally removed from all 5 tool arrays when flag enabled.
- **Domain MCP servers**: `direct-fetch` + `exa-search` domains appended to all 21 research subagent domain maps.
- **Agent prompts**: 63+ tool name references use dynamic `${FETCH_TOOL_NAME}`/`${SEARCH_TOOL_NAME}` — zero hardcoded leaks verified by audit.
- **Observability**: `exaWebSearches` + `fetchDocumentCalls` counters in sdkHooks → hookSSEBridge → frontend. Math.max guards on 4 `streamStats.webSearches` update sites prevent orchestrator overwrite.
- **Frontend**: Combined web search counter, per-agent card chips, Doc Preview/Document Fetch timeline events with source tags.
- **Tests**: 4 new test files (direct-fetch-hybrid unit + live, exa-web-search unit). 5 existing test files updated with flag-conditional assertions.
**`send()` rewrite (Tasks 1 + 2 + 5):**
- `res.destroyed` / `ended` guard prevents writes to closed connections
- try/catch wrapper returns `false` instead of throwing on write failure
- Backpressure: non-critical events silently dropped when `res.writableLength > 1MB`
- `{ critical: true }` option on 5 terminal sends (`final`, `error`, `session_timeout`) — always attempt delivery
- Heartbeat `res.write(':\n\n')` independently guarded with same checks

**Client disconnect + session timeout (Tasks 3 + 4):**
- `clientDisconnected` flag set on `res.on('close')` — checked at top of all 3 `for await` loops (P0, main, legacy)
- 4-hour session timeout (configurable via `SDK_MAX_SESSION_DURATION_MS`) — matches GCE HTTPS LB backend timeout
- `end()` clears both heartbeat interval and session timeout; `activeStreamCount` decremented exactly once

**Frontend reconnection (Task 6):**
- EventSource `onerror` handler: linear backoff reconnection (3s, 6s, 9s), max 3 attempts
- Timeline events for connection status (`warning` on retry, `error` on exhaustion)
- Counter reset on successful `final` event

**Health check enhancement (Task 7):**
- DB probe: `SELECT 1` with 3-second `Promise.race` timeout — prevents hung connections from blocking health checks
- Memory metrics: `heap_used_mb`, `heap_total_mb`, `rss_mb` from `process.memoryUsage()`
- `active_streams` count, `uptime_seconds`
- 503 triggers: circuit breaker OPEN, DB unreachable, RSS > 6400 MB (80% of 8GB GCE e2-standard-2)

**Express error handlers (Task 8):**
- 404 handler: `{ error: "Not found: GET /nonexistent" }` — placed after all routes, before `app.listen()`
- Global error handler: multer `LIMIT_FILE_SIZE` → 413, `MulterError` → 400, generic → 500

| File | Changes |
|------|---------|
| `src/server/claude-sdk-server.js` | send() rewrite, heartbeat guard, clientDisconnected, sessionTimeout, end() update, 3x for-await guards, activeStreamCount, /health rewrite, 404+error handlers, 5 critical send() flags |
| `test/react-frontend/app.js` | reconnectAttempts vars, EventSource onerror reconnection, counter reset on final |

**Zero new files. Zero dependency changes. Zero config changes.**

**Deployment status (Weeks 1-4):**
- Week 1: Runtime fixes, AsyncLocalStorage, error taxonomy — merged
- Week 2: Env validation, Dockerfile, .dockerignore, pool config — merged
- Week 3: Authentication, Cloud Run→GCE infrastructure — merged
- Week 4: SSE hardening — **this release**
- Next: Staging deployment validation (smoke tests, 24h health monitoring)

---

Expand Down
16 changes: 16 additions & 0 deletions super-legal-mcp-refactored/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ Key design decisions:
- **Slim shared context**: Specialists receive only STATUS CODE REFERENCE + TOOL CAPABILITY MATRIX (~62 lines vs ~793 in full shared)
- **Rollback**: `USE_SPLIT_PROMPTS=false` reverts all agents to the legacy monolithic `memorandum.md`

### SSE Streaming Resilience (v4.8.0)

Production hardening for the SSE streaming path. All guards activate only under failure — zero happy-path behavioral change.

| Protection | Mechanism | Config |
|-----------|-----------|--------|
| **Write guard** | `res.destroyed` / `ended` check + try/catch on every `send()` | — |
| **Backpressure** | Non-critical events dropped when `res.writableLength > 1MB` | — |
| **Critical events** | `final`, `error`, `session_timeout` always attempt delivery | — |
| **Session timeout** | Graceful close after max duration | `SDK_MAX_SESSION_DURATION_MS` (default: 4h) |
| **Disconnect detection** | `clientDisconnected` flag breaks all 3 `for await` loops | — |
| **Frontend reconnect** | EventSource linear backoff (3s, 6s, 9s), max 3 attempts | — |
| **Health probes** | DB `SELECT 1` (3s timeout), RSS memory, active stream count | — |
| **503 auto-heal** | Circuit breaker OPEN, DB down, or RSS > 6.4GB → MIG restart | — |
| **Error handlers** | 404 JSON + global Express handler (multer 413, generic 500) | — |

### Phase 4 SDK Cutover (Production)
- Default feature flags now route 100% traffic to the SDK Tool Runner (`SDK_TOOL_RUNNER=true`, `CANARY_PCT=100`, `STRUCTURED_OUTPUTS=true`).
- For staged rollout, adjust `CANARY_PCT` (50 → 75 → 100) and monitor parity/latency before advancing.
Expand Down
112 changes: 100 additions & 12 deletions super-legal-mcp-refactored/src/server/claude-sdk-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ const anthropicBreaker = new CircuitBreaker({
timeoutMs: Number(process.env.SDK_BREAKER_TIMEOUT_MS || 60000)
});

let activeStreamCount = 0;

// Build rate limiters for API clients
function createRateLimiters() {
const limiters = new Map();
Expand Down Expand Up @@ -490,10 +492,34 @@ app.get('/health', async (req, res) => {
threshold: anthropicBreaker?.failureThreshold ?? null
};

// Anthropic reachability probe is skipped in sandboxed/offline environments
const anthropicStatus = process.env.ANTHROPIC_API_KEY ? 'not_checked' : 'missing_api_key';
// DB probe
let dbStatus;
try {
const pool = getPool();
if (pool) {
const start = Date.now();
const dbTimeout = new Promise((_, reject) => setTimeout(() => reject(new Error('DB probe timeout (3s)')), 3000));
await Promise.race([pool.query('SELECT 1'), dbTimeout]);
dbStatus = { status: 'ok', latency_ms: Date.now() - start };
} else {
dbStatus = { status: 'not_configured' };
}
} catch (err) {
dbStatus = { status: 'error', error: err.message };
}

const healthy = breakerStatus.state !== 'OPEN';
// Memory metrics
const mem = process.memoryUsage();
const memory = {
heap_used_mb: Math.round(mem.heapUsed / 1024 / 1024),
heap_total_mb: Math.round(mem.heapTotal / 1024 / 1024),
rss_mb: Math.round(mem.rss / 1024 / 1024),
};

const dbDown = dbStatus.status === 'error';
const memoryPressure = memory.rss_mb > 6400; // 80% of 8GB (GCE e2-standard-2)
const breakerOpen = breakerStatus.state === 'OPEN';
const healthy = !breakerOpen && !dbDown && !memoryPressure;

res.status(healthy ? 200 : 503).json({
ok: healthy,
Expand All @@ -502,10 +528,14 @@ app.get('/health', async (req, res) => {
sdk_version: SDK_VERSION,
model: MODEL,
timestamp: now,
uptime_seconds: Math.round(process.uptime()),
active_streams: activeStreamCount,
memory,
build,
feature_flags: flags,
dependencies: {
anthropic_api: anthropicStatus,
anthropic_api: process.env.ANTHROPIC_API_KEY ? 'not_checked' : 'missing_api_key',
database: dbStatus,
rate_limiter: rateLimiterStatus,
circuit_breaker: breakerStatus
}
Expand Down Expand Up @@ -963,21 +993,50 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
});
res.flushHeaders?.();

const send = (obj) => res.write(`data: ${JSON.stringify(obj)}\n\n`);
const heartbeat = setInterval(() => res.write(':\n\n'), 10000);
const send = (obj, { critical = false } = {}) => {
if (res.destroyed || ended) return false;
// Backpressure: drop non-critical events when buffer > 1MB
if (!critical && res.writableLength > 1_048_576) return false;
try {
return res.write(`data: ${JSON.stringify(obj)}\n\n`);
} catch {
return false;
}
};
const heartbeat = setInterval(() => {
if (!res.destroyed && !ended) {
try { res.write(':\n\n'); } catch {}
}
}, 10000);

let clientDisconnected = false;
const MAX_SESSION_DURATION_MS = Number(process.env.SDK_MAX_SESSION_DURATION_MS || 4 * 60 * 60 * 1000); // 4 hours (matches GCE HTTPS LB backend timeout)
const sessionTimeout = setTimeout(() => {
console.warn('[Stream] Session duration limit reached');
send({ type: 'session_timeout', message: 'Maximum session duration reached. Session closing gracefully.' }, { critical: true });
clientDisconnected = true;
end();
}, MAX_SESSION_DURATION_MS);

let ended = false;
activeStreamCount++;
const end = () => {
if (ended) return;
ended = true;
activeStreamCount--;
clearInterval(heartbeat);
clearTimeout(sessionTimeout);
try { res.end(); } catch {}
};

// Only listen to response close, NOT request close
// For POST requests, req 'close' fires after body is consumed (before streaming completes)
// res 'close' correctly fires when client disconnects
res.on('close', end);
res.on('close', () => {
clientDisconnected = true;
console.log('[Stream] Client disconnected for session');
end();
});

send({
type: 'system_info',
Expand Down Expand Up @@ -1159,6 +1218,10 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
settingSources: [],
}
})) {
if (clientDisconnected) {
console.log('[Stream] Client disconnected, breaking P0 loop');
break;
}
// Forward P0 stream events to SSE — tool visibility + text deltas (thinking suppressed)
if (message.type === 'stream_event') {
if (message.event?.type === 'content_block_start') {
Expand Down Expand Up @@ -1306,6 +1369,11 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
allowedTools: ORCHESTRATOR_ALLOWED_TOOLS,
}
})) {
if (clientDisconnected) {
console.log('[Stream] Client disconnected, breaking main loop');
break;
}

// Debug logging (reduced verbosity for content_block_delta)
if (message.type !== 'stream_event' ||
(message.event?.type !== 'content_block_delta')) {
Expand Down Expand Up @@ -1500,7 +1568,6 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
});
}

clearInterval(heartbeat);
recordStreamDuration({ path: '/api/stream', model: MODEL, status: resultMessage.is_error ? 'error' : 'ok' }, Date.now() - streamStart);

// Use manifest's actual session dir (may differ from server-computed if model chose a different directory)
Expand Down Expand Up @@ -1559,7 +1626,7 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
}

manifest.finalize(finalPayload);
send(finalPayload);
send(finalPayload, { critical: true });

// Persist final event metadata & agent audit to DB (non-fatal)
if (featureFlags.HOOK_DB_PERSISTENCE) {
Expand Down Expand Up @@ -1627,7 +1694,7 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
const resp = toErrorResponse(code, error?.message || 'Unknown error', {}, req.requestId);
recordError(resp.body.error.code, '/api/stream');
console.error('Agent SDK stream error:', error);
send({ type: 'error', error: resp.body.error });
send({ type: 'error', error: resp.body.error }, { critical: true });
endSpan(span, { error });
} finally {
end();
Expand Down Expand Up @@ -1707,6 +1774,10 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
let currentTurnText = '';

for await (const event of stream) {
if (clientDisconnected) {
console.log('[Stream] Client disconnected, breaking legacy loop');
break;
}
if (event.type === 'content_block_start') {
currentBlockType = event.content_block?.type;
if (event.content_block?.type === 'tool_use') {
Expand Down Expand Up @@ -1850,21 +1921,38 @@ app.all('/api/stream', upload.array('documents', 10), async (req, res) => {
globalRateLimiter.tokenBucket = Math.max(0, globalRateLimiter.tokenBucket - correctionLegacy);
}

send(finalPayload);
send(finalPayload, { critical: true });
});
endSpan(span);
} catch (error) {
const code = mapExceptionToCode(error);
const resp = toErrorResponse(code, error?.message || 'Unknown error', {}, req.requestId);
recordError(resp.body.error.code, '/api/stream');
console.error('SDK stream error:', error);
send({ type: 'error', error: resp.body.error });
send({ type: 'error', error: resp.body.error }, { critical: true });
endSpan(span, { error });
} finally {
end();
}
});

// 404 handler (must be after all routes)
app.use((req, res) => {
res.status(404).json({ error: `Not found: ${req.method} ${req.path}` });
});

// Global Express error handler (4-arg signature required)
app.use((err, req, res, next) => {
if (err.code === 'LIMIT_FILE_SIZE') {
return res.status(413).json({ error: `File too large: ${err.message}`, code: err.code, limit: '50MB' });
}
if (err.name === 'MulterError') {
return res.status(400).json({ error: `Upload error: ${err.message}`, code: err.code });
}
console.error('[Server] Unhandled express error:', err);
res.status(500).json({ error: 'Internal server error' });
});

// Global exception handlers — catch unhandled errors that would silently crash the process
process.on('unhandledRejection', (reason) => {
console.error('[FATAL] Unhandled rejection:', reason);
Expand Down
30 changes: 27 additions & 3 deletions super-legal-mcp-refactored/test/react-frontend/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,7 @@
body: renderKV(kvData)
});

reconnectAttempts = 0;
setStreaming(false);
if (!docConvertExpected) {
closeStream('final');
Expand All @@ -2808,6 +2809,10 @@
});
}

let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 3;
const RECONNECT_BASE_DELAY_MS = 3000;

function openStream() {
const q = queryInput.value.trim();
if (!q) return;
Expand Down Expand Up @@ -2925,9 +2930,28 @@
closeStream('doc_convert_server_close');
return;
}
addTimelineEvent({ kind: 'error', title: 'SSE error', tags: [{ text: 'error', style: 'err' }] });
setStreaming(false);
closeStream('error');
eventSource.close();
eventSource = null;

if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
reconnectAttempts++;
addTimelineEvent({
kind: 'warning',
title: `Connection lost — reconnecting (${reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS})`,
tags: [{ text: 'SSE' }],
});
setTimeout(() => {
openStream();
}, RECONNECT_BASE_DELAY_MS * reconnectAttempts);
} else {
addTimelineEvent({
kind: 'error',
title: 'Connection lost — unable to reconnect',
tags: [{ text: 'SSE' }],
body: 'Please refresh the page to start a new session.',
});
closeStream('reconnect_failed');
}
};
}
}
Expand Down