diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index 35534af9f38..96bdfcb634f 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createThreadJumpHintVisibilityController, + getSidebarThreadIdsToPrewarm, getVisibleSidebarThreadIds, resolveAdjacentThreadId, getFallbackThreadIdAfterDelete, @@ -121,6 +122,20 @@ describe("createThreadJumpHintVisibilityController", () => { }); }); +describe("getSidebarThreadIdsToPrewarm", () => { + it("returns only the first visible thread ids up to the prewarm limit", () => { + expect(getSidebarThreadIdsToPrewarm(["t1", "t2", "t3"], 2)).toEqual(["t1", "t2"]); + }); + + it("returns all visible thread ids when they fit within the limit", () => { + expect(getSidebarThreadIdsToPrewarm(["t1", "t2"], 10)).toEqual(["t1", "t2"]); + }); + + it("returns no thread ids when the limit is zero", () => { + expect(getSidebarThreadIdsToPrewarm(["t1", "t2"], 0)).toEqual([]); + }); +}); + describe("shouldClearThreadSelectionOnMouseDown", () => { it("preserves selection for thread items", () => { const child = { diff --git a/apps/web/src/components/Sidebar.logic.ts b/apps/web/src/components/Sidebar.logic.ts index 8c742cbe9b9..eef3dd35c1e 100644 --- a/apps/web/src/components/Sidebar.logic.ts +++ b/apps/web/src/components/Sidebar.logic.ts @@ -12,6 +12,9 @@ import { isLatestTurnSettled } from "../session-logic"; export const THREAD_SELECTION_SAFE_SELECTOR = "[data-thread-item], [data-thread-selection-safe]"; export const THREAD_JUMP_HINT_SHOW_DELAY_MS = 100; +// Visible sidebar rows are prewarmed into the thread-detail cache so opening a +// nearby thread usually reuses an already-hot subscription. +export const SIDEBAR_THREAD_PREWARM_LIMIT = 10; export type SidebarNewThreadEnvMode = "local" | "worktree"; type SidebarProject = { id: string; @@ -243,6 +246,13 @@ export function getVisibleSidebarThreadIds( ); } +export function getSidebarThreadIdsToPrewarm( + visibleThreadIds: readonly TThreadId[], + limit = SIDEBAR_THREAD_PREWARM_LIMIT, +): TThreadId[] { + return visibleThreadIds.slice(0, Math.max(0, limit)); +} + export function resolveAdjacentThreadId(input: { threadIds: readonly T[]; currentThreadId: T | null; diff --git a/apps/web/src/components/Sidebar.tsx b/apps/web/src/components/Sidebar.tsx index 8343d34ebec..dc2da70e208 100644 --- a/apps/web/src/components/Sidebar.tsx +++ b/apps/web/src/components/Sidebar.tsx @@ -43,6 +43,7 @@ import { type GitStatusResult, } from "@t3tools/contracts"; import { + parseScopedThreadKey, scopedProjectKey, scopedThreadKey, scopeProjectRef, @@ -81,6 +82,7 @@ import { useGitStatus } from "../lib/gitStatusState"; import { readLocalApi } from "../localApi"; import { useComposerDraftStore } from "../composerDraftStore"; import { useNewThreadHandler } from "../hooks/useHandleNewThread"; +import { retainThreadDetailSubscription } from "../environments/runtime/service"; import { useThreadActions } from "../hooks/useThreadActions"; import { @@ -122,6 +124,7 @@ import { import { useThreadSelectionStore } from "../threadSelectionStore"; import { isNonEmpty as isNonEmptyString } from "effect/String"; import { + getSidebarThreadIdsToPrewarm, resolveAdjacentThreadId, isContextMenuPointerDown, resolveProjectStatusIndicator, @@ -2878,6 +2881,30 @@ export default function Sidebar() { ? threadJumpLabelByKey : EMPTY_THREAD_JUMP_LABELS; const orderedSidebarThreadKeys = visibleSidebarThreadKeys; + const prewarmedSidebarThreadKeys = useMemo( + () => getSidebarThreadIdsToPrewarm(visibleSidebarThreadKeys), + [visibleSidebarThreadKeys], + ); + const prewarmedSidebarThreadRefs = useMemo( + () => + prewarmedSidebarThreadKeys.flatMap((threadKey) => { + const ref = parseScopedThreadKey(threadKey); + return ref ? [ref] : []; + }), + [prewarmedSidebarThreadKeys], + ); + + useEffect(() => { + const releases = prewarmedSidebarThreadRefs.map((ref) => + retainThreadDetailSubscription(ref.environmentId, ref.threadId), + ); + + return () => { + for (const release of releases) { + release(); + } + }; + }, [prewarmedSidebarThreadRefs]); useEffect(() => { const clearThreadJumpHints = () => { diff --git a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts index a5ebc36ca53..5f169adc9b4 100644 --- a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts +++ b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts @@ -1,5 +1,11 @@ import { QueryClient } from "@tanstack/react-query"; -import { EnvironmentId, ThreadId } from "@t3tools/contracts"; +import { + EnvironmentId, + ProjectId, + ThreadId, + TurnId, + type OrchestrationShellSnapshot, +} from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; const mockSubscribeThread = vi.fn(); @@ -65,6 +71,74 @@ vi.mock("../../rpc/wsTransport", () => ({ WsTransport: MockWsTransport, })); +function makeThreadShellSnapshot(params: { + readonly threadId: ThreadId; + readonly sessionStatus?: + | "idle" + | "starting" + | "running" + | "ready" + | "interrupted" + | "stopped" + | "error"; + readonly hasPendingApprovals?: boolean; + readonly hasPendingUserInput?: boolean; + readonly hasActionableProposedPlan?: boolean; +}): OrchestrationShellSnapshot { + const projectId = ProjectId.make("project-1"); + const turnId = TurnId.make("turn-1"); + + return { + snapshotSequence: 1, + projects: [], + updatedAt: "2026-04-13T00:00:00.000Z", + threads: [ + { + id: params.threadId, + projectId, + title: "Thread", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + branch: null, + worktreePath: null, + latestTurn: + params.sessionStatus === "running" + ? { + turnId, + state: "running", + requestedAt: "2026-04-13T00:00:00.000Z", + startedAt: "2026-04-13T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + } + : null, + createdAt: "2026-04-13T00:00:00.000Z", + updatedAt: "2026-04-13T00:00:00.000Z", + archivedAt: null, + session: params.sessionStatus + ? { + threadId: params.threadId, + status: params.sessionStatus, + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: params.sessionStatus === "running" ? turnId : null, + lastError: null, + updatedAt: "2026-04-13T00:00:00.000Z", + } + : null, + latestUserMessageAt: null, + hasPendingApprovals: params.hasPendingApprovals ?? false, + hasPendingUserInput: params.hasPendingUserInput ?? false, + hasActionableProposedPlan: params.hasActionableProposedPlan ?? false, + }, + ], + }; +} + describe("retainThreadDetailSubscription", () => { beforeEach(() => { vi.useFakeTimers(); @@ -119,16 +193,89 @@ describe("retainThreadDetailSubscription", () => { expect(mockSubscribeThread).toHaveBeenCalledTimes(1); releaseSecond(); - await vi.advanceTimersByTimeAsync(2 * 60 * 1000 - 1); + await vi.advanceTimersByTimeAsync(2 * 60 * 1000); expect(mockThreadUnsubscribe).not.toHaveBeenCalled(); - await vi.advanceTimersByTimeAsync(1); + await vi.advanceTimersByTimeAsync(28 * 60 * 1000); expect(mockThreadUnsubscribe).toHaveBeenCalledTimes(1); stop(); await resetEnvironmentServiceForTests(); }); + it("keeps non-idle thread detail subscriptions attached until the thread becomes idle", async () => { + const { + retainThreadDetailSubscription, + startEnvironmentConnectionService, + resetEnvironmentServiceForTests, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + const environmentId = EnvironmentId.make("env-1"); + const threadId = ThreadId.make("thread-active"); + + const connectionInput = mockCreateEnvironmentConnection.mock.calls[0]?.[0]; + expect(connectionInput).toBeDefined(); + + connectionInput.syncShellSnapshot( + makeThreadShellSnapshot({ + threadId, + sessionStatus: "ready", + hasPendingApprovals: true, + }), + environmentId, + ); + + const release = retainThreadDetailSubscription(environmentId, threadId); + expect(mockSubscribeThread).toHaveBeenCalledTimes(1); + + release(); + await vi.advanceTimersByTimeAsync(30 * 60 * 1000); + expect(mockThreadUnsubscribe).not.toHaveBeenCalled(); + + connectionInput.applyShellEvent( + { + kind: "thread-upserted", + sequence: 2, + thread: makeThreadShellSnapshot({ + threadId, + sessionStatus: "idle", + }).threads[0]!, + }, + environmentId, + ); + + await vi.advanceTimersByTimeAsync(30 * 60 * 1000); + expect(mockThreadUnsubscribe).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("allows a larger idle cache before capacity eviction starts", async () => { + const { + retainThreadDetailSubscription, + startEnvironmentConnectionService, + resetEnvironmentServiceForTests, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + const environmentId = EnvironmentId.make("env-1"); + + for (let index = 0; index < 12; index += 1) { + const release = retainThreadDetailSubscription( + environmentId, + ThreadId.make(`thread-${index + 1}`), + ); + release(); + } + + expect(mockThreadUnsubscribe).not.toHaveBeenCalled(); + + stop(); + await resetEnvironmentServiceForTests(); + }); + it("disposes cached thread detail subscriptions when the environment service resets", async () => { const { retainThreadDetailSubscription, diff --git a/apps/web/src/environments/runtime/service.ts b/apps/web/src/environments/runtime/service.ts index 0e72607d9af..086bff6b377 100644 --- a/apps/web/src/environments/runtime/service.ts +++ b/apps/web/src/environments/runtime/service.ts @@ -54,6 +54,7 @@ import { createEnvironmentConnection, type EnvironmentConnection } from "./conne import { useStore, selectProjectsAcrossEnvironments, + selectSidebarThreadSummaryByRef, selectThreadByRef, selectThreadsAcrossEnvironments, } from "~/store"; @@ -86,8 +87,15 @@ const threadDetailSubscriptions = new Map let activeService: EnvironmentServiceState | null = null; let needsProviderInvalidation = false; -const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 2 * 60 * 1000; -const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 8; +// Thread detail subscription cache policy: +// - Active consumers keep a subscription retained via refCount. +// - Released subscriptions stay warm for a longer idle TTL to avoid churn +// while moving around the UI. +// - Threads with active work or pending user action are sticky and are never +// evicted while they remain non-idle. +// - Capacity eviction only targets idle cached subscriptions. +const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 15 * 60 * 1000; +const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 32; const NOOP = () => undefined; function getThreadDetailSubscriptionKey(environmentId: EnvironmentId, threadId: ThreadId): string { @@ -104,6 +112,55 @@ function clearThreadDetailSubscriptionEviction( return entry; } +function isNonIdleThreadDetailSubscription(entry: ThreadDetailSubscriptionEntry): boolean { + const threadRef = scopeThreadRef(entry.environmentId, entry.threadId); + const state = useStore.getState(); + const sidebarThread = selectSidebarThreadSummaryByRef(state, threadRef); + + // Prefer shell/sidebar state first because it carries the coarse thread + // readiness flags used throughout the UI (pending approvals/input/plan). + if (sidebarThread) { + if ( + sidebarThread.hasPendingApprovals || + sidebarThread.hasPendingUserInput || + sidebarThread.hasActionableProposedPlan + ) { + return true; + } + + const orchestrationStatus = sidebarThread.session?.orchestrationStatus; + if ( + orchestrationStatus && + orchestrationStatus !== "idle" && + orchestrationStatus !== "stopped" + ) { + return true; + } + + if (sidebarThread.latestTurn?.state === "running") { + return true; + } + } + + const thread = selectThreadByRef(state, threadRef); + if (!thread) { + return false; + } + + const orchestrationStatus = thread.session?.orchestrationStatus; + return ( + Boolean( + orchestrationStatus && orchestrationStatus !== "idle" && orchestrationStatus !== "stopped", + ) || + thread.latestTurn?.state === "running" || + thread.pendingSourceProposedPlan !== undefined + ); +} + +function shouldEvictThreadDetailSubscription(entry: ThreadDetailSubscriptionEntry): boolean { + return entry.refCount === 0 && !isNonIdleThreadDetailSubscription(entry); +} + function attachThreadDetailSubscription(entry: ThreadDetailSubscriptionEntry): boolean { if (entry.unsubscribeConnectionListener !== null) { entry.unsubscribeConnectionListener(); @@ -181,11 +238,20 @@ function reconcileThreadDetailSubscriptionsForEnvironment( function scheduleThreadDetailSubscriptionEviction(entry: ThreadDetailSubscriptionEntry): void { clearThreadDetailSubscriptionEviction(entry); + if (!shouldEvictThreadDetailSubscription(entry)) { + return; + } + entry.evictionTimeoutId = setTimeout(() => { const currentEntry = threadDetailSubscriptions.get( getThreadDetailSubscriptionKey(entry.environmentId, entry.threadId), ); - if (!currentEntry || currentEntry.refCount > 0) { + if (!currentEntry) { + return; + } + + currentEntry.evictionTimeoutId = null; + if (!shouldEvictThreadDetailSubscription(currentEntry)) { return; } disposeThreadDetailSubscriptionByKey( @@ -200,7 +266,7 @@ function evictIdleThreadDetailSubscriptionsToCapacity(): void { } const idleEntries = [...threadDetailSubscriptions.entries()] - .filter(([, entry]) => entry.refCount === 0) + .filter(([, entry]) => shouldEvictThreadDetailSubscription(entry)) .toSorted(([, left], [, right]) => left.lastAccessedAt - right.lastAccessedAt); for (const [key] of idleEntries) { @@ -211,6 +277,42 @@ function evictIdleThreadDetailSubscriptionsToCapacity(): void { } } +function reconcileThreadDetailSubscriptionEvictionState( + entry: ThreadDetailSubscriptionEntry, +): void { + clearThreadDetailSubscriptionEviction(entry); + if (!shouldEvictThreadDetailSubscription(entry)) { + return; + } + + scheduleThreadDetailSubscriptionEviction(entry); +} + +function reconcileThreadDetailSubscriptionEvictionForThread( + environmentId: EnvironmentId, + threadId: ThreadId, +): void { + const entry = threadDetailSubscriptions.get( + getThreadDetailSubscriptionKey(environmentId, threadId), + ); + if (!entry) { + return; + } + + reconcileThreadDetailSubscriptionEvictionState(entry); +} + +function reconcileThreadDetailSubscriptionEvictionForEnvironment( + environmentId: EnvironmentId, +): void { + for (const entry of threadDetailSubscriptions.values()) { + if (entry.environmentId === environmentId) { + reconcileThreadDetailSubscriptionEvictionState(entry); + } + } + evictIdleThreadDetailSubscriptionsToCapacity(); +} + export function retainThreadDetailSubscription( environmentId: EnvironmentId, threadId: ThreadId, @@ -233,7 +335,7 @@ export function retainThreadDetailSubscription( existing.refCount = Math.max(0, existing.refCount - 1); existing.lastAccessedAt = Date.now(); if (existing.refCount === 0) { - scheduleThreadDetailSubscriptionEviction(existing); + reconcileThreadDetailSubscriptionEvictionState(existing); evictIdleThreadDetailSubscriptionsToCapacity(); } }; @@ -263,7 +365,7 @@ export function retainThreadDetailSubscription( entry.refCount = Math.max(0, entry.refCount - 1); entry.lastAccessedAt = Date.now(); if (entry.refCount === 0) { - scheduleThreadDetailSubscriptionEviction(entry); + reconcileThreadDetailSubscriptionEvictionState(entry); evictIdleThreadDetailSubscriptionsToCapacity(); } }; @@ -473,6 +575,8 @@ function applyRecoveredEventBatch( for (const threadId of batchEffects.removeTerminalStateThreadIds) { useTerminalStateStore.getState().removeTerminalState(scopeThreadRef(environmentId, threadId)); } + + reconcileThreadDetailSubscriptionEvictionForEnvironment(environmentId); } export function applyEnvironmentThreadDetailEvent( @@ -507,6 +611,8 @@ function applyShellEvent(event: OrchestrationShellStreamEvent, environmentId: En if (previousThread?.archivedAt === null && event.thread.archivedAt !== null && threadRef) { useTerminalStateStore.getState().removeTerminalState(threadRef); } + reconcileThreadDetailSubscriptionEvictionForThread(environmentId, event.thread.id); + evictIdleThreadDetailSubscriptionsToCapacity(); return; case "thread-removed": if (threadRef) { @@ -529,6 +635,7 @@ function createEnvironmentConnectionHandlers() { environmentId, snapshot.threads.map((thread) => thread.id), ); + reconcileThreadDetailSubscriptionEvictionForEnvironment(environmentId); reconcileSnapshotDerivedState(); }, applyTerminalEvent: (event: TerminalEvent, environmentId: EnvironmentId) => {