Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/db-sync-methods.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Add a new truncate method to the sync handler to enable a collections state to be reset from a sync transaction.
5 changes: 5 additions & 0 deletions .changeset/electric-must-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

Add must-refetch message handling to clear synced data and reset collection to loading state.
162 changes: 156 additions & 6 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import type { BaseIndex, IndexResolver } from "./indexes/base-index.js"
interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
committed: boolean
operations: Array<OptimisticChangeMessage<T>>
truncate?: boolean
}

/**
Expand Down Expand Up @@ -559,11 +560,16 @@ export class CollectionImpl<

// Check if an item with this key already exists when inserting
if (messageWithoutKey.type === `insert`) {
const insertingIntoExistingSynced = this.syncedData.has(key)
const hasPendingDeleteForKey = pendingTransaction.operations.some(
(op) => op.key === key && op.type === `delete`
)
const isTruncateTransaction = pendingTransaction.truncate === true
// Allow insert after truncate in the same transaction even if it existed in syncedData
if (
this.syncedData.has(key) &&
!pendingTransaction.operations.some(
(op) => op.key === key && op.type === `delete`
)
insertingIntoExistingSynced &&
!hasPendingDeleteForKey &&
!isTruncateTransaction
) {
throw new DuplicateKeySyncError(key, this.id)
}
Expand Down Expand Up @@ -600,6 +606,28 @@ export class CollectionImpl<
markReady: () => {
this.markReady()
},
truncate: () => {
const pendingTransaction =
this.pendingSyncedTransactions[
this.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}

// Clear all operations from the current transaction
pendingTransaction.operations = []

// Mark the transaction as a truncate operation. During commit, this triggers:
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
// - Clearing of syncedData/syncedMetadata
// - Subsequent synced ops applied on the fresh base
// - Finally, optimistic mutations re-applied on top (single batch)
pendingTransaction.truncate = true
},
})

// Store cleanup function if provided
Expand Down Expand Up @@ -1149,7 +1177,11 @@ export class CollectionImpl<
}
}

if (!hasPersistingTransaction) {
const hasTruncateSync = this.pendingSyncedTransactions.some(
(t) => t.truncate === true
)

if (!hasPersistingTransaction || hasTruncateSync) {
// Set flag to prevent redundant optimistic state recalculations
this.isCommittingSyncTransactions = true

Expand Down Expand Up @@ -1178,7 +1210,31 @@ export class CollectionImpl<
const events: Array<ChangeMessage<T, TKey>> = []
const rowUpdateMode = this.config.sync.rowUpdateMode || `partial`

// Use current optimistic state directly; no copies required

for (const transaction of this.pendingSyncedTransactions) {
// Handle truncate operations first
if (transaction.truncate) {
// TRUNCATE PHASE
// 1) Emit a delete for every currently-synced key so downstream listeners/indexes
// observe a clear-before-rebuild. We intentionally skip keys already in
// optimisticDeletes because their delete was previously emitted by the user.
for (const key of this.syncedData.keys()) {
if (this.optimisticDeletes.has(key)) continue
const previousValue =
this.optimisticUpserts.get(key) || this.syncedData.get(key)
if (previousValue !== undefined) {
events.push({ type: `delete`, key, value: previousValue })
}
}

// 2) Clear the authoritative synced base. Subsequent server ops in this
// same commit will rebuild the base atomically.
this.syncedData.clear()
this.syncedMetadata.clear()
this.syncedKeys.clear()
}

for (const operation of transaction.operations) {
const key = operation.key as TKey
this.syncedKeys.add(key)
Expand Down Expand Up @@ -1228,7 +1284,101 @@ export class CollectionImpl<
}
}

// Clear optimistic state since sync operations will now provide the authoritative data
// After applying synced operations, if this commit included a truncate,
// re-apply optimistic mutations on top of the fresh synced base. This ensures
// the UI preserves local intent while respecting server rebuild semantics.
// Ordering: deletes (above) -> server ops (just applied) -> optimistic upserts.
const hadTruncate = this.pendingSyncedTransactions.some(
(t) => t.truncate === true
)
if (hadTruncate) {
// Avoid duplicating keys that were inserted/updated by synced operations in this commit
const syncedInsertedOrUpdatedKeys = new Set<TKey>()
for (const t of this.pendingSyncedTransactions) {
for (const op of t.operations) {
if (op.type === `insert` || op.type === `update`) {
syncedInsertedOrUpdatedKeys.add(op.key as TKey)
}
}
}

// Build re-apply sets from ACTIVE optimistic transactions against the new synced base
// We do not copy maps; we compute intent directly from transactions to avoid drift.
const reapplyUpserts = new Map<TKey, T>()
const reapplyDeletes = new Set<TKey>()

for (const tx of this.transactions.values()) {
if ([`completed`, `failed`].includes(tx.state)) continue
for (const mutation of tx.mutations) {
if (mutation.collection !== this || !mutation.optimistic) continue
const key = mutation.key as TKey
switch (mutation.type) {
case `insert`:
reapplyUpserts.set(key, mutation.modified as T)
reapplyDeletes.delete(key)
break
case `update`: {
const base = this.syncedData.get(key)
const next = base
? (Object.assign({}, base, mutation.changes) as T)
: (mutation.modified as T)
reapplyUpserts.set(key, next)
reapplyDeletes.delete(key)
break
}
case `delete`:
reapplyUpserts.delete(key)
reapplyDeletes.add(key)
break
}
}
}

// Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete.
// If the server also inserted/updated the same key in this batch, override that value
// with the optimistic value to preserve local intent.
for (const [key, value] of reapplyUpserts) {
if (reapplyDeletes.has(key)) continue
if (syncedInsertedOrUpdatedKeys.has(key)) {
let foundInsert = false
for (let i = events.length - 1; i >= 0; i--) {
const evt = events[i]!
if (evt.key === key && evt.type === `insert`) {
evt.value = value
foundInsert = true
break
}
}
if (!foundInsert) {
events.push({ type: `insert`, key, value })
}
} else {
events.push({ type: `insert`, key, value })
}
}

// Finally, ensure we do NOT insert keys that have an outstanding optimistic delete.
if (events.length > 0 && reapplyDeletes.size > 0) {
const filtered: Array<ChangeMessage<T, TKey>> = []
for (const evt of events) {
if (evt.type === `insert` && reapplyDeletes.has(evt.key)) {
continue
}
filtered.push(evt)
}
events.length = 0
events.push(...filtered)
}

// Ensure listeners are active before emitting this critical batch
if (!this.isReady()) {
this.setStatus(`ready`)
}
}

// Maintain optimistic state appropriately
// Clear optimistic state since sync operations will now provide the authoritative data.
// Any still-active user transactions will be re-applied below in recompute.
this.optimisticUpserts.clear()
this.optimisticDeletes.clear()

Expand Down
1 change: 1 addition & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export interface SyncConfig<
write: (message: Omit<ChangeMessage<T>, `key`>) => void
commit: () => void
markReady: () => void
truncate: () => void
}) => void

/**
Expand Down
Loading