Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/huge-forks-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/electric-db-collection": patch
"@tanstack/db": patch
---

Fixed a bug where a live query result could become inconsistent after an electric "must-refetch".
6 changes: 6 additions & 0 deletions .changeset/wet-camels-brush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/electric-db-collection": patch
"@tanstack/db": patch
---

Fixed a bug where a live query could get stuck in "loading" status when an electric "must-refetch" message arrived before the first "uo-to-date".
18 changes: 13 additions & 5 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,14 @@ export class CollectionImpl<
const callbacks = [...this.onFirstReadyCallbacks]
this.onFirstReadyCallbacks = []
callbacks.forEach((callback) => callback())

// to notify subscribers (like LiveQueryCollection) that the collection is ready
if (this.changeListeners.size > 0) {
this.emitEmptyReadyEvent()
}
}
}

// Always notify dependents when markReady is called, after status is set
// This ensures live queries get notified when their dependencies become ready
if (this.changeListeners.size > 0) {
this.emitEmptyReadyEvent()
}
Comment on lines +401 to +405
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for the stuck loading state

}

public id = ``
Expand Down Expand Up @@ -1270,6 +1271,13 @@ export class CollectionImpl<
this.syncedData.clear()
this.syncedMetadata.clear()
this.syncedKeys.clear()

// 3) Clear currentVisibleState for truncated keys to ensure subsequent operations
// are compared against the post-truncate state (undefined) rather than pre-truncate state
// This ensures that re-inserted keys are emitted as INSERT events, not UPDATE events
for (const key of changedKeys) {
currentVisibleState.delete(key)
}
Comment on lines +1275 to +1280
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for the inconsistent state

}

for (const operation of transaction.operations) {
Expand Down
339 changes: 339 additions & 0 deletions packages/electric-db-collection/tests/electric-live-query.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
import { beforeEach, describe, expect, it, vi } from "vitest"
import {
createCollection,
createLiveQueryCollection,
eq,
gt,
} from "@tanstack/db"
import { electricCollectionOptions } from "../src/electric"
import type { ElectricCollectionUtils } from "../src/electric"
import type { Collection } from "@tanstack/db"
import type { Message } from "@electric-sql/client"
import type { StandardSchemaV1 } from "@standard-schema/spec"

// Sample user type for tests
type User = {
id: number
name: string
age: number
email: string
active: boolean
}

// Sample data for tests
const sampleUsers: Array<User> = [
{
id: 1,
name: `Alice`,
age: 25,
email: `alice@example.com`,
active: true,
},
{
id: 2,
name: `Bob`,
age: 19,
email: `bob@example.com`,
active: true,
},
{
id: 3,
name: `Charlie`,
age: 30,
email: `charlie@example.com`,
active: false,
},
{
id: 4,
name: `Dave`,
age: 22,
email: `dave@example.com`,
active: true,
},
]

// Mock the ShapeStream module
const mockSubscribe = vi.fn()
const mockStream = {
subscribe: mockSubscribe,
}

vi.mock(`@electric-sql/client`, async () => {
const actual = await vi.importActual(`@electric-sql/client`)
return {
...actual,
ShapeStream: vi.fn(() => mockStream),
}
})

describe.each([
[`autoIndex enabled (default)`, `eager` as const],
[`autoIndex disabled`, `off` as const],
])(`Electric Collection with Live Query - %s`, (description, autoIndex) => {
let electricCollection: Collection<
User,
number,
ElectricCollectionUtils,
StandardSchemaV1<unknown, unknown>,
User
>
let subscriber: (messages: Array<Message<User>>) => void

function createElectricUsersCollection() {
vi.clearAllMocks()

// Reset mock subscriber
mockSubscribe.mockImplementation((callback) => {
subscriber = callback
return () => {}
})

// Create Electric collection with specified autoIndex
const config = {
id: `electric-users`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `users`,
},
},
startSync: true,
getKey: (user: User) => user.id,
autoIndex,
}

const options = electricCollectionOptions(config)
return createCollection(options)
}

function simulateInitialSync(users: Array<User> = sampleUsers) {
const messages: Array<Message<User>> = users.map((user) => ({
key: user.id.toString(),
value: user,
headers: { operation: `insert` },
}))

messages.push({
headers: { control: `up-to-date` },
})

subscriber(messages)
}

function simulateMustRefetch() {
subscriber([
{
headers: { control: `must-refetch` },
},
])
}

function simulateResync(users: Array<User>) {
const messages: Array<Message<User>> = users.map((user) => ({
key: user.id.toString(),
value: user,
headers: { operation: `insert` },
}))

messages.push({
headers: { control: `up-to-date` },
})

subscriber(messages)
}

beforeEach(() => {
electricCollection = createElectricUsersCollection()
})

it(`should handle basic must-refetch with filtered live query`, () => {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test failed before the fix, it showed an inconstant state

// Create a live query with WHERE clause
const activeLiveQuery = createLiveQueryCollection({
id: `active-users-live-query`,
startSync: true,
query: (q) =>
q
.from({ user: electricCollection })
.where(({ user }) => eq(user.active, true))
.select(({ user }) => ({
id: user.id,
name: user.name,
active: user.active,
})),
})

// Initial sync
simulateInitialSync()
expect(electricCollection.status).toBe(`ready`)
expect(electricCollection.size).toBe(4)
expect(activeLiveQuery.status).toBe(`ready`)
expect(activeLiveQuery.size).toBe(3) // Only active users

// Must-refetch and resync with updated data
simulateMustRefetch()
const updatedUsers = [
{
id: 1,
name: `Alice Updated`,
age: 26,
email: `alice@example.com`,
active: true,
},
{ id: 5, name: `Eve`, age: 24, email: `eve@example.com`, active: true },
{
id: 6,
name: `Frank`,
age: 35,
email: `frank@example.com`,
active: false,
},
]
simulateResync(updatedUsers)

// BUG: Live query should have 2 active users but only shows 1
expect(electricCollection.status).toBe(`ready`)
expect(electricCollection.size).toBe(3)
expect(activeLiveQuery.status).toBe(`ready`)
expect(activeLiveQuery.size).toBe(2) // Only active users (Alice Updated and Eve)
})

it(`should handle must-refetch with complex projections`, () => {
const complexLiveQuery = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ user: electricCollection })
.where(({ user }) => gt(user.age, 18))
.select(({ user }) => ({
userId: user.id,
displayName: user.name,
isAdult: user.age,
})),
})

// Initial sync and must-refetch
simulateInitialSync()
simulateMustRefetch()

const newUsers = [
{
id: 9,
name: `Iris`,
age: 30,
email: `iris@example.com`,
active: false,
},
{
id: 10,
name: `Jack`,
age: 17,
email: `jack@example.com`,
active: true,
}, // Under 18, filtered
]
simulateResync(newUsers)

expect(complexLiveQuery.status).toBe(`ready`)
expect(complexLiveQuery.size).toBe(1) // Only Iris (Jack filtered by age)
expect(complexLiveQuery.get(9)).toMatchObject({
userId: 9,
displayName: `Iris`,
isAdult: 30,
})
})

it(`should handle rapid must-refetch sequences`, () => {
const liveQuery = createLiveQueryCollection({
startSync: true,
query: (q) => q.from({ user: electricCollection }),
})

// Initial sync
simulateInitialSync()
expect(liveQuery.size).toBe(4)

// Multiple rapid must-refetch messages
simulateMustRefetch()
simulateMustRefetch()
simulateMustRefetch()

// Final resync
const newUsers = [
{
id: 10,
name: `New User`,
age: 20,
email: `new@example.com`,
active: true,
},
]
simulateResync(newUsers)

expect(electricCollection.status).toBe(`ready`)
expect(liveQuery.status).toBe(`ready`)
expect(liveQuery.size).toBe(1)
})

it(`should handle live query becoming ready after must-refetch during initial sync`, () => {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this reproduced the stuck loading state

// Test that live queries properly transition to ready state when must-refetch
// occurs during the initial sync of the source Electric collection

let testSubscriber: (messages: Array<Message<User>>) => void
vi.clearAllMocks()
mockSubscribe.mockImplementation((callback) => {
testSubscriber = callback
return () => {}
})

// Create Electric collection
const testElectricCollection = createCollection(
electricCollectionOptions({
id: `initial-sync-collection`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `users`,
},
},
startSync: true,
getKey: (user: User) => user.id,
autoIndex,
})
)

// Send initial data but don't complete sync (no up-to-date)
testSubscriber([
{
key: `1`,
value: {
id: 1,
name: `Alice`,
age: 25,
email: `alice@example.com`,
active: true,
},
headers: { operation: `insert` },
},
])

expect(testElectricCollection.status).toBe(`loading`)

// Create live query while Electric collection is still loading
const liveQuery = createLiveQueryCollection({
startSync: true,
query: (q) => q.from({ user: testElectricCollection }),
})

expect(liveQuery.status).toBe(`loading`)

// Send must-refetch while collection is in loading state
testSubscriber([{ headers: { control: `must-refetch` } }])

// Complete the sync
testSubscriber([{ headers: { control: `up-to-date` } }])

// Both Electric collection and live query should be ready
expect(testElectricCollection.status).toBe(`ready`)
expect(liveQuery.status).toBe(`ready`) // This currently fails - live query stuck in loading
})
})
Loading