Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
419ae23
feat: Add flexible matching strategies for electric-db-collection (#402)
KyleAMathews Sep 3, 2025
5f4b76b
fix: Address code review feedback - commit semantics, memory leaks, a…
KyleAMathews Sep 3, 2025
d563b44
format
KyleAMathews Sep 3, 2025
0098cab
fix: Address critical lifecycle and safety issues in matching strategies
KyleAMathews Sep 3, 2025
2ca6beb
Merge remote-tracking branch 'origin/main' into match-stream
KyleAMathews Sep 10, 2025
4759c98
Merge origin/main into match-stream
KyleAMathews Oct 6, 2025
9f26e69
Fix TypeScript build error in ElectricCollectionConfig
KyleAMathews Oct 6, 2025
89e7cce
Fix electric collection test unhandled rejections
KyleAMathews Oct 7, 2025
6b504d7
Resolve merge conflict in electric-collection.md
KyleAMathews Oct 7, 2025
9908218
Simplify matching strategies API based on review feedback
KyleAMathews Oct 7, 2025
3c9d28a
Merge origin/main into match-stream
KyleAMathews Oct 7, 2025
e1f5ba6
Merge branch 'main' into match-stream
KyleAMathews Oct 7, 2025
69cc35c
Set awaitTxId default timeout to 5 seconds
KyleAMathews Oct 7, 2025
659a184
Update changeset to reflect current API changes
KyleAMathews Oct 7, 2025
d5f826e
format
KyleAMathews Oct 8, 2025
14725e9
cleanup changeset
KyleAMathews Oct 8, 2025
bef4480
better wording
KyleAMathews Oct 8, 2025
f3cfb72
Delete packages/db/src/collection.ts.backup
KyleAMathews Oct 8, 2025
e76afa0
Delete packages/db/tests/collection.test.ts.backup
KyleAMathews Oct 8, 2025
a9a56a3
wording
KyleAMathews Oct 8, 2025
b673ea1
Remove void/no-wait pattern from handler examples
KyleAMathews Oct 8, 2025
df682af
Delete # Introducing TanStack DB 0.md
KyleAMathews Oct 8, 2025
7d1c7df
Extract match resolution logic into helper function
KyleAMathews Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changeset/poor-wasps-stand.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"@tanstack/electric-db-collection": minor
---

feat: Add flexible matching strategies for electric-db-collection (#402)

Add three matching strategies for client-server synchronization:

1. **Txid strategy** (existing, backward compatible) - Uses PostgreSQL transaction IDs for precise matching
2. **Custom match function strategy** (new) - Allows heuristic-based matching with custom logic
3. **Void/timeout strategy** (new, 3-second default) - Simple timeout for prototyping

**New Features:**

- New types: `MatchFunction<T>`, `MatchingStrategy<T>`
- Enhanced `ElectricCollectionConfig` to support all strategies
- New utility: `awaitMatch(matchFn, timeout?)`
- Export `isChangeMessage` and `isControlMessage` helpers for custom match functions

**Benefits:**

- Backward compatibility maintained - existing code works unchanged
- Architecture flexibility for different backend capabilities
- Progressive enhancement path - start with void strategy, upgrade to txid when ready
- No forced backend API changes - custom match functions work without backend modifications
172 changes: 164 additions & 8 deletions docs/collections/electric-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,26 @@ The `electricCollectionOptions` function accepts the following options:

### Persistence Handlers

Handlers are called before mutations and support three different matching strategies:

- `onInsert`: Handler called before insert operations
- `onUpdate`: Handler called before update operations
- `onDelete`: Handler called before delete operations

## Persistence Handlers
Each handler can return:
- `{ txid: number | number[] }` - Txid strategy (recommended)
- `{ matchFn: (message) => boolean, timeout?: number }` - Custom match function strategy
- `{}` - Void strategy (3-second timeout)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we let a user return the timeout on the void strategy?

Suggested change
- `{}` - Void strategy (3-second timeout)
- `{ timeout?: number }` - Void strategy (default 3-second timeout)


## Persistence Handlers & Matching Strategies

Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it's important to use a matching strategy.

Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it’s important to use a matching strategy.
Electric collections support three matching strategies for synchronizing client mutations with server responses:

The most reliable strategy is for the backend to include the transaction ID (txid) in its response, allowing the client to match each mutation with Electric’s transaction identifiers for precise confirmation. If no strategy is provided, client mutations are automatically confirmed after three seconds.
### 1. Txid Strategy (Recommended)

The most reliable strategy uses PostgreSQL transaction IDs (txids) for precise matching. The backend returns a txid, and the client waits for that specific txid to appear in the Electric stream.

```typescript
const todosCollection = createCollection(
Expand All @@ -79,10 +90,78 @@ const todosCollection = createCollection(
const newItem = transaction.mutations[0].modified
const response = await api.todos.create(newItem)

// Txid strategy - most reliable
return { txid: response.txid }
},

// you can also implement onUpdate and onDelete handlers
onUpdate: async ({ transaction }) => {
const { original, changes } = transaction.mutations[0]
const response = await api.todos.update({
where: { id: original.id },
data: changes
})

return { txid: response.txid }
}
})
)
```

### 2. Custom Match Function Strategy

When txids aren't available, you can provide a custom function that examines Electric stream messages to determine when a mutation has been synchronized. This is useful for heuristic-based matching.

```typescript
import { isChangeMessage } from '@tanstack/electric-db-collection'

const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},

onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)

// Custom match function strategy
return {
matchFn: (message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === newItem.text
},
timeout: 10000 // Optional timeout in ms, defaults to 30000
}
}
})
)
```

### 3. Void Strategy (Timeout)

When neither txids nor reliable matching are possible, you can use the void strategy which simply waits a fixed timeout period (3 seconds by default). This is useful for prototyping or when you're confident about timing.

```typescript
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},

onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)

// Void strategy - waits 3 seconds
return {}
}
})
)
```
Expand Down Expand Up @@ -162,7 +241,9 @@ export const ServerRoute = createServerFileRoute("/api/todos").methods({

## Optimistic Updates with Explicit Transactions

For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. In this case, you need to explicitly await for the transaction ID using `utils.awaitTxId()`.
For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. You can use the utility methods to wait for synchronization with different strategies:

### Using Txid Strategy

```typescript
const addTodoAction = createOptimisticAction({
Expand All @@ -184,19 +265,94 @@ const addTodoAction = createOptimisticAction({
data: { text, completed: false }
})

// Wait for the specific txid
await todosCollection.utils.awaitTxId(response.txid)
}
})
```

### Using Custom Match Function

```typescript
import { isChangeMessage } from '@tanstack/electric-db-collection'

const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
},

mutationFn: async ({ text }) => {
await api.todos.create({
data: { text, completed: false }
})

// Wait for matching message
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === text
}
)
}
})
```

## Utility Methods

The collection provides these utility methods via `collection.utils`:

- `awaitTxId(txid, timeout?)`: Manually wait for a specific transaction ID to be synchronized
### `awaitTxId(txid, timeout?)`

Manually wait for a specific transaction ID to be synchronized:

```typescript
// Wait for specific txid
await todosCollection.utils.awaitTxId(12345)

// With custom timeout (default is 30 seconds)
await todosCollection.utils.awaitTxId(12345, 10000)
```

### `awaitMatch(matchFn, timeout?)`

Manually wait for a custom match function to find a matching message:

```typescript
todosCollection.utils.awaitTxId(12345)
import { isChangeMessage } from '@tanstack/electric-db-collection'

// Wait for a specific message pattern
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === 'New Todo'
},
5000 // timeout in ms
)
```

This is useful when you need to ensure a mutation has been synchronized before proceeding with other operations.
### Helper Functions

The package exports helper functions for use in custom match functions:

- `isChangeMessage(message)`: Check if a message is a data change (insert/update/delete)
- `isControlMessage(message)`: Check if a message is a control message (up-to-date, must-refetch)

```typescript
import { isChangeMessage, isControlMessage } from '@tanstack/electric-db-collection'

// Use in custom match functions
const matchFn = (message) => {
if (isChangeMessage(message)) {
return message.headers.operation === 'insert'
}
return false
}
```
12 changes: 8 additions & 4 deletions packages/db/src/collection/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still reject tx.isPersisted.promise; this catch only prevents global unhandled rejections
directOpTransaction.commit().catch(() => undefined)

// Add the transaction to the collection's transactions store
state.transactions.set(directOpTransaction.id, directOpTransaction)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the commit fails, do we still want to store this TX ID in the state.transactions?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes because the transaction still exists even if it failed

Expand Down Expand Up @@ -387,7 +388,8 @@ export class CollectionMutationsManager<
const emptyTransaction = createTransaction({
mutationFn: async () => {},
})
emptyTransaction.commit()
// Errors still propagate through tx.isPersisted.promise; suppress the background commit from warning
emptyTransaction.commit().catch(() => undefined)
// Schedule cleanup for empty transaction
state.scheduleTransactionCleanup(emptyTransaction)
return emptyTransaction
Expand Down Expand Up @@ -423,7 +425,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still hit tx.isPersisted.promise; avoid leaking an unhandled rejection from the fire-and-forget commit
directOpTransaction.commit().catch(() => undefined)

// Add the transaction to the collection's transactions store

Expand Down Expand Up @@ -524,7 +527,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still reject tx.isPersisted.promise; silence the internal commit promise to prevent test noise
directOpTransaction.commit().catch(() => undefined)

state.transactions.set(directOpTransaction.id, directOpTransaction)
state.scheduleTransactionCleanup(directOpTransaction)
Expand Down
Loading
Loading