Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0d36fe8
scaffolding repo subscriptions
dholms Jan 31, 2023
8c3590a
wip
dholms Jan 31, 2023
556a516
remove repo ops
dholms Feb 2, 2023
97668f0
setup notify/listen for db
dholms Feb 2, 2023
458e01f
end pool to fix hanging test
dholms Feb 2, 2023
027753e
small comment in test
dholms Feb 2, 2023
a714ded
Merge branch 'db-notify-listen' into subscribe-repos
dholms Feb 2, 2023
a31ca30
basic sequencer
dholms Feb 2, 2023
88a1baa
some refactoring
dholms Feb 2, 2023
2f79c3e
switch to event emitter
dholms Feb 2, 2023
12af4f6
reconnect on listener error
dholms Feb 2, 2023
105ec60
rename notifyClient
dholms Feb 2, 2023
f2c74fa
remove payload on channels
dholms Feb 2, 2023
8a5539e
pr feedback
dholms Feb 2, 2023
3346ede
Merge branch 'db-notify-listen' into subscribe-repos
dholms Feb 2, 2023
ddfa711
subscribeRepo outbox
dholms Feb 2, 2023
05fd0f4
merged
dholms Feb 2, 2023
ab5ea66
some cleanup
dholms Feb 2, 2023
7a5291b
wip
dholms Feb 2, 2023
fb052b5
wip
dholms Feb 2, 2023
76b3962
merged main
dholms Feb 3, 2023
a8c9e2d
bugfixin
dholms Feb 6, 2023
f24e1be
only send msgs after tx is committed
dholms Feb 7, 2023
65dffd1
better handle event-emitter -> generator
dholms Feb 7, 2023
c094277
max buffer size
dholms Feb 7, 2023
0993210
cleanup
dholms Feb 7, 2023
4e527d8
track sequencedAt & eventType as well
dholms Feb 7, 2023
19b60a1
fix up buffer overloading
dholms Feb 7, 2023
72c4140
clean up evt types
dholms Feb 7, 2023
35bd329
buff up tests
dholms Feb 7, 2023
ba683c4
merged main
dholms Feb 7, 2023
89d34a0
missed merge conflict
dholms Feb 7, 2023
7cc7cd0
new schema
dholms Feb 7, 2023
5b3eaee
blobs on subscriptions
dholms Feb 7, 2023
730e2c7
rm genned client subscription methods
dholms Feb 7, 2023
efc05ae
backfill limits
dholms Feb 7, 2023
e4286e2
testing subscription route & quick outbox bugfix
dholms Feb 8, 2023
550417a
fix up migration
dholms Feb 8, 2023
9ebd589
cascade on delete
dholms Feb 8, 2023
3f94b3a
comments & naming
dholms Feb 8, 2023
5fe34b0
fix dev env
dholms Feb 8, 2023
cc6d8a2
delete seqs on account deletion
dholms Feb 8, 2023
8046299
tidy
dholms Feb 8, 2023
10eea8a
fixing things up with db notify system for schemas
dholms Feb 8, 2023
018476c
fix duplicates in outbox
dholms Feb 8, 2023
dd412af
tidy
dholms Feb 8, 2023
5ca021d
Merge branch 'xrpc-streams' into subscribe-repos
dholms Feb 8, 2023
335d993
fixing up some timing issues
dholms Feb 8, 2023
fdf8557
tidy
dholms Feb 8, 2023
e474af0
terminate ws after using
dholms Feb 8, 2023
0cfdb01
bump up timer on async reader
dholms Feb 8, 2023
7f0806e
fixing up NOTIFY in txns
dholms Feb 8, 2023
fcc0141
pr feedback
dholms Feb 9, 2023
276376e
pr bugfixes
dholms Feb 9, 2023
a89b5fa
make order asc explicit
dholms Feb 9, 2023
bf6693e
merged
dholms Feb 9, 2023
f967b63
bringing tests up to speed w atpagent
dholms Feb 9, 2023
ccedc84
bump up max listeners on sequencer
dholms Feb 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lexicons/com/atproto/sync/subscribeAllRepos.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"lexicon": 1,
"id": "com.atproto.sync.subscribeAllRepos",
"defs": {
"main": {
"type": "subscription",
"description": "Subscribe to repo updates",
"parameters": {
"type": "params",
"properties": {
"backfillFrom": {
"type": "datetime",
"description": "The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps."
}
}
},
"message": {
"schema": {
"type": "union",
"refs": ["#repoAppend", "#repoRebase"]
},
"codes": {
"#repoAppend": 0,
"#repoRebase": 1
}
}
},
"repoAppend": {
"type": "object",
"required": ["time", "repo", "commit", "blocks", "blobs"],
"properties": {
"time": {"type": "datetime"},
"repo": {"type": "string"},
"commit": {"type": "string"},
"prev": {"type": "string"},
"blocks": {"type": "unknown"},
"blobs": {
"type": "array",
"items": {"type": "string"}
}
}
},
"repoRebase": {
"type": "object",
"required": ["time", "repo", "commit"],
"properties": {
"time": {"type": "datetime"},
"repo": {"type": "string"},
"commit": {"type": "string"}
}
}
}
}
2 changes: 2 additions & 0 deletions packages/api/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile'
import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles'
import * as AppBskyActorGetSuggestions from './types/app/bsky/actor/getSuggestions'
Expand Down Expand Up @@ -137,6 +138,7 @@ export * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
export * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
export * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
export * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
export * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
export * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile'
export * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles'
export * as AppBskyActorGetSuggestions from './types/app/bsky/actor/getSuggestions'
Expand Down
76 changes: 76 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,81 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncSubscribeAllRepos: {
lexicon: 1,
id: 'com.atproto.sync.subscribeAllRepos',
defs: {
main: {
type: 'subscription',
description: 'Subscribe to repo updates',
parameters: {
type: 'params',
properties: {
backfillFrom: {
type: 'datetime',
description:
'The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps.',
},
},
},
message: {
schema: {
type: 'union',
refs: [
'lex:com.atproto.sync.subscribeAllRepos#repoAppend',
'lex:com.atproto.sync.subscribeAllRepos#repoRebase',
],
},
codes: {
'lex:com.atproto.sync.subscribeAllRepos#repoAppend': 0,
'lex:com.atproto.sync.subscribeAllRepos#repoRebase': 1,
},
},
},
repoAppend: {
type: 'object',
required: ['time', 'repo', 'commit', 'blocks', 'blobs'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
prev: {
type: 'string',
},
blocks: {
type: 'unknown',
},
blobs: {
type: 'array',
items: {
type: 'string',
},
},
},
},
repoRebase: {
type: 'object',
required: ['time', 'repo', 'commit'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
},
},
},
},
AppBskyActorGetProfile: {
lexicon: 1,
id: 'app.bsky.actor.getProfile',
Expand Down Expand Up @@ -4119,6 +4194,7 @@ export const ids = {
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncSubscribeAllRepos: 'com.atproto.sync.subscribeAllRepos',
AppBskyActorGetProfile: 'app.bsky.actor.getProfile',
AppBskyActorGetProfiles: 'app.bsky.actor.getProfiles',
AppBskyActorGetSuggestions: 'app.bsky.actor.getSuggestions',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
import { ValidationResult } from '@atproto/lexicon'
import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'

export interface RepoAppend {
time: string
repo: string
commit: string
prev?: string
blocks: {}
blobs: string[]
[k: string]: unknown
}

export function isRepoAppend(v: unknown): v is RepoAppend {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoAppend'
)
}

export function validateRepoAppend(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoAppend', v)
}

export interface RepoRebase {
time: string
repo: string
commit: string
[k: string]: unknown
}

export function isRepoRebase(v: unknown): v is RepoRebase {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoRebase'
)
}

export function validateRepoRebase(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoRebase', v)
}
74 changes: 74 additions & 0 deletions packages/common/src/async.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
import { wait } from './util'

// reads values from a generator into a list
// NOTE: does not signal generator to close. it *will* continue to produce values
export const readFromGenerator = async <T>(
gen: AsyncGenerator<T>,
maxLength = Number.MAX_SAFE_INTEGER,
timeout = 1000,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bumping this up to 1000 let's all the tests pass when run in the test suite

I think PG just gets a bit bogged down & doesn't deliver notifcations in time

): Promise<T[]> => {
const evts: T[] = []
while (evts.length < maxLength) {
const maybeEvt = await Promise.race([gen.next(), wait(timeout)])
if (!maybeEvt) break
const evt = maybeEvt as IteratorResult<T>
if (evt.done) break
evts.push(evt.value)
}
return evts
}

export type Deferrable = {
resolve: () => void
complete: Promise<void>
Expand All @@ -22,3 +42,57 @@ export const createDeferrables = (count: number): Deferrable[] => {
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
await Promise.all(deferrables.map((d) => d.complete))
}

export class AsyncBuffer<T> {
private buffer: T[] = []
private promise: Promise<void>
private resolve: () => void

constructor(public maxSize?: number) {
this.resetPromise()
}

get curr(): T[] {
return this.buffer
}

get size(): number {
return this.buffer.length
}

resetPromise() {
this.promise = new Promise<void>((r) => (this.resolve = r))
}

push(item: T) {
this.buffer.push(item)
this.resolve()
}

pushMany(items: T[]) {
items.forEach((i) => this.buffer.push(i))
this.resolve()
}

async *events(): AsyncGenerator<T> {
while (true) {
await this.promise
if (this.maxSize && this.size > this.maxSize) {
throw new AsyncBufferFullError(this.maxSize)
}
const [first, ...rest] = this.buffer
if (first) {
this.buffer = rest
yield first
} else {
this.resetPromise()
}
}
}
}

export class AsyncBufferFullError extends Error {
constructor(maxSize: number) {
super(`ReachedMaxBufferSize: ${maxSize}`)
}
}
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export * from './ipld'
export * from './logger'
export * from './types'
export * from './streams'
export * from './times'
4 changes: 4 additions & 0 deletions packages/common/src/times.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const SECOND = 1000
export const MINUTE = SECOND * 60
export const HOUR = MINUTE * 60
export const DAY = HOUR * 24
22 changes: 22 additions & 0 deletions packages/common/tests/async.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { readFromGenerator, wait } from '../src'

describe('async', () => {
describe('readFromGenerator', () => {
async function* waitToYield(time: number) {
for (let i = 0; i < 5; i++) {
await wait(time)
yield true
}
}

it('reads from generator with timeout', async () => {
const read = await readFromGenerator(waitToYield(100), undefined, 101)
expect(read).toEqual([true, true, true, true, true])
})

it('stops reading at timeout', async () => {
const read = await readFromGenerator(waitToYield(100), undefined, 99)
expect(read).toEqual([])
})
})
})
3 changes: 3 additions & 0 deletions packages/dev-env/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as plc from '@atproto/plc'
import * as crypto from '@atproto/crypto'
import AtpAgent from '@atproto/api'
import { ServerType, ServerConfig, StartParams } from './types.js'
import { HOUR } from '@atproto/common'

interface Startable {
start(): Promise<http.Server>
Expand Down Expand Up @@ -102,6 +103,8 @@ export class DevEnvServer {
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8',
privacyPolicyUrl: 'https://example.com/privacy',
termsOfServiceUrl: 'https://example.com/tos',
maxSubscriptionBuffer: 200,
repoBackfillLimitMs: HOUR,
}),
})
await startServer(pds)
Expand Down
12 changes: 4 additions & 8 deletions packages/lex-cli/src/codegen/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
LexXrpcProcedure,
LexXrpcQuery,
LexRecord,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { NSID } from '@atproto/nsid'
import { gen, utilTs, lexiconsTs } from './common'
Expand Down Expand Up @@ -273,20 +272,17 @@ function genNamespaceCls(file: SourceFile, ns: DefTreeNode) {

// methods
for (const userType of ns.userTypes) {
if (
userType.def.type !== 'query' &&
userType.def.type !== 'subscription' &&
userType.def.type !== 'procedure'
) {
if (userType.def.type !== 'query' && userType.def.type !== 'procedure') {
continue
}
const isGetReq = userType.def.type === 'query'
const moduleName = toTitleCase(userType.nsid)
const name = toCamelCase(NSID.parse(userType.nsid).name || '')
const method = cls.addMethod({
name,
returnType: `Promise<${moduleName}.Response>`,
})
if (userType.def.type === 'query' || userType.def.type === 'subscription') {
if (isGetReq) {
method.addParameter({
name: 'params?',
type: `${moduleName}.QueryParams`,
Expand All @@ -304,7 +300,7 @@ function genNamespaceCls(file: SourceFile, ns: DefTreeNode) {
method.setBodyText(
[
`return this._service.xrpc`,
userType.def.type === 'query'
isGetReq
? `.call('${userType.nsid}', params, undefined, opts)`
: `.call('${userType.nsid}', opts?.qp, data, opts)`,
` .catch((e) => {`,
Expand Down
Loading