diff --git a/apps/desktop/src/backendReadiness.test.ts b/apps/desktop/src/backendReadiness.test.ts index fd6180b5dac..33a5ef6b715 100644 --- a/apps/desktop/src/backendReadiness.test.ts +++ b/apps/desktop/src/backendReadiness.test.ts @@ -7,7 +7,7 @@ import { } from "./backendReadiness"; describe("waitForHttpReady", () => { - it("returns once the backend reports a successful session endpoint", async () => { + it("returns once the backend serves the requested readiness path", async () => { const fetchImpl = vi .fn() .mockResolvedValueOnce(new Response(null, { status: 503 })) @@ -20,6 +20,11 @@ describe("waitForHttpReady", () => { }); expect(fetchImpl).toHaveBeenCalledTimes(2); + expect(fetchImpl).toHaveBeenNthCalledWith( + 1, + "http://127.0.0.1:3773/", + expect.objectContaining({ redirect: "manual" }), + ); }); it("retries after a readiness request stalls past the per-request timeout", async () => { @@ -80,4 +85,30 @@ describe("waitForHttpReady", () => { expect(isBackendReadinessAborted(new BackendReadinessAbortedError())).toBe(true); expect(isBackendReadinessAborted(new Error("nope"))).toBe(false); }); + + it("supports custom readiness predicates", async () => { + const fetchImpl = vi + .fn() + .mockResolvedValueOnce(new Response(null, { status: 200 })) + .mockResolvedValueOnce(new Response(null, { status: 204 })); + + await waitForHttpReady("http://127.0.0.1:3773", { + fetchImpl, + timeoutMs: 1_000, + intervalMs: 0, + path: "/api/healthz", + isReady: (response) => response.status === 204, + }); + + expect(fetchImpl).toHaveBeenNthCalledWith( + 1, + "http://127.0.0.1:3773/api/healthz", + expect.objectContaining({ redirect: "manual" }), + ); + expect(fetchImpl).toHaveBeenNthCalledWith( + 2, + "http://127.0.0.1:3773/api/healthz", + expect.objectContaining({ redirect: "manual" }), + ); + }); }); diff --git a/apps/desktop/src/backendReadiness.ts b/apps/desktop/src/backendReadiness.ts index 73081823a0e..71c28929ebe 100644 --- a/apps/desktop/src/backendReadiness.ts +++ b/apps/desktop/src/backendReadiness.ts @@ -4,6 +4,8 @@ export interface WaitForHttpReadyOptions { readonly requestTimeoutMs?: number; readonly fetchImpl?: typeof fetch; readonly signal?: AbortSignal; + readonly path?: string; + readonly isReady?: (response: Response) => boolean; } const DEFAULT_TIMEOUT_MS = 30_000; @@ -57,6 +59,8 @@ export async function waitForHttpReady( const timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS; const intervalMs = options?.intervalMs ?? DEFAULT_INTERVAL_MS; const requestTimeoutMs = options?.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; + const readinessPath = options?.path ?? "/"; + const isReady = options?.isReady ?? ((response: Response) => response.ok); const deadline = Date.now() + timeoutMs; for (;;) { @@ -74,11 +78,11 @@ export async function waitForHttpReady( signal?.addEventListener("abort", abortRequest, { once: true }); try { - const response = await fetchImpl(`${baseUrl}/api/auth/session`, { + const response = await fetchImpl(new URL(readinessPath, baseUrl).toString(), { redirect: "manual", signal: requestController.signal, }); - if (response.ok) { + if (isReady(response)) { return; } } catch (error) { diff --git a/apps/desktop/src/main.ts b/apps/desktop/src/main.ts index fa0ad50da4d..cdabb2d74fb 100644 --- a/apps/desktop/src/main.ts +++ b/apps/desktop/src/main.ts @@ -55,6 +55,7 @@ import { showDesktopConfirmDialog } from "./confirmDialog"; import { resolveDesktopServerExposure } from "./serverExposure"; import { syncShellEnvironment } from "./syncShellEnvironment"; import { getAutoUpdateDisabledReason, shouldBroadcastDownloadProgress } from "./updateState"; +import { ServerListeningDetector } from "./serverListeningDetector"; import { createInitialDesktopUpdateState, reduceDesktopUpdateStateOnCheckFailure, @@ -144,6 +145,8 @@ let backendWsUrl = ""; let backendEndpointUrl: string | null = null; let backendAdvertisedHost: string | null = null; let backendReadinessAbortController: AbortController | null = null; +let backendInitialWindowOpenInFlight: Promise | null = null; +let backendListeningDetector: ServerListeningDetector | null = null; let restartAttempt = 0; let restartTimer: ReturnType | null = null; let isQuitting = false; @@ -362,13 +365,17 @@ function getSafeTheme(rawTheme: unknown): DesktopTheme | null { return null; } -async function waitForBackendHttpReady(baseUrl: string): Promise { +async function waitForBackendHttpReady( + baseUrl: string, + options?: Parameters[1], +): Promise { cancelBackendReadinessWait(); const controller = new AbortController(); backendReadinessAbortController = controller; try { await waitForHttpReady(baseUrl, { + ...options, signal: controller.signal, }); } finally { @@ -383,6 +390,88 @@ function cancelBackendReadinessWait(): void { backendReadinessAbortController = null; } +async function waitForBackendWindowReady(baseUrl: string): Promise<"listening" | "http"> { + const httpReadyPromise = waitForBackendHttpReady(baseUrl, { + timeoutMs: 60_000, + }); + const listeningPromise = backendListeningDetector?.promise; + + if (!listeningPromise) { + await httpReadyPromise; + return "http"; + } + + return await new Promise<"listening" | "http">((resolve, reject) => { + let settled = false; + + const settleResolve = (source: "listening" | "http") => { + if (settled) { + return; + } + settled = true; + if (source === "listening") { + cancelBackendReadinessWait(); + } + resolve(source); + }; + + const settleReject = (error: unknown) => { + if (settled) { + return; + } + settled = true; + reject(error); + }; + + listeningPromise.then( + () => settleResolve("listening"), + (error) => settleReject(error), + ); + httpReadyPromise.then( + () => settleResolve("http"), + (error) => { + if (settled && isBackendReadinessAborted(error)) { + return; + } + settleReject(error); + }, + ); + }); +} + +function ensureInitialBackendWindowOpen(): void { + const existingWindow = mainWindow ?? BrowserWindow.getAllWindows()[0] ?? null; + if (isDevelopment || existingWindow !== null || backendInitialWindowOpenInFlight !== null) { + return; + } + + const nextOpen = waitForBackendWindowReady(backendHttpUrl) + .then((source) => { + writeDesktopLogHeader(`bootstrap backend ready source=${source}`); + if (mainWindow ?? BrowserWindow.getAllWindows()[0]) { + return; + } + mainWindow = createWindow(); + writeDesktopLogHeader("bootstrap main window created"); + }) + .catch((error) => { + if (isBackendReadinessAborted(error)) { + return; + } + writeDesktopLogHeader( + `bootstrap backend readiness warning message=${formatErrorMessage(error)}`, + ); + console.warn("[desktop] backend readiness check timed out during packaged bootstrap", error); + }) + .finally(() => { + if (backendInitialWindowOpenInFlight === nextOpen) { + backendInitialWindowOpenInFlight = null; + } + }); + + backendInitialWindowOpenInFlight = nextOpen; +} + function writeDesktopStreamChunk( streamName: "stdout" | "stderr", chunk: unknown, @@ -460,14 +549,16 @@ function initializePackagedLogging(): void { } function captureBackendOutput(child: ChildProcess.ChildProcess): void { - if (!app.isPackaged || backendLogSink === null) return; - const writeChunk = (chunk: unknown): void => { - if (!backendLogSink) return; - const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk), "utf8"); - backendLogSink.write(buffer); + const attachStream = (stream: NodeJS.ReadableStream | null | undefined): void => { + stream?.on("data", (chunk: unknown) => { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk), "utf8"); + backendLogSink?.write(buffer); + backendListeningDetector?.push(buffer); + }); }; - child.stdout?.on("data", writeChunk); - child.stderr?.on("data", writeChunk); + + attachStream(child.stdout); + attachStream(child.stderr); } initializePackagedLogging(); @@ -1222,7 +1313,7 @@ function startBackend(): void { return; } - const captureBackendLogs = app.isPackaged && backendLogSink !== null; + const captureBackendLogs = !isDevelopment; const child = ChildProcess.spawn(process.execPath, [backendEntry, "--bootstrap-fd", "3"], { cwd: resolveBackendCwd(), // In Electron main, process.execPath points to the Electron binary. @@ -1259,6 +1350,8 @@ function startBackend(): void { scheduleBackendRestart("missing desktop bootstrap pipe"); return; } + const listeningDetector = new ServerListeningDetector(); + backendListeningDetector = listeningDetector; backendProcess = child; let backendSessionClosed = false; const closeBackendSession = (details: string) => { @@ -1277,6 +1370,10 @@ function startBackend(): void { }); child.on("error", (error) => { + if (backendListeningDetector === listeningDetector) { + listeningDetector.fail(error); + backendListeningDetector = null; + } const wasExpected = expectedBackendExitChildren.has(child); if (backendProcess === child) { backendProcess = null; @@ -1289,6 +1386,14 @@ function startBackend(): void { }); child.on("exit", (code, signal) => { + if (backendListeningDetector === listeningDetector) { + listeningDetector.fail( + new Error( + `backend exited before logging readiness (code=${code ?? "null"} signal=${signal ?? "null"})`, + ), + ); + backendListeningDetector = null; + } const wasExpected = expectedBackendExitChildren.has(child); if (backendProcess === child) { backendProcess = null; @@ -1300,10 +1405,13 @@ function startBackend(): void { const reason = `code=${code ?? "null"} signal=${signal ?? "null"}`; scheduleBackendRestart(reason); }); + + ensureInitialBackendWindowOpen(); } function stopBackend(): void { cancelBackendReadinessWait(); + backendListeningDetector = null; if (restartTimer) { clearTimeout(restartTimer); restartTimer = null; @@ -1705,7 +1813,7 @@ function createWindow(): BrowserWindow { height: 780, minWidth: 840, minHeight: 620, - show: isDevelopment, + show: false, autoHideMenuBar: true, backgroundColor: getInitialWindowBackgroundColor(), ...getIconOption(), @@ -1779,20 +1887,23 @@ function createWindow(): BrowserWindow { window.setTitle(APP_DISPLAY_NAME); emitUpdateState(); }); - if (!isDevelopment) { - window.once("ready-to-show", () => { - revealWindow(window); - }); - } + + let initialRevealScheduled = false; + const revealInitialWindow = () => { + if (initialRevealScheduled) { + return; + } + initialRevealScheduled = true; + revealWindow(window); + }; + + window.once("ready-to-show", revealInitialWindow); if (isDevelopment) { void window.loadURL(resolveDesktopDevServerUrl()); window.webContents.openDevTools({ mode: "detach" }); - setImmediate(() => { - revealWindow(window); - }); } else { - void window.loadURL(resolveDesktopWindowUrl()); + void window.loadURL(backendHttpUrl); } window.on("closed", () => { @@ -1804,14 +1915,6 @@ function createWindow(): BrowserWindow { return window; } -function resolveDesktopWindowUrl(): string { - if (backendHttpUrl) { - return backendHttpUrl; - } - - return `${DESKTOP_SCHEME}://app`; -} - // Override Electron's userData path before the `ready` event so that // Chromium session data uses a filesystem-friendly directory name. // Must be called synchronously at the top level — before `app.whenReady()`. @@ -1885,10 +1988,7 @@ async function bootstrap(): Promise { return; } - await waitForBackendHttpReady(backendHttpUrl); - writeDesktopLogHeader("bootstrap backend ready"); - mainWindow = createWindow(); - writeDesktopLogHeader("bootstrap main window created"); + ensureInitialBackendWindowOpen(); } app.on("before-quit", () => { @@ -1922,7 +2022,11 @@ app revealWindow(existingWindow); return; } - mainWindow = createWindow(); + if (isDevelopment) { + mainWindow = createWindow(); + return; + } + ensureInitialBackendWindowOpen(); }); }) .catch((error) => { diff --git a/apps/desktop/src/serverListeningDetector.test.ts b/apps/desktop/src/serverListeningDetector.test.ts new file mode 100644 index 00000000000..b7c66b6312c --- /dev/null +++ b/apps/desktop/src/serverListeningDetector.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it } from "vitest"; + +import { ServerListeningDetector } from "./serverListeningDetector"; + +describe("ServerListeningDetector", () => { + it("resolves when the server logs the listening line", async () => { + const detector = new ServerListeningDetector(); + + detector.push("[01:23:30.571] INFO (#148): Listening on http://0.0.0.0:7011\n"); + + await expect(detector.promise).resolves.toBeUndefined(); + }); + + it("resolves when the listening line arrives across multiple chunks", async () => { + const detector = new ServerListeningDetector(); + + detector.push("[01:23:30.571] INFO (#148): Listen"); + detector.push("ing on http://0.0.0.0:7011\n"); + + await expect(detector.promise).resolves.toBeUndefined(); + }); + + it("rejects when the server exits before logging readiness", async () => { + const detector = new ServerListeningDetector(); + const error = new Error("server exited"); + + detector.fail(error); + + await expect(detector.promise).rejects.toBe(error); + }); +}); diff --git a/apps/desktop/src/serverListeningDetector.ts b/apps/desktop/src/serverListeningDetector.ts new file mode 100644 index 00000000000..e738aacc38d --- /dev/null +++ b/apps/desktop/src/serverListeningDetector.ts @@ -0,0 +1,56 @@ +const LISTENING_LOG_FRAGMENT = "Listening on http://"; +const MAX_BUFFER_CHARS = 8_192; + +export class ServerListeningDetector { + private buffer = ""; + private settled = false; + private readonly resolvePromise: () => void; + private readonly rejectPromise: (error: unknown) => void; + readonly promise: Promise; + + constructor() { + let resolvePromise: (() => void) | null = null; + let rejectPromise: ((error: unknown) => void) | null = null; + + this.promise = new Promise((resolve, reject) => { + resolvePromise = resolve; + rejectPromise = reject; + }); + + this.resolvePromise = () => { + if (this.settled) { + return; + } + this.settled = true; + resolvePromise?.(); + }; + this.rejectPromise = (error) => { + if (this.settled) { + return; + } + this.settled = true; + rejectPromise?.(error); + }; + } + + push(chunk: unknown): void { + if (this.settled) { + return; + } + + const text = Buffer.isBuffer(chunk) ? chunk.toString("utf8") : String(chunk); + this.buffer = `${this.buffer}${text.replace(/\r/g, "")}`; + if (this.buffer.includes(LISTENING_LOG_FRAGMENT)) { + this.resolvePromise(); + return; + } + + if (this.buffer.length > MAX_BUFFER_CHARS) { + this.buffer = this.buffer.slice(-MAX_BUFFER_CHARS); + } + } + + fail(error: unknown): void { + this.rejectPromise(error); + } +} diff --git a/apps/server/src/config.ts b/apps/server/src/config.ts index c2e5ecee459..7840c761151 100644 --- a/apps/server/src/config.ts +++ b/apps/server/src/config.ts @@ -24,6 +24,7 @@ export interface ServerDerivedPaths { readonly dbPath: string; readonly keybindingsConfigPath: string; readonly settingsPath: string; + readonly providerStatusCacheDir: string; readonly worktreesDir: string; readonly attachmentsDir: string; readonly logsDir: string; @@ -76,11 +77,13 @@ export const deriveServerPaths = Effect.fn(function* ( const attachmentsDir = join(stateDir, "attachments"); const logsDir = join(stateDir, "logs"); const providerLogsDir = join(logsDir, "provider"); + const providerStatusCacheDir = join(baseDir, "caches"); return { stateDir, dbPath, keybindingsConfigPath: join(stateDir, "keybindings.json"), settingsPath: join(stateDir, "settings.json"), + providerStatusCacheDir, worktreesDir: join(baseDir, "worktrees"), attachmentsDir, logsDir, @@ -110,6 +113,7 @@ export const ensureServerDirectories = Effect.fn(function* (derivedPaths: Server fs.makeDirectory(derivedPaths.worktreesDir, { recursive: true }), fs.makeDirectory(path.dirname(derivedPaths.keybindingsConfigPath), { recursive: true }), fs.makeDirectory(path.dirname(derivedPaths.settingsPath), { recursive: true }), + fs.makeDirectory(derivedPaths.providerStatusCacheDir, { recursive: true }), fs.makeDirectory(path.dirname(derivedPaths.anonymousIdPath), { recursive: true }), fs.makeDirectory(path.dirname(derivedPaths.serverRuntimeStatePath), { recursive: true }), ], diff --git a/apps/server/src/http.ts b/apps/server/src/http.ts index ed403894291..7420156b2e7 100644 --- a/apps/server/src/http.ts +++ b/apps/server/src/http.ts @@ -226,6 +226,7 @@ export const staticAndDevRouteLayer = HttpRouter.add( Effect.gen(function* () { const request = yield* HttpServerRequest.HttpServerRequest; const url = HttpServerRequest.toURL(request); + if (Option.isNone(url)) { return HttpServerResponse.text("Bad Request", { status: 400 }); } diff --git a/apps/server/src/provider/Layers/ClaudeProvider.ts b/apps/server/src/provider/Layers/ClaudeProvider.ts index d00be86d3e4..b7c3c3140ea 100644 --- a/apps/server/src/provider/Layers/ClaudeProvider.ts +++ b/apps/server/src/provider/Layers/ClaudeProvider.ts @@ -663,6 +663,46 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( }); }); +const makePendingClaudeProvider = (claudeSettings: ClaudeSettings): ServerProvider => { + const checkedAt = new Date().toISOString(); + const models = providerModelsFromSettings( + BUILT_IN_MODELS, + PROVIDER, + claudeSettings.customModels, + DEFAULT_CLAUDE_MODEL_CAPABILITIES, + ); + + if (!claudeSettings.enabled) { + return buildServerProvider({ + provider: PROVIDER, + enabled: false, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Claude is disabled in T3 Code settings.", + }, + }); + } + + return buildServerProvider({ + provider: PROVIDER, + enabled: true, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Claude provider status has not been checked in this session yet.", + }, + }); +}; + export const ClaudeProviderLive = Layer.effect( ClaudeProvider, Effect.gen(function* () { @@ -698,6 +738,7 @@ export const ClaudeProviderLive = Layer.effect( Stream.map((settings) => settings.providers.claudeAgent), ), haveSettingsChanged: (previous, next) => !Equal.equals(previous, next), + initialSnapshot: makePendingClaudeProvider, checkProvider, }); }), diff --git a/apps/server/src/provider/Layers/CodexProvider.ts b/apps/server/src/provider/Layers/CodexProvider.ts index 421621c9699..d3f8c742ef5 100644 --- a/apps/server/src/provider/Layers/CodexProvider.ts +++ b/apps/server/src/provider/Layers/CodexProvider.ts @@ -552,6 +552,46 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu }); }); +const makePendingCodexProvider = (codexSettings: CodexSettings): ServerProvider => { + const checkedAt = new Date().toISOString(); + const models = providerModelsFromSettings( + BUILT_IN_MODELS, + PROVIDER, + codexSettings.customModels, + DEFAULT_CODEX_MODEL_CAPABILITIES, + ); + + if (!codexSettings.enabled) { + return buildServerProvider({ + provider: PROVIDER, + enabled: false, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Codex is disabled in T3 Code settings.", + }, + }); + } + + return buildServerProvider({ + provider: PROVIDER, + enabled: true, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Codex provider status has not been checked in this session yet.", + }, + }); +}; + export const CodexProviderLive = Layer.effect( CodexProvider, Effect.gen(function* () { @@ -602,6 +642,7 @@ export const CodexProviderLive = Layer.effect( Stream.map((settings) => settings.providers.codex), ), haveSettingsChanged: (previous, next) => !Equal.equals(previous, next), + initialSnapshot: makePendingCodexProvider, checkProvider, }); }), diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index 9c12048e45e..93d146acf96 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -31,6 +31,7 @@ import { } from "./CodexProvider"; import { checkClaudeProviderStatus, parseClaudeAuthStatusFromOutput } from "./ClaudeProvider"; import { haveProvidersChanged, ProviderRegistryLive } from "./ProviderRegistry"; +import { ServerConfig } from "../../config"; import { ServerSettingsService, type ServerSettingsShape } from "../../serverSettings"; import { ProviderRegistry } from "../Services/ProviderRegistry"; @@ -562,6 +563,61 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest()))( assert.strictEqual(haveProvidersChanged(providers, [...providers]), false); }); + it.effect("does not probe provider health during registry startup", () => + Effect.gen(function* () { + let spawnCount = 0; + const serverSettings = yield* makeMutableServerSettingsService(); + const scope = yield* Scope.make(); + yield* Effect.addFinalizer(() => Scope.close(scope, Exit.void)); + const providerRegistryLayer = ProviderRegistryLive.pipe( + Layer.provideMerge(Layer.succeed(ServerSettingsService, serverSettings)), + Layer.provideMerge( + ServerConfig.layerTest(process.cwd(), { + prefix: "t3-provider-registry-", + }), + ), + Layer.provideMerge( + mockCommandSpawnerLayer((command, args) => { + spawnCount += 1; + const joined = args.join(" "); + if (joined === "--version") { + if (command === "codex") { + return { stdout: "codex 1.0.0\n", stderr: "", code: 0 }; + } + return { stdout: "claude 1.0.0\n", stderr: "", code: 0 }; + } + if (joined === "login status") { + return { stdout: "Logged in\n", stderr: "", code: 0 }; + } + if (joined === "auth status") { + return { stdout: '{"authenticated":true}\n', stderr: "", code: 0 }; + } + throw new Error(`Unexpected args: ${command} ${joined}`); + }), + ), + ); + const runtimeServices = yield* Layer.build( + Layer.mergeAll( + Layer.succeed(ServerSettingsService, serverSettings), + providerRegistryLayer, + ), + ).pipe(Scope.provide(scope)); + + yield* Effect.gen(function* () { + const registry = yield* ProviderRegistry; + assert.deepStrictEqual(yield* registry.getProviders, []); + assert.strictEqual(spawnCount, 0); + + const refreshed = yield* registry.refresh("codex"); + assert.strictEqual(spawnCount > 0, true); + assert.strictEqual( + refreshed.find((provider) => provider.provider === "codex")?.status, + "ready", + ); + }).pipe(Effect.provide(runtimeServices)); + }), + ); + it.effect("reruns codex health when codex provider settings change", () => Effect.gen(function* () { const serverSettings = yield* makeMutableServerSettingsService(); @@ -569,6 +625,11 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest()))( yield* Effect.addFinalizer(() => Scope.close(scope, Exit.void)); const providerRegistryLayer = ProviderRegistryLive.pipe( Layer.provideMerge(Layer.succeed(ServerSettingsService, serverSettings)), + Layer.provideMerge( + ServerConfig.layerTest(process.cwd(), { + prefix: "t3-provider-registry-", + }), + ), Layer.provideMerge( mockCommandSpawnerLayer((command, args) => { const joined = args.join(" "); @@ -596,8 +657,11 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest()))( const registry = yield* ProviderRegistry; const initial = yield* registry.getProviders; + assert.deepStrictEqual(initial, []); + + const refreshed = yield* registry.refresh("codex"); assert.strictEqual( - initial.find((status) => status.provider === "codex")?.status, + refreshed.find((status) => status.provider === "codex")?.status, "ready", ); diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index fb2f33c2932..41bb81e74f8 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -4,8 +4,9 @@ * @module ProviderRegistryLive */ import type { ProviderKind, ServerProvider } from "@t3tools/contracts"; -import { Effect, Equal, Layer, PubSub, Ref, Stream } from "effect"; +import { Effect, Equal, FileSystem, Layer, Path, PubSub, Ref, Stream } from "effect"; +import { ServerConfig } from "../../config"; import { ClaudeProviderLive } from "./ClaudeProvider"; import { CodexProviderLive } from "./CodexProvider"; import type { ClaudeProviderShape } from "../Services/ClaudeProvider"; @@ -13,6 +14,14 @@ import { ClaudeProvider } from "../Services/ClaudeProvider"; import type { CodexProviderShape } from "../Services/CodexProvider"; import { CodexProvider } from "../Services/CodexProvider"; import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry"; +import { + hydrateCachedProvider, + PROVIDER_CACHE_IDS, + orderProviderSnapshots, + readProviderStatusCache, + resolveProviderStatusCachePath, + writeProviderStatusCache, +} from "../providerStatusCache"; const loadProviders = ( codexProvider: CodexProviderShape, @@ -32,61 +41,152 @@ export const ProviderRegistryLive = Layer.effect( Effect.gen(function* () { const codexProvider = yield* CodexProvider; const claudeProvider = yield* ClaudeProvider; + const config = yield* ServerConfig; + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; const changesPubSub = yield* Effect.acquireRelease( PubSub.unbounded>(), PubSub.shutdown, ); - const providersRef = yield* Ref.make>( - yield* loadProviders(codexProvider, claudeProvider), + const fallbackProviders = yield* loadProviders(codexProvider, claudeProvider); + const cachePathByProvider = new Map( + PROVIDER_CACHE_IDS.map( + (provider) => + [ + provider, + resolveProviderStatusCachePath({ + cacheDir: config.providerStatusCacheDir, + provider, + }), + ] as const, + ), + ); + const fallbackByProvider = new Map( + fallbackProviders.map((provider) => [provider.provider, provider] as const), ); + const cachedProviders = yield* Effect.forEach( + PROVIDER_CACHE_IDS, + (provider) => { + const filePath = cachePathByProvider.get(provider)!; + const fallbackProvider = fallbackByProvider.get(provider)!; + return readProviderStatusCache(filePath).pipe( + Effect.provideService(FileSystem.FileSystem, fileSystem), + Effect.map((cachedProvider) => + cachedProvider === undefined + ? undefined + : hydrateCachedProvider({ + cachedProvider, + fallbackProvider, + }), + ), + ); + }, + { concurrency: "unbounded" }, + ).pipe( + Effect.map((providers) => + orderProviderSnapshots( + providers.filter((provider): provider is ServerProvider => provider !== undefined), + ), + ), + ); + const providersRef = yield* Ref.make>(cachedProviders); + + const persistProvider = (provider: ServerProvider) => + writeProviderStatusCache({ + filePath: cachePathByProvider.get(provider.provider)!, + provider, + }).pipe( + Effect.provideService(FileSystem.FileSystem, fileSystem), + Effect.provideService(Path.Path, path), + Effect.tapError(Effect.logError), + Effect.ignore, + ); + + const upsertProviders = Effect.fn("upsertProviders")(function* ( + nextProviders: ReadonlyArray, + options?: { + readonly publish?: boolean; + }, + ) { + const [previousProviders, providers] = yield* Ref.modify( + providersRef, + (previousProviders) => { + const mergedProviders = new Map( + previousProviders.map((provider) => [provider.provider, provider] as const), + ); + + for (const provider of nextProviders) { + mergedProviders.set(provider.provider, provider); + } - const syncProviders = Effect.fn("syncProviders")(function* (options?: { - readonly publish?: boolean; - }) { - const previousProviders = yield* Ref.get(providersRef); - const providers = yield* loadProviders(codexProvider, claudeProvider); - yield* Ref.set(providersRef, providers); + const providers = orderProviderSnapshots([...mergedProviders.values()]); + return [[previousProviders, providers] as const, providers]; + }, + ); - if (options?.publish !== false && haveProvidersChanged(previousProviders, providers)) { - yield* PubSub.publish(changesPubSub, providers); + if (haveProvidersChanged(previousProviders, providers)) { + yield* Effect.forEach(nextProviders, persistProvider, { + concurrency: "unbounded", + discard: true, + }); + if (options?.publish !== false) { + yield* PubSub.publish(changesPubSub, providers); + } } return providers; }); - yield* Stream.runForEach(codexProvider.streamChanges, () => syncProviders()).pipe( - Effect.forkScoped, - ); - yield* Stream.runForEach(claudeProvider.streamChanges, () => syncProviders()).pipe( - Effect.forkScoped, - ); + const syncProvider = Effect.fn("syncProvider")(function* ( + provider: ServerProvider, + options?: { + readonly publish?: boolean; + }, + ) { + return yield* upsertProviders([provider], options); + }); const refresh = Effect.fn("refresh")(function* (provider?: ProviderKind) { switch (provider) { case "codex": - yield* codexProvider.refresh; - break; + return yield* codexProvider.refresh.pipe( + Effect.flatMap((nextProvider) => syncProvider(nextProvider)), + ); case "claudeAgent": - yield* claudeProvider.refresh; - break; + return yield* claudeProvider.refresh.pipe( + Effect.flatMap((nextProvider) => syncProvider(nextProvider)), + ); default: - yield* Effect.all([codexProvider.refresh, claudeProvider.refresh], { - concurrency: "unbounded", - }); - break; + return yield* Effect.all( + [ + codexProvider.refresh.pipe( + Effect.flatMap((nextProvider) => syncProvider(nextProvider)), + ), + claudeProvider.refresh.pipe( + Effect.flatMap((nextProvider) => syncProvider(nextProvider)), + ), + ], + { + concurrency: "unbounded", + discard: true, + }, + ).pipe(Effect.andThen(Ref.get(providersRef))); } - return yield* syncProviders(); }); + yield* Stream.runForEach(codexProvider.streamChanges, (provider) => + syncProvider(provider), + ).pipe(Effect.forkScoped); + yield* Stream.runForEach(claudeProvider.streamChanges, (provider) => + syncProvider(provider), + ).pipe(Effect.forkScoped); + return { - getProviders: syncProviders({ publish: false }).pipe( - Effect.tapError(Effect.logError), - Effect.orElseSucceed(() => []), - ), + getProviders: Ref.get(providersRef), refresh: (provider?: ProviderKind) => refresh(provider).pipe( Effect.tapError(Effect.logError), - Effect.orElseSucceed(() => []), + Effect.orElseSucceed(() => [] as ReadonlyArray), ), get streamChanges() { return Stream.fromPubSub(changesPubSub); diff --git a/apps/server/src/provider/makeManagedServerProvider.ts b/apps/server/src/provider/makeManagedServerProvider.ts index 59aeac1ab5f..856594c1f02 100644 --- a/apps/server/src/provider/makeManagedServerProvider.ts +++ b/apps/server/src/provider/makeManagedServerProvider.ts @@ -11,6 +11,7 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")( readonly getSettings: Effect.Effect; readonly streamSettings: Stream.Stream; readonly haveSettingsChanged: (previous: Settings, next: Settings) => boolean; + readonly initialSnapshot: (settings: Settings) => ServerProvider; readonly checkProvider: Effect.Effect; readonly refreshInterval?: Duration.Input; }): Effect.fn.Return { @@ -20,7 +21,7 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")( PubSub.shutdown, ); const initialSettings = yield* input.getSettings; - const initialSnapshot = yield* input.checkProvider; + const initialSnapshot = input.initialSnapshot(initialSettings); const snapshotRef = yield* Ref.make(initialSnapshot); const settingsRef = yield* Ref.make(initialSettings); diff --git a/apps/server/src/provider/providerStatusCache.test.ts b/apps/server/src/provider/providerStatusCache.test.ts new file mode 100644 index 00000000000..5f0d88322e1 --- /dev/null +++ b/apps/server/src/provider/providerStatusCache.test.ts @@ -0,0 +1,136 @@ +import * as NodeServices from "@effect/platform-node/NodeServices"; +import type { ServerProvider } from "@t3tools/contracts"; +import { assert, it } from "@effect/vitest"; +import { Effect, FileSystem } from "effect"; + +import { + hydrateCachedProvider, + readProviderStatusCache, + resolveProviderStatusCachePath, + writeProviderStatusCache, +} from "./providerStatusCache"; + +const makeProvider = ( + provider: ServerProvider["provider"], + overrides?: Partial, +): ServerProvider => ({ + provider, + enabled: true, + installed: true, + version: "1.0.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-04-11T00:00:00.000Z", + models: [], + slashCommands: [], + skills: [], + ...overrides, +}); + +it.layer(NodeServices.layer)("providerStatusCache", (it) => { + it.effect("writes and reads provider status snapshots", () => + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const tempDir = yield* fs.makeTempDirectoryScoped({ prefix: "t3-provider-cache-" }); + const codexProvider = makeProvider("codex"); + const claudeProvider = makeProvider("claudeAgent", { + status: "warning", + auth: { status: "unknown" }, + }); + const codexPath = resolveProviderStatusCachePath({ + cacheDir: tempDir, + provider: "codex", + }); + const claudePath = resolveProviderStatusCachePath({ + cacheDir: tempDir, + provider: "claudeAgent", + }); + + yield* writeProviderStatusCache({ + filePath: codexPath, + provider: codexProvider, + }); + yield* writeProviderStatusCache({ + filePath: claudePath, + provider: claudeProvider, + }); + + assert.deepStrictEqual(yield* readProviderStatusCache(codexPath), codexProvider); + assert.deepStrictEqual(yield* readProviderStatusCache(claudePath), claudeProvider); + }), + ); + + it("hydrates cached provider status onto current settings-derived models", () => { + const cachedCodex = makeProvider("codex", { + checkedAt: "2026-04-10T12:00:00.000Z", + models: [], + message: "Cached message", + skills: [ + { + name: "github:gh-fix-ci", + path: "/tmp/skills/gh-fix-ci/SKILL.md", + enabled: true, + displayName: "CI Debug", + }, + ], + }); + const fallbackCodex = makeProvider("codex", { + models: [ + { + slug: "gpt-5.4", + name: "GPT-5.4", + isCustom: false, + capabilities: { + reasoningEffortLevels: [], + supportsFastMode: false, + supportsThinkingToggle: false, + contextWindowOptions: [], + promptInjectedEffortLevels: [], + }, + }, + ], + message: "Pending refresh", + }); + + assert.deepStrictEqual( + hydrateCachedProvider({ + cachedProvider: cachedCodex, + fallbackProvider: fallbackCodex, + }), + { + ...fallbackCodex, + installed: cachedCodex.installed, + version: cachedCodex.version, + status: cachedCodex.status, + auth: cachedCodex.auth, + checkedAt: cachedCodex.checkedAt, + slashCommands: cachedCodex.slashCommands, + skills: cachedCodex.skills, + message: cachedCodex.message, + }, + ); + }); + + it("ignores stale cached enabled state when the provider is now disabled", () => { + const cachedCodex = makeProvider("codex", { + checkedAt: "2026-04-10T12:00:00.000Z", + message: "Cached ready status", + }); + const disabledFallback = makeProvider("codex", { + enabled: false, + installed: false, + version: null, + status: "disabled", + auth: { status: "unknown" }, + message: "Codex is disabled in T3 Code settings.", + }); + + assert.deepStrictEqual( + hydrateCachedProvider({ + cachedProvider: cachedCodex, + fallbackProvider: disabledFallback, + }), + disabledFallback, + ); + }); +}); diff --git a/apps/server/src/provider/providerStatusCache.ts b/apps/server/src/provider/providerStatusCache.ts new file mode 100644 index 00000000000..abedf99d138 --- /dev/null +++ b/apps/server/src/provider/providerStatusCache.ts @@ -0,0 +1,105 @@ +import * as nodePath from "node:path"; +import { type ServerProvider, ServerProvider as ServerProviderSchema } from "@t3tools/contracts"; +import { Cause, Effect, FileSystem, Path, Schema } from "effect"; + +export const PROVIDER_CACHE_IDS = ["codex", "claudeAgent"] as const satisfies ReadonlyArray< + ServerProvider["provider"] +>; + +const decodeProviderStatusCache = Schema.decodeUnknownEffect( + Schema.fromJsonString(ServerProviderSchema), +); + +const providerOrderRank = (provider: ServerProvider["provider"]): number => { + const rank = PROVIDER_CACHE_IDS.indexOf(provider); + return rank === -1 ? Number.MAX_SAFE_INTEGER : rank; +}; + +export const orderProviderSnapshots = ( + providers: ReadonlyArray, +): ReadonlyArray => + [...providers].toSorted( + (left, right) => providerOrderRank(left.provider) - providerOrderRank(right.provider), + ); + +export const hydrateCachedProvider = (input: { + readonly cachedProvider: ServerProvider; + readonly fallbackProvider: ServerProvider; +}): ServerProvider => { + if ( + !input.fallbackProvider.enabled || + input.cachedProvider.enabled !== input.fallbackProvider.enabled + ) { + return input.fallbackProvider; + } + + const { message: _fallbackMessage, ...fallbackWithoutMessage } = input.fallbackProvider; + const hydratedProvider: ServerProvider = { + ...fallbackWithoutMessage, + installed: input.cachedProvider.installed, + version: input.cachedProvider.version, + status: input.cachedProvider.status, + auth: input.cachedProvider.auth, + checkedAt: input.cachedProvider.checkedAt, + slashCommands: input.cachedProvider.slashCommands, + skills: input.cachedProvider.skills, + }; + + return input.cachedProvider.message + ? { ...hydratedProvider, message: input.cachedProvider.message } + : hydratedProvider; +}; + +export const resolveProviderStatusCachePath = (input: { + readonly cacheDir: string; + readonly provider: ServerProvider["provider"]; +}) => nodePath.join(input.cacheDir, `${input.provider}.json`); + +export const readProviderStatusCache = (filePath: string) => + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const exists = yield* fs.exists(filePath).pipe(Effect.orElseSucceed(() => false)); + if (!exists) { + return undefined; + } + + const raw = yield* fs.readFileString(filePath).pipe(Effect.orElseSucceed(() => "")); + const trimmed = raw.trim(); + if (trimmed.length === 0) { + return undefined; + } + + return yield* decodeProviderStatusCache(trimmed).pipe( + Effect.matchCauseEffect({ + onFailure: (cause) => + Effect.logWarning("failed to parse provider status cache, ignoring", { + path: filePath, + issues: Cause.pretty(cause), + }).pipe(Effect.as(undefined)), + onSuccess: Effect.succeed, + }), + ); + }); + +export const writeProviderStatusCache = (input: { + readonly filePath: string; + readonly provider: ServerProvider; +}) => { + const tempPath = `${input.filePath}.${process.pid}.${Date.now()}.tmp`; + return Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const encoded = `${JSON.stringify(input.provider, null, 2)}\n`; + + yield* fs.makeDirectory(path.dirname(input.filePath), { recursive: true }); + yield* fs.writeFileString(tempPath, encoded); + yield* fs.rename(tempPath, input.filePath); + }).pipe( + Effect.ensuring( + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + yield* fs.remove(tempPath, { force: true }).pipe(Effect.ignore({ log: true })); + }), + ), + ); +}; diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 5e5b172f444..276bc1d9b40 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -1790,7 +1790,20 @@ it.layer(NodeServices.layer)("server router seam", (it) => { it.effect("routes websocket rpc subscribeServerConfig streams snapshot then update", () => Effect.gen(function* () { - const providers = [] as const; + const providers = [ + { + provider: "codex" as const, + enabled: true, + installed: true, + version: "1.0.0", + status: "ready" as const, + auth: { status: "authenticated" as const }, + checkedAt: "2026-04-11T00:00:00.000Z", + models: [], + slashCommands: [], + skills: [], + }, + ] as const; const changeEvent = { keybindings: [], issues: [], @@ -1847,7 +1860,20 @@ it.layer(NodeServices.layer)("server router seam", (it) => { it.effect("routes websocket rpc subscribeServerConfig emits provider status updates", () => Effect.gen(function* () { - const providers = [] as const; + const nextProviders = [ + { + provider: "codex" as const, + enabled: true, + installed: true, + version: "1.0.0", + status: "ready" as const, + auth: { status: "authenticated" as const }, + checkedAt: "2026-04-11T00:00:00.000Z", + models: [], + slashCommands: [], + skills: [], + }, + ] as const; yield* buildAppUnderTest({ layers: { @@ -1860,7 +1886,7 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }, providerRegistry: { getProviders: Effect.succeed([]), - streamChanges: Stream.succeed(providers), + streamChanges: Stream.succeed(nextProviders), }, }, }); @@ -1874,10 +1900,13 @@ it.layer(NodeServices.layer)("server router seam", (it) => { const [first, second] = Array.from(events); assert.equal(first?.type, "snapshot"); + if (first?.type === "snapshot") { + assert.deepEqual(first.config.providers, []); + } assert.deepEqual(second, { version: 1, type: "providerStatuses", - payload: { providers }, + payload: { providers: nextProviders }, }); }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); diff --git a/apps/server/src/serverRuntimeStartup.test.ts b/apps/server/src/serverRuntimeStartup.test.ts index 7e8c0e14bad..836b71c7eb4 100644 --- a/apps/server/src/serverRuntimeStartup.test.ts +++ b/apps/server/src/serverRuntimeStartup.test.ts @@ -1,13 +1,21 @@ -import { DEFAULT_MODEL_BY_PROVIDER } from "@t3tools/contracts"; +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { DEFAULT_MODEL_BY_PROVIDER, ProjectId, ThreadId } from "@t3tools/contracts"; import { assert, it } from "@effect/vitest"; -import { Deferred, Effect, Fiber, Option, Ref } from "effect"; +import { Deferred, Effect, Fiber, Option, Ref, Stream } from "effect"; -import { AnalyticsService } from "./telemetry/Services/AnalyticsService.ts"; +import { ServerConfig } from "./config.ts"; +import { + OrchestrationEngineService, + type OrchestrationEngineShape, +} from "./orchestration/Services/OrchestrationEngine.ts"; import { ProjectionSnapshotQuery } from "./orchestration/Services/ProjectionSnapshotQuery.ts"; +import { AnalyticsService } from "./telemetry/Services/AnalyticsService.ts"; import { getAutoBootstrapDefaultModelSelection, launchStartupHeartbeat, makeCommandGate, + resolveAutoBootstrapWelcomeTargets, + resolveWelcomeBase, ServerRuntimeStartupError, } from "./serverRuntimeStartup.ts"; @@ -93,3 +101,109 @@ it.effect("launchStartupHeartbeat does not block the caller while counts are loa }), ), ); + +it.effect("resolveWelcomeBase derives cwd and project name from server config", () => + Effect.gen(function* () { + const welcome = yield* resolveWelcomeBase.pipe( + Effect.provideService(ServerConfig, { + cwd: "/tmp/startup-project", + } as never), + ); + + assert.deepStrictEqual(welcome, { + cwd: "/tmp/startup-project", + projectName: "startup-project", + }); + }), +); + +it.effect("resolveAutoBootstrapWelcomeTargets returns existing project and thread ids", () => { + const bootstrapProjectId = ProjectId.make("project-startup-bootstrap"); + const bootstrapThreadId = ThreadId.make("thread-startup-bootstrap"); + + return Effect.gen(function* () { + const dispatchCalls = yield* Ref.make>([]); + const targets = yield* resolveAutoBootstrapWelcomeTargets.pipe( + Effect.provideService(ServerConfig, { + cwd: "/tmp/startup-project", + autoBootstrapProjectFromCwd: true, + } as never), + Effect.provideService(ProjectionSnapshotQuery, { + getSnapshot: () => Effect.die("unused"), + getShellSnapshot: () => Effect.die("unused"), + getCounts: () => Effect.die("unused"), + getActiveProjectByWorkspaceRoot: () => + Effect.succeed( + Option.some({ + id: bootstrapProjectId, + title: "Startup Project", + workspaceRoot: "/tmp/startup-project", + defaultModelSelection: getAutoBootstrapDefaultModelSelection(), + scripts: [], + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + deletedAt: null, + }), + ), + getProjectShellById: () => Effect.die("unused"), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.some(bootstrapThreadId)), + getThreadCheckpointContext: () => Effect.succeed(Option.none()), + getThreadShellById: () => Effect.die("unused"), + getThreadDetailById: () => Effect.die("unused"), + }), + Effect.provideService(OrchestrationEngineService, { + getReadModel: () => Effect.die("unused"), + readEvents: () => Stream.empty, + dispatch: (command) => + Ref.update(dispatchCalls, (calls) => [...calls, command.type]).pipe( + Effect.as({ sequence: 1 }), + ), + streamDomainEvents: Stream.empty, + } satisfies OrchestrationEngineShape), + Effect.provide(NodeServices.layer), + ); + + assert.deepStrictEqual(targets, { + bootstrapProjectId, + bootstrapThreadId, + }); + assert.deepStrictEqual(yield* Ref.get(dispatchCalls), []); + }); +}); + +it.effect("resolveAutoBootstrapWelcomeTargets creates a project and thread when missing", () => + Effect.gen(function* () { + const dispatchCalls = yield* Ref.make>([]); + const targets = yield* resolveAutoBootstrapWelcomeTargets.pipe( + Effect.provideService(ServerConfig, { + cwd: "/tmp/startup-project", + autoBootstrapProjectFromCwd: true, + } as never), + Effect.provideService(ProjectionSnapshotQuery, { + getSnapshot: () => Effect.die("unused"), + getShellSnapshot: () => Effect.die("unused"), + getCounts: () => Effect.die("unused"), + getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), + getProjectShellById: () => Effect.die("unused"), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), + getThreadCheckpointContext: () => Effect.succeed(Option.none()), + getThreadShellById: () => Effect.die("unused"), + getThreadDetailById: () => Effect.die("unused"), + }), + Effect.provideService(OrchestrationEngineService, { + getReadModel: () => Effect.die("unused"), + readEvents: () => Stream.empty, + dispatch: (command) => + Ref.update(dispatchCalls, (calls) => [...calls, command.type]).pipe( + Effect.as({ sequence: 1 }), + ), + streamDomainEvents: Stream.empty, + } satisfies OrchestrationEngineShape), + Effect.provide(NodeServices.layer), + ); + + assert.equal(typeof targets.bootstrapProjectId, "string"); + assert.equal(typeof targets.bootstrapThreadId, "string"); + assert.deepStrictEqual(yield* Ref.get(dispatchCalls), ["project.create", "thread.create"]); + }), +); diff --git a/apps/server/src/serverRuntimeStartup.ts b/apps/server/src/serverRuntimeStartup.ts index 441df3395d9..823e3b4771e 100644 --- a/apps/server/src/serverRuntimeStartup.ts +++ b/apps/server/src/serverRuntimeStartup.ts @@ -157,7 +157,18 @@ export const getAutoBootstrapDefaultModelSelection = (): ModelSelection => ({ model: DEFAULT_MODEL_BY_PROVIDER.codex, }); -const autoBootstrapWelcome = Effect.gen(function* () { +export const resolveWelcomeBase = Effect.gen(function* () { + const serverConfig = yield* ServerConfig; + const segments = serverConfig.cwd.split(/[/\\]/).filter(Boolean); + const projectName = segments[segments.length - 1] ?? "project"; + + return { + cwd: serverConfig.cwd, + projectName, + } as const; +}); + +export const resolveAutoBootstrapWelcomeTargets = Effect.gen(function* () { const serverConfig = yield* ServerConfig; const projectionReadModelQuery = yield* ProjectionSnapshotQuery; const orchestrationEngine = yield* OrchestrationEngineService; @@ -221,12 +232,7 @@ const autoBootstrapWelcome = Effect.gen(function* () { }); } - const segments = serverConfig.cwd.split(/[/\\]/).filter(Boolean); - const projectName = segments[segments.length - 1] ?? "project"; - return { - cwd: serverConfig.cwd, - projectName, ...(bootstrapProjectId ? { bootstrapProjectId } : {}), ...(bootstrapThreadId ? { bootstrapThreadId } : {}), } as const; @@ -271,7 +277,7 @@ const runStartupPhase = (phase: string, effect: Effect.Effect) Effect.withSpan(`server.startup.${phase}`), ); -const makeServerRuntimeStartup = Effect.gen(function* () { +export const makeServerRuntimeStartup = Effect.gen(function* () { const serverConfig = yield* ServerConfig; const keybindings = yield* Keybindings; const orchestrationReactor = yield* OrchestrationReactor; @@ -322,15 +328,13 @@ const makeServerRuntimeStartup = Effect.gen(function* () { orchestrationReactor.start().pipe(Scope.provide(reactorScope)), ); - yield* Effect.logDebug("startup phase: preparing welcome payload"); - const welcome = yield* runStartupPhase("welcome.prepare", autoBootstrapWelcome); + const welcomeBase = yield* resolveWelcomeBase; const environment = yield* serverEnvironment.getDescriptor; + yield* Effect.logDebug("startup phase: preparing welcome payload"); yield* Effect.logDebug("startup phase: publishing welcome event", { environmentId: environment.environmentId, - cwd: welcome.cwd, - projectName: welcome.projectName, - bootstrapProjectId: welcome.bootstrapProjectId, - bootstrapThreadId: welcome.bootstrapThreadId, + cwd: welcomeBase.cwd, + projectName: welcomeBase.projectName, }); yield* runStartupPhase( "welcome.publish", @@ -339,10 +343,47 @@ const makeServerRuntimeStartup = Effect.gen(function* () { type: "welcome", payload: { environment, - ...welcome, + ...welcomeBase, }, }), ); + + if (serverConfig.autoBootstrapProjectFromCwd) { + yield* Effect.forkScoped( + runStartupPhase( + "welcome.autobootstrap", + Effect.gen(function* () { + const bootstrapTargets = yield* resolveAutoBootstrapWelcomeTargets; + if (!bootstrapTargets.bootstrapProjectId && !bootstrapTargets.bootstrapThreadId) { + return; + } + + yield* Effect.logDebug("startup phase: publishing bootstrapped welcome event", { + environmentId: environment.environmentId, + cwd: welcomeBase.cwd, + projectName: welcomeBase.projectName, + bootstrapProjectId: bootstrapTargets.bootstrapProjectId, + bootstrapThreadId: bootstrapTargets.bootstrapThreadId, + }); + yield* lifecycleEvents.publish({ + version: 1, + type: "welcome", + payload: { + environment, + ...welcomeBase, + ...bootstrapTargets, + }, + }); + }).pipe( + Effect.catch((cause) => + Effect.logWarning("startup auto-bootstrap welcome failed", { + cause, + }), + ), + ), + ), + ); + } }).pipe( Effect.annotateSpans({ "server.mode": serverConfig.mode, diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index c685bbc763e..3214cef806e 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -1,4 +1,4 @@ -import { Cause, Effect, Layer, Option, Queue, Ref, Schema, Stream } from "effect"; +import { Cause, Duration, Effect, Layer, Option, Queue, Ref, Schema, Stream } from "effect"; import { type AuthAccessStreamEvent, AuthSessionId, @@ -85,6 +85,8 @@ function isThreadDetailEvent(event: OrchestrationEvent): event is Extract< ); } +const PROVIDER_STATUS_DEBOUNCE_MS = 200; + function toAuthAccessStreamEvent( change: BootstrapCredentialChange | SessionCredentialChange, revision: number, @@ -919,6 +921,7 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => type: "providerStatuses" as const, payload: { providers }, })), + Stream.debounce(Duration.millis(PROVIDER_STATUS_DEBOUNCE_MS)), ); const settingsUpdates = serverSettings.streamChanges.pipe( Stream.map((settings) => ({ @@ -928,13 +931,26 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => })), ); + yield* Effect.all( + [providerRegistry.refresh("codex"), providerRegistry.refresh("claudeAgent")], + { + concurrency: "unbounded", + discard: true, + }, + ).pipe(Effect.ignoreCause({ log: true }), Effect.forkScoped); + + const liveUpdates = Stream.merge( + keybindingsUpdates, + Stream.merge(providerStatuses, settingsUpdates), + ); + return Stream.concat( Stream.make({ version: 1 as const, type: "snapshot" as const, config: yield* loadServerConfig, }), - Stream.merge(keybindingsUpdates, Stream.merge(providerStatuses, settingsUpdates)), + liveUpdates, ); }), { "rpc.aggregate": "server" },