Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions src/libs/Network/SequentialQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
endRequestAndRemoveFromQueue as endPersistedRequestAndRemoveFromQueue,
getAll as getAllPersistedRequests,
getCommands,
getOngoingRequest as getPersistedOngoingRequest,
onCrossTabRequestsMerged as onPersistedRequestsCrossTabMerge,
onInitialization as onPersistedRequestsInitialization,
processNextRequest as processNextPersistedRequest,
Expand Down Expand Up @@ -135,13 +136,15 @@ function process(): Promise<void> {
}

const persistedRequests = getAllPersistedRequests();
const ongoingRequest = getPersistedOngoingRequest();

Log.info('[SequentialQueue] process() called', false, {
persistedRequestsLength: persistedRequests.length,
hasOngoingRequest: !!ongoingRequest,
isSequentialQueueRunning,
});

if (persistedRequests.length === 0) {
if (persistedRequests.length === 0 && !ongoingRequest) {
Log.info('[SequentialQueue] Unable to process. No requests to process.');
return Promise.resolve();
}
Expand Down Expand Up @@ -289,23 +292,26 @@ function flush(shouldResetPromise = true) {
}

const currentPersistedRequests = getAllPersistedRequests();
const currentOngoingRequest = getPersistedOngoingRequest();
const persistedRequestsLength = currentPersistedRequests.length;
const hasOnyxUpdates = !isEmpty();

Log.info('[SequentialQueue] flush() called', false, {
shouldResetPromise,
persistedRequestsLength,
hasOngoingRequest: !!currentOngoingRequest,
hasQueuedOnyxUpdates: hasOnyxUpdates,
isClientTheLeader: isClientTheLeader(),
});

if (persistedRequestsLength === 0 && !hasOnyxUpdates) {
if (persistedRequestsLength === 0 && !currentOngoingRequest && !hasOnyxUpdates) {
Comment thread
MonilBhavsar marked this conversation as resolved.
Log.info('[SequentialQueue] Unable to flush. No requests or queued Onyx updates to process.');
return;
}

Log.info('[SequentialQueue] Checking if client is leader', false, {
persistedRequestsLength,
hasOngoingRequest: !!currentOngoingRequest,
hasOnyxUpdates,
});

Expand All @@ -314,12 +320,14 @@ function flush(shouldResetPromise = true) {
if (!isClientTheLeader()) {
Log.info('[SequentialQueue] Unable to flush. Client is not the leader.', false, {
persistedRequestsLength,
hasOngoingRequest: !!currentOngoingRequest,
});
return;
}

Log.info('[SequentialQueue] Starting queue processing', false, {
persistedRequestsLength,
hasOngoingRequest: !!currentOngoingRequest,
persistedCommands: getCommands(currentPersistedRequests),
});

Expand All @@ -342,22 +350,25 @@ function flush(shouldResetPromise = true) {
callback: () => {
Log.info('[SequentialQueue] PERSISTED_REQUESTS loaded, starting process()', false, {
requestsLength: getAllPersistedRequests().length,
ongoingCommand: getPersistedOngoingRequest()?.command ?? 'null',
});
Onyx.disconnect(connection);
process().finally(() => {
const remainingRequests = getAllPersistedRequests().length;
const remainingPersistedRequests = getAllPersistedRequests().length;
const hasOngoingRequest = !!getPersistedOngoingRequest();
const hasRemainingRequests = remainingPersistedRequests > 0 || hasOngoingRequest;
Log.info('[SequentialQueue] Finished processing queue.', false, {
remainingRequests,
remainingRequests: remainingPersistedRequests,
isOffline: isOfflineNetwork(),
willResolvePromise: isOfflineNetwork() || remainingRequests === 0,
willResolvePromise: isOfflineNetwork() || !hasRemainingRequests,
});

isSequentialQueueRunning = false;
// Use isOfflineNetwork() — not isQueuePaused — to decide whether to resolve isReadyPromise.
// isQueuePaused is true for both offline pauses AND shouldPauseQueue (data gap sync).
// For shouldPauseQueue, WRITEs are still pending so READs must wait (don't resolve).
// For offline, the queue can't process anyway so READs should proceed (resolve).
if (isOfflineNetwork() || remainingRequests === 0) {
if (isOfflineNetwork() || !hasRemainingRequests) {
Log.info('[SequentialQueue] Resolving isReadyPromise', false, {
reason: isOfflineNetwork() ? 'offline' : 'queue empty',
});
Expand All @@ -366,7 +377,7 @@ function flush(shouldResetPromise = true) {
currentRequestPromise = null;

// The queue can be paused when we sync the data with backend so we should only update the Onyx data when the queue is empty
if (remainingRequests === 0) {
if (!hasRemainingRequests) {
Log.info('[SequentialQueue] Queue is empty, flushing Onyx updates');
flushOnyxUpdatesQueue()?.then(() => {
const queueFlushedData = getQueueFlushedData();
Expand All @@ -386,7 +397,8 @@ function flush(shouldResetPromise = true) {
});
} else {
Log.info('[SequentialQueue] Queue still has requests, NOT flushing Onyx updates', false, {
remainingRequests,
remainingRequests: remainingPersistedRequests,
hasOngoingRequest,
});
}
});
Expand All @@ -404,18 +416,20 @@ function unpause() {
}

const currentPersistedRequests = getAllPersistedRequests();
const currentOngoingRequest = getPersistedOngoingRequest();
const numberOfPersistedRequests = currentPersistedRequests.length;
const persistedCommands = getCommands(currentPersistedRequests);

Log.info('[SequentialQueue] Unpausing the queue', false, {
numberOfPersistedRequests,
hasOngoingRequest: !!currentOngoingRequest,
persistedCommands,
});

isQueuePaused = false;

// If there are no persisted requests, we need to flush the Onyx updates queue
if (numberOfPersistedRequests === 0) {
if (numberOfPersistedRequests === 0 && !currentOngoingRequest) {
Log.info('[SequentialQueue] No persisted requests, flushing Onyx updates queue');
flushOnyxUpdatesQueue();
}
Expand Down
9 changes: 4 additions & 5 deletions src/libs/actions/App.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,9 @@ Onyx.connectWithoutView({
return;
}

Onyx.clear(KEYS_TO_PRESERVE).then(() => {
clearOnyxAndResetApp().finally(() => {
// Set this to false to reset the flag for this client
Onyx.set(ONYXKEYS.RESET_REQUIRED, false);

openApp();
});
},
});
Expand Down Expand Up @@ -847,11 +845,11 @@ function setPreservedAccount(account: OnyxTypes.Account) {
function clearOnyxAndResetApp(shouldNavigateToHomepage?: boolean) {
// The value of isUsingImportedState will be lost once Onyx is cleared, so we need to store it
const isStateImported = isUsingImportedState;
rollbackOngoingRequest();
const sequentialQueue = getAll();

rollbackOngoingRequest();
Navigation.clearPreloadedRoutes();
Onyx.clear(KEYS_TO_PRESERVE)
const resetPromise = Onyx.clear(KEYS_TO_PRESERVE)
.then(() => {
// Network key is preserved, so when exiting imported state, we should:
// 1. Stop forcing offline mode so the app can reconnect
Expand Down Expand Up @@ -893,6 +891,7 @@ function clearOnyxAndResetApp(shouldNavigateToHomepage?: boolean) {
});
});
clearSoundAssetsCache();
return resetPromise;
}

/**
Expand Down
52 changes: 42 additions & 10 deletions src/libs/actions/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Onyx.connectWithoutView({
}
}

if (!isInitialized && persistedRequests.length > 0) {
if (!isInitialized && (persistedRequests.length > 0 || !!ongoingRequest)) {
Log.info('[PersistedRequests] Triggering initialization callback', false);
triggerInitializationCallback();
}
Expand All @@ -172,6 +172,11 @@ Onyx.connectWithoutView({
diskValue: val?.command ?? 'null',
changed: previousOngoingRequest !== ongoingRequest,
});

if (isInitialized && ongoingRequest && previousOngoingRequest !== ongoingRequest) {
Log.info('[PersistedRequests] Triggering initialization callback from ongoing request', false);
triggerInitializationCallback();
}
},
});

Expand Down Expand Up @@ -320,13 +325,30 @@ function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Reque
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests));
}

function shouldPersistOngoingRequest(request: AnyRequest | null): boolean {
if (!request?.data) {
return true;
}

return !Object.values(request.data).some((value) => {
const isFile = typeof File !== 'undefined' && value instanceof File;
const isBlob = typeof Blob !== 'undefined' && value instanceof Blob;
return isFile || isBlob;
});
}

function updateOngoingRequest<TKey extends OnyxKey>(newRequest: Request<TKey>) {
Log.info('[PersistedRequests] Updating the ongoing request', false, {ongoingRequest, newRequest});
ongoingRequest = newRequest as AnyRequest;

if (newRequest.persistWhenOngoing) {
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, newRequest as AnyRequest);
if (shouldPersistOngoingRequest(ongoingRequest)) {
trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, newRequest as AnyRequest));
return;
}

trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, null)).finally(() => {
ongoingRequest = newRequest as AnyRequest;
});
}

function processNextRequest(): AnyRequest | null {
Expand Down Expand Up @@ -373,13 +395,23 @@ function processNextRequest(): AnyRequest | null {
// (e.g. File objects in data.file or data.receipt). IndexedDB cannot clone
// native File objects (DataCloneError). These requests cannot survive a crash
// anyway since File references are lost on restart.
const hasNonSerializableData = ongoingRequest?.data && Object.values(ongoingRequest.data).some((v) => v instanceof File || v instanceof Blob);
trackOnyxWrite(
Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
...(ongoingRequest && !hasNonSerializableData ? {[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: ongoingRequest} : {}),
}),
);
if (shouldPersistOngoingRequest(ongoingRequest)) {
trackOnyxWrite(
Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: ongoingRequest,
}),
);
} else {
trackOnyxWrite(
Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: null,
}),
).finally(() => {
ongoingRequest = nextRequest;
});
}

// Return the local reference, not `ongoingRequest`. The Onyx.multiSet above
// triggers a synchronous callback (Onyx 3.0.46+) that overwrites `ongoingRequest`
Expand Down
40 changes: 40 additions & 0 deletions tests/actions/AppTest.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import {waitFor} from '@testing-library/react-native';
import type {OnyxCollection} from 'react-native-onyx';
import Onyx from 'react-native-onyx';
import DateUtils from '@libs/DateUtils';
import '@libs/Navigation/AppNavigator/AuthScreens';
import Navigation from '@libs/Navigation/Navigation';
import OnyxUpdateManager from '@src/libs/actions/OnyxUpdateManager';
import ONYXKEYS from '@src/ONYXKEYS';
import type {Policy} from '@src/types/onyx';
import * as App from '../../src/libs/actions/App';
import * as PersistedRequests from '../../src/libs/actions/PersistedRequests';
import type Request from '../../src/types/onyx/Request';
import getOnyxValue from '../utils/getOnyxValue';
import * as TestHelper from '../utils/TestHelper';
import waitForBatchedUpdates from '../utils/waitForBatchedUpdates';
Expand Down Expand Up @@ -103,6 +107,42 @@ describe('actions/App', () => {
expect(reconnectApp).toHaveBeenCalledTimes(0);
});

test('clearOnyxAndResetApp preserves rolled-back ongoing requests across reset', async () => {
const persistedRequest: Request<'reportMetadata_1' | 'reportMetadata_2'> = {
command: 'AddComment',
successData: [{key: 'reportMetadata_1', onyxMethod: 'merge', value: {}}],
failureData: [{key: 'reportMetadata_2', onyxMethod: 'merge', value: {}}],
requestID: 123,
};

jest.spyOn(Navigation, 'clearPreloadedRoutes').mockImplementation(() => {});
await Onyx.set(ONYXKEYS.NETWORK, {shouldForceOffline: true});
await PersistedRequests.save(persistedRequest);
await waitForBatchedUpdates();

PersistedRequests.processNextRequest();
await waitForBatchedUpdates();

expect(PersistedRequests.getOngoingRequest()).toEqual(persistedRequest);

await App.clearOnyxAndResetApp();
await waitForBatchedUpdates();

await waitFor(async () => {
const diskQueue = (await getOnyxValue(ONYXKEYS.PERSISTED_REQUESTS)) ?? [];
expect(diskQueue).toEqual(
expect.arrayContaining([
expect.objectContaining({
command: 'AddComment',
requestID: 123,
isRollback: true,
}),
]),
);
expect((await getOnyxValue(ONYXKEYS.PERSISTED_ONGOING_REQUESTS)) == null).toBe(true);
});
});

describe('getNonOptimisticPolicyIDs', () => {
it('should return empty array when policies is empty object', () => {
const result = App.getNonOptimisticPolicyIDs({});
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,35 @@ describe('PersistedRequests', () => {
expect(PersistedRequests.getOngoingRequest()).toEqual(newRequest);
});

it('updateOngoingRequest should clear persisted ongoing request when data contains a File/Blob', async () => {
PersistedRequests.processNextRequest();
await waitForBatchedUpdates();

const originalFile = global.File;
function MockFile() {}
global.File = MockFile as unknown as typeof File;

try {
const mockFilePrototype = MockFile.prototype as Record<string, never>;
const mockFile = Object.create(mockFilePrototype) as File;
const newRequest: Request<'reportMetadata_1' | 'reportMetadata_2'> = {
command: 'OpenReport',
successData: [{key: 'reportMetadata_1', onyxMethod: 'set', value: {}}],
failureData: [{key: 'reportMetadata_2', onyxMethod: 'set', value: {}}],
requestID: 5,
data: {file: mockFile},
};

PersistedRequests.updateOngoingRequest(newRequest);
await waitForBatchedUpdates();

expect(PersistedRequests.getOngoingRequest()).toEqual(newRequest);
expect((await OnyxUtils.get(ONYXKEYS.PERSISTED_ONGOING_REQUESTS)) == null).toBe(true);
} finally {
global.File = originalFile;
}
});

it('when removing a request should update the persistedRequests queue and clear the ongoing request', () => {
PersistedRequests.processNextRequest();
expect(PersistedRequests.getOngoingRequest()).toEqual(request);
Expand Down Expand Up @@ -168,6 +197,39 @@ describe('PersistedRequests persistence guarantees', () => {
});
});

it('processNextRequest should keep the in-memory ongoing request when data contains a File/Blob', async () => {
PersistedRequests.clear();
await waitForBatchedUpdates();

const originalFile = global.File;
function MockFile() {}
global.File = MockFile as unknown as typeof File;

try {
const mockFilePrototype = MockFile.prototype as Record<string, never>;
const mockFile = Object.create(mockFilePrototype) as File;
const requestWithFile: Request<'reportMetadata_1' | 'reportMetadata_2'> = {
command: 'OpenReport',
successData: [{key: 'reportMetadata_1', onyxMethod: 'merge', value: {}}],
failureData: [{key: 'reportMetadata_2', onyxMethod: 'merge', value: {}}],
requestID: 30,
data: {file: mockFile},
};

PersistedRequests.save(requestWithFile);
await waitForBatchedUpdates();

const nextRequest = PersistedRequests.processNextRequest();
await waitForBatchedUpdates();

expect(nextRequest).toEqual(requestWithFile);
expect(PersistedRequests.getOngoingRequest()).toEqual(requestWithFile);
expect((await OnyxUtils.get(ONYXKEYS.PERSISTED_ONGOING_REQUESTS)) == null).toBe(true);
} finally {
global.File = originalFile;
}
});

// BUG: save() at PersistedRequests.ts:124-134 does a read-modify-write
// on the in-memory array and fires Onyx.set() without awaiting. The connect
// callback at PersistedRequests.ts:32 (persistedRequests = diskRequests)
Expand Down
Loading
Loading