Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/wide-dancers-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"@tanstack/electric-db-collection": patch
"@tanstack/query-db-collection": patch
"@tanstack/db": patch
---

Add explicit collection readiness detection with `isReady()` and `markReady()`

- Add `isReady()` method to check if a collection is ready for use
- Add `onFirstReady()` method to register callbacks for when collection becomes ready
- Add `markReady()` to SyncConfig interface for sync implementations to explicitly signal readiness
- Replace `onFirstCommit()` with `onFirstReady()` for better semantics
- Update status state machine to allow `loading` → `ready` transition for cases with no data to commit
- Update all sync implementations (Electric, Query, Local-only, Local-storage) to use `markReady()`
- Improve error handling by allowing collections to be marked ready even when sync errors occur

This provides a more intuitive and ergonomic API for determining collection readiness, replacing the previous approach of using commits as a readiness signal.
101 changes: 75 additions & 26 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ export class CollectionImpl<
private hasReceivedFirstCommit = false
private isCommittingSyncTransactions = false

// Array to store one-time commit listeners
private onFirstCommitCallbacks: Array<() => void> = []
// Array to store one-time ready listeners
private onFirstReadyCallbacks: Array<() => void> = []
private hasBeenReady = false

// Event batching for preventing duplicate emissions during transaction flows
private batchedEvents: Array<ChangeMessage<T, TKey>> = []
Expand All @@ -244,17 +245,66 @@ export class CollectionImpl<
private syncCleanupFn: (() => void) | null = null

/**
* Register a callback to be executed on the next commit
* Register a callback to be executed when the collection first becomes ready
* Useful for preloading collections
* @param callback Function to call after the next commit
* @param callback Function to call when the collection first becomes ready
* @example
* collection.onFirstCommit(() => {
* console.log('Collection has received first data')
* collection.onFirstReady(() => {
* console.log('Collection is ready for the first time')
* // Safe to access collection.state now
* })
*/
public onFirstCommit(callback: () => void): void {
this.onFirstCommitCallbacks.push(callback)
public onFirstReady(callback: () => void): void {
// If already ready, call immediately
if (this.hasBeenReady) {
callback()
return
}

this.onFirstReadyCallbacks.push(callback)
}

/**
* Check if the collection is ready for use
* Returns true if the collection has been marked as ready by its sync implementation
* @returns true if the collection is ready, false otherwise
* @example
* if (collection.isReady()) {
* console.log('Collection is ready, data is available')
* // Safe to access collection.state
* } else {
* console.log('Collection is still loading')
* }
*/
public isReady(): boolean {
return this._status === `ready`
}

/**
* Mark the collection as ready for use
* This is called by sync implementations to explicitly signal that the collection is ready,
* providing a more intuitive alternative to using commits for readiness signaling
* @private - Should only be called by sync implementations
*/
private markReady(): void {
// Can transition to ready from loading or initialCommit states
if (this._status === `loading` || this._status === `initialCommit`) {
this.setStatus(`ready`)

// Call any registered first ready callbacks (only on first time becoming ready)
if (!this.hasBeenReady) {
this.hasBeenReady = true

// Also mark as having received first commit for backwards compatibility
if (!this.hasReceivedFirstCommit) {
this.hasReceivedFirstCommit = true
}

const callbacks = [...this.onFirstReadyCallbacks]
this.onFirstReadyCallbacks = []
callbacks.forEach((callback) => callback())
}
}
}

public id = ``
Expand Down Expand Up @@ -302,7 +352,7 @@ export class CollectionImpl<
Array<CollectionStatus>
> = {
idle: [`loading`, `error`, `cleaned-up`],
loading: [`initialCommit`, `error`, `cleaned-up`],
loading: [`initialCommit`, `ready`, `error`, `cleaned-up`],
initialCommit: [`ready`, `error`, `cleaned-up`],
ready: [`cleaned-up`, `error`],
error: [`cleaned-up`, `idle`],
Expand Down Expand Up @@ -455,11 +505,9 @@ export class CollectionImpl<
}

this.commitPendingTransactions()

// Transition from initialCommit to ready after the first commit is complete
if (this._status === `initialCommit`) {
this.setStatus(`ready`)
}
},
markReady: () => {
this.markReady()
},
})

Expand Down Expand Up @@ -492,7 +540,7 @@ export class CollectionImpl<
}

// Register callback BEFORE starting sync to avoid race condition
this.onFirstCommit(() => {
this.onFirstReady(() => {
resolve()
})

Expand Down Expand Up @@ -555,7 +603,8 @@ export class CollectionImpl<
this.pendingSyncedTransactions = []
this.syncedKeys.clear()
this.hasReceivedFirstCommit = false
this.onFirstCommitCallbacks = []
this.hasBeenReady = false
this.onFirstReadyCallbacks = []
this.preloadPromise = null
this.batchedEvents = []
this.shouldBatchEvents = false
Expand Down Expand Up @@ -1184,8 +1233,8 @@ export class CollectionImpl<
// Call any registered one-time commit listeners
if (!this.hasReceivedFirstCommit) {
this.hasReceivedFirstCommit = true
const callbacks = [...this.onFirstCommitCallbacks]
this.onFirstCommitCallbacks = []
const callbacks = [...this.onFirstReadyCallbacks]
this.onFirstReadyCallbacks = []
callbacks.forEach((callback) => callback())
}
}
Expand Down Expand Up @@ -1812,14 +1861,14 @@ export class CollectionImpl<
* @returns Promise that resolves to a Map containing all items in the collection
*/
stateWhenReady(): Promise<Map<TKey, T>> {
// If we already have data or there are no loading collections, resolve immediately
if (this.size > 0 || this.hasReceivedFirstCommit === true) {
// If we already have data or collection is ready, resolve immediately
if (this.size > 0 || this.isReady()) {
return Promise.resolve(this.state)
}

// Otherwise, wait for the first commit
// Otherwise, wait for the collection to be ready
return new Promise<Map<TKey, T>>((resolve) => {
this.onFirstCommit(() => {
this.onFirstReady(() => {
resolve(this.state)
})
})
Expand All @@ -1841,14 +1890,14 @@ export class CollectionImpl<
* @returns Promise that resolves to an Array containing all items in the collection
*/
toArrayWhenReady(): Promise<Array<T>> {
// If we already have data or there are no loading collections, resolve immediately
if (this.size > 0 || this.hasReceivedFirstCommit === true) {
// If we already have data or collection is ready, resolve immediately
if (this.size > 0 || this.isReady()) {
return Promise.resolve(this.toArray)
}

// Otherwise, wait for the first commit
// Otherwise, wait for the collection to be ready
return new Promise<Array<T>>((resolve) => {
this.onFirstCommit(() => {
this.onFirstReady(() => {
resolve(this.toArray)
})
})
Expand Down
5 changes: 4 additions & 1 deletion packages/db/src/local-only.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ function createLocalOnlySync<T extends object, TKey extends string | number>(
* @returns Unsubscribe function (empty since no ongoing sync is needed)
*/
sync: (params) => {
const { begin, write, commit } = params
const { begin, write, commit, markReady } = params

// Capture sync functions for later use by confirmOperationsSync
syncBegin = begin
Expand All @@ -259,6 +259,9 @@ function createLocalOnlySync<T extends object, TKey extends string | number>(
commit()
}

// Mark collection as ready since local-only collections are immediately ready
markReady()

// Return empty unsubscribe function - no ongoing sync needed
return () => {}
},
Expand Down
5 changes: 4 additions & 1 deletion packages/db/src/local-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ function createLocalStorageSync<T extends object>(

const syncConfig: SyncConfig<T> & { manualTrigger?: () => void } = {
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
const { begin, write, commit } = params
const { begin, write, commit, markReady } = params

// Store sync params for later use
syncParams = params
Expand All @@ -608,6 +608,9 @@ function createLocalStorageSync<T extends object>(
lastKnownData.set(key, storedItem)
})

// Mark collection as ready after initial load
markReady()

// Listen for storage events from other tabs
const handleStorageEvent = (event: StorageEvent) => {
// Only respond to changes to our specific key and from our storage
Expand Down
4 changes: 3 additions & 1 deletion packages/db/src/query/live-query-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export function liveQueryCollectionOptions<
// Create the sync configuration
const sync: SyncConfig<TResult> = {
rowUpdateMode: `full`,
sync: ({ begin, write, commit, collection: theCollection }) => {
sync: ({ begin, write, commit, markReady, collection: theCollection }) => {
const { graph, inputs, pipeline } = maybeCompileBasePipeline()
let messagesCount = 0
pipeline.pipe(
Expand Down Expand Up @@ -295,6 +295,8 @@ export function liveQueryCollectionOptions<
begin()
commit()
}
// Mark the collection as ready after the first successful run
markReady()
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ export interface SyncConfig<
begin: () => void
write: (message: Omit<ChangeMessage<T>, `key`>) => void
commit: () => void
markReady: () => void
}) => void

/**
Expand Down
61 changes: 35 additions & 26 deletions packages/db/tests/collection-lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ describe(`Collection Lifecycle Management`, () => {
id: `status-test`,
getKey: (item) => item.id,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginCallback = begin as () => void
commitCallback = commit as () => void
commitCallback = () => {
commit()
markReady()
}
},
},
})
Expand All @@ -80,9 +83,12 @@ describe(`Collection Lifecycle Management`, () => {
getKey: (item) => item.id,
startSync: true,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginCallback = begin as () => void
commitCallback = commit as () => void
commitCallback = () => {
commit()
markReady()
}
},
},
})
Expand Down Expand Up @@ -121,9 +127,12 @@ describe(`Collection Lifecycle Management`, () => {
getKey: (item) => item.id,
gcTime: 0,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginCallback = begin as () => void
commitCallback = commit as () => void
commitCallback = () => {
commit()
markReady()
}
},
},
})
Expand Down Expand Up @@ -154,9 +163,10 @@ describe(`Collection Lifecycle Management`, () => {
getKey: (item) => item.id,
startSync: false, // Test lazy loading behavior
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
begin()
commit()
markReady()
syncCallCount++
},
},
Expand Down Expand Up @@ -327,9 +337,12 @@ describe(`Collection Lifecycle Management`, () => {
getKey: (item) => item.id,
startSync: true,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginCallback = begin as () => void
commitCallback = commit as () => void
commitCallback = () => {
commit()
markReady()
}
},
},
})
Expand Down Expand Up @@ -389,42 +402,38 @@ describe(`Collection Lifecycle Management`, () => {
})

describe(`Lifecycle Events`, () => {
it(`should call onFirstCommit callbacks`, () => {
let beginCallback: (() => void) | undefined
let commitCallback: (() => void) | undefined
it(`should call onFirstReady callbacks`, () => {
let markReadyCallback: (() => void) | undefined
const callbacks: Array<() => void> = []

const collection = createCollection<{ id: string; name: string }>({
id: `first-commit-test`,
id: `first-ready-test`,
getKey: (item) => item.id,
sync: {
sync: ({ begin, commit }) => {
beginCallback = begin as () => void
commitCallback = commit as () => void
sync: ({ markReady }) => {
markReadyCallback = markReady as () => void
},
},
})

const unsubscribe = collection.subscribeChanges(() => {})

// Register callbacks
collection.onFirstCommit(() => callbacks.push(() => `callback1`))
collection.onFirstCommit(() => callbacks.push(() => `callback2`))
collection.onFirstReady(() => callbacks.push(() => `callback1`))
collection.onFirstReady(() => callbacks.push(() => `callback2`))

expect(callbacks).toHaveLength(0)

// Trigger first commit
if (beginCallback && commitCallback) {
beginCallback()
commitCallback()
// Trigger first ready
if (markReadyCallback) {
markReadyCallback()
}

expect(callbacks).toHaveLength(2)

// Subsequent commits should not trigger callbacks
if (beginCallback && commitCallback) {
beginCallback()
commitCallback()
// Subsequent markReady calls should not trigger callbacks
if (markReadyCallback) {
markReadyCallback()
}
expect(callbacks).toHaveLength(2)

Expand Down
Loading