-
Notifications
You must be signed in to change notification settings - Fork 203
fix stuck loading state after an electric must-refetch #532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| "@tanstack/electric-db-collection": patch | ||
| "@tanstack/db": patch | ||
| --- | ||
|
|
||
| Fixed a bug where a live query result could become inconsistent after an electric "must-refetch". | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| "@tanstack/electric-db-collection": patch | ||
| "@tanstack/db": patch | ||
| --- | ||
|
|
||
| Fixed a bug where a live query could get stuck in "loading" status when an electric "must-refetch" message arrived before the first "uo-to-date". | ||
samwillis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -395,13 +395,14 @@ export class CollectionImpl< | |
| const callbacks = [...this.onFirstReadyCallbacks] | ||
| this.onFirstReadyCallbacks = [] | ||
| callbacks.forEach((callback) => callback()) | ||
|
|
||
| // to notify subscribers (like LiveQueryCollection) that the collection is ready | ||
| if (this.changeListeners.size > 0) { | ||
| this.emitEmptyReadyEvent() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Always notify dependents when markReady is called, after status is set | ||
| // This ensures live queries get notified when their dependencies become ready | ||
| if (this.changeListeners.size > 0) { | ||
| this.emitEmptyReadyEvent() | ||
| } | ||
|
Comment on lines
+401
to
+405
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the fix for the stuck loading state |
||
| } | ||
|
|
||
| public id = `` | ||
|
|
@@ -1270,6 +1271,13 @@ export class CollectionImpl< | |
| this.syncedData.clear() | ||
| this.syncedMetadata.clear() | ||
| this.syncedKeys.clear() | ||
|
|
||
| // 3) Clear currentVisibleState for truncated keys to ensure subsequent operations | ||
| // are compared against the post-truncate state (undefined) rather than pre-truncate state | ||
| // This ensures that re-inserted keys are emitted as INSERT events, not UPDATE events | ||
| for (const key of changedKeys) { | ||
| currentVisibleState.delete(key) | ||
| } | ||
|
Comment on lines
+1275
to
+1280
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the fix for the inconsistent state |
||
| } | ||
|
|
||
| for (const operation of transaction.operations) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,339 @@ | ||
| import { beforeEach, describe, expect, it, vi } from "vitest" | ||
| import { | ||
| createCollection, | ||
| createLiveQueryCollection, | ||
| eq, | ||
| gt, | ||
| } from "@tanstack/db" | ||
| import { electricCollectionOptions } from "../src/electric" | ||
| import type { ElectricCollectionUtils } from "../src/electric" | ||
| import type { Collection } from "@tanstack/db" | ||
| import type { Message } from "@electric-sql/client" | ||
| import type { StandardSchemaV1 } from "@standard-schema/spec" | ||
|
|
||
| // Sample user type for tests | ||
| type User = { | ||
| id: number | ||
| name: string | ||
| age: number | ||
| email: string | ||
| active: boolean | ||
| } | ||
|
|
||
| // Sample data for tests | ||
| const sampleUsers: Array<User> = [ | ||
| { | ||
| id: 1, | ||
| name: `Alice`, | ||
| age: 25, | ||
| email: `alice@example.com`, | ||
| active: true, | ||
| }, | ||
| { | ||
| id: 2, | ||
| name: `Bob`, | ||
| age: 19, | ||
| email: `bob@example.com`, | ||
| active: true, | ||
| }, | ||
| { | ||
| id: 3, | ||
| name: `Charlie`, | ||
| age: 30, | ||
| email: `charlie@example.com`, | ||
| active: false, | ||
| }, | ||
| { | ||
| id: 4, | ||
| name: `Dave`, | ||
| age: 22, | ||
| email: `dave@example.com`, | ||
| active: true, | ||
| }, | ||
| ] | ||
|
|
||
| // Mock the ShapeStream module | ||
| const mockSubscribe = vi.fn() | ||
| const mockStream = { | ||
| subscribe: mockSubscribe, | ||
| } | ||
|
|
||
| vi.mock(`@electric-sql/client`, async () => { | ||
| const actual = await vi.importActual(`@electric-sql/client`) | ||
| return { | ||
| ...actual, | ||
| ShapeStream: vi.fn(() => mockStream), | ||
| } | ||
| }) | ||
|
|
||
| describe.each([ | ||
| [`autoIndex enabled (default)`, `eager` as const], | ||
| [`autoIndex disabled`, `off` as const], | ||
| ])(`Electric Collection with Live Query - %s`, (description, autoIndex) => { | ||
| let electricCollection: Collection< | ||
| User, | ||
| number, | ||
| ElectricCollectionUtils, | ||
| StandardSchemaV1<unknown, unknown>, | ||
| User | ||
| > | ||
| let subscriber: (messages: Array<Message<User>>) => void | ||
|
|
||
| function createElectricUsersCollection() { | ||
| vi.clearAllMocks() | ||
|
|
||
| // Reset mock subscriber | ||
| mockSubscribe.mockImplementation((callback) => { | ||
| subscriber = callback | ||
| return () => {} | ||
| }) | ||
|
|
||
| // Create Electric collection with specified autoIndex | ||
| const config = { | ||
| id: `electric-users`, | ||
| shapeOptions: { | ||
| url: `http://test-url`, | ||
| params: { | ||
| table: `users`, | ||
| }, | ||
| }, | ||
| startSync: true, | ||
| getKey: (user: User) => user.id, | ||
| autoIndex, | ||
| } | ||
|
|
||
| const options = electricCollectionOptions(config) | ||
| return createCollection(options) | ||
| } | ||
|
|
||
| function simulateInitialSync(users: Array<User> = sampleUsers) { | ||
| const messages: Array<Message<User>> = users.map((user) => ({ | ||
| key: user.id.toString(), | ||
| value: user, | ||
| headers: { operation: `insert` }, | ||
| })) | ||
|
|
||
| messages.push({ | ||
| headers: { control: `up-to-date` }, | ||
| }) | ||
|
|
||
| subscriber(messages) | ||
| } | ||
|
|
||
| function simulateMustRefetch() { | ||
| subscriber([ | ||
| { | ||
| headers: { control: `must-refetch` }, | ||
| }, | ||
| ]) | ||
| } | ||
|
|
||
| function simulateResync(users: Array<User>) { | ||
| const messages: Array<Message<User>> = users.map((user) => ({ | ||
| key: user.id.toString(), | ||
| value: user, | ||
| headers: { operation: `insert` }, | ||
| })) | ||
|
|
||
| messages.push({ | ||
| headers: { control: `up-to-date` }, | ||
| }) | ||
|
|
||
| subscriber(messages) | ||
| } | ||
|
|
||
| beforeEach(() => { | ||
| electricCollection = createElectricUsersCollection() | ||
| }) | ||
|
|
||
| it(`should handle basic must-refetch with filtered live query`, () => { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this test failed before the fix, it showed an inconstant state |
||
| // Create a live query with WHERE clause | ||
| const activeLiveQuery = createLiveQueryCollection({ | ||
| id: `active-users-live-query`, | ||
| startSync: true, | ||
| query: (q) => | ||
| q | ||
| .from({ user: electricCollection }) | ||
| .where(({ user }) => eq(user.active, true)) | ||
| .select(({ user }) => ({ | ||
| id: user.id, | ||
| name: user.name, | ||
| active: user.active, | ||
| })), | ||
| }) | ||
|
|
||
| // Initial sync | ||
| simulateInitialSync() | ||
| expect(electricCollection.status).toBe(`ready`) | ||
| expect(electricCollection.size).toBe(4) | ||
| expect(activeLiveQuery.status).toBe(`ready`) | ||
| expect(activeLiveQuery.size).toBe(3) // Only active users | ||
|
|
||
| // Must-refetch and resync with updated data | ||
| simulateMustRefetch() | ||
| const updatedUsers = [ | ||
| { | ||
| id: 1, | ||
| name: `Alice Updated`, | ||
| age: 26, | ||
| email: `alice@example.com`, | ||
| active: true, | ||
| }, | ||
| { id: 5, name: `Eve`, age: 24, email: `eve@example.com`, active: true }, | ||
| { | ||
| id: 6, | ||
| name: `Frank`, | ||
| age: 35, | ||
| email: `frank@example.com`, | ||
| active: false, | ||
| }, | ||
| ] | ||
| simulateResync(updatedUsers) | ||
|
|
||
| // BUG: Live query should have 2 active users but only shows 1 | ||
| expect(electricCollection.status).toBe(`ready`) | ||
| expect(electricCollection.size).toBe(3) | ||
| expect(activeLiveQuery.status).toBe(`ready`) | ||
| expect(activeLiveQuery.size).toBe(2) // Only active users (Alice Updated and Eve) | ||
| }) | ||
|
|
||
| it(`should handle must-refetch with complex projections`, () => { | ||
| const complexLiveQuery = createLiveQueryCollection({ | ||
| startSync: true, | ||
| query: (q) => | ||
| q | ||
| .from({ user: electricCollection }) | ||
| .where(({ user }) => gt(user.age, 18)) | ||
| .select(({ user }) => ({ | ||
| userId: user.id, | ||
| displayName: user.name, | ||
| isAdult: user.age, | ||
| })), | ||
| }) | ||
|
|
||
| // Initial sync and must-refetch | ||
| simulateInitialSync() | ||
| simulateMustRefetch() | ||
|
|
||
| const newUsers = [ | ||
| { | ||
| id: 9, | ||
| name: `Iris`, | ||
| age: 30, | ||
| email: `iris@example.com`, | ||
| active: false, | ||
| }, | ||
| { | ||
| id: 10, | ||
| name: `Jack`, | ||
| age: 17, | ||
| email: `jack@example.com`, | ||
| active: true, | ||
| }, // Under 18, filtered | ||
| ] | ||
| simulateResync(newUsers) | ||
|
|
||
| expect(complexLiveQuery.status).toBe(`ready`) | ||
| expect(complexLiveQuery.size).toBe(1) // Only Iris (Jack filtered by age) | ||
| expect(complexLiveQuery.get(9)).toMatchObject({ | ||
| userId: 9, | ||
| displayName: `Iris`, | ||
| isAdult: 30, | ||
| }) | ||
| }) | ||
|
|
||
| it(`should handle rapid must-refetch sequences`, () => { | ||
| const liveQuery = createLiveQueryCollection({ | ||
| startSync: true, | ||
| query: (q) => q.from({ user: electricCollection }), | ||
| }) | ||
|
|
||
| // Initial sync | ||
| simulateInitialSync() | ||
| expect(liveQuery.size).toBe(4) | ||
|
|
||
| // Multiple rapid must-refetch messages | ||
| simulateMustRefetch() | ||
| simulateMustRefetch() | ||
| simulateMustRefetch() | ||
|
|
||
| // Final resync | ||
| const newUsers = [ | ||
| { | ||
| id: 10, | ||
| name: `New User`, | ||
| age: 20, | ||
| email: `new@example.com`, | ||
| active: true, | ||
| }, | ||
| ] | ||
| simulateResync(newUsers) | ||
|
|
||
| expect(electricCollection.status).toBe(`ready`) | ||
| expect(liveQuery.status).toBe(`ready`) | ||
| expect(liveQuery.size).toBe(1) | ||
| }) | ||
|
|
||
| it(`should handle live query becoming ready after must-refetch during initial sync`, () => { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this reproduced the stuck loading state |
||
| // Test that live queries properly transition to ready state when must-refetch | ||
| // occurs during the initial sync of the source Electric collection | ||
|
|
||
| let testSubscriber: (messages: Array<Message<User>>) => void | ||
| vi.clearAllMocks() | ||
| mockSubscribe.mockImplementation((callback) => { | ||
| testSubscriber = callback | ||
| return () => {} | ||
| }) | ||
|
|
||
| // Create Electric collection | ||
| const testElectricCollection = createCollection( | ||
| electricCollectionOptions({ | ||
| id: `initial-sync-collection`, | ||
| shapeOptions: { | ||
| url: `http://test-url`, | ||
| params: { | ||
| table: `users`, | ||
| }, | ||
| }, | ||
| startSync: true, | ||
| getKey: (user: User) => user.id, | ||
| autoIndex, | ||
| }) | ||
| ) | ||
|
|
||
| // Send initial data but don't complete sync (no up-to-date) | ||
| testSubscriber([ | ||
| { | ||
| key: `1`, | ||
| value: { | ||
| id: 1, | ||
| name: `Alice`, | ||
| age: 25, | ||
| email: `alice@example.com`, | ||
| active: true, | ||
| }, | ||
| headers: { operation: `insert` }, | ||
| }, | ||
| ]) | ||
|
|
||
| expect(testElectricCollection.status).toBe(`loading`) | ||
|
|
||
| // Create live query while Electric collection is still loading | ||
| const liveQuery = createLiveQueryCollection({ | ||
| startSync: true, | ||
| query: (q) => q.from({ user: testElectricCollection }), | ||
| }) | ||
|
|
||
| expect(liveQuery.status).toBe(`loading`) | ||
|
|
||
| // Send must-refetch while collection is in loading state | ||
| testSubscriber([{ headers: { control: `must-refetch` } }]) | ||
|
|
||
| // Complete the sync | ||
| testSubscriber([{ headers: { control: `up-to-date` } }]) | ||
|
|
||
| // Both Electric collection and live query should be ready | ||
| expect(testElectricCollection.status).toBe(`ready`) | ||
| expect(liveQuery.status).toBe(`ready`) // This currently fails - live query stuck in loading | ||
samwillis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| }) | ||
| }) | ||
Uh oh!
There was an error while loading. Please reload this page.