diff --git a/apps/server/src/process/Layers/TerminalProcessInspector.test.ts b/apps/server/src/process/Layers/TerminalProcessInspector.test.ts new file mode 100644 index 00000000000..06a75f9e76c --- /dev/null +++ b/apps/server/src/process/Layers/TerminalProcessInspector.test.ts @@ -0,0 +1,162 @@ +import { spawn, type ChildProcess, type ChildProcessByStdio } from "node:child_process"; +import type { Readable } from "node:stream"; + +import * as NodeChildProcessSpawner from "@effect/platform-node/NodeChildProcessSpawner"; +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; + +import { TerminalProcessInspector } from "../Services/TerminalProcessInspector.ts"; +import { TerminalProcessInspectorLive } from "./TerminalProcessInspector.ts"; + +type ListenerProcess = ChildProcessByStdio; + +interface StartedProcess { + readonly process: ListenerProcess; + readonly port: number; +} + +const stopProcess = (child: ChildProcess) => + Effect.callback((resume) => { + if (child.exitCode !== null || child.signalCode !== null) { + resume(Effect.void); + return; + } + + child.kill("SIGTERM"); + + const timeout = setTimeout(() => { + if (child.exitCode === null && child.signalCode === null) { + child.kill("SIGKILL"); + } + }, 1_000); + + child.once("exit", () => { + clearTimeout(timeout); + resume(Effect.void); + }); + }); + +const waitForPort = (child: ListenerProcess) => + Effect.callback((resume) => { + const timeout = setTimeout(() => { + cleanup(); + resume(Effect.fail(new Error("Timed out waiting for listener port"))); + }, 3_000); + + let stdout = ""; + let stderr = ""; + + const cleanup = () => { + clearTimeout(timeout); + child.stdout.off("data", onStdout); + child.stderr.off("data", onStderr); + child.off("exit", onExit); + }; + + const onStdout = (chunk: Buffer) => { + stdout += chunk.toString("utf8"); + const match = stdout.match(/PORT:(\d+)/); + if (!match?.[1]) return; + const port = Number(match[1]); + if (!Number.isInteger(port) || port <= 0) return; + cleanup(); + resume(Effect.succeed(port)); + }; + + const onStderr = (chunk: Buffer) => { + stderr += chunk.toString("utf8"); + }; + + const onExit = (code: number | null) => { + cleanup(); + resume( + Effect.fail( + new Error( + `Listener process exited before reporting port (code=${String(code)}): ${stderr.trim()}`, + ), + ), + ); + }; + + child.stdout.on("data", onStdout); + child.stderr.on("data", onStderr); + child.on("exit", onExit); + + return Effect.sync(cleanup); + }); + +const startListenerProcess = Effect.gen(function* () { + const script = [ + "const { createServer } = require('node:http');", + "const server = createServer((_req, res) => {", + " res.statusCode = 200;", + " res.end('ok');", + "});", + "server.listen(0, '127.0.0.1', () => {", + " const address = server.address();", + " if (typeof address !== 'object' || !address) process.exit(1);", + " console.log(`PORT:${address.port}`);", + "});", + "const shutdown = () => server.close(() => process.exit(0));", + "process.on('SIGTERM', shutdown);", + "process.on('SIGINT', shutdown);", + "setInterval(() => {}, 10_000);", + ].join(""); + + const process = spawn("node", ["-e", script], { + stdio: ["ignore", "pipe", "pipe"], + }); + const port = yield* waitForPort(process); + return { process, port } satisfies StartedProcess; +}); + +const nodeChildProcessLayer = NodeChildProcessSpawner.layer.pipe(Layer.provide(NodeServices.layer)); + +const testLayer = TerminalProcessInspectorLive.pipe(Layer.provide(nodeChildProcessLayer)); + +it.layer(testLayer)("TerminalProcessInspectorLive", (it) => { + it.effect("detects listening ports when the terminal root pid is the listener", () => + Effect.acquireUseRelease( + startListenerProcess, + ({ process, port }) => + Effect.gen(function* () { + const listenerPid = process.pid; + if (!listenerPid) { + return yield* Effect.fail(new Error("Listener process pid missing")); + } + + const inspector = yield* TerminalProcessInspector; + const activity = yield* inspector.inspect(listenerPid); + + assert.equal(activity.hasRunningSubprocess, true); + assert.deepStrictEqual(activity.runningPorts.includes(port), true); + }), + ({ process }) => stopProcess(process), + ), + ); + + it.effect("returns idle activity when root process has no children and no listening ports", () => + Effect.acquireUseRelease( + Effect.sync(() => + spawn("node", ["-e", "setInterval(() => {}, 10_000)"], { + stdio: ["ignore", "ignore", "ignore"], + }), + ), + (process) => + Effect.gen(function* () { + const idlePid = process.pid; + if (!idlePid) { + return yield* Effect.fail(new Error("Idle process pid missing")); + } + + const inspector = yield* TerminalProcessInspector; + const activity = yield* inspector.inspect(idlePid); + + assert.equal(activity.hasRunningSubprocess, false); + assert.deepStrictEqual(activity.runningPorts, []); + }), + stopProcess, + ), + ); +}); diff --git a/apps/server/src/process/Layers/TerminalProcessInspector.ts b/apps/server/src/process/Layers/TerminalProcessInspector.ts new file mode 100644 index 00000000000..7d46ae7a7b9 --- /dev/null +++ b/apps/server/src/process/Layers/TerminalProcessInspector.ts @@ -0,0 +1,232 @@ +import { Buffer } from "node:buffer"; + +import { Effect, Layer, Option, PlatformError, Stream } from "effect"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { collectPosixProcessFamilyPids, checkPosixListeningPorts } from "../posix.ts"; +import type { + TerminalProcessInspectorShape, + TerminalSubprocessActivity, +} from "../Services/TerminalProcessInspector.ts"; +import { + TerminalProcessInspector, + TerminalProcessInspectionError, +} from "../Services/TerminalProcessInspector.ts"; +import { checkWindowsListeningPorts, collectWindowsChildPids } from "../win32.ts"; + +const DEFAULT_COMMAND_KILL_GRACE_MS = 1_000; + +interface InspectorCommandResult { + readonly stdout: string; + readonly stderr: string; + readonly exitCode: number; +} + +interface CollectOutputResult { + readonly text: string; + readonly truncated: boolean; +} + +interface RunInspectorCommandInput { + readonly operation: string; + readonly terminalPid: number; + readonly command: string; + readonly args: ReadonlyArray; + readonly timeoutMs: number; + readonly maxOutputBytes: number; +} + +function commandLabel(command: string, args: ReadonlyArray): string { + return [command, ...args].join(" "); +} + +const collectOutput = Effect.fn("process.collectOutput")(function* ( + stream: Stream.Stream, + maxOutputBytes: number, +): Effect.fn.Return { + return yield* stream.pipe( + Stream.decodeText(), + Stream.runFold( + () => ({ + text: "", + bytes: 0, + truncated: false, + }), + (state, chunk) => { + if (state.bytes >= maxOutputBytes) { + return { + ...state, + truncated: true, + }; + } + + const chunkBytes = Buffer.byteLength(chunk); + const remainingBytes = maxOutputBytes - state.bytes; + if (chunkBytes <= remainingBytes) { + return { + text: `${state.text}${chunk}`, + bytes: state.bytes + chunkBytes, + truncated: state.truncated, + }; + } + + const truncatedChunk = Buffer.from(chunk).subarray(0, remainingBytes).toString("utf8"); + return { + text: `${state.text}${truncatedChunk}`, + bytes: state.bytes + remainingBytes, + truncated: true, + }; + }, + ), + Effect.map(({ text, truncated }) => ({ text, truncated })), + ); +}); + +const makeTerminalProcessInspector = Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + + const runInspectorCommand = Effect.fn("process.runInspectorCommand")(function* ( + input: RunInspectorCommandInput, + ) { + const command = ChildProcess.make(input.command, [...input.args], { + killSignal: "SIGTERM", + forceKillAfter: DEFAULT_COMMAND_KILL_GRACE_MS, + }); + + return yield* Effect.gen(function* () { + const child = yield* spawner.spawn(command).pipe( + Effect.mapError( + (cause) => + new TerminalProcessInspectionError({ + operation: input.operation, + terminalPid: input.terminalPid, + command: commandLabel(input.command, input.args), + detail: "Failed to spawn inspector command.", + cause, + }), + ), + ); + + const [stdout, stderr, exitCode] = yield* Effect.all( + [ + collectOutput(child.stdout, input.maxOutputBytes).pipe( + Effect.mapError( + (cause) => + new TerminalProcessInspectionError({ + operation: input.operation, + terminalPid: input.terminalPid, + command: commandLabel(input.command, input.args), + detail: "Failed to read stdout from inspector command.", + cause, + }), + ), + ), + collectOutput(child.stderr, input.maxOutputBytes).pipe( + Effect.mapError( + (cause) => + new TerminalProcessInspectionError({ + operation: input.operation, + terminalPid: input.terminalPid, + command: commandLabel(input.command, input.args), + detail: "Failed to read stderr from inspector command.", + cause, + }), + ), + ), + child.exitCode.pipe( + Effect.map(Number), + Effect.mapError( + (cause) => + new TerminalProcessInspectionError({ + operation: input.operation, + terminalPid: input.terminalPid, + command: commandLabel(input.command, input.args), + detail: "Failed to read inspector command exit code.", + cause, + }), + ), + ), + ], + { concurrency: "unbounded" }, + ); + + return { + stdout: stdout.text, + stderr: stderr.text, + exitCode, + } satisfies InspectorCommandResult; + }).pipe( + Effect.scoped, + Effect.timeoutOption(input.timeoutMs), + Effect.flatMap( + Option.match({ + onNone: () => + Effect.fail( + new TerminalProcessInspectionError({ + operation: input.operation, + terminalPid: input.terminalPid, + command: commandLabel(input.command, input.args), + detail: "Inspector command timed out.", + }), + ), + onSome: Effect.succeed, + }), + ), + ); + }); + + const inspect: TerminalProcessInspectorShape["inspect"] = Effect.fn("process.inspect")(function* ( + terminalPid: number, + ) { + if (!Number.isInteger(terminalPid) || terminalPid <= 0) { + return { + hasRunningSubprocess: false, + runningPorts: [], + } satisfies TerminalSubprocessActivity; + } + + if (process.platform === "win32") { + const childPids = yield* collectWindowsChildPids(terminalPid, runInspectorCommand); + const processPidsForPortScan = [terminalPid, ...childPids]; + const runningPorts = yield* checkWindowsListeningPorts(processPidsForPortScan, { + terminalPid, + runCommand: runInspectorCommand, + }); + return { + hasRunningSubprocess: childPids.length > 0 || runningPorts.length > 0, + runningPorts, + } satisfies TerminalSubprocessActivity; + } + + const processFamilyPids = yield* collectPosixProcessFamilyPids( + terminalPid, + runInspectorCommand, + ); + if (processFamilyPids.length === 0) { + return { + hasRunningSubprocess: false, + runningPorts: [], + } satisfies TerminalSubprocessActivity; + } + + const subprocessPids = processFamilyPids.filter((pid: number) => pid !== terminalPid); + const runningPorts = yield* checkPosixListeningPorts(processFamilyPids, { + terminalPid, + runCommand: runInspectorCommand, + platform: process.platform, + }); + return { + hasRunningSubprocess: subprocessPids.length > 0 || runningPorts.length > 0, + runningPorts, + } satisfies TerminalSubprocessActivity; + }); + + return { + inspect, + } satisfies TerminalProcessInspectorShape; +}); + +export const TerminalProcessInspectorLive = Layer.effect( + TerminalProcessInspector, + makeTerminalProcessInspector, +); diff --git a/apps/server/src/process/Layers/WebPortInspector.test.ts b/apps/server/src/process/Layers/WebPortInspector.test.ts new file mode 100644 index 00000000000..142075d71de --- /dev/null +++ b/apps/server/src/process/Layers/WebPortInspector.test.ts @@ -0,0 +1,153 @@ +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; + +import { FetchHttpClient, HttpClient } from "effect/unstable/http"; +import { assert, it } from "@effect/vitest"; +import { Effect, Layer, Schema } from "effect"; + +import { WebPortInspectionError, WebPortInspector } from "../Services/WebPortInspector.ts"; +import { WebPortInspectorLive } from "./WebPortInspector.ts"; + +const closeServer = (server: Server) => + Effect.callback((resume) => { + server.closeAllConnections?.(); + server.closeIdleConnections?.(); + server.close(() => { + resume(Effect.void); + }); + + return Effect.sync(() => { + try { + server.closeAllConnections?.(); + server.closeIdleConnections?.(); + server.close(); + } catch { + // Ignore cleanup failures in tests. + } + }); + }); + +const listenServer = (server: Server) => + Effect.callback((resume) => { + const onError = (error: Error) => { + cleanup(); + resume(Effect.fail(error)); + }; + + const cleanup = () => { + server.off("error", onError); + }; + + server.once("error", onError); + server.listen(0, "127.0.0.1", () => { + cleanup(); + const address = server.address(); + if (!address || typeof address !== "object") { + resume(Effect.fail(new Error("Server did not provide a valid listening address."))); + return; + } + + resume(Effect.succeed(address.port)); + }); + + return Effect.gen(function* () { + cleanup(); + yield* closeServer(server); + }); + }); + +const startServer = ( + handler: (request: IncomingMessage, response: ServerResponse) => void, +): Effect.Effect<{ server: Server; port: number }, Error> => + Effect.gen(function* () { + const server = createServer(handler); + const port = yield* listenServer(server); + return { server, port }; + }); + +it.layer(WebPortInspectorLive.pipe(Layer.provide(FetchHttpClient.layer)))( + "WebPortInspectorLive", + (it) => { + it.effect("treats slow HTML responses as web ports", () => + Effect.acquireUseRelease( + startServer((_request, response) => { + setTimeout(() => { + response.statusCode = 200; + response.setHeader("content-type", "text/html; charset=utf-8"); + response.end( + "okhello", + ); + }, 800); + }), + ({ port }) => + Effect.gen(function* () { + const inspector = yield* WebPortInspector; + const isWeb = yield* inspector.inspect(port); + assert.equal(isWeb, true); + }), + ({ server }) => closeServer(server), + ), + ); + + it.effect("treats HTML responses with large bodies as web ports", () => + Effect.acquireUseRelease( + startServer((_request, response) => { + response.statusCode = 200; + response.setHeader("content-type", "text/html; charset=utf-8"); + response.end( + `x${"x".repeat(20_000)}`, + ); + }), + ({ port }) => + Effect.gen(function* () { + const inspector = yield* WebPortInspector; + const isWeb = yield* inspector.inspect(port); + assert.equal(isWeb, true); + }), + ({ server }) => closeServer(server), + ), + ); + + it.effect("ignores HTTP 404 responses", () => + Effect.acquireUseRelease( + startServer((_request, response) => { + response.statusCode = 404; + response.end(); + }), + ({ port }) => + Effect.gen(function* () { + const inspector = yield* WebPortInspector; + const isWeb = yield* inspector.inspect(port); + assert.equal(isWeb, false); + }), + ({ server }) => closeServer(server), + ), + ); + }, +); + +it.effect("WebPortInspectorLive preserves typed timeout errors", () => + Effect.gen(function* () { + const inspector = yield* WebPortInspector; + const error = yield* inspector.inspect(3000).pipe(Effect.flip); + + assert.equal(Schema.is(WebPortInspectionError)(error), true); + assert.equal(error.detail, "HTTP probe timed out."); + }).pipe( + Effect.provide( + WebPortInspectorLive.pipe( + Layer.provide( + Layer.succeed(HttpClient.HttpClient, { + execute: () => + Effect.fail( + new WebPortInspectionError({ + port: 3000, + host: "127.0.0.1", + detail: "HTTP probe timed out.", + }), + ), + } as never), + ), + ), + ), + ), +); diff --git a/apps/server/src/process/Layers/WebPortInspector.ts b/apps/server/src/process/Layers/WebPortInspector.ts new file mode 100644 index 00000000000..d58bea836c5 --- /dev/null +++ b/apps/server/src/process/Layers/WebPortInspector.ts @@ -0,0 +1,179 @@ +import { Buffer } from "node:buffer"; + +import { Effect, Layer, Option, Schema, Stream } from "effect"; +import { HttpClient, HttpClientRequest } from "effect/unstable/http"; + +import type { WebPortInspectorShape } from "../Services/WebPortInspector.ts"; +import { WebPortInspector, WebPortInspectionError } from "../Services/WebPortInspector.ts"; + +const DEFAULT_WEB_PORT_PROBE_TIMEOUT_MS = 2_000; +const WEB_PORT_PROBE_MAX_BODY_BYTES = 8_192; + +interface WebProbeResult { + readonly status: number; + readonly contentType: string; + readonly body: string; + readonly location: string; +} + +function normalizeHeaderValue(value: string | string[] | undefined): string { + if (typeof value === "string") return value; + if (Array.isArray(value)) return value[0] ?? ""; + return ""; +} + +function isLikelyWebProbe(result: WebProbeResult | null): boolean { + if (!result) return false; + if (result.status === 404) return false; + if (result.status >= 300 && result.status < 400 && result.location.length > 0) { + return true; + } + const contentType = result.contentType.toLowerCase(); + if (contentType.includes("text/html") || contentType.includes("application/xhtml+xml")) { + return true; + } + const body = result.body.toLowerCase(); + return body.includes("( + stream: Stream.Stream, +): Effect.Effect => + stream.pipe( + Stream.decodeText(), + Stream.runFold( + () => ({ + text: "", + bytes: 0, + }), + (state, chunk) => { + if (state.bytes >= WEB_PORT_PROBE_MAX_BODY_BYTES) { + return state; + } + + const chunkBytes = Buffer.byteLength(chunk); + const remainingBytes = WEB_PORT_PROBE_MAX_BODY_BYTES - state.bytes; + if (chunkBytes <= remainingBytes) { + return { + text: `${state.text}${chunk}`, + bytes: state.bytes + chunkBytes, + }; + } + + return { + text: `${state.text}${Buffer.from(chunk).subarray(0, remainingBytes).toString("utf8")}`, + bytes: state.bytes + remainingBytes, + }; + }, + ), + Effect.map((preview) => preview.text), + Effect.orElseSucceed(() => ""), + ); + +const makeWebPortInspector = Effect.gen(function* () { + const httpClient = yield* HttpClient.HttpClient; + + const probeWebPortOnHost = Effect.fn("process.probeWebPortOnHost")(function* ( + port: number, + host: string, + ): Effect.fn.Return { + const request = HttpClientRequest.get(`http://${host}:${port}/`).pipe( + HttpClientRequest.setHeaders({ + accept: "text/html,application/xhtml+xml;q=1,*/*;q=0.1", + }), + ); + + return yield* httpClient.execute(request).pipe( + Effect.flatMap((response) => + Effect.gen(function* () { + const status = response.status; + const contentType = normalizeHeaderValue(response.headers["content-type"]); + const location = normalizeHeaderValue(response.headers.location); + + if ( + (status >= 300 && status < 400 && location.length > 0) || + contentType.toLowerCase().includes("text/html") || + contentType.toLowerCase().includes("application/xhtml+xml") + ) { + return { + status, + contentType, + location, + body: "", + } satisfies WebProbeResult; + } + + const body = yield* collectBodyPreview(response.stream); + return { + status, + contentType, + location, + body, + } satisfies WebProbeResult; + }), + ), + Effect.timeoutOption(DEFAULT_WEB_PORT_PROBE_TIMEOUT_MS), + Effect.flatMap( + Option.match({ + onNone: () => + Effect.fail( + new WebPortInspectionError({ + port, + host, + detail: "HTTP probe timed out.", + }), + ), + onSome: Effect.succeed, + }), + ), + Effect.mapError((error) => + Schema.is(WebPortInspectionError)(error) + ? error + : new WebPortInspectionError({ + port, + host, + detail: "Failed to execute HTTP probe request.", + cause: error, + }), + ), + ); + }); + + const inspect: WebPortInspectorShape["inspect"] = Effect.fn("process.inspectWebPort")(function* ( + port: number, + ) { + if (!Number.isInteger(port) || port <= 0 || port > 65_535) { + return yield* new WebPortInspectionError({ + port, + host: "127.0.0.1", + detail: "Port must be an integer between 1 and 65535.", + }); + } + + const ipv4Result = yield* probeWebPortOnHost(port, "127.0.0.1").pipe(Effect.exit); + if (ipv4Result._tag === "Success" && isLikelyWebProbe(ipv4Result.value)) { + return true; + } + + const ipv6Result = yield* probeWebPortOnHost(port, "::1").pipe(Effect.exit); + if (ipv6Result._tag === "Success" && isLikelyWebProbe(ipv6Result.value)) { + return true; + } + + if (ipv4Result._tag === "Success" || ipv6Result._tag === "Success") { + return false; + } + + if (ipv6Result._tag === "Failure") { + return yield* Effect.failCause(ipv6Result.cause); + } + + return yield* Effect.failCause(ipv4Result.cause); + }); + + return { + inspect, + } satisfies WebPortInspectorShape; +}); + +export const WebPortInspectorLive = Layer.effect(WebPortInspector, makeWebPortInspector); diff --git a/apps/server/src/process/Services/TerminalProcessInspector.ts b/apps/server/src/process/Services/TerminalProcessInspector.ts new file mode 100644 index 00000000000..209111d4cf6 --- /dev/null +++ b/apps/server/src/process/Services/TerminalProcessInspector.ts @@ -0,0 +1,53 @@ +import { Context, Schema } from "effect"; +import type { Effect } from "effect"; + +function describeTerminalInspectorCause(cause: unknown): string | null { + if (cause instanceof Error && cause.message.length > 0) { + return cause.message; + } + if (typeof cause === "string" && cause.length > 0) { + return cause; + } + if (cause === undefined || cause === null) { + return null; + } + return String(cause); +} + +export class TerminalProcessInspectionError extends Schema.TaggedErrorClass()( + "TerminalProcessInspectionError", + { + operation: Schema.String, + terminalPid: Schema.Int, + command: Schema.String, + detail: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) { + override get message(): string { + const cause = describeTerminalInspectorCause(this.cause); + return `${this.operation} failed for terminal pid ${this.terminalPid} (${this.command}): ${this.detail}${ + cause ? ` Cause: ${cause}` : "" + }`; + } +} + +export interface TerminalSubprocessActivity { + hasRunningSubprocess: boolean; + runningPorts: number[]; +} + +export type TerminalSubprocessInspector = ( + terminalPid: number, +) => Effect.Effect; + +export interface TerminalProcessInspectorShape { + readonly inspect: ( + terminalPid: number, + ) => Effect.Effect; +} + +export class TerminalProcessInspector extends Context.Service< + TerminalProcessInspector, + TerminalProcessInspectorShape +>()("t3/process/Services/TerminalProcessInspector") {} diff --git a/apps/server/src/process/Services/WebPortInspector.ts b/apps/server/src/process/Services/WebPortInspector.ts new file mode 100644 index 00000000000..02bb1089635 --- /dev/null +++ b/apps/server/src/process/Services/WebPortInspector.ts @@ -0,0 +1,30 @@ +import { Context, Schema } from "effect"; +import type { Effect } from "effect"; + +export const DEFAULT_WEB_PORT_PROBE_TTL_MS = 10_000; + +export type TerminalWebPortInspector = ( + port: number, +) => Effect.Effect; + +export class WebPortInspectionError extends Schema.TaggedErrorClass()( + "WebPortInspectionError", + { + port: Schema.Int, + host: Schema.String, + detail: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) { + override get message(): string { + return `Web port probe failed for ${this.host}:${this.port}: ${this.detail}`; + } +} + +export interface WebPortInspectorShape { + readonly inspect: (port: number) => Effect.Effect; +} + +export class WebPortInspector extends Context.Service()( + "t3/process/Services/WebPortInspector", +) {} diff --git a/apps/server/src/process/posix.test.ts b/apps/server/src/process/posix.test.ts new file mode 100644 index 00000000000..eeddf8b013e --- /dev/null +++ b/apps/server/src/process/posix.test.ts @@ -0,0 +1,60 @@ +import { assert, describe, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { checkPosixListeningPorts } from "./posix.ts"; + +describe("process.checkPosixListeningPorts", () => { + it.effect("falls back to ss when lsof exits with code 1", () => + Effect.gen(function* () { + const commands: string[] = []; + + const ports = yield* checkPosixListeningPorts([123], { + terminalPid: 123, + platform: "linux", + runCommand: (input) => { + commands.push(input.command); + if (input.command === "lsof") { + return Effect.succeed({ + stdout: "", + stderr: "", + exitCode: 1, + }); + } + + return Effect.succeed({ + stdout: + "State Recv-Q Send-Q Local Address:Port Peer Address:Port Process\n" + + 'LISTEN 0 511 127.0.0.1:3773 0.0.0.0:* users:(("node",pid=123,fd=18))\n', + stderr: "", + exitCode: 0, + }); + }, + }); + + assert.deepStrictEqual(ports, [3773]); + assert.deepStrictEqual(commands, ["lsof", "ss"]); + }), + ); + + it.effect("does not try ss on darwin when lsof reports no listening ports", () => + Effect.gen(function* () { + const commands: string[] = []; + + const ports = yield* checkPosixListeningPorts([123], { + terminalPid: 123, + platform: "darwin", + runCommand: (input) => { + commands.push(input.command); + return Effect.succeed({ + stdout: "", + stderr: "", + exitCode: 1, + }); + }, + }); + + assert.deepStrictEqual(ports, []); + assert.deepStrictEqual(commands, ["lsof"]); + }), + ); +}); diff --git a/apps/server/src/process/posix.ts b/apps/server/src/process/posix.ts new file mode 100644 index 00000000000..3f4e1f9eb77 --- /dev/null +++ b/apps/server/src/process/posix.ts @@ -0,0 +1,147 @@ +import { Effect } from "effect"; + +import type { TerminalProcessInspectionError } from "./Services/TerminalProcessInspector.ts"; +import { MAX_PORT_NUMBER, portFromAddress } from "./utils.ts"; + +interface InspectorCommandResult { + readonly stdout: string; + readonly stderr: string; + readonly exitCode: number; +} + +interface PosixRunCommand { + ( + input: Readonly<{ + operation: string; + terminalPid: number; + command: string; + args: ReadonlyArray; + timeoutMs: number; + maxOutputBytes: number; + }>, + ): Effect.Effect; +} + +export const collectPosixProcessFamilyPids = Effect.fn("process.collectPosixProcessFamilyPids")( + function* ( + terminalPid: number, + runCommand: PosixRunCommand, + ): Effect.fn.Return { + const psResult = yield* runCommand({ + operation: "TerminalProcessInspector.collectPosixProcessFamilyPids", + terminalPid, + command: "ps", + args: ["-eo", "pid=,ppid="], + timeoutMs: 1_000, + maxOutputBytes: 262_144, + }); + if (psResult.exitCode !== 0) { + return []; + } + + const childrenByParentPid = new Map(); + for (const line of psResult.stdout.split(/\r?\n/g)) { + const [pidRaw, ppidRaw] = line.trim().split(/\s+/g); + const pid = Number(pidRaw); + const ppid = Number(ppidRaw); + if (!Number.isInteger(pid) || !Number.isInteger(ppid)) continue; + const children = childrenByParentPid.get(ppid); + if (children) { + children.push(pid); + } else { + childrenByParentPid.set(ppid, [pid]); + } + } + + const processFamily = new Set([terminalPid]); + const pendingParents = [terminalPid]; + while (pendingParents.length > 0) { + const parentPid = pendingParents.shift(); + if (!parentPid) continue; + const childPids = childrenByParentPid.get(parentPid); + if (!childPids || childPids.length === 0) continue; + for (const childPid of childPids) { + if (processFamily.has(childPid)) continue; + processFamily.add(childPid); + pendingParents.push(childPid); + } + } + + return [...processFamily]; + }, +); + +export const checkPosixListeningPorts = Effect.fn("process.checkPosixListeningPorts")(function* ( + processIds: number[], + input: { + terminalPid: number; + runCommand: PosixRunCommand; + platform?: NodeJS.Platform; + }, +): Effect.fn.Return { + if (processIds.length === 0) return []; + + const ports = new Set(); + const pidFilter = new Set(processIds); + const platform = input.platform ?? process.platform; + + const lsofResult = yield* input + .runCommand({ + operation: "TerminalProcessInspector.checkPosixListeningPorts.lsof", + terminalPid: input.terminalPid, + command: "lsof", + args: ["-nP", "-a", "-iTCP", "-sTCP:LISTEN", "-p", processIds.join(",")], + timeoutMs: 1_500, + maxOutputBytes: 262_144, + }) + .pipe(Effect.exit); + + if (lsofResult._tag === "Success") { + if (lsofResult.value.exitCode === 0) { + for (const line of lsofResult.value.stdout.split(/\r?\n/g)) { + const match = line.match(/:(\d+)\s+\(LISTEN\)$/); + if (!match?.[1]) continue; + const port = Number(match[1]); + if (Number.isInteger(port) && port > 0 && port <= MAX_PORT_NUMBER) { + ports.add(port); + } + } + return [...ports].toSorted((left, right) => left - right); + } + if (lsofResult.value.exitCode === 1 && platform === "darwin") { + return []; + } + } + + const ssResult = yield* input.runCommand({ + operation: "TerminalProcessInspector.checkPosixListeningPorts.ss", + terminalPid: input.terminalPid, + command: "ss", + args: ["-ltnp"], + timeoutMs: 1_500, + maxOutputBytes: 524_288, + }); + if (ssResult.exitCode !== 0) { + return []; + } + + for (const line of ssResult.stdout.split(/\r?\n/g)) { + if (!line.includes("pid=")) continue; + const localAddress = line.trim().split(/\s+/g)[3]; + if (!localAddress) continue; + const port = portFromAddress(localAddress); + if (port === null) continue; + + const pidMatches = [...line.matchAll(/pid=(\d+)/g)]; + if (pidMatches.length === 0) continue; + if ( + pidMatches.some((match) => { + const pid = Number(match[1]); + return Number.isInteger(pid) && pidFilter.has(pid); + }) + ) { + ports.add(port); + } + } + return [...ports].toSorted((left, right) => left - right); +}); diff --git a/apps/server/src/process/utils.ts b/apps/server/src/process/utils.ts new file mode 100644 index 00000000000..d59b6b3eaa8 --- /dev/null +++ b/apps/server/src/process/utils.ts @@ -0,0 +1,52 @@ +export const MAX_PORT_NUMBER = 65_535; + +export function normalizeRunningPorts(ports: number[]): number[] { + if (ports.length === 0) return []; + return [...new Set(ports)] + .filter((port) => Number.isInteger(port) && port > 0 && port <= MAX_PORT_NUMBER) + .toSorted((left, right) => left - right); +} + +export function parsePidList(stdout: string): number[] { + const pids: number[] = []; + for (const line of stdout.split(/\r?\n/g)) { + const pid = Number(line.trim()); + if (!Number.isInteger(pid) || pid <= 0) { + continue; + } + pids.push(pid); + } + return [...new Set(pids)]; +} + +export function parsePortList(stdout: string): number[] { + const ports: number[] = []; + for (const line of stdout.split(/\r?\n/g)) { + const port = Number(line.trim()); + if (!Number.isInteger(port)) { + continue; + } + ports.push(port); + } + return normalizeRunningPorts(ports); +} + +export function portFromAddress(address: string): number | null { + const match = address.match(/:(\d+)$/); + if (!match?.[1]) return null; + const port = Number(match[1]); + if (!Number.isInteger(port) || port <= 0 || port > MAX_PORT_NUMBER) { + return null; + } + return port; +} + +export function arePortListsEqual(left: number[], right: number[]): boolean { + if (left.length !== right.length) return false; + for (let index = 0; index < left.length; index += 1) { + if (left[index] !== right[index]) { + return false; + } + } + return true; +} diff --git a/apps/server/src/process/win32.test.ts b/apps/server/src/process/win32.test.ts new file mode 100644 index 00000000000..1751baa4cb6 --- /dev/null +++ b/apps/server/src/process/win32.test.ts @@ -0,0 +1,57 @@ +import { assert, describe, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { collectWindowsChildPids } from "./win32.ts"; + +function parentPidFromCommand(command: string | undefined): number { + return Number(/ParentProcessId = (\d+)/.exec(command ?? "")?.[1] ?? "0"); +} + +describe("process.collectWindowsChildPids", () => { + it.effect("walks the full descendant tree breadth-first", () => + Effect.gen(function* () { + const commands: string[] = []; + const childMap = new Map([ + [100, "200\n300\n"], + [200, "400\n"], + [300, ""], + [400, ""], + ]); + + const childPids = yield* collectWindowsChildPids(100, (input) => { + const command = input.args[3] ?? ""; + commands.push(command); + return Effect.succeed({ + stdout: childMap.get(parentPidFromCommand(command)) ?? "", + stderr: "", + exitCode: 0, + }); + }); + + assert.deepStrictEqual(childPids, [200, 300, 400]); + assert.equal(commands.length, 4); + }), + ); + + it.effect("deduplicates repeated descendants to avoid traversal loops", () => + Effect.gen(function* () { + const childMap = new Map([ + [100, "200\n300\n"], + [200, "300\n400\n"], + [300, "200\n"], + [400, ""], + ]); + + const childPids = yield* collectWindowsChildPids(100, (input) => { + const parentPid = parentPidFromCommand(input.args[3]); + return Effect.succeed({ + stdout: childMap.get(parentPid) ?? "", + stderr: "", + exitCode: 0, + }); + }); + + assert.deepStrictEqual(childPids, [200, 300, 400]); + }), + ); +}); diff --git a/apps/server/src/process/win32.ts b/apps/server/src/process/win32.ts new file mode 100644 index 00000000000..8bb862dd04e --- /dev/null +++ b/apps/server/src/process/win32.ts @@ -0,0 +1,98 @@ +import { Effect } from "effect"; + +import type { TerminalProcessInspectionError } from "./Services/TerminalProcessInspector.ts"; +import { parsePidList, parsePortList } from "./utils.ts"; + +interface InspectorCommandResult { + readonly stdout: string; + readonly stderr: string; + readonly exitCode: number; +} + +interface WindowsRunCommand { + ( + input: Readonly<{ + operation: string; + terminalPid: number; + command: string; + args: ReadonlyArray; + timeoutMs: number; + maxOutputBytes: number; + }>, + ): Effect.Effect; +} + +export const collectWindowsChildPids = Effect.fn("process.collectWindowsChildPids")(function* ( + terminalPid: number, + runCommand: WindowsRunCommand, +): Effect.fn.Return { + const seenPids = new Set([terminalPid]); + const childPids: number[] = []; + const pendingParentPids = [terminalPid]; + + while (pendingParentPids.length > 0) { + const parentPid = pendingParentPids.shift(); + if (parentPid === undefined) { + break; + } + + const command = [ + `$children = Get-CimInstance Win32_Process -Filter "ParentProcessId = ${parentPid}" -ErrorAction SilentlyContinue`, + "if (-not $children) { exit 0 }", + "$children | Select-Object -ExpandProperty ProcessId", + ].join("; "); + const result = yield* runCommand({ + operation: "TerminalProcessInspector.collectWindowsChildPids", + terminalPid, + command: "powershell.exe", + args: ["-NoProfile", "-NonInteractive", "-Command", command], + timeoutMs: 1_500, + maxOutputBytes: 32_768, + }); + if (result.exitCode !== 0) { + continue; + } + + for (const childPid of parsePidList(result.stdout)) { + if (seenPids.has(childPid)) { + continue; + } + seenPids.add(childPid); + childPids.push(childPid); + pendingParentPids.push(childPid); + } + } + return childPids; +}); + +export const checkWindowsListeningPorts = Effect.fn("process.checkWindowsListeningPorts")( + function* ( + processIds: number[], + input: { + terminalPid: number; + runCommand: WindowsRunCommand; + }, + ): Effect.fn.Return { + if (processIds.length === 0) return []; + + const processFilter = processIds.map((pid) => `$_.OwningProcess -eq ${pid}`).join(" -or "); + const command = [ + "$connections = Get-NetTCPConnection -State Listen -ErrorAction SilentlyContinue", + `$matching = $connections | Where-Object { ${processFilter} }`, + "if (-not $matching) { exit 0 }", + "$matching | Select-Object -ExpandProperty LocalPort -Unique", + ].join("; "); + const result = yield* input.runCommand({ + operation: "TerminalProcessInspector.checkWindowsListeningPorts", + terminalPid: input.terminalPid, + command: "powershell.exe", + args: ["-NoProfile", "-NonInteractive", "-Command", command], + timeoutMs: 1_500, + maxOutputBytes: 65_536, + }); + if (result.exitCode !== 0) { + return []; + } + return parsePortList(result.stdout); + }, +); diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 50d2d62aa72..6555774605e 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -48,6 +48,8 @@ import { WorkspaceFileSystemLive } from "./workspace/Layers/WorkspaceFileSystem. import { WorkspacePathsLive } from "./workspace/Layers/WorkspacePaths.ts"; import { ProjectSetupScriptRunnerLive } from "./project/Layers/ProjectSetupScriptRunner.ts"; import { ObservabilityLive } from "./observability/Layers/Observability.ts"; +import { TerminalProcessInspectorLive } from "./process/Layers/TerminalProcessInspector.ts"; +import { WebPortInspectorLive } from "./process/Layers/WebPortInspector.ts"; import { ServerEnvironmentLive } from "./environment/Layers/ServerEnvironment.ts"; import { authBearerBootstrapRouteLayer, @@ -183,7 +185,11 @@ const GitLayerLive = Layer.empty.pipe( Layer.provideMerge(GitCoreLive), ); -const TerminalLayerLive = TerminalManagerLive.pipe(Layer.provide(PtyAdapterLive)); +const TerminalLayerLive = TerminalManagerLive.pipe( + Layer.provideMerge(WebPortInspectorLive), + Layer.provideMerge(TerminalProcessInspectorLive), + Layer.provide(PtyAdapterLive), +); const WorkspaceEntriesLayerLive = WorkspaceEntriesLive.pipe( Layer.provide(WorkspacePathsLive), diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts index 9d41c3de20f..98de0442243 100644 --- a/apps/server/src/terminal/Layers/Manager.test.ts +++ b/apps/server/src/terminal/Layers/Manager.test.ts @@ -1,7 +1,9 @@ import path from "node:path"; +import * as NodeChildProcessSpawner from "@effect/platform-node/NodeChildProcessSpawner"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { assert, it } from "@effect/vitest"; +import { FetchHttpClient } from "effect/unstable/http"; import { DEFAULT_TERMINAL_ID, type TerminalEvent, @@ -15,6 +17,7 @@ import { Exit, Fiber, FileSystem, + Layer, Option, PlatformError, Ref, @@ -24,6 +27,17 @@ import { import { TestClock } from "effect/testing"; import { expect } from "vitest"; +import { TerminalProcessInspectorLive } from "../../process/Layers/TerminalProcessInspector.ts"; +import { WebPortInspectorLive } from "../../process/Layers/WebPortInspector.ts"; +import { + TerminalProcessInspectionError, + TerminalProcessInspector, +} from "../../process/Services/TerminalProcessInspector.ts"; +import { + WebPortInspector, + WebPortInspectionError, + type TerminalWebPortInspector, +} from "../../process/Services/WebPortInspector.ts"; import type { TerminalManagerShape } from "../Services/Manager.ts"; import { type PtyAdapterShape, @@ -32,7 +46,11 @@ import { type PtySpawnInput, PtySpawnError, } from "../Services/PTY.ts"; -import { makeTerminalManagerWithOptions } from "./Manager.ts"; +import { + describeSubprocessInspectorError, + makeTerminalManagerWithOptions, + nextSubprocessActivityErrorLogState, +} from "./Manager.ts"; class FakePtyProcess implements PtyProcess { readonly writes: string[] = []; @@ -194,9 +212,13 @@ function multiTerminalHistoryLogPath( interface CreateManagerOptions { shellResolver?: () => string; + subprocessInspector?: ( + terminalPid: number, + ) => Effect.Effect<{ hasRunningSubprocess: boolean; runningPorts: number[] }>; + webPortInspector?: TerminalWebPortInspector; + webPortProbeCacheTtlMs?: number; platform?: NodeJS.Platform; env?: NodeJS.ProcessEnv; - subprocessChecker?: (terminalPid: number) => Effect.Effect; subprocessPollIntervalMs?: number; processKillGraceMs?: number; maxRetainedInactiveSessions?: number; @@ -217,7 +239,7 @@ const createManager = ( ): Effect.Effect< ManagerFixture, PlatformError.PlatformError, - FileSystem.FileSystem | Scope.Scope + FileSystem.FileSystem | Scope.Scope | TerminalProcessInspector | WebPortInspector > => Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => Effect.gen(function* () { @@ -230,11 +252,17 @@ const createManager = ( historyLineLimit, ptyAdapter, ...(options.shellResolver !== undefined ? { shellResolver: options.shellResolver } : {}), + ...(options.subprocessInspector !== undefined + ? { subprocessInspector: options.subprocessInspector } + : {}), + ...(options.webPortInspector !== undefined + ? { webPortInspector: options.webPortInspector } + : {}), + ...(options.webPortProbeCacheTtlMs !== undefined + ? { webPortProbeCacheTtlMs: options.webPortProbeCacheTtlMs } + : {}), ...(options.platform !== undefined ? { platform: options.platform } : {}), ...(options.env !== undefined ? { env: options.env } : {}), - ...(options.subprocessChecker !== undefined - ? { subprocessChecker: options.subprocessChecker } - : {}), ...(options.subprocessPollIntervalMs !== undefined ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs } : {}), @@ -262,7 +290,71 @@ const createManager = ( }), ); -it.layer(NodeServices.layer, { excludeTestServices: true })("TerminalManager", (it) => { +const nodeChildProcessLayer = NodeChildProcessSpawner.layer.pipe(Layer.provide(NodeServices.layer)); + +it.effect("formats subprocess inspector errors with command and underlying cause details", () => + Effect.sync(() => { + const error = new TerminalProcessInspectionError({ + operation: "TerminalProcessInspector.checkPosixListeningPorts.ss", + terminalPid: 77_411, + command: "ss -ltnp", + detail: "Failed to spawn inspector command.", + cause: new Error("spawn ss ENOENT"), + }); + + const description = describeSubprocessInspectorError(error); + + assert.equal(description.operation, "TerminalProcessInspector.checkPosixListeningPorts.ss"); + assert.equal(description.command, "ss -ltnp"); + assert.equal(description.detail, "Failed to spawn inspector command."); + assert.equal(description.cause, "spawn ss ENOENT"); + assert.include(description.error, "ss -ltnp"); + assert.include(description.error, "spawn ss ENOENT"); + }), +); + +it.effect( + "suppresses repeated identical subprocess inspector warnings inside the throttle window", + () => + Effect.sync(() => { + const first = nextSubprocessActivityErrorLogState({ + previous: undefined, + fingerprint: "same-error", + now: 1_000, + throttleMs: 30_000, + }); + assert.equal(first.shouldLog, true); + assert.equal(first.suppressedRepeats, 0); + + const second = nextSubprocessActivityErrorLogState({ + previous: first.next, + fingerprint: "same-error", + now: 2_000, + throttleMs: 30_000, + }); + assert.equal(second.shouldLog, false); + assert.equal(second.next.suppressedRepeats, 1); + + const third = nextSubprocessActivityErrorLogState({ + previous: second.next, + fingerprint: "same-error", + now: 32_000, + throttleMs: 30_000, + }); + assert.equal(third.shouldLog, true); + assert.equal(third.suppressedRepeats, 1); + assert.equal(third.next.suppressedRepeats, 0); + }), +); + +it.layer( + Layer.mergeAll( + NodeServices.layer, + WebPortInspectorLive.pipe(Layer.provide(FetchHttpClient.layer)), + TerminalProcessInspectorLive.pipe(Layer.provide(nodeChildProcessLayer)), + ), + { excludeTestServices: true }, +)("TerminalManager", (it) => { it.effect("spawns lazily and reuses running terminal per thread", () => Effect.gen(function* () { const { manager, ptyAdapter } = yield* createManager(); @@ -560,42 +652,127 @@ it.layer(NodeServices.layer, { excludeTestServices: true })("TerminalManager", ( }), ); - it.effect("emits subprocess activity events when child-process state changes", () => + it.effect("emits subprocess activity events when child-process state or web ports change", () => Effect.gen(function* () { - let hasRunningSubprocess = false; + let activity = { + hasRunningSubprocess: false, + runningPorts: [] as number[], + }; const { manager, getEvents } = yield* createManager(5, { - subprocessChecker: () => Effect.succeed(hasRunningSubprocess), + subprocessInspector: () => Effect.succeed(activity), + webPortInspector: (port) => Effect.succeed(port === 3000), subprocessPollIntervalMs: 20, }); yield* manager.open(openInput()); expect((yield* getEvents).some((event) => event.type === "activity")).toBe(false); - hasRunningSubprocess = true; + activity = { hasRunningSubprocess: true, runningPorts: [3000] }; + yield* waitFor( + Effect.map(getEvents, (events) => + events.some( + (event) => + event.type === "activity" && + event.hasRunningSubprocess === true && + event.runningPorts.join(",") === "3000", + ), + ), + "1200 millis", + ); + + activity = { hasRunningSubprocess: true, runningPorts: [5173] }; yield* waitFor( Effect.map(getEvents, (events) => - events.some((event) => event.type === "activity" && event.hasRunningSubprocess === true), + events.some( + (event) => + event.type === "activity" && + event.hasRunningSubprocess === true && + event.runningPorts.length === 0, + ), ), "1200 millis", ); - hasRunningSubprocess = false; + activity = { hasRunningSubprocess: false, runningPorts: [5173] }; yield* waitFor( Effect.map(getEvents, (events) => - events.some((event) => event.type === "activity" && event.hasRunningSubprocess === false), + events.some( + (event) => + event.type === "activity" && + event.hasRunningSubprocess === false && + event.runningPorts.length === 0, + ), ), "1200 millis", ); }), ); + it.effect("keeps a confirmed web port during transient probe failures after cache expiry", () => + Effect.gen(function* () { + let activity = { + hasRunningSubprocess: true, + runningPorts: [3000], + }; + let probeCalls = 0; + const { manager, getEvents } = yield* createManager(5, { + subprocessInspector: () => Effect.succeed(activity), + webPortInspector: () => { + probeCalls += 1; + return probeCalls === 1 + ? Effect.succeed(true) + : Effect.fail( + new WebPortInspectionError({ + port: 3000, + host: "127.0.0.1", + detail: "transient web probe failure", + }), + ); + }, + webPortProbeCacheTtlMs: 25, + subprocessPollIntervalMs: 20, + }); + + yield* manager.open(openInput()); + yield* waitFor( + Effect.map(getEvents, (events) => + events.some( + (event) => + event.type === "activity" && + event.hasRunningSubprocess === true && + event.runningPorts.join(",") === "3000", + ), + ), + "1200 millis", + ); + + yield* waitFor( + Effect.sync(() => probeCalls >= 2), + "1200 millis", + ); + const events = yield* getEvents; + expect( + events.some( + (event) => + event.type === "activity" && + event.hasRunningSubprocess === true && + event.runningPorts.length === 0, + ), + ).toBe(false); + activity = { hasRunningSubprocess: false, runningPorts: [] }; + }), + ); + it.effect("does not invoke subprocess polling until a terminal session is running", () => Effect.gen(function* () { let checks = 0; const { manager } = yield* createManager(5, { - subprocessChecker: () => { + subprocessInspector: () => { checks += 1; - return Effect.succeed(false); + return Effect.succeed({ + hasRunningSubprocess: false, + runningPorts: [], + }); }, subprocessPollIntervalMs: 20, }); diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts index 5e14db8e5c1..a53ad8cdadb 100644 --- a/apps/server/src/terminal/Layers/Manager.ts +++ b/apps/server/src/terminal/Layers/Manager.ts @@ -28,7 +28,18 @@ import { terminalRestartsTotal, terminalSessionsTotal, } from "../../observability/Metrics.ts"; -import { runProcess } from "../../processRunner.ts"; +import { arePortListsEqual, normalizeRunningPorts } from "../../process/utils.ts"; +import { + TerminalProcessInspector, + TerminalProcessInspectionError, + type TerminalSubprocessActivity, + type TerminalSubprocessInspector, +} from "../../process/Services/TerminalProcessInspector.ts"; +import { + DEFAULT_WEB_PORT_PROBE_TTL_MS, + WebPortInspector, + type TerminalWebPortInspector, +} from "../../process/Services/WebPortInspector.ts"; import { TerminalCwdError, TerminalHistoryError, @@ -48,22 +59,13 @@ import { const DEFAULT_HISTORY_LINE_LIMIT = 5_000; const DEFAULT_PERSIST_DEBOUNCE_MS = 40; const DEFAULT_SUBPROCESS_POLL_INTERVAL_MS = 1_000; +const DEFAULT_SUBPROCESS_ACTIVITY_ERROR_LOG_THROTTLE_MS = 30_000; const DEFAULT_PROCESS_KILL_GRACE_MS = 1_000; const DEFAULT_MAX_RETAINED_INACTIVE_SESSIONS = 128; const DEFAULT_OPEN_COLS = 120; const DEFAULT_OPEN_ROWS = 30; const TERMINAL_ENV_BLOCKLIST = new Set(["PORT", "ELECTRON_RENDERER_PORT", "ELECTRON_RUN_AS_NODE"]); -class TerminalSubprocessCheckError extends Schema.TaggedErrorClass()( - "TerminalSubprocessCheckError", - { - message: Schema.String, - cause: Schema.optional(Schema.Defect), - terminalPid: Schema.Number, - command: Schema.Literals(["powershell", "pgrep", "ps"]), - }, -) {} - class TerminalProcessSignalError extends Schema.TaggedErrorClass()( "TerminalProcessSignalError", { @@ -73,8 +75,94 @@ class TerminalProcessSignalError extends Schema.TaggedErrorClass; +interface SubprocessActivityErrorLogState { + fingerprint: string; + lastLoggedAt: number; + suppressedRepeats: number; +} + +export function describeSubprocessInspectorError(error: unknown): { + fingerprint: string; + error: string; + operation?: string; + command?: string; + detail?: string; + cause?: string; +} { + if (Schema.is(TerminalProcessInspectionError)(error)) { + const cause = + error.cause instanceof Error + ? error.cause.message + : typeof error.cause === "string" + ? error.cause + : error.cause === undefined || error.cause === null + ? undefined + : String(error.cause); + return { + fingerprint: JSON.stringify([ + error.operation, + error.terminalPid, + error.command, + error.detail, + cause ?? "", + ]), + error: error.message, + operation: error.operation, + command: error.command, + detail: error.detail, + ...(cause ? { cause } : {}), + }; + } + + const message = error instanceof Error ? error.message : String(error); + return { + fingerprint: message, + error: message, + }; +} + +export function nextSubprocessActivityErrorLogState(input: { + previous: SubprocessActivityErrorLogState | undefined; + fingerprint: string; + now: number; + throttleMs: number; +}): { + shouldLog: boolean; + suppressedRepeats: number; + next: SubprocessActivityErrorLogState; +} { + if (!input.previous || input.previous.fingerprint !== input.fingerprint) { + return { + shouldLog: true, + suppressedRepeats: 0, + next: { + fingerprint: input.fingerprint, + lastLoggedAt: input.now, + suppressedRepeats: 0, + }, + }; + } + + if (input.now - input.previous.lastLoggedAt >= input.throttleMs) { + return { + shouldLog: true, + suppressedRepeats: input.previous.suppressedRepeats, + next: { + fingerprint: input.fingerprint, + lastLoggedAt: input.now, + suppressedRepeats: 0, + }, + }; + } + + return { + shouldLog: false, + suppressedRepeats: input.previous.suppressedRepeats + 1, + next: { + ...input.previous, + suppressedRepeats: input.previous.suppressedRepeats + 1, + }, + }; } interface ShellCandidate { @@ -113,6 +201,7 @@ interface TerminalSessionState { unsubscribeData: (() => void) | null; unsubscribeExit: (() => void) | null; hasRunningSubprocess: boolean; + runningSubprocessPorts: number[]; runtimeEnv: Record | null; } @@ -347,108 +436,6 @@ function isRetryableShellSpawnError(error: PtySpawnError): boolean { ); } -function checkWindowsSubprocessActivity( - terminalPid: number, -): Effect.Effect { - const command = [ - `$children = Get-CimInstance Win32_Process -Filter "ParentProcessId = ${terminalPid}" -ErrorAction SilentlyContinue`, - "if ($children) { exit 0 }", - "exit 1", - ].join("; "); - return Effect.tryPromise({ - try: () => - runProcess("powershell.exe", ["-NoProfile", "-NonInteractive", "-Command", command], { - timeoutMs: 1_500, - allowNonZeroExit: true, - maxBufferBytes: 32_768, - outputMode: "truncate", - }), - catch: (cause) => - new TerminalSubprocessCheckError({ - message: "Failed to check Windows terminal subprocess activity.", - cause, - terminalPid, - command: "powershell", - }), - }).pipe(Effect.map((result) => result.code === 0)); -} - -const checkPosixSubprocessActivity = Effect.fn("terminal.checkPosixSubprocessActivity")(function* ( - terminalPid: number, -): Effect.fn.Return { - const runPgrep = Effect.tryPromise({ - try: () => - runProcess("pgrep", ["-P", String(terminalPid)], { - timeoutMs: 1_000, - allowNonZeroExit: true, - maxBufferBytes: 32_768, - outputMode: "truncate", - }), - catch: (cause) => - new TerminalSubprocessCheckError({ - message: "Failed to inspect terminal subprocesses with pgrep.", - cause, - terminalPid, - command: "pgrep", - }), - }); - - const runPs = Effect.tryPromise({ - try: () => - runProcess("ps", ["-eo", "pid=,ppid="], { - timeoutMs: 1_000, - allowNonZeroExit: true, - maxBufferBytes: 262_144, - outputMode: "truncate", - }), - catch: (cause) => - new TerminalSubprocessCheckError({ - message: "Failed to inspect terminal subprocesses with ps.", - cause, - terminalPid, - command: "ps", - }), - }); - - const pgrepResult = yield* Effect.exit(runPgrep); - if (pgrepResult._tag === "Success") { - if (pgrepResult.value.code === 0) { - return pgrepResult.value.stdout.trim().length > 0; - } - if (pgrepResult.value.code === 1) { - return false; - } - } - - const psResult = yield* Effect.exit(runPs); - if (psResult._tag === "Failure" || psResult.value.code !== 0) { - return false; - } - - for (const line of psResult.value.stdout.split(/\r?\n/g)) { - const [pidRaw, ppidRaw] = line.trim().split(/\s+/g); - const pid = Number(pidRaw); - const ppid = Number(ppidRaw); - if (!Number.isInteger(pid) || !Number.isInteger(ppid)) continue; - if (ppid === terminalPid) { - return true; - } - } - return false; -}); - -const defaultSubprocessChecker = Effect.fn("terminal.defaultSubprocessChecker")(function* ( - terminalPid: number, -): Effect.fn.Return { - if (!Number.isInteger(terminalPid) || terminalPid <= 0) { - return false; - } - if (process.platform === "win32") { - return yield* checkWindowsSubprocessActivity(terminalPid); - } - return yield* checkPosixSubprocessActivity(terminalPid); -}); - function capHistory(history: string, maxLines: number): string { if (history.length === 0) return history; const hasTrailingNewline = history.endsWith("\n"); @@ -694,9 +681,11 @@ interface TerminalManagerOptions { historyLineLimit?: number; ptyAdapter: PtyAdapterShape; shellResolver?: () => string; + subprocessInspector?: TerminalSubprocessInspector; + webPortInspector?: TerminalWebPortInspector; + webPortProbeCacheTtlMs?: number; platform?: NodeJS.Platform; env?: NodeJS.ProcessEnv; - subprocessChecker?: TerminalSubprocessChecker; subprocessPollIntervalMs?: number; processKillGraceMs?: number; maxRetainedInactiveSessions?: number; @@ -722,12 +711,17 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith const platform = options.platform ?? process.platform; const baseEnv = options.env ?? process.env; const shellResolver = options.shellResolver ?? (() => defaultShellResolver(platform, baseEnv)); - const subprocessChecker = options.subprocessChecker ?? defaultSubprocessChecker; + const subprocessInspector = + options.subprocessInspector ?? (yield* TerminalProcessInspector).inspect; + const webPortInspector = options.webPortInspector ?? (yield* WebPortInspector).inspect; + const webPortProbeCacheTtlMs = options.webPortProbeCacheTtlMs ?? DEFAULT_WEB_PORT_PROBE_TTL_MS; const subprocessPollIntervalMs = options.subprocessPollIntervalMs ?? DEFAULT_SUBPROCESS_POLL_INTERVAL_MS; const processKillGraceMs = options.processKillGraceMs ?? DEFAULT_PROCESS_KILL_GRACE_MS; const maxRetainedInactiveSessions = options.maxRetainedInactiveSessions ?? DEFAULT_MAX_RETAINED_INACTIVE_SESSIONS; + const webPortProbeCache = new Map(); + const subprocessActivityErrorLogState = new Map(); yield* fileSystem.makeDirectory(logsDir, { recursive: true }).pipe(Effect.orDie); @@ -747,6 +741,66 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith } }); + const inspectWebPortCached = (port: number) => + Effect.gen(function* () { + const now = Date.now(); + const cached = webPortProbeCache.get(port); + for (const [cachedPort, entry] of webPortProbeCache) { + if (cachedPort !== port && now - entry.checkedAt > webPortProbeCacheTtlMs) { + webPortProbeCache.delete(cachedPort); + } + } + + if (cached && now - cached.checkedAt <= webPortProbeCacheTtlMs) { + return true; + } + + const probeResult = yield* webPortInspector(port).pipe(Effect.exit); + if (probeResult._tag === "Success") { + if (probeResult.value) { + webPortProbeCache.set(port, { + isWeb: true, + checkedAt: Date.now(), + }); + return true; + } + webPortProbeCache.delete(port); + return false; + } + + if (cached) { + webPortProbeCache.set(port, { + isWeb: true, + checkedAt: Date.now(), + }); + return true; + } + + return false; + }); + + const detectWebPorts = (runningPorts: number[]) => + Effect.gen(function* () { + if (runningPorts.length === 0) { + return [] as number[]; + } + + const checks = yield* Effect.forEach( + runningPorts, + (port) => + Effect.map(inspectWebPortCached(port), (isWeb) => ({ + port, + isWeb, + })), + { concurrency: "unbounded" }, + ); + + return checks + .filter((entry) => entry.isWeb) + .map((entry) => entry.port) + .toSorted((left, right) => left - right); + }); + const historyPath = (threadId: string, terminalId: string) => { const threadPart = toSafeThreadId(threadId); if (terminalId === DEFAULT_TERMINAL_ID) { @@ -1210,6 +1264,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith session.process = null; session.pid = null; session.hasRunningSubprocess = false; + session.runningSubprocessPorts = []; session.status = "exited"; session.pendingHistoryControlSequence = ""; session.pendingProcessEvents = []; @@ -1277,6 +1332,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith session.process = null; session.pid = null; session.hasRunningSubprocess = false; + session.runningSubprocessPorts = []; session.status = "exited"; session.pendingHistoryControlSequence = ""; session.pendingProcessEvents = []; @@ -1370,6 +1426,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith session.exitCode = null; session.exitSignal = null; session.hasRunningSubprocess = false; + session.runningSubprocessPorts = []; session.pendingProcessEvents = []; session.pendingProcessEventIndex = 0; session.processEventDrainRunning = false; @@ -1443,6 +1500,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith session.unsubscribeData = null; session.unsubscribeExit = null; session.hasRunningSubprocess = false; + session.runningSubprocessPorts = []; session.pendingProcessEvents = []; session.pendingProcessEventIndex = 0; session.processEventDrainRunning = false; @@ -1504,6 +1562,14 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith (session): session is TerminalSessionState & { pid: number } => session.status === "running" && Number.isInteger(session.pid), ); + const activeRunningSessionKeys = new Set( + runningSessions.map((session) => toSessionKey(session.threadId, session.terminalId)), + ); + for (const sessionKey of subprocessActivityErrorLogState.keys()) { + if (!activeRunningSessionKeys.has(sessionKey)) { + subprocessActivityErrorLogState.delete(sessionKey); + } + } if (runningSessions.length === 0) { return; @@ -1513,22 +1579,49 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith session: TerminalSessionState & { pid: number }, ) { const terminalPid = session.pid; - const hasRunningSubprocess = yield* subprocessChecker(terminalPid).pipe( + const sessionKey = toSessionKey(session.threadId, session.terminalId); + const activity = yield* subprocessInspector(terminalPid).pipe( + Effect.tap(() => Effect.sync(() => subprocessActivityErrorLogState.delete(sessionKey))), Effect.map(Option.some), - Effect.catch((reason) => - Effect.logWarning("failed to check terminal subprocess activity", { + Effect.catch((error) => { + const description = describeSubprocessInspectorError(error); + const logDecision = nextSubprocessActivityErrorLogState({ + previous: subprocessActivityErrorLogState.get(sessionKey), + fingerprint: description.fingerprint, + now: Date.now(), + throttleMs: DEFAULT_SUBPROCESS_ACTIVITY_ERROR_LOG_THROTTLE_MS, + }); + subprocessActivityErrorLogState.set(sessionKey, logDecision.next); + if (!logDecision.shouldLog) { + return Effect.succeed(Option.none()); + } + return Effect.logWarning("failed to check terminal subprocess activity", { threadId: session.threadId, terminalId: session.terminalId, terminalPid, - reason, - }).pipe(Effect.as(Option.none())), - ), + error: description.error, + ...(description.operation ? { operation: description.operation } : {}), + ...(description.command ? { command: description.command } : {}), + ...(description.detail ? { detail: description.detail } : {}), + ...(description.cause ? { cause: description.cause } : {}), + ...(logDecision.suppressedRepeats > 0 + ? { suppressedRepeats: logDecision.suppressedRepeats } + : {}), + }).pipe(Effect.as(Option.none())); + }), ); - if (Option.isNone(hasRunningSubprocess)) { + if (Option.isNone(activity)) { return; } + const hasRunningSubprocess = activity.value.hasRunningSubprocess === true; + const runningPorts = hasRunningSubprocess + ? normalizeRunningPorts( + yield* detectWebPorts(normalizeRunningPorts(activity.value.runningPorts)), + ) + : []; + const event = yield* modifyManagerState((state) => { const liveSession: Option.Option = Option.fromNullishOr( state.sessions.get(toSessionKey(session.threadId, session.terminalId)), @@ -1537,12 +1630,14 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith Option.isNone(liveSession) || liveSession.value.status !== "running" || liveSession.value.pid !== terminalPid || - liveSession.value.hasRunningSubprocess === hasRunningSubprocess.value + (liveSession.value.hasRunningSubprocess === hasRunningSubprocess && + arePortListsEqual(liveSession.value.runningSubprocessPorts, runningPorts)) ) { return [Option.none(), state] as const; } - liveSession.value.hasRunningSubprocess = hasRunningSubprocess.value; + liveSession.value.hasRunningSubprocess = hasRunningSubprocess; + liveSession.value.runningSubprocessPorts = runningPorts; liveSession.value.updatedAt = new Date().toISOString(); return [ @@ -1551,7 +1646,8 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith threadId: liveSession.value.threadId, terminalId: liveSession.value.terminalId, createdAt: new Date().toISOString(), - hasRunningSubprocess: hasRunningSubprocess.value, + hasRunningSubprocess, + runningPorts, }), state, ] as const; @@ -1650,6 +1746,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith unsubscribeData: null, unsubscribeExit: null, hasRunningSubprocess: false, + runningSubprocessPorts: [], runtimeEnv: normalizedRuntimeEnv(input.env), }; @@ -1829,6 +1926,7 @@ export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWith unsubscribeData: null, unsubscribeExit: null, hasRunningSubprocess: false, + runningSubprocessPorts: [], runtimeEnv: normalizedRuntimeEnv(input.env), }; const createdSession = session; diff --git a/apps/server/src/terminal/Services/Manager.ts b/apps/server/src/terminal/Services/Manager.ts index fb7a7da7b64..2064af97156 100644 --- a/apps/server/src/terminal/Services/Manager.ts +++ b/apps/server/src/terminal/Services/Manager.ts @@ -51,6 +51,7 @@ export interface TerminalSessionState { unsubscribeData: (() => void) | null; unsubscribeExit: (() => void) | null; hasRunningSubprocess: boolean; + runningSubprocessPorts: number[]; runtimeEnv: Record | null; } diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index 41f627332e3..92f68e04b5e 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -1753,6 +1753,7 @@ describe("ChatView timeline estimator parity (full app)", () => { terminalHeight: 280, terminalIds: ["default"], runningTerminalIds: [], + runningTerminalPorts: {}, activeTerminalId: "default", terminalGroups: [{ id: "group-default", terminalIds: ["default"] }], activeTerminalGroupId: "group-default", diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 76431368f30..8489394bece 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -564,6 +564,8 @@ const PersistentThreadTerminalDrawer = memo(function PersistentThreadTerminalDra activeTerminalId={terminalState.activeTerminalId} terminalGroups={terminalState.terminalGroups} activeTerminalGroupId={terminalState.activeTerminalGroupId} + runningTerminalIds={terminalState.runningTerminalIds} + runningTerminalPorts={terminalState.runningTerminalPorts} focusRequestId={focusRequestId + localFocusRequestId + (visible ? 1 : 0)} onSplitTerminal={splitTerminal} onNewTerminal={createNewTerminal} diff --git a/apps/web/src/components/Sidebar.tsx b/apps/web/src/components/Sidebar.tsx index 9939833a951..8c566e0e4c5 100644 --- a/apps/web/src/components/Sidebar.tsx +++ b/apps/web/src/components/Sidebar.tsx @@ -11,12 +11,7 @@ import { TerminalIcon, TriangleAlertIcon, } from "lucide-react"; -import { - prStatusIndicator, - resolveThreadPr, - terminalStatusFromRunningIds, - ThreadStatusLabel, -} from "./ThreadStatusIndicators"; +import { prStatusIndicator, resolveThreadPr, ThreadStatusLabel } from "./ThreadStatusIndicators"; import { ProjectFavicon } from "./ProjectFavicon"; import { autoAnimate } from "@formkit/auto-animate"; import React, { useCallback, useEffect, memo, useMemo, useRef, useState } from "react"; @@ -70,7 +65,11 @@ import { selectThreadByRef, useStore, } from "../store"; -import { selectThreadTerminalState, useTerminalStateStore } from "../terminalStateStore"; +import { + normalizeRunningPorts, + selectThreadTerminalState, + useTerminalStateStore, +} from "../terminalStateStore"; import { useUiStateStore } from "../uiStateStore"; import { resolveShortcutCommand, @@ -199,6 +198,42 @@ const PROJECT_GROUPING_MODE_LABELS: Record = separate: "Keep separate", }; +interface TerminalStatusIndicator { + label: string; + colorClass: string; + pulse: boolean; + primaryWebPort: number | null; +} + +function terminalStatusFromTerminalState( + runningTerminalIds: string[], + runningTerminalPorts: Record, +): TerminalStatusIndicator | null { + if (runningTerminalIds.length === 0) { + return null; + } + + const runningPorts = normalizeRunningPorts( + runningTerminalIds.flatMap((terminalId) => runningTerminalPorts[terminalId] ?? []), + ); + const primaryWebPort = runningPorts[0] ?? null; + + return { + label: + primaryWebPort === null + ? "Terminal process running" + : runningPorts.length === 1 + ? `Open web server: http://localhost:${primaryWebPort}` + : `Open web server: http://localhost:${primaryWebPort} (detected web ports: ${runningPorts.join(", ")})`, + colorClass: + primaryWebPort === null + ? "text-teal-600 dark:text-teal-300/90" + : "text-sky-600 dark:text-sky-300/90", + pulse: true, + primaryWebPort, + }; +} + function threadJumpLabelMapsEqual( left: ReadonlyMap, right: ReadonlyMap, @@ -336,9 +371,8 @@ const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThreadRowP const lastVisitedAt = useUiStateStore((state) => state.threadLastVisitedAtById[threadKey]); const isSelected = useThreadSelectionStore((state) => state.selectedThreadKeys.has(threadKey)); const hasSelection = useThreadSelectionStore((state) => state.selectedThreadKeys.size > 0); - const runningTerminalIds = useTerminalStateStore( - (state) => - selectThreadTerminalState(state.terminalStateByThreadKey, threadRef).runningTerminalIds, + const terminalState = useTerminalStateStore((state) => + selectThreadTerminalState(state.terminalStateByThreadKey, threadRef), ); const primaryEnvironmentId = usePrimaryEnvironmentId(); const isRemoteThread = @@ -379,7 +413,10 @@ const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThreadRowP }); const pr = resolveThreadPr(thread.branch, gitStatus.data); const prStatus = prStatusIndicator(pr); - const terminalStatus = terminalStatusFromRunningIds(runningTerminalIds); + const terminalStatus = terminalStatusFromTerminalState( + terminalState.runningTerminalIds, + terminalState.runningTerminalPorts, + ); const isConfirmingArchive = confirmingArchiveThreadKey === threadKey && !isThreadRunning; const threadMetaClassName = isConfirmingArchive ? "pointer-events-none opacity-0" @@ -453,6 +490,19 @@ const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThreadRowP }, [openPrLink, prStatus], ); + const handleTerminalStatusClick = useCallback( + (event: React.MouseEvent) => { + if (terminalStatus?.primaryWebPort == null) return; + event.preventDefault(); + event.stopPropagation(); + const api = readLocalApi(); + if (!api) return; + void api.shell + .openExternal(`http://localhost:${terminalStatus.primaryWebPort}`) + .catch(() => undefined); + }, + [terminalStatus], + ); const handleRenameInputRef = useCallback( (element: HTMLInputElement | null) => { if (element && renamingInputRef.current !== element) { @@ -606,16 +656,27 @@ const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThreadRowP )}
- {terminalStatus && ( - - - - )} + {terminalStatus && + (terminalStatus.primaryWebPort === null ? ( + + + + ) : ( + + ))}
{isConfirmingArchive ? ( + {runtimeStatus && + (runtimeStatus.primaryWebPort === null ? ( + + + + + } + /> + + {runtimeStatus.label} + + + ) : ( + + + openWebPort(event, runtimeStatus.primaryWebPort!) + } + aria-label={runtimeStatus.label} + > + + + } + /> + + {runtimeStatus.label} + + + ))} {normalizedTerminalIds.length > 1 && ( { ...eventBase(), type: "activity", hasRunningSubprocess: true, + runningPorts: [3000], }); const idle = terminalRunningSubprocessFromEvent({ ...eventBase(), type: "activity", hasRunningSubprocess: false, + runningPorts: [], }); expect(active).toBe(true); expect(idle).toBe(false); }); - it("clears running state when a terminal session starts/restarts/exits", () => { + it("clears running state when a terminal session starts/restarts/exits/errors", () => { const events: TerminalEvent[] = [ { ...eventBase(), type: "started", snapshot }, { ...eventBase(), type: "restarted", snapshot }, { ...eventBase(), type: "exited", exitCode: 0, exitSignal: null }, + { ...eventBase(), type: "error", message: "oops" }, ]; for (const event of events) { @@ -61,12 +64,5 @@ describe("terminalRunningSubprocessFromEvent", () => { data: "hello", }), ).toBeNull(); - expect( - terminalRunningSubprocessFromEvent({ - ...eventBase(), - type: "error", - message: "oops", - }), - ).toBeNull(); }); }); diff --git a/apps/web/src/terminalActivity.ts b/apps/web/src/terminalActivity.ts index 24f4984ff53..fe410ad4046 100644 --- a/apps/web/src/terminalActivity.ts +++ b/apps/web/src/terminalActivity.ts @@ -7,6 +7,7 @@ export function terminalRunningSubprocessFromEvent(event: TerminalEvent): boolea case "started": case "restarted": case "exited": + case "error": return false; default: return null; diff --git a/apps/web/src/terminalStateStore.test.ts b/apps/web/src/terminalStateStore.test.ts index eceb621ac7e..3a70677a9e2 100644 --- a/apps/web/src/terminalStateStore.test.ts +++ b/apps/web/src/terminalStateStore.test.ts @@ -27,7 +27,13 @@ function makeTerminalEvent( case "output": return { ...base, type, data: "hello\n", ...overrides } as TerminalEvent; case "activity": - return { ...base, type, hasRunningSubprocess: true, ...overrides } as TerminalEvent; + return { + ...base, + type, + hasRunningSubprocess: true, + runningPorts: [], + ...overrides, + } as TerminalEvent; case "error": return { ...base, type, message: "boom", ...overrides } as TerminalEvent; case "cleared": @@ -77,12 +83,34 @@ describe("terminalStateStore actions", () => { terminalHeight: 280, terminalIds: ["default"], runningTerminalIds: [], + runningTerminalPorts: {}, activeTerminalId: "default", terminalGroups: [{ id: "group-default", terminalIds: ["default"] }], activeTerminalGroupId: "group-default", }); }); + it("normalizes legacy persisted thread terminal state at selector boundaries", () => { + const terminalState = selectThreadTerminalState( + { + [scopedThreadKey(THREAD_REF)]: { + terminalOpen: true, + terminalHeight: 280, + terminalIds: ["default"], + runningTerminalIds: ["default", "default"], + runningTerminalPorts: { default: [5173, 3000, 5173, 0] }, + activeTerminalId: "default", + terminalGroups: [{ id: "group-default", terminalIds: ["default"] }], + activeTerminalGroupId: "group-default", + }, + }, + THREAD_REF, + ); + + expect(terminalState.runningTerminalIds).toEqual(["default"]); + expect(terminalState.runningTerminalPorts).toEqual({ default: [3000, 5173] }); + }); + it("opens and splits terminals into the active group", () => { const store = useTerminalStateStore.getState(); store.setTerminalOpen(THREAD_REF, true); @@ -219,21 +247,21 @@ describe("terminalStateStore actions", () => { it("tracks and clears terminal subprocess activity", () => { const store = useTerminalStateStore.getState(); store.splitTerminal(THREAD_REF, "terminal-2"); - store.setTerminalActivity(THREAD_REF, "terminal-2", true); - expect( - selectThreadTerminalState( - useTerminalStateStore.getState().terminalStateByThreadKey, - THREAD_REF, - ).runningTerminalIds, - ).toEqual(["terminal-2"]); + store.setTerminalActivity(THREAD_REF, "terminal-2", true, [5173, 3000, 5173]); + let terminalState = selectThreadTerminalState( + useTerminalStateStore.getState().terminalStateByThreadKey, + THREAD_REF, + ); + expect(terminalState.runningTerminalIds).toEqual(["terminal-2"]); + expect(terminalState.runningTerminalPorts).toEqual({ "terminal-2": [3000, 5173] }); store.setTerminalActivity(THREAD_REF, "terminal-2", false); - expect( - selectThreadTerminalState( - useTerminalStateStore.getState().terminalStateByThreadKey, - THREAD_REF, - ).runningTerminalIds, - ).toEqual([]); + terminalState = selectThreadTerminalState( + useTerminalStateStore.getState().terminalStateByThreadKey, + THREAD_REF, + ); + expect(terminalState.runningTerminalIds).toEqual([]); + expect(terminalState.runningTerminalPorts).toEqual({}); }); it("resets to default and clears persisted entry when closing the last terminal", () => { @@ -339,14 +367,15 @@ describe("terminalStateStore actions", () => { makeTerminalEvent("activity", { terminalId: "terminal-2", hasRunningSubprocess: true, + runningPorts: [5173, 3000], }), ); - expect( - selectThreadTerminalState( - useTerminalStateStore.getState().terminalStateByThreadKey, - THREAD_REF, - ).runningTerminalIds, - ).toEqual(["terminal-2"]); + let terminalState = selectThreadTerminalState( + useTerminalStateStore.getState().terminalStateByThreadKey, + THREAD_REF, + ); + expect(terminalState.runningTerminalIds).toEqual(["terminal-2"]); + expect(terminalState.runningTerminalPorts).toEqual({ "terminal-2": [3000, 5173] }); store.applyTerminalEvent( THREAD_REF, @@ -357,7 +386,7 @@ describe("terminalStateStore actions", () => { }), ); - const terminalState = selectThreadTerminalState( + terminalState = selectThreadTerminalState( useTerminalStateStore.getState().terminalStateByThreadKey, THREAD_REF, ); @@ -368,6 +397,7 @@ describe("terminalStateStore actions", () => { ); expect(terminalState.runningTerminalIds).toEqual([]); + expect(terminalState.runningTerminalPorts).toEqual({}); expect(entries.map((entry) => entry.event.type)).toEqual(["activity", "exited"]); }); diff --git a/apps/web/src/terminalStateStore.ts b/apps/web/src/terminalStateStore.ts index 962c92f180e..30c7b4de318 100644 --- a/apps/web/src/terminalStateStore.ts +++ b/apps/web/src/terminalStateStore.ts @@ -23,6 +23,7 @@ interface ThreadTerminalState { terminalHeight: number; terminalIds: string[]; runningTerminalIds: string[]; + runningTerminalPorts: Record; activeTerminalId: string; terminalGroups: ThreadTerminalGroup[]; activeTerminalGroupId: string; @@ -82,6 +83,30 @@ function normalizeRunningTerminalIds( .filter((id) => id.length > 0 && validTerminalIdSet.has(id)); } +export function normalizeRunningPorts(ports: readonly number[] | undefined): number[] { + if (!ports || ports.length === 0) return []; + return [...new Set(ports)] + .filter((port) => Number.isInteger(port) && port > 0 && port <= 65_535) + .toSorted((left, right) => left - right); +} + +function normalizeRunningTerminalPorts( + runningTerminalPorts: Record | undefined, + terminalIds: string[], +): Record { + if (!runningTerminalPorts) return {}; + const validTerminalIdSet = new Set(terminalIds); + const normalizedEntries: Array<[string, number[]]> = []; + for (const [rawTerminalId, ports] of Object.entries(runningTerminalPorts)) { + const terminalId = rawTerminalId.trim(); + if (terminalId.length === 0 || !validTerminalIdSet.has(terminalId)) { + continue; + } + normalizedEntries.push([terminalId, normalizeRunningPorts(ports)]); + } + return Object.fromEntries(normalizedEntries); +} + function fallbackGroupId(terminalId: string): string { return `group-${terminalId}`; } @@ -157,7 +182,7 @@ function normalizeTerminalGroups( return nextGroups; } -function arraysEqual(a: string[], b: string[]): boolean { +function arraysEqual(a: readonly T[], b: readonly T[]): boolean { if (a.length !== b.length) return false; for (let index = 0; index < a.length; index += 1) { if (a[index] !== b[index]) return false; @@ -177,6 +202,22 @@ function terminalGroupsEqual(left: ThreadTerminalGroup[], right: ThreadTerminalG return true; } +function runningTerminalPortsEqual( + left: Record, + right: Record, +): boolean { + const leftEntries = Object.entries(left); + const rightEntries = Object.entries(right); + if (leftEntries.length !== rightEntries.length) return false; + for (const [terminalId, leftPorts] of leftEntries) { + const rightPorts = right[terminalId]; + if (!rightPorts || !arraysEqual(leftPorts, rightPorts)) { + return false; + } + } + return true; +} + function threadTerminalStateEqual(left: ThreadTerminalState, right: ThreadTerminalState): boolean { return ( left.terminalOpen === right.terminalOpen && @@ -185,6 +226,7 @@ function threadTerminalStateEqual(left: ThreadTerminalState, right: ThreadTermin left.activeTerminalGroupId === right.activeTerminalGroupId && arraysEqual(left.terminalIds, right.terminalIds) && arraysEqual(left.runningTerminalIds, right.runningTerminalIds) && + runningTerminalPortsEqual(left.runningTerminalPorts, right.runningTerminalPorts) && terminalGroupsEqual(left.terminalGroups, right.terminalGroups) ); } @@ -194,6 +236,7 @@ const DEFAULT_THREAD_TERMINAL_STATE: ThreadTerminalState = Object.freeze({ terminalHeight: DEFAULT_THREAD_TERMINAL_HEIGHT, terminalIds: [DEFAULT_THREAD_TERMINAL_ID], runningTerminalIds: [], + runningTerminalPorts: {}, activeTerminalId: DEFAULT_THREAD_TERMINAL_ID, terminalGroups: [ { @@ -209,6 +252,7 @@ function createDefaultThreadTerminalState(): ThreadTerminalState { ...DEFAULT_THREAD_TERMINAL_STATE, terminalIds: [...DEFAULT_THREAD_TERMINAL_STATE.terminalIds], runningTerminalIds: [...DEFAULT_THREAD_TERMINAL_STATE.runningTerminalIds], + runningTerminalPorts: { ...DEFAULT_THREAD_TERMINAL_STATE.runningTerminalPorts }, terminalGroups: copyTerminalGroups(DEFAULT_THREAD_TERMINAL_STATE.terminalGroups), }; } @@ -221,6 +265,10 @@ function normalizeThreadTerminalState(state: ThreadTerminalState): ThreadTermina const terminalIds = normalizeTerminalIds(state.terminalIds); const nextTerminalIds = terminalIds.length > 0 ? terminalIds : [DEFAULT_THREAD_TERMINAL_ID]; const runningTerminalIds = normalizeRunningTerminalIds(state.runningTerminalIds, nextTerminalIds); + const runningTerminalPorts = normalizeRunningTerminalPorts( + state.runningTerminalPorts, + nextTerminalIds, + ); const activeTerminalId = nextTerminalIds.includes(state.activeTerminalId) ? state.activeTerminalId : (nextTerminalIds[0] ?? DEFAULT_THREAD_TERMINAL_ID); @@ -241,6 +289,7 @@ function normalizeThreadTerminalState(state: ThreadTerminalState): ThreadTermina : DEFAULT_THREAD_TERMINAL_HEIGHT, terminalIds: nextTerminalIds, runningTerminalIds, + runningTerminalPorts, activeTerminalId, terminalGroups, activeTerminalGroupId: @@ -481,6 +530,9 @@ function closeThreadTerminal(state: ThreadTerminalState, terminalId: string): Th terminalHeight: normalized.terminalHeight, terminalIds: remainingTerminalIds, runningTerminalIds: normalized.runningTerminalIds.filter((id) => id !== terminalId), + runningTerminalPorts: Object.fromEntries( + Object.entries(normalized.runningTerminalPorts).filter(([key]) => key !== terminalId), + ), activeTerminalId: nextActiveTerminalId, terminalGroups, activeTerminalGroupId: nextActiveTerminalGroupId, @@ -491,22 +543,35 @@ function setThreadTerminalActivity( state: ThreadTerminalState, terminalId: string, hasRunningSubprocess: boolean, + runningPorts: readonly number[] = [], ): ThreadTerminalState { const normalized = normalizeThreadTerminalState(state); if (!normalized.terminalIds.includes(terminalId)) { return normalized; } const alreadyRunning = normalized.runningTerminalIds.includes(terminalId); - if (hasRunningSubprocess === alreadyRunning) { + const nextRunningPorts = normalizeRunningPorts(runningPorts); + const currentRunningPorts = normalized.runningTerminalPorts[terminalId] ?? []; + if ( + hasRunningSubprocess === alreadyRunning && + arraysEqual(currentRunningPorts, nextRunningPorts) + ) { return normalized; } const runningTerminalIds = new Set(normalized.runningTerminalIds); + const runningTerminalPorts = { ...normalized.runningTerminalPorts }; if (hasRunningSubprocess) { runningTerminalIds.add(terminalId); + runningTerminalPorts[terminalId] = nextRunningPorts; } else { runningTerminalIds.delete(terminalId); + delete runningTerminalPorts[terminalId]; } - return { ...normalized, runningTerminalIds: [...runningTerminalIds] }; + return { + ...normalized, + runningTerminalIds: [...runningTerminalIds], + runningTerminalPorts, + }; } export function selectThreadTerminalState( @@ -516,7 +581,8 @@ export function selectThreadTerminalState( if (!threadRef || threadRef.threadId.length === 0) { return getDefaultThreadTerminalState(); } - return terminalStateByThreadKey[terminalThreadKey(threadRef)] ?? getDefaultThreadTerminalState(); + const state = terminalStateByThreadKey[terminalThreadKey(threadRef)]; + return state ? normalizeThreadTerminalState(state) : getDefaultThreadTerminalState(); } function updateTerminalStateByThreadKey( @@ -588,6 +654,7 @@ interface TerminalStateStoreState { threadRef: ScopedThreadRef, terminalId: string, hasRunningSubprocess: boolean, + runningPorts?: readonly number[], ) => void; recordTerminalEvent: (threadRef: ScopedThreadRef, event: TerminalEvent) => void; applyTerminalEvent: (threadRef: ScopedThreadRef, event: TerminalEvent) => void; @@ -672,9 +739,9 @@ export const useTerminalStateStore = create()( const { [threadKey]: _removed, ...rest } = state.terminalLaunchContextByThreadKey; return { terminalLaunchContextByThreadKey: rest }; }), - setTerminalActivity: (threadRef, terminalId, hasRunningSubprocess) => + setTerminalActivity: (threadRef, terminalId, hasRunningSubprocess, runningPorts) => updateTerminal(threadRef, (state) => - setThreadTerminalActivity(state, terminalId, hasRunningSubprocess), + setThreadTerminalActivity(state, terminalId, hasRunningSubprocess, runningPorts), ), recordTerminalEvent: (threadRef, event) => set((state) => @@ -717,7 +784,12 @@ export const useTerminalStateStore = create()( nextTerminalStateByThreadKey, threadRef, (current) => - setThreadTerminalActivity(current, event.terminalId, hasRunningSubprocess), + setThreadTerminalActivity( + current, + event.terminalId, + hasRunningSubprocess, + event.type === "activity" ? event.runningPorts : [], + ), ); } diff --git a/packages/contracts/src/terminal.test.ts b/packages/contracts/src/terminal.test.ts index 3feae674924..0ec2819a7d8 100644 --- a/packages/contracts/src/terminal.test.ts +++ b/packages/contracts/src/terminal.test.ts @@ -205,6 +205,7 @@ describe("TerminalEvent", () => { terminalId: DEFAULT_TERMINAL_ID, createdAt: new Date().toISOString(), hasRunningSubprocess: true, + runningPorts: [3000], }), ).toBe(true); }); diff --git a/packages/contracts/src/terminal.ts b/packages/contracts/src/terminal.ts index 21bd74a0999..e2aff929395 100644 --- a/packages/contracts/src/terminal.ts +++ b/packages/contracts/src/terminal.ts @@ -10,6 +10,7 @@ const TerminalColsSchema = Schema.Int.check(Schema.isGreaterThanOrEqualTo(20)).c const TerminalRowsSchema = Schema.Int.check(Schema.isGreaterThanOrEqualTo(5)).check( Schema.isLessThanOrEqualTo(200), ); +const TerminalPortSchema = Schema.Int.check(Schema.isBetween({ minimum: 1, maximum: 65_535 })); const TerminalIdSchema = TrimmedNonEmptyStringSchema.check(Schema.isMaxLength(128)); const TerminalEnvKeySchema = Schema.String.check( Schema.isPattern(/^[A-Za-z_][A-Za-z0-9_]*$/), @@ -140,8 +141,8 @@ const TerminalActivityEvent = Schema.Struct({ ...TerminalEventBaseSchema.fields, type: Schema.Literal("activity"), hasRunningSubprocess: Schema.Boolean, + runningPorts: Schema.Array(TerminalPortSchema), }); - export const TerminalEvent = Schema.Union([ TerminalStartedEvent, TerminalOutputEvent,