From ea549fad29093917af509232e1c41d044adffb5d Mon Sep 17 00:00:00 2001 From: zerob13 Date: Sat, 21 Mar 2026 09:30:07 +0800 Subject: [PATCH 1/7] feat(deepchat): add queue steer lane --- src/main/events.ts | 3 +- .../presenter/deepchatAgentPresenter/index.ts | 243 ++++++++++++++- .../pendingInputCoordinator.ts | 159 ++++++++++ .../pendingInputStore.ts | 278 ++++++++++++++++++ src/main/presenter/newAgentPresenter/index.ts | 125 +++++++- src/main/presenter/sqlitePresenter/index.ts | 5 + .../tables/deepchatPendingInputs.ts | 218 ++++++++++++++ .../src/components/chat/ChatInputToolbar.vue | 45 +-- .../src/components/chat/PendingInputLane.vue | 263 +++++++++++++++++ src/renderer/src/events.ts | 3 +- src/renderer/src/i18n/da-DK/chat.json | 14 + src/renderer/src/i18n/en-US/chat.json | 14 + src/renderer/src/i18n/fa-IR/chat.json | 14 + src/renderer/src/i18n/fr-FR/chat.json | 14 + src/renderer/src/i18n/he-IL/chat.json | 14 + src/renderer/src/i18n/ja-JP/chat.json | 14 + src/renderer/src/i18n/ko-KR/chat.json | 14 + src/renderer/src/i18n/pt-BR/chat.json | 14 + src/renderer/src/i18n/ru-RU/chat.json | 14 + src/renderer/src/i18n/zh-CN/chat.json | 14 + src/renderer/src/i18n/zh-HK/chat.json | 14 + src/renderer/src/i18n/zh-TW/chat.json | 14 + src/renderer/src/pages/ChatPage.vue | 85 +++++- src/renderer/src/stores/ui/pendingInput.ts | 153 ++++++++++ src/renderer/src/stores/ui/session.ts | 9 - src/shared/types/agent-interface.d.ts | 36 +++ .../types/presenters/new-agent.presenter.d.ts | 19 ++ .../deepchatAgentPresenter.test.ts | 14 +- .../newAgentPresenter/integration.test.ts | 179 ++++++++++- test/renderer/components/ChatPage.test.ts | 21 ++ 30 files changed, 1975 insertions(+), 51 deletions(-) create mode 100644 src/main/presenter/deepchatAgentPresenter/pendingInputCoordinator.ts create mode 100644 src/main/presenter/deepchatAgentPresenter/pendingInputStore.ts create mode 100644 src/main/presenter/sqlitePresenter/tables/deepchatPendingInputs.ts create mode 100644 src/renderer/src/components/chat/PendingInputLane.vue create mode 100644 src/renderer/src/stores/ui/pendingInput.ts diff --git a/src/main/events.ts b/src/main/events.ts index fdf231ba1..130c392e2 100644 --- a/src/main/events.ts +++ b/src/main/events.ts @@ -78,7 +78,8 @@ export const SESSION_EVENTS = { ACTIVATED: 'session:activated', DEACTIVATED: 'session:deactivated', STATUS_CHANGED: 'session:status-changed', - COMPACTION_UPDATED: 'session:compaction-updated' + COMPACTION_UPDATED: 'session:compaction-updated', + PENDING_INPUTS_UPDATED: 'session:pending-inputs-updated' } // 系统相关事件 diff --git a/src/main/presenter/deepchatAgentPresenter/index.ts b/src/main/presenter/deepchatAgentPresenter/index.ts index 903adc205..13952590e 100644 --- a/src/main/presenter/deepchatAgentPresenter/index.ts +++ b/src/main/presenter/deepchatAgentPresenter/index.ts @@ -4,6 +4,7 @@ import type { DeepChatSessionState, IAgentImplementation, MessageFile, + PendingSessionInputRecord, PermissionMode, SendMessageInput, SessionCompactionState, @@ -27,10 +28,12 @@ import { buildSystemEnvPrompt } from '@/lib/agentRuntime/systemEnvPromptBuilder' import { presenter } from '@/presenter' -import { buildContext, buildResumeContext } from './contextBuilder' +import { buildContext, buildResumeContext, createUserChatMessage } from './contextBuilder' import { appendSummarySection, CompactionService, type CompactionIntent } from './compactionService' import { buildPersistableMessageTracePayload } from './messageTracePayload' import { DeepChatMessageStore } from './messageStore' +import { PendingInputCoordinator } from './pendingInputCoordinator' +import { DeepChatPendingInputStore } from './pendingInputStore' import { processStream } from './process' import { DeepChatSessionStore, type SessionSummaryState } from './sessionStore' import type { PendingToolInteraction, ProcessResult } from './types' @@ -97,6 +100,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { private readonly toolPresenter: IToolPresenter | null private readonly sessionStore: DeepChatSessionStore private readonly messageStore: DeepChatMessageStore + private readonly pendingInputStore: DeepChatPendingInputStore + private readonly pendingInputCoordinator: PendingInputCoordinator private readonly runtimeState: Map = new Map() private readonly sessionGenerationSettings: Map = new Map() private readonly abortControllers: Map = new Map() @@ -106,6 +111,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { private readonly sessionCompactionStates: Map = new Map() private readonly interactionLocks: Set = new Set() private readonly resumingMessages: Set = new Set() + private readonly drainingPendingQueues: Set = new Set() private readonly compactionService: CompactionService private readonly toolOutputGuard: ToolOutputGuard private readonly hooksBridge?: NewSessionHooksBridge @@ -123,6 +129,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { this.toolPresenter = toolPresenter ?? null this.sessionStore = new DeepChatSessionStore(sqlitePresenter) this.messageStore = new DeepChatMessageStore(sqlitePresenter) + this.pendingInputStore = new DeepChatPendingInputStore(sqlitePresenter) + this.pendingInputCoordinator = new PendingInputCoordinator(this.pendingInputStore) this.compactionService = new CompactionService( this.sessionStore, this.messageStore, @@ -136,6 +144,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation { if (recovered > 0) { console.log(`DeepChatAgent: recovered ${recovered} pending messages to error status`) } + + const recoveredPendingInputs = this.pendingInputCoordinator.recoverClaimedInputsAfterRestart() + if (recoveredPendingInputs > 0) { + console.log( + `DeepChatAgent: recovered ${recoveredPendingInputs} sessions with claimed pending inputs` + ) + } } async initSession( @@ -190,6 +205,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { this.abortControllers.delete(sessionId) } + this.pendingInputCoordinator.deleteBySession(sessionId) this.messageStore.deleteBySession(sessionId) this.sessionStore.delete(sessionId) this.runtimeState.delete(sessionId) @@ -198,6 +214,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { this.sessionProjectDirs.delete(sessionId) this.systemPromptCache.delete(sessionId) this.sessionCompactionStates.delete(sessionId) + this.drainingPendingQueues.delete(sessionId) } async getSessionState(sessionId: string): Promise { @@ -226,10 +243,72 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return { ...rebuilt } } + async listPendingInputs(sessionId: string): Promise { + return this.pendingInputCoordinator.listPendingInputs(sessionId) + } + + async queuePendingInput( + sessionId: string, + content: string | SendMessageInput + ): Promise { + const state = await this.getSessionState(sessionId) + if (!state) { + throw new Error(`Session ${sessionId} not found`) + } + + const record = this.pendingInputCoordinator.queuePendingInput(sessionId, content) + void this.drainPendingQueueIfPossible(sessionId, 'enqueue') + return record + } + + async updateQueuedInput( + sessionId: string, + itemId: string, + content: string | SendMessageInput + ): Promise { + await this.ensureSessionReadyForPendingInputMutation(sessionId) + return this.pendingInputCoordinator.updateQueuedInput(sessionId, itemId, content) + } + + async moveQueuedInput( + sessionId: string, + itemId: string, + toIndex: number + ): Promise { + await this.ensureSessionReadyForPendingInputMutation(sessionId) + return this.pendingInputCoordinator.moveQueuedInput(sessionId, itemId, toIndex) + } + + async convertPendingInputToSteer( + sessionId: string, + itemId: string + ): Promise { + await this.ensureSessionReadyForPendingInputMutation(sessionId) + return this.pendingInputCoordinator.convertPendingInputToSteer(sessionId, itemId) + } + + async deletePendingInput(sessionId: string, itemId: string): Promise { + await this.ensureSessionReadyForPendingInputMutation(sessionId) + this.pendingInputCoordinator.deletePendingInput(sessionId, itemId) + } + + async resumePendingQueue(sessionId: string): Promise { + const state = await this.getSessionState(sessionId) + if (!state) { + throw new Error(`Session ${sessionId} not found`) + } + + void this.drainPendingQueueIfPossible(sessionId, 'resume') + } + async processMessage( sessionId: string, content: string | SendMessageInput, - context?: { projectDir?: string | null; emitRefreshBeforeStream?: boolean } + context?: { + projectDir?: string | null + emitRefreshBeforeStream?: boolean + pendingQueueItemId?: string + } ): Promise { const state = this.runtimeState.get(sessionId) if (!state) throw new Error(`Session ${sessionId} not found`) @@ -245,6 +324,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) this.setSessionStatus(sessionId, 'generating') + let consumedPendingQueueItem = false try { const generationSettings = await this.getEffectiveSessionGenerationSettings(sessionId) @@ -340,6 +420,11 @@ export class DeepChatAgentPresenter implements IAgentImplementation { assistantOrderSeq ) + if (context?.pendingQueueItemId) { + this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId) + consumedPendingQueueItem = true + } + if (context?.emitRefreshBeforeStream) { this.emitMessageRefresh(sessionId, assistantMessageId || userMessageId) } @@ -353,8 +438,21 @@ export class DeepChatAgentPresenter implements IAgentImplementation { tools }) this.applyProcessResultStatus(sessionId, result) + if (result?.status === 'completed') { + void this.drainPendingQueueIfPossible(sessionId, 'completed') + } } catch (err) { console.error('[DeepChatAgent] processMessage error:', err) + if (context?.pendingQueueItemId && !consumedPendingQueueItem) { + try { + this.pendingInputCoordinator.releaseClaimedQueueInput( + sessionId, + context.pendingQueueItemId + ) + } catch (releaseError) { + console.warn('[DeepChatAgent] failed to release claimed queue input:', releaseError) + } + } const errorMessage = err instanceof Error ? err.message : String(err) this.dispatchHook('Stop', { sessionId, @@ -583,6 +681,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { emitResolvedToolHook?.() this.messageStore.updateMessageStatus(messageId, 'sent') this.setSessionStatus(sessionId, 'idle') + void this.drainPendingQueueIfPossible(sessionId, 'question_other') return { resumed: false, waitingForUserMessage: true } } @@ -884,6 +983,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } await this.cancelGeneration(sessionId) + this.pendingInputCoordinator.deleteBySession(sessionId) this.messageStore.deleteBySession(sessionId) this.resetSummaryState(sessionId) this.setSessionStatus(sessionId, 'idle') @@ -900,6 +1000,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { if (this.hasPendingInteractions(sessionId)) { throw new Error('Please resolve pending tool interactions before retrying.') } + this.assertNoActivePendingInputs(sessionId) const target = await this.messageStore.getMessage(messageId) if (!target) { @@ -931,6 +1032,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } async deleteMessage(sessionId: string, messageId: string): Promise { + this.assertNoActivePendingInputs(sessionId) const target = await this.messageStore.getMessage(messageId) if (!target) { throw new Error(`Message ${messageId} not found`) @@ -950,6 +1052,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { messageId: string, text: string ): Promise { + this.assertNoActivePendingInputs(sessionId) const target = await this.messageStore.getMessage(messageId) if (!target) { throw new Error(`Message ${messageId} not found`) @@ -1045,6 +1148,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } const traceEnabled = this.configPresenter.getSetting('traceDebugEnabled') === true + const pendingInputCoordinator = this.pendingInputCoordinator + const injectSteerInputsIntoRequest = this.injectSteerInputsIntoRequest.bind(this) + const persistMessageTrace = this.persistMessageTrace.bind(this) if (traceEnabled) { const traceAwareConfig = modelConfig as ModelConfig & { requestTraceContext?: { @@ -1055,7 +1161,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { traceAwareConfig.requestTraceContext = { enabled: true, persist: async (payload: ProviderRequestTracePayload) => { - this.persistMessageTrace({ + persistMessageTrace({ sessionId, messageId, providerId: state.providerId, @@ -1070,6 +1176,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { const maxTokens = generationSettings.maxTokens const tools = providedTools ?? (await this.loadToolDefinitionsForSession(sessionId, projectDir)) + const supportsVision = this.supportsVision(state.providerId, state.modelId) const abortController = new AbortController() this.abortControllers.set(sessionId, abortController) @@ -1088,7 +1195,49 @@ export class DeepChatAgentPresenter implements IAgentImplementation { messages, tools, toolPresenter: this.toolPresenter, - coreStream: provider.coreStream.bind(provider), + coreStream: async function* ( + requestMessages, + requestModelId, + requestModelConfig, + requestTemperature, + requestMaxTokens, + requestTools + ) { + const claimedSteerBatch = pendingInputCoordinator.claimSteerBatchForNextLoop(sessionId) + const injectedMessages = injectSteerInputsIntoRequest( + requestMessages, + claimedSteerBatch, + supportsVision + ) + + let didConsumeSteerBatch = false + + try { + for await (const event of provider.coreStream( + injectedMessages, + requestModelId, + requestModelConfig, + requestTemperature, + requestMaxTokens, + requestTools + )) { + if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { + pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) + didConsumeSteerBatch = true + } + yield event + } + + if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { + pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) + } + } catch (error) { + if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { + pendingInputCoordinator.releaseClaimedInputs(sessionId) + } + throw error + } + }, providerId: state.providerId, modelId: state.modelId, modelConfig, @@ -1155,6 +1304,75 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } } + private injectSteerInputsIntoRequest( + messages: ChatMessage[], + steerInputs: PendingSessionInputRecord[], + supportsVision: boolean + ): ChatMessage[] { + if (steerInputs.length === 0) { + return messages + } + + const steerMessages = steerInputs.map((input) => + createUserChatMessage(input.payload, supportsVision) + ) + const clonedMessages = [...messages] + const lastMessage = clonedMessages[clonedMessages.length - 1] + + if (lastMessage?.role === 'user') { + return [...clonedMessages.slice(0, -1), ...steerMessages, lastMessage] + } + + return [...clonedMessages, ...steerMessages] + } + + private async drainPendingQueueIfPossible( + sessionId: string, + _reason: 'enqueue' | 'resume' | 'completed' | 'question_other' + ): Promise { + if (this.drainingPendingQueues.has(sessionId)) { + return false + } + + const state = await this.getSessionState(sessionId) + if (!state || state.status !== 'idle') { + return false + } + if (this.hasPendingInteractions(sessionId)) { + return false + } + + const nextQueuedInput = this.pendingInputCoordinator.getNextQueuedInput(sessionId) + if (!nextQueuedInput) { + return false + } + + this.drainingPendingQueues.add(sessionId) + try { + const claimedInput = this.pendingInputCoordinator.claimQueuedInput( + sessionId, + nextQueuedInput.id + ) + await this.processMessage(sessionId, claimedInput.payload, { + projectDir: this.resolveProjectDir(sessionId), + pendingQueueItemId: claimedInput.id + }) + return true + } catch (error) { + console.error('[DeepChatAgent] drainPendingQueueIfPossible error:', error) + return false + } finally { + this.drainingPendingQueues.delete(sessionId) + if ( + this.pendingInputCoordinator.getNextQueuedInput(sessionId) && + (await this.getSessionState(sessionId))?.status === 'idle' && + !this.hasPendingInteractions(sessionId) + ) { + void this.drainPendingQueueIfPossible(sessionId, 'completed') + } + } + } + private applyProcessResultStatus( sessionId: string, result: ProcessResult | null | undefined @@ -1278,6 +1496,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { initialBlocks }) this.applyProcessResultStatus(sessionId, result) + if (result?.status === 'completed') { + void this.drainPendingQueueIfPossible(sessionId, 'completed') + } return true } catch (error) { console.error('[DeepChatAgent] resumeAssistantMessage error:', error) @@ -1977,6 +2198,20 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return Math.max(MAX_TOKENS_MIN, Math.round(configured)) } + private async ensureSessionReadyForPendingInputMutation(sessionId: string): Promise { + const state = await this.getSessionState(sessionId) + if (!state) { + throw new Error(`Session ${sessionId} not found`) + } + } + + private assertNoActivePendingInputs(sessionId: string): void { + if (!this.pendingInputCoordinator.hasActiveInputs(sessionId)) { + return + } + throw new Error('Please clear the waiting lane before mutating chat history.') + } + private parseAssistantBlocks(rawContent: string): AssistantMessageBlock[] { try { const parsed = JSON.parse(rawContent) as AssistantMessageBlock[] diff --git a/src/main/presenter/deepchatAgentPresenter/pendingInputCoordinator.ts b/src/main/presenter/deepchatAgentPresenter/pendingInputCoordinator.ts new file mode 100644 index 000000000..29ec7a56e --- /dev/null +++ b/src/main/presenter/deepchatAgentPresenter/pendingInputCoordinator.ts @@ -0,0 +1,159 @@ +import { eventBus, SendTarget } from '@/eventbus' +import { SESSION_EVENTS } from '@/events' +import type { PendingSessionInputRecord, SendMessageInput } from '@shared/types/agent-interface' +import { DeepChatPendingInputStore } from './pendingInputStore' + +const MAX_ACTIVE_PENDING_INPUTS = 5 + +function normalizeInput(input: string | SendMessageInput): SendMessageInput { + if (typeof input === 'string') { + return { text: input, files: [] } + } + + return { + text: typeof input?.text === 'string' ? input.text : '', + files: Array.isArray(input?.files) ? input.files.filter(Boolean) : [] + } +} + +export class PendingInputCoordinator { + private readonly store: DeepChatPendingInputStore + + constructor(store: DeepChatPendingInputStore) { + this.store = store + } + + listPendingInputs(sessionId: string): PendingSessionInputRecord[] { + return this.store.listPendingInputs(sessionId) + } + + queuePendingInput( + sessionId: string, + input: string | SendMessageInput + ): PendingSessionInputRecord { + this.ensureWithinLimit(sessionId) + const record = this.store.createQueueInput(sessionId, normalizeInput(input)) + this.emitUpdated(sessionId) + return record + } + + updateQueuedInput( + sessionId: string, + itemId: string, + input: string | SendMessageInput + ): PendingSessionInputRecord { + this.assertQueueInput(sessionId, itemId) + const record = this.store.updateQueueInput(itemId, normalizeInput(input)) + this.emitUpdated(sessionId) + return record + } + + moveQueuedInput(sessionId: string, itemId: string, toIndex: number): PendingSessionInputRecord[] { + this.assertQueueInput(sessionId, itemId) + const records = this.store.moveQueueInput(sessionId, itemId, toIndex) + this.emitUpdated(sessionId) + return records + } + + convertPendingInputToSteer(sessionId: string, itemId: string): PendingSessionInputRecord { + this.assertQueueInput(sessionId, itemId) + const record = this.store.convertQueueInputToSteer(itemId) + this.emitUpdated(sessionId) + return record + } + + deletePendingInput(sessionId: string, itemId: string): void { + this.assertQueueInput(sessionId, itemId) + this.store.deleteInput(itemId) + this.emitUpdated(sessionId) + } + + getNextQueuedInput(sessionId: string): PendingSessionInputRecord | null { + return this.store.getNextPendingQueueInput(sessionId) + } + + claimQueuedInput(sessionId: string, itemId: string): PendingSessionInputRecord { + this.assertQueueInput(sessionId, itemId) + const record = this.store.claimQueueInput(itemId) + this.emitUpdated(sessionId) + return record + } + + releaseClaimedQueueInput(sessionId: string, itemId: string): PendingSessionInputRecord { + const record = this.store.releaseClaimedQueueInput(itemId) + this.emitUpdated(sessionId) + return record + } + + consumeQueuedInput(sessionId: string, itemId: string): void { + this.store.consumeQueueInput(itemId) + this.emitUpdated(sessionId) + } + + claimSteerBatchForNextLoop(sessionId: string): PendingSessionInputRecord[] { + const claimed = this.store.claimSteerBatch(sessionId) + if (claimed.length > 0) { + this.emitUpdated(sessionId) + } + return claimed + } + + releaseClaimedInputs(sessionId: string): number { + const released = this.store.releaseClaimedInputs(sessionId) + if (released > 0) { + this.emitUpdated(sessionId) + } + return released + } + + consumeClaimedSteerBatch(sessionId: string): number { + const consumed = this.store.consumeClaimedSteerBatch(sessionId) + if (consumed > 0) { + this.emitUpdated(sessionId) + } + return consumed + } + + recoverClaimedInputsAfterRestart(): number { + const sessionIds = this.store.recoverClaimedInputs() + for (const sessionId of sessionIds) { + this.emitUpdated(sessionId) + } + return sessionIds.length + } + + hasActiveInputs(sessionId: string): boolean { + return this.store.countActive(sessionId) > 0 + } + + isAtCapacity(sessionId: string): boolean { + return this.store.countActive(sessionId) >= MAX_ACTIVE_PENDING_INPUTS + } + + deleteBySession(sessionId: string): void { + this.store.deleteBySession(sessionId) + this.emitUpdated(sessionId) + } + + private ensureWithinLimit(sessionId: string): void { + if (this.store.countActive(sessionId) >= MAX_ACTIVE_PENDING_INPUTS) { + throw new Error('Pending input limit reached for this session.') + } + } + + private assertQueueInput(sessionId: string, itemId: string): void { + const record = this.store.listPendingInputs(sessionId).find((item) => item.id === itemId) + if (!record) { + throw new Error(`Pending input not found: ${itemId}`) + } + if (record.mode !== 'queue') { + throw new Error('Steer inputs are locked and cannot be modified.') + } + } + + private emitUpdated(sessionId: string): void { + eventBus.sendToRenderer(SESSION_EVENTS.PENDING_INPUTS_UPDATED, SendTarget.ALL_WINDOWS, { + sessionId + }) + } +} diff --git a/src/main/presenter/deepchatAgentPresenter/pendingInputStore.ts b/src/main/presenter/deepchatAgentPresenter/pendingInputStore.ts new file mode 100644 index 000000000..9538ac1ff --- /dev/null +++ b/src/main/presenter/deepchatAgentPresenter/pendingInputStore.ts @@ -0,0 +1,278 @@ +import { nanoid } from 'nanoid' +import type { + PendingSessionInputRecord, + PendingSessionInputState, + SendMessageInput +} from '@shared/types/agent-interface' +import type { SQLitePresenter } from '../sqlitePresenter' +import type { DeepChatPendingInputRow } from '../sqlitePresenter/tables/deepchatPendingInputs' + +function normalizeInput(input: string | SendMessageInput): SendMessageInput { + if (typeof input === 'string') { + return { text: input, files: [] } + } + + return { + text: typeof input?.text === 'string' ? input.text : '', + files: Array.isArray(input?.files) ? input.files.filter(Boolean) : [] + } +} + +export class DeepChatPendingInputStore { + private readonly sqlitePresenter: SQLitePresenter + + constructor(sqlitePresenter: SQLitePresenter) { + this.sqlitePresenter = sqlitePresenter + } + + listPendingInputs(sessionId: string): PendingSessionInputRecord[] { + return this.sqlitePresenter.deepchatPendingInputsTable + .listActiveBySession(sessionId) + .filter((row) => !(row.mode === 'queue' && row.state === 'claimed')) + .map((row) => this.toRecord(row)) + } + + countActive(sessionId: string): number { + return this.sqlitePresenter.deepchatPendingInputsTable.countActiveBySession(sessionId) + } + + createQueueInput(sessionId: string, input: string | SendMessageInput): PendingSessionInputRecord { + const normalized = normalizeInput(input) + const id = nanoid() + const nextQueueOrder = this.getNextQueueOrder(sessionId) + this.sqlitePresenter.deepchatPendingInputsTable.insert({ + id, + sessionId, + mode: 'queue', + state: 'pending', + payloadJson: JSON.stringify(normalized), + queueOrder: nextQueueOrder + }) + const row = this.sqlitePresenter.deepchatPendingInputsTable.get(id) + if (!row) { + throw new Error(`Failed to create pending input ${id}`) + } + return this.toRecord(row) + } + + updateQueueInput(itemId: string, input: string | SendMessageInput): PendingSessionInputRecord { + const row = this.requireRow(itemId) + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + payload_json: JSON.stringify(normalizeInput(input)) + }) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + + moveQueueInput(sessionId: string, itemId: string, toIndex: number): PendingSessionInputRecord[] { + const queueRows = this.getPendingQueueRows(sessionId) + const fromIndex = queueRows.findIndex((row) => row.id === itemId) + if (fromIndex === -1) { + throw new Error(`Pending queue item not found: ${itemId}`) + } + + const clampedIndex = Math.max(0, Math.min(toIndex, queueRows.length - 1)) + if (fromIndex === clampedIndex) { + return this.listPendingInputs(sessionId) + } + + const [moved] = queueRows.splice(fromIndex, 1) + queueRows.splice(clampedIndex, 0, moved) + this.resequenceQueueRows(queueRows) + + return this.listPendingInputs(sessionId) + } + + convertQueueInputToSteer(itemId: string): PendingSessionInputRecord { + const row = this.requireRow(itemId) + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + mode: 'steer', + queue_order: null + }) + this.resequenceQueue(row.session_id) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + + deleteInput(itemId: string): void { + const row = this.requireRow(itemId) + this.sqlitePresenter.deepchatPendingInputsTable.delete(itemId) + if (row.mode === 'queue') { + this.resequenceQueue(row.session_id) + } + } + + getNextPendingQueueInput(sessionId: string): PendingSessionInputRecord | null { + const row = this.getPendingQueueRows(sessionId)[0] + return row ? this.toRecord(row) : null + } + + claimQueueInput(itemId: string): PendingSessionInputRecord { + const row = this.requireRow(itemId) + if (row.mode !== 'queue') { + throw new Error(`Pending input ${itemId} is not a queue item.`) + } + if (row.state !== 'pending') { + throw new Error(`Pending queue item ${itemId} is not claimable.`) + } + + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + state: 'claimed', + claimed_at: Date.now() + }) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + + releaseClaimedQueueInput(itemId: string): PendingSessionInputRecord { + const row = this.requireRow(itemId) + if (row.mode !== 'queue') { + throw new Error(`Pending input ${itemId} is not a queue item.`) + } + if (row.state !== 'claimed') { + return this.toRecord(row) + } + + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + state: 'pending', + claimed_at: null + }) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + + consumeQueueInput(itemId: string): void { + this.deleteInput(itemId) + } + + claimSteerBatch(sessionId: string): PendingSessionInputRecord[] { + const now = Date.now() + const steerRows = this.getSteerRows(sessionId).filter((row) => row.state === 'pending') + if (steerRows.length === 0) { + return [] + } + + for (const row of steerRows) { + this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { + state: 'claimed', + claimed_at: now + }) + } + + return this.getSteerRows(sessionId) + .filter((row) => row.state === 'claimed') + .map((row) => this.toRecord(row)) + } + + releaseClaimedInputs(sessionId: string): number { + const claimedRows = this.sqlitePresenter.deepchatPendingInputsTable + .listActiveBySession(sessionId) + .filter((row) => row.state === 'claimed') + for (const row of claimedRows) { + this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { + state: 'pending', + claimed_at: null + }) + } + return claimedRows.length + } + + recoverClaimedInputs(): string[] { + const rows = this.listClaimedRows() + for (const row of rows) { + this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { + state: 'pending', + claimed_at: null + }) + } + return Array.from(new Set(rows.map((row) => row.session_id))) + } + + consumeClaimedSteerBatch(sessionId: string): number { + const claimedSteerRows = this.getSteerRows(sessionId).filter((row) => row.state === 'claimed') + if (claimedSteerRows.length === 0) { + return 0 + } + + const now = Date.now() + for (const row of claimedSteerRows) { + this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { + state: 'consumed', + consumed_at: now + }) + } + return claimedSteerRows.length + } + + deleteBySession(sessionId: string): void { + this.sqlitePresenter.deepchatPendingInputsTable.deleteBySession(sessionId) + } + + private getNextQueueOrder(sessionId: string): number { + const queueRows = this.getPendingQueueRows(sessionId) + if (queueRows.length === 0) { + return 1 + } + return (queueRows[queueRows.length - 1].queue_order ?? 0) + 1 + } + + private getPendingQueueRows(sessionId: string): DeepChatPendingInputRow[] { + return this.sqlitePresenter.deepchatPendingInputsTable + .listActiveBySession(sessionId) + .filter((row) => row.mode === 'queue' && row.state === 'pending') + .sort((left, right) => (left.queue_order ?? 0) - (right.queue_order ?? 0)) + } + + private getSteerRows(sessionId: string): DeepChatPendingInputRow[] { + return this.sqlitePresenter.deepchatPendingInputsTable + .listActiveBySession(sessionId) + .filter((row) => row.mode === 'steer') + .sort((left, right) => left.created_at - right.created_at) + } + + private listClaimedRows(): DeepChatPendingInputRow[] { + return this.sqlitePresenter.deepchatPendingInputsTable.listClaimed() + } + + private resequenceQueue(sessionId: string): void { + this.resequenceQueueRows(this.getPendingQueueRows(sessionId)) + } + + private resequenceQueueRows(rows: DeepChatPendingInputRow[]): void { + rows.forEach((row, index) => { + this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { + queue_order: index + 1 + }) + }) + } + + private requireRow(itemId: string, expectedSessionId?: string): DeepChatPendingInputRow { + const row = this.sqlitePresenter.deepchatPendingInputsTable.get(itemId) + if (!row) { + throw new Error(`Pending input not found: ${itemId}`) + } + if (expectedSessionId && row.session_id !== expectedSessionId) { + throw new Error(`Pending input ${itemId} does not belong to session ${expectedSessionId}`) + } + return row + } + + private toRecord(row: DeepChatPendingInputRow): PendingSessionInputRecord { + return { + id: row.id, + sessionId: row.session_id, + mode: row.mode, + state: row.state as PendingSessionInputState, + payload: this.parsePayload(row.payload_json), + queueOrder: row.queue_order, + claimedAt: row.claimed_at, + consumedAt: row.consumed_at, + createdAt: row.created_at, + updatedAt: row.updated_at + } + } + + private parsePayload(raw: string): SendMessageInput { + try { + return normalizeInput(JSON.parse(raw) as SendMessageInput) + } catch { + return normalizeInput(raw) + } + } +} diff --git a/src/main/presenter/newAgentPresenter/index.ts b/src/main/presenter/newAgentPresenter/index.ts index 8afefb149..cecdadf74 100644 --- a/src/main/presenter/newAgentPresenter/index.ts +++ b/src/main/presenter/newAgentPresenter/index.ts @@ -181,11 +181,19 @@ export class NewAgentPresenter { modelId: state?.modelId ?? modelId } - // Process the first message (non-blocking) after returning session ID - console.log(`[NewAgentPresenter] firing processMessage (non-blocking)`) - agent.processMessage(sessionId, normalizedInput, { projectDir }).catch((err) => { - console.error('[NewAgentPresenter] processMessage failed:', err) - }) + // Queue the first message (non-blocking) after returning session ID + if (normalizedInput.text.trim() || (normalizedInput.files?.length ?? 0) > 0) { + console.log(`[NewAgentPresenter] firing queuePendingInput (non-blocking)`) + if (agent.queuePendingInput) { + agent.queuePendingInput(sessionId, normalizedInput).catch((err) => { + console.error('[NewAgentPresenter] queuePendingInput failed:', err) + }) + } else { + agent.processMessage(sessionId, normalizedInput, { projectDir }).catch((err) => { + console.error('[NewAgentPresenter] processMessage failed:', err) + }) + } + } void this.generateSessionTitle(sessionId, title, providerId, modelId) return sessionResult @@ -272,11 +280,118 @@ export class NewAgentPresenter { } } this.assertAcpSessionHasWorkdir(providerId, session.projectDir ?? null) + if (agent.queuePendingInput) { + await agent.queuePendingInput(sessionId, normalizedInput) + return + } await agent.processMessage(sessionId, normalizedInput, { projectDir: session.projectDir ?? null }) } + async listPendingInputs(sessionId: string) { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.listPendingInputs) { + return [] + } + return await agent.listPendingInputs(sessionId) + } + + async queuePendingInput(sessionId: string, content: string | SendMessageInput) { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + + let currentSession = session + const normalizedInput = this.normalizeSendMessageInput(content) + if (currentSession.isDraft) { + const title = normalizedInput.text.trim().slice(0, 50) || 'New Chat' + this.sessionManager.update(sessionId, { isDraft: false, title }) + this.emitSessionListUpdated() + currentSession = this.sessionManager.get(sessionId) ?? currentSession + } + + const agent = await this.resolveAgentImplementation(currentSession.agentId) + if (!agent.queuePendingInput) { + throw new Error(`Agent ${currentSession.agentId} does not support pending inputs.`) + } + + let providerId = (await agent.getSessionState(sessionId))?.providerId ?? '' + if (!providerId) { + const acpAgents = await this.configPresenter.getAcpAgents() + if (acpAgents.some((item) => item.id === currentSession.agentId)) { + providerId = 'acp' + } + } + this.assertAcpSessionHasWorkdir(providerId, currentSession.projectDir ?? null) + return await agent.queuePendingInput(sessionId, normalizedInput) + } + + async updateQueuedInput(sessionId: string, itemId: string, content: string | SendMessageInput) { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.updateQueuedInput) { + throw new Error(`Agent ${session.agentId} does not support pending input edits.`) + } + return await agent.updateQueuedInput(sessionId, itemId, this.normalizeSendMessageInput(content)) + } + + async moveQueuedInput(sessionId: string, itemId: string, toIndex: number) { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.moveQueuedInput) { + throw new Error(`Agent ${session.agentId} does not support pending input sorting.`) + } + return await agent.moveQueuedInput(sessionId, itemId, toIndex) + } + + async convertPendingInputToSteer(sessionId: string, itemId: string) { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.convertPendingInputToSteer) { + throw new Error(`Agent ${session.agentId} does not support steer conversion.`) + } + return await agent.convertPendingInputToSteer(sessionId, itemId) + } + + async deletePendingInput(sessionId: string, itemId: string): Promise { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.deletePendingInput) { + throw new Error(`Agent ${session.agentId} does not support pending input deletion.`) + } + await agent.deletePendingInput(sessionId, itemId) + } + + async resumePendingQueue(sessionId: string): Promise { + const session = this.sessionManager.get(sessionId) + if (!session) { + throw new Error(`Session not found: ${sessionId}`) + } + const agent = await this.resolveAgentImplementation(session.agentId) + if (!agent.resumePendingQueue) { + throw new Error(`Agent ${session.agentId} does not support pending queue resume.`) + } + await agent.resumePendingQueue(sessionId) + } + async retryMessage(sessionId: string, messageId: string): Promise { const session = this.sessionManager.get(sessionId) if (!session) { diff --git a/src/main/presenter/sqlitePresenter/index.ts b/src/main/presenter/sqlitePresenter/index.ts index 2e36219d5..3591c8b14 100644 --- a/src/main/presenter/sqlitePresenter/index.ts +++ b/src/main/presenter/sqlitePresenter/index.ts @@ -20,6 +20,7 @@ import { DeepChatSessionsTable } from './tables/deepchatSessions' import { DeepChatMessagesTable } from './tables/deepchatMessages' import { DeepChatMessageTracesTable } from './tables/deepchatMessageTraces' import { DeepChatMessageSearchResultsTable } from './tables/deepchatMessageSearchResults' +import { DeepChatPendingInputsTable } from './tables/deepchatPendingInputs' import { DeepChatUsageStatsTable } from './tables/deepchatUsageStats' import { LegacyImportStatusTable } from './tables/legacyImportStatus' @@ -44,6 +45,7 @@ export class SQLitePresenter implements ISQLitePresenter { public deepchatMessagesTable!: DeepChatMessagesTable public deepchatMessageTracesTable!: DeepChatMessageTracesTable public deepchatMessageSearchResultsTable!: DeepChatMessageSearchResultsTable + public deepchatPendingInputsTable!: DeepChatPendingInputsTable public deepchatUsageStatsTable!: DeepChatUsageStatsTable public legacyImportStatusTable!: LegacyImportStatusTable private currentVersion: number = 0 @@ -163,6 +165,7 @@ export class SQLitePresenter implements ISQLitePresenter { this.deepchatMessagesTable = new DeepChatMessagesTable(this.db) this.deepchatMessageTracesTable = new DeepChatMessageTracesTable(this.db) this.deepchatMessageSearchResultsTable = new DeepChatMessageSearchResultsTable(this.db) + this.deepchatPendingInputsTable = new DeepChatPendingInputsTable(this.db) this.deepchatUsageStatsTable = new DeepChatUsageStatsTable(this.db) this.legacyImportStatusTable = new LegacyImportStatusTable(this.db) @@ -175,6 +178,7 @@ export class SQLitePresenter implements ISQLitePresenter { this.deepchatMessagesTable.createTable() this.deepchatMessageTracesTable.createTable() this.deepchatMessageSearchResultsTable.createTable() + this.deepchatPendingInputsTable.createTable() this.deepchatUsageStatsTable.createTable() this.legacyImportStatusTable.createTable() } @@ -206,6 +210,7 @@ export class SQLitePresenter implements ISQLitePresenter { this.deepchatMessagesTable, this.deepchatMessageTracesTable, this.deepchatMessageSearchResultsTable, + this.deepchatPendingInputsTable, this.deepchatUsageStatsTable, this.legacyImportStatusTable ] diff --git a/src/main/presenter/sqlitePresenter/tables/deepchatPendingInputs.ts b/src/main/presenter/sqlitePresenter/tables/deepchatPendingInputs.ts new file mode 100644 index 000000000..336c337d6 --- /dev/null +++ b/src/main/presenter/sqlitePresenter/tables/deepchatPendingInputs.ts @@ -0,0 +1,218 @@ +import Database from 'better-sqlite3-multiple-ciphers' +import { BaseTable } from './baseTable' + +export interface DeepChatPendingInputRow { + id: string + session_id: string + mode: 'queue' | 'steer' + state: 'pending' | 'claimed' | 'consumed' + payload_json: string + queue_order: number | null + claimed_at: number | null + consumed_at: number | null + created_at: number + updated_at: number +} + +export class DeepChatPendingInputsTable extends BaseTable { + constructor(db: Database.Database) { + super(db, 'deepchat_pending_inputs') + } + + getCreateTableSQL(): string { + return ` + CREATE TABLE IF NOT EXISTS deepchat_pending_inputs ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + mode TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'pending', + payload_json TEXT NOT NULL, + queue_order INTEGER, + claimed_at INTEGER, + consumed_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_deepchat_pending_inputs_session + ON deepchat_pending_inputs(session_id, state, mode, queue_order, created_at); + ` + } + + getMigrationSQL(version: number): string | null { + if (version === 17) { + return this.getCreateTableSQL() + } + return null + } + + getLatestVersion(): number { + return 17 + } + + insert(row: { + id: string + sessionId: string + mode: 'queue' | 'steer' + state?: 'pending' | 'claimed' | 'consumed' + payloadJson: string + queueOrder?: number | null + claimedAt?: number | null + consumedAt?: number | null + createdAt?: number + updatedAt?: number + }): void { + const now = Date.now() + const createdAt = row.createdAt ?? now + const updatedAt = row.updatedAt ?? createdAt + this.db + .prepare( + `INSERT INTO deepchat_pending_inputs ( + id, + session_id, + mode, + state, + payload_json, + queue_order, + claimed_at, + consumed_at, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ) + .run( + row.id, + row.sessionId, + row.mode, + row.state ?? 'pending', + row.payloadJson, + row.queueOrder ?? null, + row.claimedAt ?? null, + row.consumedAt ?? null, + createdAt, + updatedAt + ) + } + + get(id: string): DeepChatPendingInputRow | undefined { + return this.db.prepare('SELECT * FROM deepchat_pending_inputs WHERE id = ?').get(id) as + | DeepChatPendingInputRow + | undefined + } + + listBySession(sessionId: string): DeepChatPendingInputRow[] { + return this.db + .prepare( + `SELECT * + FROM deepchat_pending_inputs + WHERE session_id = ? + ORDER BY + CASE mode WHEN 'steer' THEN 0 ELSE 1 END ASC, + CASE + WHEN mode = 'queue' THEN COALESCE(queue_order, 2147483647) + ELSE created_at + END ASC, + created_at ASC` + ) + .all(sessionId) as DeepChatPendingInputRow[] + } + + listClaimed(): DeepChatPendingInputRow[] { + return this.db + .prepare( + `SELECT * + FROM deepchat_pending_inputs + WHERE state = 'claimed' + ORDER BY session_id ASC, created_at ASC` + ) + .all() as DeepChatPendingInputRow[] + } + + listActiveBySession(sessionId: string): DeepChatPendingInputRow[] { + return this.db + .prepare( + `SELECT * + FROM deepchat_pending_inputs + WHERE session_id = ? + AND state != 'consumed' + ORDER BY + CASE mode WHEN 'steer' THEN 0 ELSE 1 END ASC, + CASE + WHEN mode = 'queue' THEN COALESCE(queue_order, 2147483647) + ELSE created_at + END ASC, + created_at ASC` + ) + .all(sessionId) as DeepChatPendingInputRow[] + } + + countActiveBySession(sessionId: string): number { + const row = this.db + .prepare( + `SELECT COUNT(*) AS total + FROM deepchat_pending_inputs + WHERE session_id = ? + AND state != 'consumed' + AND NOT (mode = 'queue' AND state = 'claimed')` + ) + .get(sessionId) as { total: number } + return row.total + } + + update( + id: string, + fields: Partial< + Pick< + DeepChatPendingInputRow, + 'mode' | 'state' | 'payload_json' | 'queue_order' | 'claimed_at' | 'consumed_at' + > + > + ): void { + const setClauses: string[] = [] + const params: unknown[] = [] + + if (fields.mode !== undefined) { + setClauses.push('mode = ?') + params.push(fields.mode) + } + if (fields.state !== undefined) { + setClauses.push('state = ?') + params.push(fields.state) + } + if (fields.payload_json !== undefined) { + setClauses.push('payload_json = ?') + params.push(fields.payload_json) + } + if (fields.queue_order !== undefined) { + setClauses.push('queue_order = ?') + params.push(fields.queue_order) + } + if (fields.claimed_at !== undefined) { + setClauses.push('claimed_at = ?') + params.push(fields.claimed_at) + } + if (fields.consumed_at !== undefined) { + setClauses.push('consumed_at = ?') + params.push(fields.consumed_at) + } + + if (setClauses.length === 0) { + return + } + + setClauses.push('updated_at = ?') + params.push(Date.now()) + params.push(id) + + this.db + .prepare(`UPDATE deepchat_pending_inputs SET ${setClauses.join(', ')} WHERE id = ?`) + .run(...params) + } + + delete(id: string): void { + this.db.prepare('DELETE FROM deepchat_pending_inputs WHERE id = ?').run(id) + } + + deleteBySession(sessionId: string): void { + this.db.prepare('DELETE FROM deepchat_pending_inputs WHERE session_id = ?').run(sessionId) + } +} diff --git a/src/renderer/src/components/chat/ChatInputToolbar.vue b/src/renderer/src/components/chat/ChatInputToolbar.vue index fd9bf38e2..57489b2d5 100644 --- a/src/renderer/src/components/chat/ChatInputToolbar.vue +++ b/src/renderer/src/components/chat/ChatInputToolbar.vue @@ -37,24 +37,31 @@ - - + + + + + +

{{ t('chat.input.stop') }}

+
+
+ + + + + +

{{ t('chat.input.queue') }}

+
+
@@ -68,11 +75,13 @@ import { useI18n } from 'vue-i18n' withDefaults( defineProps<{ isGenerating?: boolean + hasText?: boolean sendDisabled?: boolean showVoiceInput?: boolean }>(), { isGenerating: false, + hasText: false, sendDisabled: false, showVoiceInput: false } diff --git a/src/renderer/src/components/chat/PendingInputLane.vue b/src/renderer/src/components/chat/PendingInputLane.vue new file mode 100644 index 000000000..a6343a0e1 --- /dev/null +++ b/src/renderer/src/components/chat/PendingInputLane.vue @@ -0,0 +1,263 @@ +