diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index 22a120f0a52..0c6317e45e8 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -38,6 +38,8 @@ import * as Path from "effect/Path"; import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; +import * as Duration from "effect/Duration"; +import * as Option from "effect/Option"; import * as Semaphore from "effect/Semaphore"; import { ServerConfig } from "../../config.ts"; @@ -45,7 +47,6 @@ import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.t import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry.ts"; import { hydrateCachedProvider, - isCachedProviderCorrelated, orderProviderSnapshots, readProviderStatusCache, resolveProviderStatusCachePath, @@ -69,6 +70,8 @@ const loadProviders = ( }, ); +const BOOT_SNAPSHOT_FALLBACK_BUDGET = Duration.millis(100); + const makeManualProviderMaintenanceCapabilities = (provider: ProviderDriverKind) => makeManualOnlyProviderMaintenanceCapabilities({ provider, @@ -184,6 +187,21 @@ const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource streamChanges: instance.snapshot.streamChanges, }); +const buildPendingSnapshot = (instance: ProviderInstance): ServerProvider => ({ + instanceId: instance.instanceId, + driver: instance.driverKind, + displayName: instance.displayName, + enabled: instance.enabled, + installed: false, + version: null, + status: "pending", + auth: { status: "unknown" }, + checkedAt: "1970-01-01T00:00:00.000Z", + models: [], + slashCommands: [], + skills: [], +}); + export const ProviderRegistryLive = Layer.effect( ProviderRegistry, Effect.gen(function* () { @@ -204,20 +222,25 @@ export const ProviderRegistryLive = Layer.effect( // Instances added post-boot skip this path; their first entry in // `providersRef` comes from the reactive `syncLiveSources` pass // below. + // + // The boot fallback is bounded so slow provider probes can't block + // HTTP readiness. Fast `getSnapshot` implementations (tests, cheap + // providers) still seed state within the budget; cache fills the + // gaps for everything else. const bootInstances = yield* instanceRegistry.listInstances; const bootSources = bootInstances.map(buildSnapshotSource); - const fallbackProviders = yield* loadProviders(bootSources); - const fallbackByInstance = new Map(); - for (let index = 0; index < fallbackProviders.length; index++) { - const provider = fallbackProviders[index]; - const source = bootSources[index]; - if (provider === undefined || source === undefined) { - continue; - } - fallbackByInstance.set(source.instanceId, provider); - } + const bootInstanceByInstanceId = new Map( + bootInstances.map((instance) => [instance.instanceId, instance] as const), + ); + const fallbackProviders = yield* loadProviders(bootSources).pipe( + Effect.timeoutOption(BOOT_SNAPSHOT_FALLBACK_BUDGET), + Effect.map(Option.getOrElse((): ReadonlyArray => [])), + ); + const fallbackByInstance = new Map( + fallbackProviders.map((provider) => [provider.instanceId, provider] as const), + ); - const cachedProviders = yield* Effect.forEach( + const hydratedBootProviders = yield* Effect.forEach( bootSources, (source) => Effect.gen(function* () { @@ -225,36 +248,43 @@ export const ProviderRegistryLive = Layer.effect( // instance of a built-in kind the path equals `.json` — // identical to the legacy filename. We still require the cache // payload to carry matching instance id + driver kind; old - // identity-less payloads are discarded and the awaited refresh - // below repopulates the cache. + // identity-less payloads are discarded and the background + // refresh repopulates the cache. const filePath = yield* resolveProviderStatusCachePath({ cacheDir: config.providerStatusCacheDir, instanceId: source.instanceId, }).pipe(Effect.provideService(Path.Path, path)); const fallbackProvider = fallbackByInstance.get(source.instanceId); - if (fallbackProvider === undefined) { - return undefined; - } + const orPendingSnapshot = ( + provider: ServerProvider | undefined, + ): ServerProvider | undefined => { + if (provider !== undefined) return provider; + const instance = bootInstanceByInstanceId.get(source.instanceId); + return instance !== undefined ? buildPendingSnapshot(instance) : undefined; + }; + return yield* readProviderStatusCache(filePath).pipe( Effect.provideService(FileSystem.FileSystem, fileSystem), - Effect.flatMap((cachedProvider) => { + Effect.flatMap((cachedProvider): Effect.Effect => { if (cachedProvider === undefined) { - return Effect.void.pipe(Effect.as(undefined as ServerProvider | undefined)); + return Effect.succeed(orPendingSnapshot(fallbackProvider)); } - const correlation = { - cachedProvider, - fallbackProvider, - } as const; - if (!isCachedProviderCorrelated(correlation)) { + if ( + cachedProvider.instanceId !== source.instanceId || + cachedProvider.driver !== source.driverKind + ) { return Effect.logWarning("provider status cache identity mismatch, ignoring", { path: filePath, instanceId: source.instanceId, cachedInstanceId: cachedProvider.instanceId ?? null, driver: source.driverKind, cachedDriver: cachedProvider.driver ?? null, - }).pipe(Effect.as(undefined as ServerProvider | undefined)); + }).pipe(Effect.map(() => orPendingSnapshot(fallbackProvider))); + } + if (fallbackProvider !== undefined) { + return Effect.succeed(hydrateCachedProvider({ cachedProvider, fallbackProvider })); } - return Effect.succeed(hydrateCachedProvider(correlation)); + return Effect.succeed(cachedProvider); }), ); }), @@ -266,7 +296,7 @@ export const ProviderRegistryLive = Layer.effect( ), ), ); - const providersRef = yield* Ref.make>(cachedProviders); + const providersRef = yield* Ref.make>(hydratedBootProviders); const maintenanceActionStatesRef = yield* Ref.make< ReadonlyMap >(new Map()); @@ -618,26 +648,15 @@ export const ProviderRegistryLive = Layer.effect( }), ); - // Seed `providersRef` with the boot-time fallback snapshots so - // consumers calling `getProviders` immediately after layer build see - // a populated list — even before the first `syncLiveSources` refresh - // resolves. Cached snapshots (already in `providersRef`) merge with - // these via `upsertProviders` so on-disk state wins where present - // and pending fallbacks fill the gaps. - yield* upsertProviders(fallbackProviders, { publish: false }); - // Subscribe to registry mutations BEFORE running the initial sync. - // `subscribeChanges` acquires the dequeue synchronously in this - // fibre; the subscription is active the instant this `yield*` - // returns. Forking the consumer loop later cannot lose a publish - // because no publish can reach a not-yet-subscribed dequeue. - // - // (Contrast with the pre-fix code that did - // `Stream.runForEach(instanceRegistry.streamChanges, …).pipe(Effect.forkScoped)`. - // `Stream.fromPubSub` defers `PubSub.subscribe` to stream start, - // and `forkScoped` only schedules the fibre — so a reconcile that - // published between "fibre scheduled" and "fibre starts running" - // was dropped, which made any settings change that replaced an - // instance never propagate to the aggregator's `providersRef`.) + // `providersRef` already contains boot-hydrated snapshots (cache + + // pending fallbacks). Seed unavailable instances eagerly so the UI + // sees ghost/fork-only drivers immediately, without waiting for the + // background refresh. + yield* upsertProviders(yield* instanceRegistry.listUnavailable, { + persist: false, + replace: true, + publish: false, + }); // Subscribe to registry mutations BEFORE running the initial sync. // `subscribeChanges` acquires the `PubSub.Subscription` synchronously // in this fibre; the subscription is registered with the PubSub the @@ -654,10 +673,11 @@ export const ProviderRegistryLive = Layer.effect( // was dropped, which made any settings change that replaced an // instance never propagate to the aggregator's `providersRef`.) const instanceChanges = yield* instanceRegistry.subscribeChanges; - // Initial sync: subscribe + kick off refreshes for every instance - // present at boot. Run synchronously so consumers pulling immediately - // after the layer build see the correct aggregator state. - yield* syncLiveSources; + // Initial sync: fork scoped so provider refresh runs purely in the + // background — no startup budget is consumed. The layer returns + // immediately after forking; provider state updates when refresh + // completes. + yield* syncLiveSourcesAndContinue.pipe(Effect.forkScoped); // React to registry mutations — instance added / removed / rebuilt. // `Stream.fromSubscription` builds a stream over the pre-acquired // subscription rather than subscribing on stream start, which is diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index 85ff4a4b2cb..aa5ca210d2e 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -40,7 +40,13 @@ export type ServerConfigIssue = typeof ServerConfigIssue.Type; const ServerConfigIssues = Schema.Array(ServerConfigIssue); -export const ServerProviderState = Schema.Literals(["ready", "warning", "error", "disabled"]); +export const ServerProviderState = Schema.Literals([ + "ready", + "warning", + "error", + "disabled", + "pending", +]); export type ServerProviderState = typeof ServerProviderState.Type; export const ServerProviderAuthStatus = Schema.Literals([