Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refactor history fetcher code
Signed-off-by: Adhitya Mamallan <adhitya.mamallan@uber.com>
  • Loading branch information
adhityamamallan committed Dec 17, 2025
commit 0dc269048d39241c6fc96ea78e94cd7be1855200
132 changes: 89 additions & 43 deletions src/views/workflow-history/helpers/workflow-history-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,30 @@ import {
type GetWorkflowHistoryResponse,
} from '@/route-handlers/get-workflow-history/get-workflow-history.types';
import request from '@/utils/request';

import {
WORKFLOW_HISTORY_FIRST_PAGE_SIZE_CONFIG,
WORKFLOW_HISTORY_PAGE_SIZE_CONFIG,
} from '../config/workflow-history-page-size.config';

} from '@/views/workflow-history/config/workflow-history-page-size.config';
import {
type WorkflowHistoryQueryResult,
type QueryResultOnChangeCallback,
type ShouldContinueCallback,
type WorkflowHistoryReactQueryParams,
type WorkflowHistoryInfiniteQueryOptions,
type WorkflowHistoryInfiniteQueryObserver,
} from './workflow-history-fetcher.types';
} from '@/views/workflow-history/helpers/workflow-history-fetcher.types';

export default class WorkflowHistoryFetcher {
private observer: WorkflowHistoryInfiniteQueryObserver;

private unsubscribe: (() => void) | null = null;
private isStarted = false;
private shouldContinue: ShouldContinueCallback = () => true;

/**
* Creates a new WorkflowHistoryFetcher instance.
*
* @param queryClient - The React Query client instance
* @param params - Parameters for the workflow history query (domain, cluster, workflowId, runId, etc.)
*/
constructor(
private readonly queryClient: QueryClient,
private readonly params: WorkflowHistoryReactQueryParams
Expand All @@ -37,6 +39,13 @@ export default class WorkflowHistoryFetcher {
});
}

/**
* Subscribes to query state changes. The callback will be invoked immediately
* with the current state, and then whenever the query state changes.
*
* @param callback - Function to call when the query state changes
* @returns Unsubscribe function to stop receiving updates
*/
onChange(callback: QueryResultOnChangeCallback): () => void {
const current = this.getCurrentState();
if (current) callback(current);
Expand All @@ -45,81 +54,118 @@ export default class WorkflowHistoryFetcher {
});
}

/**
* Starts automatic pagination of workflow history. The fetcher will automatically
* fetch the next page whenever the `shouldContinue` callback returns true.
* Pagination will automatically stop when there are no more pages, or an error occurs.
*
* If called multiple times, previous subscriptions are cleaned up and a fresh
* pagination cycle begins.
*
* @param shouldContinue - Callback that determines whether to continue fetching pages.
* Receives the current query state and should return true to continue, false to stop.
* Defaults to always returning true.
* @param throttleMs - Optional throttle delay (ms) for fetching next pages.
* If greater than 0, page fetches will be throttled by this amount.
*/
start(shouldContinue: ShouldContinueCallback = () => true): void {
this.shouldContinue = shouldContinue;

// remove current listener (if exists) to have fresh emits only
this.unsubscribe?.();
this.unsubscribe = null;

this.isStarted = true;
let emitCount = 0;
const currentState = this.observer.getCurrentResult();
const fetchedFirstPage = currentState.status !== 'pending';
const shouldEnableQuery =
!fetchedFirstPage && this.shouldContinue(currentState);

if (shouldEnableQuery) {
this.observer.setOptions({
...this.buildObserverOptions(this.params),
enabled: true,
});
const hasFetchedFirstPage = currentState.status !== 'pending';
if (!hasFetchedFirstPage && this.shouldContinue(currentState)) {
this.enableQuery();
}

const emit = (res: WorkflowHistoryQueryResult) => {
emitCount++;

// Auto stop when there are no more pages (end of history) or when there is an existing error from last start (emitCount === 1 means this is the first emit in the current start).
// isError is true when the request failes and retries are exhausted.
if (res.hasNextPage === false || (res.isError && emitCount > 1)) {
this.stop();
return;
}

// Drive pagination based on external predicate
if (this.shouldContinue(res) && !res.isFetchingNextPage) {
res.fetchNextPage();
}
};
const handleStateChange = this.createStateChangeHandler();

// Manual emit is needed to fetch the first next page after start is called.
// While this manual emit is not needed for on the first history page as enabling the query will fetch it automatically.
if (fetchedFirstPage) {
emit(currentState);
// Manually trigger handler if first page of history has already been loaded
// Subscriptions only fire on future state changes
if (hasFetchedFirstPage) {
handleStateChange(currentState);
}

this.unsubscribe = this.observer.subscribe((res) => emit(res));
this.unsubscribe = this.observer.subscribe((res) => handleStateChange(res));
}

/**
* Stops automatic pagination. This will cancel any ongoing subscriptions and
* clean up throttled functions, but will not destroy the fetcher instance.
* Use `destroy()` to fully clean up the fetcher.
*/
stop(): void {
this.isStarted = false;
if (this.unsubscribe) {
this.unsubscribe();
this.unsubscribe = null;
}
}

/**
* Destroys the fetcher instance. This calls `stop()` to clean up subscriptions
* and then destroys the underlying React Query observer. After calling this,
* the fetcher instance should not be used.
*/
destroy(): void {
this.stop();
this.observer.destroy();
}

/**
* Manually fetches a single next page of workflow history. If the query is
* still pending (first page not fetched), this will enable the query, fetching
* the first page. Otherwise, it will fetch the next page if available and
* not already fetching.
*
* This method does not start automatic pagination - use `start()` for that.
*/
fetchSingleNextPage(): void {
const state = this.getCurrentState();
// If the query is still pending, enable it to fetch the first page.
// Otherwise, fetch the next page if it is not already fetching and there are more pages.
if (state.status === 'pending') {
this.observer.setOptions({
...this.buildObserverOptions(this.params),
enabled: true,
});
this.enableQuery();
} else if (!state.isFetchingNextPage && state.hasNextPage)
state.fetchNextPage();
}

/**
* Gets the current state of the workflow history query. This includes
* loading status, data, error information, and pagination state.
*
* @returns The current query result state
*/
getCurrentState() {
return this.observer.getCurrentResult();
}

private enableQuery() {
this.observer.setOptions({
...this.buildObserverOptions(this.params),
enabled: true,
});
}

private createStateChangeHandler(): (
res: WorkflowHistoryQueryResult
) => void {
let stateChangeCount = 0;

return (res: WorkflowHistoryQueryResult) => {
stateChangeCount++;

if (res.hasNextPage === false || (res.isError && stateChangeCount > 1)) {
this.stop();
return;
}

if (!this.shouldContinue(res) || res.isFetchingNextPage) return;
res.fetchNextPage();
};
}

private buildObserverOptions(
queryParams: WorkflowHistoryReactQueryParams
): WorkflowHistoryInfiniteQueryOptions {
Expand Down