diff --git a/.changeset/moody-ghosts-decide.md b/.changeset/moody-ghosts-decide.md new file mode 100644 index 0000000000..81bac1be6c --- /dev/null +++ b/.changeset/moody-ghosts-decide.md @@ -0,0 +1,6 @@ +--- +"@workflow/web-shared": patch +"@workflow/web": patch +--- + +Fix traceviewer pagination issues diff --git a/packages/web-shared/src/components/run-trace-view.tsx b/packages/web-shared/src/components/run-trace-view.tsx index 0c643e1522..e9ff44896d 100644 --- a/packages/web-shared/src/components/run-trace-view.tsx +++ b/packages/web-shared/src/components/run-trace-view.tsx @@ -27,6 +27,9 @@ interface RunTraceViewProps { onCancelRun?: (runId: string) => Promise; onStreamClick?: (streamId: string) => void; onSpanSelect?: (info: SpanSelectionInfo) => void; + onLoadMoreSpans?: () => void | Promise; + hasMoreSpans?: boolean; + isLoadingMoreSpans?: boolean; } export function RunTraceView({ @@ -44,6 +47,9 @@ export function RunTraceView({ onCancelRun, onStreamClick, onSpanSelect, + onLoadMoreSpans, + hasMoreSpans, + isLoadingMoreSpans, }: RunTraceViewProps) { if (error && !run) { return ( @@ -72,6 +78,9 @@ export function RunTraceView({ onCancelRun={onCancelRun} onStreamClick={onStreamClick} onSpanSelect={onSpanSelect} + onLoadMoreSpans={onLoadMoreSpans} + hasMoreSpans={hasMoreSpans} + isLoadingMoreSpans={isLoadingMoreSpans} /> ); diff --git a/packages/web-shared/src/components/workflow-trace-view.tsx b/packages/web-shared/src/components/workflow-trace-view.tsx index 496dc20e28..ea3b84a836 100644 --- a/packages/web-shared/src/components/workflow-trace-view.tsx +++ b/packages/web-shared/src/components/workflow-trace-view.tsx @@ -278,6 +278,9 @@ function TraceViewerWithContextMenu({ onWakeUpSleep, onCancelRun, onResolveHook, + onLoadMoreSpans, + hasMoreSpans = false, + isLoadingMoreSpans = false, children, }: { trace: { spans: Span[] }; @@ -294,9 +297,12 @@ function TraceViewerWithContextMenu({ payload: unknown, hook?: Hook ) => Promise; + onLoadMoreSpans?: () => void | Promise; + hasMoreSpans?: boolean; + isLoadingMoreSpans?: boolean; children: ReactNode; }): ReactNode { - const { dispatch } = useTraceViewer(); + const { state, dispatch } = useTraceViewer(); // Drive active span widths at 60fps without React re-renders useLiveTick(isLive); @@ -413,6 +419,37 @@ function TraceViewerWithContextMenu({ }; }, [handleContextMenu]); + const loadingMoreRef = useRef(false); + useEffect(() => { + const timeline = state.timelineRef.current; + if (!timeline || !onLoadMoreSpans || !hasMoreSpans) { + return; + } + + const thresholdPx = 200; + const maybeLoadMore = () => { + if (loadingMoreRef.current || isLoadingMoreSpans || !hasMoreSpans) { + return; + } + const remaining = + timeline.scrollHeight - timeline.scrollTop - timeline.clientHeight; + if (remaining > thresholdPx) { + return; + } + + loadingMoreRef.current = true; + Promise.resolve(onLoadMoreSpans()).finally(() => { + loadingMoreRef.current = false; + }); + }; + + timeline.addEventListener('scroll', maybeLoadMore); + maybeLoadMore(); + return () => { + timeline.removeEventListener('scroll', maybeLoadMore); + }; + }, [state.timelineRef, onLoadMoreSpans, hasMoreSpans, isLoadingMoreSpans]); + const closeMenu = useCallback(() => { setContextMenu(null); }, []); @@ -853,6 +890,9 @@ export const WorkflowTraceViewer = ({ onStreamClick, onSpanSelect, onLoadEventData, + onLoadMoreSpans, + hasMoreSpans = false, + isLoadingMoreSpans = false, }: { run: WorkflowRun; steps: Step[]; @@ -883,6 +923,12 @@ export const WorkflowTraceViewer = ({ correlationId: string, eventId: string ) => Promise; + /** Load next trace page when vertical scroll reaches bottom. */ + onLoadMoreSpans?: () => void | Promise; + /** Whether trace pagination has more data to load. */ + hasMoreSpans?: boolean; + /** Whether trace pagination is currently fetching another page. */ + isLoadingMoreSpans?: boolean; }) => { const [selectedSpan, setSelectedSpan] = useState( null @@ -1049,6 +1095,9 @@ export const WorkflowTraceViewer = ({ onWakeUpSleep={onWakeUpSleep} onCancelRun={onCancelRun} onResolveHook={onResolveHook} + onLoadMoreSpans={onLoadMoreSpans} + hasMoreSpans={hasMoreSpans} + isLoadingMoreSpans={isLoadingMoreSpans} > diff --git a/packages/web/app/lib/client/hooks/use-trace-viewer.test.ts b/packages/web/app/lib/client/hooks/use-trace-viewer.test.ts index 17176b7d1e..9b40ae307c 100644 --- a/packages/web/app/lib/client/hooks/use-trace-viewer.test.ts +++ b/packages/web/app/lib/client/hooks/use-trace-viewer.test.ts @@ -11,11 +11,13 @@ vi.mock('~/lib/rpc-client', () => ({ fetchSteps: vi.fn(), fetchHooks: vi.fn(), fetchEvents: vi.fn(), + fetchEventsByCorrelationId: vi.fn(), })); import type { WorkflowRun } from '@workflow/world'; import { fetchEvents, + fetchEventsByCorrelationId, fetchHooks, fetchRun, fetchSteps, @@ -50,6 +52,7 @@ function emptyPage() { describe('useWorkflowTraceViewerData', () => { beforeEach(() => { vi.clearAllMocks(); + vi.mocked(fetchEventsByCorrelationId).mockReturnValue(emptyPage()); }); it('shows complete trace data on load', async () => { diff --git a/packages/web/app/lib/client/hooks/use-trace-viewer.ts b/packages/web/app/lib/client/hooks/use-trace-viewer.ts index cabcad2406..a0002565d1 100644 --- a/packages/web/app/lib/client/hooks/use-trace-viewer.ts +++ b/packages/web/app/lib/client/hooks/use-trace-viewer.ts @@ -3,6 +3,7 @@ import type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; import { useCallback, useEffect, useRef, useState } from 'react'; import { fetchEvents, + fetchEventsByCorrelationId, fetchHooks, fetchRun, fetchSteps, @@ -10,14 +11,60 @@ import { import type { EnvMap } from '~/lib/types'; import { unwrapServerActionResult } from '~/lib/client/workflow-errors'; import { - fetchAllPaginated, + MAX_ITEMS, + mergeById, pollResource, } from '~/lib/client/workflow-primitives'; const LIVE_POLL_LIMIT = 10; +const TRACE_VIEWER_BATCH_SIZE = 50; const LIVE_STEP_UPDATE_INTERVAL_MS = 2000; const LIVE_UPDATE_INTERVAL_MS = 5000; +async function fetchAllEventsForCorrelationId( + env: EnvMap, + correlationId: string +): Promise { + let eventsData: Event[] = []; + let cursor: string | undefined; + + while (true) { + const { error, result } = await unwrapServerActionResult( + fetchEventsByCorrelationId(env, correlationId, { + cursor, + sortOrder: 'asc', + limit: 1000, + }) + ); + if (error) { + break; + } + + eventsData = [...eventsData, ...result.data]; + if (!result.hasMore || !result.cursor || eventsData.length >= MAX_ITEMS) { + break; + } + cursor = result.cursor; + } + + return eventsData; +} + +async function fetchEventsForCorrelationIds( + env: EnvMap, + correlationIds: string[] +): Promise { + if (correlationIds.length === 0) { + return []; + } + const results = await Promise.all( + correlationIds.map((correlationId) => + fetchAllEventsForCorrelationId(env, correlationId) + ) + ); + return results.flat(); +} + /** * Returns (and keeps up-to-date) all data related to a run. * Items returned will _not_ have resolved data (like input/output values). @@ -40,11 +87,15 @@ export function useWorkflowTraceViewerData( const [stepsCursor, setStepsCursor] = useState(); const [hooksCursor, setHooksCursor] = useState(); const [eventsCursor, setEventsCursor] = useState(); + const [stepsHasMore, setStepsHasMore] = useState(false); + const [hooksHasMore, setHooksHasMore] = useState(false); + const [eventsHasMore, setEventsHasMore] = useState(false); + const [isLoadingMoreTraceData, setIsLoadingMoreTraceData] = useState(false); const isFetchingRef = useRef(false); const [initialLoadCompleted, setInitialLoadCompleted] = useState(false); - // Fetch all data for a run + // Fetch first trace page and related events for visible spans. const fetchAllData = useCallback(async () => { if (isFetchingRef.current) { return; @@ -55,62 +106,232 @@ export function useWorkflowTraceViewerData( setAuxiliaryDataLoading(true); setError(null); - const promises = [ - unwrapServerActionResult(fetchRun(env, runId)).then( - ({ error, result }) => { - if (error) { - setError(error); - return; - } - setRun(hydrateResourceIO(result)); - return result; - } - ), - fetchAllPaginated((cursor) => + const [runResult, stepsResult, hooksResult, eventsResult] = + await Promise.all([ + unwrapServerActionResult(fetchRun(env, runId)), unwrapServerActionResult( - fetchSteps(env, runId, { cursor, sortOrder: 'asc', limit: 100 }) - ) - ).then((result) => { - setSteps(result.data.map(hydrateResourceIO)); - setStepsCursor(result.cursor); - }), - fetchAllPaginated((cursor) => + fetchSteps(env, runId, { + sortOrder: 'asc', + limit: TRACE_VIEWER_BATCH_SIZE, + }) + ), unwrapServerActionResult( fetchHooks(env, { runId, - cursor, sortOrder: 'asc', - limit: 100, + limit: TRACE_VIEWER_BATCH_SIZE, }) - ) - ).then((result) => { - setHooks(result.data.map(hydrateResourceIO)); - setHooksCursor(result.cursor); - }), - fetchAllPaginated((cursor) => + ), unwrapServerActionResult( - fetchEvents(env, runId, { cursor, sortOrder: 'asc', limit: 1000 }) - ) - ).then((result) => { - setEvents(result.data.map(hydrateResourceIO)); - setEventsCursor(result.cursor); - }), + fetchEvents(env, runId, { + sortOrder: 'asc', + limit: TRACE_VIEWER_BATCH_SIZE, + }) + ), + ]); + + if (runResult.error) { + setError(runResult.error); + } else { + setRun(hydrateResourceIO(runResult.result)); + } + + const nextSteps = stepsResult.error + ? [] + : stepsResult.result.data.map(hydrateResourceIO); + const nextHooks = hooksResult.error + ? [] + : hooksResult.result.data.map(hydrateResourceIO); + const initialEvents = eventsResult.error + ? [] + : eventsResult.result.data.map(hydrateResourceIO); + + const correlationIds = [ + ...nextSteps.map((step) => step.stepId), + ...nextHooks.map((hook) => hook.hookId), ]; + const correlationEventsRaw = await fetchEventsForCorrelationIds( + env, + correlationIds + ); + const correlationEvents = correlationEventsRaw.map(hydrateResourceIO); + + setSteps(nextSteps); + setHooks(nextHooks); + setEvents( + mergeById([], [...initialEvents, ...correlationEvents], 'eventId') + ); + + setStepsCursor( + stepsResult.error || !stepsResult.result.hasMore + ? undefined + : stepsResult.result.cursor + ); + setHooksCursor( + hooksResult.error || !hooksResult.result.hasMore + ? undefined + : hooksResult.result.cursor + ); + setEventsCursor( + eventsResult.error || !eventsResult.result.hasMore + ? undefined + : eventsResult.result.cursor + ); + setStepsHasMore(Boolean(!stepsResult.error && stepsResult.result.hasMore)); + setHooksHasMore(Boolean(!hooksResult.error && hooksResult.result.hasMore)); + setEventsHasMore( + Boolean(!eventsResult.error && eventsResult.result.hasMore) + ); - const results = await Promise.allSettled(promises); + const settledResults = [runResult, stepsResult, hooksResult, eventsResult]; setLoading(false); setAuxiliaryDataLoading(false); setInitialLoadCompleted(true); isFetchingRef.current = false; - // Just doing the first error, but would be nice to show multiple - const error = results.find((result) => result.status === 'rejected') - ?.reason as Error; - if (error) { - setError(error); - return; + + if (!runResult.error) { + const firstError = settledResults.find((result) => result.error)?.error; + if (firstError) { + setError(firstError); + } } }, [env, runId]); + const loadMoreTraceData = useCallback(async () => { + if ( + isFetchingRef.current || + !initialLoadCompleted || + isLoadingMoreTraceData + ) { + return; + } + if (!stepsHasMore && !hooksHasMore && !eventsHasMore) { + return; + } + + setIsLoadingMoreTraceData(true); + try { + const [nextStepsResult, nextHooksResult, nextEventsResult] = + await Promise.all([ + stepsHasMore + ? unwrapServerActionResult( + fetchSteps(env, runId, { + cursor: stepsCursor, + sortOrder: 'asc', + limit: TRACE_VIEWER_BATCH_SIZE, + }) + ) + : Promise.resolve({ error: null, result: null }), + hooksHasMore + ? unwrapServerActionResult( + fetchHooks(env, { + runId, + cursor: hooksCursor, + sortOrder: 'asc', + limit: TRACE_VIEWER_BATCH_SIZE, + }) + ) + : Promise.resolve({ error: null, result: null }), + eventsHasMore + ? unwrapServerActionResult( + fetchEvents(env, runId, { + cursor: eventsCursor, + sortOrder: 'asc', + limit: TRACE_VIEWER_BATCH_SIZE, + }) + ) + : Promise.resolve({ error: null, result: null }), + ]); + + if (nextStepsResult.error) { + setError(nextStepsResult.error); + } + if (nextHooksResult.error) { + setError(nextHooksResult.error); + } + if (nextEventsResult.error) { + setError(nextEventsResult.error); + } + + const nextSteps = + nextStepsResult.result?.data.map(hydrateResourceIO) ?? []; + const nextHooks = + nextHooksResult.result?.data.map(hydrateResourceIO) ?? []; + const nextEvents = + nextEventsResult.result?.data.map(hydrateResourceIO) ?? []; + + if (nextSteps.length > 0) { + setSteps((prev) => mergeById(prev, nextSteps, 'stepId')); + } + if (nextHooks.length > 0) { + setHooks((prev) => mergeById(prev, nextHooks, 'hookId')); + } + + const newCorrelationIds = [ + ...nextSteps.map((step) => step.stepId), + ...nextHooks.map((hook) => hook.hookId), + ]; + const correlationEventsRaw = await fetchEventsForCorrelationIds( + env, + newCorrelationIds + ); + const correlationEvents = correlationEventsRaw.map(hydrateResourceIO); + const allNewEvents = [...nextEvents, ...correlationEvents]; + if (allNewEvents.length > 0) { + setEvents((prev) => mergeById(prev, allNewEvents, 'eventId')); + } + + const nextStepsHasMore = nextStepsResult.error + ? stepsHasMore + : Boolean(nextStepsResult.result && nextStepsResult.result.hasMore); + const nextHooksHasMore = nextHooksResult.error + ? hooksHasMore + : Boolean(nextHooksResult.result && nextHooksResult.result.hasMore); + const nextEventsHasMore = nextEventsResult.error + ? eventsHasMore + : Boolean(nextEventsResult.result && nextEventsResult.result.hasMore); + + setStepsHasMore(nextStepsHasMore); + setHooksHasMore(nextHooksHasMore); + setEventsHasMore(nextEventsHasMore); + + if (!nextStepsResult.error) { + setStepsCursor( + nextStepsResult.result?.hasMore + ? nextStepsResult.result.cursor + : undefined + ); + } + if (!nextHooksResult.error) { + setHooksCursor( + nextHooksResult.result?.hasMore + ? nextHooksResult.result.cursor + : undefined + ); + } + if (!nextEventsResult.error) { + setEventsCursor( + nextEventsResult.result?.hasMore + ? nextEventsResult.result.cursor + : undefined + ); + } + } finally { + setIsLoadingMoreTraceData(false); + } + }, [ + env, + runId, + initialLoadCompleted, + isLoadingMoreTraceData, + stepsHasMore, + hooksHasMore, + eventsHasMore, + stepsCursor, + hooksCursor, + eventsCursor, + ]); + const pollRun = useCallback(async (): Promise => { if (run?.completedAt) { return false; @@ -254,5 +475,8 @@ export function useWorkflowTraceViewerData( auxiliaryDataLoading, error, update, + loadMoreTraceData, + hasMoreTraceData: stepsHasMore || hooksHasMore || eventsHasMore, + isLoadingMoreTraceData, }; } diff --git a/workbench/nextjs-turbopack/workflows/96_many_steps.ts b/workbench/nextjs-turbopack/workflows/96_many_steps.ts new file mode 100644 index 0000000000..52ec5d50d9 --- /dev/null +++ b/workbench/nextjs-turbopack/workflows/96_many_steps.ts @@ -0,0 +1,22 @@ +async function runIndexedStep(index: number): Promise<{ index: number }> { + 'use step'; + return { index }; +} + +export async function twoHundredStepsWorkflow() { + 'use workflow'; + + const totalSteps = 200; + const results: number[] = []; + + for (let i = 0; i < totalSteps; i++) { + const result = await runIndexedStep(i); + results.push(result.index); + } + + return { + totalSteps, + firstStep: results[0], + lastStep: results[results.length - 1], + }; +}