Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/db-sync-methods.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Add setLoading and clearSyncedState methods to sync handler to enable a collections state to be reset to loading state and cleared.
5 changes: 5 additions & 0 deletions .changeset/electric-must-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

Add must-refetch message handling to clear synced data and reset collection to loading state.
32 changes: 31 additions & 1 deletion packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,30 @@ export class CollectionImpl<
}
}

/**
* Set the collection back to loading state
* This is called by sync implementations when they need to reset the collection state
* (e.g., when receiving a must-refetch message from Electric)
* @private - Should only be called by sync implementations
*/
private setLoading(): void {
// Can transition to loading from ready or initialCommit states
if (this._status === `ready` || this._status === `initialCommit`) {
this.setStatus(`loading`)
}
}

/**
* Clear the synced data and metadata
* This is called by sync implementations when they need to clear the collection state
* (e.g., when receiving a must-refetch message from Electric)
* @private - Should only be called by sync implementations
*/
private clearSyncedState(): void {
this.syncedData.clear()
this.syncedMetadata.clear()
}

public id = ``

/**
Expand Down Expand Up @@ -444,7 +468,7 @@ export class CollectionImpl<
idle: [`loading`, `error`, `cleaned-up`],
loading: [`initialCommit`, `ready`, `error`, `cleaned-up`],
initialCommit: [`ready`, `error`, `cleaned-up`],
ready: [`cleaned-up`, `error`],
ready: [`cleaned-up`, `error`, `loading`],
error: [`cleaned-up`, `idle`],
"cleaned-up": [`loading`, `error`],
}
Expand Down Expand Up @@ -600,6 +624,12 @@ export class CollectionImpl<
markReady: () => {
this.markReady()
},
setLoading: () => {
this.setLoading()
},
clearSyncedState: () => {
this.clearSyncedState()
},
})

// Store cleanup function if provided
Expand Down
2 changes: 2 additions & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ export interface SyncConfig<
write: (message: Omit<ChangeMessage<T>, `key`>) => void
commit: () => void
markReady: () => void
setLoading: () => void
clearSyncedState: () => void
}) => void

/**
Expand Down
7 changes: 5 additions & 2 deletions packages/db/tests/collection-errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,9 @@ describe(`Collection Error Handling`, () => {

expect(collection.status).toBe(`idle`)

// Test invalid transition
// Test invalid transition (ready to idle is not allowed)
expect(() => {
collectionImpl.validateStatusTransition(`ready`, `loading`)
collectionImpl.validateStatusTransition(`ready`, `idle`)
}).toThrow(InvalidCollectionStatusTransitionError)

// Test valid transition
Expand Down Expand Up @@ -435,6 +435,9 @@ describe(`Collection Error Handling`, () => {
expect(() =>
collectionImpl.validateStatusTransition(`ready`, `error`)
).not.toThrow()
expect(() =>
collectionImpl.validateStatusTransition(`ready`, `loading`)
).not.toThrow()

// Valid transitions from error (allow recovery)
expect(() =>
Expand Down
20 changes: 19 additions & 1 deletion packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ function isUpToDateMessage<T extends Row<unknown>>(
return isControlMessage(message) && message.headers.control === `up-to-date`
}

function isMustRefetchMessage<T extends Row<unknown>>(
message: Message<T>
): message is ControlMessage & { headers: { control: `must-refetch` } } {
return isControlMessage(message) && message.headers.control === `must-refetch`
}

// Check if a message contains txids in its headers
function hasTxids<T extends Row<unknown>>(
message: Message<T>
Expand Down Expand Up @@ -470,7 +476,8 @@ function createElectricSync<T extends Row<unknown>>(

return {
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
const { begin, write, commit, markReady } = params
const { begin, write, commit, markReady, clearSyncedState, setLoading } =
params
const stream = new ShapeStream({
...shapeOptions,
signal: abortController.signal,
Expand Down Expand Up @@ -521,6 +528,17 @@ function createElectricSync<T extends Row<unknown>>(
})
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
} else if (isMustRefetchMessage(message)) {
debug(
`Received must-refetch message, clearing synced data and restarting sync`
)

// Clear synced data and reset to loading state using the new methods
clearSyncedState()
setLoading()

// Reset transaction state to allow new transactions after must-refetch
transactionStarted = false
}
}

Expand Down
51 changes: 51 additions & 0 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,57 @@ describe(`Electric Integration`, () => {
expect(collection.state).toEqual(new Map())
})

it(`should handle must-refetch by clearing synced data and resetting to loading state`, () => {
// First, populate the collection with some data
subscriber([
{
key: `1`,
value: { id: 1, name: `Test User` },
headers: { operation: `insert` },
},
{
key: `2`,
value: { id: 2, name: `Another User` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// Verify the data is in the collection
expect(collection.state.size).toBe(2)
expect(collection.status).toBe(`ready`)

// Send must-refetch control message
subscriber([
{
headers: { control: `must-refetch` },
},
])

// The collection should be cleared and reset to loading state
expect(collection.state.size).toBe(0)
expect(collection.status).toBe(`loading`)

// Send new data after must-refetch
subscriber([
{
key: `3`,
value: { id: 3, name: `New User` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// The collection should now have the new data
expect(collection.state.size).toBe(1)
expect(collection.state.get(3)).toEqual({ id: 3, name: `New User` })
expect(collection.status).toBe(`ready`)
})

// Tests for txid tracking functionality
describe(`txid tracking`, () => {
it(`should track txids from incoming messages`, async () => {
Expand Down