Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lexicons/com/atproto/sync/getRecord.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"lexicon": 1,
"id": "com.atproto.sync.getRecord",
"defs": {
"main": {
"type": "query",
"description": "Gets blocks needed for existence or non-existence of record.",
"parameters": {
"type": "params",
"required": ["did", "collection", "rkey"],
"properties": {
"did": {"type": "string", "description": "The DID of the repo."},
"collection": {"type": "string" },
"rkey": {"type": "string" },
"commit": {"type": "string", "description": "An optional past commit CID."}
}
},
"output": {
"encoding": "application/cbor"
}
}
}
}
13 changes: 13 additions & 0 deletions packages/api/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
import * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncUpdateRepo from './types/com/atproto/sync/updateRepo'
import * as AppBskyActorCreateScene from './types/app/bsky/actor/createScene'
Expand Down Expand Up @@ -116,6 +117,7 @@ export * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
export * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
export * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
export * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
export * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
export * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
export * as ComAtprotoSyncUpdateRepo from './types/com/atproto/sync/updateRepo'
export * as AppBskyActorCreateScene from './types/app/bsky/actor/createScene'
Expand Down Expand Up @@ -615,6 +617,17 @@ export class SyncNS {
})
}

getRecord(
params?: ComAtprotoSyncGetRecord.QueryParams,
opts?: ComAtprotoSyncGetRecord.CallOptions,
): Promise<ComAtprotoSyncGetRecord.Response> {
return this._service.xrpc
.call('com.atproto.sync.getRecord', params, undefined, opts)
.catch((e) => {
throw ComAtprotoSyncGetRecord.toKnownErr(e)
})
}

getRepo(
params?: ComAtprotoSyncGetRepo.QueryParams,
opts?: ComAtprotoSyncGetRepo.CallOptions,
Expand Down
35 changes: 35 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,40 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncGetRecord: {
lexicon: 1,
id: 'com.atproto.sync.getRecord',
defs: {
main: {
type: 'query',
description:
'Gets blocks needed for existence or non-existence of record.',
parameters: {
type: 'params',
required: ['did', 'collection', 'rkey'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
collection: {
type: 'string',
},
rkey: {
type: 'string',
},
commit: {
type: 'string',
description: 'An optional past commit CID.',
},
},
},
output: {
encoding: 'application/cbor',
},
},
},
},
ComAtprotoSyncGetRepo: {
lexicon: 1,
id: 'com.atproto.sync.getRepo',
Expand Down Expand Up @@ -3607,6 +3641,7 @@ export const ids = {
ComAtprotoSyncGetCheckout: 'com.atproto.sync.getCheckout',
ComAtprotoSyncGetCommitPath: 'com.atproto.sync.getCommitPath',
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncUpdateRepo: 'com.atproto.sync.updateRepo',
AppBskyActorCreateScene: 'app.bsky.actor.createScene',
Expand Down
31 changes: 31 additions & 0 deletions packages/api/src/client/types/com/atproto/sync/getRecord.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'

export interface QueryParams {
/** The DID of the repo. */
did: string
collection: string
rkey: string
/** An optional past commit CID. */
commit?: string
}

export type InputSchema = undefined

export interface CallOptions {
headers?: Headers
}

export interface Response {
success: boolean
headers: Headers
data: Uint8Array
}

export function toKnownErr(e: any) {
if (e instanceof XRPCError) {
}
return e
}
2 changes: 1 addition & 1 deletion packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ export * as util from './util'

export * from './util'
export * from './tid'
export * from './blocks'
export * from './ipld'
export * from './logger'
export * from './types'
export * from './streams'
24 changes: 11 additions & 13 deletions packages/common/src/blocks.ts → packages/common/src/ipld.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import * as rawCodec from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'
import * as mf from 'multiformats'
import * as cborCodec from '@ipld/dag-cbor'
import { check, schema } from '.'

export const valueToIpldBlock = async (data: unknown) => {
export const dataToCborBlock = async (data: unknown) => {
return Block.encode({
value: data,
codec: cborCodec,
Expand All @@ -28,22 +29,19 @@ export const sha256RawToCid = (hash: Uint8Array): CID => {
return CID.createV1(rawCodec.code, digest)
}

export const cidForData = async (data: unknown): Promise<CID> => {
const block = await valueToIpldBlock(data)
export const cidForCbor = async (data: unknown): Promise<CID> => {
const block = await dataToCborBlock(data)
return block.cid
}

export const valueToIpldBytes = (value: unknown): Uint8Array => {
return cborCodec.encode(value)
}

export const ipldBytesToValue = (bytes: Uint8Array) => {
return cborCodec.decode(bytes)
}
export const cborEncode = cborCodec.encode
export const cborDecode = cborCodec.decode

export const ipldBytesToRecord = (bytes: Uint8Array): object => {
const val = ipldBytesToValue(bytes)
if (typeof val !== 'object' || val === null) {
export const cborBytesToRecord = (
bytes: Uint8Array,
): Record<string, unknown> => {
const val = cborDecode(bytes)
if (!check.is(val, schema.record)) {
throw new Error(`Expected object, got: ${val}`)
}
return val
Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/api/app/bsky/notification/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export default function (server: Server, ctx: AppContext) {
},
reason: notif.reason,
reasonSubject: notif.reasonSubject || undefined,
record: common.ipldBytesToRecord(notif.recordBytes),
record: common.cborBytesToRecord(notif.recordBytes),
isRead: notif.indexedAt <= user.lastSeenNotifs,
indexedAt: notif.indexedAt,
}))
Expand Down
4 changes: 2 additions & 2 deletions packages/pds/src/api/com/atproto/account.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import * as crypto from '@atproto/crypto'
import * as handleLib from '@atproto/handle'
import { cidForData } from '@atproto/common'
import { cidForCbor } from '@atproto/common'
import { Server, APP_BSKY_SYSTEM } from '../../../lexicon'
import { countAll } from '../../../db/util'
import * as lex from '../../../lexicon/lexicons'
Expand Down Expand Up @@ -151,7 +151,7 @@ export default function (server: Server, ctx: AppContext) {
await repoTxn.createRepo(did, [write], now)
await repoTxn.indexWrites([write], now)

const declarationCid = await cidForData(declaration)
const declarationCid = await cidForCbor(declaration)
const access = ctx.auth.createAccessToken(did)
const refresh = ctx.auth.createRefreshToken(did)
await ctx.services.auth(dbTxn).grantRefreshToken(refresh.payload)
Expand Down
35 changes: 27 additions & 8 deletions packages/pds/src/api/com/atproto/sync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CID } from 'multiformats/cid'
import { Repo } from '@atproto/repo'
import * as repo from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../lexicon'
import SqlRepoStorage from '../../../sql-repo-storage'
Expand Down Expand Up @@ -45,13 +45,12 @@ export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getRepo(async ({ params }) => {
const { did, from = null } = params
const storage = new SqlRepoStorage(ctx.db, did)
const root = await storage.getHead()
if (root === null) {
const head = await storage.getHead()
if (head === null) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const repo = await Repo.load(storage, root)
const fromCid = from ? CID.parse(from) : null
const diff = await repo.getDiff(fromCid)
const diff = await repo.getDiff(storage, head, fromCid)
return {
encoding: 'application/cbor',
body: Buffer.from(diff),
Expand All @@ -61,15 +60,35 @@ export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getCheckout(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit ? CID.parse(params.commit) : undefined
const repo = await Repo.load(storage, commit)
const checkout = await repo.getCheckout()
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const checkout = await repo.getCheckout(storage, commit)
return {
encoding: 'application/cbor',
body: Buffer.from(checkout),
}
})

server.com.atproto.sync.getRecord(async ({ params }) => {
const { did, collection, rkey } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const proof = await repo.getRecords(storage, commit, [{ collection, rkey }])
return {
encoding: 'application/cbor',
body: Buffer.from(proof),
}
})

server.com.atproto.sync.updateRepo(async () => {
throw new InvalidRequestError('Not implemented')
})
Expand Down
8 changes: 8 additions & 0 deletions packages/pds/src/lexicon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
import * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncUpdateRepo from './types/com/atproto/sync/updateRepo'
import * as AppBskyActorCreateScene from './types/app/bsky/actor/createScene'
Expand Down Expand Up @@ -409,6 +410,13 @@ export class SyncNS {
return this._server.xrpc.method(nsid, cfg)
}

getRecord<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncGetRecord.Handler<ExtractAuth<AV>>>,
) {
const nsid = 'com.atproto.sync.getRecord' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}

getRepo<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncGetRepo.Handler<ExtractAuth<AV>>>,
) {
Expand Down
35 changes: 35 additions & 0 deletions packages/pds/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,40 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncGetRecord: {
lexicon: 1,
id: 'com.atproto.sync.getRecord',
defs: {
main: {
type: 'query',
description:
'Gets blocks needed for existence or non-existence of record.',
parameters: {
type: 'params',
required: ['did', 'collection', 'rkey'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
collection: {
type: 'string',
},
rkey: {
type: 'string',
},
commit: {
type: 'string',
description: 'An optional past commit CID.',
},
},
},
output: {
encoding: 'application/cbor',
},
},
},
},
ComAtprotoSyncGetRepo: {
lexicon: 1,
id: 'com.atproto.sync.getRepo',
Expand Down Expand Up @@ -3607,6 +3641,7 @@ export const ids = {
ComAtprotoSyncGetCheckout: 'com.atproto.sync.getCheckout',
ComAtprotoSyncGetCommitPath: 'com.atproto.sync.getCommitPath',
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncUpdateRepo: 'com.atproto.sync.updateRepo',
AppBskyActorCreateScene: 'app.bsky.actor.createScene',
Expand Down
37 changes: 37 additions & 0 deletions packages/pds/src/lexicon/types/com/atproto/sync/getRecord.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import stream from 'stream'
import { HandlerAuth } from '@atproto/xrpc-server'

export interface QueryParams {
/** The DID of the repo. */
did: string
collection: string
rkey: string
/** An optional past commit CID. */
commit?: string
}

export type InputSchema = undefined
export type HandlerInput = undefined

export interface HandlerSuccess {
encoding: 'application/cbor'
body: Uint8Array | stream.Readable
}

export interface HandlerError {
status: number
message?: string
}

export type HandlerOutput = HandlerError | HandlerSuccess
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput
Loading