diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts new file mode 100644 index 00000000000..18a54326de1 --- /dev/null +++ b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts @@ -0,0 +1,252 @@ +import { describe, expect, it } from "@effect/vitest"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Sink from "effect/Sink"; +import * as Stream from "effect/Stream"; +import { ChildProcessSpawner } from "effect/unstable/process"; + +import * as ProcessDiagnostics from "./ProcessDiagnostics.ts"; + +const encoder = new TextEncoder(); + +function mockHandle(result: { + readonly stdout?: string; + readonly stderr?: string; + readonly code?: number; +}) { + return ChildProcessSpawner.makeHandle({ + pid: ChildProcessSpawner.ProcessId(1), + exitCode: Effect.succeed(ChildProcessSpawner.ExitCode(result.code ?? 0)), + isRunning: Effect.succeed(false), + kill: () => Effect.void, + unref: Effect.succeed(Effect.void), + stdin: Sink.drain, + stdout: Stream.make(encoder.encode(result.stdout ?? "")), + stderr: Stream.make(encoder.encode(result.stderr ?? "")), + all: Stream.empty, + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + }); +} + +describe("ProcessDiagnostics", () => { + it.effect("parses POSIX ps rows with full commands", () => + Effect.sync(() => { + const rows = ProcessDiagnostics.parsePosixProcessRows( + [ + " 10 1 10 Ss 0.0 1024 01:02.03 /usr/bin/node server.js", + " 11 10 10 S+ 12.5 20480 00:04 codex app-server --config /tmp/one two", + ].join("\n"), + ); + + expect(rows).toEqual([ + { + pid: 10, + ppid: 1, + pgid: 10, + status: "Ss", + cpuPercent: 0, + rssBytes: 1024 * 1024, + elapsed: "01:02.03", + command: "/usr/bin/node server.js", + }, + { + pid: 11, + ppid: 10, + pgid: 10, + status: "S+", + cpuPercent: 12.5, + rssBytes: 20480 * 1024, + elapsed: "00:04", + command: "codex app-server --config /tmp/one two", + }, + ]); + }), + ); + + it.effect("aggregates only descendants of the server process", () => + Effect.sync(() => { + const diagnostics = ProcessDiagnostics.aggregateProcessDiagnostics({ + serverPid: 100, + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + rows: [ + { + pid: 100, + ppid: 1, + pgid: 100, + status: "S", + cpuPercent: 0, + rssBytes: 1_000, + elapsed: "01:00", + command: "t3 server", + }, + { + pid: 101, + ppid: 100, + pgid: 100, + status: "S", + cpuPercent: 1.5, + rssBytes: 2_000, + elapsed: "00:20", + command: "codex app-server", + }, + { + pid: 102, + ppid: 101, + pgid: 100, + status: "R", + cpuPercent: 3.25, + rssBytes: 4_000, + elapsed: "00:05", + command: "git status", + }, + { + pid: 200, + ppid: 1, + pgid: 200, + status: "S", + cpuPercent: 99, + rssBytes: 8_000, + elapsed: "00:01", + command: "unrelated", + }, + { + pid: 201, + ppid: 100, + pgid: 100, + status: "R", + cpuPercent: 9, + rssBytes: 9_000, + elapsed: "00:00", + command: "ps -axo pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command=", + }, + ], + }); + + expect(diagnostics.serverPid).toBe(100); + expect(DateTime.formatIso(diagnostics.readAt)).toBe("2026-05-05T10:00:00.000Z"); + expect(diagnostics.processCount).toBe(2); + expect(diagnostics.totalRssBytes).toBe(6_000); + expect(diagnostics.totalCpuPercent).toBe(4.75); + expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102]); + expect(diagnostics.processes.map((process) => process.depth)).toEqual([0, 1]); + expect(Option.getOrNull(diagnostics.processes[0]!.pgid)).toBe(100); + expect(diagnostics.processes[0]?.childPids).toEqual([102]); + }), + ); + + it.effect("preserves ascending sibling order for nested descendants", () => + Effect.sync(() => { + const diagnostics = ProcessDiagnostics.aggregateProcessDiagnostics({ + serverPid: 100, + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + rows: [ + { + pid: 101, + ppid: 100, + pgid: 100, + status: "S", + cpuPercent: 0, + rssBytes: 100, + elapsed: "00:10", + command: "agent", + }, + { + pid: 103, + ppid: 101, + pgid: 100, + status: "S", + cpuPercent: 0, + rssBytes: 100, + elapsed: "00:10", + command: "child-b", + }, + { + pid: 102, + ppid: 101, + pgid: 100, + status: "S", + cpuPercent: 0, + rssBytes: 100, + elapsed: "00:10", + command: "child-a", + }, + ], + }); + + expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102, 103]); + }), + ); + + it.effect("queries processes through the ChildProcessSpawner service", () => + Effect.gen(function* () { + const commands: Array<{ readonly command: string; readonly args: ReadonlyArray }> = + []; + const spawnerLayer = Layer.succeed( + ChildProcessSpawner.ChildProcessSpawner, + ChildProcessSpawner.make((command) => { + const childProcess = command as unknown as { + readonly command: string; + readonly args: ReadonlyArray; + }; + commands.push({ command: childProcess.command, args: childProcess.args }); + return Effect.succeed( + mockHandle({ + stdout: [ + ` ${process.pid} 1 ${process.pid} Ss 0.0 1024 01:02.03 t3 server`, + ` 4242 ${process.pid} ${process.pid} S 1.5 2048 00:04 agent`, + ].join("\n"), + }), + ); + }), + ); + const layer = ProcessDiagnostics.layer.pipe(Layer.provide(spawnerLayer)); + + const diagnostics = yield* Effect.service(ProcessDiagnostics.ProcessDiagnostics).pipe( + Effect.flatMap((pd) => pd.read), + Effect.provide(layer), + ); + + expect(diagnostics.processes.map((process) => process.pid)).toEqual([4242]); + expect(commands).toEqual([ + { + command: "ps", + args: ["-axo", "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="], + }, + ]); + }), + ); + + it.effect("does not allow signaling the diagnostics query process", () => + Effect.gen(function* () { + const spawnerLayer = Layer.succeed( + ChildProcessSpawner.ChildProcessSpawner, + ChildProcessSpawner.make(() => + Effect.succeed( + mockHandle({ + stdout: [ + ` ${process.pid} 1 ${process.pid} Ss 0.0 1024 01:02.03 t3 server`, + ` 4242 ${process.pid} ${process.pid} R 1.5 2048 00:00 ps -axo pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command=`, + ].join("\n"), + }), + ), + ), + ); + const layer = ProcessDiagnostics.layer.pipe(Layer.provide(spawnerLayer)); + + const result = yield* Effect.service(ProcessDiagnostics.ProcessDiagnostics).pipe( + Effect.flatMap((pd) => pd.signal({ pid: 4242, signal: "SIGINT" })), + Effect.provide(layer), + ); + + expect(result).toEqual({ + pid: 4242, + signal: "SIGINT", + signaled: false, + message: Option.some("Process 4242 is not a live descendant of the T3 server."), + }); + }), + ); +}); diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.ts b/apps/server/src/diagnostics/ProcessDiagnostics.ts new file mode 100644 index 00000000000..2e1b255c303 --- /dev/null +++ b/apps/server/src/diagnostics/ProcessDiagnostics.ts @@ -0,0 +1,461 @@ +import type { + ServerProcessDiagnosticsEntry, + ServerProcessDiagnosticsResult, + ServerProcessSignal, + ServerSignalProcessResult, +} from "@t3tools/contracts"; +import * as Context from "effect/Context"; +import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { collectUint8StreamText } from "../stream/collectUint8StreamText.ts"; + +interface ProcessRow { + readonly pid: number; + readonly ppid: number; + readonly pgid: number | null; + readonly status: string; + readonly cpuPercent: number; + readonly rssBytes: number; + readonly elapsed: string; + readonly command: string; +} + +const PROCESS_QUERY_TIMEOUT_MS = 1_000; +const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="; +const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024; + +export interface ProcessDiagnosticsShape { + readonly read: Effect.Effect; + readonly signal: (input: { + readonly pid: number; + readonly signal: ServerProcessSignal; + }) => Effect.Effect; +} + +export class ProcessDiagnostics extends Context.Service< + ProcessDiagnostics, + ProcessDiagnosticsShape +>()("t3/diagnostics/ProcessDiagnostics") {} + +class ProcessDiagnosticsError extends Schema.TaggedErrorClass()( + "ProcessDiagnosticsError", + { + message: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) {} + +function toProcessDiagnosticsError(message: string, cause?: unknown): ProcessDiagnosticsError { + return new ProcessDiagnosticsError({ + message, + ...(cause === undefined ? {} : { cause }), + }); +} + +function parsePositiveInt(value: string): number | null { + const parsed = Number.parseInt(value, 10); + return Number.isInteger(parsed) && parsed > 0 ? parsed : null; +} + +function parseNonNegativeInt(value: string): number | null { + const parsed = Number.parseInt(value, 10); + return Number.isInteger(parsed) && parsed >= 0 ? parsed : null; +} + +function parseNumber(value: string): number | null { + const parsed = Number.parseFloat(value); + return Number.isFinite(parsed) ? parsed : null; +} + +export function parsePosixProcessRows(output: string): ReadonlyArray { + const rows: ProcessRow[] = []; + const rowPattern = + /^\s*(\d+)\s+(\d+)\s+(-?\d+)\s+(\S+)\s+([+-]?(?:\d+\.?\d*|\.\d+))\s+(\d+)\s+(\S+)\s+(.+)$/; + + for (const line of output.split(/\r?\n/)) { + if (line.trim().length === 0) continue; + + const match = rowPattern.exec(line); + if (!match) continue; + + const pidText = match[1]; + const ppidText = match[2]; + const pgidText = match[3]; + const status = match[4]; + const cpuText = match[5]; + const rssText = match[6]; + const elapsed = match[7]; + const command = match[8]; + if ( + pidText === undefined || + ppidText === undefined || + pgidText === undefined || + status === undefined || + cpuText === undefined || + rssText === undefined || + elapsed === undefined || + command === undefined + ) { + continue; + } + + const pid = parsePositiveInt(pidText); + const ppid = parseNonNegativeInt(ppidText); + const pgid = Number.parseInt(pgidText, 10); + const cpuPercent = parseNumber(cpuText); + const rssKiB = parseNonNegativeInt(rssText); + if ( + pid === null || + ppid === null || + !Number.isInteger(pgid) || + cpuPercent === null || + rssKiB === null || + !status || + !elapsed || + !command + ) { + continue; + } + + rows.push({ + pid, + ppid, + pgid, + status, + cpuPercent, + rssBytes: rssKiB * 1024, + elapsed, + command, + }); + } + + return rows; +} + +function normalizeWindowsProcessRow(value: unknown): ProcessRow | null { + if (typeof value !== "object" || value === null) return null; + const record = value as Record; + const pid = typeof record.ProcessId === "number" ? record.ProcessId : null; + const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null; + const commandLine = + typeof record.CommandLine === "string" && record.CommandLine.trim().length > 0 + ? record.CommandLine + : typeof record.Name === "string" + ? record.Name + : null; + const workingSet = + typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize) + ? Math.max(0, Math.round(record.WorkingSetSize)) + : 0; + const cpuPercent = + typeof record.PercentProcessorTime === "number" && Number.isFinite(record.PercentProcessorTime) + ? Math.max(0, record.PercentProcessorTime) + : 0; + + if (!pid || pid <= 0 || ppid === null || ppid < 0 || !commandLine) return null; + return { + pid, + ppid, + pgid: null, + status: typeof record.Status === "string" && record.Status.length > 0 ? record.Status : "Live", + cpuPercent, + rssBytes: workingSet, + elapsed: "", + command: commandLine, + }; +} + +function parseWindowsProcessRows(output: string): ReadonlyArray { + if (output.trim().length === 0) return []; + try { + const parsed = JSON.parse(output) as unknown; + const records = Array.isArray(parsed) ? parsed : [parsed]; + return records.flatMap((record) => { + const row = normalizeWindowsProcessRow(record); + return row ? [row] : []; + }); + } catch { + return []; + } +} + +function buildDescendantEntries( + rows: ReadonlyArray, + serverPid: number, +): ReadonlyArray { + const childrenByParent = new Map(); + for (const row of rows) { + const children = childrenByParent.get(row.ppid) ?? []; + children.push(row); + childrenByParent.set(row.ppid, children); + } + + const entries: ServerProcessDiagnosticsEntry[] = []; + const visited = new Set(); + const stack = [...(childrenByParent.get(serverPid) ?? [])] + .toSorted((left, right) => left.pid - right.pid) + .map((row) => ({ row, depth: 0 })); + + while (stack.length > 0) { + const item = stack.shift(); + if (!item || visited.has(item.row.pid)) continue; + visited.add(item.row.pid); + + const children = [...(childrenByParent.get(item.row.pid) ?? [])].toSorted( + (left, right) => left.pid - right.pid, + ); + entries.push({ + pid: item.row.pid, + ppid: item.row.ppid, + pgid: Option.fromNullishOr(item.row.pgid), + status: item.row.status, + cpuPercent: item.row.cpuPercent, + rssBytes: item.row.rssBytes, + elapsed: item.row.elapsed || "n/a", + command: item.row.command, + depth: item.depth, + childPids: children.map((child) => child.pid), + }); + + stack.unshift(...children.map((row) => ({ row, depth: item.depth + 1 }))); + } + + return entries; +} + +function isDiagnosticsQueryProcess(row: ProcessRow, serverPid: number): boolean { + if (row.ppid !== serverPid) return false; + + const command = row.command.trim(); + return ( + /(?:^|[/\\])ps\s+-axo\s+pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command=/.test(command) || + (/\bpowershell(?:\.exe)?\b/i.test(command) && + /\bGet-CimInstance\s+Win32_Process\b/i.test(command)) + ); +} + +function makeResult(input: { + readonly serverPid: number; + readonly rows: ReadonlyArray; + readonly readAt: DateTime.Utc; + readonly error?: string; +}): ServerProcessDiagnosticsResult { + const readAt = input.readAt; + const rows = input.rows.filter((row) => !isDiagnosticsQueryProcess(row, input.serverPid)); + const processes = buildDescendantEntries(rows, input.serverPid); + const totalRssBytes = processes.reduce((total, process) => total + process.rssBytes, 0); + const totalCpuPercent = processes.reduce((total, process) => total + process.cpuPercent, 0); + + return { + serverPid: input.serverPid, + readAt, + processCount: processes.length, + totalRssBytes, + totalCpuPercent, + processes, + error: input.error ? Option.some({ message: input.error }) : Option.none(), + }; +} + +interface ProcessOutput { + readonly exitCode: number; + readonly stdout: string; + readonly stderr: string; +} + +const runProcess = Effect.fn("runProcess")( + function* (input: { + readonly command: string; + readonly args: ReadonlyArray; + readonly errorMessage: string; + }) { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const child = yield* spawner.spawn( + ChildProcess.make(input.command, input.args, { + cwd: process.cwd(), + shell: process.platform === "win32", + }), + ); + const [stdout, stderr, exitCode] = yield* Effect.all( + [ + collectUint8StreamText({ + stream: child.stdout, + maxBytes: PROCESS_QUERY_MAX_OUTPUT_BYTES, + truncatedMarker: "\n\n[truncated]", + }), + collectUint8StreamText({ + stream: child.stderr, + maxBytes: PROCESS_QUERY_MAX_OUTPUT_BYTES, + truncatedMarker: "\n\n[truncated]", + }), + child.exitCode, + ], + { concurrency: "unbounded" }, + ); + + return { + exitCode, + stdout: stdout.text, + stderr: stderr.text, + } satisfies ProcessOutput; + }, + (effect, input) => + effect.pipe( + Effect.scoped, + Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)), + Effect.flatMap((result) => + Option.match(result, { + onNone: () => Effect.fail(toProcessDiagnosticsError(`${input.errorMessage} timed out.`)), + onSome: Effect.succeed, + }), + ), + Effect.mapError((cause) => + Schema.is(ProcessDiagnosticsError)(cause) + ? cause + : toProcessDiagnosticsError(input.errorMessage, cause), + ), + ), +); + +function readPosixProcessRows(): Effect.Effect< + ReadonlyArray, + ProcessDiagnosticsError, + ChildProcessSpawner.ChildProcessSpawner +> { + return runProcess({ + command: "ps", + args: ["-axo", POSIX_PROCESS_QUERY_COMMAND], + errorMessage: "Failed to query process diagnostics.", + }).pipe( + Effect.flatMap((result) => + result.exitCode !== 0 + ? Effect.fail(toProcessDiagnosticsError(result.stderr.trim() || "ps failed.")) + : Effect.succeed(parsePosixProcessRows(result.stdout)), + ), + ); +} + +function readWindowsProcessRows(): Effect.Effect< + ReadonlyArray, + ProcessDiagnosticsError, + ChildProcessSpawner.ChildProcessSpawner +> { + const command = [ + "$processes = Get-CimInstance Win32_Process | ForEach-Object {", + '$perf = Get-CimInstance Win32_PerfFormattedData_PerfProc_Process -Filter "IDProcess = $($_.ProcessId)" -ErrorAction SilentlyContinue;', + "[pscustomobject]@{ ProcessId = $_.ProcessId; ParentProcessId = $_.ParentProcessId; Name = $_.Name; CommandLine = $_.CommandLine; Status = $_.Status; WorkingSetSize = $_.WorkingSetSize; PercentProcessorTime = if ($perf) { $perf.PercentProcessorTime } else { 0 } }", + "};", + "$processes | ConvertTo-Json -Compress -Depth 3", + ].join(" "); + + return runProcess({ + command: "powershell.exe", + args: ["-NoProfile", "-NonInteractive", "-Command", command], + errorMessage: "Failed to query process diagnostics.", + }).pipe( + Effect.flatMap((result) => + result.exitCode !== 0 + ? Effect.fail( + toProcessDiagnosticsError(result.stderr.trim() || "PowerShell process query failed."), + ) + : Effect.succeed(parseWindowsProcessRows(result.stdout)), + ), + ); +} + +const readProcessRows = (platform = process.platform) => + platform === "win32" ? readWindowsProcessRows() : readPosixProcessRows(); + +export function aggregateProcessDiagnostics(input: { + readonly serverPid: number; + readonly rows: ReadonlyArray; + readonly readAt: DateTime.Utc; +}): ServerProcessDiagnosticsResult { + return makeResult(input); +} + +function assertDescendantPid( + pid: number, +): Effect.Effect { + if (pid === process.pid) { + return Effect.fail(toProcessDiagnosticsError("Refusing to signal the T3 server process.")); + } + + return readProcessRows().pipe( + Effect.flatMap((rows) => { + const filteredRows = rows.filter((row) => !isDiagnosticsQueryProcess(row, process.pid)); + const descendant = buildDescendantEntries(filteredRows, process.pid).some( + (entry) => entry.pid === pid, + ); + return descendant + ? Effect.void + : Effect.fail( + toProcessDiagnosticsError(`Process ${pid} is not a live descendant of the T3 server.`), + ); + }), + ); +} + +export const make = Effect.fn("makeProcessDiagnostics")(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + + const read: ProcessDiagnosticsShape["read"] = Effect.gen(function* () { + const readAt = yield* DateTime.now; + const rows = yield* readProcessRows().pipe( + Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, spawner), + ); + return makeResult({ serverPid: process.pid, rows, readAt }); + }).pipe( + Effect.catch((error: ProcessDiagnosticsError) => + DateTime.now.pipe( + Effect.map((readAt) => + makeResult({ serverPid: process.pid, rows: [], readAt, error: error.message }), + ), + ), + ), + ); + + const signal: ProcessDiagnosticsShape["signal"] = Effect.fn("ProcessDiagnostics.signal")( + function* (input) { + return yield* assertDescendantPid(input.pid).pipe( + Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, spawner), + Effect.flatMap(() => + Effect.try({ + try: () => { + process.kill(input.pid, input.signal); + return { + pid: input.pid, + signal: input.signal, + signaled: true, + message: Option.none(), + }; + }, + catch: (cause) => + toProcessDiagnosticsError( + `Failed to signal process ${input.pid} with ${input.signal}.`, + cause, + ), + }), + ), + Effect.catch((error: ProcessDiagnosticsError) => + Effect.succeed({ + pid: input.pid, + signal: input.signal, + signaled: false, + message: Option.some(error.message), + }), + ), + ); + }, + ); + + return ProcessDiagnostics.of({ read, signal }); +}); + +export const layer = Layer.effect(ProcessDiagnostics, make()); diff --git a/apps/server/src/diagnostics/TraceDiagnostics.test.ts b/apps/server/src/diagnostics/TraceDiagnostics.test.ts new file mode 100644 index 00000000000..d4ffa4a5fc2 --- /dev/null +++ b/apps/server/src/diagnostics/TraceDiagnostics.test.ts @@ -0,0 +1,258 @@ +import { assert, describe, it } from "@effect/vitest"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as FileSystem from "effect/FileSystem"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as PlatformError from "effect/PlatformError"; + +import * as TraceDiagnostics from "./TraceDiagnostics.ts"; + +function ns(ms: number): string { + return String(BigInt(ms) * 1_000_000n); +} + +function record(input: { + readonly name: string; + readonly traceId: string; + readonly spanId: string; + readonly startMs: number; + readonly durationMs: number; + readonly exit?: { readonly _tag: "Success" | "Failure" | "Interrupted"; readonly cause?: string }; + readonly events?: ReadonlyArray; +}) { + return JSON.stringify({ + type: "effect-span", + name: input.name, + traceId: input.traceId, + spanId: input.spanId, + sampled: true, + kind: "internal", + startTimeUnixNano: ns(input.startMs), + endTimeUnixNano: ns(input.startMs + input.durationMs), + durationMs: input.durationMs, + attributes: {}, + events: input.events ?? [], + links: [], + exit: input.exit ?? { _tag: "Success" }, + }); +} + +describe("TraceDiagnostics", () => { + it.effect("aggregates failures, slow spans, log levels, and parse errors", () => + Effect.sync(() => { + const diagnostics = TraceDiagnostics.aggregateTraceDiagnostics({ + traceFilePath: "/tmp/server.trace.ndjson", + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + slowSpanThresholdMs: 1_000, + files: [ + { + path: "/tmp/server.trace.ndjson.1", + text: [ + record({ + name: "server.getConfig", + traceId: "trace-a", + spanId: "span-a", + startMs: 1_000, + durationMs: 50, + }), + "not-json", + ].join("\n"), + }, + { + path: "/tmp/server.trace.ndjson", + text: [ + record({ + name: "orchestration.dispatch", + traceId: "trace-b", + spanId: "span-b", + startMs: 2_000, + durationMs: 1_500, + exit: { _tag: "Failure", cause: "Provider crashed" }, + events: [ + { + name: "provider failed", + timeUnixNano: ns(3_400), + attributes: { "effect.logLevel": "Error" }, + }, + ], + }), + record({ + name: "orchestration.dispatch", + traceId: "trace-c", + spanId: "span-c", + startMs: 4_000, + durationMs: 250, + exit: { _tag: "Failure", cause: "Provider crashed" }, + }), + record({ + name: "git.status", + traceId: "trace-d", + spanId: "span-d", + startMs: 5_000, + durationMs: 25, + exit: { _tag: "Interrupted", cause: "Interrupted" }, + events: [ + { + name: "status delayed", + timeUnixNano: ns(5_010), + attributes: { "effect.logLevel": "Warning" }, + }, + ], + }), + ].join("\n"), + }, + ], + }); + + assert.equal(diagnostics.recordCount, 4); + assert.equal(DateTime.formatIso(diagnostics.readAt), "2026-05-05T10:00:00.000Z"); + assert.equal( + Option.match(diagnostics.firstSpanAt, { + onNone: () => null, + onSome: DateTime.formatIso, + }), + "1970-01-01T00:00:01.000Z", + ); + assert.equal( + Option.match(diagnostics.lastSpanAt, { + onNone: () => null, + onSome: DateTime.formatIso, + }), + "1970-01-01T00:00:05.025Z", + ); + assert.equal(diagnostics.parseErrorCount, 1); + assert.equal(diagnostics.failureCount, 2); + assert.equal(diagnostics.interruptionCount, 1); + assert.equal(diagnostics.slowSpanCount, 1); + assert.equal(diagnostics.logLevelCounts.Error, 1); + assert.equal(diagnostics.logLevelCounts.Warning, 1); + assert.equal(diagnostics.commonFailures[0]?.name, "orchestration.dispatch"); + assert.equal(diagnostics.commonFailures[0]?.count, 2); + assert.equal(diagnostics.latestFailures[0]?.traceId, "trace-c"); + assert.equal(diagnostics.slowestSpans[0]?.traceId, "trace-b"); + assert.equal(diagnostics.latestWarningAndErrorLogs[0]?.message, "status delayed"); + assert.equal(diagnostics.topSpansByCount[0]?.name, "orchestration.dispatch"); + }), + ); + + it.effect("returns a not-found diagnostic when no files are available", () => + Effect.sync(() => { + const diagnostics = TraceDiagnostics.aggregateTraceDiagnostics({ + traceFilePath: "/tmp/missing.trace.ndjson", + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + files: [], + }); + + assert.equal(diagnostics.recordCount, 0); + assert.equal(Option.getOrUndefined(diagnostics.error)?.kind, "trace-file-not-found"); + }), + ); + + it.effect("preserves full failure causes and log messages", () => + Effect.sync(() => { + const longCause = `VcsProcessSpawnError: ${"missing executable ".repeat(80)}`.trim(); + const longMessage = `provider warning: ${"retrying command ".repeat(80)}`.trim(); + const diagnostics = TraceDiagnostics.aggregateTraceDiagnostics({ + traceFilePath: "/tmp/server.trace.ndjson", + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + files: [ + { + path: "/tmp/server.trace.ndjson", + text: record({ + name: "VcsProcess.run", + traceId: "trace-long", + spanId: "span-long", + startMs: 1_000, + durationMs: 25, + exit: { _tag: "Failure", cause: longCause }, + events: [ + { + name: longMessage, + timeUnixNano: ns(1_010), + attributes: { "effect.logLevel": "Warning" }, + }, + ], + }), + }, + ], + }); + + assert.equal(diagnostics.latestFailures[0]?.cause, longCause); + assert.equal(diagnostics.commonFailures[0]?.cause, longCause); + assert.equal(diagnostics.latestWarningAndErrorLogs[0]?.message, longMessage); + }), + ); + + it.effect("keeps loaded trace data when one rotated trace file fails to read", () => + Effect.gen(function* () { + const traceFilePath = "/tmp/server.trace.ndjson"; + const fileSystemLayer = FileSystem.layerNoop({ + readFileString: (path) => + path === `${traceFilePath}.1` + ? Effect.fail( + PlatformError.systemError({ + _tag: "PermissionDenied", + module: "FileSystem", + method: "readFileString", + description: "permission denied", + pathOrDescriptor: path, + }), + ) + : Effect.succeed( + record({ + name: "server.getConfig", + traceId: "trace-a", + spanId: "span-a", + startMs: 1_000, + durationMs: 50, + }), + ), + }); + + const diagnostics = yield* TraceDiagnostics.readTraceDiagnostics({ + traceFilePath, + maxFiles: 1, + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + }).pipe(Effect.provide(TraceDiagnostics.layer.pipe(Layer.provide(fileSystemLayer)))); + + assert.equal(diagnostics.recordCount, 1); + assert.equal( + Option.getOrElse(diagnostics.partialFailure, () => false), + true, + ); + assert.equal(Option.getOrUndefined(diagnostics.error)?.kind, "trace-file-read-failed"); + assert.deepStrictEqual(diagnostics.scannedFilePaths, [`${traceFilePath}.1`, traceFilePath]); + }), + ); + + it.effect("keeps only the slowest span occurrences while aggregating large inputs", () => + Effect.sync(() => { + const diagnostics = TraceDiagnostics.aggregateTraceDiagnostics({ + traceFilePath: "/tmp/server.trace.ndjson", + readAt: DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"), + files: [ + { + path: "/tmp/server.trace.ndjson", + text: Array.from({ length: 25 }, (_, index) => + record({ + name: `span-${index}`, + traceId: `trace-${index}`, + spanId: `span-${index}`, + startMs: index * 1_000, + durationMs: index, + }), + ).join("\n"), + }, + ], + }); + + assert.equal(diagnostics.recordCount, 25); + assert.equal(diagnostics.slowestSpans.length, 10); + assert.deepStrictEqual( + diagnostics.slowestSpans.map((span) => span.durationMs), + [24, 23, 22, 21, 20, 19, 18, 17, 16, 15], + ); + }), + ); +}); diff --git a/apps/server/src/diagnostics/TraceDiagnostics.ts b/apps/server/src/diagnostics/TraceDiagnostics.ts new file mode 100644 index 00000000000..d90186a8647 --- /dev/null +++ b/apps/server/src/diagnostics/TraceDiagnostics.ts @@ -0,0 +1,470 @@ +import type { + ServerTraceDiagnosticsErrorKind, + ServerTraceDiagnosticsFailureSummary, + ServerTraceDiagnosticsLogEvent, + ServerTraceDiagnosticsRecentFailure, + ServerTraceDiagnosticsResult, + ServerTraceDiagnosticsSpanOccurrence, + ServerTraceDiagnosticsSpanSummary, +} from "@t3tools/contracts"; +import * as Context from "effect/Context"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as FileSystem from "effect/FileSystem"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as PlatformError from "effect/PlatformError"; + +interface TraceRecordLike { + readonly name?: unknown; + readonly traceId?: unknown; + readonly spanId?: unknown; + readonly startTimeUnixNano?: unknown; + readonly endTimeUnixNano?: unknown; + readonly durationMs?: unknown; + readonly exit?: unknown; + readonly events?: unknown; +} + +interface TraceEventLike { + readonly name?: unknown; + readonly timeUnixNano?: unknown; + readonly attributes?: unknown; +} + +export interface TraceDiagnosticsOptions { + readonly traceFilePath: string; + readonly maxFiles: number; + readonly slowSpanThresholdMs?: number; + readonly readAt?: DateTime.Utc; +} + +export interface TraceDiagnosticsShape { + readonly read: (options: TraceDiagnosticsOptions) => Effect.Effect; +} + +export class TraceDiagnostics extends Context.Service()( + "t3/diagnostics/TraceDiagnostics", +) {} + +interface TraceDiagnosticsInput { + readonly traceFilePath: string; + readonly files: ReadonlyArray<{ readonly path: string; readonly text: string }>; + readonly scannedFilePaths?: ReadonlyArray; + readonly slowSpanThresholdMs?: number; + readonly readAt: DateTime.Utc; + readonly error?: TraceDiagnosticsErrorSummary; + readonly partialFailure?: boolean; +} + +interface TraceDiagnosticsErrorSummary { + readonly kind: ServerTraceDiagnosticsErrorKind; + readonly message: string; +} + +const DEFAULT_SLOW_SPAN_THRESHOLD_MS = 1_000; +const TOP_LIMIT = 10; +const RECENT_LIMIT = 20; +function toRotatedTracePaths(traceFilePath: string, maxFiles: number): ReadonlyArray { + const backupCount = Math.max(0, Math.floor(maxFiles)); + const backups = Array.from( + { length: backupCount }, + (_, index) => `${traceFilePath}.${backupCount - index}`, + ); + return [...backups, traceFilePath]; +} + +function isRecordObject(value: unknown): value is TraceRecordLike { + return typeof value === "object" && value !== null; +} + +function toStringValue(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + +function toNumberValue(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function unixNanoToDateTime(value: unknown): DateTime.Utc | null { + const text = toStringValue(value); + if (!text) return null; + + try { + const millis = Number(BigInt(text) / 1_000_000n); + if (!Number.isFinite(millis)) return null; + return DateTime.makeUnsafe(millis); + } catch { + return null; + } +} + +function isAfter(left: DateTime.Utc, right: DateTime.Utc): boolean { + return DateTime.toEpochMillis(left) > DateTime.toEpochMillis(right); +} + +function isBefore(left: DateTime.Utc, right: DateTime.Utc): boolean { + return DateTime.toEpochMillis(left) < DateTime.toEpochMillis(right); +} + +function readExitTag(exit: unknown): string | null { + if (!isRecordObject(exit) || !("_tag" in exit)) return null; + return toStringValue(exit._tag); +} + +function readExitCause(exit: unknown): string { + if (!isRecordObject(exit) || !("cause" in exit)) return "Failure"; + return toStringValue(exit.cause)?.trim() ?? "Failure"; +} + +function isTraceEvent(value: unknown): value is TraceEventLike { + return typeof value === "object" && value !== null; +} + +function readEventAttributes(event: TraceEventLike): Readonly> { + return typeof event.attributes === "object" && event.attributes !== null + ? (event.attributes as Readonly>) + : {}; +} + +function makeEmptyDiagnostics(input: { + readonly traceFilePath: string; + readonly scannedFilePaths: ReadonlyArray; + readonly readAt: DateTime.Utc; + readonly slowSpanThresholdMs: number; + readonly error?: TraceDiagnosticsErrorSummary; + readonly partialFailure?: boolean; +}): ServerTraceDiagnosticsResult { + return { + traceFilePath: input.traceFilePath, + scannedFilePaths: [...input.scannedFilePaths], + readAt: input.readAt, + recordCount: 0, + parseErrorCount: 0, + firstSpanAt: Option.none(), + lastSpanAt: Option.none(), + failureCount: 0, + interruptionCount: 0, + slowSpanThresholdMs: input.slowSpanThresholdMs, + slowSpanCount: 0, + logLevelCounts: {}, + topSpansByCount: [], + slowestSpans: [], + commonFailures: [], + latestFailures: [], + latestWarningAndErrorLogs: [], + partialFailure: input.partialFailure ? Option.some(true) : Option.none(), + error: Option.fromNullishOr(input.error), + }; +} + +function isNotFoundError(error: PlatformError.PlatformError): boolean { + return error.reason._tag === "NotFound"; +} + +function platformErrorMessage(error: PlatformError.PlatformError): string { + return error.message || String(error); +} + +function insertBoundedSlowestSpan( + slowestSpans: ServerTraceDiagnosticsSpanOccurrence[], + span: ServerTraceDiagnosticsSpanOccurrence, +): void { + if ( + slowestSpans.length >= TOP_LIMIT && + span.durationMs <= slowestSpans[slowestSpans.length - 1]!.durationMs + ) { + return; + } + + slowestSpans.push(span); + slowestSpans.sort((left, right) => right.durationMs - left.durationMs); + if (slowestSpans.length > TOP_LIMIT) { + slowestSpans.length = TOP_LIMIT; + } +} + +export function aggregateTraceDiagnostics( + input: TraceDiagnosticsInput, +): ServerTraceDiagnosticsResult { + const readAt = input.readAt; + const slowSpanThresholdMs = input.slowSpanThresholdMs ?? DEFAULT_SLOW_SPAN_THRESHOLD_MS; + const scannedFilePaths = input.scannedFilePaths ?? input.files.map((file) => file.path); + if (input.files.length === 0) { + return makeEmptyDiagnostics({ + traceFilePath: input.traceFilePath, + scannedFilePaths, + readAt, + slowSpanThresholdMs, + error: input.error ?? { + kind: "trace-file-not-found", + message: "No local trace files were found.", + }, + ...(input.partialFailure ? { partialFailure: true } : {}), + }); + } + + let parseErrorCount = 0; + let recordCount = 0; + let failureCount = 0; + let interruptionCount = 0; + let slowSpanCount = 0; + let firstSpanAt: DateTime.Utc | null = null; + let lastSpanAt: DateTime.Utc | null = null; + + const spansByName = new Map< + string, + { count: number; failureCount: number; totalDurationMs: number; maxDurationMs: number } + >(); + const failuresByKey = new Map(); + const latestFailures: ServerTraceDiagnosticsRecentFailure[] = []; + const slowestSpans: ServerTraceDiagnosticsSpanOccurrence[] = []; + const latestWarningAndErrorLogs: ServerTraceDiagnosticsLogEvent[] = []; + const logLevelCounts: Record = {}; + + for (const file of input.files) { + const lines = file.text.split(/\r?\n/); + for (const line of lines) { + if (line.trim().length === 0) continue; + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + parseErrorCount += 1; + continue; + } + + if (!isRecordObject(parsed)) { + parseErrorCount += 1; + continue; + } + + const name = toStringValue(parsed.name); + const traceId = toStringValue(parsed.traceId); + const spanId = toStringValue(parsed.spanId); + const durationMs = toNumberValue(parsed.durationMs); + const endedAt = unixNanoToDateTime(parsed.endTimeUnixNano); + const startedAt = unixNanoToDateTime(parsed.startTimeUnixNano); + + if (!name || !traceId || !spanId || durationMs === null || !endedAt) { + parseErrorCount += 1; + continue; + } + + recordCount += 1; + firstSpanAt = + startedAt && (firstSpanAt === null || isBefore(startedAt, firstSpanAt)) + ? startedAt + : firstSpanAt; + lastSpanAt = lastSpanAt === null || isAfter(endedAt, lastSpanAt) ? endedAt : lastSpanAt; + + const exitTag = readExitTag(parsed.exit); + const isFailure = exitTag === "Failure"; + const isInterrupted = exitTag === "Interrupted"; + if (isFailure) failureCount += 1; + if (isInterrupted) interruptionCount += 1; + + const spanSummary = spansByName.get(name) ?? { + count: 0, + failureCount: 0, + totalDurationMs: 0, + maxDurationMs: 0, + }; + spanSummary.count += 1; + spanSummary.totalDurationMs += durationMs; + spanSummary.maxDurationMs = Math.max(spanSummary.maxDurationMs, durationMs); + if (isFailure) spanSummary.failureCount += 1; + spansByName.set(name, spanSummary); + + const spanItem = { name, durationMs, endedAt, traceId, spanId }; + if (durationMs >= slowSpanThresholdMs) { + slowSpanCount += 1; + } + insertBoundedSlowestSpan(slowestSpans, spanItem); + + if (isFailure) { + const cause = readExitCause(parsed.exit); + latestFailures.push({ ...spanItem, cause }); + + const failureKey = `${name}\0${cause}`; + const existing = failuresByKey.get(failureKey); + const isLatestFailure = !existing || isAfter(endedAt, existing.lastSeenAt); + failuresByKey.set(failureKey, { + name, + cause, + count: (existing?.count ?? 0) + 1, + lastSeenAt: isLatestFailure ? endedAt : existing!.lastSeenAt, + traceId: isLatestFailure ? traceId : existing!.traceId, + spanId: isLatestFailure ? spanId : existing!.spanId, + }); + } + + if (Array.isArray(parsed.events)) { + for (const rawEvent of parsed.events) { + if (!isTraceEvent(rawEvent)) continue; + const attributes = readEventAttributes(rawEvent); + const level = toStringValue(attributes["effect.logLevel"]); + if (!level) continue; + + logLevelCounts[level] = (logLevelCounts[level] ?? 0) + 1; + const normalizedLevel = level.toLowerCase(); + if ( + normalizedLevel !== "warning" && + normalizedLevel !== "warn" && + normalizedLevel !== "error" && + normalizedLevel !== "fatal" + ) { + continue; + } + + const seenAt = unixNanoToDateTime(rawEvent.timeUnixNano) ?? endedAt; + const message = toStringValue(rawEvent.name)?.trim() ?? "Log event"; + latestWarningAndErrorLogs.push({ + spanName: name, + level, + message, + seenAt, + traceId, + spanId, + }); + } + } + } + } + + const topSpansByCount: ServerTraceDiagnosticsSpanSummary[] = [...spansByName.entries()] + .map(([name, span]) => ({ + name, + count: span.count, + failureCount: span.failureCount, + totalDurationMs: span.totalDurationMs, + averageDurationMs: span.count > 0 ? span.totalDurationMs / span.count : 0, + maxDurationMs: span.maxDurationMs, + })) + .toSorted((left, right) => right.count - left.count || right.maxDurationMs - left.maxDurationMs) + .slice(0, TOP_LIMIT); + + return { + traceFilePath: input.traceFilePath, + scannedFilePaths, + readAt, + recordCount, + parseErrorCount, + firstSpanAt: Option.fromNullishOr(firstSpanAt), + lastSpanAt: Option.fromNullishOr(lastSpanAt), + failureCount, + interruptionCount, + slowSpanThresholdMs, + slowSpanCount, + logLevelCounts, + topSpansByCount, + slowestSpans, + commonFailures: [...failuresByKey.values()] + .toSorted( + (left, right) => + right.count - left.count || + DateTime.toEpochMillis(right.lastSeenAt) - DateTime.toEpochMillis(left.lastSeenAt), + ) + .slice(0, TOP_LIMIT), + latestFailures: latestFailures + .toSorted( + (left, right) => + DateTime.toEpochMillis(right.endedAt) - DateTime.toEpochMillis(left.endedAt), + ) + .slice(0, RECENT_LIMIT), + latestWarningAndErrorLogs: latestWarningAndErrorLogs + .toSorted( + (left, right) => DateTime.toEpochMillis(right.seenAt) - DateTime.toEpochMillis(left.seenAt), + ) + .slice(0, RECENT_LIMIT), + partialFailure: input.partialFailure ? Option.some(true) : Option.none(), + error: Option.fromNullishOr(input.error), + }; +} + +type TraceFileReadResult = + | { readonly _tag: "Loaded"; readonly path: string; readonly text: string } + | { readonly _tag: "Missing"; readonly path: string } + | { readonly _tag: "Failed"; readonly path: string; readonly message: string }; + +function readTraceFile( + fileSystem: FileSystem.FileSystem, + path: string, +): Effect.Effect { + return fileSystem.readFileString(path).pipe( + Effect.map((text) => ({ _tag: "Loaded" as const, path, text })), + Effect.catch((error: PlatformError.PlatformError) => + Effect.succeed( + isNotFoundError(error) + ? { _tag: "Missing" as const, path } + : { _tag: "Failed" as const, path, message: platformErrorMessage(error) }, + ), + ), + ); +} + +export const make = Effect.fn("makeTraceDiagnostics")(function* () { + const fileSystem = yield* FileSystem.FileSystem; + + const read: TraceDiagnosticsShape["read"] = Effect.fn("TraceDiagnostics.read")( + function* (options) { + const readAt = options.readAt ?? (yield* DateTime.now); + const slowSpanThresholdMs = options.slowSpanThresholdMs ?? DEFAULT_SLOW_SPAN_THRESHOLD_MS; + const paths = toRotatedTracePaths(options.traceFilePath, options.maxFiles); + const results = yield* Effect.all( + paths.map((path) => readTraceFile(fileSystem, path)), + { + concurrency: 1, + }, + ); + const files = results.flatMap((result) => + result._tag === "Loaded" ? [{ path: result.path, text: result.text }] : [], + ); + const readFailure = results.find((result) => result._tag === "Failed"); + const readFailureError = readFailure + ? ({ + kind: "trace-file-read-failed", + message: readFailure.message.trim() || `Failed to read ${readFailure.path}.`, + } satisfies TraceDiagnosticsErrorSummary) + : undefined; + + if (files.length === 0) { + return makeEmptyDiagnostics({ + traceFilePath: options.traceFilePath, + scannedFilePaths: paths, + readAt, + slowSpanThresholdMs, + error: + readFailureError ?? + ({ + kind: "trace-file-not-found", + message: "No local trace files were found.", + } satisfies TraceDiagnosticsErrorSummary), + }); + } + + return aggregateTraceDiagnostics({ + traceFilePath: options.traceFilePath, + files, + scannedFilePaths: paths, + readAt, + slowSpanThresholdMs, + ...(readFailureError ? { partialFailure: true, error: readFailureError } : {}), + }); + }, + ); + + return TraceDiagnostics.of({ read }); +}); + +export const layer = Layer.effect(TraceDiagnostics, make()); + +export function readTraceDiagnostics( + options: TraceDiagnosticsOptions, +): Effect.Effect { + return Effect.gen(function* () { + const diagnostics = yield* TraceDiagnostics; + return yield* diagnostics.read(options); + }); +} diff --git a/apps/server/src/observability/Metrics.test.ts b/apps/server/src/observability/Metrics.test.ts index b5eeedaaa43..57bfebaeceb 100644 --- a/apps/server/src/observability/Metrics.test.ts +++ b/apps/server/src/observability/Metrics.test.ts @@ -1,6 +1,7 @@ import { assert, describe, it } from "@effect/vitest"; import { ProviderDriverKind } from "@t3tools/contracts"; -import { Effect, Metric } from "effect"; +import { Duration, Effect, Fiber, Metric } from "effect"; +import { TestClock } from "effect/testing"; import { withMetrics } from "./Metrics.ts"; @@ -15,6 +16,18 @@ const hasMetricSnapshot = ( Object.entries(attributes).every(([key, value]) => snapshot.attributes?.[key] === value), ); +const findHistogramSnapshot = ( + snapshots: ReadonlyArray, + id: string, + attributes: Readonly>, +) => + snapshots.find( + (snapshot): snapshot is Extract => + snapshot.type === "Histogram" && + snapshot.id === id && + Object.entries(attributes).every(([key, value]) => snapshot.attributes?.[key] === value), + ); + describe("withMetrics", () => { it.effect("supports pipe-style usage", () => Effect.gen(function* () { @@ -110,4 +123,35 @@ describe("withMetrics", () => { ); }), ); + + it.effect("records timer durations from nanosecond clock readings", () => + Effect.gen(function* () { + const duration = Duration.nanos(1_500_000n); + const timer = Metric.timer("with_metrics_nanos_duration"); + + yield* Effect.gen(function* () { + const fiber = yield* Effect.sleep(duration).pipe( + withMetrics({ + timer, + attributes: { + operation: "nanos", + }, + }), + Effect.forkChild, + ); + + yield* Effect.yieldNow; + yield* TestClock.adjust(duration); + yield* Fiber.join(fiber); + }).pipe(Effect.provide(TestClock.layer())); + + const snapshots = yield* Metric.snapshot; + const snapshot = findHistogramSnapshot(snapshots, "with_metrics_nanos_duration", { + operation: "nanos", + }); + + assert.equal(snapshot?.state.count, 1); + assert.equal(snapshot?.state.sum, 1.5); + }), + ); }); diff --git a/apps/server/src/observability/Metrics.ts b/apps/server/src/observability/Metrics.ts index 3e527c7cb45..976bf7ccdb7 100644 --- a/apps/server/src/observability/Metrics.ts +++ b/apps/server/src/observability/Metrics.ts @@ -1,4 +1,4 @@ -import { Duration, Effect, Exit, Metric } from "effect"; +import { Clock, Duration, Effect, Exit, Metric } from "effect"; import { dual } from "effect/Function"; import { @@ -96,9 +96,11 @@ const withMetricsImpl = ( options: WithMetricsOptions, ): Effect.Effect => Effect.gen(function* () { - const startedAt = Date.now(); + const startedAt = yield* Clock.currentTimeNanos; const exit = yield* Effect.exit(effect); - const duration = Duration.millis(Math.max(0, Date.now() - startedAt)); + const endedAt = yield* Clock.currentTimeNanos; + const elapsedNanos = endedAt > startedAt ? endedAt - startedAt : 0n; + const duration = Duration.nanos(elapsedNanos); const baseAttributes = typeof options.attributes === "function" ? options.attributes() : (options.attributes ?? {}); diff --git a/apps/server/src/observability/RpcInstrumentation.test.ts b/apps/server/src/observability/RpcInstrumentation.test.ts index d29b05f3c2b..b0aa7c874f4 100644 --- a/apps/server/src/observability/RpcInstrumentation.test.ts +++ b/apps/server/src/observability/RpcInstrumentation.test.ts @@ -1,5 +1,7 @@ import { assert, describe, it } from "@effect/vitest"; -import { Effect, Exit, Metric, Stream } from "effect"; +import { WS_METHODS } from "@t3tools/contracts"; +import { Duration, Effect, Exit, Fiber, Metric, Stream, Tracer } from "effect"; +import { TestClock } from "effect/testing"; import { observeRpcEffect, @@ -18,6 +20,44 @@ const hasMetricSnapshot = ( Object.entries(attributes).every(([key, value]) => snapshot.attributes?.[key] === value), ); +const findHistogramSnapshot = ( + snapshots: ReadonlyArray, + id: string, + attributes: Readonly>, +) => + snapshots.find( + (snapshot): snapshot is Extract => + snapshot.type === "Histogram" && + snapshot.id === id && + Object.entries(attributes).every(([key, value]) => snapshot.attributes?.[key] === value), + ); + +const collectSpanNames = ( + effect: Effect.Effect, +): Effect.Effect, E, R> => + Effect.gen(function* () { + const spanNames: Array = []; + const tracer = Tracer.make({ + span: (options) => { + const span = new Tracer.NativeSpan(options); + const end = span.end.bind(span); + + span.end = (endTime, exit) => { + end(endTime, exit); + if (span.sampled) { + spanNames.push(span.name); + } + }; + + return span; + }, + }); + + yield* effect.pipe(Effect.withTracer(tracer)); + + return spanNames; + }); + describe("RpcInstrumentation", () => { it.effect("records success metrics for unary RPC handlers", () => Effect.gen(function* () { @@ -129,6 +169,37 @@ describe("RpcInstrumentation", () => { }), ); + it.effect("records direct stream durations from nanosecond clock readings", () => + Effect.gen(function* () { + const duration = Duration.nanos(1_500_000n); + const events = yield* Effect.gen(function* () { + const fiber = yield* Stream.runCollect( + observeRpcStream( + WS_METHODS.serverGetProcessDiagnostics, + Stream.fromEffect(Effect.sleep(duration).pipe(Effect.as("ok"))), + { + "rpc.aggregate": "test", + }, + ), + ).pipe(Effect.forkChild); + + yield* Effect.yieldNow; + yield* TestClock.adjust(duration); + return yield* Fiber.join(fiber); + }).pipe(Effect.provide(TestClock.layer())); + + assert.deepStrictEqual(Array.from(events), ["ok"]); + + const snapshots = yield* Metric.snapshot; + const snapshot = findHistogramSnapshot(snapshots, "t3_rpc_request_duration", { + method: WS_METHODS.serverGetProcessDiagnostics, + }); + + assert.equal(snapshot?.state.count, 1); + assert.equal(snapshot?.state.sum, 1.5); + }), + ); + it.effect("records failure outcomes when a stream RPC effect produces a failing stream", () => Effect.gen(function* () { const exit = yield* Stream.runCollect( @@ -158,4 +229,79 @@ describe("RpcInstrumentation", () => { ); }), ); + + it.effect("records spans for traced stream RPC handlers", () => + Effect.gen(function* () { + const spanNames = yield* collectSpanNames( + Stream.runCollect( + observeRpcStream( + "rpc.instrumentation.traced.stream", + Stream.fromEffect( + Effect.succeed("ok").pipe(Effect.withSpan("rpc.instrumentation.traced.stream.child")), + ), + { "rpc.aggregate": "test" }, + ), + ), + ); + + assert.equal(spanNames.includes("ws.rpc.rpc.instrumentation.traced.stream"), true); + assert.equal(spanNames.includes("rpc.instrumentation.traced.stream.child"), true); + }), + ); + + it.effect("does not create spans for disabled unary RPC handlers", () => + Effect.gen(function* () { + const spanNames = yield* collectSpanNames( + observeRpcEffect( + WS_METHODS.serverGetTraceDiagnostics, + Effect.succeed("ok").pipe(Effect.withSpan("rpc.instrumentation.disabled.unary.child")), + { "rpc.aggregate": "test" }, + ), + ); + + assert.deepStrictEqual(spanNames, []); + }), + ); + + it.effect("does not create spans for disabled direct stream RPC handlers", () => + Effect.gen(function* () { + const spanNames = yield* collectSpanNames( + Stream.runCollect( + observeRpcStream( + WS_METHODS.serverGetTraceDiagnostics, + Stream.fromEffect( + Effect.succeed("ok").pipe( + Effect.withSpan("rpc.instrumentation.disabled.stream.child"), + ), + ), + { "rpc.aggregate": "test" }, + ), + ), + ); + + assert.deepStrictEqual(spanNames, []); + }), + ); + + it.effect("does not create spans for disabled stream effect RPC handlers", () => + Effect.gen(function* () { + const spanNames = yield* collectSpanNames( + Stream.runCollect( + observeRpcStreamEffect( + WS_METHODS.serverGetTraceDiagnostics, + Effect.succeed( + Stream.fromEffect( + Effect.succeed("ok").pipe( + Effect.withSpan("rpc.instrumentation.disabled.stream.effect.consume"), + ), + ), + ).pipe(Effect.withSpan("rpc.instrumentation.disabled.stream.effect.create")), + { "rpc.aggregate": "test" }, + ), + ), + ); + + assert.deepStrictEqual(spanNames, []); + }), + ); }); diff --git a/apps/server/src/observability/RpcInstrumentation.ts b/apps/server/src/observability/RpcInstrumentation.ts index a3ac29aa02d..c03e1c2b8a8 100644 --- a/apps/server/src/observability/RpcInstrumentation.ts +++ b/apps/server/src/observability/RpcInstrumentation.ts @@ -1,26 +1,71 @@ -import { Duration, Effect, Exit, Metric, Stream } from "effect"; +import { WS_METHODS } from "@t3tools/contracts"; +import { Clock, Duration, Effect, Exit, Metric, References, Stream } from "effect"; import { outcomeFromExit } from "./Attributes.ts"; import { metricAttributes, rpcRequestDuration, rpcRequestsTotal, withMetrics } from "./Metrics.ts"; -const annotateRpcSpan = ( +const RPC_SPAN_PREFIX = "ws.rpc"; +const DEFAULT_RPC_SPAN_ATTRIBUTES = { + "rpc.transport": "websocket", + "rpc.system": "effect-rpc", +} as const; +const RPC_METHODS_WITH_TRACING_DISABLED: ReadonlySet = new Set([ + WS_METHODS.serverGetTraceDiagnostics, + WS_METHODS.serverGetProcessDiagnostics, + WS_METHODS.serverSignalProcess, +]); + +function shouldTraceRpc(method: string): boolean { + return !RPC_METHODS_WITH_TRACING_DISABLED.has(method); +} + +const rpcSpanAttributes = ( method: string, traceAttributes?: Readonly>, -): Effect.Effect => - Effect.annotateCurrentSpan({ - "rpc.method": method, - ...traceAttributes, - }); +): Record => ({ + ...DEFAULT_RPC_SPAN_ATTRIBUTES, + "rpc.method": method, + ...traceAttributes, +}); + +const withRpcEffectTracing = ( + method: string, + effect: Effect.Effect, + traceAttributes?: Readonly>, +): Effect.Effect => + shouldTraceRpc(method) + ? effect.pipe( + Effect.withSpan(`${RPC_SPAN_PREFIX}.${method}`, { + attributes: rpcSpanAttributes(method, traceAttributes), + }), + ) + : effect.pipe(Effect.provideService(References.TracerEnabled, false)); + +const withRpcStreamTracing = ( + method: string, + stream: Stream.Stream, + traceAttributes?: Readonly>, +): Stream.Stream => + shouldTraceRpc(method) + ? stream.pipe( + Stream.withSpan(`${RPC_SPAN_PREFIX}.${method}`, { + attributes: rpcSpanAttributes(method, traceAttributes), + }), + ) + : stream.pipe(Stream.provideService(References.TracerEnabled, false)); const recordRpcStreamMetrics = ( method: string, - startedAt: number, + startedAt: bigint, exit: Exit.Exit, ): Effect.Effect => Effect.gen(function* () { + const endedAt = yield* Clock.currentTimeNanos; + const elapsedNanos = endedAt > startedAt ? endedAt - startedAt : 0n; + yield* Metric.update( Metric.withAttributes(rpcRequestDuration, metricAttributes({ method })), - Duration.millis(Math.max(0, Date.now() - startedAt)), + Duration.nanos(elapsedNanos), ); yield* Metric.update( Metric.withAttributes( @@ -38,43 +83,43 @@ export const observeRpcEffect = ( method: string, effect: Effect.Effect, traceAttributes?: Readonly>, -): Effect.Effect => - Effect.gen(function* () { - yield* annotateRpcSpan(method, traceAttributes); +): Effect.Effect => { + const instrumented = effect.pipe( + withMetrics({ + counter: rpcRequestsTotal, + timer: rpcRequestDuration, + attributes: { + method, + }, + }), + ); - return yield* effect.pipe( - withMetrics({ - counter: rpcRequestsTotal, - timer: rpcRequestDuration, - attributes: { - method, - }, - }), - ); - }); + return withRpcEffectTracing(method, instrumented, traceAttributes); +}; export const observeRpcStream = ( method: string, stream: Stream.Stream, traceAttributes?: Readonly>, -): Stream.Stream => - Stream.unwrap( +): Stream.Stream => { + const instrumented = Stream.unwrap( Effect.gen(function* () { - yield* annotateRpcSpan(method, traceAttributes); - const startedAt = Date.now(); + const startedAt = yield* Clock.currentTimeNanos; return stream.pipe(Stream.onExit((exit) => recordRpcStreamMetrics(method, startedAt, exit))); }), ); + return withRpcStreamTracing(method, instrumented, traceAttributes); +}; + export const observeRpcStreamEffect = ( method: string, effect: Effect.Effect, EffectError, EffectContext>, traceAttributes?: Readonly>, -): Stream.Stream => - Stream.unwrap( +): Stream.Stream => { + const instrumented = Stream.unwrap( Effect.gen(function* () { - yield* annotateRpcSpan(method, traceAttributes); - const startedAt = Date.now(); + const startedAt = yield* Clock.currentTimeNanos; const exit = yield* Effect.exit(effect); if (Exit.isFailure(exit)) { @@ -87,3 +132,6 @@ export const observeRpcStreamEffect = + Effect.succeed({ + pid: input.pid, + signal: input.signal, + signaled: true, + message: Option.none(), + }), + }), + ), + Layer.provide( + Layer.mock(TraceDiagnostics.TraceDiagnostics)({ + read: () => + Effect.succeed({ + traceFilePath: "", + scannedFilePaths: [], + readAt: TEST_EPOCH, + recordCount: 0, + parseErrorCount: 0, + firstSpanAt: Option.none(), + lastSpanAt: Option.none(), + failureCount: 0, + interruptionCount: 0, + slowSpanThresholdMs: 1_000, + slowSpanCount: 0, + logLevelCounts: {}, + topSpansByCount: [], + slowestSpans: [], + commonFailures: [], + latestFailures: [], + latestWarningAndErrorLogs: [], + partialFailure: Option.none(), + error: Option.none(), + }), + }), + ), Layer.provide(gitManagerLayer), Layer.provide(gitVcsDriverLayer), Layer.provide(gitWorkflowLayer), diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 939b2c8abf8..980aa82268b 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -74,6 +74,8 @@ import { } from "./auth/http.ts"; import { ServerSecretStoreLive } from "./auth/Layers/ServerSecretStore.ts"; import { ServerAuthLive } from "./auth/Layers/ServerAuth.ts"; +import * as ProcessDiagnostics from "./diagnostics/ProcessDiagnostics.ts"; +import * as TraceDiagnostics from "./diagnostics/TraceDiagnostics.ts"; import { OrchestrationLayerLive } from "./orchestration/runtimeLayer.ts"; import { clearPersistedServerRuntimeState, @@ -276,6 +278,8 @@ const RuntimeCoreDependenciesLive = ReactorLayerLive.pipe( const RuntimeDependenciesLive = RuntimeCoreDependenciesLive.pipe( // Misc. + Layer.provideMerge(ProcessDiagnostics.layer), + Layer.provideMerge(TraceDiagnostics.layer), Layer.provideMerge(AnalyticsServiceLayerLive), Layer.provideMerge(OpenLive), Layer.provideMerge(ServerLifecycleEventsLive), diff --git a/apps/server/src/vcs/GitVcsDriverCore.ts b/apps/server/src/vcs/GitVcsDriverCore.ts index e01a78a21f1..6569cec2d27 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.ts @@ -528,7 +528,7 @@ const createTrace2Monitor = Effect.fn("createTrace2Monitor")(function* ( }; }); -const collectOutput = Effect.fn("collectOutput")(function* ( +const collectOutput = Effect.fnUntraced(function* ( input: Pick, stream: Stream.Stream, maxOutputBytes: number, @@ -541,7 +541,7 @@ const collectOutput = Effect.fn("collectOutput")(function* ( let lineBuffer = ""; let truncated = false; - const emitCompleteLines = Effect.fn("emitCompleteLines")(function* (flush: boolean) { + const emitCompleteLines = Effect.fnUntraced(function* (flush: boolean) { let newlineIndex = lineBuffer.indexOf("\n"); while (newlineIndex >= 0) { const line = lineBuffer.slice(0, newlineIndex).replace(/\r$/, ""); @@ -561,7 +561,7 @@ const collectOutput = Effect.fn("collectOutput")(function* ( } }); - const processChunk = Effect.fn("processChunk")(function* (chunk: Uint8Array) { + const processChunk = Effect.fnUntraced(function* (chunk: Uint8Array) { if (truncateOutputAtMaxBytes && truncated) { return; } @@ -602,20 +602,14 @@ const collectOutput = Effect.fn("collectOutput")(function* ( }; }); -export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* (options?: { - executeOverride?: GitVcsDriver.GitVcsDriverShape["execute"]; -}) { +export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* () { const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; + const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; const { worktreesDir } = yield* ServerConfig; - let executeRaw: GitVcsDriver.GitVcsDriverShape["execute"]; - - if (options?.executeOverride) { - executeRaw = options.executeOverride; - } else { - const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; - executeRaw = Effect.fnUntraced(function* (input) { + const executeRaw: GitVcsDriver.GitVcsDriverShape["execute"] = Effect.fnUntraced( + function* (input) { const commandInput = { ...input, args: [...input.args], @@ -712,8 +706,8 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* }), ), ); - }); - } + }, + ); const execute: GitVcsDriver.GitVcsDriverShape["execute"] = (input) => executeRaw(input).pipe( diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index f32953abfbe..476140dd3ae 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -55,6 +55,8 @@ import { ProjectSetupScriptRunner } from "./project/Services/ProjectSetupScriptR import { RepositoryIdentityResolver } from "./project/Services/RepositoryIdentityResolver.ts"; import { ServerEnvironment } from "./environment/Services/ServerEnvironment.ts"; import { ServerAuth } from "./auth/Services/ServerAuth.ts"; +import * as ProcessDiagnostics from "./diagnostics/ProcessDiagnostics.ts"; +import * as TraceDiagnostics from "./diagnostics/TraceDiagnostics.ts"; import * as SourceControlDiscoveryLayer from "./sourceControl/SourceControlDiscovery.ts"; import { SourceControlRepositoryService } from "./sourceControl/SourceControlRepositoryService.ts"; import * as AzureDevOpsCli from "./sourceControl/AzureDevOpsCli.ts"; @@ -168,6 +170,7 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => const sourceControlRepositories = yield* SourceControlRepositoryService; const bootstrapCredentials = yield* BootstrapCredentialService; const sessions = yield* SessionCredentialService; + const processDiagnostics = yield* ProcessDiagnostics.ProcessDiagnostics; const serverCommandId = (tag: string) => CommandId.make(`server:${tag}:${crypto.randomUUID()}`); @@ -837,6 +840,25 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => "rpc.aggregate": "server", }, ), + [WS_METHODS.serverGetTraceDiagnostics]: (_input) => + observeRpcEffect( + WS_METHODS.serverGetTraceDiagnostics, + TraceDiagnostics.readTraceDiagnostics({ + traceFilePath: config.serverTracePath, + maxFiles: config.traceMaxFiles, + }), + { + "rpc.aggregate": "server", + }, + ), + [WS_METHODS.serverGetProcessDiagnostics]: (_input) => + observeRpcEffect(WS_METHODS.serverGetProcessDiagnostics, processDiagnostics.read, { + "rpc.aggregate": "server", + }), + [WS_METHODS.serverSignalProcess]: (input) => + observeRpcEffect(WS_METHODS.serverSignalProcess, processDiagnostics.signal(input), { + "rpc.aggregate": "server", + }), [WS_METHODS.sourceControlLookupRepository]: (input) => observeRpcEffect( WS_METHODS.sourceControlLookupRepository, @@ -1162,11 +1184,7 @@ export const websocketRpcRouteLayer = Layer.unwrap( const sessions = yield* SessionCredentialService; const session = yield* serverAuth.authenticateWebSocketUpgrade(request); const rpcWebSocketHttpEffect = yield* RpcServer.toHttpEffectWebsocket(WsRpcGroup, { - spanPrefix: "ws.rpc", - spanAttributes: { - "rpc.transport": "websocket", - "rpc.system": "effect-rpc", - }, + disableTracing: true, }).pipe( Effect.provide( makeWsRpcLayer(session.sessionId).pipe( diff --git a/apps/web/src/components/settings/DiagnosticsSettings.tsx b/apps/web/src/components/settings/DiagnosticsSettings.tsx new file mode 100644 index 00000000000..bd590a1242c --- /dev/null +++ b/apps/web/src/components/settings/DiagnosticsSettings.tsx @@ -0,0 +1,982 @@ +import { + AlertTriangleIcon, + ChevronDownIcon, + ChevronRightIcon, + CopyIcon, + FolderOpenIcon, + InfoIcon, + RefreshCwIcon, +} from "lucide-react"; +import { useCallback, useMemo, useState, type ReactNode } from "react"; +import type { ServerProcessDiagnosticsEntry, ServerProcessSignal } from "@t3tools/contracts"; +import { DateTime, Option } from "effect"; + +import { ensureLocalApi } from "../../localApi"; +import { cn } from "../../lib/utils"; +import { resolveAndPersistPreferredEditor } from "../../editorPreferences"; +import { formatRelativeTime } from "../../timestampFormat"; +import { useServerAvailableEditors, useServerObservability } from "../../rpc/serverState"; +import { useProcessDiagnostics } from "../../lib/processDiagnosticsState"; +import { useTraceDiagnostics } from "../../lib/traceDiagnosticsState"; +import { Button } from "../ui/button"; +import { ScrollArea } from "../ui/scroll-area"; +import { Tooltip, TooltipPopup, TooltipTrigger } from "../ui/tooltip"; +import { toastManager } from "../ui/toast"; +import { SettingsPageContainer, SettingsSection, useRelativeTimeTick } from "./settingsLayout"; + +const NUMBER_FORMAT = new Intl.NumberFormat(); + +function formatCount(value: number): string { + return NUMBER_FORMAT.format(value); +} + +function formatDuration(value: number): string { + if (value < 1_000) return `${Math.round(value)} ms`; + return `${(value / 1_000).toFixed(value >= 10_000 ? 1 : 2)} s`; +} + +function formatBytes(value: number): string { + if (value < 1024) return `${value} B`; + const units = ["KB", "MB", "GB"] as const; + let unitIndex = -1; + let next = value; + do { + next /= 1024; + unitIndex += 1; + } while (next >= 1024 && unitIndex < units.length - 1); + return `${next.toFixed(next >= 10 ? 1 : 2)} ${units[unitIndex]}`; +} + +function formatRelative(value: DateTime.Utc | null): string { + if (!value) return "No trace records"; + const relative = formatRelativeTime(DateTime.formatIso(value)); + return relative.suffix ? `${relative.value} ${relative.suffix}` : relative.value; +} + +function formatRelativeNoWrap(value: DateTime.Utc | null): string { + return formatRelative(value).replaceAll(" ", "\u00a0"); +} + +function shortenTraceId(traceId: string): string { + if (traceId.length <= 32) return traceId; + return `${traceId.slice(0, 18)}...${traceId.slice(-10)}`; +} + +function isStaleProcessSignalMessage(message: string | undefined): boolean { + return message?.includes("not a live descendant") ?? false; +} + +function StatBlock({ + label, + value, + tooltip, + tone = "default", +}: { + label: string; + value: string; + tooltip?: ReactNode; + tone?: "default" | "warning" | "danger"; +}) { + return ( +
+
+ {label} + {tooltip ? ( + + + + + } + /> + + {tooltip} + + + ) : null} +
+
+ {value} +
+
+ ); +} + +function StatsGrid({ children }: { children: ReactNode }) { + return ( +
+ + + + + {children} +
+ ); +} + +function EmptyRows({ label }: { label: string }) { + return
{label}
; +} + +function ExpandableText({ + text, + className, + collapsedClassName = "line-clamp-3", + expandLabel = "Show full error", +}: { + text: string; + className?: string; + collapsedClassName?: string; + expandLabel?: string; +}) { + const [expanded, setExpanded] = useState(false); + const canExpand = text.length > 180 || text.includes("\n"); + + return ( +
+
+ {text} +
+ {canExpand ? ( + + ) : null} +
+ ); +} + +function DiagnosticsTable({ + headers, + children, + minTableWidth = "min-w-[640px]", + columnWidths, +}: { + headers: ReadonlyArray; + children: ReactNode; + minTableWidth?: string; + columnWidths?: ReadonlyArray; +}) { + return ( + + + {columnWidths ? ( + + {headers.map((header, index) => ( + + ))} + + ) : null} + + + {headers.map((header, index) => ( + + ))} + + + {children} +
+ {header.replaceAll(" ", "\u00a0")} +
+
+ ); +} + +function TraceIdCell({ traceId }: { traceId: string }) { + const [copied, setCopied] = useState(false); + const copyTraceId = useCallback(() => { + void navigator.clipboard + ?.writeText(traceId) + .then(() => { + setCopied(true); + window.setTimeout(() => setCopied(false), 1_200); + }) + .catch(() => undefined); + }, [traceId]); + + return ( +
+ + + {shortenTraceId(traceId)} + + } + /> + + {traceId} + + + + + + + } + /> + {copied ? "Copied" : "Copy full trace ID"} + +
+ ); +} + +function formatProcessName(command: string): string { + const firstToken = command.trim().split(/\s+/)[0]; + if (!firstToken) return command; + const normalized = firstToken.replace(/^['"]|['"]$/g, ""); + const segments = normalized.split(/[\\/]/).filter(Boolean); + return segments.at(-1) ?? normalized; +} + +function formatProcessType(process: ServerProcessDiagnosticsEntry): string { + if (process.depth > 0) return "Subprocess"; + if (/\b(codex|claude|opencode|cursor)\b/i.test(process.command)) return "Agent"; + return "Process"; +} + +function ProcessNameCell({ + process, + isExpanded, + onToggle, +}: { + process: ServerProcessDiagnosticsEntry; + isExpanded: boolean; + onToggle: (pid: number) => void; +}) { + const name = formatProcessName(process.command); + const hasChildren = process.childPids.length > 0; + const ChevronIcon = isExpanded ? ChevronDownIcon : ChevronRightIcon; + + return ( +
+ {hasChildren ? ( + + ) : ( +
+ ); +} + +function ProcessSignalActions({ + process, + isSignaling, + onSignal, +}: { + process: ServerProcessDiagnosticsEntry; + isSignaling: boolean; + onSignal: (pid: number, signal: ServerProcessSignal) => void; +}) { + return ( +
+ + onSignal(process.pid, "SIGINT")} + > + INT + + } + /> + Send SIGINT + + + onSignal(process.pid, "SIGKILL")} + > + KILL + + } + /> + Send SIGKILL + +
+ ); +} + +function ProcessDiagnosticsTable({ + processes, + signalingPid, + onSignal, + emptyLabel, +}: { + processes: ReadonlyArray; + signalingPid: number | null; + onSignal: (pid: number, signal: ServerProcessSignal) => void; + emptyLabel?: string; +}) { + const [collapsedPids, setCollapsedPids] = useState>(() => new Set()); + const visibleProcesses = useMemo(() => { + const visible: ServerProcessDiagnosticsEntry[] = []; + let hiddenChildDepth: number | null = null; + + for (const process of processes) { + if (hiddenChildDepth !== null) { + if (process.depth > hiddenChildDepth) continue; + hiddenChildDepth = null; + } + + visible.push(process); + if (collapsedPids.has(process.pid)) { + hiddenChildDepth = process.depth; + } + } + + return visible; + }, [collapsedPids, processes]); + + const toggleProcess = useCallback((pid: number) => { + setCollapsedPids((previous) => { + const next = new Set(previous); + if (next.has(pid)) { + next.delete(pid); + } else { + next.add(pid); + } + return next; + }); + }, []); + + return ( + + + + + + + + + + + + + + + + + + + + + + + + {visibleProcesses.length === 0 ? ( + + + + ) : null} + {visibleProcesses.map((process) => ( + + + + + + + + + + ))} + +
NameCPUMemoryCommandPIDTypeKill
+ {emptyLabel ?? "No live descendant processes found."} +
+ + + {process.cpuPercent.toFixed(1)}% + + {formatBytes(process.rssBytes)} + + + {process.command}} + /> + + {process.command} + + + + {process.pid} + + {formatProcessType(process)} + + +
+
+ ); +} + +function DiagnosticsLastChecked({ checkedAt }: { checkedAt: DateTime.Utc | null }) { + useRelativeTimeTick(); + const relative = checkedAt ? formatRelativeTime(DateTime.formatIso(checkedAt)) : null; + + if (!relative) { + return Checking; + } + + return ( + + {relative.suffix ? ( + <> + Checked {relative.value} {relative.suffix} + + ) : ( + <>Checked {relative.value} + )} + + ); +} + +function DiagnosticsRefreshButton({ + isPending, + label, + onClick, +}: { + isPending: boolean; + label: string; + onClick: () => void; +}) { + return ( + + + + + } + /> + {label} + + ); +} + +export function DiagnosticsSettingsPanel() { + const observability = useServerObservability(); + const availableEditors = useServerAvailableEditors(); + const { data, error, isPending, refresh } = useTraceDiagnostics(); + const { + data: processData, + error: processError, + isPending: isProcessPending, + refresh: refreshProcesses, + } = useProcessDiagnostics(); + const [isOpeningLogsDirectory, setIsOpeningLogsDirectory] = useState(false); + const [openLogsDirectoryError, setOpenLogsDirectoryError] = useState(null); + const [signalingPid, setSignalingPid] = useState(null); + + const openLogsDirectory = useCallback(() => { + const logsDirectoryPath = observability?.logsDirectoryPath ?? null; + if (!logsDirectoryPath) return; + + const editor = resolveAndPersistPreferredEditor(availableEditors ?? []); + if (!editor) { + setOpenLogsDirectoryError("No available editors found."); + return; + } + + setIsOpeningLogsDirectory(true); + setOpenLogsDirectoryError(null); + void ensureLocalApi() + .shell.openInEditor(logsDirectoryPath, editor) + .catch((error: unknown) => { + setOpenLogsDirectoryError( + error instanceof Error ? error.message : "Unable to open logs folder.", + ); + }) + .finally(() => { + setIsOpeningLogsDirectory(false); + }); + }, [availableEditors, observability?.logsDirectoryPath]); + + const isInitialLoading = isPending && data === null; + const isProcessInitialLoading = isProcessPending && processData === null; + const signalProcess = useCallback( + (pid: number, signal: ServerProcessSignal) => { + if ( + signal === "SIGKILL" && + !window.confirm(`Send SIGKILL to process ${pid}? This cannot be handled by the process.`) + ) { + return; + } + + setSignalingPid(pid); + void ensureLocalApi() + .server.signalProcess({ pid, signal }) + .then((result) => { + if (!result.signaled) { + const message = Option.getOrUndefined(result.message); + refreshProcesses(); + if (isStaleProcessSignalMessage(message)) { + toastManager.add({ + type: "info", + title: "Process already exited", + description: + "The process is not a child of the T3 Server. It might already have exited.", + }); + return; + } + + toastManager.add({ + type: "error", + title: `Could not send ${signal}`, + description: message ?? `Failed to send ${signal}.`, + }); + return; + } + refreshProcesses(); + }) + .catch((error: unknown) => { + toastManager.add({ + type: "error", + title: `Could not send ${signal}`, + description: error instanceof Error ? error.message : `Failed to send ${signal}.`, + }); + }) + .finally(() => { + setSignalingPid(null); + }); + }, + [refreshProcesses], + ); + + const processDiagnosticsError = processData ? Option.getOrNull(processData.error) : null; + const traceDiagnosticsError = data ? Option.getOrNull(data.error) : null; + const traceDiagnosticsPartialFailure = data + ? Option.getOrElse(data.partialFailure, () => false) + : false; + + return ( + + + + + + } + > + + + + + + + {processDiagnosticsError || processError ? ( +
+ {processDiagnosticsError ? ( +
+ + {processDiagnosticsError.message} +
+ ) : null} + {processError ? ( +
+ + {processError} +
+ ) : null} +
+ ) : null} + +
+ + + + + + + + } + /> + Open logs folder + + + + } + > + + + 0 ? "danger" : "default"} + /> + 0 ? "warning" : "default"} + /> + 0 ? "warning" : "default"} + /> + + {openLogsDirectoryError || traceDiagnosticsError || error ? ( +
+ {openLogsDirectoryError ? ( +
+ + {openLogsDirectoryError} +
+ ) : null} + {traceDiagnosticsError ? ( +
+ + + {traceDiagnosticsPartialFailure + ? `Some trace files could not be read, so diagnostics may be incomplete. ${traceDiagnosticsError.message}` + : traceDiagnosticsError.message} + +
+ ) : null} + {error ? ( +
+ + {error} +
+ ) : null} +
+ ) : null} +
+ + + {data && data.latestFailures.length > 0 ? ( + + {data.latestFailures.map((failure) => ( + + + {failure.name} + + + + + + {formatDuration(failure.durationMs)} + + + {formatRelativeNoWrap(failure.endedAt)} + + + ))} + + ) : ( + + )} + + + + {data && data.commonFailures.length > 0 ? ( + + {data.commonFailures.map((failure) => ( + + + {failure.name} + + + {formatCount(failure.count)} + + + + + + {formatRelativeNoWrap(failure.lastSeenAt)} + + + ))} + + ) : ( + + )} + + + + {data && data.slowestSpans.length > 0 ? ( + + {data.slowestSpans.map((span) => ( + + + {span.name} + + + {formatDuration(span.durationMs)} + + + {formatRelativeNoWrap(span.endedAt)} + + + + + + ))} + + ) : ( + + )} + + + + {data && data.latestWarningAndErrorLogs.length > 0 ? ( + + + + + + + + + + + + + + + + + + + + {data.latestWarningAndErrorLogs.map((event) => ( + + + + + + + + ))} + +
TimeLevelSpanMessageTrace
+ {formatRelativeNoWrap(event.seenAt)} + + + {event.level} + + +
{event.spanName}
+
+ + + +
+
+ ) : ( + + )} +
+ + + {data && data.topSpansByCount.length > 0 ? ( + + {data.topSpansByCount.map((span) => ( + + + {span.name} + + + {formatCount(span.count)} + + + {formatCount(span.failureCount)} + + + {formatDuration(span.averageDurationMs)} + + + {formatDuration(span.maxDurationMs)} + + + ))} + + ) : ( + + )} + +
+ ); +} diff --git a/apps/web/src/components/settings/SettingsPanels.browser.tsx b/apps/web/src/components/settings/SettingsPanels.browser.tsx index 9249ff69778..9e3d52e89c9 100644 --- a/apps/web/src/components/settings/SettingsPanels.browser.tsx +++ b/apps/web/src/components/settings/SettingsPanels.browser.tsx @@ -20,15 +20,40 @@ import { DateTime, Option } from "effect"; import { page } from "vitest/browser"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { render } from "vitest-browser-react"; +import type { ReactNode } from "react"; +import { + RouterProvider, + createMemoryHistory, + createRootRoute, + createRoute, + createRouter, +} from "@tanstack/react-router"; import { __resetLocalApiForTests } from "../../localApi"; import { AppAtomRegistryProvider, resetAppAtomRegistryForTests } from "../../rpc/atomRegistry"; import { resetServerStateForTests, setServerConfigSnapshot } from "../../rpc/serverState"; import { useUiStateStore } from "../../uiStateStore"; import { ConnectionsSettings } from "./ConnectionsSettings"; +import { DiagnosticsSettingsPanel } from "./DiagnosticsSettings"; import { GeneralSettingsPanel, ProviderSettingsPanel } from "./SettingsPanels"; import { SourceControlSettingsPanel } from "./SourceControlSettings"; +function renderWithTestRouter(children: ReactNode) { + const rootRoute = createRootRoute({ + component: () => children, + }); + const indexRoute = createRoute({ + getParentRoute: () => rootRoute, + path: "/", + }); + const router = createRouter({ + routeTree: rootRoute.addChildren([indexRoute]), + history: createMemoryHistory({ initialEntries: ["/"] }), + }); + + return render(); +} + const authAccessHarness = vi.hoisted(() => { type Snapshot = AuthAccessSnapshot; let snapshot: Snapshot = { @@ -694,25 +719,24 @@ describe("GeneralSettingsPanel observability", () => { await expect.element(page.getByText("http://127.0.0.1:3773/").first()).toBeInTheDocument(); }); - it("shows diagnostics inside About with a single logs-folder action", async () => { + it("shows diagnostics inside About with a diagnostics link", async () => { setServerConfigSnapshot(createBaseServerConfig()); - mounted = await render( + mounted = await renderWithTestRouter( , ); await expect.element(page.getByText("About")).toBeInTheDocument(); - await expect.element(page.getByText("Diagnostics")).toBeInTheDocument(); - await expect.element(page.getByText("Open logs folder")).toBeInTheDocument(); await expect - .element(page.getByText("/repo/project/.t3/logs", { exact: true })) + .element(page.getByRole("heading", { name: "Diagnostics", exact: true })) .toBeInTheDocument(); + await expect.element(page.getByRole("link", { name: "View diagnostics" })).toBeInTheDocument(); await expect .element( page.getByText( - "Local trace file. OTLP exporting traces to http://localhost:4318/v1/traces.", + "Local trace file. Exporting OTEL traces to http://localhost:4318/v1/traces.", ), ) .toBeInTheDocument(); @@ -1020,20 +1044,56 @@ describe("GeneralSettingsPanel observability", () => { it("opens the logs folder in the preferred editor", async () => { const openInEditor = vi.fn().mockResolvedValue(undefined); window.nativeApi = { + persistence: { + getClientSettings: vi.fn().mockResolvedValue(null), + setClientSettings: vi.fn().mockResolvedValue(undefined), + }, shell: { openInEditor, }, + server: { + getProcessDiagnostics: vi.fn().mockResolvedValue({ + serverPid: 1234, + readAt: makeUtc("2036-04-07T00:00:00.000Z"), + processCount: 0, + totalRssBytes: 0, + totalCpuPercent: 0, + processes: [], + error: Option.none(), + }), + getTraceDiagnostics: vi.fn().mockResolvedValue({ + traceFilePath: "/repo/project/.t3/traces.jsonl", + scannedFilePaths: ["/repo/project/.t3/traces.jsonl"], + readAt: makeUtc("2036-04-07T00:00:00.000Z"), + recordCount: 0, + parseErrorCount: 0, + firstSpanAt: Option.none(), + lastSpanAt: Option.none(), + failureCount: 0, + interruptionCount: 0, + slowSpanThresholdMs: 5_000, + slowSpanCount: 0, + logLevelCounts: {}, + topSpansByCount: [], + slowestSpans: [], + commonFailures: [], + latestFailures: [], + latestWarningAndErrorLogs: [], + partialFailure: Option.none(), + error: Option.none(), + }), + }, } as unknown as LocalApi; setServerConfigSnapshot(createBaseServerConfig()); mounted = await render( - + , ); - const openLogsButton = page.getByText("Open logs folder"); + const openLogsButton = page.getByLabelText("Open logs folder"); await openLogsButton.click(); expect(openInEditor).toHaveBeenCalledWith("/repo/project/.t3/logs", "cursor"); diff --git a/apps/web/src/components/settings/SettingsPanels.logic.test.ts b/apps/web/src/components/settings/SettingsPanels.logic.test.ts index 77206d2484f..7a7c68a6b9c 100644 --- a/apps/web/src/components/settings/SettingsPanels.logic.test.ts +++ b/apps/web/src/components/settings/SettingsPanels.logic.test.ts @@ -5,7 +5,48 @@ import { type ProviderInstanceConfig, } from "@t3tools/contracts"; import { describe, expect, it } from "vitest"; -import { buildProviderInstanceUpdatePatch } from "./SettingsPanels.logic"; +import { + buildProviderInstanceUpdatePatch, + formatDiagnosticsDescription, +} from "./SettingsPanels.logic"; + +describe("formatDiagnosticsDescription", () => { + it("collapses trace and metric URLs that share the same OTEL base path", () => { + expect( + formatDiagnosticsDescription({ + localTracingEnabled: true, + otlpTracesEnabled: true, + otlpTracesUrl: "http://localhost:4318/v1/traces", + otlpMetricsEnabled: true, + otlpMetricsUrl: "http://localhost:4318/v1/metrics", + }), + ).toBe("Local trace file. Exporting OTEL to http://localhost:4318/v1/{traces,metrics}."); + }); + + it("keeps separate trace and metric URLs when their base paths differ", () => { + expect( + formatDiagnosticsDescription({ + localTracingEnabled: true, + otlpTracesEnabled: true, + otlpTracesUrl: "http://localhost:4318/v1/traces", + otlpMetricsEnabled: true, + otlpMetricsUrl: "http://localhost:9000/v1/metrics", + }), + ).toBe( + "Local trace file. Exporting OTEL traces to http://localhost:4318/v1/traces and metrics to http://localhost:9000/v1/metrics.", + ); + }); + + it("omits OTEL text when no exporter is enabled", () => { + expect( + formatDiagnosticsDescription({ + localTracingEnabled: true, + otlpTracesEnabled: false, + otlpMetricsEnabled: false, + }), + ).toBe("Local trace file."); + }); +}); describe("buildProviderInstanceUpdatePatch", () => { it("promotes an edited default provider into providerInstances and resets the legacy provider", () => { diff --git a/apps/web/src/components/settings/SettingsPanels.logic.ts b/apps/web/src/components/settings/SettingsPanels.logic.ts index 57a63b7f2ae..99d7052965a 100644 --- a/apps/web/src/components/settings/SettingsPanels.logic.ts +++ b/apps/web/src/components/settings/SettingsPanels.logic.ts @@ -7,6 +7,54 @@ import type { } from "@t3tools/contracts"; import { DEFAULT_UNIFIED_SETTINGS } from "@t3tools/contracts/settings"; +function collapseOtelSignalsUrl(input: { + readonly tracesUrl: string; + readonly metricsUrl: string; +}): string | null { + const tracesSuffix = "/traces"; + const metricsSuffix = "/metrics"; + if (!input.tracesUrl.endsWith(tracesSuffix) || !input.metricsUrl.endsWith(metricsSuffix)) { + return null; + } + + const tracesBase = input.tracesUrl.slice(0, -tracesSuffix.length); + const metricsBase = input.metricsUrl.slice(0, -metricsSuffix.length); + if (tracesBase !== metricsBase) { + return null; + } + + return `${tracesBase}/{traces,metrics}`; +} + +export function formatDiagnosticsDescription(input: { + readonly localTracingEnabled: boolean; + readonly otlpTracesEnabled: boolean; + readonly otlpTracesUrl?: string | undefined; + readonly otlpMetricsEnabled: boolean; + readonly otlpMetricsUrl?: string | undefined; +}): string { + const mode = input.localTracingEnabled ? "Local trace file" : "Terminal logs only"; + const tracesUrl = input.otlpTracesEnabled ? input.otlpTracesUrl : undefined; + const metricsUrl = input.otlpMetricsEnabled ? input.otlpMetricsUrl : undefined; + + if (tracesUrl && metricsUrl) { + const collapsedUrl = collapseOtelSignalsUrl({ tracesUrl, metricsUrl }); + return collapsedUrl + ? `${mode}. Exporting OTEL to ${collapsedUrl}.` + : `${mode}. Exporting OTEL traces to ${tracesUrl} and metrics to ${metricsUrl}.`; + } + + if (tracesUrl) { + return `${mode}. Exporting OTEL traces to ${tracesUrl}.`; + } + + if (metricsUrl) { + return `${mode}. Exporting OTEL metrics to ${metricsUrl}.`; + } + + return `${mode}.`; +} + export function buildProviderInstanceUpdatePatch(input: { readonly settings: Pick; readonly instanceId: ProviderInstanceId; diff --git a/apps/web/src/components/settings/SettingsPanels.tsx b/apps/web/src/components/settings/SettingsPanels.tsx index d8eca942c60..ee75fba5d06 100644 --- a/apps/web/src/components/settings/SettingsPanels.tsx +++ b/apps/web/src/components/settings/SettingsPanels.tsx @@ -1,5 +1,6 @@ import { ArchiveIcon, ArchiveX, LoaderIcon, PlusIcon, RefreshCwIcon } from "lucide-react"; import { useQueryClient } from "@tanstack/react-query"; +import { Link } from "@tanstack/react-router"; import { useCallback, useMemo, useRef, useState } from "react"; import { defaultInstanceIdForDriver, @@ -24,7 +25,6 @@ import { } from "../../components/desktopUpdate.logic"; import { ProviderModelPicker } from "../chat/ProviderModelPicker"; import { TraitsPicker } from "../chat/TraitsPicker"; -import { resolveAndPersistPreferredEditor } from "../../editorPreferences"; import { isElectron } from "../../env"; import { useTheme } from "../../hooks/useTheme"; import { useSettings, useUpdateSettings } from "../../hooks/useSettings"; @@ -66,7 +66,10 @@ import { } from "../ProviderUpdateLaunchNotification.logic"; import { ProviderInstanceCard } from "./ProviderInstanceCard"; import { DRIVER_OPTIONS, getDriverOption } from "./providerDriverMeta"; -import { buildProviderInstanceUpdatePatch } from "./SettingsPanels.logic"; +import { + buildProviderInstanceUpdatePatch, + formatDiagnosticsDescription, +} from "./SettingsPanels.logic"; import { SettingResetButton, SettingsPageContainer, @@ -75,11 +78,7 @@ import { useRelativeTimeTick, } from "./settingsLayout"; import { ProjectFavicon } from "../ProjectFavicon"; -import { - useServerAvailableEditors, - useServerObservability, - useServerProviders, -} from "../../rpc/serverState"; +import { useServerObservability, useServerProviders } from "../../rpc/serverState"; const THEME_OPTIONS = [ { @@ -442,27 +441,15 @@ export function GeneralSettingsPanel() { const { theme, setTheme } = useTheme(); const settings = useSettings(); const { updateSettings } = useUpdateSettings(); - const [openingPathByTarget, setOpeningPathByTarget] = useState({ - logsDirectory: false, - }); - const [openPathErrorByTarget, setOpenPathErrorByTarget] = useState< - Partial> - >({}); - const availableEditors = useServerAvailableEditors(); const observability = useServerObservability(); const serverProviders = useServerProviders(); - const logsDirectoryPath = observability?.logsDirectoryPath ?? null; - const diagnosticsDescription = (() => { - const exports: string[] = []; - if (observability?.otlpTracesEnabled && observability.otlpTracesUrl) { - exports.push(`traces to ${observability.otlpTracesUrl}`); - } - if (observability?.otlpMetricsEnabled && observability.otlpMetricsUrl) { - exports.push(`metrics to ${observability.otlpMetricsUrl}`); - } - const mode = observability?.localTracingEnabled ? "Local trace file" : "Terminal logs only"; - return exports.length > 0 ? `${mode}. OTLP exporting ${exports.join(" and ")}.` : `${mode}.`; - })(); + const diagnosticsDescription = formatDiagnosticsDescription({ + localTracingEnabled: observability?.localTracingEnabled ?? false, + otlpTracesEnabled: observability?.otlpTracesEnabled ?? false, + otlpTracesUrl: observability?.otlpTracesUrl, + otlpMetricsEnabled: observability?.otlpMetricsEnabled ?? false, + otlpMetricsUrl: observability?.otlpMetricsUrl, + }); const textGenerationModelSelection = resolveAppModelSelectionState(settings, serverProviders); const textGenInstanceId = textGenerationModelSelection.instanceId; @@ -487,44 +474,6 @@ export function GeneralSettingsPanel() { DEFAULT_UNIFIED_SETTINGS.textGenerationModelSelection ?? null, ); - const openInPreferredEditor = useCallback( - (target: "logsDirectory", path: string | null, failureMessage: string) => { - if (!path) return; - setOpenPathErrorByTarget((existing) => ({ ...existing, [target]: null })); - setOpeningPathByTarget((existing) => ({ ...existing, [target]: true })); - - const editor = resolveAndPersistPreferredEditor(availableEditors ?? []); - if (!editor) { - setOpenPathErrorByTarget((existing) => ({ - ...existing, - [target]: "No available editors found.", - })); - setOpeningPathByTarget((existing) => ({ ...existing, [target]: false })); - return; - } - - void ensureLocalApi() - .shell.openInEditor(path, editor) - .catch((error) => { - setOpenPathErrorByTarget((existing) => ({ - ...existing, - [target]: error instanceof Error ? error.message : failureMessage, - })); - }) - .finally(() => { - setOpeningPathByTarget((existing) => ({ ...existing, [target]: false })); - }); - }, - [availableEditors], - ); - - const openLogsDirectory = useCallback(() => { - openInPreferredEditor("logsDirectory", logsDirectoryPath, "Unable to open logs folder."); - }, [logsDirectoryPath, openInPreferredEditor]); - - const openDiagnosticsError = openPathErrorByTarget.logsDirectory ?? null; - const isOpeningLogsDirectory = openingPathByTarget.logsDirectory; - return ( @@ -914,24 +863,9 @@ export function GeneralSettingsPanel() { - - {logsDirectoryPath ?? "Resolving logs directory..."} - - {openDiagnosticsError ? ( - {openDiagnosticsError} - ) : null} - - } control={ - } /> diff --git a/apps/web/src/lib/processDiagnosticsState.ts b/apps/web/src/lib/processDiagnosticsState.ts new file mode 100644 index 00000000000..17d13dcf336 --- /dev/null +++ b/apps/web/src/lib/processDiagnosticsState.ts @@ -0,0 +1,63 @@ +import { useAtomValue } from "@effect/atom-react"; +import type { ServerProcessDiagnosticsResult } from "@t3tools/contracts"; +import { Cause, Effect, Option } from "effect"; +import { AsyncResult, Atom } from "effect/unstable/reactivity"; +import { useCallback } from "react"; + +import { ensureLocalApi } from "../localApi"; +import { appAtomRegistry } from "../rpc/atomRegistry"; + +const PROCESS_DIAGNOSTICS_STALE_TIME_MS = 2_000; +const PROCESS_DIAGNOSTICS_IDLE_TTL_MS = 5 * 60_000; + +const processDiagnosticsAtom = Atom.make( + Effect.promise(() => ensureLocalApi().server.getProcessDiagnostics()), +).pipe( + Atom.swr({ + staleTime: PROCESS_DIAGNOSTICS_STALE_TIME_MS, + revalidateOnMount: true, + }), + Atom.setIdleTTL(PROCESS_DIAGNOSTICS_IDLE_TTL_MS), + Atom.withLabel("process-diagnostics"), +); + +export interface ProcessDiagnosticsState { + readonly data: ServerProcessDiagnosticsResult | null; + readonly error: string | null; + readonly isPending: boolean; + readonly refresh: () => void; +} + +function formatProcessDiagnosticsError(error: unknown): string { + return error instanceof Error ? error.message : "Failed to load process diagnostics."; +} + +function readProcessDiagnosticsError( + result: AsyncResult.AsyncResult, +): string | null { + if (result._tag !== "Failure") { + return null; + } + + const squashed = Cause.squash(result.cause); + return formatProcessDiagnosticsError(squashed); +} + +export function refreshProcessDiagnostics(): void { + appAtomRegistry.refresh(processDiagnosticsAtom); +} + +export function useProcessDiagnostics(): ProcessDiagnosticsState { + const result = useAtomValue(processDiagnosticsAtom); + const data = Option.getOrNull(AsyncResult.value(result)); + const refresh = useCallback(() => { + refreshProcessDiagnostics(); + }, []); + + return { + data, + error: readProcessDiagnosticsError(result), + isPending: result.waiting, + refresh, + }; +} diff --git a/apps/web/src/lib/traceDiagnosticsState.ts b/apps/web/src/lib/traceDiagnosticsState.ts new file mode 100644 index 00000000000..bfe1b3a83fa --- /dev/null +++ b/apps/web/src/lib/traceDiagnosticsState.ts @@ -0,0 +1,63 @@ +import { useAtomValue } from "@effect/atom-react"; +import type { ServerTraceDiagnosticsResult } from "@t3tools/contracts"; +import { Cause, Effect, Option } from "effect"; +import { AsyncResult, Atom } from "effect/unstable/reactivity"; +import { useCallback } from "react"; + +import { ensureLocalApi } from "../localApi"; +import { appAtomRegistry } from "../rpc/atomRegistry"; + +const TRACE_DIAGNOSTICS_STALE_TIME_MS = 5_000; +const TRACE_DIAGNOSTICS_IDLE_TTL_MS = 5 * 60_000; + +const traceDiagnosticsAtom = Atom.make( + Effect.promise(() => ensureLocalApi().server.getTraceDiagnostics()), +).pipe( + Atom.swr({ + staleTime: TRACE_DIAGNOSTICS_STALE_TIME_MS, + revalidateOnMount: true, + }), + Atom.setIdleTTL(TRACE_DIAGNOSTICS_IDLE_TTL_MS), + Atom.withLabel("trace-diagnostics"), +); + +export interface TraceDiagnosticsState { + readonly data: ServerTraceDiagnosticsResult | null; + readonly error: string | null; + readonly isPending: boolean; + readonly refresh: () => void; +} + +function formatTraceDiagnosticsError(error: unknown): string { + return error instanceof Error ? error.message : "Failed to load trace diagnostics."; +} + +function readTraceDiagnosticsError( + result: AsyncResult.AsyncResult, +): string | null { + if (result._tag !== "Failure") { + return null; + } + + const squashed = Cause.squash(result.cause); + return formatTraceDiagnosticsError(squashed); +} + +export function refreshTraceDiagnostics(): void { + appAtomRegistry.refresh(traceDiagnosticsAtom); +} + +export function useTraceDiagnostics(): TraceDiagnosticsState { + const result = useAtomValue(traceDiagnosticsAtom); + const data = Option.getOrNull(AsyncResult.value(result)); + const refresh = useCallback(() => { + refreshTraceDiagnostics(); + }, []); + + return { + data, + error: readTraceDiagnosticsError(result), + isPending: result.waiting, + refresh, + }; +} diff --git a/apps/web/src/localApi.ts b/apps/web/src/localApi.ts index a4ba190516b..cbb3427b004 100644 --- a/apps/web/src/localApi.ts +++ b/apps/web/src/localApi.ts @@ -147,6 +147,18 @@ function createBrowserLocalApi(rpcClient?: WsRpcClient): LocalApi { rpcClient ? rpcClient.server.discoverSourceControl() : Promise.reject(unavailableLocalBackendError()), + getTraceDiagnostics: () => + rpcClient + ? rpcClient.server.getTraceDiagnostics() + : Promise.reject(unavailableLocalBackendError()), + getProcessDiagnostics: () => + rpcClient + ? rpcClient.server.getProcessDiagnostics() + : Promise.reject(unavailableLocalBackendError()), + signalProcess: (input) => + rpcClient + ? rpcClient.server.signalProcess(input) + : Promise.reject(unavailableLocalBackendError()), }, }; } diff --git a/apps/web/src/routeTree.gen.ts b/apps/web/src/routeTree.gen.ts index 85a2f9ef8fa..3a9140e278c 100644 --- a/apps/web/src/routeTree.gen.ts +++ b/apps/web/src/routeTree.gen.ts @@ -17,6 +17,7 @@ import { Route as SettingsSourceControlRouteImport } from './routes/settings.sou import { Route as SettingsProvidersRouteImport } from './routes/settings.providers' import { Route as SettingsKeybindingsRouteImport } from './routes/settings.keybindings' import { Route as SettingsGeneralRouteImport } from './routes/settings.general' +import { Route as SettingsDiagnosticsRouteImport } from './routes/settings.diagnostics' import { Route as SettingsConnectionsRouteImport } from './routes/settings.connections' import { Route as SettingsArchivedRouteImport } from './routes/settings.archived' import { Route as ChatDraftDraftIdRouteImport } from './routes/_chat.draft.$draftId' @@ -61,6 +62,11 @@ const SettingsGeneralRoute = SettingsGeneralRouteImport.update({ path: '/general', getParentRoute: () => SettingsRoute, } as any) +const SettingsDiagnosticsRoute = SettingsDiagnosticsRouteImport.update({ + id: '/diagnostics', + path: '/diagnostics', + getParentRoute: () => SettingsRoute, +} as any) const SettingsConnectionsRoute = SettingsConnectionsRouteImport.update({ id: '/connections', path: '/connections', @@ -89,6 +95,7 @@ export interface FileRoutesByFullPath { '/settings': typeof SettingsRouteWithChildren '/settings/archived': typeof SettingsArchivedRoute '/settings/connections': typeof SettingsConnectionsRoute + '/settings/diagnostics': typeof SettingsDiagnosticsRoute '/settings/general': typeof SettingsGeneralRoute '/settings/keybindings': typeof SettingsKeybindingsRoute '/settings/providers': typeof SettingsProvidersRoute @@ -101,6 +108,7 @@ export interface FileRoutesByTo { '/settings': typeof SettingsRouteWithChildren '/settings/archived': typeof SettingsArchivedRoute '/settings/connections': typeof SettingsConnectionsRoute + '/settings/diagnostics': typeof SettingsDiagnosticsRoute '/settings/general': typeof SettingsGeneralRoute '/settings/keybindings': typeof SettingsKeybindingsRoute '/settings/providers': typeof SettingsProvidersRoute @@ -116,6 +124,7 @@ export interface FileRoutesById { '/settings': typeof SettingsRouteWithChildren '/settings/archived': typeof SettingsArchivedRoute '/settings/connections': typeof SettingsConnectionsRoute + '/settings/diagnostics': typeof SettingsDiagnosticsRoute '/settings/general': typeof SettingsGeneralRoute '/settings/keybindings': typeof SettingsKeybindingsRoute '/settings/providers': typeof SettingsProvidersRoute @@ -132,6 +141,7 @@ export interface FileRouteTypes { | '/settings' | '/settings/archived' | '/settings/connections' + | '/settings/diagnostics' | '/settings/general' | '/settings/keybindings' | '/settings/providers' @@ -144,6 +154,7 @@ export interface FileRouteTypes { | '/settings' | '/settings/archived' | '/settings/connections' + | '/settings/diagnostics' | '/settings/general' | '/settings/keybindings' | '/settings/providers' @@ -158,6 +169,7 @@ export interface FileRouteTypes { | '/settings' | '/settings/archived' | '/settings/connections' + | '/settings/diagnostics' | '/settings/general' | '/settings/keybindings' | '/settings/providers' @@ -231,6 +243,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof SettingsGeneralRouteImport parentRoute: typeof SettingsRoute } + '/settings/diagnostics': { + id: '/settings/diagnostics' + path: '/diagnostics' + fullPath: '/settings/diagnostics' + preLoaderRoute: typeof SettingsDiagnosticsRouteImport + parentRoute: typeof SettingsRoute + } '/settings/connections': { id: '/settings/connections' path: '/connections' @@ -279,6 +298,7 @@ const ChatRouteWithChildren = ChatRoute._addFileChildren(ChatRouteChildren) interface SettingsRouteChildren { SettingsArchivedRoute: typeof SettingsArchivedRoute SettingsConnectionsRoute: typeof SettingsConnectionsRoute + SettingsDiagnosticsRoute: typeof SettingsDiagnosticsRoute SettingsGeneralRoute: typeof SettingsGeneralRoute SettingsKeybindingsRoute: typeof SettingsKeybindingsRoute SettingsProvidersRoute: typeof SettingsProvidersRoute @@ -288,6 +308,7 @@ interface SettingsRouteChildren { const SettingsRouteChildren: SettingsRouteChildren = { SettingsArchivedRoute: SettingsArchivedRoute, SettingsConnectionsRoute: SettingsConnectionsRoute, + SettingsDiagnosticsRoute: SettingsDiagnosticsRoute, SettingsGeneralRoute: SettingsGeneralRoute, SettingsKeybindingsRoute: SettingsKeybindingsRoute, SettingsProvidersRoute: SettingsProvidersRoute, diff --git a/apps/web/src/routes/settings.diagnostics.tsx b/apps/web/src/routes/settings.diagnostics.tsx new file mode 100644 index 00000000000..e86e5c5cb14 --- /dev/null +++ b/apps/web/src/routes/settings.diagnostics.tsx @@ -0,0 +1,7 @@ +import { createFileRoute } from "@tanstack/react-router"; + +import { DiagnosticsSettingsPanel } from "../components/settings/DiagnosticsSettings"; + +export const Route = createFileRoute("/settings/diagnostics")({ + component: DiagnosticsSettingsPanel, +}); diff --git a/apps/web/src/rpc/wsRpcClient.ts b/apps/web/src/rpc/wsRpcClient.ts index a13f2173cd5..ca56b6143c0 100644 --- a/apps/web/src/rpc/wsRpcClient.ts +++ b/apps/web/src/rpc/wsRpcClient.ts @@ -129,6 +129,11 @@ export interface WsRpcClient { readonly discoverSourceControl: RpcUnaryNoArgMethod< typeof WS_METHODS.serverDiscoverSourceControl >; + readonly getTraceDiagnostics: RpcUnaryNoArgMethod; + readonly getProcessDiagnostics: RpcUnaryNoArgMethod< + typeof WS_METHODS.serverGetProcessDiagnostics + >; + readonly signalProcess: RpcUnaryMethod; readonly subscribeConfig: RpcStreamMethod; readonly subscribeLifecycle: RpcStreamMethod; readonly subscribeAuthAccess: RpcStreamMethod; @@ -247,6 +252,18 @@ export function createWsRpcClient(transport: WsTransport): WsRpcClient { transport.request((client) => client[WS_METHODS.serverUpdateSettings]({ patch })), discoverSourceControl: () => transport.request((client) => client[WS_METHODS.serverDiscoverSourceControl]({})), + getTraceDiagnostics: () => + transport.request((client) => + client[WS_METHODS.serverGetTraceDiagnostics]({}).pipe(Effect.withTracerEnabled(false)), + ), + getProcessDiagnostics: () => + transport.request((client) => + client[WS_METHODS.serverGetProcessDiagnostics]({}).pipe(Effect.withTracerEnabled(false)), + ), + signalProcess: (input) => + transport.request((client) => + client[WS_METHODS.serverSignalProcess](input).pipe(Effect.withTracerEnabled(false)), + ), subscribeConfig: (listener, options) => transport.subscribe((client) => client[WS_METHODS.subscribeServerConfig]({}), listener, { ...options, diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index f2c7a28e719..eca3bb4e66b 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -28,9 +28,13 @@ import type { import type { ProviderInstanceId } from "./providerInstance.ts"; import type { ServerConfig, + ServerProcessDiagnosticsResult, ServerProviderUpdateInput, ServerProviderUpdatedPayload, ServerRemoveKeybindingResult, + ServerSignalProcessInput, + ServerSignalProcessResult, + ServerTraceDiagnosticsResult, ServerUpsertKeybindingResult, } from "./server.ts"; import type { @@ -304,6 +308,9 @@ export interface LocalApi { getSettings: () => Promise; updateSettings: (patch: ServerSettingsPatch) => Promise; discoverSourceControl: () => Promise; + getTraceDiagnostics: () => Promise; + getProcessDiagnostics: () => Promise; + signalProcess: (input: ServerSignalProcessInput) => Promise; }; } diff --git a/packages/contracts/src/rpc.ts b/packages/contracts/src/rpc.ts index 33cddc98a4b..1c3ddc93739 100644 --- a/packages/contracts/src/rpc.ts +++ b/packages/contracts/src/rpc.ts @@ -77,6 +77,10 @@ import { ServerRemoveKeybindingInput, ServerRemoveKeybindingResult, ServerProviderUpdatedPayload, + ServerTraceDiagnosticsResult, + ServerProcessDiagnosticsResult, + ServerSignalProcessInput, + ServerSignalProcessResult, ServerUpsertKeybindingInput, ServerUpsertKeybindingResult, } from "./server.ts"; @@ -139,6 +143,9 @@ export const WS_METHODS = { serverGetSettings: "server.getSettings", serverUpdateSettings: "server.updateSettings", serverDiscoverSourceControl: "server.discoverSourceControl", + serverGetTraceDiagnostics: "server.getTraceDiagnostics", + serverGetProcessDiagnostics: "server.getProcessDiagnostics", + serverSignalProcess: "server.signalProcess", // Source control methods sourceControlLookupRepository: "sourceControl.lookupRepository", @@ -207,6 +214,21 @@ export const WsServerDiscoverSourceControlRpc = Rpc.make(WS_METHODS.serverDiscov success: SourceControlDiscoveryResult, }); +export const WsServerGetTraceDiagnosticsRpc = Rpc.make(WS_METHODS.serverGetTraceDiagnostics, { + payload: Schema.Struct({}), + success: ServerTraceDiagnosticsResult, +}); + +export const WsServerGetProcessDiagnosticsRpc = Rpc.make(WS_METHODS.serverGetProcessDiagnostics, { + payload: Schema.Struct({}), + success: ServerProcessDiagnosticsResult, +}); + +export const WsServerSignalProcessRpc = Rpc.make(WS_METHODS.serverSignalProcess, { + payload: ServerSignalProcessInput, + success: ServerSignalProcessResult, +}); + export const WsSourceControlLookupRepositoryRpc = Rpc.make( WS_METHODS.sourceControlLookupRepository, { @@ -439,6 +461,9 @@ export const WsRpcGroup = RpcGroup.make( WsServerGetSettingsRpc, WsServerUpdateSettingsRpc, WsServerDiscoverSourceControlRpc, + WsServerGetTraceDiagnosticsRpc, + WsServerGetProcessDiagnosticsRpc, + WsServerSignalProcessRpc, WsSourceControlLookupRepositoryRpc, WsSourceControlCloneRepositoryRpc, WsSourceControlPublishRepositoryRpc, diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index ec6840d8072..15afea93ad9 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -4,6 +4,7 @@ import { ServerAuthDescriptor } from "./auth.ts"; import { IsoDateTime, NonNegativeInt, + PositiveInt, ProjectId, ThreadId, TrimmedNonEmptyString, @@ -211,6 +212,135 @@ export const ServerObservability = Schema.Struct({ }); export type ServerObservability = typeof ServerObservability.Type; +export const ServerTraceDiagnosticsErrorKind = Schema.Literals([ + "trace-file-not-found", + "trace-file-read-failed", +]); +export type ServerTraceDiagnosticsErrorKind = typeof ServerTraceDiagnosticsErrorKind.Type; + +export const ServerTraceDiagnosticsSpanSummary = Schema.Struct({ + name: TrimmedNonEmptyString, + count: NonNegativeInt, + failureCount: NonNegativeInt, + totalDurationMs: Schema.Number, + averageDurationMs: Schema.Number, + maxDurationMs: Schema.Number, +}); +export type ServerTraceDiagnosticsSpanSummary = typeof ServerTraceDiagnosticsSpanSummary.Type; + +export const ServerTraceDiagnosticsFailureSummary = Schema.Struct({ + name: TrimmedNonEmptyString, + cause: TrimmedNonEmptyString, + count: NonNegativeInt, + lastSeenAt: Schema.DateTimeUtc, + traceId: TrimmedNonEmptyString, + spanId: TrimmedNonEmptyString, +}); +export type ServerTraceDiagnosticsFailureSummary = typeof ServerTraceDiagnosticsFailureSummary.Type; + +export const ServerTraceDiagnosticsRecentFailure = Schema.Struct({ + name: TrimmedNonEmptyString, + cause: TrimmedNonEmptyString, + durationMs: Schema.Number, + endedAt: Schema.DateTimeUtc, + traceId: TrimmedNonEmptyString, + spanId: TrimmedNonEmptyString, +}); +export type ServerTraceDiagnosticsRecentFailure = typeof ServerTraceDiagnosticsRecentFailure.Type; + +export const ServerTraceDiagnosticsSpanOccurrence = Schema.Struct({ + name: TrimmedNonEmptyString, + durationMs: Schema.Number, + endedAt: Schema.DateTimeUtc, + traceId: TrimmedNonEmptyString, + spanId: TrimmedNonEmptyString, +}); +export type ServerTraceDiagnosticsSpanOccurrence = typeof ServerTraceDiagnosticsSpanOccurrence.Type; + +export const ServerTraceDiagnosticsLogEvent = Schema.Struct({ + spanName: TrimmedNonEmptyString, + level: TrimmedNonEmptyString, + message: TrimmedNonEmptyString, + seenAt: Schema.DateTimeUtc, + traceId: TrimmedNonEmptyString, + spanId: TrimmedNonEmptyString, +}); +export type ServerTraceDiagnosticsLogEvent = typeof ServerTraceDiagnosticsLogEvent.Type; + +export const ServerTraceDiagnosticsResult = Schema.Struct({ + traceFilePath: TrimmedNonEmptyString, + scannedFilePaths: Schema.Array(TrimmedNonEmptyString), + readAt: Schema.DateTimeUtc, + recordCount: NonNegativeInt, + parseErrorCount: NonNegativeInt, + firstSpanAt: Schema.Option(Schema.DateTimeUtc), + lastSpanAt: Schema.Option(Schema.DateTimeUtc), + failureCount: NonNegativeInt, + interruptionCount: NonNegativeInt, + slowSpanThresholdMs: NonNegativeInt, + slowSpanCount: NonNegativeInt, + logLevelCounts: Schema.Record(TrimmedNonEmptyString, NonNegativeInt), + topSpansByCount: Schema.Array(ServerTraceDiagnosticsSpanSummary), + slowestSpans: Schema.Array(ServerTraceDiagnosticsSpanOccurrence), + commonFailures: Schema.Array(ServerTraceDiagnosticsFailureSummary), + latestFailures: Schema.Array(ServerTraceDiagnosticsRecentFailure), + latestWarningAndErrorLogs: Schema.Array(ServerTraceDiagnosticsLogEvent), + partialFailure: Schema.Option(Schema.Boolean), + error: Schema.Option( + Schema.Struct({ + kind: ServerTraceDiagnosticsErrorKind, + message: TrimmedNonEmptyString, + }), + ), +}); +export type ServerTraceDiagnosticsResult = typeof ServerTraceDiagnosticsResult.Type; + +export const ServerProcessSignal = Schema.Literals(["SIGINT", "SIGKILL"]); +export type ServerProcessSignal = typeof ServerProcessSignal.Type; + +export const ServerProcessDiagnosticsEntry = Schema.Struct({ + pid: PositiveInt, + ppid: NonNegativeInt, + pgid: Schema.Option(Schema.Int), + status: TrimmedNonEmptyString, + cpuPercent: Schema.Number, + rssBytes: NonNegativeInt, + elapsed: TrimmedNonEmptyString, + command: TrimmedNonEmptyString, + depth: NonNegativeInt, + childPids: Schema.Array(PositiveInt), +}); +export type ServerProcessDiagnosticsEntry = typeof ServerProcessDiagnosticsEntry.Type; + +export const ServerProcessDiagnosticsResult = Schema.Struct({ + serverPid: PositiveInt, + readAt: Schema.DateTimeUtc, + processCount: NonNegativeInt, + totalRssBytes: NonNegativeInt, + totalCpuPercent: Schema.Number, + processes: Schema.Array(ServerProcessDiagnosticsEntry), + error: Schema.Option( + Schema.Struct({ + message: TrimmedNonEmptyString, + }), + ), +}); +export type ServerProcessDiagnosticsResult = typeof ServerProcessDiagnosticsResult.Type; + +export const ServerSignalProcessInput = Schema.Struct({ + pid: PositiveInt, + signal: ServerProcessSignal, +}); +export type ServerSignalProcessInput = typeof ServerSignalProcessInput.Type; + +export const ServerSignalProcessResult = Schema.Struct({ + pid: PositiveInt, + signal: ServerProcessSignal, + signaled: Schema.Boolean, + message: Schema.Option(TrimmedNonEmptyString), +}); +export type ServerSignalProcessResult = typeof ServerSignalProcessResult.Type; + export const ServerConfig = Schema.Struct({ environment: ExecutionEnvironmentDescriptor, auth: ServerAuthDescriptor,