Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update fetcher based on feedback
Signed-off-by: Assem Hafez <assem.hafez@uber.com>
  • Loading branch information
Assem-Uber committed Nov 5, 2025
commit 83e390ddee4407860e8c0a7dbcae60a0691f378c
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import mswMockEndpoints from '@/test-utils/msw-mock-handlers/helper/msw-mock-end
import workflowHistoryMultiPageFixture from '../../__fixtures__/workflow-history-multi-page-fixture';
import WorkflowHistoryFetcher from '../workflow-history-fetcher';

describe(WorkflowHistoryFetcher.name, () => {
let queryClient: QueryClient;
const RETRY_DELAY = 3000;
const RETRY_COUNT = 3;

let queryClient: QueryClient;
let hoistedFetcher: WorkflowHistoryFetcher;

describe(WorkflowHistoryFetcher.name, () => {
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: {
Expand All @@ -26,6 +30,7 @@ describe(WorkflowHistoryFetcher.name, () => {

afterEach(() => {
queryClient.clear();
hoistedFetcher?.unmount();
});

it('should return the current query state from getCurrentState', async () => {
Expand All @@ -34,8 +39,6 @@ describe(WorkflowHistoryFetcher.name, () => {
const initialState = fetcher.getCurrentState();
expect(initialState.data).toBeUndefined();
expect(initialState.status).toBe('pending');

fetcher.unmount();
});

it('should call onChange callback on state changes', async () => {
Expand All @@ -50,7 +53,6 @@ describe(WorkflowHistoryFetcher.name, () => {
await waitFor(() => {
expect(callback.mock.calls.length).toBeGreaterThan(initialCallCount);
});
fetcher.unmount();
});

it('should return unsubscribe function', async () => {
Expand All @@ -61,6 +63,7 @@ describe(WorkflowHistoryFetcher.name, () => {
const unsubscribe1 = fetcher.onChange(callback1);
fetcher.onChange(callback2);

// Fetch the first page
fetcher.start((state) => !state?.data?.pages?.length);

await waitFor(() => {
Expand All @@ -78,11 +81,9 @@ describe(WorkflowHistoryFetcher.name, () => {
countBeforeUnsubscribe
);
});

fetcher.unmount();
});

it('should respect shouldContinue callback', async () => {
it('should not fetch any pages if shouldContinue callback returns false', async () => {
const { fetcher } = setup(queryClient);
const shouldContinue = jest.fn(() => false);

Expand All @@ -95,8 +96,6 @@ describe(WorkflowHistoryFetcher.name, () => {

const state = fetcher.getCurrentState();
expect(state.data?.pages || []).toHaveLength(0);

fetcher.unmount();
});

it('should stop after shouldContinue returns false', async () => {
Expand All @@ -112,8 +111,6 @@ describe(WorkflowHistoryFetcher.name, () => {
expect(state.isFetching).toBe(false);
expect(state.data?.pages).toHaveLength(2);
});

fetcher.unmount();
});

it('should load all pages and auto-stop when there are no more pages', async () => {
Expand All @@ -126,8 +123,6 @@ describe(WorkflowHistoryFetcher.name, () => {
expect(state.hasNextPage).toBe(false);
expect(state.data?.pages).toHaveLength(3);
});

fetcher.unmount();
});

it('should auto-stop on error after initial success', async () => {
Expand All @@ -144,17 +139,15 @@ describe(WorkflowHistoryFetcher.name, () => {
expect(state.data?.pages).toHaveLength(1);
});

// Fast-forward through retry delays (3 retries * 3000ms each)
await jest.advanceTimersByTimeAsync(3 * 3000);
// Fast-forward through retry delays
await jest.advanceTimersByTimeAsync(RETRY_COUNT * RETRY_DELAY);

await waitFor(() => {
const state = fetcher.getCurrentState();
expect(state.isFetching).toBe(false);
expect(state.isError).toBe(true);
expect(state.data?.pages).toHaveLength(1);
});

fetcher.unmount();
} finally {
jest.useRealTimers();
}
Expand All @@ -178,8 +171,6 @@ describe(WorkflowHistoryFetcher.name, () => {
expect(state.isFetching).toBe(false);
expect(state.data?.pages).toHaveLength(1);
});

fetcher.unmount();
});

it('should allow start again after stop', async () => {
Expand Down Expand Up @@ -210,7 +201,6 @@ describe(WorkflowHistoryFetcher.name, () => {

const finalState = fetcher.getCurrentState();
expect(finalState.data?.pages).toHaveLength(3);
fetcher.unmount();
});

it('should fetch next page when available', async () => {
Expand All @@ -230,8 +220,6 @@ describe(WorkflowHistoryFetcher.name, () => {
const state = fetcher.getCurrentState();
expect(state.data?.pages).toHaveLength(2);
});

fetcher.unmount();
});

it('should not fetch when already fetching', async () => {
Expand All @@ -256,8 +244,6 @@ describe(WorkflowHistoryFetcher.name, () => {

const state = fetcher.getCurrentState();
expect(state.data?.pages).toHaveLength(2);

fetcher.unmount();
});

it('should not fetch when no next page available', async () => {
Expand All @@ -275,7 +261,6 @@ describe(WorkflowHistoryFetcher.name, () => {

const state = fetcher.getCurrentState();
expect(state.data?.pages.length).toBe(pageCountBefore);
fetcher.unmount();
});
});

Expand All @@ -289,8 +274,8 @@ function setup(client: QueryClient, options: { failOnPages?: number[] } = {}) {
};

mockHistoryEndpoint(workflowHistoryMultiPageFixture, options.failOnPages);

const fetcher = new WorkflowHistoryFetcher(client, params);
hoistedFetcher = fetcher;

const waitForData = async () => {
let unsubscribe: (() => void) | undefined;
Expand Down Expand Up @@ -325,7 +310,7 @@ function mockHistoryEndpoint(

// Determine current page number based on nextPage param
let pageNumber = 1;
if (!nextPage || nextPage === 'null' || nextPage === 'undefined') {
if (!nextPage) {
pageNumber = 1;
} else if (nextPage === 'page2') {
pageNumber = 2;
Expand Down
52 changes: 24 additions & 28 deletions src/views/workflow-history/helpers/workflow-history-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,36 @@ import {
type GetWorkflowHistoryResponse,
} from '@/route-handlers/get-workflow-history/get-workflow-history.types';
import request from '@/utils/request';
import { type RequestError } from '@/utils/request/request-error';

import {
type WorkflowHistoryQueryResult,
type QueryResultOnChangeCallback,
type ShouldContinueCallback,
type WorkflowHistoryQueryKey,
type WorkflowHistoryReactQueryParams,
type WorkflowHistoryInfiniteQueryOptions,
type WorkflowHistoryInfiniteQueryObserver,
} from './workflow-history-fetcher.types';

export default class WorkflowHistoryFetcher {
private observer: InfiniteQueryObserver<
GetWorkflowHistoryResponse,
RequestError
>;
private observer: WorkflowHistoryInfiniteQueryObserver;

private unsubscribe: (() => void) | null = null;
private isStarted = false;
private shouldContinue: ShouldContinueCallback = () => true;

constructor(
private readonly queryClient: QueryClient,
private readonly params: WorkflowHistoryQueryParams
private readonly params: WorkflowHistoryReactQueryParams
) {
this.observer = new InfiniteQueryObserver<
GetWorkflowHistoryResponse,
RequestError
>(this.queryClient, {
this.observer = new InfiniteQueryObserver(this.queryClient, {
...this.buildObserverOptions(this.params),
});
}

onChange(callback: QueryResultOnChangeCallback): () => void {
const current = this.getCurrentState();
if (current) callback(current);
return this.observer.subscribe((res: any) => {
return this.observer.subscribe((res) => {
callback(res);
});
}
Expand All @@ -55,8 +50,7 @@ export default class WorkflowHistoryFetcher {
let emitCount = 0;
const currentState = this.observer.getCurrentResult();
const fetchedFirstPage = currentState.status !== 'pending';
const shouldEnableQuery =
(!fetchedFirstPage && shouldContinue(currentState)) || fetchedFirstPage;
const shouldEnableQuery = !fetchedFirstPage && shouldContinue(currentState);

if (shouldEnableQuery) {
this.observer.setOptions({
Expand All @@ -68,7 +62,7 @@ export default class WorkflowHistoryFetcher {
const emit = (res: WorkflowHistoryQueryResult) => {
emitCount++;

// Auto stop when there are no more pages (end of history) or when there is a fresh error happens after the start.
// Auto stop when there are no more pages (end of history) or when there is an existing error from last start (emitCount === 1 means this is the first emit in the current start).
// isError is true when the request failes and retries are exhausted.
if (res.hasNextPage === false || (res.isError && emitCount > 1)) {
this.stop();
Expand All @@ -81,15 +75,14 @@ export default class WorkflowHistoryFetcher {
}
};

// only start emit (fetching next pages) after the initial fetch is complete
// first page is already fetched on the first subscription below
// Manual emit is needed to fetch the first next page after start is called.
// While this manual emit is not needed for on the first history page as enabling the query will fetch it automatically.
if (fetchedFirstPage) {
emit(currentState);
}

if (this.unsubscribe) {
this.unsubscribe();
}
// remove current listener (if exists) and add new one
this.unsubscribe?.();
this.unsubscribe = this.observer.subscribe((res) => emit(res));
}

Expand All @@ -107,7 +100,8 @@ export default class WorkflowHistoryFetcher {

fetchSingleNextPage(): void {
const state = this.getCurrentState();

// If the query is still pending, enable it to fetch the first page.
// Otherwise, fetch the next page if it is not already fetching and there are more pages.
if (state.status === 'pending') {
this.observer.setOptions({
...this.buildObserverOptions(this.params),
Expand All @@ -117,21 +111,23 @@ export default class WorkflowHistoryFetcher {
state.fetchNextPage();
}

getCurrentState(): WorkflowHistoryQueryResult {
getCurrentState() {
return this.observer.getCurrentResult();
}

private buildObserverOptions(params: WorkflowHistoryQueryParams) {
private buildObserverOptions(
queryParams: WorkflowHistoryReactQueryParams
): WorkflowHistoryInfiniteQueryOptions {
return {
queryKey: ['workflow_history', params] satisfies WorkflowHistoryQueryKey,
queryFn: ({ queryKey: [_, qp], pageParam }: any) =>
queryKey: ['workflow_history', queryParams],
queryFn: ({ queryKey: [_, params], pageParam }) =>
request(
queryString.stringifyUrl({
url: `/api/domains/${qp.domain}/${qp.cluster}/workflows/${qp.workflowId}/${qp.runId}/history`,
url: `/api/domains/${params.domain}/${params.cluster}/workflows/${params.workflowId}/${params.runId}/history`,
query: {
nextPage: pageParam,
pageSize: qp.pageSize,
waitForNewEvent: qp.waitForNewEvent ?? false,
pageSize: params.pageSize,
waitForNewEvent: params.waitForNewEvent ?? false,
} satisfies WorkflowHistoryQueryParams,
})
).then((res) => res.json()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
import {
type InfiniteQueryObserver,
type InfiniteData,
type InfiniteQueryObserverResult,
type UseInfiniteQueryOptions,
} from '@tanstack/react-query';

import {
type WorkflowHistoryQueryParams,
type GetWorkflowHistoryResponse,
type RouteParams,
} from '@/route-handlers/get-workflow-history/get-workflow-history.types';
import { type RequestError } from '@/utils/request/request-error';

export type WorkflowHistoryQueryKey = [string, WorkflowHistoryQueryParams];
export type WorkflowHistoryReactQueryParams = RouteParams &
WorkflowHistoryQueryParams;

export type WorkflowHistoryQueryResult = InfiniteQueryObserverResult<
InfiniteData<GetWorkflowHistoryResponse, unknown>,
RequestError
export type WorkflowHistoryInfiniteQueryObserver = InfiniteQueryObserver<
GetWorkflowHistoryResponse,
RequestError,
InfiniteData<GetWorkflowHistoryResponse>,
GetWorkflowHistoryResponse,
WorkflowHistoryQueryKey,
string | undefined
>;
export type WorkflowHistoryQueryKey = [string, WorkflowHistoryReactQueryParams];

export type WorkflowHistoryInfiniteQueryOptions = UseInfiniteQueryOptions<
GetWorkflowHistoryResponse,
RequestError,
InfiniteData<GetWorkflowHistoryResponse>,
GetWorkflowHistoryResponse,
WorkflowHistoryQueryKey,
string | undefined
>;
export type WorkflowHistoryQueryResult = ReturnType<
WorkflowHistoryInfiniteQueryObserver['getCurrentResult']
>;

export type QueryResultOnChangeCallback = (
state: WorkflowHistoryQueryResult
) => void;
Expand Down