From abbb9d57c5023ef4c3f1ffaad2077edefd3f33b5 Mon Sep 17 00:00:00 2001 From: Idris Gadi Date: Fri, 15 May 2026 19:48:54 +0530 Subject: [PATCH 1/2] fix: do not wait for slow providers for HTTP readiness --- .../src/provider/Layers/ProviderRegistry.ts | 359 ++++++++++++------ 1 file changed, 235 insertions(+), 124 deletions(-) diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index 22a120f0a52..e2bc99210d7 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -38,14 +38,19 @@ import * as Path from "effect/Path"; import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; +import * as Duration from "effect/Duration"; +import * as Fiber from "effect/Fiber"; +import * as Option from "effect/Option"; import * as Semaphore from "effect/Semaphore"; import { ServerConfig } from "../../config.ts"; import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts"; -import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry.ts"; +import { + ProviderRegistry, + type ProviderRegistryShape, +} from "../Services/ProviderRegistry.ts"; import { hydrateCachedProvider, - isCachedProviderCorrelated, orderProviderSnapshots, readProviderStatusCache, resolveProviderStatusCachePath, @@ -62,21 +67,29 @@ const loadProviders = ( providerSources, (providerSource) => providerSource.getSnapshot.pipe( - Effect.flatMap((snapshot) => correlateSnapshotWithSource(providerSource, snapshot)), + Effect.flatMap((snapshot) => + correlateSnapshotWithSource(providerSource, snapshot), + ), ), { concurrency: "unbounded", }, ); -const makeManualProviderMaintenanceCapabilities = (provider: ProviderDriverKind) => +const INITIAL_PROVIDER_REFRESH_BUDGET = Duration.millis(1_500); +const BOOT_SNAPSHOT_FALLBACK_BUDGET = Duration.millis(100); + +const 
makeManualProviderMaintenanceCapabilities = ( + provider: ProviderDriverKind, +) => makeManualOnlyProviderMaintenanceCapabilities({ provider, packageName: null, }); -const hasModelCapabilities = (model: ServerProvider["models"][number]): boolean => - (model.capabilities?.optionDescriptors?.length ?? 0) > 0; +const hasModelCapabilities = ( + model: ServerProvider["models"][number], +): boolean => (model.capabilities?.optionDescriptors?.length ?? 0) > 0; const mergeProviderModels = ( previousModels: ReadonlyArray, @@ -86,10 +99,16 @@ const mergeProviderModels = ( return previousModels; } - const previousBySlug = new Map(previousModels.map((model) => [model.slug, model] as const)); + const previousBySlug = new Map( + previousModels.map((model) => [model.slug, model] as const), + ); const mergedModels = nextModels.map((model) => { const previousModel = previousBySlug.get(model.slug); - if (!previousModel || hasModelCapabilities(model) || !hasModelCapabilities(previousModel)) { + if ( + !previousModel || + hasModelCapabilities(model) || + !hasModelCapabilities(previousModel) + ) { return model; } return { @@ -98,7 +117,10 @@ const mergeProviderModels = ( }; }); const nextSlugs = new Set(nextModels.map((model) => model.slug)); - return [...mergedModels, ...previousModels.filter((model) => !nextSlugs.has(model.slug))]; + return [ + ...mergedModels, + ...previousModels.filter((model) => !nextSlugs.has(model.slug)), + ]; }; export const mergeProviderSnapshot = ( @@ -109,7 +131,10 @@ export const mergeProviderSnapshot = ( ? 
nextProvider : { ...nextProvider, - models: mergeProviderModels(previousProvider.models, nextProvider.models), + models: mergeProviderModels( + previousProvider.models, + nextProvider.models, + ), }; export const mergeProviderSnapshots = ( @@ -117,13 +142,18 @@ export const mergeProviderSnapshots = ( nextProviders: ReadonlyArray, ): ReadonlyArray => { const mergedProviders = new Map( - previousProviders.map((provider) => [snapshotInstanceKey(provider), provider] as const), + previousProviders.map( + (provider) => [snapshotInstanceKey(provider), provider] as const, + ), ); for (const provider of nextProviders) { mergedProviders.set( snapshotInstanceKey(provider), - mergeProviderSnapshot(mergedProviders.get(snapshotInstanceKey(provider)), provider), + mergeProviderSnapshot( + mergedProviders.get(snapshotInstanceKey(provider)), + provider, + ), ); } @@ -176,7 +206,9 @@ const snapshotInstanceKey = (provider: ServerProvider): ProviderInstanceId => { // after `ProviderInstanceRegistry` rebuilds an instance (e.g. because // its settings changed), a fresh source rides the new PubSub instead // of a closed one. -const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource => ({ +const buildSnapshotSource = ( + instance: ProviderInstance, +): ProviderSnapshotSource => ({ instanceId: instance.instanceId, driverKind: instance.driverKind, getSnapshot: instance.snapshot.getSnapshot, @@ -204,9 +236,17 @@ export const ProviderRegistryLive = Layer.effect( // Instances added post-boot skip this path; their first entry in // `providersRef` comes from the reactive `syncLiveSources` pass // below. + // + // The boot fallback is bounded so slow provider probes can't block + // HTTP readiness. Fast `getSnapshot` implementations (tests, cheap + // providers) still seed state within the budget; cache fills the + // gaps for everything else. 
const bootInstances = yield* instanceRegistry.listInstances; const bootSources = bootInstances.map(buildSnapshotSource); - const fallbackProviders = yield* loadProviders(bootSources); + const fallbackProviders = yield* loadProviders(bootSources).pipe( + Effect.timeoutOption(BOOT_SNAPSHOT_FALLBACK_BUDGET), + Effect.map(Option.getOrElse((): ReadonlyArray => [])), + ); const fallbackByInstance = new Map(); for (let index = 0; index < fallbackProviders.length; index++) { const provider = fallbackProviders[index]; @@ -217,7 +257,7 @@ export const ProviderRegistryLive = Layer.effect( fallbackByInstance.set(source.instanceId, provider); } - const cachedProviders = yield* Effect.forEach( + const hydratedBootProviders = yield* Effect.forEach( bootSources, (source) => Effect.gen(function* () { @@ -225,66 +265,81 @@ export const ProviderRegistryLive = Layer.effect( // instance of a built-in kind the path equals `.json` — // identical to the legacy filename. We still require the cache // payload to carry matching instance id + driver kind; old - // identity-less payloads are discarded and the awaited refresh - // below repopulates the cache. + // identity-less payloads are discarded and the background + // refresh repopulates the cache. 
const filePath = yield* resolveProviderStatusCachePath({ cacheDir: config.providerStatusCacheDir, instanceId: source.instanceId, }).pipe(Effect.provideService(Path.Path, path)); const fallbackProvider = fallbackByInstance.get(source.instanceId); - if (fallbackProvider === undefined) { - return undefined; - } + return yield* readProviderStatusCache(filePath).pipe( Effect.provideService(FileSystem.FileSystem, fileSystem), - Effect.flatMap((cachedProvider) => { - if (cachedProvider === undefined) { - return Effect.void.pipe(Effect.as(undefined as ServerProvider | undefined)); - } - const correlation = { - cachedProvider, - fallbackProvider, - } as const; - if (!isCachedProviderCorrelated(correlation)) { - return Effect.logWarning("provider status cache identity mismatch, ignoring", { - path: filePath, - instanceId: source.instanceId, - cachedInstanceId: cachedProvider.instanceId ?? null, - driver: source.driverKind, - cachedDriver: cachedProvider.driver ?? null, - }).pipe(Effect.as(undefined as ServerProvider | undefined)); - } - return Effect.succeed(hydrateCachedProvider(correlation)); - }), + Effect.flatMap( + (cachedProvider): Effect.Effect => { + if (cachedProvider === undefined) { + return Effect.succeed(fallbackProvider); + } + if ( + cachedProvider.instanceId !== source.instanceId || + cachedProvider.driver !== source.driverKind + ) { + return Effect.logWarning( + "provider status cache identity mismatch, ignoring", + { + path: filePath, + instanceId: source.instanceId, + cachedInstanceId: cachedProvider.instanceId ?? null, + driver: source.driverKind, + cachedDriver: cachedProvider.driver ?? 
null, + }, + ).pipe(Effect.as(fallbackProvider)); + } + if (fallbackProvider !== undefined) { + return Effect.succeed( + hydrateCachedProvider({ cachedProvider, fallbackProvider }), + ); + } + return Effect.succeed(cachedProvider); + }, + ), ); }), { concurrency: "unbounded" }, ).pipe( Effect.map((providers) => orderProviderSnapshots( - providers.filter((provider): provider is ServerProvider => provider !== undefined), + providers.filter( + (provider): provider is ServerProvider => provider !== undefined, + ), ), ), ); - const providersRef = yield* Ref.make>(cachedProviders); + const providersRef = yield* Ref.make>( + hydratedBootProviders, + ); const maintenanceActionStatesRef = yield* Ref.make< - ReadonlyMap + ReadonlyMap< + ProviderInstanceId, + { readonly update?: ServerProviderUpdateState | undefined } + > >(new Map()); // Live-source registry — the dynamic counterpart to the boot-time // `bootSources`. Keyed by `instanceId`; the stored `ProviderInstance` // reference is used for identity equality so "no-op" reconciles // (settings unchanged) skip re-subscribing + re-probing. - const liveSubsRef = yield* Ref.make>( - new Map(), - ); + const liveSubsRef = yield* Ref.make< + ReadonlyMap + >(new Map()); // Serialize `syncLiveSources` so a rapid burst of reconciles doesn't // interleave two passes clobbering each other's fiber bookkeeping. 
const syncSemaphore = yield* Semaphore.make(1); - const getLiveSources: Effect.Effect> = Ref.get( - liveSubsRef, - ).pipe(Effect.map((map) => Array.from(map.values(), buildSnapshotSource))); + const getLiveSources: Effect.Effect> = + Ref.get(liveSubsRef).pipe( + Effect.map((map) => Array.from(map.values(), buildSnapshotSource)), + ); const persistProvider = (provider: ServerProvider) => Effect.gen(function* () { @@ -307,20 +362,25 @@ export const ProviderRegistryLive = Layer.effect( ); }); - const applyProviderUpdateState = Effect.fn("applyProviderUpdateState")(function* ( - provider: ServerProvider, - ) { - const maintenanceActionStates = yield* Ref.get(maintenanceActionStatesRef); - const updateState = maintenanceActionStates.get(provider.instanceId)?.update; - if (!updateState) { - const { updateState: _updateState, ...providerWithoutUpdateState } = provider; - return providerWithoutUpdateState; - } - return { - ...provider, - updateState, - }; - }); + const applyProviderUpdateState = Effect.fn("applyProviderUpdateState")( + function* (provider: ServerProvider) { + const maintenanceActionStates = yield* Ref.get( + maintenanceActionStatesRef, + ); + const updateState = maintenanceActionStates.get( + provider.instanceId, + )?.update; + if (!updateState) { + const { updateState: _updateState, ...providerWithoutUpdateState } = + provider; + return providerWithoutUpdateState; + } + return { + ...provider, + updateState, + }; + }, + ); const upsertProviders = Effect.fn("upsertProviders")(function* ( nextProviders: ReadonlyArray, @@ -337,11 +397,12 @@ export const ProviderRegistryLive = Layer.effect( concurrency: "unbounded", }, ); - const [previousProviders, providers, providersToPersist] = yield* Ref.modify( - providersRef, - (previousProviders) => { + const [previousProviders, providers, providersToPersist] = + yield* Ref.modify(providersRef, (previousProviders) => { const mergedProviders = new Map( - previousProviders.map((provider) => 
[snapshotInstanceKey(provider), provider] as const), + previousProviders.map( + (provider) => [snapshotInstanceKey(provider), provider] as const, + ), ); const updatedKeys = new Set(); @@ -356,13 +417,17 @@ export const ProviderRegistryLive = Layer.effect( ); } - const providers = orderProviderSnapshots([...mergedProviders.values()]); + const providers = orderProviderSnapshots([ + ...mergedProviders.values(), + ]); const providersToPersist = providers.filter((provider) => updatedKeys.has(snapshotInstanceKey(provider)), ); - return [[previousProviders, providers, providersToPersist] as const, providers]; - }, - ); + return [ + [previousProviders, providers, providersToPersist] as const, + providers, + ]; + }); if (haveProvidersChanged(previousProviders, providers)) { if (options?.persist !== false) { @@ -388,44 +453,44 @@ export const ProviderRegistryLive = Layer.effect( return yield* upsertProviders([provider], options); }); - const setProviderMaintenanceActionState = Effect.fn("setProviderMaintenanceActionState")( - function* (input: { - readonly instanceId: ProviderInstanceId; - readonly action: "update"; - readonly state: ServerProviderUpdateState | null; - }) { - yield* Ref.update(maintenanceActionStatesRef, (previous) => { - const previousActions = previous.get(input.instanceId); - const nextActions = { ...previousActions }; - if (input.state === null || input.state.status === "idle") { - delete nextActions[input.action]; - } else { - nextActions[input.action] = input.state; - } - - const next = new Map(previous); - if (Object.keys(nextActions).length === 0) { - next.delete(input.instanceId); - } else { - next.set(input.instanceId, nextActions); - } - return next; - }); + const setProviderMaintenanceActionState = Effect.fn( + "setProviderMaintenanceActionState", + )(function* (input: { + readonly instanceId: ProviderInstanceId; + readonly action: "update"; + readonly state: ServerProviderUpdateState | null; + }) { + yield* 
Ref.update(maintenanceActionStatesRef, (previous) => { + const previousActions = previous.get(input.instanceId); + const nextActions = { ...previousActions }; + if (input.state === null || input.state.status === "idle") { + delete nextActions[input.action]; + } else { + nextActions[input.action] = input.state; + } - const existingProviders = yield* Ref.get(providersRef); - const matchingProvider = existingProviders.find( - (candidate) => candidate.instanceId === input.instanceId, - ); - if (!matchingProvider) { - return existingProviders; + const next = new Map(previous); + if (Object.keys(nextActions).length === 0) { + next.delete(input.instanceId); + } else { + next.set(input.instanceId, nextActions); } + return next; + }); - const nextProvider = yield* applyProviderUpdateState(matchingProvider); - return yield* upsertProviders([nextProvider], { - persist: false, - }); - }, - ); + const existingProviders = yield* Ref.get(providersRef); + const matchingProvider = existingProviders.find( + (candidate) => candidate.instanceId === input.instanceId, + ); + if (!matchingProvider) { + return existingProviders; + } + + const nextProvider = yield* applyProviderUpdateState(matchingProvider); + return yield* upsertProviders([nextProvider], { + persist: false, + }); + }); const refreshOneSource = Effect.fn("refreshOneSource")(function* ( providerSource: ProviderSnapshotSource, @@ -441,13 +506,19 @@ export const ProviderRegistryLive = Layer.effect( const refreshAll = Effect.fn("refreshAll")(function* () { const sources = yield* getLiveSources; - return yield* Effect.forEach(sources, (source) => refreshOneSource(source), { - concurrency: "unbounded", - discard: true, - }).pipe(Effect.andThen(Ref.get(providersRef))); + return yield* Effect.forEach( + sources, + (source) => refreshOneSource(source), + { + concurrency: "unbounded", + discard: true, + }, + ).pipe(Effect.andThen(Ref.get(providersRef))); }); - const refresh = Effect.fn("refresh")(function* (provider?: 
ProviderDriverKind) { + const refresh = Effect.fn("refresh")(function* ( + provider?: ProviderDriverKind, + ) { if (provider === undefined) { return yield* refreshAll(); } @@ -467,7 +538,9 @@ export const ProviderRegistryLive = Layer.effect( instanceId: ProviderInstanceId, ) { const sources = yield* getLiveSources; - const providerSource = sources.find((candidate) => candidate.instanceId === instanceId); + const providerSource = sources.find( + (candidate) => candidate.instanceId === instanceId, + ); if (!providerSource) { return yield* Ref.get(providersRef); } @@ -514,7 +587,9 @@ export const ProviderRegistryLive = Layer.effect( const nextByInstance = new Map( instances.map((instance) => [instance.instanceId, instance] as const), ); - const knownInstanceIds = new Set(nextByInstance.keys()); + const knownInstanceIds = new Set( + nextByInstance.keys(), + ); for (const provider of unavailableProviders) { knownInstanceIds.add(snapshotInstanceKey(provider)); } @@ -534,7 +609,9 @@ export const ProviderRegistryLive = Layer.effect( // Collect new/rebuilt instances in `nextByInstance` insertion // order (which preserves settings-author order). 
- const newlyAdded: Array = []; + const newlyAdded: Array< + readonly [ProviderInstanceId, ProviderInstance] + > = []; for (const [instanceId, instance] of nextByInstance) { if (carriedOver.has(instanceId)) { continue; @@ -550,7 +627,9 @@ export const ProviderRegistryLive = Layer.effect( for (const [, instance] of newlyAdded) { const source = buildSnapshotSource(instance); yield* Stream.runForEach(source.streamChanges, (provider) => - correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)), + correlateSnapshotWithSource(source, provider).pipe( + Effect.flatMap(syncProvider), + ), ).pipe(Effect.forkScoped); } @@ -563,7 +642,9 @@ export const ProviderRegistryLive = Layer.effect( yield* Effect.forEach( newlyAdded, ([, instance]) => - refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })), + refreshOneSource(buildSnapshotSource(instance)).pipe( + Effect.ignoreCause({ log: true }), + ), { concurrency: "unbounded", discard: true }, ); yield* upsertProviders(unavailableProviders, { @@ -624,7 +705,10 @@ export const ProviderRegistryLive = Layer.effect( // resolves. Cached snapshots (already in `providersRef`) merge with // these via `upsertProviders` so on-disk state wins where present // and pending fallbacks fill the gaps. - yield* upsertProviders(fallbackProviders, { publish: false }); + // + // (boot hydration path was refactored to produce `hydratedBootProviders` + // directly: fallback providers are now merged into `providersRef` + // during cache hydration rather than upserted in a separate pass.) // Subscribe to registry mutations BEFORE running the initial sync. // `subscribeChanges` acquires the dequeue synchronously in this // fibre; the subscription is active the instant this `yield*` @@ -654,10 +738,32 @@ export const ProviderRegistryLive = Layer.effect( // was dropped, which made any settings change that replaced an // instance never propagate to the aggregator's `providersRef`.) 
const instanceChanges = yield* instanceRegistry.subscribeChanges; - // Initial sync: subscribe + kick off refreshes for every instance - // present at boot. Run synchronously so consumers pulling immediately - // after the layer build see the correct aggregator state. - yield* syncLiveSources; + // Initial sync: fork scoped so provider refresh continues in the + // background even if it exceeds the startup budget. We await the + // forked fiber with a budget so fast providers preserve the old + // behavior (fresh state before server ready), while slow providers + // no longer block HTTP readiness. + const initialSyncFiber = yield* syncLiveSourcesAndContinue.pipe( + Effect.forkScoped, + ); + const initialSyncExit = yield* Fiber.await(initialSyncFiber).pipe( + Effect.timeoutOption(INITIAL_PROVIDER_REFRESH_BUDGET), + ); + if (Option.isSome(initialSyncExit)) { + yield* Effect.logInfo( + "provider registry initial refresh completed before startup budget", + { + budgetMs: Duration.toMillis(INITIAL_PROVIDER_REFRESH_BUDGET), + }, + ); + } else { + yield* Effect.logInfo( + "provider registry initial refresh exceeded startup budget; continuing in background", + { + budgetMs: Duration.toMillis(INITIAL_PROVIDER_REFRESH_BUDGET), + }, + ); + } // React to registry mutations — instance added / removed / rebuilt. 
// `Stream.fromSubscription` builds a stream over the pre-acquired // subscription rather than subscribing on stream start, which is @@ -673,9 +779,12 @@ export const ProviderRegistryLive = Layer.effect( if (Cause.hasInterruptsOnly(cause)) { return yield* Effect.interrupt; } - yield* Effect.logError("provider registry refresh failed; preserving cached providers", { - cause: Cause.pretty(cause), - }); + yield* Effect.logError( + "provider registry refresh failed; preserving cached providers", + { + cause: Cause.pretty(cause), + }, + ); return yield* Ref.get(providersRef); }); @@ -684,7 +793,9 @@ export const ProviderRegistryLive = Layer.effect( refresh: (provider?: ProviderDriverKind) => refresh(provider).pipe(Effect.catchCause(recoverRefreshFailure)), refreshInstance: (instanceId: ProviderInstanceId) => - refreshInstance(instanceId).pipe(Effect.catchCause(recoverRefreshFailure)), + refreshInstance(instanceId).pipe( + Effect.catchCause(recoverRefreshFailure), + ), getProviderMaintenanceCapabilitiesForInstance, setProviderMaintenanceActionState, get streamChanges() { From f4d3212d90309e805fbb4185ba979895a4b00d37 Mon Sep 17 00:00:00 2001 From: Idris Gadi Date: Fri, 15 May 2026 22:18:36 +0530 Subject: [PATCH 2/2] fix:perf: defer provider probing to background so it doesn't block startup --- .../src/provider/Layers/ProviderRegistry.ts | 401 +++++++----------- packages/contracts/src/server.ts | 8 +- 2 files changed, 162 insertions(+), 247 deletions(-) diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index e2bc99210d7..0c6317e45e8 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -39,16 +39,12 @@ import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; import * as Duration from "effect/Duration"; -import * as Fiber from "effect/Fiber"; import * as Option from 
"effect/Option"; import * as Semaphore from "effect/Semaphore"; import { ServerConfig } from "../../config.ts"; import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts"; -import { - ProviderRegistry, - type ProviderRegistryShape, -} from "../Services/ProviderRegistry.ts"; +import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry.ts"; import { hydrateCachedProvider, orderProviderSnapshots, @@ -67,29 +63,23 @@ const loadProviders = ( providerSources, (providerSource) => providerSource.getSnapshot.pipe( - Effect.flatMap((snapshot) => - correlateSnapshotWithSource(providerSource, snapshot), - ), + Effect.flatMap((snapshot) => correlateSnapshotWithSource(providerSource, snapshot)), ), { concurrency: "unbounded", }, ); -const INITIAL_PROVIDER_REFRESH_BUDGET = Duration.millis(1_500); const BOOT_SNAPSHOT_FALLBACK_BUDGET = Duration.millis(100); -const makeManualProviderMaintenanceCapabilities = ( - provider: ProviderDriverKind, -) => +const makeManualProviderMaintenanceCapabilities = (provider: ProviderDriverKind) => makeManualOnlyProviderMaintenanceCapabilities({ provider, packageName: null, }); -const hasModelCapabilities = ( - model: ServerProvider["models"][number], -): boolean => (model.capabilities?.optionDescriptors?.length ?? 0) > 0; +const hasModelCapabilities = (model: ServerProvider["models"][number]): boolean => + (model.capabilities?.optionDescriptors?.length ?? 
0) > 0; const mergeProviderModels = ( previousModels: ReadonlyArray, @@ -99,16 +89,10 @@ const mergeProviderModels = ( return previousModels; } - const previousBySlug = new Map( - previousModels.map((model) => [model.slug, model] as const), - ); + const previousBySlug = new Map(previousModels.map((model) => [model.slug, model] as const)); const mergedModels = nextModels.map((model) => { const previousModel = previousBySlug.get(model.slug); - if ( - !previousModel || - hasModelCapabilities(model) || - !hasModelCapabilities(previousModel) - ) { + if (!previousModel || hasModelCapabilities(model) || !hasModelCapabilities(previousModel)) { return model; } return { @@ -117,10 +101,7 @@ const mergeProviderModels = ( }; }); const nextSlugs = new Set(nextModels.map((model) => model.slug)); - return [ - ...mergedModels, - ...previousModels.filter((model) => !nextSlugs.has(model.slug)), - ]; + return [...mergedModels, ...previousModels.filter((model) => !nextSlugs.has(model.slug))]; }; export const mergeProviderSnapshot = ( @@ -131,10 +112,7 @@ export const mergeProviderSnapshot = ( ? 
nextProvider : { ...nextProvider, - models: mergeProviderModels( - previousProvider.models, - nextProvider.models, - ), + models: mergeProviderModels(previousProvider.models, nextProvider.models), }; export const mergeProviderSnapshots = ( @@ -142,18 +120,13 @@ export const mergeProviderSnapshots = ( nextProviders: ReadonlyArray, ): ReadonlyArray => { const mergedProviders = new Map( - previousProviders.map( - (provider) => [snapshotInstanceKey(provider), provider] as const, - ), + previousProviders.map((provider) => [snapshotInstanceKey(provider), provider] as const), ); for (const provider of nextProviders) { mergedProviders.set( snapshotInstanceKey(provider), - mergeProviderSnapshot( - mergedProviders.get(snapshotInstanceKey(provider)), - provider, - ), + mergeProviderSnapshot(mergedProviders.get(snapshotInstanceKey(provider)), provider), ); } @@ -206,9 +179,7 @@ const snapshotInstanceKey = (provider: ServerProvider): ProviderInstanceId => { // after `ProviderInstanceRegistry` rebuilds an instance (e.g. because // its settings changed), a fresh source rides the new PubSub instead // of a closed one. 
-const buildSnapshotSource = ( - instance: ProviderInstance, -): ProviderSnapshotSource => ({ +const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource => ({ instanceId: instance.instanceId, driverKind: instance.driverKind, getSnapshot: instance.snapshot.getSnapshot, @@ -216,6 +187,21 @@ const buildSnapshotSource = ( streamChanges: instance.snapshot.streamChanges, }); +const buildPendingSnapshot = (instance: ProviderInstance): ServerProvider => ({ + instanceId: instance.instanceId, + driver: instance.driverKind, + displayName: instance.displayName, + enabled: instance.enabled, + installed: false, + version: null, + status: "pending", + auth: { status: "unknown" }, + checkedAt: "1970-01-01T00:00:00.000Z", + models: [], + slashCommands: [], + skills: [], +}); + export const ProviderRegistryLive = Layer.effect( ProviderRegistry, Effect.gen(function* () { @@ -243,19 +229,16 @@ export const ProviderRegistryLive = Layer.effect( // gaps for everything else. const bootInstances = yield* instanceRegistry.listInstances; const bootSources = bootInstances.map(buildSnapshotSource); + const bootInstanceByInstanceId = new Map( + bootInstances.map((instance) => [instance.instanceId, instance] as const), + ); const fallbackProviders = yield* loadProviders(bootSources).pipe( Effect.timeoutOption(BOOT_SNAPSHOT_FALLBACK_BUDGET), Effect.map(Option.getOrElse((): ReadonlyArray => [])), ); - const fallbackByInstance = new Map(); - for (let index = 0; index < fallbackProviders.length; index++) { - const provider = fallbackProviders[index]; - const source = bootSources[index]; - if (provider === undefined || source === undefined) { - continue; - } - fallbackByInstance.set(source.instanceId, provider); - } + const fallbackByInstance = new Map( + fallbackProviders.map((provider) => [provider.instanceId, provider] as const), + ); const hydratedBootProviders = yield* Effect.forEach( bootSources, @@ -272,74 +255,66 @@ export const ProviderRegistryLive = Layer.effect( 
instanceId: source.instanceId, }).pipe(Effect.provideService(Path.Path, path)); const fallbackProvider = fallbackByInstance.get(source.instanceId); + const orPendingSnapshot = ( + provider: ServerProvider | undefined, + ): ServerProvider | undefined => { + if (provider !== undefined) return provider; + const instance = bootInstanceByInstanceId.get(source.instanceId); + return instance !== undefined ? buildPendingSnapshot(instance) : undefined; + }; return yield* readProviderStatusCache(filePath).pipe( Effect.provideService(FileSystem.FileSystem, fileSystem), - Effect.flatMap( - (cachedProvider): Effect.Effect => { - if (cachedProvider === undefined) { - return Effect.succeed(fallbackProvider); - } - if ( - cachedProvider.instanceId !== source.instanceId || - cachedProvider.driver !== source.driverKind - ) { - return Effect.logWarning( - "provider status cache identity mismatch, ignoring", - { - path: filePath, - instanceId: source.instanceId, - cachedInstanceId: cachedProvider.instanceId ?? null, - driver: source.driverKind, - cachedDriver: cachedProvider.driver ?? null, - }, - ).pipe(Effect.as(fallbackProvider)); - } - if (fallbackProvider !== undefined) { - return Effect.succeed( - hydrateCachedProvider({ cachedProvider, fallbackProvider }), - ); - } - return Effect.succeed(cachedProvider); - }, - ), + Effect.flatMap((cachedProvider): Effect.Effect => { + if (cachedProvider === undefined) { + return Effect.succeed(orPendingSnapshot(fallbackProvider)); + } + if ( + cachedProvider.instanceId !== source.instanceId || + cachedProvider.driver !== source.driverKind + ) { + return Effect.logWarning("provider status cache identity mismatch, ignoring", { + path: filePath, + instanceId: source.instanceId, + cachedInstanceId: cachedProvider.instanceId ?? null, + driver: source.driverKind, + cachedDriver: cachedProvider.driver ?? 
null, + }).pipe(Effect.map(() => orPendingSnapshot(fallbackProvider))); + } + if (fallbackProvider !== undefined) { + return Effect.succeed(hydrateCachedProvider({ cachedProvider, fallbackProvider })); + } + return Effect.succeed(cachedProvider); + }), ); }), { concurrency: "unbounded" }, ).pipe( Effect.map((providers) => orderProviderSnapshots( - providers.filter( - (provider): provider is ServerProvider => provider !== undefined, - ), + providers.filter((provider): provider is ServerProvider => provider !== undefined), ), ), ); - const providersRef = yield* Ref.make>( - hydratedBootProviders, - ); + const providersRef = yield* Ref.make>(hydratedBootProviders); const maintenanceActionStatesRef = yield* Ref.make< - ReadonlyMap< - ProviderInstanceId, - { readonly update?: ServerProviderUpdateState | undefined } - > + ReadonlyMap >(new Map()); // Live-source registry — the dynamic counterpart to the boot-time // `bootSources`. Keyed by `instanceId`; the stored `ProviderInstance` // reference is used for identity equality so "no-op" reconciles // (settings unchanged) skip re-subscribing + re-probing. - const liveSubsRef = yield* Ref.make< - ReadonlyMap - >(new Map()); + const liveSubsRef = yield* Ref.make>( + new Map(), + ); // Serialize `syncLiveSources` so a rapid burst of reconciles doesn't // interleave two passes clobbering each other's fiber bookkeeping. 
const syncSemaphore = yield* Semaphore.make(1); - const getLiveSources: Effect.Effect> = - Ref.get(liveSubsRef).pipe( - Effect.map((map) => Array.from(map.values(), buildSnapshotSource)), - ); + const getLiveSources: Effect.Effect> = Ref.get( + liveSubsRef, + ).pipe(Effect.map((map) => Array.from(map.values(), buildSnapshotSource))); const persistProvider = (provider: ServerProvider) => Effect.gen(function* () { @@ -362,25 +337,20 @@ export const ProviderRegistryLive = Layer.effect( ); }); - const applyProviderUpdateState = Effect.fn("applyProviderUpdateState")( - function* (provider: ServerProvider) { - const maintenanceActionStates = yield* Ref.get( - maintenanceActionStatesRef, - ); - const updateState = maintenanceActionStates.get( - provider.instanceId, - )?.update; - if (!updateState) { - const { updateState: _updateState, ...providerWithoutUpdateState } = - provider; - return providerWithoutUpdateState; - } - return { - ...provider, - updateState, - }; - }, - ); + const applyProviderUpdateState = Effect.fn("applyProviderUpdateState")(function* ( + provider: ServerProvider, + ) { + const maintenanceActionStates = yield* Ref.get(maintenanceActionStatesRef); + const updateState = maintenanceActionStates.get(provider.instanceId)?.update; + if (!updateState) { + const { updateState: _updateState, ...providerWithoutUpdateState } = provider; + return providerWithoutUpdateState; + } + return { + ...provider, + updateState, + }; + }); const upsertProviders = Effect.fn("upsertProviders")(function* ( nextProviders: ReadonlyArray, @@ -397,12 +367,11 @@ export const ProviderRegistryLive = Layer.effect( concurrency: "unbounded", }, ); - const [previousProviders, providers, providersToPersist] = - yield* Ref.modify(providersRef, (previousProviders) => { + const [previousProviders, providers, providersToPersist] = yield* Ref.modify( + providersRef, + (previousProviders) => { const mergedProviders = new Map( - previousProviders.map( - (provider) => 
[snapshotInstanceKey(provider), provider] as const, - ), + previousProviders.map((provider) => [snapshotInstanceKey(provider), provider] as const), ); const updatedKeys = new Set(); @@ -417,17 +386,13 @@ export const ProviderRegistryLive = Layer.effect( ); } - const providers = orderProviderSnapshots([ - ...mergedProviders.values(), - ]); + const providers = orderProviderSnapshots([...mergedProviders.values()]); const providersToPersist = providers.filter((provider) => updatedKeys.has(snapshotInstanceKey(provider)), ); - return [ - [previousProviders, providers, providersToPersist] as const, - providers, - ]; - }); + return [[previousProviders, providers, providersToPersist] as const, providers]; + }, + ); if (haveProvidersChanged(previousProviders, providers)) { if (options?.persist !== false) { @@ -453,44 +418,44 @@ export const ProviderRegistryLive = Layer.effect( return yield* upsertProviders([provider], options); }); - const setProviderMaintenanceActionState = Effect.fn( - "setProviderMaintenanceActionState", - )(function* (input: { - readonly instanceId: ProviderInstanceId; - readonly action: "update"; - readonly state: ServerProviderUpdateState | null; - }) { - yield* Ref.update(maintenanceActionStatesRef, (previous) => { - const previousActions = previous.get(input.instanceId); - const nextActions = { ...previousActions }; - if (input.state === null || input.state.status === "idle") { - delete nextActions[input.action]; - } else { - nextActions[input.action] = input.state; - } + const setProviderMaintenanceActionState = Effect.fn("setProviderMaintenanceActionState")( + function* (input: { + readonly instanceId: ProviderInstanceId; + readonly action: "update"; + readonly state: ServerProviderUpdateState | null; + }) { + yield* Ref.update(maintenanceActionStatesRef, (previous) => { + const previousActions = previous.get(input.instanceId); + const nextActions = { ...previousActions }; + if (input.state === null || input.state.status === "idle") { + delete 
nextActions[input.action]; + } else { + nextActions[input.action] = input.state; + } - const next = new Map(previous); - if (Object.keys(nextActions).length === 0) { - next.delete(input.instanceId); - } else { - next.set(input.instanceId, nextActions); - } - return next; - }); + const next = new Map(previous); + if (Object.keys(nextActions).length === 0) { + next.delete(input.instanceId); + } else { + next.set(input.instanceId, nextActions); + } + return next; + }); - const existingProviders = yield* Ref.get(providersRef); - const matchingProvider = existingProviders.find( - (candidate) => candidate.instanceId === input.instanceId, - ); - if (!matchingProvider) { - return existingProviders; - } + const existingProviders = yield* Ref.get(providersRef); + const matchingProvider = existingProviders.find( + (candidate) => candidate.instanceId === input.instanceId, + ); + if (!matchingProvider) { + return existingProviders; + } - const nextProvider = yield* applyProviderUpdateState(matchingProvider); - return yield* upsertProviders([nextProvider], { - persist: false, - }); - }); + const nextProvider = yield* applyProviderUpdateState(matchingProvider); + return yield* upsertProviders([nextProvider], { + persist: false, + }); + }, + ); const refreshOneSource = Effect.fn("refreshOneSource")(function* ( providerSource: ProviderSnapshotSource, @@ -506,19 +471,13 @@ export const ProviderRegistryLive = Layer.effect( const refreshAll = Effect.fn("refreshAll")(function* () { const sources = yield* getLiveSources; - return yield* Effect.forEach( - sources, - (source) => refreshOneSource(source), - { - concurrency: "unbounded", - discard: true, - }, - ).pipe(Effect.andThen(Ref.get(providersRef))); + return yield* Effect.forEach(sources, (source) => refreshOneSource(source), { + concurrency: "unbounded", + discard: true, + }).pipe(Effect.andThen(Ref.get(providersRef))); }); - const refresh = Effect.fn("refresh")(function* ( - provider?: ProviderDriverKind, - ) { + const refresh = 
Effect.fn("refresh")(function* (provider?: ProviderDriverKind) { if (provider === undefined) { return yield* refreshAll(); } @@ -538,9 +497,7 @@ export const ProviderRegistryLive = Layer.effect( instanceId: ProviderInstanceId, ) { const sources = yield* getLiveSources; - const providerSource = sources.find( - (candidate) => candidate.instanceId === instanceId, - ); + const providerSource = sources.find((candidate) => candidate.instanceId === instanceId); if (!providerSource) { return yield* Ref.get(providersRef); } @@ -587,9 +544,7 @@ export const ProviderRegistryLive = Layer.effect( const nextByInstance = new Map( instances.map((instance) => [instance.instanceId, instance] as const), ); - const knownInstanceIds = new Set( - nextByInstance.keys(), - ); + const knownInstanceIds = new Set(nextByInstance.keys()); for (const provider of unavailableProviders) { knownInstanceIds.add(snapshotInstanceKey(provider)); } @@ -609,9 +564,7 @@ export const ProviderRegistryLive = Layer.effect( // Collect new/rebuilt instances in `nextByInstance` insertion // order (which preserves settings-author order). 
- const newlyAdded: Array< - readonly [ProviderInstanceId, ProviderInstance] - > = []; + const newlyAdded: Array = []; for (const [instanceId, instance] of nextByInstance) { if (carriedOver.has(instanceId)) { continue; @@ -627,9 +580,7 @@ export const ProviderRegistryLive = Layer.effect( for (const [, instance] of newlyAdded) { const source = buildSnapshotSource(instance); yield* Stream.runForEach(source.streamChanges, (provider) => - correlateSnapshotWithSource(source, provider).pipe( - Effect.flatMap(syncProvider), - ), + correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)), ).pipe(Effect.forkScoped); } @@ -642,9 +593,7 @@ export const ProviderRegistryLive = Layer.effect( yield* Effect.forEach( newlyAdded, ([, instance]) => - refreshOneSource(buildSnapshotSource(instance)).pipe( - Effect.ignoreCause({ log: true }), - ), + refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })), { concurrency: "unbounded", discard: true }, ); yield* upsertProviders(unavailableProviders, { @@ -699,29 +648,15 @@ export const ProviderRegistryLive = Layer.effect( }), ); - // Seed `providersRef` with the boot-time fallback snapshots so - // consumers calling `getProviders` immediately after layer build see - // a populated list — even before the first `syncLiveSources` refresh - // resolves. Cached snapshots (already in `providersRef`) merge with - // these via `upsertProviders` so on-disk state wins where present - // and pending fallbacks fill the gaps. - // - // (boot hydration path was refactored to produce `hydratedBootProviders` - // directly: fallback providers are now merged into `providersRef` - // during cache hydration rather than upserted in a separate pass.) - // Subscribe to registry mutations BEFORE running the initial sync. - // `subscribeChanges` acquires the dequeue synchronously in this - // fibre; the subscription is active the instant this `yield*` - // returns. 
Forking the consumer loop later cannot lose a publish - // because no publish can reach a not-yet-subscribed dequeue. - // - // (Contrast with the pre-fix code that did - // `Stream.runForEach(instanceRegistry.streamChanges, …).pipe(Effect.forkScoped)`. - // `Stream.fromPubSub` defers `PubSub.subscribe` to stream start, - // and `forkScoped` only schedules the fibre — so a reconcile that - // published between "fibre scheduled" and "fibre starts running" - // was dropped, which made any settings change that replaced an - // instance never propagate to the aggregator's `providersRef`.) + // `providersRef` already contains boot-hydrated snapshots (cache + + // pending fallbacks). Seed unavailable instances eagerly so the UI + // sees ghost/fork-only drivers immediately, without waiting for the + // background refresh. + yield* upsertProviders(yield* instanceRegistry.listUnavailable, { + persist: false, + replace: true, + publish: false, + }); // Subscribe to registry mutations BEFORE running the initial sync. // `subscribeChanges` acquires the `PubSub.Subscription` synchronously // in this fibre; the subscription is registered with the PubSub the @@ -738,32 +673,11 @@ export const ProviderRegistryLive = Layer.effect( // was dropped, which made any settings change that replaced an // instance never propagate to the aggregator's `providersRef`.) const instanceChanges = yield* instanceRegistry.subscribeChanges; - // Initial sync: fork scoped so provider refresh continues in the - // background even if it exceeds the startup budget. We await the - // forked fiber with a budget so fast providers preserve the old - // behavior (fresh state before server ready), while slow providers - // no longer block HTTP readiness. 
- const initialSyncFiber = yield* syncLiveSourcesAndContinue.pipe( - Effect.forkScoped, - ); - const initialSyncExit = yield* Fiber.await(initialSyncFiber).pipe( - Effect.timeoutOption(INITIAL_PROVIDER_REFRESH_BUDGET), - ); - if (Option.isSome(initialSyncExit)) { - yield* Effect.logInfo( - "provider registry initial refresh completed before startup budget", - { - budgetMs: Duration.toMillis(INITIAL_PROVIDER_REFRESH_BUDGET), - }, - ); - } else { - yield* Effect.logInfo( - "provider registry initial refresh exceeded startup budget; continuing in background", - { - budgetMs: Duration.toMillis(INITIAL_PROVIDER_REFRESH_BUDGET), - }, - ); - } + // Initial sync: fork scoped so provider refresh runs purely in the + // background — no startup budget is consumed. The layer returns + // immediately after forking; provider state updates when refresh + // completes. + yield* syncLiveSourcesAndContinue.pipe(Effect.forkScoped); // React to registry mutations — instance added / removed / rebuilt. // `Stream.fromSubscription` builds a stream over the pre-acquired // subscription rather than subscribing on stream start, which is @@ -779,12 +693,9 @@ export const ProviderRegistryLive = Layer.effect( if (Cause.hasInterruptsOnly(cause)) { return yield* Effect.interrupt; } - yield* Effect.logError( - "provider registry refresh failed; preserving cached providers", - { - cause: Cause.pretty(cause), - }, - ); + yield* Effect.logError("provider registry refresh failed; preserving cached providers", { + cause: Cause.pretty(cause), + }); return yield* Ref.get(providersRef); }); @@ -793,9 +704,7 @@ export const ProviderRegistryLive = Layer.effect( refresh: (provider?: ProviderDriverKind) => refresh(provider).pipe(Effect.catchCause(recoverRefreshFailure)), refreshInstance: (instanceId: ProviderInstanceId) => - refreshInstance(instanceId).pipe( - Effect.catchCause(recoverRefreshFailure), - ), + refreshInstance(instanceId).pipe(Effect.catchCause(recoverRefreshFailure)), 
getProviderMaintenanceCapabilitiesForInstance, setProviderMaintenanceActionState, get streamChanges() { diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index 85ff4a4b2cb..aa5ca210d2e 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -40,7 +40,13 @@ export type ServerConfigIssue = typeof ServerConfigIssue.Type; const ServerConfigIssues = Schema.Array(ServerConfigIssue); -export const ServerProviderState = Schema.Literals(["ready", "warning", "error", "disabled"]); +export const ServerProviderState = Schema.Literals([ + "ready", + "warning", + "error", + "disabled", + "pending", +]); export type ServerProviderState = typeof ServerProviderState.Type; export const ServerProviderAuthStatus = Schema.Literals([