From ca7ae1a1fc49cf70b6cda267a286dd623ae0488a Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Wed, 18 Mar 2026 14:29:25 +0100 Subject: [PATCH 1/3] fix: handle in-progress Genie messages on page reload and query overflow When reloading a page while a Genie message is still loading, the history endpoint returned the in-progress message which the frontend silently dropped (no attachments yet). This left the UI broken with no way to recover. Add a single-message polling endpoint (GET /:alias/conversations/ :conversationId/messages/:messageId) that SSE-streams status updates until the message completes. The frontend now detects pending messages after history load and polls via this endpoint, reusing the existing processStreamEvent pipeline. Also fix wide query results overflowing beyond the message bubble by switching to overflow-x-auto and adding min-w-0 constraints. Signed-off-by: Jorge Calvar --- .../react/genie/genie-chat-message-list.tsx | 10 +- .../src/react/genie/genie-chat-message.tsx | 2 +- .../react/genie/genie-query-visualization.tsx | 7 +- .../src/react/genie/use-genie-chat.ts | 107 +++++++++++++++--- .../appkit/src/connectors/genie/client.ts | 65 +++++++++++ packages/appkit/src/plugins/genie/genie.ts | 62 ++++++++++ .../src/plugins/genie/tests/genie.test.ts | 6 +- 7 files changed, 235 insertions(+), 24 deletions(-) diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index e741bc29..83f2e84d 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -26,6 +26,8 @@ const STATUS_LABELS: Record = { COMPLETED: "Done", }; +const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]); + function formatStatus(status: string): string { return STATUS_LABELS[status] ?? status.replace(/_/g, " ").toLowerCase(); } @@ -166,7 +168,8 @@ export function GenieChatMessageList({ const showStreamingIndicator = status === "streaming" && lastMessage?.role === "assistant" && - lastMessage.id === ""; + !lastMessage.content && + !TERMINAL_STATUSES.has(lastMessage.status); return ( @@ -192,7 +195,10 @@ export function GenieChatMessageList({ {messages .filter( - (msg) => msg.role !== "assistant" || msg.id !== "" || msg.content, + (msg) => + msg.role !== "assistant" || + msg.content || + (msg.id !== "" && TERMINAL_STATUSES.has(msg.status)), ) .map((msg) => ( diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx index ef34b048..a455a965 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx @@ -122,7 +122,7 @@ export function GenieChatMessage({ {queryResult != null && ( - + )} diff --git a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx index 266a88e4..f0227f79 100644 --- a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx +++ b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx @@ -4,6 +4,7 @@ import type { GenieStatementResponse } from "shared"; import { BaseChart } from "../charts/base"; import { ChartErrorBoundary } from "../charts/chart-error-boundary"; import type { ChartType } from "../charts/types"; +import { cn } from "../lib/utils"; import { Button } from "../ui/button"; import { DropdownMenu, @@ -118,11 +119,11 @@ export function GenieQueryVisualization({ ); if (!inference || !activeChartType) { - return
{dataTable}
; + return
{dataTable}
; } return ( - +
Chart @@ -157,7 +158,7 @@ export function GenieQueryVisualization({ )}
-
+
0; - if (!hasAttachments) return [makeUserItem(msg)]; + + if (!hasAttachments && TERMINAL_STATUSES.has(msg.status)) { + return [makeUserItem(msg)]; + } + if (!hasAttachments) { + return [ + makeUserItem(msg, "-user"), + { + id: msg.messageId, + role: "assistant", + content: "", + status: msg.status, + attachments: [], + queryResults: new Map(), + }, + ]; + } return [makeUserItem(msg, "-user"), makeAssistantItem(msg)]; } @@ -202,19 +224,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const msg = event.message; const hasAttachments = (msg.attachments?.length ?? 0) > 0; - if (hasAttachments) { - // During streaming we already appended the user message locally, - // so only handle assistant results. Messages without attachments - // are the user-message echo from the API — skip those. - const item = makeAssistantItem(msg); - setMessages((prev) => { - const last = prev[prev.length - 1]; - if (last?.role === "assistant" && last.id === "") { - return [...prev.slice(0, -1), item]; - } - return [...prev, item]; - }); - } + const item = makeAssistantItem(msg); + setMessages((prev) => { + const last = prev[prev.length - 1]; + if (!last || last.role !== "assistant") return prev; + + if (last.id === msg.messageId) { + return [...prev.slice(0, -1), item]; + } + + if (last.id === "" && hasAttachments) { + return [...prev.slice(0, -1), item]; + } + + return prev; + }); break; } @@ -362,6 +386,47 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { [alias, basePath], ); + const pollPendingMessage = useCallback( + ( + convId: string, + messageId: string, + parentAbortController: AbortController, + ) => { + setStatus("streaming"); + + const requestId = crypto.randomUUID(); + const url = + `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}` + + `/messages/${encodeURIComponent(messageId)}?requestId=${encodeURIComponent(requestId)}`; + + connectSSE({ + url, + signal: parentAbortController.signal, + onMessage: async (message) => { + try { + processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); + } catch { + // Malformed SSE data + } + }, + onError: (err) => { + if (parentAbortController.signal.aborted) return; + setError( + err instanceof Error + ? err.message + : "Failed to poll pending message.", + ); + setStatus("error"); + }, + }).then(() => { + if (!parentAbortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }); + }, + [alias, basePath, processStreamEvent], + ); + const loadHistory = useCallback( (convId: string) => { paginationAbortRef.current?.abort(); @@ -376,13 +441,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { { errorMessage: "Failed to load conversation history." }, ); promise.then((items) => { - if (!abortController.signal.aborted) { - setMessages(items); + if (abortController.signal.aborted) return; + setMessages(items); + + const lastItem = items[items.length - 1]; + if ( + lastItem?.role === "assistant" && + !TERMINAL_STATUSES.has(lastItem.status) + ) { + pollPendingMessage(convId, lastItem.id, abortController); + } else { setStatus((prev) => (prev === "error" ? "error" : "idle")); } }); }, - [fetchPage], + [fetchPage, pollPendingMessage], ); const fetchPreviousPage = useCallback(() => { diff --git a/packages/appkit/src/connectors/genie/client.ts b/packages/appkit/src/connectors/genie/client.ts index 280c56e4..68a4501e 100644 --- a/packages/appkit/src/connectors/genie/client.ts +++ b/packages/appkit/src/connectors/genie/client.ts @@ -381,6 +381,71 @@ export class GenieConnector { } } + /** + * Polls a single message via `getMessage` until it reaches a terminal + * state (`COMPLETED` or `FAILED`). Yields the same event types as + * `streamSendMessage` so callers can reuse the same SSE processing logic. + */ + async *streamGetMessage( + workspaceClient: WorkspaceClient, + spaceId: string, + conversationId: string, + messageId: string, + options?: { timeout?: number; pollInterval?: number }, + ): AsyncGenerator { + const timeout = options?.timeout ?? this.config.timeout; + const pollInterval = options?.pollInterval ?? 3_000; + const deadline = + timeout > 0 ? Date.now() + timeout : Number.POSITIVE_INFINITY; + let lastStatus = ""; + + try { + while (true) { + const message = await workspaceClient.genie.getMessage({ + space_id: spaceId, + conversation_id: conversationId, + message_id: messageId, + }); + + if (message.status && message.status !== lastStatus) { + lastStatus = message.status; + yield { type: "status", status: message.status }; + } + + const isTerminal = + message.status === "COMPLETED" || message.status === "FAILED"; + if (isTerminal) { + const messageResponse = toMessageResponse(message); + yield { type: "message_result", message: messageResponse }; + yield* this.emitQueryResults( + workspaceClient, + spaceId, + conversationId, + messageId, + messageResponse, + ); + return; + } + + if (Date.now() >= deadline) { + yield { type: "error", error: "Message polling timed out" }; + return; + } + + await new Promise((r) => setTimeout(r, pollInterval)); + } + } catch (error) { + logger.error( + "Genie getMessage poll error (spaceId=%s, conversationId=%s, messageId=%s): %O", + spaceId, + conversationId, + messageId, + error, + ); + yield { type: "error", error: classifyGenieError(error) }; + } + } + async sendMessage( workspaceClient: WorkspaceClient, spaceId: string, diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 2ca348b4..8fe69d15 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -65,6 +65,15 @@ export class GeniePlugin extends Plugin { await this.asUser(req)._handleGetConversation(req, res); }, }); + + this.route(router, { + name: "getMessage", + method: "get", + path: "/:alias/conversations/:conversationId/messages/:messageId", + handler: async (req: express.Request, res: express.Response) => { + await this.asUser(req)._handleGetMessage(req, res); + }, + }); } async _handleSendMessage( @@ -177,6 +186,59 @@ export class GeniePlugin extends Plugin { ); } + async _handleGetMessage( + req: express.Request, + res: express.Response, + ): Promise { + const { alias, conversationId, messageId } = req.params; + const spaceId = this.resolveSpaceId(alias); + + if (!spaceId) { + res.status(404).json({ error: `Unknown space alias: ${alias}` }); + return; + } + + const requestId = + (typeof req.query.requestId === "string" && req.query.requestId) || + randomUUID(); + + logger.debug( + "Polling message %s in conversation %s from space %s (alias=%s)", + messageId, + conversationId, + spaceId, + alias, + ); + + const timeout = this.config.timeout ?? 120_000; + const streamSettings: StreamExecutionSettings = { + ...genieStreamDefaults, + default: { + ...genieStreamDefaults.default, + timeout, + }, + stream: { + ...genieStreamDefaults.stream, + streamId: requestId, + }, + }; + + const workspaceClient = getWorkspaceClient(); + + await this.executeStream( + res, + () => + this.genieConnector.streamGetMessage( + workspaceClient, + spaceId, + conversationId, + messageId, + { timeout }, + ), + streamSettings, + ); + } + async getConversation( alias: string, conversationId: string, diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index 37bcb0e6..3cf0784d 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -175,11 +175,15 @@ describe("Genie Plugin", () => { expect.any(Function), ); - expect(router.get).toHaveBeenCalledTimes(1); + expect(router.get).toHaveBeenCalledTimes(2); expect(router.get).toHaveBeenCalledWith( "/:alias/conversations/:conversationId", expect.any(Function), ); + expect(router.get).toHaveBeenCalledWith( + "/:alias/conversations/:conversationId/messages/:messageId", + expect.any(Function), + ); }); }); From 881e094682b93c89e64e05d354807abd307ce39f Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Wed, 18 Mar 2026 14:34:08 +0100 Subject: [PATCH 2/3] fix: prevent Genie message bubbles from overflowing the chat viewport Message bubbles extended past the visible area for two reasons: 1. The content column used items-start/items-end for alignment, which caused Cards to size based on their content width instead of stretching to the column. Wide content (tables, long text) pushed Cards wider than the 80% column, with overflow clipped visually but text wrapping at the wider intrinsic width. 2. Radix ScrollArea inserts a wrapper div with display:table that grows to fit content. This made the entire scroll container wider than the viewport, so percentage-based widths resolved against the wider container. Fix: - Remove items-start/items-end from the content column - Add w-full to all Cards so they always match the column width - Override Radix's table wrapper to display:block via targeted selector on the scroll area viewport - Add break-words to markdown content and make markdown tables scrollable within the bubble Signed-off-by: Jorge Calvar --- .../src/react/genie/genie-chat-message-list.tsx | 10 ++++++++-- .../src/react/genie/genie-chat-message.tsx | 16 ++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index 83f2e84d..89819894 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -172,8 +172,14 @@ export function GenieChatMessageList({ !TERMINAL_STATUSES.has(lastMessage.status); return ( - -
+ div]:!block", + className, + )} + > +
{hasPreviousPage &&
} {status === "loading-older" && ( diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx index a455a965..7f19b8d8 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx @@ -14,11 +14,12 @@ import type { GenieAttachmentResponse, GenieMessageItem } from "./types"; marked.setOptions({ breaks: true, gfm: true }); const markdownStyles = cn( - "text-sm", + "text-sm break-words", "[&_p]:my-1 [&_ul]:my-1 [&_ol]:my-1 [&_li]:my-0", "[&_pre]:bg-background/50 [&_pre]:p-2 [&_pre]:rounded [&_pre]:text-xs [&_pre]:overflow-x-auto", "[&_code]:text-xs [&_code]:bg-background/50 [&_code]:px-1 [&_code]:rounded", - "[&_table]:text-xs [&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", + "[&_table]:text-xs [&_table]:block [&_table]:overflow-x-auto [&_table]:max-w-full", + "[&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", "[&_table]:border-collapse [&_th]:border [&_td]:border", "[&_th]:border-border [&_td]:border-border", "[&_a]:underline", @@ -66,15 +67,10 @@ export function GenieChatMessage({ -
+
{queryResult != null && ( - + )} From 63a75e5db9b38dc62f2d161a4678e68ea94722f7 Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Wed, 1 Apr 2026 13:03:08 +0200 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20address=20PR=20feedback=20=E2=80=94?= =?UTF-8?q?=20deduplicate=20terminal=20statuses=20and=20fix=20Genie=20robu?= =?UTF-8?q?stness=20issues?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deduplicates TERMINAL_STATUSES into types.ts, fixes phantom placeholder on FAILED messages, threads abort signal into streamGetMessage polling, adds missing .catch() on fire-and-forget SSE promises, and stabilizes useCallback chain via ref to prevent identity cascade. Co-authored-by: Isaac Signed-off-by: Jorge Calvar --- .../react/genie/genie-chat-message-list.tsx | 8 +- packages/appkit-ui/src/react/genie/types.ts | 2 + .../src/react/genie/use-genie-chat.ts | 77 +++++++++++-------- .../appkit/src/connectors/genie/client.ts | 29 ++++--- packages/appkit/src/plugins/genie/genie.ts | 12 +-- 5 files changed, 76 insertions(+), 52 deletions(-) diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index 89819894..8af90f5c 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -4,7 +4,11 @@ import { ScrollArea } from "../ui/scroll-area"; import { Skeleton } from "../ui/skeleton"; import { Spinner } from "../ui/spinner"; import { GenieChatMessage } from "./genie-chat-message"; -import type { GenieChatStatus, GenieMessageItem } from "./types"; +import { + type GenieChatStatus, + type GenieMessageItem, + TERMINAL_STATUSES, +} from "./types"; interface GenieChatMessageListProps { /** Array of messages to display */ @@ -26,8 +30,6 @@ const STATUS_LABELS: Record = { COMPLETED: "Done", }; -const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]); - function formatStatus(status: string): string { return STATUS_LABELS[status] ?? status.replace(/_/g, " ").toLowerCase(); } diff --git a/packages/appkit-ui/src/react/genie/types.ts b/packages/appkit-ui/src/react/genie/types.ts index 1ae7f845..a4ac3c09 100644 --- a/packages/appkit-ui/src/react/genie/types.ts +++ b/packages/appkit-ui/src/react/genie/types.ts @@ -7,6 +7,8 @@ export type { GenieStreamEvent, } from "shared"; +export const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]); + export type GenieChatStatus = | "idle" | "loading-history" diff --git a/packages/appkit-ui/src/react/genie/use-genie-chat.ts b/packages/appkit-ui/src/react/genie/use-genie-chat.ts index f401b367..e9bfabde 100644 --- a/packages/appkit-ui/src/react/genie/use-genie-chat.ts +++ b/packages/appkit-ui/src/react/genie/use-genie-chat.ts @@ -1,12 +1,13 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { connectSSE } from "@/js"; -import type { - GenieChatStatus, - GenieMessageItem, - GenieMessageResponse, - GenieStreamEvent, - UseGenieChatOptions, - UseGenieChatReturn, +import { + type GenieChatStatus, + type GenieMessageItem, + type GenieMessageResponse, + type GenieStreamEvent, + TERMINAL_STATUSES, + type UseGenieChatOptions, + type UseGenieChatReturn, } from "./types"; function getUrlParam(name: string): string | null { @@ -63,8 +64,6 @@ function makeAssistantItem(msg: GenieMessageResponse): GenieMessageItem { }; } -const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]); - /** * The API bundles user question (content) and AI answer (attachments) in one message. * Split into separate user + assistant items for display. @@ -191,6 +190,9 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const conversationIdRef = useRef(null); const nextPageTokenRef = useRef(null); const isLoadingOlderRef = useRef(false); + const processStreamEventRef = useRef<(event: GenieStreamEvent) => void>( + () => {}, + ); useEffect(() => { conversationIdRef.current = conversationId; @@ -221,19 +223,12 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { } case "message_result": { - const msg = event.message; - const hasAttachments = (msg.attachments?.length ?? 0) > 0; - - const item = makeAssistantItem(msg); + const item = makeAssistantItem(event.message); setMessages((prev) => { const last = prev[prev.length - 1]; if (!last || last.role !== "assistant") return prev; - if (last.id === msg.messageId) { - return [...prev.slice(0, -1), item]; - } - - if (last.id === "" && hasAttachments) { + if (last.id === event.message.messageId || last.id === "") { return [...prev.slice(0, -1), item]; } @@ -278,6 +273,8 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { [persistInUrl, urlParamName], ); + processStreamEventRef.current = processStreamEvent; + const sendMessage = useCallback( (content: string) => { const trimmed = content.trim(); @@ -322,7 +319,9 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { signal: abortController.signal, onMessage: async (message) => { try { - processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); + processStreamEventRef.current( + JSON.parse(message.data) as GenieStreamEvent, + ); } catch { // Malformed SSE data } @@ -342,13 +341,19 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { : prev; }); }, - }).then(() => { - if (!abortController.signal.aborted) { - setStatus((prev) => (prev === "error" ? "error" : "idle")); - } - }); + }) + .then(() => { + if (!abortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }) + .catch(() => { + if (abortController.signal.aborted) return; + setError("Connection error. Please try again."); + setStatus("error"); + }); }, - [alias, basePath, processStreamEvent], + [alias, basePath], ); /** Creates an AbortController, stores it in the given ref, and fetches a conversation page. */ @@ -404,7 +409,9 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { signal: parentAbortController.signal, onMessage: async (message) => { try { - processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); + processStreamEventRef.current( + JSON.parse(message.data) as GenieStreamEvent, + ); } catch { // Malformed SSE data } @@ -418,13 +425,19 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { ); setStatus("error"); }, - }).then(() => { - if (!parentAbortController.signal.aborted) { - setStatus((prev) => (prev === "error" ? "error" : "idle")); - } - }); + }) + .then(() => { + if (!parentAbortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }) + .catch(() => { + if (parentAbortController.signal.aborted) return; + setError("Failed to poll pending message."); + setStatus("error"); + }); }, - [alias, basePath, processStreamEvent], + [alias, basePath], ); const loadHistory = useCallback( diff --git a/packages/appkit/src/connectors/genie/client.ts b/packages/appkit/src/connectors/genie/client.ts index 68a4501e..5be91b6e 100644 --- a/packages/appkit/src/connectors/genie/client.ts +++ b/packages/appkit/src/connectors/genie/client.ts @@ -183,7 +183,7 @@ export class GenieConnector { spaceId: string, content: string, conversationId: string | undefined, - options?: { timeout?: number }, + options?: { timeout?: number; signal?: AbortSignal }, ): AsyncGenerator { try { const { @@ -289,6 +289,7 @@ export class GenieConnector { includeQueryResults?: boolean; pageSize?: number; pageToken?: string; + signal?: AbortSignal; }, ): AsyncGenerator { const includeQueryResults = options?.includeQueryResults !== false; @@ -391,16 +392,16 @@ export class GenieConnector { spaceId: string, conversationId: string, messageId: string, - options?: { timeout?: number; pollInterval?: number }, + options?: { timeout?: number; pollInterval?: number; signal?: AbortSignal }, ): AsyncGenerator { - const timeout = options?.timeout ?? this.config.timeout; const pollInterval = options?.pollInterval ?? 3_000; - const deadline = - timeout > 0 ? Date.now() + timeout : Number.POSITIVE_INFINITY; + const signal = options?.signal; let lastStatus = ""; try { while (true) { + if (signal?.aborted) return; + const message = await workspaceClient.genie.getMessage({ space_id: spaceId, conversation_id: conversationId, @@ -427,14 +428,20 @@ export class GenieConnector { return; } - if (Date.now() >= deadline) { - yield { type: "error", error: "Message polling timed out" }; - return; - } - - await new Promise((r) => setTimeout(r, pollInterval)); + await new Promise((resolve) => { + const timer = setTimeout(resolve, pollInterval); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true }, + ); + }); } } catch (error) { + if (signal?.aborted) return; logger.error( "Genie getMessage poll error (spaceId=%s, conversationId=%s, messageId=%s): %O", spaceId, diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 8fe69d15..712aadbf 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -123,13 +123,13 @@ export class GeniePlugin extends Plugin { await this.executeStream( res, - () => + (signal) => this.genieConnector.streamSendMessage( workspaceClient, spaceId, content, conversationId, - { timeout }, + { timeout, signal }, ), streamSettings, ); @@ -175,12 +175,12 @@ export class GeniePlugin extends Plugin { await this.executeStream( res, - () => + (signal) => this.genieConnector.streamConversation( workspaceClient, spaceId, conversationId, - { includeQueryResults, pageToken }, + { includeQueryResults, pageToken, signal }, ), streamSettings, ); @@ -227,13 +227,13 @@ export class GeniePlugin extends Plugin { await this.executeStream( res, - () => + (signal) => this.genieConnector.streamGetMessage( workspaceClient, spaceId, conversationId, messageId, - { timeout }, + { timeout, signal }, ), streamSettings, );