diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts index 645568d734d..041bc402034 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts @@ -86,10 +86,13 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => + Effect.die("CheckpointDiffQuery should not request the command read model"), getSnapshot: () => Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), getShellSnapshot: () => Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }), getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getProjectShellById: () => Effect.succeed(Option.none()), @@ -163,10 +166,13 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => + Effect.die("CheckpointDiffQuery should not request the command read model"), getSnapshot: () => Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), getShellSnapshot: () => Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }), getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getProjectShellById: () => Effect.succeed(Option.none()), @@ -208,10 +214,13 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => + Effect.die("CheckpointDiffQuery should not request the command read model"), getSnapshot: () => Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), getShellSnapshot: () => Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }), getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getProjectShellById: () => Effect.succeed(Option.none()), diff --git a/apps/server/src/cli.test.ts b/apps/server/src/cli.test.ts index 3ef8c441959..3740d1f868e 100644 --- a/apps/server/src/cli.test.ts +++ b/apps/server/src/cli.test.ts @@ -17,7 +17,6 @@ import { Command } from "effect/unstable/cli"; import { cli } from "./cli.ts"; import { deriveServerPaths, ServerConfig, type ServerConfigShape } from "./config.ts"; -import { OrchestrationEngineService } from "./orchestration/Services/OrchestrationEngine.ts"; import { ProjectionSnapshotQuery } from "./orchestration/Services/ProjectionSnapshotQuery.ts"; import { OrchestrationLayerLive } from "./orchestration/runtimeLayer.ts"; import { @@ -316,8 +315,8 @@ it.layer(NodeServices.layer)("cli log-level parsing", (it) => { "--base-dir", baseDir, ]); - const orchestrationEngine = yield* OrchestrationEngineService; - const readModel = yield* orchestrationEngine.getReadModel(); + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; + const readModel = yield* projectionSnapshotQuery.getSnapshot(); const addedProject = readModel.projects.find( (project) => project.workspaceRoot === workspaceRoot && project.deletedAt === null, ); diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index ad5fb59bd1e..862e4de3cbe 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -41,6 +41,7 @@ import { type OrchestrationEngineShape, } from "../Services/OrchestrationEngine.ts"; import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ProviderService, type ProviderServiceShape, @@ -132,7 +133,14 @@ function createProviderServiceHarness( } async function waitForThread( - engine: OrchestrationEngineShape, + readModel: () => Promise<{ + readonly threads: ReadonlyArray<{ + readonly id: ThreadId; + readonly latestTurn: { readonly turnId: string } | null; + readonly checkpoints: ReadonlyArray<{ readonly checkpointTurnCount: number }>; + readonly activities: ReadonlyArray<{ readonly kind: string }>; + }>; + }>, predicate: (thread: { latestTurn: { turnId: string } | null; checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>; @@ -146,8 +154,8 @@ async function waitForThread( checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>; activities: ReadonlyArray<{ kind: string }>; }> => { - const readModel = await Effect.runPromise(engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + const snapshot = await readModel(); + const thread = snapshot.threads.find((entry) => entry.id === ThreadId.make("thread-1")); if (thread && predicate(thread)) { return thread; } @@ -231,7 +239,7 @@ async function waitForGitRefExists(cwd: string, ref: string, timeoutMs = 15_000) describe("CheckpointReactor", () => { let runtime: ManagedRuntime.ManagedRuntime< - OrchestrationEngineService | CheckpointReactor | CheckpointStore, + OrchestrationEngineService | CheckpointReactor | CheckpointStore | ProjectionSnapshotQuery, unknown > | null = null; let scope: Scope.Closeable | null = null; @@ -279,6 +287,10 @@ describe("CheckpointReactor", () => { Layer.provide(RepositoryIdentityResolverLive), Layer.provide(SqlitePersistenceMemory), ); + const projectionSnapshotLayer = OrchestrationProjectionSnapshotQueryLive.pipe( + Layer.provide(RepositoryIdentityResolverLive), + Layer.provide(SqlitePersistenceMemory), + ); const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), { prefix: "t3-checkpoint-reactor-test-", @@ -304,6 +316,7 @@ describe("CheckpointReactor", () => { const layer = CheckpointReactorLive.pipe( Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(projectionSnapshotLayer), Layer.provideMerge(RuntimeReceiptBusLive), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), Layer.provideMerge(vcsStatusBroadcasterLayer), @@ -322,6 +335,7 @@ describe("CheckpointReactor", () => { runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); const reactor = await runtime.runPromise(Effect.service(CheckpointReactor)); const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore)); scope = await Effect.runPromise(Scope.make("sequential")); @@ -387,6 +401,7 @@ describe("CheckpointReactor", () => { return { engine, + readModel: () => Effect.runPromise(snapshotQuery.getSnapshot()), provider, cwd, drain, @@ -443,7 +458,7 @@ describe("CheckpointReactor", () => { await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed"); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.latestTurn?.turnId === "turn-1" && entry.checkpoints.length === 1, ); expect(thread.checkpoints[0]?.checkpointTurnCount).toBe(1); @@ -541,7 +556,7 @@ describe("CheckpointReactor", () => { }); await harness.drain(); - const midReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(midThread?.checkpoints).toHaveLength(0); @@ -557,7 +572,7 @@ describe("CheckpointReactor", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.latestTurn?.turnId === "turn-main" && entry.checkpoints.length === 1, ); expect(thread.checkpoints[0]?.checkpointTurnCount).toBe(1); @@ -614,7 +629,7 @@ describe("CheckpointReactor", () => { await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed"); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.latestTurn?.turnId === "turn-claude-1" && entry.checkpoints.length === 1, ); @@ -659,7 +674,7 @@ describe("CheckpointReactor", () => { await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed"); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.checkpoints.length === 1 && entry.activities.some((activity) => activity.kind === "checkpoint.capture.failed"), @@ -794,7 +809,7 @@ describe("CheckpointReactor", () => { }); await harness.drain(); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.checkpoints.some((checkpoint) => checkpoint.checkpointTurnCount === 3)).toBe( false, @@ -923,7 +938,10 @@ describe("CheckpointReactor", () => { ); await waitForEvent(harness.engine, (event) => event.type === "thread.reverted"); - const thread = await waitForThread(harness.engine, (entry) => entry.checkpoints.length === 1); + const thread = await waitForThread( + harness.readModel, + (entry) => entry.checkpoints.length === 1, + ); expect(thread.latestTurn?.turnId).toBe("turn-1"); expect(thread.checkpoints).toHaveLength(1); @@ -1105,7 +1123,7 @@ describe("CheckpointReactor", () => { }), ); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some((activity) => activity.kind === "checkpoint.revert.failed"), ); diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index e534f6851ae..001b757cf80 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -20,6 +20,7 @@ import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts"; import type { CheckpointStoreError } from "../../checkpointing/Errors.ts"; import type { OrchestrationDispatchError } from "../Errors.ts"; @@ -66,6 +67,7 @@ const serverCommandId = (tag: string): CommandId => const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; const providerService = yield* ProviderService; const checkpointStore = yield* CheckpointStore; const receiptBus = yield* RuntimeReceiptBus; @@ -124,29 +126,26 @@ const make = Effect.gen(function* () { const resolveSessionRuntimeForThread = Effect.fn("resolveSessionRuntimeForThread")(function* ( threadId: ThreadId, ): Effect.fn.Return> { - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === threadId); - const sessions = yield* providerService.listSessions(); + const session = sessions.find((entry) => entry.threadId === threadId); + return session?.cwd + ? Option.some({ threadId: session.threadId, cwd: session.cwd }) + : Option.none(); + }); - const findSessionWithCwd = ( - session: (typeof sessions)[number] | undefined, - ): Option.Option<{ readonly threadId: ThreadId; readonly cwd: string }> => { - if (!session?.cwd) { - return Option.none(); - } - return Option.some({ threadId: session.threadId, cwd: session.cwd }); - }; - - if (thread) { - const projectedSession = sessions.find((session) => session.threadId === thread.id); - const fromProjected = findSessionWithCwd(projectedSession); - if (Option.isSome(fromProjected)) { - return fromProjected; - } - } + const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) { + return yield* projectionSnapshotQuery + .getThreadDetailById(threadId) + .pipe(Effect.map(Option.getOrUndefined)); + }); - return Option.none(); + const resolveThreadProjects = Effect.fn("resolveThreadProjects")(function* ( + projectId: ProjectId, + ) { + const project = yield* projectionSnapshotQuery + .getProjectShellById(projectId) + .pipe(Effect.map(Option.getOrUndefined)); + return project ? [project] : []; }); const isGitWorkspace = (cwd: string) => isGitRepository(cwd); @@ -331,8 +330,7 @@ const make = Effect.gen(function* () { return; } - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); + const thread = yield* resolveThreadDetail(event.threadId); if (!thread) { return; } @@ -353,10 +351,11 @@ const make = Effect.gen(function* () { return; } + const projects = yield* resolveThreadProjects(thread.projectId); const checkpointCwd = yield* resolveCheckpointCwd({ threadId: thread.id, thread, - projects: readModel.projects, + projects, preferSessionRuntime: true, }); if (!checkpointCwd) { @@ -407,8 +406,7 @@ const make = Effect.gen(function* () { return; } - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === threadId); + const thread = yield* resolveThreadDetail(threadId); if (!thread) { yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", { threadId, @@ -429,10 +427,11 @@ const make = Effect.gen(function* () { return; } + const projects = yield* resolveThreadProjects(thread.projectId); const checkpointCwd = yield* resolveCheckpointCwd({ threadId, thread, - projects: readModel.projects, + projects, preferSessionRuntime: true, }); if (!checkpointCwd) { @@ -458,16 +457,16 @@ const make = Effect.gen(function* () { return; } - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); + const thread = yield* resolveThreadDetail(event.threadId); if (!thread) { return; } + const projects = yield* resolveThreadProjects(thread.projectId); const checkpointCwd = yield* resolveCheckpointCwd({ threadId: thread.id, thread, - projects: readModel.projects, + projects, preferSessionRuntime: false, }); if (!checkpointCwd) { @@ -540,16 +539,16 @@ const make = Effect.gen(function* () { } const threadId = event.payload.threadId; - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === threadId); + const thread = yield* resolveThreadDetail(threadId); if (!thread) { return; } + const projects = yield* resolveThreadProjects(thread.projectId); const checkpointCwd = yield* resolveCheckpointCwd({ threadId, thread, - projects: readModel.projects, + projects, preferSessionRuntime: false, }); if (!checkpointCwd) { @@ -587,8 +586,7 @@ const make = Effect.gen(function* () { ) { const now = new Date().toISOString(); - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.payload.threadId); + const thread = yield* resolveThreadDetail(event.payload.threadId); if (!thread) { yield* appendRevertFailureActivity({ threadId: event.payload.threadId, diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 0af5b099a64..90d849fd826 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -8,6 +8,7 @@ import { TurnId, type OrchestrationEvent, ProviderInstanceId, + type OrchestrationReadModel, } from "@t3tools/contracts"; import { Effect, Layer, ManagedRuntime, Metric, Option, Queue, Stream } from "effect"; import { describe, expect, it } from "vitest"; @@ -42,9 +43,13 @@ async function createOrchestrationSystem() { const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), { prefix: "t3-orchestration-engine-test-", }); - const orchestrationLayer = OrchestrationEngineLive.pipe( - Layer.provide(OrchestrationProjectionSnapshotQueryLive), - Layer.provide(OrchestrationProjectionPipelineLive), + const orchestrationLayer = Layer.mergeAll( + OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), + Layer.provide(OrchestrationProjectionPipelineLive), + ), + OrchestrationProjectionSnapshotQueryLive, + ).pipe( Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), Layer.provide(RepositoryIdentityResolverLive), @@ -54,8 +59,10 @@ async function createOrchestrationSystem() { ); const runtime = ManagedRuntime.make(orchestrationLayer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); return { engine, + readModel: () => runtime.runPromise(snapshotQuery.getSnapshot()), run: (effect: Effect.Effect) => runtime.runPromise(effect), dispose: () => runtime.dispose(), }; @@ -77,15 +84,18 @@ const hasMetricSnapshot = ( ); describe("OrchestrationEngine", () => { - it("bootstraps the in-memory read model from persisted projections", async () => { - const failOnHistoricalReplayStore: OrchestrationEventStoreShape = { - append: () => - Effect.fail( - new PersistenceSqlError({ - operation: "test.append", - detail: "append should not be called during bootstrap", - }), - ), + it("bootstraps command handling from persisted projections without reading the full snapshot", async () => { + let nextSequence = 8; + const eventStore: OrchestrationEventStoreShape = { + append: (event) => + Effect.sync(() => { + const savedEvent = { + ...event, + sequence: nextSequence, + } as OrchestrationEvent; + nextSequence += 1; + return savedEvent; + }), readFromSequence: () => Stream.empty, readAll: () => Stream.fail( @@ -140,11 +150,27 @@ describe("OrchestrationEngine", () => { }, ], }; + const commandReadModel = { + ...projectionSnapshot, + threads: projectionSnapshot.threads.map((thread) => ({ + ...thread, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + })), + }; + let fullSnapshotReadCount = 0; const layer = OrchestrationEngineLive.pipe( Layer.provide( Layer.succeed(ProjectionSnapshotQuery, { - getSnapshot: () => Effect.succeed(projectionSnapshot), + getCommandReadModel: () => Effect.succeed(commandReadModel), + getSnapshot: () => + Effect.sync(() => { + fullSnapshotReadCount += 1; + return projectionSnapshot; + }), getShellSnapshot: () => Effect.succeed({ snapshotSequence: projectionSnapshot.snapshotSequence, @@ -152,6 +178,8 @@ describe("OrchestrationEngine", () => { threads: [], updatedAt: projectionSnapshot.updatedAt, }), + getSnapshotSequence: () => + Effect.succeed({ snapshotSequence: projectionSnapshot.snapshotSequence }), getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getProjectShellById: () => Effect.succeed(Option.none()), @@ -167,7 +195,7 @@ describe("OrchestrationEngine", () => { projectEvent: () => Effect.void, } satisfies OrchestrationProjectionPipelineShape), ), - Layer.provide(Layer.succeed(OrchestrationEventStore, failOnHistoricalReplayStore)), + Layer.provide(Layer.succeed(OrchestrationEventStore, eventStore)), Layer.provide(OrchestrationCommandReceiptRepositoryLive), Layer.provide(SqlitePersistenceMemory), ); @@ -175,18 +203,22 @@ describe("OrchestrationEngine", () => { const runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); - const readModel = await runtime.runPromise(engine.getReadModel()); + const result = await runtime.runPromise( + engine.dispatch({ + type: "thread.meta.update", + commandId: CommandId.make("cmd-bootstrap-thread-update"), + threadId: ThreadId.make("thread-bootstrap"), + title: "Updated Bootstrap Thread", + }), + ); - expect(readModel.snapshotSequence).toBe(7); - expect(readModel.projects).toHaveLength(1); - expect(readModel.projects[0]?.title).toBe("Bootstrap Project"); - expect(readModel.threads).toHaveLength(1); - expect(readModel.threads[0]?.title).toBe("Bootstrap Thread"); + expect(result.sequence).toBe(8); + expect(fullSnapshotReadCount).toBe(0); await runtime.dispose(); }); - it("returns deterministic read models for repeated reads", async () => { + it("persists deterministic read models for repeated snapshot reads", async () => { const createdAt = now(); const system = await createOrchestrationSystem(); const { engine } = system; @@ -240,8 +272,8 @@ describe("OrchestrationEngine", () => { }), ); - const readModelA = await system.run(engine.getReadModel()); - const readModelB = await system.run(engine.getReadModel()); + const readModelA = await system.readModel(); + const readModelB = await system.readModel(); expect(readModelB).toEqual(readModelA); await system.dispose(); }); @@ -292,9 +324,8 @@ describe("OrchestrationEngine", () => { }), ); expect( - (await system.run(engine.getReadModel())).threads.find( - (thread) => thread.id === "thread-archive", - )?.archivedAt, + (await system.readModel()).threads.find((thread) => thread.id === "thread-archive") + ?.archivedAt, ).not.toBeNull(); await system.run( @@ -305,9 +336,8 @@ describe("OrchestrationEngine", () => { }), ); expect( - (await system.run(engine.getReadModel())).threads.find( - (thread) => thread.id === "thread-archive", - )?.archivedAt, + (await system.readModel()).threads.find((thread) => thread.id === "thread-archive") + ?.archivedAt, ).toBeNull(); await system.dispose(); @@ -573,7 +603,7 @@ describe("OrchestrationEngine", () => { }), ); - const thread = (await system.run(engine.getReadModel())).threads.find( + const thread = (await system.readModel()).threads.find( (entry) => entry.id === "thread-turn-diff", ); expect(thread?.checkpoints).toEqual([ @@ -701,7 +731,15 @@ describe("OrchestrationEngine", () => { ); expect(result.sequence).toBe(2); - expect((await runtime.runPromise(engine.getReadModel())).snapshotSequence).toBe(2); + const eventsAfterRetry = await runtime.runPromise( + Stream.runCollect(engine.readEvents(0)).pipe( + Effect.map((chunk): OrchestrationEvent[] => Array.from(chunk)), + ), + ); + expect(eventsAfterRetry.map((event) => event.type)).toEqual([ + "project.created", + "thread.created", + ]); await runtime.dispose(); }); @@ -801,7 +839,6 @@ describe("OrchestrationEngine", () => { "project.created", "thread.created", ]); - expect((await runtime.runPromise(engine.getReadModel())).snapshotSequence).toBe(2); const retryResult = await runtime.runPromise(engine.dispatch(turnStartCommand)); expect(retryResult.sequence).toBe(4); @@ -824,7 +861,7 @@ describe("OrchestrationEngine", () => { await runtime.dispose(); }); - it("reconciles in-memory state when append persists but projection fails", async () => { + it("reconciles command state when append persists but projection fails", async () => { type StoredEvent = ReturnType extends Effect.Effect ? A @@ -856,7 +893,7 @@ describe("OrchestrationEngine", () => { projectEvent: (event) => { if ( shouldFailProjection && - event.commandId === CommandId.make("cmd-thread-meta-sync-fail") + event.commandId === CommandId.make("cmd-thread-archive-sync-fail") ) { shouldFailProjection = false; return Effect.fail( @@ -919,20 +956,22 @@ describe("OrchestrationEngine", () => { await expect( runtime.runPromise( engine.dispatch({ - type: "thread.meta.update", - commandId: CommandId.make("cmd-thread-meta-sync-fail"), + type: "thread.archive", + commandId: CommandId.make("cmd-thread-archive-sync-fail"), threadId: ThreadId.make("thread-sync"), - title: "sync-after-failed-projection", }), ), ).rejects.toThrow("projection failed"); - const readModelAfterFailure = await runtime.runPromise(engine.getReadModel()); - const updatedThread = readModelAfterFailure.threads.find( - (thread) => thread.id === "thread-sync", - ); - expect(readModelAfterFailure.snapshotSequence).toBe(3); - expect(updatedThread?.title).toBe("sync-after-failed-projection"); + await expect( + runtime.runPromise( + engine.dispatch({ + type: "thread.archive", + commandId: CommandId.make("cmd-thread-archive-sync-retry"), + threadId: ThreadId.make("thread-sync"), + }), + ), + ).rejects.toThrow("already archived"); await runtime.dispose(); }); diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index ddd1718faf0..6c591416486 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -34,6 +34,7 @@ import { OrchestrationCommandInvariantError, OrchestrationCommandPreviouslyRejectedError, type OrchestrationDispatchError, + type OrchestrationProjectorDecodeError, } from "../Errors.ts"; import { decideOrchestrationCommand } from "../decider.ts"; import { createEmptyReadModel, projectEvent } from "../projector.ts"; @@ -77,13 +78,25 @@ const makeOrchestrationEngine = Effect.gen(function* () { const projectionPipeline = yield* OrchestrationProjectionPipeline; const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; - let readModel = createEmptyReadModel(new Date().toISOString()); + let commandReadModel = createEmptyReadModel(new Date().toISOString()); const commandQueue = yield* Queue.unbounded(); const eventPubSub = yield* PubSub.unbounded(); + const projectEventsOntoReadModel = ( + baseReadModel: OrchestrationReadModel, + events: ReadonlyArray, + ): Effect.Effect => + Effect.gen(function* () { + let nextReadModel = baseReadModel; + for (const event of events) { + nextReadModel = yield* projectEvent(nextReadModel, event); + } + return nextReadModel; + }); + const processEnvelope = (envelope: CommandEnvelope): Effect.Effect => { - const dispatchStartSequence = readModel.snapshotSequence; + const dispatchStartSequence = commandReadModel.snapshotSequence; const processingStartedAtMs = Date.now(); const aggregateRef = commandToAggregateRef(envelope.command); const baseMetricAttributes = { @@ -98,11 +111,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { return; } - let nextReadModel = readModel; - for (const persistedEvent of persistedEvents) { - nextReadModel = yield* projectEvent(nextReadModel, persistedEvent); - } - readModel = nextReadModel; + commandReadModel = yield* projectEventsOntoReadModel(commandReadModel, persistedEvents); for (const persistedEvent of persistedEvents) { yield* PubSub.publish(eventPubSub, persistedEvent); @@ -135,18 +144,18 @@ const makeOrchestrationEngine = Effect.gen(function* () { const eventBase = yield* decideOrchestrationCommand({ command: envelope.command, - readModel, + readModel: commandReadModel, }); const eventBases = Array.isArray(eventBase) ? eventBase : [eventBase]; const committedCommand = yield* sql .withTransaction( Effect.gen(function* () { const committedEvents: OrchestrationEvent[] = []; - let nextReadModel = readModel; + let nextCommandReadModel = commandReadModel; for (const nextEvent of eventBases) { const savedEvent = yield* eventStore.append(nextEvent); - nextReadModel = yield* projectEvent(nextReadModel, savedEvent); + nextCommandReadModel = yield* projectEvent(nextCommandReadModel, savedEvent); yield* projectionPipeline.projectEvent(savedEvent); committedEvents.push(savedEvent); } @@ -172,7 +181,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { return { committedEvents, lastSequence: lastSavedEvent.sequence, - nextReadModel, + nextCommandReadModel, } as const; }), ) @@ -184,7 +193,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { ), ); - readModel = committedCommand.nextReadModel; + commandReadModel = committedCommand.nextCommandReadModel; for (const [index, event] of committedCommand.committedEvents.entries()) { yield* PubSub.publish(eventPubSub, event); if (index === 0) { @@ -242,7 +251,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { ).pipe( Effect.annotateLogs({ commandId: envelope.command.commandId, - snapshotSequence: readModel.snapshotSequence, + snapshotSequence: commandReadModel.snapshotSequence, }), ), ), @@ -255,7 +264,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { aggregateKind: aggregateRef.aggregateKind, aggregateId: aggregateRef.aggregateId, acceptedAt: new Date().toISOString(), - resultSequence: readModel.snapshotSequence, + resultSequence: commandReadModel.snapshotSequence, status: "rejected", error: error.message, }) @@ -270,17 +279,14 @@ const makeOrchestrationEngine = Effect.gen(function* () { }; yield* projectionPipeline.bootstrap; - readModel = yield* projectionSnapshotQuery.getSnapshot(); + commandReadModel = yield* projectionSnapshotQuery.getCommandReadModel(); const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope))); yield* Effect.forkScoped(worker); yield* Effect.logDebug("orchestration engine started").pipe( - Effect.annotateLogs({ sequence: readModel.snapshotSequence }), + Effect.annotateLogs({ sequence: commandReadModel.snapshotSequence }), ); - const getReadModel: OrchestrationEngineShape["getReadModel"] = () => - Effect.sync((): OrchestrationReadModel => readModel); - const readEvents: OrchestrationEngineShape["readEvents"] = (fromSequenceExclusive) => eventStore.readFromSequence(fromSequenceExclusive); @@ -292,7 +298,6 @@ const makeOrchestrationEngine = Effect.gen(function* () { }); return { - getReadModel, readEvents, dispatch, // Each access creates a fresh PubSub subscription so that multiple diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 28a0208e75c..3ef8b38d642 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -739,9 +739,31 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti if (Option.isNone(existingRow)) { return; } + + const retainedTurns = yield* projectionTurnRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + let latestTurnId: ProjectionTurn["turnId"] = null; + let latestCheckpointTurnCount = -1; + for (let index = 0; index < retainedTurns.length; index += 1) { + const turn = retainedTurns[index]; + if ( + !turn || + turn.turnId === null || + turn.checkpointTurnCount === null || + turn.checkpointTurnCount > event.payload.turnCount + ) { + continue; + } + if (turn.checkpointTurnCount > latestCheckpointTurnCount) { + latestCheckpointTurnCount = turn.checkpointTurnCount; + latestTurnId = turn.turnId; + } + } + yield* projectionThreadRepository.upsert({ ...existingRow.value, - latestTurnId: null, + latestTurnId, updatedAt: event.occurredAt, }); yield* refreshThreadShellSummary(event.payload.threadId); diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index cba5ce7e830..4538ab4b6b5 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -12,6 +12,7 @@ import { Effect, Layer } from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; +import { RepositoryIdentityResolver } from "../../project/Services/RepositoryIdentityResolver.ts"; import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts"; import { ORCHESTRATION_PROJECTOR_NAMES } from "./ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; @@ -1029,4 +1030,378 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { } }), ); + + it.effect("uses projection_threads.latest_turn_id for bulk command and shell snapshots", () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_turns`; + yield* sql`DELETE FROM projection_state`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-1', + 'Project 1', + '/tmp/project-1', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-04-03T00:00:00.000Z', + '2026-04-03T00:00:01.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + latest_user_message_at, + pending_approval_count, + pending_user_input_count, + has_actionable_proposed_plan, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES ( + 'thread-1', + 'project-1', + 'Thread 1', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + 'turn-running', + '2026-04-03T00:00:04.000Z', + 0, + 0, + 0, + '2026-04-03T00:00:02.000Z', + '2026-04-03T00:00:03.000Z', + NULL, + NULL + ) + `; + + yield* sql` + INSERT INTO projection_turns ( + thread_id, + turn_id, + pending_message_id, + source_proposed_plan_thread_id, + source_proposed_plan_id, + assistant_message_id, + state, + requested_at, + started_at, + completed_at, + checkpoint_turn_count, + checkpoint_ref, + checkpoint_status, + checkpoint_files_json + ) + VALUES + ( + 'thread-1', + 'turn-running', + 'message-user-2', + NULL, + NULL, + NULL, + 'running', + '2026-04-03T00:00:30.000Z', + '2026-04-03T00:00:30.000Z', + NULL, + NULL, + NULL, + NULL, + '[]' + ), + ( + 'thread-1', + 'turn-completed', + 'message-user-1', + NULL, + NULL, + 'message-assistant-1', + 'completed', + '2026-04-03T00:00:05.000Z', + '2026-04-03T00:00:06.000Z', + '2026-04-03T00:00:20.000Z', + NULL, + NULL, + NULL, + '[]' + ) + `; + + yield* sql` + INSERT INTO projection_state (projector, last_applied_sequence, updated_at) + VALUES + (${ORCHESTRATION_PROJECTOR_NAMES.projects}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.threads}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.threadMessages}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.threadActivities}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.threadSessions}, 3, '2026-04-03T00:00:40.000Z'), + (${ORCHESTRATION_PROJECTOR_NAMES.checkpoints}, 3, '2026-04-03T00:00:40.000Z') + `; + + const commandReadModel = yield* snapshotQuery.getCommandReadModel(); + assert.equal(commandReadModel.threads[0]?.latestTurn?.turnId, asTurnId("turn-running")); + assert.equal(commandReadModel.threads[0]?.latestTurn?.state, "running"); + + const shellSnapshot = yield* snapshotQuery.getShellSnapshot(); + assert.equal(shellSnapshot.threads[0]?.latestTurn?.turnId, asTurnId("turn-running")); + assert.equal(shellSnapshot.threads[0]?.latestTurn?.state, "running"); + + const fullSnapshot = yield* snapshotQuery.getSnapshot(); + assert.equal(fullSnapshot.threads[0]?.latestTurn?.turnId, asTurnId("turn-running")); + assert.equal(fullSnapshot.threads[0]?.latestTurn?.state, "running"); + }), + ); + + it.effect("keeps deleted project and thread tombstones in the command read model", () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_turns`; + yield* sql`DELETE FROM projection_state`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-deleted', + 'Deleted Project', + '/tmp/deleted-project', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-04-05T00:00:00.000Z', + '2026-04-05T00:00:01.000Z', + '2026-04-05T00:00:02.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + latest_user_message_at, + pending_approval_count, + pending_user_input_count, + has_actionable_proposed_plan, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES ( + 'thread-deleted', + 'project-deleted', + 'Deleted Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + 'turn-deleted', + NULL, + 0, + 0, + 0, + '2026-04-05T00:00:03.000Z', + '2026-04-05T00:00:04.000Z', + NULL, + '2026-04-05T00:00:05.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_turns ( + thread_id, + turn_id, + pending_message_id, + source_proposed_plan_thread_id, + source_proposed_plan_id, + assistant_message_id, + state, + requested_at, + started_at, + completed_at, + checkpoint_turn_count, + checkpoint_ref, + checkpoint_status, + checkpoint_files_json + ) + VALUES ( + 'thread-deleted', + 'turn-deleted', + 'message-deleted-user', + NULL, + NULL, + 'message-deleted-assistant', + 'completed', + '2026-04-05T00:00:04.100Z', + '2026-04-05T00:00:04.200Z', + '2026-04-05T00:00:04.300Z', + NULL, + NULL, + NULL, + '[]' + ) + `; + + const commandReadModel = yield* snapshotQuery.getCommandReadModel(); + assert.equal(commandReadModel.projects[0]?.id, asProjectId("project-deleted")); + assert.equal(commandReadModel.projects[0]?.deletedAt, "2026-04-05T00:00:02.000Z"); + assert.equal(commandReadModel.threads[0]?.id, ThreadId.make("thread-deleted")); + assert.equal(commandReadModel.threads[0]?.deletedAt, "2026-04-05T00:00:05.000Z"); + assert.equal(commandReadModel.threads[0]?.latestTurn?.turnId, asTurnId("turn-deleted")); + assert.equal(commandReadModel.threads[0]?.latestTurn?.state, "completed"); + + const fullSnapshot = yield* snapshotQuery.getSnapshot(); + assert.equal(fullSnapshot.threads[0]?.id, ThreadId.make("thread-deleted")); + assert.equal(fullSnapshot.threads[0]?.latestTurn?.turnId, asTurnId("turn-deleted")); + assert.equal(fullSnapshot.threads[0]?.latestTurn?.state, "completed"); + + const shellSnapshot = yield* snapshotQuery.getShellSnapshot(); + assert.equal(shellSnapshot.projects.length, 0); + assert.equal(shellSnapshot.threads.length, 0); + }), + ); }); + +it.effect( + "ProjectionSnapshotQuery dedupes repository identity resolution by workspace root and skips deleted projects for shell snapshots", + () => { + const resolveCalls: string[] = []; + const layer = OrchestrationProjectionSnapshotQueryLive.pipe( + Layer.provideMerge( + Layer.succeed(RepositoryIdentityResolver, { + resolve: (cwd: string) => + Effect.sync(() => { + resolveCalls.push(cwd); + return { + canonicalKey: `github.com/acme${cwd}`, + locator: { + source: "git-remote" as const, + remoteName: "origin", + remoteUrl: `https://github.com/acme${cwd}.git`, + }, + rootPath: cwd, + }; + }), + }), + ), + Layer.provideMerge(SqlitePersistenceMemory), + ); + + return Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_turns`; + yield* sql`DELETE FROM projection_state`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES + ( + 'project-1', + 'Shared Project 1', + '/tmp/shared-root', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-04-04T00:00:00.000Z', + '2026-04-04T00:00:01.000Z', + NULL + ), + ( + 'project-2', + 'Shared Project 2', + '/tmp/shared-root', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-04-04T00:00:02.000Z', + '2026-04-04T00:00:03.000Z', + NULL + ), + ( + 'project-3', + 'Deleted Project', + '/tmp/deleted-root', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-04-04T00:00:04.000Z', + '2026-04-04T00:00:05.000Z', + '2026-04-04T00:00:06.000Z' + ) + `; + + const shellSnapshot = yield* snapshotQuery.getShellSnapshot(); + assert.deepStrictEqual(resolveCalls.toSorted(), ["/tmp/shared-root"]); + assert.equal(shellSnapshot.projects.length, 2); + assert.equal(shellSnapshot.projects[0]?.repositoryIdentity?.rootPath, "/tmp/shared-root"); + assert.equal(shellSnapshot.projects[1]?.repositoryIdentity?.rootPath, "/tmp/shared-root"); + + resolveCalls.length = 0; + + const fullSnapshot = yield* snapshotQuery.getSnapshot(); + assert.deepStrictEqual(resolveCalls.toSorted(), ["/tmp/deleted-root", "/tmp/shared-root"]); + assert.equal(fullSnapshot.projects.length, 3); + assert.equal(fullSnapshot.projects[2]?.repositoryIdentity?.rootPath, "/tmp/deleted-root"); + }).pipe(Effect.provide(layer)); + }, +); diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 59a396c926d..27b067e8f57 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -219,6 +219,20 @@ function mapProjectShellRow( }; } +function mapProposedPlanRow( + row: Schema.Schema.Type, +): OrchestrationProposedPlan { + return { + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }; +} + function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { return (cause: unknown): ProjectionRepositoryError => Schema.isSchemaError(cause) @@ -230,6 +244,37 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; const repositoryIdentityResolver = yield* RepositoryIdentityResolver; const repositoryIdentityResolutionConcurrency = 4; + const resolveRepositoryIdentitiesForProjects = Effect.fn( + "ProjectionSnapshotQuery.resolveRepositoryIdentitiesForProjects", + )(function* ( + projectRows: ReadonlyArray>, + options?: { + readonly includeDeleted?: boolean; + }, + ) { + const filteredProjectRows = + options?.includeDeleted === true + ? projectRows + : projectRows.filter((row) => row.deletedAt === null); + const uniqueWorkspaceRoots = [...new Set(filteredProjectRows.map((row) => row.workspaceRoot))]; + const repositoryIdentityByWorkspaceRoot = new Map( + yield* Effect.forEach( + uniqueWorkspaceRoots, + (workspaceRoot) => + repositoryIdentityResolver + .resolve(workspaceRoot) + .pipe(Effect.map((identity) => [workspaceRoot, identity] as const)), + { concurrency: repositoryIdentityResolutionConcurrency }, + ), + ); + + return new Map( + filteredProjectRows.map((row) => [ + row.projectId, + repositoryIdentityByWorkspaceRoot.get(row.workspaceRoot) ?? null, + ]), + ); + }); const listProjectRows = SqlSchema.findAll({ Request: Schema.Void, @@ -335,7 +380,6 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_thread_activities ORDER BY thread_id ASC, - CASE WHEN sequence IS NULL THEN 0 ELSE 1 END ASC, sequence ASC, created_at ASC, activity_id ASC @@ -389,18 +433,46 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { execute: () => sql` SELECT - thread_id AS "threadId", - turn_id AS "turnId", - state, - requested_at AS "requestedAt", - started_at AS "startedAt", - completed_at AS "completedAt", - assistant_message_id AS "assistantMessageId", - source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", - source_proposed_plan_id AS "sourceProposedPlanId" - FROM projection_turns - WHERE turn_id IS NOT NULL - ORDER BY thread_id ASC, requested_at DESC, turn_id DESC + turns.thread_id AS "threadId", + turns.turn_id AS "turnId", + turns.state, + turns.requested_at AS "requestedAt", + turns.started_at AS "startedAt", + turns.completed_at AS "completedAt", + turns.assistant_message_id AS "assistantMessageId", + turns.source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + turns.source_proposed_plan_id AS "sourceProposedPlanId" + FROM projection_threads threads + JOIN projection_turns turns + ON turns.thread_id = threads.thread_id + AND turns.turn_id = threads.latest_turn_id + WHERE threads.latest_turn_id IS NOT NULL + ORDER BY turns.thread_id ASC + `, + }); + + const listActiveLatestTurnRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionLatestTurnDbRowSchema, + execute: () => + sql` + SELECT + turns.thread_id AS "threadId", + turns.turn_id AS "turnId", + turns.state, + turns.requested_at AS "requestedAt", + turns.started_at AS "startedAt", + turns.completed_at AS "completedAt", + turns.assistant_message_id AS "assistantMessageId", + turns.source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + turns.source_proposed_plan_id AS "sourceProposedPlanId" + FROM projection_threads threads + JOIN projection_turns turns + ON turns.thread_id = threads.thread_id + AND turns.turn_id = threads.latest_turn_id + WHERE threads.deleted_at IS NULL + AND threads.latest_turn_id IS NOT NULL + ORDER BY turns.thread_id ASC `, }); @@ -594,7 +666,6 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_thread_activities WHERE thread_id = ${threadId} ORDER BY - CASE WHEN sequence IS NULL THEN 0 ELSE 1 END ASC, sequence ASC, created_at ASC, activity_id ASC @@ -892,15 +963,9 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }); } - const repositoryIdentities = new Map( - yield* Effect.forEach( - projectRows, - (row) => - repositoryIdentityResolver - .resolve(row.workspaceRoot) - .pipe(Effect.map((identity) => [row.projectId, identity] as const)), - { concurrency: repositoryIdentityResolutionConcurrency }, - ), + const repositoryIdentities = yield* resolveRepositoryIdentitiesForProjects( + projectRows, + { includeDeleted: true }, ); const projects: ReadonlyArray = projectRows.map((row) => ({ @@ -958,6 +1023,199 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }), ); + const getCommandReadModel: ProjectionSnapshotQueryShape["getCommandReadModel"] = () => + sql + .withTransaction( + Effect.all([ + listProjectRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listProjects:query", + "ProjectionSnapshotQuery.getCommandReadModel:listProjects:decodeRows", + ), + ), + ), + listThreadRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listThreads:query", + "ProjectionSnapshotQuery.getCommandReadModel:listThreads:decodeRows", + ), + ), + ), + listThreadProposedPlanRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listThreadProposedPlans:query", + "ProjectionSnapshotQuery.getCommandReadModel:listThreadProposedPlans:decodeRows", + ), + ), + ), + listThreadSessionRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listThreadSessions:query", + "ProjectionSnapshotQuery.getCommandReadModel:listThreadSessions:decodeRows", + ), + ), + ), + listLatestTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listLatestTurns:query", + "ProjectionSnapshotQuery.getCommandReadModel:listLatestTurns:decodeRows", + ), + ), + ), + listProjectionStateRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listProjectionState:query", + "ProjectionSnapshotQuery.getCommandReadModel:listProjectionState:decodeRows", + ), + ), + ), + ]), + ) + .pipe( + Effect.flatMap( + ([projectRows, threadRows, proposedPlanRows, sessionRows, latestTurnRows, stateRows]) => + Effect.sync(() => { + let updatedAt: string | null = null; + const projects: OrchestrationProject[] = []; + const threads: OrchestrationThread[] = []; + + for (let index = 0; index < projectRows.length; index += 1) { + const row = projectRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + projects.push({ + id: row.projectId, + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModelSelection: row.defaultModelSelection, + scripts: row.scripts, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + }); + } + for (let index = 0; index < threadRows.length; index += 1) { + const row = threadRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (let index = 0; index < proposedPlanRows.length; index += 1) { + const row = proposedPlanRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (let index = 0; index < sessionRows.length; index += 1) { + const row = sessionRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (let index = 0; index < latestTurnRows.length; index += 1) { + const row = latestTurnRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.requestedAt); + if (row.startedAt !== null) { + updatedAt = maxIso(updatedAt, row.startedAt); + } + if (row.completedAt !== null) { + updatedAt = maxIso(updatedAt, row.completedAt); + } + } + for (let index = 0; index < stateRows.length; index += 1) { + const row = stateRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + } + + const latestTurnByThread = new Map(); + for (let index = 0; index < latestTurnRows.length; index += 1) { + const row = latestTurnRows[index]; + if (!row) { + continue; + } + latestTurnByThread.set(row.threadId, mapLatestTurn(row)); + } + const proposedPlansByThread = new Map>(); + const sessionByThread = new Map(); + + for (let index = 0; index < sessionRows.length; index += 1) { + const row = sessionRows[index]; + if (!row) { + continue; + } + sessionByThread.set(row.threadId, mapSessionRow(row)); + } + + for (let index = 0; index < proposedPlanRows.length; index += 1) { + const row = proposedPlanRows[index]; + if (!row) { + continue; + } + const threadProposedPlans = proposedPlansByThread.get(row.threadId) ?? []; + threadProposedPlans.push(mapProposedPlanRow(row)); + proposedPlansByThread.set(row.threadId, threadProposedPlans); + } + + for (let index = 0; index < threadRows.length; index += 1) { + const row = threadRows[index]; + if (!row) { + continue; + } + threads.push({ + id: row.threadId, + projectId: row.projectId, + title: row.title, + modelSelection: row.modelSelection, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + branch: row.branch, + worktreePath: row.worktreePath, + latestTurn: latestTurnByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + archivedAt: row.archivedAt, + deletedAt: row.deletedAt, + messages: [], + proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + activities: [], + checkpoints: [], + session: sessionByThread.get(row.threadId) ?? null, + }); + } + + return { + snapshotSequence: computeSnapshotSequence(stateRows), + projects, + threads, + updatedAt: updatedAt ?? new Date(0).toISOString(), + } satisfies OrchestrationReadModel; + }), + ), + Effect.mapError((error) => { + if (isPersistenceError(error)) { + return error; + } + return toPersistenceSqlError("ProjectionSnapshotQuery.getCommandReadModel:query")(error); + }), + ); + const getShellSnapshot: ProjectionSnapshotQueryShape["getShellSnapshot"] = () => sql .withTransaction( @@ -986,7 +1244,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), - listLatestTurnRows(undefined).pipe( + listActiveLatestTurnRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( "ProjectionSnapshotQuery.getShellSnapshot:listLatestTurns:query", @@ -1030,16 +1288,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { updatedAt = maxIso(updatedAt, row.updatedAt); } - const repositoryIdentities = new Map( - yield* Effect.forEach( - projectRows, - (row) => - repositoryIdentityResolver - .resolve(row.workspaceRoot) - .pipe(Effect.map((identity) => [row.projectId, identity] as const)), - { concurrency: repositoryIdentityResolutionConcurrency }, - ), - ); + const repositoryIdentities = yield* resolveRepositoryIdentitiesForProjects(projectRows); const latestTurnByThread = new Map( latestTurnRows.map((row) => [row.threadId, mapLatestTurn(row)] as const), ); @@ -1097,6 +1346,19 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }), ); + const getSnapshotSequence: ProjectionSnapshotQueryShape["getSnapshotSequence"] = () => + listProjectionStateRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshotSequence:query", + "ProjectionSnapshotQuery.getSnapshotSequence:decodeRows", + ), + ), + Effect.map((stateRows) => ({ + snapshotSequence: computeSnapshotSequence(stateRows), + })), + ); + const getCounts: ProjectionSnapshotQueryShape["getCounts"] = () => readProjectionCounts(undefined).pipe( Effect.mapError( @@ -1376,15 +1638,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { } return message; }), - proposedPlans: proposedPlanRows.map((row) => ({ - id: row.planId, - turnId: row.turnId, - planMarkdown: row.planMarkdown, - implementedAt: row.implementedAt, - implementationThreadId: row.implementationThreadId, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })), + proposedPlans: proposedPlanRows.map(mapProposedPlanRow), activities: activityRows.map((row) => { const activity = { id: row.activityId, @@ -1422,8 +1676,10 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }); return { + getCommandReadModel, getSnapshot, getShellSnapshot, + getSnapshotSequence, getCounts, getActiveProjectByWorkspaceRoot, getProjectShellById, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 09252571c37..f641eef0370 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -45,6 +45,7 @@ import { } from "./ProviderCommandReactor.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { ServerSettingsService } from "../../serverSettings.ts"; import { VcsStatusBroadcaster } from "../../vcs/VcsStatusBroadcaster.ts"; @@ -79,7 +80,7 @@ async function waitFor( describe("ProviderCommandReactor", () => { let runtime: ManagedRuntime.ManagedRuntime< - OrchestrationEngineService | ProviderCommandReactor, + OrchestrationEngineService | ProviderCommandReactor | ProjectionSnapshotQuery, unknown > | null = null; let scope: Scope.Closeable | null = null; @@ -318,8 +319,13 @@ describe("ProviderCommandReactor", () => { Layer.provide(RepositoryIdentityResolverLive), Layer.provide(SqlitePersistenceMemory), ); + const projectionSnapshotLayer = OrchestrationProjectionSnapshotQueryLive.pipe( + Layer.provide(RepositoryIdentityResolverLive), + Layer.provide(SqlitePersistenceMemory), + ); const layer = ProviderCommandReactorLive.pipe( Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(projectionSnapshotLayer), Layer.provideMerge(Layer.succeed(ProviderService, service)), Layer.provideMerge( Layer.mock(GitWorkflowService)({ @@ -348,6 +354,7 @@ describe("ProviderCommandReactor", () => { runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor)); scope = await Effect.runPromise(Scope.make("sequential")); await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); @@ -382,6 +389,7 @@ describe("ProviderCommandReactor", () => { return { engine, + readModel: () => Effect.runPromise(snapshotQuery.getSnapshot()), startSession, sendTurn, interruptTurn, @@ -431,7 +439,7 @@ describe("ProviderCommandReactor", () => { runtimeMode: "approval-required", }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); expect(thread?.session?.runtimeMode).toBe("approval-required"); @@ -476,13 +484,13 @@ describe("ProviderCommandReactor", () => { }); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); return ( readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"))?.title === "Generated title" ); }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.title).toBe("Generated title"); }); @@ -522,7 +530,7 @@ describe("ProviderCommandReactor", () => { await waitFor(() => harness.sendTurn.mock.calls.length === 1); expect(harness.generateThreadTitle).not.toHaveBeenCalled(); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.title).toBe("Keep this custom title"); }); @@ -566,14 +574,14 @@ describe("ProviderCommandReactor", () => { await waitFor(() => harness.generateThreadTitle.mock.calls.length === 1); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); return ( readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"))?.title === "Reconnect spinner resume bug" ); }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.title).toBe("Reconnect spinner resume bug"); }); @@ -902,7 +910,7 @@ describe("ProviderCommandReactor", () => { }, }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.providerName).toBe("claudeAgent"); expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("claudeAgent")); @@ -1014,7 +1022,7 @@ describe("ProviderCommandReactor", () => { resumeCursor: { opaque: "resume-1" }, }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); }); @@ -1205,7 +1213,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return thread?.runtimeMode === "approval-required"; }); @@ -1239,7 +1247,7 @@ describe("ProviderCommandReactor", () => { threadId: ThreadId.make("thread-1"), }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); expect(thread?.session?.runtimeMode).toBe("approval-required"); @@ -1342,7 +1350,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return thread?.runtimeMode === "approval-required"; }); @@ -1352,7 +1360,7 @@ describe("ProviderCommandReactor", () => { expect(harness.stopSession.mock.calls.length).toBe(0); expect(harness.sendTurn.mock.calls.length).toBe(1); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); expect(thread?.session?.runtimeMode).toBe("full-access"); @@ -1404,7 +1412,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return ( thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? @@ -1416,7 +1424,7 @@ describe("ProviderCommandReactor", () => { expect(harness.sendTurn.mock.calls.length).toBe(1); expect(harness.stopSession.mock.calls.length).toBe(0); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); expect(thread?.session?.providerName).toBe("codex"); @@ -1475,7 +1483,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return ( thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? @@ -1485,7 +1493,7 @@ describe("ProviderCommandReactor", () => { expect(harness.startSession.mock.calls.length).toBe(0); expect(harness.sendTurn.mock.calls.length).toBe(0); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect( thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), @@ -1639,7 +1647,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return ( thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? @@ -1649,7 +1657,7 @@ describe("ProviderCommandReactor", () => { expect(harness.startSession.mock.calls.length).toBe(0); expect(harness.sendTurn.mock.calls.length).toBe(0); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect( thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), @@ -1810,7 +1818,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); if (!thread) return false; return thread.activities.some( @@ -1818,7 +1826,7 @@ describe("ProviderCommandReactor", () => { ); }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread).toBeDefined(); @@ -1919,7 +1927,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); if (!thread) return false; return thread.activities.some( @@ -1927,7 +1935,7 @@ describe("ProviderCommandReactor", () => { ); }); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread).toBeDefined(); @@ -1983,7 +1991,7 @@ describe("ProviderCommandReactor", () => { ); await waitFor(() => harness.stopSession.mock.calls.length === 1); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session).not.toBeNull(); expect(thread?.session?.status).toBe("stopped"); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 998475f6118..ff583c0f19d 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -5,6 +5,7 @@ import { type ModelSelection, type OrchestrationEvent, ProviderDriverKind, + type ProjectId, type OrchestrationSession, ThreadId, type ProviderSession, @@ -22,6 +23,7 @@ import type { ProviderServiceError } from "../../provider/Errors.ts"; import { TextGeneration } from "../../textGeneration/TextGeneration.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ProviderCommandReactor, type ProviderCommandReactorShape, @@ -167,6 +169,7 @@ function buildGeneratedWorktreeBranchName(raw: string): string { const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; const providerService = yield* ProviderService; const gitWorkflow = yield* GitWorkflowService; const vcsStatusBroadcaster = yield* VcsStatusBroadcaster; @@ -267,9 +270,16 @@ const make = Effect.gen(function* () { }); }); + const resolveProject = Effect.fnUntraced(function* (projectId: ProjectId) { + return yield* projectionSnapshotQuery + .getProjectShellById(projectId) + .pipe(Effect.map(Option.getOrUndefined)); + }); + const resolveThread = Effect.fnUntraced(function* (threadId: ThreadId) { - const readModel = yield* orchestrationEngine.getReadModel(); - return readModel.threads.find((entry) => entry.id === threadId); + return yield* projectionSnapshotQuery + .getThreadDetailById(threadId) + .pipe(Effect.map(Option.getOrUndefined)); }); const ensureSessionForThread = Effect.fn("ensureSessionForThread")(function* ( @@ -279,8 +289,7 @@ const make = Effect.gen(function* () { readonly modelSelection?: ModelSelection; }, ) { - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === threadId); + const thread = yield* resolveThread(threadId); if (!thread) { return yield* Effect.die(new Error(`Thread '${threadId}' was not found in read model.`)); } @@ -375,9 +384,10 @@ const make = Effect.gen(function* () { }); } } + const project = yield* resolveProject(thread.projectId); const effectiveCwd = resolveThreadWorkspaceCwd({ thread, - projects: readModel.projects, + projects: project ? [project] : [], }); const startProviderSession = (input?: { @@ -682,10 +692,11 @@ const make = Effect.gen(function* () { const isFirstUserMessageTurn = thread.messages.filter((entry) => entry.role === "user").length === 1; if (isFirstUserMessageTurn) { + const project = yield* resolveProject(thread.projectId); const generationCwd = resolveThreadWorkspaceCwd({ thread, - projects: (yield* orchestrationEngine.getReadModel()).projects, + projects: project ? [project] : [], }) ?? process.cwd(); const generationInput = { messageText: message.text, @@ -819,20 +830,16 @@ const make = Effect.gen(function* () { }) .pipe( Effect.catchCause((cause) => - Effect.gen(function* () { - yield* appendProviderFailureActivity({ - threadId: event.payload.threadId, - kind: "provider.approval.respond.failed", - summary: "Provider approval response failed", - detail: isUnknownPendingApprovalRequestError(cause) - ? stalePendingRequestDetail("approval", event.payload.requestId) - : Cause.pretty(cause), - turnId: null, - createdAt: event.payload.createdAt, - requestId: event.payload.requestId, - }); - - if (!isUnknownPendingApprovalRequestError(cause)) return; + appendProviderFailureActivity({ + threadId: event.payload.threadId, + kind: "provider.approval.respond.failed", + summary: "Provider approval response failed", + detail: isUnknownPendingApprovalRequestError(cause) + ? stalePendingRequestDetail("approval", event.payload.requestId) + : Cause.pretty(cause), + turnId: null, + createdAt: event.payload.createdAt, + requestId: event.payload.requestId, }), ), ); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 487d1a3aac7..2fe0e406d66 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -36,11 +36,9 @@ import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts"; -import { - OrchestrationEngineService, - type OrchestrationEngineShape, -} from "../Services/OrchestrationEngine.ts"; +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ServerConfig } from "../../config.ts"; import { ServerSettingsService } from "../../serverSettings.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -154,16 +152,23 @@ function createProviderServiceHarness() { }; } +type ProviderRuntimeTestReadModel = OrchestrationReadModel; +type ProviderRuntimeTestThread = ProviderRuntimeTestReadModel["threads"][number]; +type ProviderRuntimeTestMessage = ProviderRuntimeTestThread["messages"][number]; +type ProviderRuntimeTestProposedPlan = ProviderRuntimeTestThread["proposedPlans"][number]; +type ProviderRuntimeTestActivity = ProviderRuntimeTestThread["activities"][number]; +type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][number]; + async function waitForThread( - engine: OrchestrationEngineShape, + readModel: () => Promise, predicate: (thread: ProviderRuntimeTestThread) => boolean, timeoutMs = 2000, threadId: ThreadId = asThreadId("thread-1"), ) { const deadline = Date.now() + timeoutMs; const poll = async (): Promise => { - const readModel = await Effect.runPromise(engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === threadId); + const snapshot = await readModel(); + const thread = snapshot.threads.find((entry) => entry.id === threadId); if (thread && predicate(thread)) { return thread; } @@ -176,16 +181,9 @@ async function waitForThread( return poll(); } -type ProviderRuntimeTestReadModel = OrchestrationReadModel; -type ProviderRuntimeTestThread = ProviderRuntimeTestReadModel["threads"][number]; -type ProviderRuntimeTestMessage = ProviderRuntimeTestThread["messages"][number]; -type ProviderRuntimeTestProposedPlan = ProviderRuntimeTestThread["proposedPlans"][number]; -type ProviderRuntimeTestActivity = ProviderRuntimeTestThread["activities"][number]; -type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][number]; - describe("ProviderRuntimeIngestion", () => { let runtime: ManagedRuntime.ManagedRuntime< - OrchestrationEngineService | ProviderRuntimeIngestionService, + OrchestrationEngineService | ProviderRuntimeIngestionService | ProjectionSnapshotQuery, unknown > | null = null; let scope: Scope.Closeable | null = null; @@ -223,8 +221,13 @@ describe("ProviderRuntimeIngestion", () => { Layer.provide(RepositoryIdentityResolverLive), Layer.provide(SqlitePersistenceMemory), ); + const projectionSnapshotLayer = OrchestrationProjectionSnapshotQueryLive.pipe( + Layer.provide(RepositoryIdentityResolverLive), + Layer.provide(SqlitePersistenceMemory), + ); const layer = ProviderRuntimeIngestionLive.pipe( Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(projectionSnapshotLayer), Layer.provideMerge(SqlitePersistenceMemory), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), Layer.provideMerge(makeTestServerSettingsLayer(options?.serverSettings)), @@ -233,6 +236,7 @@ describe("ProviderRuntimeIngestion", () => { ); runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService)); scope = await Effect.runPromise(Scope.make("sequential")); await Effect.runPromise(ingestion.start().pipe(Scope.provide(scope))); @@ -299,6 +303,7 @@ describe("ProviderRuntimeIngestion", () => { return { engine, + readModel: () => Effect.runPromise(snapshotQuery.getSnapshot()), emit: provider.emit, setProviderSession: provider.setSession, drain, @@ -319,7 +324,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-1", ); @@ -337,7 +342,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "error" && entry.session?.activeTurnId === null && @@ -364,7 +369,7 @@ describe("ProviderRuntimeIngestion", () => { }); let thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "running" && entry.session?.activeTurnId === null, ); expect(thread.session?.status).toBe("running"); @@ -383,7 +388,7 @@ describe("ProviderRuntimeIngestion", () => { }); thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "error" && entry.session?.activeTurnId === null && @@ -404,7 +409,7 @@ describe("ProviderRuntimeIngestion", () => { }); thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "stopped" && entry.session?.activeTurnId === null && @@ -425,7 +430,7 @@ describe("ProviderRuntimeIngestion", () => { }); thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "ready" && entry.session?.activeTurnId === null && @@ -449,7 +454,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-midturn-lifecycle", @@ -471,7 +476,7 @@ describe("ProviderRuntimeIngestion", () => { }); await harness.drain(); - const midReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(midThread?.session?.status).toBe("running"); expect(midThread?.session?.activeTurnId).toBe("turn-midturn-lifecycle"); @@ -487,7 +492,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, ); }); @@ -524,7 +529,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-claude-placeholder", @@ -541,7 +546,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, ); }); @@ -560,7 +565,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-primary", ); @@ -576,7 +581,7 @@ describe("ProviderRuntimeIngestion", () => { }); await harness.drain(); - const midReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(midThread?.session?.status).toBe("running"); expect(midThread?.session?.activeTurnId).toBe("turn-primary"); @@ -592,7 +597,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, ); }); @@ -611,7 +616,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-guarded-main", @@ -628,7 +633,7 @@ describe("ProviderRuntimeIngestion", () => { }); await harness.drain(); - const midReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(midThread?.session?.status).toBe("running"); expect(midThread?.session?.activeTurnId).toBe("turn-guarded-main"); @@ -644,7 +649,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, ); }); @@ -693,7 +698,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-1" && !message.streaming, @@ -725,7 +730,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-no-delta" && !message.streaming, @@ -764,7 +769,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.id === "evt-tool-completed-with-data", ), @@ -819,7 +824,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.id === "evt-command-completed", ), @@ -861,7 +866,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.id === "evt-read-path-completed", ), @@ -894,7 +899,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => proposedPlan.id === "plan:thread-1:turn:turn-plan-final", @@ -1009,7 +1014,7 @@ describe("ProviderRuntimeIngestion", () => { }); const sourceThreadWithPlan = await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => @@ -1050,7 +1055,7 @@ describe("ProviderRuntimeIngestion", () => { ); const sourceThreadBeforeStart = await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => @@ -1076,7 +1081,7 @@ describe("ProviderRuntimeIngestion", () => { }); const sourceThreadAfterStart = await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => @@ -1158,7 +1163,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === activeTurnId, 2_000, @@ -1178,7 +1183,7 @@ describe("ProviderRuntimeIngestion", () => { }); const sourceThreadWithPlan = await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => @@ -1229,7 +1234,7 @@ describe("ProviderRuntimeIngestion", () => { await harness.drain(); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const sourceThreadAfterRejectedStart = readModel.threads.find( (entry) => entry.id === sourceThreadId, ); @@ -1340,7 +1345,7 @@ describe("ProviderRuntimeIngestion", () => { }); const sourceThreadWithPlan = await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => @@ -1401,7 +1406,7 @@ describe("ProviderRuntimeIngestion", () => { await harness.drain(); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const sourceThreadAfterUnrelatedStart = readModel.threads.find( (entry) => entry.id === sourceThreadId, ); @@ -1427,7 +1432,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-plan-buffer", ); @@ -1466,7 +1471,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.proposedPlans.some( (proposedPlan: ProviderRuntimeTestProposedPlan) => proposedPlan.id === "plan:thread-1:turn:turn-plan-buffer", @@ -1492,7 +1497,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffered"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffered", ); @@ -1512,7 +1517,7 @@ describe("ProviderRuntimeIngestion", () => { }); await harness.drain(); - const midReadModel = await Effect.runPromise(harness.engine.getReadModel()); + const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect( midThread?.messages.some( @@ -1534,7 +1539,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered" && !message.streaming, @@ -1560,7 +1565,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffered-request-flush"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffered-request-flush", @@ -1593,7 +1598,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered-request-flush" && @@ -1620,7 +1625,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffered-user-input-flush"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffered-user-input-flush", @@ -1659,7 +1664,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered-user-input-flush" && @@ -1688,7 +1693,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffered-whitespace-request"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffered-whitespace-request", @@ -1721,7 +1726,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "approval.requested", ), @@ -1750,7 +1755,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffered-request-append"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffered-request-append", @@ -1783,7 +1788,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - await waitForThread(harness.engine, (entry) => + await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered-request-append" && @@ -1819,7 +1824,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered-request-append:segment:1" && @@ -1882,7 +1887,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-streaming-request-segment"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-streaming-request-segment", @@ -1915,7 +1920,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - await waitForThread(harness.engine, (entry) => + await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-request-segment" && @@ -1951,7 +1956,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-request-segment:segment:1" && @@ -2004,7 +2009,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-streaming-mode"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-streaming-mode", @@ -2024,7 +2029,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const liveThread = await waitForThread(harness.engine, (entry) => + const liveThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-mode" && @@ -2052,7 +2057,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const finalThread = await waitForThread(harness.engine, (entry) => + const finalThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-mode" && !message.streaming, @@ -2079,7 +2084,7 @@ describe("ProviderRuntimeIngestion", () => { turnId: asTurnId("turn-buffer-spill"), }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-buffer-spill", @@ -2112,7 +2117,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffer-spill" && !message.streaming, @@ -2140,7 +2145,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-complete-dedup", @@ -2185,7 +2190,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null && @@ -2243,7 +2248,7 @@ describe("ProviderRuntimeIngestion", () => { }); await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "approval.requested", @@ -2253,7 +2258,7 @@ describe("ProviderRuntimeIngestion", () => { ), ); - const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread).toBeDefined(); @@ -2295,7 +2300,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "error" && entry.session?.activeTurnId === "turn-3" && @@ -2321,7 +2326,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some((activity) => activity.id === "evt-runtime-error-activity"), ); const activity = thread.activities.find( @@ -2366,7 +2371,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "running" && entry.session?.activeTurnId === "turn-warning" && @@ -2415,7 +2420,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "ready" && entry.session?.activeTurnId === null && @@ -2508,7 +2513,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.title === "Renamed by provider" && entry.activities.some( @@ -2595,7 +2600,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", ), @@ -2647,7 +2652,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", ), @@ -2697,7 +2702,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", ), @@ -2732,7 +2737,7 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const thread = await waitForThread(harness.engine, (entry) => + const thread = await waitForThread(harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "context-compaction", ), @@ -2802,7 +2807,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "task.completed", @@ -2893,7 +2898,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.activities.some( (activity: ProviderRuntimeTestActivity) => activity.kind === "user-input.requested", @@ -2952,7 +2957,7 @@ describe("ProviderRuntimeIngestion", () => { }); const thread = await waitForThread( - harness.engine, + harness.readModel, (entry) => entry.session?.status === "error" && entry.session?.activeTurnId === "turn-after-failure" && diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index b7a4c195a5b..2e86623f8dc 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -4,12 +4,16 @@ import { CommandId, MessageId, type OrchestrationEvent, + type OrchestrationMessage, type OrchestrationProposedPlanId, CheckpointRef, isToolLifecycleItemType, ThreadId, type ThreadTokenUsageSnapshot, TurnId, + type OrchestrationCheckpointSummary, + type OrchestrationProposedPlan, + type OrchestrationThread, type OrchestrationThreadActivity, type ProviderRuntimeEvent, } from "@t3tools/contracts"; @@ -19,9 +23,9 @@ import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; -import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { isGitRepository } from "../../git/Utils.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ProviderRuntimeIngestionService, type ProviderRuntimeIngestionShape, @@ -77,6 +81,82 @@ function sameId(left: string | null | undefined, right: string | null | undefine return left === right; } +function hasAssistantMessageForTurn( + messages: ReadonlyArray, + turnId: TurnId, + options?: { readonly streamingOnly?: boolean }, +): boolean { + for (let index = 0; index < messages.length; index += 1) { + const message = messages[index]; + if (!message) { + continue; + } + if (message.role !== "assistant" || message.turnId !== turnId) { + continue; + } + if (options?.streamingOnly === true && !message.streaming) { + continue; + } + return true; + } + return false; +} + +function findMessageById( + messages: ReadonlyArray, + messageId: MessageId, +): OrchestrationMessage | undefined { + for (let index = 0; index < messages.length; index += 1) { + const message = messages[index]; + if (message?.id === messageId) { + return message; + } + } + return undefined; +} + +function findProposedPlanById( + proposedPlans: ReadonlyArray< + Pick + >, + planId: string, +): + | Pick + | undefined { + for (let index = 0; index < proposedPlans.length; index += 1) { + const proposedPlan = proposedPlans[index]; + if (proposedPlan?.id === planId) { + return proposedPlan; + } + } + return undefined; +} + +function hasCheckpointForTurn( + checkpoints: ReadonlyArray, + turnId: TurnId, +): boolean { + for (let index = 0; index < checkpoints.length; index += 1) { + if (checkpoints[index]?.turnId === turnId) { + return true; + } + } + return false; +} + +function maxCheckpointTurnCount( + checkpoints: ReadonlyArray, +): number { + let maxTurnCount = 0; + for (let index = 0; index < checkpoints.length; index += 1) { + const checkpoint = checkpoints[index]; + if (checkpoint && checkpoint.checkpointTurnCount > maxTurnCount) { + maxTurnCount = checkpoint.checkpointTurnCount; + } + } + return maxTurnCount; +} + function truncateDetail(value: string, limit = 180): string { return value.length > limit ? `${value.slice(0, limit - 3)}...` : value; } @@ -522,6 +602,7 @@ function runtimeEventToActivities( const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; const providerService = yield* ProviderService; const projectionTurnRepository = yield* ProjectionTurnRepository; const serverSettingsService = yield* ServerSettingsService; @@ -553,20 +634,16 @@ const make = Effect.gen(function* () { lookup: () => Effect.succeed({ text: "", createdAt: "" }), }); - const isGitRepoForThread = Effect.fn("isGitRepoForThread")(function* (threadId: ThreadId) { - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === threadId); - if (!thread) { - return false; - } - const workspaceCwd = resolveThreadWorkspaceCwd({ - thread, - projects: readModel.projects, - }); - if (!workspaceCwd) { - return false; - } - return isGitRepository(workspaceCwd); + const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) { + return yield* projectionSnapshotQuery + .getThreadDetailById(threadId) + .pipe(Effect.map(Option.getOrUndefined)); + }); + + const resolveThreadShell = Effect.fn("resolveThreadShell")(function* (threadId: ThreadId) { + return yield* projectionSnapshotQuery + .getThreadShellById(threadId) + .pipe(Effect.map(Option.getOrUndefined)); }); const rememberAssistantMessageId = (threadId: ThreadId, turnId: TurnId, messageId: MessageId) => @@ -917,7 +994,7 @@ const make = Effect.gen(function* () { return; } - const existingPlan = input.threadProposedPlans.find((entry) => entry.id === input.planId); + const existingPlan = findProposedPlanById(input.threadProposedPlans, input.planId); yield* orchestrationEngine.dispatch({ type: "thread.proposed-plan.upsert", commandId: providerCommandId(input.event, "proposed-plan-upsert"), @@ -1070,8 +1147,7 @@ const make = Effect.gen(function* () { implementationThreadId: ThreadId, implementedAt: string, ) { - const readModel = yield* orchestrationEngine.getReadModel(); - const sourceThread = readModel.threads.find((entry) => entry.id === sourceThreadId); + const sourceThread = yield* resolveThreadDetail(sourceThreadId); const sourcePlan = sourceThread?.proposedPlans.find((entry) => entry.id === sourcePlanId); if (!sourceThread || !sourcePlan || sourcePlan.implementedAt !== null) { return; @@ -1096,10 +1172,19 @@ const make = Effect.gen(function* () { const processRuntimeEvent = (event: ProviderRuntimeEvent) => Effect.gen(function* () { - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); + const thread = yield* resolveThreadShell(event.threadId); if (!thread) return; + let loadedThreadDetail: OrchestrationThread | null | undefined; + const getLoadedThreadDetail = () => + Effect.gen(function* () { + if (loadedThreadDetail !== undefined) { + return loadedThreadDetail; + } + loadedThreadDetail = (yield* resolveThreadDetail(thread.id)) ?? null; + return loadedThreadDetail; + }); + const now = event.createdAt; const eventTurnId = toTurnId(event.turnId); const activeTurnId = thread.session?.activeTurnId ?? null; @@ -1277,6 +1362,7 @@ const make = Effect.gen(function* () { ? toTurnId(event.turnId) : undefined; if (pauseForUserTurnId) { + const detailedThread = yield* getLoadedThreadDetail(); const assistantDeliveryMode: AssistantDeliveryMode = yield* Effect.map( serverSettingsService.getSettings, (settings) => (settings.enableAssistantStreaming ? "streaming" : "buffered"), @@ -1307,10 +1393,11 @@ const make = Effect.gen(function* () { event.type === "request.opened" ? "assistant-delta-finalize-on-request-opened" : "assistant-delta-finalize-on-user-input-requested", - hasProjectedMessage: thread.messages.some( - (entry) => - entry.role === "assistant" && entry.turnId === pauseForUserTurnId && entry.streaming, - ), + hasProjectedMessage: + detailedThread !== null && + hasAssistantMessageForTurn(detailedThread.messages, pauseForUserTurnId, { + streamingOnly: true, + }), flushedMessageIds, }); } @@ -1339,21 +1426,19 @@ const make = Effect.gen(function* () { : undefined; if (assistantCompletion) { + const detailedThread = yield* getLoadedThreadDetail(); + const messages = detailedThread?.messages ?? []; const turnId = toTurnId(event.turnId); const activeAssistantMessageId = turnId ? yield* getActiveAssistantMessageIdForTurn(thread.id, turnId) : Option.none(); const hasAssistantMessagesForTurn = - turnId !== undefined - ? thread.messages.some((entry) => entry.role === "assistant" && entry.turnId === turnId) - : false; + turnId !== undefined ? hasAssistantMessageForTurn(messages, turnId) : false; const assistantMessageId = Option.getOrElse( activeAssistantMessageId, () => assistantCompletion.messageId, ); - const existingAssistantMessage = thread.messages.find( - (entry) => entry.id === assistantMessageId, - ); + const existingAssistantMessage = findMessageById(messages, assistantMessageId); const shouldApplyFallbackCompletionText = !existingAssistantMessage || existingAssistantMessage.text.length === 0; @@ -1393,10 +1478,11 @@ const make = Effect.gen(function* () { } if (proposedPlanCompletion) { + const detailedThread = yield* getLoadedThreadDetail(); yield* finalizeBufferedProposedPlan({ event, threadId: thread.id, - threadProposedPlans: thread.proposedPlans, + threadProposedPlans: detailedThread?.proposedPlans ?? [], planId: proposedPlanCompletion.planId, ...(proposedPlanCompletion.turnId ? { turnId: proposedPlanCompletion.turnId } : {}), fallbackMarkdown: proposedPlanCompletion.planMarkdown, @@ -1405,6 +1491,9 @@ const make = Effect.gen(function* () { } if (event.type === "turn.completed") { + const detailedThread = yield* getLoadedThreadDetail(); + const messages = detailedThread?.messages ?? []; + const proposedPlans = detailedThread?.proposedPlans ?? []; const turnId = toTurnId(event.turnId); if (turnId) { const assistantMessageIds = yield* getAssistantMessageIdsForTurn(thread.id, turnId); @@ -1419,9 +1508,7 @@ const make = Effect.gen(function* () { createdAt: now, commandTag: "assistant-complete-finalize", finalDeltaCommandTag: "assistant-delta-finalize-fallback", - hasProjectedMessage: thread.messages.some( - (entry) => entry.id === assistantMessageId, - ), + hasProjectedMessage: findMessageById(messages, assistantMessageId) !== undefined, }), { concurrency: 1 }, ).pipe(Effect.asVoid); @@ -1431,7 +1518,7 @@ const make = Effect.gen(function* () { yield* finalizeBufferedProposedPlan({ event, threadId: thread.id, - threadProposedPlans: thread.proposedPlans, + threadProposedPlans: proposedPlans, planId: proposedPlanIdForTurn(thread.id, turnId), turnId, updatedAt: now, @@ -1483,21 +1570,24 @@ const make = Effect.gen(function* () { if (event.type === "turn.diff.updated") { const turnId = toTurnId(event.turnId); - if (turnId && (yield* isGitRepoForThread(thread.id))) { + const checkpointContext = turnId + ? yield* projectionSnapshotQuery + .getThreadCheckpointContext(thread.id) + .pipe(Effect.map(Option.getOrUndefined)) + : undefined; + const workspaceCwd = + checkpointContext?.worktreePath ?? checkpointContext?.workspaceRoot ?? undefined; + if (turnId && checkpointContext && workspaceCwd && isGitRepository(workspaceCwd)) { // Skip if a checkpoint already exists for this turn. A real // (non-placeholder) capture from CheckpointReactor should not // be clobbered, and dispatching a duplicate placeholder for the // same turnId would produce an unstable checkpointTurnCount. - if (thread.checkpoints.some((c) => c.turnId === turnId)) { + if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) { // Already tracked; no-op. } else { const assistantMessageId = MessageId.make( `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, ); - const maxTurnCount = thread.checkpoints.reduce( - (max, c) => Math.max(max, c.checkpointTurnCount), - 0, - ); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: providerCommandId(event, "thread-turn-diff-complete"), @@ -1508,7 +1598,7 @@ const make = Effect.gen(function* () { status: "missing", files: [], assistantMessageId, - checkpointTurnCount: maxTurnCount + 1, + checkpointTurnCount: maxCheckpointTurnCount(checkpointContext.checkpoints) + 1, createdAt: now, }); } diff --git a/apps/server/src/orchestration/Services/OrchestrationEngine.ts b/apps/server/src/orchestration/Services/OrchestrationEngine.ts index 376b87d30a0..39270bb0c4a 100644 --- a/apps/server/src/orchestration/Services/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Services/OrchestrationEngine.ts @@ -10,11 +10,7 @@ * * @module OrchestrationEngineService */ -import type { - OrchestrationCommand, - OrchestrationEvent, - OrchestrationReadModel, -} from "@t3tools/contracts"; +import type { OrchestrationCommand, OrchestrationEvent } from "@t3tools/contracts"; import { Context } from "effect"; import type { Effect, Stream } from "effect"; @@ -25,13 +21,6 @@ import type { OrchestrationEventStoreError } from "../../persistence/Errors.ts"; * OrchestrationEngineShape - Service API for orchestration command and event flow. */ export interface OrchestrationEngineShape { - /** - * Read the current in-memory orchestration read model. - * - * @returns Effect containing the latest read model. - */ - readonly getReadModel: () => Effect.Effect; - /** * Replay persisted orchestration events from an exclusive sequence cursor. * @@ -70,7 +59,7 @@ export interface OrchestrationEngineShape { * ```ts * const program = Effect.gen(function* () { * const engine = yield* OrchestrationEngineService - * return yield* engine.getReadModel() + * return yield* engine.dispatch(command) * }) * ``` */ diff --git a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts index be81dcbb374..9d64307d3dd 100644 --- a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts @@ -28,6 +28,10 @@ export interface ProjectionSnapshotCounts { readonly threadCount: number; } +export interface ProjectionSnapshotSequence { + readonly snapshotSequence: number; +} + export interface ProjectionThreadCheckpointContext { readonly threadId: ThreadId; readonly projectId: ProjectId; @@ -40,6 +44,15 @@ export interface ProjectionThreadCheckpointContext { * ProjectionSnapshotQueryShape - Service API for read-model snapshots. */ export interface ProjectionSnapshotQueryShape { + /** + * Read the lightweight command snapshot used to bootstrap the in-memory + * orchestration engine without hydrating message/activity/checkpoint bodies. + */ + readonly getCommandReadModel: () => Effect.Effect< + OrchestrationReadModel, + ProjectionRepositoryError + >; + /** * Read the latest orchestration projection snapshot. * @@ -59,6 +72,15 @@ export interface ProjectionSnapshotQueryShape { ProjectionRepositoryError >; + /** + * Read the latest projection snapshot sequence without hydrating read-model + * entities. + */ + readonly getSnapshotSequence: () => Effect.Effect< + ProjectionSnapshotSequence, + ProjectionRepositoryError + >; + /** * Read aggregate projection counts without hydrating the full read model. */ diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 025e9e4831a..c0918de8493 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -41,6 +41,7 @@ import Migration0025 from "./Migrations/025_CleanupInvalidProjectionPendingAppro import Migration0026 from "./Migrations/026_CanonicalizeModelSelectionOptions.ts"; import Migration0027 from "./Migrations/027_ProviderSessionRuntimeInstanceId.ts"; import Migration0028 from "./Migrations/028_ProjectionThreadSessionInstanceId.ts"; +import Migration0029 from "./Migrations/029_ProjectionThreadDetailOrderingIndexes.ts"; /** * Migration loader with all migrations defined inline. @@ -81,6 +82,7 @@ export const migrationEntries = [ [26, "CanonicalizeModelSelectionOptions", Migration0026], [27, "ProviderSessionRuntimeInstanceId", Migration0027], [28, "ProjectionThreadSessionInstanceId", Migration0028], + [29, "ProjectionThreadDetailOrderingIndexes", Migration0029], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.test.ts b/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.test.ts new file mode 100644 index 00000000000..6565e81e5c9 --- /dev/null +++ b/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.test.ts @@ -0,0 +1,73 @@ +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +layer("029_ProjectionThreadDetailOrderingIndexes", (it) => { + it.effect("creates indexes matching thread detail ordering queries", () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 28 }); + yield* runMigrations({ toMigrationInclusive: 29 }); + + const activityIndexes = yield* sql<{ + readonly seq: number; + readonly name: string; + readonly unique: number; + readonly origin: string; + readonly partial: number; + }>` + PRAGMA index_list(projection_thread_activities) + `; + assert.ok( + activityIndexes.some( + (index) => index.name === "idx_projection_thread_activities_thread_sequence_created_id", + ), + ); + + const activityIndexColumns = yield* sql<{ + readonly seqno: number; + readonly cid: number; + readonly name: string; + }>` + PRAGMA index_info('idx_projection_thread_activities_thread_sequence_created_id') + `; + assert.deepStrictEqual( + activityIndexColumns.map((column) => column.name), + ["thread_id", "sequence", "created_at", "activity_id"], + ); + + const messageIndexes = yield* sql<{ + readonly seq: number; + readonly name: string; + readonly unique: number; + readonly origin: string; + readonly partial: number; + }>` + PRAGMA index_list(projection_thread_messages) + `; + assert.ok( + messageIndexes.some( + (index) => index.name === "idx_projection_thread_messages_thread_created_id", + ), + ); + + const messageIndexColumns = yield* sql<{ + readonly seqno: number; + readonly cid: number; + readonly name: string; + }>` + PRAGMA index_info('idx_projection_thread_messages_thread_created_id') + `; + assert.deepStrictEqual( + messageIndexColumns.map((column) => column.name), + ["thread_id", "created_at", "message_id"], + ); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.ts b/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.ts new file mode 100644 index 00000000000..4a0595afa9b --- /dev/null +++ b/apps/server/src/persistence/Migrations/029_ProjectionThreadDetailOrderingIndexes.ts @@ -0,0 +1,16 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_activities_thread_sequence_created_id + ON projection_thread_activities(thread_id, sequence, created_at, activity_id) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_messages_thread_created_id + ON projection_thread_messages(thread_id, created_at, message_id) + `; +}); diff --git a/apps/server/src/project/Layers/ProjectSetupScriptRunner.test.ts b/apps/server/src/project/Layers/ProjectSetupScriptRunner.test.ts index 6366a768b75..28c74aa7fb8 100644 --- a/apps/server/src/project/Layers/ProjectSetupScriptRunner.test.ts +++ b/apps/server/src/project/Layers/ProjectSetupScriptRunner.test.ts @@ -1,53 +1,52 @@ -import { Effect, Layer, Stream } from "effect"; +import { ProjectId, type OrchestrationProject } from "@t3tools/contracts"; +import { Effect, Layer, Option } from "effect"; import { describe, expect, it, vi } from "vitest"; -import type { OrchestrationReadModel } from "@t3tools/contracts"; -import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { TerminalManager } from "../../terminal/Services/Manager.ts"; import { ProjectSetupScriptRunner } from "../Services/ProjectSetupScriptRunner.ts"; import { ProjectSetupScriptRunnerLive } from "./ProjectSetupScriptRunner.ts"; -const emptySnapshot = ( - scripts: OrchestrationReadModel["projects"][number]["scripts"], -): OrchestrationReadModel => - ({ - snapshotSequence: 1, - updatedAt: "2026-01-01T00:00:00.000Z", - projects: [ - { - id: "project-1", - title: "Project", - workspaceRoot: "/repo/project", - defaultModelSelection: null, - scripts, - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - deletedAt: null, - }, - ], - threads: [], - providerSessions: [], - providerStatuses: [], - pendingApprovals: [], - latestTurnByThreadId: {}, - }) as unknown as OrchestrationReadModel; +const makeProject = (scripts: OrchestrationProject["scripts"]): OrchestrationProject => ({ + id: ProjectId.make("project-1"), + title: "Project", + workspaceRoot: "/repo/project", + defaultModelSelection: null, + scripts, + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + deletedAt: null, +}); + +const makeProjectionSnapshotQueryLayer = (project: OrchestrationProject) => + Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.die("unused"), + getSnapshot: () => Effect.die("unused"), + getShellSnapshot: () => Effect.die("unused"), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 1 }), + getCounts: () => Effect.die("unused"), + getActiveProjectByWorkspaceRoot: (workspaceRoot) => + Effect.succeed( + workspaceRoot === project.workspaceRoot ? Option.some(project) : Option.none(), + ), + getProjectShellById: (projectId) => + Effect.succeed(projectId === project.id ? Option.some(project) : Option.none()), + getFirstActiveThreadIdByProjectId: () => Effect.die("unused"), + getThreadCheckpointContext: () => Effect.die("unused"), + getThreadShellById: () => Effect.die("unused"), + getThreadDetailById: () => Effect.die("unused"), + }); describe("ProjectSetupScriptRunner", () => { it("returns no-script when no setup script exists", async () => { const open = vi.fn(); const write = vi.fn(); + const project = makeProject([]); const runner = await Effect.runPromise( Effect.service(ProjectSetupScriptRunner).pipe( Effect.provide( ProjectSetupScriptRunnerLive.pipe( - Layer.provideMerge( - Layer.succeed(OrchestrationEngineService, { - getReadModel: () => Effect.succeed(emptySnapshot([])), - readEvents: () => Stream.empty, - dispatch: () => Effect.die(new Error("unused")), - streamDomainEvents: Stream.empty, - }), - ), + Layer.provideMerge(makeProjectionSnapshotQueryLayer(project)), Layer.provideMerge( Layer.succeed(TerminalManager, { open, @@ -93,29 +92,20 @@ describe("ProjectSetupScriptRunner", () => { }), ); const write = vi.fn(() => Effect.void); + const project = makeProject([ + { + id: "setup", + name: "Setup", + command: "bun install", + icon: "configure", + runOnWorktreeCreate: true, + }, + ]); const runner = await Effect.runPromise( Effect.service(ProjectSetupScriptRunner).pipe( Effect.provide( ProjectSetupScriptRunnerLive.pipe( - Layer.provideMerge( - Layer.succeed(OrchestrationEngineService, { - getReadModel: () => - Effect.succeed( - emptySnapshot([ - { - id: "setup", - name: "Setup", - command: "bun install", - icon: "configure", - runOnWorktreeCreate: true, - }, - ]), - ), - readEvents: () => Stream.empty, - dispatch: () => Effect.die(new Error("unused")), - streamDomainEvents: Stream.empty, - }), - ), + Layer.provideMerge(makeProjectionSnapshotQueryLayer(project)), Layer.provideMerge( Layer.succeed(TerminalManager, { open, diff --git a/apps/server/src/project/Layers/ProjectSetupScriptRunner.ts b/apps/server/src/project/Layers/ProjectSetupScriptRunner.ts index 3bac8cf0abf..43c916d52ee 100644 --- a/apps/server/src/project/Layers/ProjectSetupScriptRunner.ts +++ b/apps/server/src/project/Layers/ProjectSetupScriptRunner.ts @@ -1,7 +1,8 @@ +import { ProjectId } from "@t3tools/contracts"; import { projectScriptRuntimeEnv, setupProjectScript } from "@t3tools/shared/projectScripts"; -import { Effect, Layer } from "effect"; +import { Effect, Layer, Option } from "effect"; -import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { TerminalManager } from "../../terminal/Services/Manager.ts"; import { type ProjectSetupScriptRunnerShape, @@ -9,18 +10,21 @@ import { } from "../Services/ProjectSetupScriptRunner.ts"; const makeProjectSetupScriptRunner = Effect.gen(function* () { - const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; const terminalManager = yield* TerminalManager; const runForThread: ProjectSetupScriptRunnerShape["runForThread"] = (input) => Effect.gen(function* () { - const readModel = yield* orchestrationEngine.getReadModel(); const project = (input.projectId - ? readModel.projects.find((entry) => entry.id === input.projectId) + ? yield* projectionSnapshotQuery + .getProjectShellById(ProjectId.make(input.projectId)) + .pipe(Effect.map(Option.getOrUndefined)) : null) ?? (input.projectCwd - ? readModel.projects.find((entry) => entry.workspaceRoot === input.projectCwd) + ? yield* projectionSnapshotQuery + .getActiveProjectByWorkspaceRoot(input.projectCwd) + .pipe(Effect.map(Option.getOrUndefined)) : null) ?? null; diff --git a/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts b/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts index 91e1a9aef97..b2b9af2c1ec 100644 --- a/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts +++ b/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts @@ -9,10 +9,7 @@ import { import { Effect, Exit, Layer, ManagedRuntime, Option, Scope, Stream } from "effect"; import { afterEach, describe, expect, it, vi } from "vitest"; -import { - OrchestrationEngineService, - type OrchestrationEngineShape, -} from "../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; import { ProviderSessionRuntimeRepositoryLive } from "../../persistence/Layers/ProviderSessionRuntime.ts"; import { ProviderSessionRuntimeRepository } from "../../persistence/Services/ProviderSessionRuntime.ts"; @@ -92,6 +89,10 @@ function makeReadModel( createdAt: now, updatedAt: now, archivedAt: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, latestTurn: null, messages: [], session: thread.session, @@ -163,13 +164,6 @@ describe("ProviderSessionReaper", () => { streamEvents: Stream.empty, }; - const orchestrationEngine: OrchestrationEngineShape = { - getReadModel: () => Effect.succeed(input.readModel), - readEvents: () => Stream.empty, - dispatch: () => unsupported(), - streamDomainEvents: Stream.empty, - }; - const runtimeRepositoryLayer = ProviderSessionRuntimeRepositoryLive.pipe( Layer.provide(SqlitePersistenceMemory), ); @@ -183,7 +177,27 @@ describe("ProviderSessionReaper", () => { Layer.provideMerge(providerSessionDirectoryLayer), Layer.provideMerge(runtimeRepositoryLayer), Layer.provideMerge(Layer.succeed(ProviderService, providerService)), - Layer.provideMerge(Layer.succeed(OrchestrationEngineService, orchestrationEngine)), + Layer.provideMerge( + Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.die("unused"), + getSnapshot: () => Effect.die("unused"), + getShellSnapshot: () => Effect.die("unused"), + getSnapshotSequence: () => + Effect.succeed({ snapshotSequence: input.readModel.snapshotSequence }), + getCounts: () => Effect.die("unused"), + getActiveProjectByWorkspaceRoot: () => Effect.die("unused"), + getProjectShellById: () => Effect.die("unused"), + getFirstActiveThreadIdByProjectId: () => Effect.die("unused"), + getThreadCheckpointContext: () => Effect.die("unused"), + getThreadShellById: (threadId) => + Effect.succeed( + input.readModel.threads.find((thread) => thread.id === threadId) + ? Option.some(input.readModel.threads.find((thread) => thread.id === threadId)!) + : Option.none(), + ), + getThreadDetailById: () => Effect.die("unused"), + }), + ), Layer.provideMerge(NodeServices.layer), ); diff --git a/apps/server/src/provider/Layers/ProviderSessionReaper.ts b/apps/server/src/provider/Layers/ProviderSessionReaper.ts index aa31c8c7d7a..916e5fcea4a 100644 --- a/apps/server/src/provider/Layers/ProviderSessionReaper.ts +++ b/apps/server/src/provider/Layers/ProviderSessionReaper.ts @@ -1,6 +1,6 @@ -import { Duration, Effect, Layer, Schedule } from "effect"; +import { Duration, Effect, Layer, Option, Schedule } from "effect"; -import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { ProviderSessionDirectory } from "../Services/ProviderSessionDirectory.ts"; import { ProviderSessionReaper, @@ -20,7 +20,7 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = Effect.gen(function* () { const providerService = yield* ProviderService; const directory = yield* ProviderSessionDirectory; - const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; const inactivityThresholdMs = Math.max( 1, @@ -29,8 +29,6 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = const sweepIntervalMs = Math.max(1, options?.sweepIntervalMs ?? DEFAULT_SWEEP_INTERVAL_MS); const sweep = Effect.gen(function* () { - const readModel = yield* orchestrationEngine.getReadModel(); - const threadsById = new Map(readModel.threads.map((thread) => [thread.id, thread] as const)); const bindings = yield* directory.listBindings(); const now = Date.now(); let reapedCount = 0; @@ -55,7 +53,9 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = continue; } - const thread = threadsById.get(binding.threadId); + const thread = yield* projectionSnapshotQuery + .getThreadShellById(binding.threadId) + .pipe(Effect.map(Option.getOrUndefined)); if (thread?.session?.activeTurnId != null) { yield* Effect.logDebug("provider.session.reaper.skipped-active-turn", { threadId: binding.threadId, diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 152fba1ea27..65cb638f7e4 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -559,7 +559,6 @@ const buildAppUnderTest = (options?: { ), Layer.provide( Layer.mock(OrchestrationEngineService)({ - getReadModel: () => Effect.succeed(makeDefaultOrchestrationReadModel()), readEvents: () => Stream.empty, dispatch: () => Effect.succeed({ sequence: 0 }), streamDomainEvents: Stream.empty, @@ -568,6 +567,7 @@ const buildAppUnderTest = (options?: { ), Layer.provide( Layer.mock(ProjectionSnapshotQuery)({ + getCommandReadModel: () => Effect.succeed(makeDefaultOrchestrationReadModel()), getSnapshot: () => Effect.succeed(makeDefaultOrchestrationReadModel()), getShellSnapshot: () => Effect.succeed({ @@ -576,6 +576,7 @@ const buildAppUnderTest = (options?: { threads: [], updatedAt: new Date(0).toISOString(), }), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }), getProjectShellById: () => Effect.succeed(Option.none()), getThreadShellById: () => Effect.succeed(Option.none()), getThreadDetailById: () => Effect.succeed(Option.none()), diff --git a/apps/server/src/serverRuntimeStartup.test.ts b/apps/server/src/serverRuntimeStartup.test.ts index 91b4b215c10..7f13693289c 100644 --- a/apps/server/src/serverRuntimeStartup.test.ts +++ b/apps/server/src/serverRuntimeStartup.test.ts @@ -77,8 +77,10 @@ it.effect("launchStartupHeartbeat does not block the caller while counts are loa yield* launchStartupHeartbeat.pipe( Effect.provideService(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.die("unused"), getSnapshot: () => Effect.die("unused"), getShellSnapshot: () => Effect.die("unused"), + getSnapshotSequence: () => Effect.die("unused"), getCounts: () => Deferred.await(releaseCounts).pipe( Effect.as({ @@ -129,8 +131,10 @@ it.effect("resolveAutoBootstrapWelcomeTargets returns existing project and threa autoBootstrapProjectFromCwd: true, } as never), Effect.provideService(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.die("unused"), getSnapshot: () => Effect.die("unused"), getShellSnapshot: () => Effect.die("unused"), + getSnapshotSequence: () => Effect.die("unused"), getCounts: () => Effect.die("unused"), getActiveProjectByWorkspaceRoot: () => Effect.succeed( @@ -152,7 +156,6 @@ it.effect("resolveAutoBootstrapWelcomeTargets returns existing project and threa getThreadDetailById: () => Effect.die("unused"), }), Effect.provideService(OrchestrationEngineService, { - getReadModel: () => Effect.die("unused"), readEvents: () => Stream.empty, dispatch: (command) => Ref.update(dispatchCalls, (calls) => [...calls, command.type]).pipe( @@ -180,8 +183,10 @@ it.effect("resolveAutoBootstrapWelcomeTargets creates a project and thread when autoBootstrapProjectFromCwd: true, } as never), Effect.provideService(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.die("unused"), getSnapshot: () => Effect.die("unused"), getShellSnapshot: () => Effect.die("unused"), + getSnapshotSequence: () => Effect.die("unused"), getCounts: () => Effect.die("unused"), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getProjectShellById: () => Effect.die("unused"), @@ -191,7 +196,6 @@ it.effect("resolveAutoBootstrapWelcomeTargets creates a project and thread when getThreadDetailById: () => Effect.die("unused"), }), Effect.provideService(OrchestrationEngineService, { - getReadModel: () => Effect.die("unused"), readEvents: () => Stream.empty, dispatch: (command) => Ref.update(dispatchCalls, (calls) => [...calls, command.type]).pipe( diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index fbefe6eac62..592097b6d92 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -236,9 +236,13 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => return Effect.gen(function* () { const workspaceRoot = event.payload.workspaceRoot ?? - (yield* orchestrationEngine.getReadModel()).projects.find( - (project) => project.id === event.payload.projectId, - )?.workspaceRoot ?? + Option.match( + yield* projectionSnapshotQuery.getProjectShellById(event.payload.projectId), + { + onNone: () => null, + onSome: (project) => project.workspaceRoot, + }, + ) ?? null; if (workspaceRoot === null) { return event; @@ -252,7 +256,7 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => repositoryIdentity, }, } satisfies OrchestrationEvent; - }); + }).pipe(Effect.catch(() => Effect.succeed(event))); default: return Effect.succeed(event); } @@ -723,9 +727,16 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => }), ), ), - orchestrationEngine - .getReadModel() - .pipe(Effect.map((readModel) => readModel.snapshotSequence)), + projectionSnapshotQuery.getSnapshotSequence().pipe( + Effect.map(({ snapshotSequence }) => snapshotSequence), + Effect.mapError( + (cause) => + new OrchestrationGetSnapshotError({ + message: "Failed to load orchestration snapshot sequence", + cause, + }), + ), + ), ]); if (Option.isNone(threadDetail)) { diff --git a/apps/web/src/environments/runtime/connection.test.ts b/apps/web/src/environments/runtime/connection.test.ts index 64ff7a1c3ca..c490795d233 100644 --- a/apps/web/src/environments/runtime/connection.test.ts +++ b/apps/web/src/environments/runtime/connection.test.ts @@ -22,14 +22,14 @@ function createTestClient() { environmentId: EnvironmentId.make("env-1"), }, })), - subscribeConfig: (listener: (event: any) => void) => { + subscribeConfig: vi.fn((listener: (event: any) => void) => { configListeners.add(listener); return () => configListeners.delete(listener); - }, - subscribeLifecycle: (listener: (event: any) => void) => { + }), + subscribeLifecycle: vi.fn((listener: (event: any) => void) => { lifecycleListeners.add(listener); return () => lifecycleListeners.delete(listener); - }, + }), subscribeAuthAccess: () => () => undefined, refreshProviders: vi.fn(async () => undefined), upsertKeybinding: vi.fn(async () => undefined), @@ -237,4 +237,33 @@ describe("createEnvironmentConnection", () => { await connection.dispose(); }); + + it("skips primary lifecycle/config subscriptions when no handlers are registered", async () => { + const environmentId = EnvironmentId.make("env-1"); + const { client } = createTestClient(); + + const connection = createEnvironmentConnection({ + kind: "primary", + knownEnvironment: { + id: "env-1", + label: "Local env", + source: "manual", + target: { + httpBaseUrl: "http://example.test", + wsBaseUrl: "ws://example.test", + }, + environmentId, + }, + client, + applyShellEvent: vi.fn(), + syncShellSnapshot: vi.fn(), + applyTerminalEvent: vi.fn(), + }); + + expect(client.server.subscribeLifecycle).not.toHaveBeenCalled(); + expect(client.server.subscribeConfig).not.toHaveBeenCalled(); + expect(client.orchestration.subscribeShell).toHaveBeenCalledOnce(); + + await connection.dispose(); + }); }); diff --git a/apps/web/src/environments/runtime/connection.ts b/apps/web/src/environments/runtime/connection.ts index 9f3465dfefb..beacf6144da 100644 --- a/apps/web/src/environments/runtime/connection.ts +++ b/apps/web/src/environments/runtime/connection.ts @@ -83,6 +83,8 @@ export function createEnvironmentConnection( let disposed = false; const bootstrapGate = createBootstrapGate(); + const shouldObserveLifecycle = input.kind === "saved" || input.onWelcome !== undefined; + const shouldObserveConfig = input.kind === "saved" || input.onConfigSnapshot !== undefined; const observeEnvironmentIdentity = (nextEnvironmentId: EnvironmentId, source: string) => { if (environmentId !== nextEnvironmentId) { @@ -92,28 +94,35 @@ export function createEnvironmentConnection( } }; - const unsubLifecycle = input.client.server.subscribeLifecycle( - (event: Parameters[0]>[0]) => { - if (event.type !== "welcome") { - return; - } - observeEnvironmentIdentity( - event.payload.environment.environmentId, - "server lifecycle welcome", - ); - input.onWelcome?.(event.payload); - }, - ); - - const unsubConfig = input.client.server.subscribeConfig( - (event: Parameters[0]>[0]) => { - if (event.type !== "snapshot") { - return; - } - observeEnvironmentIdentity(event.config.environment.environmentId, "server config snapshot"); - input.onConfigSnapshot?.(event.config); - }, - ); + const unsubLifecycle = shouldObserveLifecycle + ? input.client.server.subscribeLifecycle( + (event: Parameters[0]>[0]) => { + if (event.type !== "welcome") { + return; + } + observeEnvironmentIdentity( + event.payload.environment.environmentId, + "server lifecycle welcome", + ); + input.onWelcome?.(event.payload); + }, + ) + : () => undefined; + + const unsubConfig = shouldObserveConfig + ? input.client.server.subscribeConfig( + (event: Parameters[0]>[0]) => { + if (event.type !== "snapshot") { + return; + } + observeEnvironmentIdentity( + event.config.environment.environmentId, + "server config snapshot", + ); + input.onConfigSnapshot?.(event.config); + }, + ) + : () => undefined; const unsubShell = input.client.orchestration.subscribeShell( (item: Parameters[0]>[0]) => { diff --git a/apps/web/src/environments/runtime/service.savedEnvironments.test.ts b/apps/web/src/environments/runtime/service.savedEnvironments.test.ts new file mode 100644 index 00000000000..b1d59bb49e1 --- /dev/null +++ b/apps/web/src/environments/runtime/service.savedEnvironments.test.ts @@ -0,0 +1,326 @@ +import { QueryClient } from "@tanstack/react-query"; +import { EnvironmentId } from "@t3tools/contracts"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const mockCreateEnvironmentConnection = vi.fn(); +const mockCreateWsRpcClient = vi.fn(); +const mockFetchRemoteSessionState = vi.fn(); +const mockWaitForSavedEnvironmentRegistryHydration = vi.fn(); +const mockListSavedEnvironmentRecords = vi.fn(); +const mockSavedEnvironmentRegistrySubscribe = vi.fn(); +const mockReadSavedEnvironmentBearerToken = vi.fn(); +const mockGetSavedEnvironmentRecord = vi.fn(); + +function MockWsTransport() { + return undefined; +} + +vi.mock("../primary", () => ({ + getPrimaryKnownEnvironment: vi.fn(() => ({ + id: "env-1", + label: "Primary environment", + source: "window-origin", + target: { + httpBaseUrl: "http://127.0.0.1:3000/", + wsBaseUrl: "ws://127.0.0.1:3000/", + }, + environmentId: EnvironmentId.make("env-1"), + })), +})); + +vi.mock("../remote/api", () => ({ + bootstrapRemoteBearerSession: vi.fn(), + fetchRemoteEnvironmentDescriptor: vi.fn(), + fetchRemoteSessionState: mockFetchRemoteSessionState, + resolveRemoteWebSocketConnectionUrl: vi.fn(() => "ws://remote.example.test"), +})); + +vi.mock("./catalog", () => ({ + getSavedEnvironmentRecord: mockGetSavedEnvironmentRecord, + hasSavedEnvironmentRegistryHydrated: vi.fn(() => true), + listSavedEnvironmentRecords: mockListSavedEnvironmentRecords, + persistSavedEnvironmentRecord: vi.fn(), + readSavedEnvironmentBearerToken: mockReadSavedEnvironmentBearerToken, + removeSavedEnvironmentBearerToken: vi.fn(), + useSavedEnvironmentRegistryStore: { + subscribe: mockSavedEnvironmentRegistrySubscribe, + getState: () => ({ + upsert: vi.fn(), + remove: vi.fn(), + markConnected: vi.fn(), + rename: vi.fn(), + }), + }, + useSavedEnvironmentRuntimeStore: { + getState: () => ({ + ensure: vi.fn(), + patch: vi.fn(), + clear: vi.fn(), + }), + }, + waitForSavedEnvironmentRegistryHydration: mockWaitForSavedEnvironmentRegistryHydration, + writeSavedEnvironmentBearerToken: vi.fn(), +})); + +vi.mock("./connection", () => ({ + createEnvironmentConnection: mockCreateEnvironmentConnection, +})); + +vi.mock("../../rpc/wsRpcClient", () => ({ + createWsRpcClient: mockCreateWsRpcClient, +})); + +vi.mock("../../rpc/wsTransport", () => ({ + WsTransport: MockWsTransport, +})); + +vi.mock("~/composerDraftStore", () => ({ + markPromotedDraftThreadByRef: vi.fn(), + markPromotedDraftThreadsByRef: vi.fn(), + useComposerDraftStore: { + getState: () => ({ + getDraftThreadByRef: vi.fn(() => null), + clearDraftThread: vi.fn(), + }), + }, +})); + +vi.mock("~/localApi", () => ({ + ensureLocalApi: vi.fn(() => ({ + persistence: { + setSavedEnvironmentRegistry: vi.fn(async () => undefined), + }, + })), +})); + +vi.mock("~/lib/terminalStateCleanup", () => ({ + collectActiveTerminalThreadIds: vi.fn(() => []), +})); + +vi.mock("~/orchestrationEventEffects", () => ({ + deriveOrchestrationBatchEffects: vi.fn(() => ({ + promotedThreadRefs: [], + invalidatedProviderState: false, + })), +})); + +vi.mock("~/lib/projectReactQuery", () => ({ + projectQueryKeys: { + all: ["projects"], + }, +})); + +vi.mock("~/lib/providerReactQuery", () => ({ + providerQueryKeys: { + all: ["providers"], + }, +})); + +vi.mock("~/store", () => ({ + useStore: { + getState: () => ({ + syncServerShellSnapshot: vi.fn(), + syncServerThreadDetail: vi.fn(), + removeServerThreadDetail: vi.fn(), + applyServerShellEvent: vi.fn(), + }), + }, + selectProjectsAcrossEnvironments: vi.fn(() => []), + selectSidebarThreadSummaryByRef: vi.fn(() => null), + selectThreadByRef: vi.fn(() => null), + selectThreadsAcrossEnvironments: vi.fn(() => []), +})); + +vi.mock("~/terminalStateStore", () => ({ + useTerminalStateStore: { + getState: () => ({ + applyTerminalEvent: vi.fn(), + removeTerminalState: vi.fn(), + clearTerminalSelection: vi.fn(), + }), + }, +})); + +vi.mock("~/uiStateStore", () => ({ + useUiStateStore: { + getState: () => ({ + clearThreadUi: vi.fn(), + syncPromotedDraftThreadRefs: vi.fn(), + }), + }, +})); + +const savedRecord = { + environmentId: EnvironmentId.make("env-saved"), + label: "Remote environment", + httpBaseUrl: "https://remote.example.test/", + wsBaseUrl: "wss://remote.example.test/", +}; + +const configSnapshot = { + environment: { + environmentId: savedRecord.environmentId, + label: "Remote environment", + }, +}; + +function createClient() { + return { + dispose: vi.fn(async () => undefined), + reconnect: vi.fn(async () => undefined), + server: { + getConfig: vi.fn(async () => configSnapshot), + subscribeConfig: vi.fn(() => () => undefined), + subscribeLifecycle: vi.fn(() => () => undefined), + subscribeAuthAccess: vi.fn(() => () => undefined), + refreshProviders: vi.fn(async () => undefined), + upsertKeybinding: vi.fn(async () => undefined), + getSettings: vi.fn(async () => undefined), + updateSettings: vi.fn(async () => undefined), + }, + orchestration: { + subscribeShell: vi.fn(() => () => undefined), + subscribeThread: vi.fn(() => () => undefined), + dispatchCommand: vi.fn(async () => undefined), + getTurnDiff: vi.fn(async () => undefined), + getFullThreadDiff: vi.fn(async () => undefined), + }, + terminal: { + open: vi.fn(async () => undefined), + write: vi.fn(async () => undefined), + resize: vi.fn(async () => undefined), + clear: vi.fn(async () => undefined), + restart: vi.fn(async () => undefined), + close: vi.fn(async () => undefined), + onEvent: vi.fn(() => () => undefined), + }, + projects: { + searchEntries: vi.fn(async () => []), + writeFile: vi.fn(async () => undefined), + }, + shell: { + openInEditor: vi.fn(async () => undefined), + }, + git: { + pull: vi.fn(async () => undefined), + refreshStatus: vi.fn(async () => undefined), + onStatus: vi.fn(() => () => undefined), + runStackedAction: vi.fn(async () => ({})), + listBranches: vi.fn(async () => []), + createWorktree: vi.fn(async () => undefined), + removeWorktree: vi.fn(async () => undefined), + createBranch: vi.fn(async () => undefined), + checkout: vi.fn(async () => undefined), + init: vi.fn(async () => undefined), + resolvePullRequest: vi.fn(async () => undefined), + preparePullRequestThread: vi.fn(async () => undefined), + }, + }; +} + +describe("saved environment startup", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.resetModules(); + vi.clearAllMocks(); + + mockFetchRemoteSessionState.mockResolvedValue({ + authenticated: true, + role: "owner", + }); + mockGetSavedEnvironmentRecord.mockImplementation((environmentId: EnvironmentId) => + environmentId === savedRecord.environmentId ? savedRecord : null, + ); + mockListSavedEnvironmentRecords.mockReturnValue([savedRecord]); + mockSavedEnvironmentRegistrySubscribe.mockReturnValue(() => undefined); + mockWaitForSavedEnvironmentRegistryHydration.mockResolvedValue(undefined); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("saved-bearer-token"); + mockCreateWsRpcClient.mockImplementation(() => createClient()); + mockCreateEnvironmentConnection.mockImplementation((input) => { + if (input.kind === "saved") { + queueMicrotask(() => { + input.onConfigSnapshot?.(configSnapshot); + }); + } + + return { + kind: input.kind, + environmentId: input.knownEnvironment.environmentId, + knownEnvironment: input.knownEnvironment, + client: input.client, + ensureBootstrapped: vi.fn(async () => undefined), + reconnect: vi.fn(async () => undefined), + dispose: vi.fn(async () => undefined), + }; + }); + }); + + afterEach(async () => { + const { resetEnvironmentServiceForTests } = await import("./service"); + await resetEnvironmentServiceForTests(); + vi.useRealTimers(); + }); + + it("uses the initial config snapshot instead of issuing an extra getConfig call", async () => { + const { startEnvironmentConnectionService, resetEnvironmentServiceForTests } = + await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + await vi.runAllTimersAsync(); + + const savedConnectionCall = mockCreateEnvironmentConnection.mock.calls.find( + ([input]) => input.kind === "saved", + ); + expect(savedConnectionCall).toBeDefined(); + + const savedClient = savedConnectionCall?.[0]?.client; + expect(savedClient.server.getConfig).not.toHaveBeenCalled(); + expect(mockFetchRemoteSessionState).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("coalesces hydration and registry sync so the initial saved connection only starts once", async () => { + let finishHydration!: () => void; + let finishTokenRead!: (token: string) => void; + + mockWaitForSavedEnvironmentRegistryHydration.mockImplementation( + () => + new Promise((resolve) => { + finishHydration = () => resolve(); + }), + ); + mockReadSavedEnvironmentBearerToken.mockImplementation( + () => + new Promise((resolve) => { + finishTokenRead = resolve; + }), + ); + + const { startEnvironmentConnectionService, resetEnvironmentServiceForTests } = + await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + const registryListener = mockSavedEnvironmentRegistrySubscribe.mock.calls[0]?.[0]; + expect(registryListener).toBeTypeOf("function"); + + registryListener?.(); + finishHydration(); + await vi.waitFor(() => { + expect(mockReadSavedEnvironmentBearerToken).toHaveBeenCalledTimes(1); + }); + + finishTokenRead("saved-bearer-token"); + await vi.runAllTimersAsync(); + + const savedConnectionCalls = mockCreateEnvironmentConnection.mock.calls.filter( + ([input]) => input.kind === "saved", + ); + expect(savedConnectionCalls).toHaveLength(1); + expect(mockFetchRemoteSessionState).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); +}); diff --git a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts index ea96d6c3219..82964447d7a 100644 --- a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts +++ b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts @@ -369,7 +369,9 @@ describe("retainThreadDetailSubscription", () => { listEnvironmentConnections().some((connection) => connection.environmentId === environmentId), ).toBe(false); - await reconnectSavedEnvironment(environmentId); + const reconnectPromise = reconnectSavedEnvironment(environmentId); + await vi.advanceTimersByTimeAsync(200); + await reconnectPromise; await vi.waitFor(() => { expect(mockCreateEnvironmentConnection).toHaveBeenCalledTimes(3); expect(mockSubscribeThread).toHaveBeenCalledTimes(2); diff --git a/apps/web/src/environments/runtime/service.ts b/apps/web/src/environments/runtime/service.ts index 14562e3c492..60c05fc217c 100644 --- a/apps/web/src/environments/runtime/service.ts +++ b/apps/web/src/environments/runtime/service.ts @@ -139,9 +139,70 @@ let lastBrowserResumeReconnectAt = Number.NEGATIVE_INFINITY; const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 15 * 60 * 1000; const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 32; const BROWSER_RESUME_RECONNECT_COOLDOWN_MS = 2_000; +const INITIAL_SERVER_CONFIG_SNAPSHOT_WAIT_MS = 150; const NOOP = () => undefined; const SSH_HTTP_STATUS_RE = /^\[ssh_http:(\d+)\]\s/u; +function createDeferredPromise() { + let resolve: ((value: T) => void) | null = null; + const promise = new Promise((nextResolve) => { + resolve = nextResolve; + }); + + return { + promise, + resolve: (value: T) => { + resolve?.(value); + resolve = null; + }, + }; +} + +async function waitForConfigSnapshot( + promise: Promise, + timeoutMs: number, +): Promise { + return await new Promise((resolve) => { + const timeoutId = globalThis.setTimeout(() => resolve(null), timeoutMs); + promise.then( + (config) => { + clearTimeout(timeoutId); + resolve(config); + }, + () => { + clearTimeout(timeoutId); + resolve(null); + }, + ); + }); +} + +function createSavedEnvironmentSyncScheduler() { + let activeSync: Promise | null = null; + let queued = false; + + const run = async (): Promise => { + do { + queued = false; + await syncSavedEnvironmentConnections(listSavedEnvironmentRecords()); + } while (queued); + }; + + return () => { + if (activeSync) { + queued = true; + return activeSync; + } + + activeSync = run() + .catch(() => undefined) + .finally(() => { + activeSync = null; + }); + + return activeSync; + }; +} function compareAppliedProjectionVersion( left: { readonly sequence: number; readonly updatedAt: string | null }, right: { readonly sequence: number; readonly updatedAt: string | null }, @@ -233,7 +294,6 @@ function markAppliedProjectionEvent(environmentId: EnvironmentId, sequence: numb updatedAt: currentVersion?.updatedAt ?? null, }); } - function getThreadDetailSubscriptionKey(environmentId: EnvironmentId, threadId: ThreadId): string { return scopedThreadKey(scopeThreadRef(environmentId, threadId)); } @@ -1284,6 +1344,7 @@ async function ensureSavedEnvironmentConnection( const client = options?.client ?? createSavedEnvironmentClient(activeRecord.environmentId, activeBearerToken); + const initialConfigSnapshot = createDeferredPromise(); const knownEnvironment = createKnownEnvironment({ id: activeRecord.environmentId, label: activeRecord.label, @@ -1308,6 +1369,7 @@ async function ensureSavedEnvironmentConnection( ); }, onConfigSnapshot: (config) => { + initialConfigSnapshot.resolve(config); useSavedEnvironmentRuntimeStore.getState().patch(activeRecord.environmentId, { descriptor: config.environment, serverConfig: config, @@ -1323,12 +1385,18 @@ async function ensureSavedEnvironmentConnection( try { try { + const initialServerConfig = + options?.serverConfig ?? + (await waitForConfigSnapshot( + initialConfigSnapshot.promise, + INITIAL_SERVER_CONFIG_SNAPSHOT_WAIT_MS, + )); await refreshSavedEnvironmentMetadata( activeRecord.environmentId, activeBearerToken, client, roleHint, - options?.serverConfig ?? null, + initialServerConfig, ); } catch (error) { const isAuthError = activeRecord.desktopSsh @@ -1699,6 +1767,7 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): () trailing: true, }, ); + const requestSavedEnvironmentSync = createSavedEnvironmentSyncScheduler(); maybeCreatePrimaryEnvironmentConnection(); @@ -1706,11 +1775,11 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): () if (!hasSavedEnvironmentRegistryHydrated()) { return; } - void syncSavedEnvironmentConnections(listSavedEnvironmentRecords()); + void requestSavedEnvironmentSync(); }); void waitForSavedEnvironmentRegistryHydration() - .then(() => syncSavedEnvironmentConnections(listSavedEnvironmentRecords())) + .then(() => requestSavedEnvironmentSync()) .catch(() => undefined); const unsubscribeBrowserResumeReconnects = subscribeBrowserResumeReconnects();