-
Notifications
You must be signed in to change notification settings - Fork 203
Expand file tree
/
Copy pathtrailbase.ts
More file actions
352 lines (304 loc) · 9.69 KB
/
trailbase.ts
File metadata and controls
352 lines (304 loc) · 9.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
/* eslint-disable @typescript-eslint/no-unnecessary-condition */
import { Store } from "@tanstack/store"
import type { Event, RecordApi } from "trailbase"
import type {
CollectionConfig,
DeleteMutationFnParams,
InsertMutationFnParams,
SyncConfig,
UpdateMutationFnParams,
UtilsRecord,
} from "@tanstack/db"
type ShapeOf<T> = Record<keyof T, unknown>
type Conversion<I, O> = (value: I) => O
type OptionalConversions<
InputType extends ShapeOf<OutputType>,
OutputType extends ShapeOf<InputType>,
> = {
// Excludes all keys that require a conversation.
[K in keyof InputType as InputType[K] extends OutputType[K]
? K
: never]?: Conversion<InputType[K], OutputType[K]>
}
type RequiredConversions<
InputType extends ShapeOf<OutputType>,
OutputType extends ShapeOf<InputType>,
> = {
// Excludes all keys that do not strictly require a conversation.
[K in keyof InputType as InputType[K] extends OutputType[K]
? never
: K]: Conversion<InputType[K], OutputType[K]>
}
type Conversions<
InputType extends ShapeOf<OutputType>,
OutputType extends ShapeOf<InputType>,
> = OptionalConversions<InputType, OutputType> &
RequiredConversions<InputType, OutputType>
function convert<
InputType extends ShapeOf<OutputType> & Record<string, unknown>,
OutputType extends ShapeOf<InputType>,
>(
conversions: Conversions<InputType, OutputType>,
input: InputType
): OutputType {
const c = conversions as Record<string, Conversion<InputType, OutputType>>
return Object.fromEntries(
Object.keys(input).map((k: string) => {
const value = input[k]
return [k, c[k]?.(value as any) ?? value]
})
) as OutputType
}
function convertPartial<
InputType extends ShapeOf<OutputType> & Record<string, unknown>,
OutputType extends ShapeOf<InputType>,
>(
conversions: Conversions<InputType, OutputType>,
input: Partial<InputType>
): Partial<OutputType> {
const c = conversions as Record<string, Conversion<InputType, OutputType>>
return Object.fromEntries(
Object.keys(input).map((k: string) => {
const value = input[k]
return [k, c[k]?.(value as any) ?? value]
})
) as OutputType
}
/**
* Configuration interface for Trailbase Collection
*/
export interface TrailBaseCollectionConfig<
TItem extends ShapeOf<TRecord>,
TRecord extends ShapeOf<TItem> = TItem,
TKey extends string | number = string | number,
> extends Omit<
CollectionConfig<TItem, TKey>,
`sync` | `onInsert` | `onUpdate` | `onDelete`
> {
/**
* Record API name
*/
recordApi: RecordApi<TRecord>
parse: Conversions<TRecord, TItem>
serialize: Conversions<TItem, TRecord>
}
export type AwaitTxIdFn = (txId: string, timeout?: number) => Promise<boolean>
export interface TrailBaseCollectionUtils extends UtilsRecord {
cancel: () => void
}
export function trailBaseCollectionOptions<
TItem extends ShapeOf<TRecord>,
TRecord extends ShapeOf<TItem> = TItem,
TKey extends string | number = string | number,
>(
config: TrailBaseCollectionConfig<TItem, TRecord, TKey>
): CollectionConfig<TItem, TKey> & { utils: TrailBaseCollectionUtils } {
const getKey = config.getKey
const parse = (record: TRecord) =>
convert<TRecord, TItem>(config.parse, record)
const serialUpd = (item: Partial<TItem>) =>
convertPartial<TItem, TRecord>(config.serialize, item)
const serialIns = (item: TItem) =>
convert<TItem, TRecord>(config.serialize, item)
const seenIds = new Store(new Map<string, number>())
const awaitIds = (
ids: Array<string>,
timeout: number = 120 * 1000
): Promise<void> => {
const completed = (value: Map<string, number>) =>
ids.every((id) => value.has(id))
if (completed(seenIds.state)) {
return Promise.resolve()
}
return new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
unsubscribe()
reject(new Error(`Timeout waiting for ids: ${ids}`))
}, timeout)
const unsubscribe = seenIds.subscribe((value) => {
if (completed(value.currentVal)) {
clearTimeout(timeoutId)
unsubscribe()
resolve()
}
})
})
}
const weakSeenIds = new WeakRef(seenIds)
const cleanupTimer = setInterval(() => {
const seen = weakSeenIds.deref()
if (seen) {
seen.setState((curr) => {
const now = Date.now()
let anyExpired = false
const notExpired = Array.from(curr.entries()).filter(([_, v]) => {
const expired = now - v > 300 * 1000
anyExpired = anyExpired || expired
return !expired
})
if (anyExpired) {
return new Map(notExpired)
}
return curr
})
} else {
clearInterval(cleanupTimer)
}
}, 120 * 1000)
type SyncParams = Parameters<SyncConfig<TItem, TKey>[`sync`]>[0]
let eventReader: ReadableStreamDefaultReader<Event> | undefined
const cancel = () => {
if (eventReader) {
eventReader.cancel()
eventReader.releaseLock()
eventReader = undefined
}
}
const sync = {
sync: (params: SyncParams) => {
const { begin, write, commit, markReady } = params
// Initial fetch.
async function initialFetch() {
const limit = 256
let response = await config.recordApi.list({
pagination: {
limit,
},
})
let cursor = response.cursor
let got = 0
begin()
while (true) {
const length = response.records.length
if (length === 0) break
got = got + length
for (const item of response.records) {
write({
type: `insert`,
value: parse(item),
})
}
if (length < limit) break
response = await config.recordApi.list({
pagination: {
limit,
cursor,
offset: cursor === undefined ? got : undefined,
},
})
cursor = response.cursor
}
commit()
markReady()
}
// Afterwards subscribe.
async function listen(reader: ReadableStreamDefaultReader<Event>) {
while (true) {
const { done, value: event } = await reader.read()
if (done || !event) {
reader.releaseLock()
eventReader = undefined
return
}
begin()
let value: TItem | undefined
if (`Insert` in event) {
value = parse(event.Insert as TRecord)
write({ type: `insert`, value })
} else if (`Delete` in event) {
value = parse(event.Delete as TRecord)
write({ type: `delete`, value })
} else if (`Update` in event) {
value = parse(event.Update as TRecord)
write({ type: `update`, value })
} else {
console.error(`Error: ${event.Error}`)
}
commit()
if (value) {
seenIds.setState((curr: Map<string, number>) => {
const newIds = new Map(curr)
newIds.set(String(getKey(value)), Date.now())
return newIds
})
}
}
}
async function start() {
const eventStream = await config.recordApi.subscribe(`*`)
const reader = (eventReader = eventStream.getReader())
// Start listening for subscriptions first. Otherwise, we'd risk a gap
// between the initial fetch and starting to listen.
listen(reader)
try {
await initialFetch()
} catch (e) {
cancel()
markReady()
throw e
}
}
start()
},
// Expose the getSyncMetadata function
getSyncMetadata: undefined,
}
return {
...config,
sync,
getKey,
onInsert: async (
params: InsertMutationFnParams<TItem, TKey>
): Promise<Array<number | string>> => {
const ids = await config.recordApi.createBulk(
params.transaction.mutations.map((tx) => {
const { type, modified } = tx
if (type !== `insert`) {
throw new Error(`Expected 'insert', got: ${type}`)
}
return serialIns(modified)
})
)
// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly added to the local
// DB by the subscription.
await awaitIds(ids.map((id) => String(id)))
return ids
},
onUpdate: async (params: UpdateMutationFnParams<TItem, TKey>) => {
const ids: Array<string> = await Promise.all(
params.transaction.mutations.map(async (tx) => {
const { type, changes, key } = tx
if (type !== `update`) {
throw new Error(`Expected 'update', got: ${type}`)
}
await config.recordApi.update(key, serialUpd(changes))
return String(key)
})
)
// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly updated in the local
// DB by the subscription.
await awaitIds(ids)
},
onDelete: async (params: DeleteMutationFnParams<TItem, TKey>) => {
const ids: Array<string> = await Promise.all(
params.transaction.mutations.map(async (tx) => {
const { type, key } = tx
if (type !== `delete`) {
throw new Error(`Expected 'delete', got: ${type}`)
}
await config.recordApi.delete(key)
return String(key)
})
)
// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly updated in the local
// DB by the subscription.
await awaitIds(ids)
},
utils: {
cancel,
},
}
}