From 4a44dfc4c14d165a42fd1fe3440dc4be6b382f6d Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Mon, 1 Jul 2024 18:48:16 -0400 Subject: [PATCH 1/9] update agent Event message types in favor of Messages interface methods --- packages/agent/package.json | 4 +- packages/agent/src/dwn-api.ts | 49 ++--- .../src/prototyping/clients/dwn-rpc-types.ts | 2 +- .../clients/http-dwn-rpc-client.ts | 4 +- packages/agent/src/sync-engine-level.ts | 176 +++++------------- packages/agent/src/types/dwn.ts | 86 ++++----- packages/agent/tests/dwn-api.spec.ts | 148 ++++++++------- pnpm-lock.yaml | 39 +++- 8 files changed, 219 insertions(+), 289 deletions(-) diff --git a/packages/agent/package.json b/packages/agent/package.json index adbe44715..94def215e 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -71,7 +71,7 @@ "dependencies": { "@noble/ciphers": "0.4.1", "@scure/bip39": "1.2.2", - "@tbd54566975/dwn-sdk-js": "0.3.10", + "@tbd54566975/dwn-sdk-js": "0.4.0", "@web5/common": "1.0.0", "@web5/crypto": "1.0.0", "@web5/dids": "1.1.0", @@ -110,4 +110,4 @@ "sinon": "16.1.3", "typescript": "5.1.6" } -} \ No newline at end of file +} diff --git a/packages/agent/src/dwn-api.ts b/packages/agent/src/dwn-api.ts index b5a14bebc..4186fd160 100644 --- a/packages/agent/src/dwn-api.ts +++ b/packages/agent/src/dwn-api.ts @@ -1,7 +1,7 @@ import type { Readable } from '@web5/common'; import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; -import { Convert, NodeStream } from '@web5/common'; +import { NodeStream } from '@web5/common'; import { utils as cryptoUtils } from '@web5/crypto'; import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids'; import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel, ResumableTaskStoreLevel } from '@tbd54566975/dwn-sdk-js'; @@ -357,54 +357,29 @@ export class AgentDwnApi { }): Promise> { const signer = await this.getSigner(author); - // Construct a MessagesGet message to fetch the message. - const messagesGet = await dwnMessageConstructors[DwnInterface.MessagesGet].create({ - messageCids: [messageCid], + // Construct a MessagesRead message to fetch the message. + const messagesRead = await dwnMessageConstructors[DwnInterface.MessagesRead].create({ + messageCid: messageCid, signer }); - const result = await this._dwn.processMessage(author, messagesGet.message); + const result = await this._dwn.processMessage(author, messagesRead.message); - if (!(result.entries && result.entries.length === 1)) { - throw new Error('AgentDwnApi: Expected 1 message entry in the MessagesGet response but received none or more than one.'); + if (result.status.code !== 200) { + throw new Error(`AgentDwnApi: Failed to read message, response status: ${result.status.code} - ${result.status.detail}`); } - const [ messageEntry ] = result.entries; - - const message = messageEntry.message as DwnMessage[T]; + const messageEntry = result.entry; + const message = messageEntry?.message as DwnMessage[T]; if (!message) { throw new Error(`AgentDwnApi: Message not found with CID: ${messageCid}`); } let dwnMessageWithBlob: DwnMessageWithBlob = { message }; - // isRecordsWrite(message) && (dwnMessage.data = await this.getDataForRecordsWrite({ author, message, messageEntry, messageType, signer })); - - // If the message is a RecordsWrite, either data will be present, - // OR we have to fetch it using a RecordsRead. - if (isRecordsWrite(messageEntry)) { - if (messageEntry.encodedData) { - const dataBytes = Convert.base64Url(messageEntry.encodedData).toUint8Array(); - // TODO: test adding the messageEntry.message.descriptor.dataFormat to the Blob constructor. - dwnMessageWithBlob.data = new Blob([dataBytes]); - - } else { - const recordsRead = await dwnMessageConstructors[DwnInterface.RecordsRead].create({ - filter: { - recordId: messageEntry.message.recordId - }, - signer - }); - - const reply = await this._dwn.processMessage(author, recordsRead.message); + // If the message is a RecordsWrite, data will be present in the form of a stream - if (reply.status.code >= 400) { - const { status: { code, detail } } = reply; - throw new Error(`AgentDwnApi: (${code}) Failed to read data associated with record ${messageEntry.message.recordId}. ${detail}}`); - } else if (reply.record) { - const dataBytes = await NodeStream.consumeToBytes({ readable: reply.record.data }); - dwnMessageWithBlob.data = new Blob([dataBytes]); - } - } + if (isRecordsWrite(messageEntry) && messageEntry.data) { + dwnMessageWithBlob.data = new Blob([await NodeStream.consumeToBytes({ readable: messageEntry.data })], { type: messageEntry.message.descriptor.dataFormat }); } return dwnMessageWithBlob; diff --git a/packages/agent/src/prototyping/clients/dwn-rpc-types.ts b/packages/agent/src/prototyping/clients/dwn-rpc-types.ts index 4266e7432..c1c0718f6 100644 --- a/packages/agent/src/prototyping/clients/dwn-rpc-types.ts +++ b/packages/agent/src/prototyping/clients/dwn-rpc-types.ts @@ -44,7 +44,7 @@ export type DwnRpcRequest = { /** The DID of the target to which the message is addressed. */ targetDid: string; - /** Optional subscription handler for DWN events. */ + /** Optional subscription handler for DWN message events. */ subscriptionHandler?: DwnSubscriptionHandler; } diff --git a/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts index 7e31df907..a865e2d57 100644 --- a/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts +++ b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts @@ -66,8 +66,10 @@ export class HttpDwnRpcClient implements DwnRpc { } const { reply } = dwnRpcResponse.result; - if (dataStream) { + if (dataStream && reply.record) { reply['record']['data'] = dataStream; + } else if (dataStream && reply.entry) { + reply['entry']['data'] = dataStream; } return reply as DwnRpcResponse; diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 90f726450..c9bae8560 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -1,23 +1,22 @@ import type { ULIDFactory } from 'ulidx'; import type { AbstractBatchOperation, AbstractLevel } from 'abstract-level'; import type { - EventsQueryReply, GenericMessage, - MessagesGetReply, + MessagesQueryReply, + MessagesReadReply, PaginationCursor, } from '@tbd54566975/dwn-sdk-js'; import ms from 'ms'; import { Level } from 'level'; import { monotonicFactory } from 'ulidx'; -import { Convert, NodeStream } from '@web5/common'; -import { DataStream } from '@tbd54566975/dwn-sdk-js'; +import { NodeStream } from '@web5/common'; import type { SyncEngine } from './types/sync.js'; import type { Web5PlatformAgent } from './types/agent.js'; import { DwnInterface } from './types/dwn.js'; -import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js'; +import { getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js'; export type SyncEngineLevelParams = { agent?: Web5PlatformAgent; @@ -35,9 +34,6 @@ type SyncState = { cursor?: PaginationCursor, } -const is2xx = (code: number) => code >= 200 && code <= 299; -const is4xx = (code: number) => code >= 400 && code <= 499; - export class SyncEngineLevel implements SyncEngine { /** * Holds the instance of a `Web5PlatformAgent` that represents the current execution context for @@ -108,97 +104,46 @@ export class SyncEngineLevel implements SyncEngine { continue; } - const messagesGet = await this.agent.dwn.createMessage({ + const messagesRead = await this.agent.dwn.createMessage({ author : did, - messageType : DwnInterface.MessagesGet, + messageType : DwnInterface.MessagesRead, messageParams : { - messageCids: [messageCid] + messageCid: messageCid } }); - let reply: MessagesGetReply; + let reply: MessagesReadReply; try { reply = await this.agent.rpc.sendDwnRequest({ dwnUrl, targetDid : did, - message : messagesGet - }) as MessagesGetReply; + message : messagesRead, + }) as MessagesReadReply; } catch(e) { errored.add(dwnUrl); continue; } - // TODO: Refactor this to batch network requests for record messages rather than one at a time. - // Per Moe, this loop exists because the original intent was to pass multiple messageCid - // values to batch network requests for record messages rather than one at a time, as it - // is currently implemented. Either the pull() method should be refactored to batch - // getting messages OR this loop should be removed. - for (let entry of reply.entries ?? []) { - if (entry.error || !entry.message) { - await this.addMessage(did, messageCid); - deleteOperations.push({ type: 'del', key: key }); + if (!reply.entry?.message) { + await this.addMessage(did, messageCid); + deleteOperations.push({ type: 'del', key: key }); + continue; + } - continue; - } + const replyEntry = reply.entry; + if (isRecordsWrite(replyEntry) && replyEntry.data) { + const message = replyEntry.message; let dataStream; - - if (isRecordsWrite(entry)) { - const { encodedData } = entry; - const message = entry.message; - - if (encodedData) { - const dataBytes = Convert.base64Url(encodedData).toUint8Array(); - dataStream = DataStream.fromBytes(dataBytes); - } else { - const recordsRead = await this.agent.dwn.createMessage({ - author : did, - messageType : DwnInterface.RecordsRead, - messageParams : { - filter: { - recordId: message.recordId - } - } - }); - - const recordsReadReply = await this.agent.rpc.sendDwnRequest({ - dwnUrl, - targetDid : did, - message : recordsRead.message - }); - - const { record, status: readStatus } = recordsReadReply; - - if (is2xx(readStatus.code) && record) { - // If the read was successful, convert the data stream from web ReadableStream - // to Node.js Readable so that the DWN can process it. - // TODO: Remove the type assertion once sendDwnRequest type is fixed to return a ReadableStream. - dataStream = NodeStream.fromWebReadable({ readableStream: record.data as unknown as ReadableStream }); - - } else if (readStatus.code >= 400) { - // writes record without data, if this is an initial records write, it will succeed. - const pruneReply = await this.agent.dwn.processMessage({ - targetDid: did, - message - }); - - if (pruneReply.status.code === 202 || pruneReply.status.code === 409) { - await this.addMessage(did, messageCid); - deleteOperations.push({ type: 'del', key: key }); - - continue; - } else { - throw new Error(`SyncManager: Failed to sync tombstone for message '${messageCid}'`); - } - } - } + if (replyEntry.data instanceof ReadableStream) { + dataStream = webReadableToIsomorphicNodeReadable(replyEntry.data); } const pullReply = await this.agent.dwn.processMessage({ - targetDid : did, - message : entry.message, - dataStream + targetDid: did, + message, + dataStream, }); if (pullReply.status.code === 202 || pullReply.status.code === 409) { @@ -355,40 +300,40 @@ export class SyncEngineLevel implements SyncEngine { syncDirection: SyncDirection, cursor?: PaginationCursor }) { - let eventsReply = {} as EventsQueryReply; + let messagesReply = {} as MessagesQueryReply; if (syncDirection === 'pull') { // When sync is a pull, get the event log from the remote DWN. - const eventsGetMessage = await this.agent.dwn.createMessage({ + const messagesReadMessage = await this.agent.dwn.createMessage({ author : did, - messageType : DwnInterface.EventsQuery, + messageType : DwnInterface.MessagesQuery, messageParams : { filters: [], cursor } }); try { - eventsReply = await this.agent.rpc.sendDwnRequest({ + messagesReply = await this.agent.rpc.sendDwnRequest({ dwnUrl : dwnUrl, targetDid : did, - message : eventsGetMessage - }) as EventsQueryReply; + message : messagesReadMessage + }) as MessagesQueryReply; } catch { // If a particular DWN service endpoint is unreachable, silently ignore. } } else if (syncDirection === 'push') { // When sync is a push, get the event log from the local DWN. - const eventsGetDwnResponse = await this.agent.dwn.processRequest({ + const messagesReadDwnResponse = await this.agent.dwn.processRequest({ author : did, target : did, - messageType : DwnInterface.EventsQuery, + messageType : DwnInterface.MessagesQuery, messageParams : { filters: [], cursor } }); - eventsReply = eventsGetDwnResponse.reply as EventsQueryReply; + messagesReply = messagesReadDwnResponse.reply as MessagesQueryReply; } - const eventLog = eventsReply.entries ?? []; - if (eventsReply.cursor) { - this.setCursor(did, dwnUrl, syncDirection, eventsReply.cursor); + const eventLog = messagesReply.entries ?? []; + if (messagesReply.cursor) { + this.setCursor(did, dwnUrl, syncDirection, messagesReply.cursor); } return eventLog; @@ -401,63 +346,32 @@ export class SyncEngineLevel implements SyncEngine { let { reply } = await this.agent.dwn.processRequest({ author : author, target : author, - messageType : DwnInterface.MessagesGet, + messageType : DwnInterface.MessagesRead, messageParams : { - messageCids: [messageCid] + messageCid: messageCid } }); + + // Absence of a messageEntry or message within messageEntry can happen because updating a // Record creates another RecordsWrite with the same recordId. Only the first and // most recent RecordsWrite messages are kept for a given recordId. Any RecordsWrite messages // that aren't the first or most recent are discarded by the DWN. - if (!(reply.entries && reply.entries.length === 1)) { + if (reply.status.code !== 200 || !reply.entry) { return undefined; } - - const [ messageEntry ] = reply.entries; - - const message = messageEntry.message; - if (!message) { + const messageEntry = reply.entry; + if (!messageEntry) { return undefined; } - let dwnMessageWithBlob: { message: GenericMessage, data?: Blob } = { message }; + let dwnMessageWithBlob: { message: GenericMessage, data?: Blob } = { message: messageEntry.message }; // If the message is a RecordsWrite, either data will be present, // OR we have to fetch it using a RecordsRead. - if (isRecordsWrite(messageEntry)) { - if (messageEntry.encodedData) { - const dataBytes = Convert.base64Url(messageEntry.encodedData).toUint8Array(); - // ! TODO: test adding the messageEntry.message.descriptor.dataFormat to the Blob constructor. - dwnMessageWithBlob.data = new Blob([dataBytes]); - - } else { - let readResponse = await this.agent.dwn.processRequest({ - author : author, - target : author, - messageType : DwnInterface.RecordsRead, - messageParams : { filter: { recordId: messageEntry.message.recordId } } - }); - - const reply = readResponse.reply; - - if (is2xx(reply.status.code) && reply.record) { - // If status code is 200-299, return the data. - dwnMessageWithBlob.data = await NodeStream.consumeToBlob({ readable: reply.record.data }); - - } else if (is4xx(reply.status.code)) { - // If status code is 400-499, typically 404 indicating the data no longer exists, it is - // likely that a `RecordsDelete` took place. `RecordsDelete` keeps a `RecordsWrite` and - // deletes the associated data, effectively acting as a "tombstone." Sync still needs to - // push this tombstone so that the `RecordsDelete` can be processed successfully. - - } else { - // If status code is anything else (likely 5xx), throw an error. - const { status: { code, detail } } = reply; - throw new Error(`SyncEngineLevel: (${code}) Failed to read data associated with record ${messageEntry.message.recordId}. ${detail}}`); - } - } + if (isRecordsWrite(messageEntry) && messageEntry.data) { + dwnMessageWithBlob.data = new Blob([await NodeStream.consumeToBytes({ readable: messageEntry.data })], { type: messageEntry.message.descriptor.dataFormat }); } return dwnMessageWithBlob; diff --git a/packages/agent/src/types/dwn.ts b/packages/agent/src/types/dwn.ts index 921733e08..24ffc280c 100644 --- a/packages/agent/src/types/dwn.ts +++ b/packages/agent/src/types/dwn.ts @@ -1,14 +1,8 @@ import type { DidService } from '@web5/dids'; import type { Readable, RequireOnly } from '@web5/common'; import type { - EventsQueryReply, - MessagesGetReply, RecordsReadReply, RecordsQueryReply, - EventsQueryMessage, - EventsQueryOptions, - MessagesGetMessage, - MessagesGetOptions, RecordsReadMessage, RecordsReadOptions, GenericMessageReply, @@ -23,18 +17,23 @@ import type { ProtocolsQueryOptions, ProtocolsConfigureMessage, ProtocolsConfigureOptions, - EventsSubscribeMessage, RecordsSubscribeMessage, - EventsSubscribeOptions, RecordsSubscribeOptions, - EventsSubscribeReply, RecordsSubscribeReply, MessageSubscriptionHandler, RecordSubscriptionHandler, + MessagesQueryMessage, + MessagesReadMessage, + MessagesSubscribeMessage, + MessagesQueryOptions, + MessagesReadOptions, + MessagesSubscribeOptions, + MessagesQueryReply, + MessagesReadReply, + MessagesSubscribeReply, } from '@tbd54566975/dwn-sdk-js'; import { - MessagesGet, RecordsRead, RecordsQuery, RecordsWrite, @@ -43,9 +42,10 @@ import { ProtocolsQuery, DwnInterfaceName, ProtocolsConfigure, - EventsQuery, - EventsSubscribe, RecordsSubscribe, + MessagesQuery, + MessagesRead, + MessagesSubscribe, } from '@tbd54566975/dwn-sdk-js'; /** @@ -87,22 +87,22 @@ export interface DwnDidService extends DidService { } export enum DwnInterface { - EventsQuery = DwnInterfaceName.Events + DwnMethodName.Query, - EventsSubscribe = DwnInterfaceName.Events + DwnMethodName.Subscribe, - MessagesGet = DwnInterfaceName.Messages + DwnMethodName.Get, - ProtocolsConfigure = DwnInterfaceName.Protocols + DwnMethodName.Configure, - ProtocolsQuery = DwnInterfaceName.Protocols + DwnMethodName.Query, - RecordsDelete = DwnInterfaceName.Records + DwnMethodName.Delete, - RecordsQuery = DwnInterfaceName.Records + DwnMethodName.Query, - RecordsRead = DwnInterfaceName.Records + DwnMethodName.Read, - RecordsSubscribe = DwnInterfaceName.Records + DwnMethodName.Subscribe, - RecordsWrite = DwnInterfaceName.Records + DwnMethodName.Write + MessagesQuery = DwnInterfaceName.Messages + DwnMethodName.Query, + MessagesRead = DwnInterfaceName.Messages + DwnMethodName.Read, + MessagesSubscribe = DwnInterfaceName.Messages + DwnMethodName.Subscribe, + ProtocolsConfigure = DwnInterfaceName.Protocols + DwnMethodName.Configure, + ProtocolsQuery = DwnInterfaceName.Protocols + DwnMethodName.Query, + RecordsDelete = DwnInterfaceName.Records + DwnMethodName.Delete, + RecordsQuery = DwnInterfaceName.Records + DwnMethodName.Query, + RecordsRead = DwnInterfaceName.Records + DwnMethodName.Read, + RecordsSubscribe = DwnInterfaceName.Records + DwnMethodName.Subscribe, + RecordsWrite = DwnInterfaceName.Records + DwnMethodName.Write } export interface DwnMessage { - [DwnInterface.EventsSubscribe] : EventsSubscribeMessage; - [DwnInterface.EventsQuery] : EventsQueryMessage; - [DwnInterface.MessagesGet] : MessagesGetMessage; + [DwnInterface.MessagesQuery] : MessagesQueryMessage; + [DwnInterface.MessagesRead] : MessagesReadMessage; + [DwnInterface.MessagesSubscribe] : MessagesSubscribeMessage; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage; [DwnInterface.ProtocolsQuery] : ProtocolsQueryMessage; [DwnInterface.RecordsDelete] : RecordsDeleteMessage; @@ -113,9 +113,9 @@ export interface DwnMessage { } export interface DwnMessageDescriptor { - [DwnInterface.EventsSubscribe] : EventsSubscribeMessage['descriptor']; - [DwnInterface.EventsQuery] : EventsQueryMessage['descriptor']; - [DwnInterface.MessagesGet] : MessagesGetMessage['descriptor']; + [DwnInterface.MessagesQuery] : MessagesQueryMessage['descriptor']; + [DwnInterface.MessagesRead] : MessagesReadMessage['descriptor']; + [DwnInterface.MessagesSubscribe] : MessagesSubscribeMessage['descriptor']; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage['descriptor']; [DwnInterface.ProtocolsQuery] : ProtocolsQueryMessage['descriptor']; [DwnInterface.RecordsDelete] : RecordsDeleteMessage['descriptor']; @@ -126,9 +126,9 @@ export interface DwnMessageDescriptor { } export interface DwnMessageParams { - [DwnInterface.EventsQuery] : RequireOnly; - [DwnInterface.EventsSubscribe] : Partial; - [DwnInterface.MessagesGet] : RequireOnly; + [DwnInterface.MessagesQuery] : RequireOnly; + [DwnInterface.MessagesRead] : RequireOnly; + [DwnInterface.MessagesSubscribe] : Partial; [DwnInterface.ProtocolsConfigure] : RequireOnly; [DwnInterface.ProtocolsQuery] : ProtocolsQueryOptions; [DwnInterface.RecordsDelete] : RequireOnly; @@ -139,9 +139,9 @@ export interface DwnMessageParams { } export interface DwnMessageReply { - [DwnInterface.EventsQuery] : EventsQueryReply; - [DwnInterface.EventsSubscribe] : EventsSubscribeReply; - [DwnInterface.MessagesGet] : MessagesGetReply; + [DwnInterface.MessagesQuery] : MessagesQueryReply; + [DwnInterface.MessagesRead] : MessagesReadReply; + [DwnInterface.MessagesSubscribe] : MessagesSubscribeReply; [DwnInterface.ProtocolsConfigure] : GenericMessageReply; [DwnInterface.ProtocolsQuery] : ProtocolsQueryReply; [DwnInterface.RecordsDelete] : GenericMessageReply; @@ -152,12 +152,12 @@ export interface DwnMessageReply { } export interface MessageHandler { - [DwnInterface.EventsSubscribe] : MessageSubscriptionHandler; + [DwnInterface.MessagesSubscribe] : MessageSubscriptionHandler; [DwnInterface.RecordsSubscribe] : RecordSubscriptionHandler; // define all of them individually as undefined - [DwnInterface.EventsQuery] : undefined; - [DwnInterface.MessagesGet] : undefined; + [DwnInterface.MessagesQuery] : undefined; + [DwnInterface.MessagesRead] : undefined; [DwnInterface.ProtocolsConfigure] : undefined; [DwnInterface.ProtocolsQuery] : undefined; [DwnInterface.RecordsDelete] : undefined; @@ -210,9 +210,9 @@ export interface DwnMessageConstructor { } export const dwnMessageConstructors: { [T in DwnInterface]: DwnMessageConstructor } = { - [DwnInterface.EventsQuery] : EventsQuery as any, - [DwnInterface.EventsSubscribe] : EventsSubscribe as any, - [DwnInterface.MessagesGet] : MessagesGet as any, + [DwnInterface.MessagesQuery] : MessagesQuery as any, + [DwnInterface.MessagesRead] : MessagesRead as any, + [DwnInterface.MessagesSubscribe] : MessagesSubscribe as any, [DwnInterface.ProtocolsConfigure] : ProtocolsConfigure as any, [DwnInterface.ProtocolsQuery] : ProtocolsQuery as any, [DwnInterface.RecordsDelete] : RecordsDelete as any, @@ -225,9 +225,9 @@ export const dwnMessageConstructors: { [T in DwnInterface]: DwnMessageConstructo export type DwnMessageConstructors = typeof dwnMessageConstructors; export interface DwnMessageInstance { - [DwnInterface.EventsQuery] : EventsQuery; - [DwnInterface.EventsSubscribe] : EventsSubscribe; - [DwnInterface.MessagesGet] : MessagesGet; + [DwnInterface.MessagesQuery] : MessagesQuery; + [DwnInterface.MessagesRead] : MessagesRead; + [DwnInterface.MessagesSubscribe] : MessagesSubscribe; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigure; [DwnInterface.ProtocolsQuery] : ProtocolsQuery; [DwnInterface.RecordsDelete] : RecordsDelete; diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index 4fc15d138..7362b2a68 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -4,7 +4,7 @@ import sinon from 'sinon'; import { expect } from 'chai'; import { DidDht } from '@web5/dids'; -import { Convert } from '@web5/common'; +import { Convert, NodeStream } from '@web5/common'; import type { PortableIdentity } from '../src/types/identity.js'; @@ -100,7 +100,7 @@ describe('AgentDwnApi', () => { await testHarness.clearStorage(); }); - it('handles EventsQuery', async () => { + it('handles MessagesQuery', async () => { const testCursor = { messageCid : 'foo', value : 'bar' @@ -108,32 +108,32 @@ describe('AgentDwnApi', () => { const testFilters = [{ protocol: 'http://protocol1' }]; - // Attempt to process the EventsGet. - let eventsQueryResponse = await testHarness.agent.dwn.processRequest({ + // Attempt to process the MessagesQuery. + let messagesQueryResponse = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.EventsQuery, + messageType : DwnInterface.MessagesQuery, messageParams : { cursor : testCursor, filters : testFilters } }); - expect(eventsQueryResponse).to.have.property('message'); - expect(eventsQueryResponse).to.have.property('messageCid'); - expect(eventsQueryResponse).to.have.property('reply'); + expect(messagesQueryResponse).to.have.property('message'); + expect(messagesQueryResponse).to.have.property('messageCid'); + expect(messagesQueryResponse).to.have.property('reply'); - const eventsQueryMessage = eventsQueryResponse.message!; - expect(eventsQueryMessage.descriptor).to.have.property('cursor', testCursor); - expect(eventsQueryMessage.descriptor.filters).to.deep.equal(testFilters); + const messagesQueryMessage = messagesQueryResponse.message!; + expect(messagesQueryMessage.descriptor).to.have.property('cursor', testCursor); + expect(messagesQueryMessage.descriptor.filters).to.deep.equal(testFilters); - const eventsQueryReply = eventsQueryResponse.reply; - expect(eventsQueryReply).to.have.property('status'); - expect(eventsQueryReply.status.code).to.equal(200); - expect(eventsQueryReply.entries).to.have.length(0); + const messagesQueryReply = messagesQueryResponse.reply; + expect(messagesQueryReply).to.have.property('status'); + expect(messagesQueryReply.status.code).to.equal(200); + expect(messagesQueryReply.entries).to.have.length(0); }); - it('handles EventsSubscription', async () => { + it('handles MessageSubscription', async () => { const receivedMessages: string[] = []; const subscriptionHandler = async (event: MessageEvent) => { const { message } = event; @@ -144,7 +144,7 @@ describe('AgentDwnApi', () => { const { reply: { status: subscribeStatus, subscription } } = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.EventsSubscribe, + messageType : DwnInterface.MessagesSubscribe, messageParams : { filters: [{ protocol: 'https://protocol.xyz/example' @@ -240,11 +240,11 @@ describe('AgentDwnApi', () => { ]); }); - it('handles MessagesGet', async () => { + it('handles MessagesRead', async () => { // Create test data to write. const dataBytes = Convert.string('Hello, world!').toUint8Array(); - // Write a record to use for the MessagesGet test. + // Write a record to use for the MessagesRead test. let writeResponse = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, @@ -258,31 +258,29 @@ describe('AgentDwnApi', () => { expect(writeResponse.reply.status.code).to.equal(202); const writeMessage = writeResponse.message!; - // Attempt to process the MessagesGet. - let messagesGetResponse = await testHarness.agent.dwn.processRequest({ + // Attempt to process the MessagesRead. + let messagesReadResponse = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.MessagesGet, + messageType : DwnInterface.MessagesRead, messageParams : { - messageCids: [writeResponse.messageCid!] + messageCid: writeResponse.messageCid! } }); - expect(messagesGetResponse).to.have.property('message'); - expect(messagesGetResponse).to.have.property('messageCid'); - expect(messagesGetResponse).to.have.property('reply'); + expect(messagesReadResponse).to.have.property('message'); + expect(messagesReadResponse).to.have.property('messageCid'); + expect(messagesReadResponse).to.have.property('reply'); - const messagesGetMessage = messagesGetResponse.message!; - expect(messagesGetMessage.descriptor).to.have.property('messageCids'); - expect(messagesGetMessage.descriptor.messageCids).to.have.length(1); - expect(messagesGetMessage.descriptor.messageCids).to.include(writeResponse.messageCid); + const messagesReadMessage = messagesReadResponse.message!; + expect(messagesReadMessage.descriptor).to.have.property('messageCid'); + expect(messagesReadMessage.descriptor.messageCid).to.equal(writeResponse.messageCid); - const messagesGetReply = messagesGetResponse.reply; - expect(messagesGetReply).to.have.property('status'); - expect(messagesGetReply.status.code).to.equal(200); - expect(messagesGetReply.entries).to.have.length(1); + const messagesReadReply = messagesReadResponse.reply; + expect(messagesReadReply).to.have.property('status'); + expect(messagesReadReply.status.code).to.equal(200); - const [ retrievedRecordsWrite ] = messagesGetReply.entries!; + const retrievedRecordsWrite = messagesReadReply.entry!; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); }); @@ -484,6 +482,9 @@ describe('AgentDwnApi', () => { expect(readReply.record).to.have.property('data'); expect(readReply.record).to.have.property('descriptor'); expect(readReply.record).to.have.property('recordId', writeMessage.recordId); + + const readDataBytes = await NodeStream.consumeToBytes({ readable: readReply.record!.data }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles RecordsSubscribe message', async () => { @@ -847,7 +848,7 @@ describe('AgentDwnApi', () => { await testHarness.closeStorage(); }); - it('handles EventsQuery', async () => { + it('handles MessagesQuery', async () => { const testCursor = { messageCid : 'foo', value : 'bar' @@ -855,32 +856,32 @@ describe('AgentDwnApi', () => { const testFilters = [{ protocol: 'http://protocol1' }]; - // Attempt to process the EventsGet. - let eventsQueryResponse = await testHarness.agent.dwn.sendRequest({ + // Attempt to process the MessagesQuery. + let messagesQueryResponse = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.EventsQuery, + messageType : DwnInterface.MessagesQuery, messageParams : { cursor : testCursor, filters : testFilters } }); - expect(eventsQueryResponse).to.have.property('message'); - expect(eventsQueryResponse).to.have.property('messageCid'); - expect(eventsQueryResponse).to.have.property('reply'); + expect(messagesQueryResponse).to.have.property('message'); + expect(messagesQueryResponse).to.have.property('messageCid'); + expect(messagesQueryResponse).to.have.property('reply'); - const eventsQueryMessage = eventsQueryResponse.message!; - expect(eventsQueryMessage.descriptor).to.have.property('cursor', testCursor); - expect(eventsQueryMessage.descriptor.filters).to.deep.equal(testFilters); + const messagesQueryMessage = messagesQueryResponse.message!; + expect(messagesQueryMessage.descriptor).to.have.property('cursor', testCursor); + expect(messagesQueryMessage.descriptor.filters).to.deep.equal(testFilters); - const eventsQueryReply = eventsQueryResponse.reply; - expect(eventsQueryReply).to.have.property('status'); - expect(eventsQueryReply.status.code).to.equal(200); - expect(eventsQueryReply.entries).to.have.length(0); + const messagesQueryReply = messagesQueryResponse.reply; + expect(messagesQueryReply).to.have.property('status'); + expect(messagesQueryReply.status.code).to.equal(200); + expect(messagesQueryReply.entries).to.have.length(0); }); - it('handles EventsSubscription', async () => { + it('handles MessagesSubscribe', async () => { const receivedMessages: string[] = []; const subscriptionHandler = async (event: MessageEvent) => { const { message } = event; @@ -891,7 +892,7 @@ describe('AgentDwnApi', () => { const { reply: { status: subscribeStatus, subscription } } = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.EventsSubscribe, + messageType : DwnInterface.MessagesSubscribe, messageParams : { filters: [{ protocol: 'https://protocol.xyz/example' @@ -987,11 +988,11 @@ describe('AgentDwnApi', () => { ]); }); - it('handles MessagesGet', async () => { + it('handles MessagesRead', async () => { // Create test data to write. const dataBytes = Convert.string('Hello, world!').toUint8Array(); - // Write a record to use for the MessagesGet test. + // Write a record to use for the MessagesRead test. let writeResponse = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, @@ -1005,32 +1006,32 @@ describe('AgentDwnApi', () => { expect(writeResponse.reply.status.code).to.equal(202); const writeMessage = writeResponse.message!; - // Attempt to process the MessagesGet. - let messagesGetResponse = await testHarness.agent.dwn.sendRequest({ + // Attempt to process the MessagesRead. + let messagesReadResponse = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.MessagesGet, + messageType : DwnInterface.MessagesRead, messageParams : { - messageCids: [writeResponse.messageCid!] + messageCid: writeResponse.messageCid! } }); - expect(messagesGetResponse).to.have.property('message'); - expect(messagesGetResponse).to.have.property('messageCid'); - expect(messagesGetResponse).to.have.property('reply'); - - const messagesGetMessage = messagesGetResponse.message!; - expect(messagesGetMessage.descriptor).to.have.property('messageCids'); - expect(messagesGetMessage.descriptor.messageCids).to.have.length(1); - expect(messagesGetMessage.descriptor.messageCids).to.include(writeResponse.messageCid); + expect(messagesReadResponse).to.have.property('message'); + expect(messagesReadResponse).to.have.property('messageCid'); + expect(messagesReadResponse).to.have.property('reply'); - const messagesGetReply = messagesGetResponse.reply; - expect(messagesGetReply).to.have.property('status'); - expect(messagesGetReply.status.code).to.equal(200); - expect(messagesGetReply.entries).to.have.length(1); + const messagesReadMessage = messagesReadResponse.message!; + expect(messagesReadMessage.descriptor).to.have.property('messageCid'); + expect(messagesReadMessage.descriptor.messageCid).to.equal(writeResponse.messageCid); - const [ retrievedRecordsWrite ] = messagesGetReply.entries!; + const messagesReadReply = messagesReadResponse.reply; + expect(messagesReadReply).to.have.property('status'); + expect(messagesReadReply.status.code).to.equal(200); + const retrievedRecordsWrite = messagesReadReply.entry!; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); + + const readDataBytes = await NodeStream.consumeToBytes({ readable: retrievedRecordsWrite.data! }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles ProtocolsConfigure', async () => { @@ -1231,6 +1232,9 @@ describe('AgentDwnApi', () => { expect(readReply.record).to.have.property('data'); expect(readReply.record).to.have.property('descriptor'); expect(readReply.record).to.have.property('recordId', writeMessage.recordId); + + const readDataBytes = await NodeStream.consumeToBytes({ readable: readReply.record!.data }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles RecordsSubscribe message', async () => { @@ -1569,12 +1573,12 @@ describe('AgentDwnApi', () => { expect(error.message).to.include('AgentDwnApi: Subscription handler is required for subscription requests.'); } - // EventsSubscribe message without a subscriptionHandler + // MessagesSubscribe message without a subscriptionHandler try { await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, - messageType : DwnInterface.EventsSubscribe, + messageType : DwnInterface.MessagesSubscribe, messageParams : {} }); expect.fail('Expected an error to be thrown'); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fa9fe3485..e8bda3ed5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -46,8 +46,8 @@ importers: specifier: 1.2.2 version: 1.2.2 '@tbd54566975/dwn-sdk-js': - specifier: 0.3.10 - version: 0.3.10 + specifier: 0.4.0 + version: 0.4.0 '@web5/common': specifier: 1.0.0 version: 1.0.0 @@ -3383,6 +3383,41 @@ packages: transitivePeerDependencies: - encoding - supports-color + dev: true + + /@tbd54566975/dwn-sdk-js@0.4.0: + resolution: {integrity: sha512-eBDjIZQEsxAagKwDbHKzML00/jXlnRN9FLnV9Qx/4UkxZdKRM7IXghFnTRE7aYkwQS8nveAVcijBw46ARSPKcw==} + engines: {node: '>= 18'} + dependencies: + '@ipld/dag-cbor': 9.0.3 + '@js-temporal/polyfill': 0.4.4 + '@noble/ciphers': 0.5.3 + '@noble/ed25519': 2.0.0 + '@noble/secp256k1': 2.0.0 + '@web5/dids': 1.1.0 + abstract-level: 1.0.3 + ajv: 8.12.0 + blockstore-core: 4.2.0 + cross-fetch: 4.0.0 + eciesjs: 0.4.5 + interface-blockstore: 5.2.3 + interface-store: 5.1.2 + ipfs-unixfs-exporter: 13.1.5 + ipfs-unixfs-importer: 15.1.5 + level: 8.0.0 + lodash: 4.17.21 + lru-cache: 9.1.2 + ms: 2.1.3 + multiformats: 11.0.2 + randombytes: 2.1.0 + readable-stream: 4.5.2 + ulidx: 2.1.0 + uuid: 8.3.2 + varint: 6.0.0 + transitivePeerDependencies: + - encoding + - supports-color + dev: false /@tbd54566975/dwn-sql-store@0.5.2: resolution: {integrity: sha512-0NiJraazqgtsLWqju/sQSPoVBv/PbTPkBUMVpcJ64RlCarZA+u7IjJL3/rnJNhUyARjLMyNIBjk33o7jU1zPMQ==} From 28b8012905da3d97475f632716a3bd080b2186b4 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Mon, 1 Jul 2024 21:31:15 -0400 Subject: [PATCH 2/9] upgrade dependencies --- package.json | 4 +- packages/api/package.json | 2 +- packages/dev-env/docker-compose.yaml | 2 +- pnpm-lock.yaml | 57 ++++++---------------------- 4 files changed, 15 insertions(+), 50 deletions(-) diff --git a/package.json b/package.json index c68c47c2e..5f0eb8e41 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@changesets/cli": "^2.27.5", "@npmcli/package-json": "5.0.0", "@typescript-eslint/eslint-plugin": "7.9.0", - "@web5/dwn-server": "0.3.1", + "@web5/dwn-server": "0.4.0", "audit-ci": "^7.0.1", "eslint-plugin-mocha": "10.4.3", "npkill": "0.11.3" @@ -42,4 +42,4 @@ "ws@<8.17.1": ">=8.17.1" } } -} \ No newline at end of file +} diff --git a/packages/api/package.json b/packages/api/package.json index aa9f5c389..89aba6ef9 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -85,7 +85,7 @@ }, "devDependencies": { "@playwright/test": "1.40.1", - "@tbd54566975/dwn-sdk-js": "0.3.10", + "@tbd54566975/dwn-sdk-js": "0.4.0", "@types/chai": "4.3.6", "@types/eslint": "8.56.10", "@types/mocha": "10.0.1", diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index e26494326..f187ff5f8 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -3,6 +3,6 @@ version: "3.98" services: dwn-server: container_name: dwn-server - image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.3.10 + image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.4.0 ports: - "3000:3000" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e8bda3ed5..a93d8ca8d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,8 +25,8 @@ importers: specifier: 7.9.0 version: 7.9.0(@typescript-eslint/parser@7.14.1)(eslint@8.57.0)(typescript@5.4.5) '@web5/dwn-server': - specifier: 0.3.1 - version: 0.3.1 + specifier: 0.4.0 + version: 0.4.0 audit-ci: specifier: ^7.0.1 version: 7.0.1 @@ -174,8 +174,8 @@ importers: specifier: 1.40.1 version: 1.40.1 '@tbd54566975/dwn-sdk-js': - specifier: 0.3.10 - version: 0.3.10 + specifier: 0.4.0 + version: 0.4.0 '@types/chai': specifier: 4.3.6 version: 4.3.6 @@ -3351,40 +3351,6 @@ packages: - supports-color dev: true - /@tbd54566975/dwn-sdk-js@0.3.10: - resolution: {integrity: sha512-Ky59hx7Diw2dp0rQdIuk6b/ige3C0mRatQiQNwCWvq6gedkKBP+efqp+1l2xhjKiEanwrOJi39gWkK02jkngmg==} - engines: {node: '>= 18'} - dependencies: - '@ipld/dag-cbor': 9.0.3 - '@js-temporal/polyfill': 0.4.4 - '@noble/ciphers': 0.5.3 - '@noble/ed25519': 2.0.0 - '@noble/secp256k1': 2.0.0 - '@web5/dids': 1.1.0 - abstract-level: 1.0.3 - ajv: 8.12.0 - blockstore-core: 4.2.0 - cross-fetch: 4.0.0 - eciesjs: 0.4.5 - interface-blockstore: 5.2.3 - interface-store: 5.1.2 - ipfs-unixfs-exporter: 13.1.5 - ipfs-unixfs-importer: 15.1.5 - level: 8.0.0 - lodash: 4.17.21 - lru-cache: 9.1.2 - ms: 2.1.3 - multiformats: 11.0.2 - randombytes: 2.1.0 - readable-stream: 4.5.2 - ulidx: 2.1.0 - uuid: 8.3.2 - varint: 6.0.0 - transitivePeerDependencies: - - encoding - - supports-color - dev: true - /@tbd54566975/dwn-sdk-js@0.4.0: resolution: {integrity: sha512-eBDjIZQEsxAagKwDbHKzML00/jXlnRN9FLnV9Qx/4UkxZdKRM7IXghFnTRE7aYkwQS8nveAVcijBw46ARSPKcw==} engines: {node: '>= 18'} @@ -3417,14 +3383,13 @@ packages: transitivePeerDependencies: - encoding - supports-color - dev: false - /@tbd54566975/dwn-sql-store@0.5.2: - resolution: {integrity: sha512-0NiJraazqgtsLWqju/sQSPoVBv/PbTPkBUMVpcJ64RlCarZA+u7IjJL3/rnJNhUyARjLMyNIBjk33o7jU1zPMQ==} + /@tbd54566975/dwn-sql-store@0.6.0: + resolution: {integrity: sha512-9o9W2A/gXsmj+n+H5debmOrQelybS9g2sPxFcB46zvT8Zpe9OAKUB9j8s7o1XEeUflyqAW1gf6+q3KKO/UhPHQ==} engines: {node: '>=18'} dependencies: '@ipld/dag-cbor': 9.0.5 - '@tbd54566975/dwn-sdk-js': 0.3.10 + '@tbd54566975/dwn-sdk-js': 0.4.0 kysely: 0.26.3 multiformats: 12.0.1 readable-stream: 4.4.2 @@ -4636,12 +4601,12 @@ packages: level: 8.0.1 ms: 2.1.3 - /@web5/dwn-server@0.3.1: - resolution: {integrity: sha512-k3+Cmyv8wiujhLyuFfR9O7meBiw/b6me1QSil58t30OM8VIWq9qqfXPacJTRhSkqf1KB2k4w96T/FAfQ5Z91Fw==} + /@web5/dwn-server@0.4.0: + resolution: {integrity: sha512-2zc/REV4ibLRS0ZljF3905ZYBkv1kAaeXksgIgZrTND/xudJHid3aMKhxV2ihzO+Zvkcva2P5ho7ys4vFSU/EA==} hasBin: true dependencies: - '@tbd54566975/dwn-sdk-js': 0.3.10 - '@tbd54566975/dwn-sql-store': 0.5.2 + '@tbd54566975/dwn-sdk-js': 0.4.0 + '@tbd54566975/dwn-sql-store': 0.6.0 better-sqlite3: 8.7.0 body-parser: 1.20.2 bytes: 3.1.2 From 9a56e42ca22bc91bf83b03fdb848e73bfa8ead8e Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 2 Jul 2024 09:51:55 -0400 Subject: [PATCH 3/9] add test for sendRequest by messageCid and fix Read tests for browser --- packages/agent/src/sync-engine-level.ts | 5 +--- packages/agent/tests/dwn-api.spec.ts | 37 +++++++++++++++++++++---- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index c9bae8560..f2404fd21 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -361,10 +361,7 @@ export class SyncEngineLevel implements SyncEngine { if (reply.status.code !== 200 || !reply.entry) { return undefined; } - const messageEntry = reply.entry; - if (!messageEntry) { - return undefined; - } + const messageEntry = reply.entry!; let dwnMessageWithBlob: { message: GenericMessage, data?: Blob } = { message: messageEntry.message }; diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index 7362b2a68..942211067 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -848,6 +848,37 @@ describe('AgentDwnApi', () => { await testHarness.closeStorage(); }); + it('handles sending existing message using `messageCid` request property', async () => { + // Create test data to write. + const dataBytes = Convert.string('Hello, world!').toUint8Array(); + + // Write a record to the local DWN to use for the test. + let writeResponse = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/example' + }, + dataStream: new Blob([dataBytes]) + }); + expect(writeResponse.reply.status.code).to.equal(202); + + // sendRequest using the message's `messageCid` + const sendResponse = await testHarness.agent.dwn.sendRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageCid : writeResponse.messageCid + }); + + // Verify the response. + expect(sendResponse.message).to.deep.equal(writeResponse.message); + expect(sendResponse.messageCid).to.equal(writeResponse.messageCid); + expect(sendResponse.reply.status.code).to.equal(202); + }); + it('handles MessagesQuery', async () => { const testCursor = { messageCid : 'foo', @@ -1029,9 +1060,6 @@ describe('AgentDwnApi', () => { expect(messagesReadReply.status.code).to.equal(200); const retrievedRecordsWrite = messagesReadReply.entry!; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); - - const readDataBytes = await NodeStream.consumeToBytes({ readable: retrievedRecordsWrite.data! }); - expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles ProtocolsConfigure', async () => { @@ -1232,9 +1260,6 @@ describe('AgentDwnApi', () => { expect(readReply.record).to.have.property('data'); expect(readReply.record).to.have.property('descriptor'); expect(readReply.record).to.have.property('recordId', writeMessage.recordId); - - const readDataBytes = await NodeStream.consumeToBytes({ readable: readReply.record!.data }); - expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles RecordsSubscribe message', async () => { From 03aaca13107bd27d124fd23e32dff514353947a3 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 2 Jul 2024 15:05:19 -0400 Subject: [PATCH 4/9] add tests for data with RecordsRead and MessagesRead, modify stream conversion to better account for edge cases --- packages/agent/src/dwn-api.ts | 3 ++- .../clients/http-dwn-rpc-client.ts | 4 ++-- packages/agent/src/sync-engine-level.ts | 19 +++++++++------ packages/agent/tests/dwn-api.spec.ts | 24 ++++++++++++++++++- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/packages/agent/src/dwn-api.ts b/packages/agent/src/dwn-api.ts index 4186fd160..463a540b6 100644 --- a/packages/agent/src/dwn-api.ts +++ b/packages/agent/src/dwn-api.ts @@ -379,7 +379,8 @@ export class AgentDwnApi { // If the message is a RecordsWrite, data will be present in the form of a stream if (isRecordsWrite(messageEntry) && messageEntry.data) { - dwnMessageWithBlob.data = new Blob([await NodeStream.consumeToBytes({ readable: messageEntry.data })], { type: messageEntry.message.descriptor.dataFormat }); + const dataBytes = await NodeStream.consumeToBytes({ readable: messageEntry.data }); + dwnMessageWithBlob.data = new Blob([ dataBytes ], { type: messageEntry.message.descriptor.dataFormat }); } return dwnMessageWithBlob; diff --git a/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts index a865e2d57..29a39c265 100644 --- a/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts +++ b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts @@ -67,9 +67,9 @@ export class HttpDwnRpcClient implements DwnRpc { const { reply } = dwnRpcResponse.result; if (dataStream && reply.record) { - reply['record']['data'] = dataStream; + reply.record.data = dataStream; } else if (dataStream && reply.entry) { - reply['entry']['data'] = dataStream; + reply.entry.data = dataStream; } return reply as DwnRpcResponse; diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index f2404fd21..5b214c4b3 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -16,7 +16,7 @@ import type { SyncEngine } from './types/sync.js'; import type { Web5PlatformAgent } from './types/agent.js'; import { DwnInterface } from './types/dwn.js'; -import { getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js'; +import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js'; export type SyncEngineLevelParams = { agent?: Web5PlatformAgent; @@ -125,7 +125,7 @@ export class SyncEngineLevel implements SyncEngine { continue; } - if (!reply.entry?.message) { + if (reply.status.code !== 200 || !reply.entry?.message) { await this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); continue; @@ -135,10 +135,13 @@ export class SyncEngineLevel implements SyncEngine { if (isRecordsWrite(replyEntry) && replyEntry.data) { const message = replyEntry.message; - let dataStream; - if (replyEntry.data instanceof ReadableStream) { - dataStream = webReadableToIsomorphicNodeReadable(replyEntry.data); - } + + // if the message includes data we convert it to a Node readable stream + // otherwise we set it as undefined, as the message does not include data + // this occurs when the message is a RecordsWrite message that has been updated + const dataStream = replyEntry.data ? + NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream }) + : undefined; const pullReply = await this.agent.dwn.processMessage({ targetDid: did, @@ -155,6 +158,7 @@ export class SyncEngineLevel implements SyncEngine { await pullQueue.batch(deleteOperations as any); } + public async push(): Promise { const syncPeerState = await this.getSyncPeerState({ syncDirection: 'push' }); await this.enqueueOperations({ syncDirection: 'push', syncPeerState }); @@ -368,7 +372,8 @@ export class SyncEngineLevel implements SyncEngine { // If the message is a RecordsWrite, either data will be present, // OR we have to fetch it using a RecordsRead. if (isRecordsWrite(messageEntry) && messageEntry.data) { - dwnMessageWithBlob.data = new Blob([await NodeStream.consumeToBytes({ readable: messageEntry.data })], { type: messageEntry.message.descriptor.dataFormat }); + const dataBytes = await NodeStream.consumeToBytes({ readable: messageEntry.data }); + dwnMessageWithBlob.data = new Blob([ dataBytes ], { type: messageEntry.message.descriptor.dataFormat }); } return dwnMessageWithBlob; diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index 942211067..1b3d52690 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -1,10 +1,11 @@ +import type { Readable } from '@web5/common'; import { Message, ProtocolDefinition, TestDataGenerator, type Dwn, type MessageEvent, type RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import sinon from 'sinon'; import { expect } from 'chai'; import { DidDht } from '@web5/dids'; -import { Convert, NodeStream } from '@web5/common'; +import { Convert, NodeStream, Stream } from '@web5/common'; import type { PortableIdentity } from '../src/types/identity.js'; @@ -282,6 +283,9 @@ describe('AgentDwnApi', () => { const retrievedRecordsWrite = messagesReadReply.entry!; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); + + const readDataBytes = await NodeStream.consumeToBytes({ readable: retrievedRecordsWrite.data! }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles ProtocolsConfigure', async () => { @@ -1060,6 +1064,15 @@ describe('AgentDwnApi', () => { expect(messagesReadReply.status.code).to.equal(200); const retrievedRecordsWrite = messagesReadReply.entry!; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); + + const dataStream: ReadableStream | Readable = retrievedRecordsWrite.data!; + // If the data stream is a web ReadableStream, convert it to a Node.js Readable. + const nodeReadable = Stream.isReadableStream(dataStream) ? + NodeStream.fromWebReadable({ readableStream: dataStream }) : + dataStream; + + const readDataBytes = await NodeStream.consumeToBytes({ readable: nodeReadable }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles ProtocolsConfigure', async () => { @@ -1260,6 +1273,15 @@ describe('AgentDwnApi', () => { expect(readReply.record).to.have.property('data'); expect(readReply.record).to.have.property('descriptor'); expect(readReply.record).to.have.property('recordId', writeMessage.recordId); + + const dataStream: ReadableStream | Readable = readReply.record!.data; + // If the data stream is a web ReadableStream, convert it to a Node.js Readable. + const nodeReadable = Stream.isReadableStream(dataStream) ? + NodeStream.fromWebReadable({ readableStream: dataStream }) : + dataStream; + + const readDataBytes = await NodeStream.consumeToBytes({ readable: nodeReadable }); + expect(readDataBytes).to.deep.equal(dataBytes); }); it('handles RecordsSubscribe message', async () => { From 14f132adf2bfeaca7ce2bbf3bdce4c1e0ed3ba88 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 2 Jul 2024 15:32:12 -0400 Subject: [PATCH 5/9] test sync for updated records --- packages/agent/src/sync-engine-level.ts | 2 +- .../agent/tests/sync-engine-level.spec.ts | 40 ++++++++++++++++++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 5b214c4b3..4f6c5eff3 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -133,7 +133,7 @@ export class SyncEngineLevel implements SyncEngine { const replyEntry = reply.entry; - if (isRecordsWrite(replyEntry) && replyEntry.data) { + if (isRecordsWrite(replyEntry)) { const message = replyEntry.message; // if the message includes data we convert it to a Node readable stream diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 598352841..0e18d01d6 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -92,6 +92,24 @@ describe('SyncEngineLevel', () => { }, dataStream: new Blob([`Hello, ${i}`]) }); + expect(writeResponse.reply.status.code).to.equal(202); + + // write an update message for one of the records + if (i === 0) { + const updateResponse = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + recordId : writeResponse.message!.recordId, + dataFormat : 'text/plain', + schema : writeResponse.message!.descriptor.schema, + dateCreated : writeResponse.message!.descriptor.dateCreated + }, + dataStream: new Blob([`Hello, ${i} updated!`]), + }); + expect(updateResponse.reply.status.code).to.equal(202); + } localRecords.push((writeResponse.message!).recordId); } @@ -109,6 +127,24 @@ describe('SyncEngineLevel', () => { }, dataStream: new Blob([`Hello, ${i}`]) }); + expect(writeResponse.reply.status.code).to.equal(202); + + // write an update message for one of the records + if (i === 0) { + const updateResponse = await testHarness.agent.dwn.sendRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + recordId : writeResponse.message!.recordId, + dataFormat : 'text/plain', + schema : writeResponse.message!.descriptor.schema, + dateCreated : writeResponse.message!.descriptor.dateCreated + }, + dataStream: new Blob([`Hello, ${i} updated!`]), + }); + expect(updateResponse.reply.status.code).to.equal(202); + } remoteRecords.push((writeResponse.message!).recordId); } @@ -171,7 +207,7 @@ describe('SyncEngineLevel', () => { }); localDwnQueryReply = localQueryResponse.reply; expect(localDwnQueryReply.status.code).to.equal(200); - expect(localDwnQueryReply.entries).to.have.length(6); + expect(localDwnQueryReply.entries).to.have.length(6, 'local'); localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId); expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); @@ -189,7 +225,7 @@ describe('SyncEngineLevel', () => { }); remoteDwnQueryReply = remoteQueryResponse.reply; expect(remoteDwnQueryReply.status.code).to.equal(200); - expect(remoteDwnQueryReply.entries).to.have.length(6); + expect(remoteDwnQueryReply.entries).to.have.length(6, 'remote'); remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId); expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); }).slow(1000); // Yellow at 500ms, Red at 1000ms. From 1d9bad32e20451de4a656df23224b7def6a7a19e Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 09:26:18 -0400 Subject: [PATCH 6/9] increase coverage for test cases where a messageCid either already exists in the target or does not exist at all --- packages/agent/src/dwn-api.ts | 7 +- packages/agent/tests/dwn-api.spec.ts | 17 + .../agent/tests/sync-engine-level.spec.ts | 437 +++++++++++++++++- 3 files changed, 452 insertions(+), 9 deletions(-) diff --git a/packages/agent/src/dwn-api.ts b/packages/agent/src/dwn-api.ts index 463a540b6..a209bb068 100644 --- a/packages/agent/src/dwn-api.ts +++ b/packages/agent/src/dwn-api.ts @@ -369,11 +369,8 @@ export class AgentDwnApi { throw new Error(`AgentDwnApi: Failed to read message, response status: ${result.status.code} - ${result.status.detail}`); } - const messageEntry = result.entry; - const message = messageEntry?.message as DwnMessage[T]; - if (!message) { - throw new Error(`AgentDwnApi: Message not found with CID: ${messageCid}`); - } + const messageEntry = result.entry!; + const message = messageEntry.message as DwnMessage[T]; let dwnMessageWithBlob: DwnMessageWithBlob = { message }; // If the message is a RecordsWrite, data will be present in the form of a stream diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index 1b3d52690..a7db50877 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -883,6 +883,23 @@ describe('AgentDwnApi', () => { expect(sendResponse.reply.status.code).to.equal(202); }); + it('should fail when sending a message with a `messageCid` that does not exist', async () => { + // Attempt to send a message with an invalid `messageCid`. + try { + const messageCid = await TestDataGenerator.randomCborSha256Cid(); + + await testHarness.agent.dwn.sendRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageCid, + }); + expect.fail('Expected an error to be thrown'); + } catch (error:any) { + expect(error.message).to.contain('AgentDwnApi: Failed to read message'); + } + }); + it('handles MessagesQuery', async () => { const testCursor = { messageCid : 'foo', diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 0e18d01d6..edcc1ad71 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -67,10 +67,14 @@ describe('SyncEngineLevel', () => { beforeEach(async () => { randomSchema = cryptoUtils.randomUuid(); - }); - afterEach(async () => { + sinon.restore(); + await syncEngine.clear(); await testHarness.syncStore.clear(); + await testHarness.dwnDataStore.clear(); + await testHarness.dwnEventLog.clear(); + await testHarness.dwnMessageStore.clear(); + await testHarness.dwnResumableTaskStore.clear(); }); after(async () => { @@ -136,8 +140,8 @@ describe('SyncEngineLevel', () => { target : alice.did.uri, messageType : DwnInterface.RecordsWrite, messageParams : { - recordId : writeResponse.message!.recordId, - dataFormat : 'text/plain', + recordId : writeResponse.message!.recordId, + dataFormat : 'text/plain', schema : writeResponse.message!.descriptor.schema, dateCreated : writeResponse.message!.descriptor.dateCreated }, @@ -231,6 +235,218 @@ describe('SyncEngineLevel', () => { }).slow(1000); // Yellow at 500ms, Red at 1000ms. describe('pull()', () => { + it('silently ignores sendDwnRequest for a messageCid that does not exist on a remote DWN', async () => { + // scenario: The messageCids returned from the remote eventLog contains a Cid that is not found in the remote DWN + // this could happen when a record is updated, only the initial write and the most recent state are kept. + // if this happens during a sync, the messageCid will not be found in the remote DWN and the sync should continue + // + // We artificially return an invalid messageCid between 2 valid messageCid and ensure that the sync continues + + // create a record that will not be stored or sent to the remote DWN + const invalidRecord = await testHarness.agent.processDwnRequest({ + store : false, + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, invalid!']) + }); + + // create 2 records for the remote DWN to sync + const record1 = await testHarness.agent.sendDwnRequest({ + store : false, + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 1']) + }); + expect(record1.reply.status.code).to.equal(202); + + const record2 = await testHarness.agent.sendDwnRequest({ + store : false, + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 2']) + }); + expect(record2.reply.status.code).to.equal(202); + + // confirm that no records exist locally + let localQueryResponse = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [] // get all messages + } + }); + let localDwnQueryEntries = localQueryResponse.reply.entries!; + expect(localDwnQueryEntries.length).to.equal(0); + + // spy on sendDwnRequest to the remote DWN + const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); + + sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([ + record1.messageCid, + invalidRecord.messageCid, // this record will fail to be retrieved + record2.messageCid + ]); + + // Register Alice's DID to be synchronized. + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri + }); + + // Execute Sync to pull all records from Alice's remote DWNs + await syncEngine.pull(); + + // Verify sendDwnRequest was called once for each record, including the invalid record + // + // NOTE: because we stubbed `getDwnEventLog` to return the messageCids of the records, + // we expect the sendDwnRequest from within the `getDwnEventLog` function to not be called + // if it were not stubbed, the could would have been called an additional time + expect(sendDwnRequestSpy.callCount).to.equal(3); + + // confirm that the two valid records exist locally + localQueryResponse = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [] // get all messages + } + }); + localDwnQueryEntries = localQueryResponse.reply.entries!; + expect(localDwnQueryEntries.length).to.equal(2); + expect(localDwnQueryEntries).to.have.members([ + record1.messageCid, + record2.messageCid + ]); + }); + + it('silently ignores a messageCid that already exists on the local DWN', async () => { + // scenario: The messageCids returned from the remote eventLog contains a messageCid that already exists on the local DWN. + // During sync, when processing the messageCid the local DWN will return a conflict response, but the sync should continue + + // create a record and store it locally and remotely + const remoteAndLocalRecord = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, remote!']) + }); + + // send record to remote + await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageCid : remoteAndLocalRecord.messageCid, + }); + + // create 2 records stored only remotely to later sync to the local DWN + const record1 = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 1']) + }); + expect(record1.reply.status.code).to.equal(202); + + const record2 = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 2']) + }); + expect(record2.reply.status.code).to.equal(202); + + // confirm that only the single record exists locally + let localQueryResponse = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [], // get all messages + } + }); + + let localDwnQueryEntries = localQueryResponse.reply.entries!; + expect(localDwnQueryEntries.length).to.equal(1); + expect(localDwnQueryEntries).to.have.members([remoteAndLocalRecord.messageCid]); + + // stub getDwnEventLog to return the messageCids of the records we want to sync + sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([ + remoteAndLocalRecord.messageCid, + record1.messageCid, + record2.messageCid + ]); + + // Register Alice's DID to be synchronized. + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri + }); + + // spy on sendDwnRequest to the remote DWN + const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); + const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage'); + + // Execute Sync to push records to Alice's remote node + await syncEngine.pull(); + + // Verify sendDwnRequest is called for all 3 records + expect(sendDwnRequestSpy.callCount).to.equal(3, 'sendDwnRequestSpy'); + // Verify that processMessage is called for all 3 records + expect(processMessageSpy.callCount).to.equal(3, 'processMessageSpy'); + + // Verify that the conflict response is returned for the record that already exists locally + expect((await processMessageSpy.firstCall.returnValue).status.code).to.equal(409); + + // Verify that the other 2 records are successfully processed + expect((await processMessageSpy.secondCall.returnValue).status.code).to.equal(202); + expect((await processMessageSpy.thirdCall.returnValue).status.code).to.equal(202); + + // confirm the new records exist remotely + localQueryResponse = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [], // get all messages + }, + }); + localDwnQueryEntries = localQueryResponse.reply.entries!; + expect(localDwnQueryEntries.length).to.equal(3); + expect(localDwnQueryEntries).to.have.members([ + remoteAndLocalRecord.messageCid, + record1.messageCid, + record2.messageCid + ]); + }); + it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testHarness.agent.did, 'resolve'); const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); @@ -470,6 +686,219 @@ describe('SyncEngineLevel', () => { }); describe('push()', () => { + it('silently ignores a messageCid from the eventLog that does not exist on the local DWN', async () => { + // It's important to create a new DID here to avoid conflicts with the previous test on the remote DWN, + // since we are not clearing the remote DWN's storage before each test. + const name = cryptoUtils.randomUuid(); + const alice = await testHarness.createIdentity({ name, testDwnUrls }); + + // scenario: The messageCids returned from the local eventLog contains a Cid that is not found when attempting to push it to the remote DWN + // this could happen when a record is updated, only the initial write and the most recent state are kept. + // if this happens during a sync, the messageCid will not be found in the DWN and the sync should continue + // + // We artificially return an invalid messageCid between 2 valid messageCid and ensure that the sync continues + + // create a record that will not be stored or sent to the remote DWN + const invalidRecord = await testHarness.agent.processDwnRequest({ + store : false, + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, invalid!']) + }); + + // create 2 records for the local DWN to sync to the remote DWN + const record1 = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 1']) + }); + expect(record1.reply.status.code).to.equal(202); + + const record2 = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 2']) + }); + expect(record2.reply.status.code).to.equal(202); + + // confirm that no records exist remotely + let remoteQueryResponse = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [] // get all messages + } + }); + let remoteDwnQueryEntries = remoteQueryResponse.reply.entries!; + expect(remoteDwnQueryEntries.length).to.equal(0); + + // spy on getDwnMessage that retrieves the message from the local DWN + const getDwnMessageSpy = sinon.spy(syncEngine as any, 'getDwnMessage'); + + // spy on sendDwnRequest to the remote DWN + const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); + + // stub getDwnEventLog to return the messageCids of the records as well as the invalid one + sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([ + record1.messageCid, + invalidRecord.messageCid, // this record will fail to be retrieved + record2.messageCid + ]); + + // Register Alice's DID to be synchronized. + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri + }); + + // Execute Sync to pull all records from Alice's remote DWNs + await syncEngine.push(); + + // verify that sendDwnRequest was called once only for each valid record + // and getDwnMessage was called for each record, including the invalid record + expect(sendDwnRequestSpy.callCount).to.equal(2); + expect(getDwnMessageSpy.callCount).to.equal(3); + + // confirm that the two valid records exist remotely + remoteQueryResponse = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [] // get all messages + } + }); + remoteDwnQueryEntries = remoteQueryResponse.reply.entries!; + expect(remoteDwnQueryEntries.length).to.equal(2); + expect(remoteDwnQueryEntries).to.have.members([ + record1.messageCid, + record2.messageCid + ]); + }); + + it('silently ignores a messageCid that already exists on the remote DWN', async () => { + // It's important to create a new DID here to avoid conflicts with the previous test on the remote DWN, + // since we are not clearing the remote DWN's storage before each test. + const name = cryptoUtils.randomUuid(); + const alice = await testHarness.createIdentity({ name, testDwnUrls }); + + // Register Alice's DID to be synchronized. + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri + }); + + // scenario: The messageCids returned from the local eventLog contains a Cid that already exists in the remote DWN. + // During sync, the remote DWN will return a conflict 409 status code and the sync should continue + + // create a record, store it and send it to the remote Dwn + const remoteAndLocalRecord = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, remote!']) + }); + + // send record to remote + await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageCid : remoteAndLocalRecord.messageCid, + }); + + // create 2 records stored only locally to sync to the remote DWN + const record1 = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 1']) + }); + expect(record1.reply.status.code).to.equal(202); + + const record2 = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : randomSchema + }, + dataStream: new Blob(['Hello, 2']) + }); + expect(record2.reply.status.code).to.equal(202); + + // confirm that only the single record exists remotely + let remoteQueryResponse = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [], // get all messages + } + }); + + let remoteDwnQueryEntries = remoteQueryResponse.reply.entries!; + expect(remoteDwnQueryEntries.length).to.equal(1); + expect(remoteDwnQueryEntries).to.have.members([remoteAndLocalRecord.messageCid]); + + // stub getDwnEventLog to return the messageCids of the records we want to sync + // we stub this to avoid syncing the registered identity related messages + sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([ + remoteAndLocalRecord.messageCid, + record1.messageCid, + record2.messageCid + ]); + + // spy on sendDwnRequest to the remote DWN + const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); + + // Execute Sync to push records to Alice's remote node + await syncEngine.push(); + + // Verify sendDwnRequest was called once for each record including the one that already exists remotely + expect(sendDwnRequestSpy.callCount).to.equal(3); + + // confirm the new records exist remotely + remoteQueryResponse = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.MessagesQuery, + messageParams : { + filters: [], // get all messages + }, + }); + remoteDwnQueryEntries = remoteQueryResponse.reply.entries!; + expect(remoteDwnQueryEntries.length).to.equal(3); + expect(remoteDwnQueryEntries).to.have.members([ + remoteAndLocalRecord.messageCid, + record1.messageCid, + record2.messageCid + ]); + }); + it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testHarness.agent.did, 'resolve'); const processRequestSpy = sinon.spy(testHarness.agent.dwn, 'processRequest'); From de54b7204a0a6852de3c7f741d21e1f293bf0ef4 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 09:28:11 -0400 Subject: [PATCH 7/9] add changeset --- .changeset/lemon-islands-provide.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/lemon-islands-provide.md diff --git a/.changeset/lemon-islands-provide.md b/.changeset/lemon-islands-provide.md new file mode 100644 index 000000000..7d6760451 --- /dev/null +++ b/.changeset/lemon-islands-provide.md @@ -0,0 +1,5 @@ +--- +"@web5/agent": minor +--- + +Migrate `Events` interface to `Messages` interface for sync From f91f427284d994477f00a11cb281b986d1d155be Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 10:46:25 -0400 Subject: [PATCH 8/9] modify changeset --- .changeset/lemon-islands-provide.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.changeset/lemon-islands-provide.md b/.changeset/lemon-islands-provide.md index 7d6760451..78ee7fd42 100644 --- a/.changeset/lemon-islands-provide.md +++ b/.changeset/lemon-islands-provide.md @@ -1,5 +1,8 @@ --- "@web5/agent": minor +"@web5/user-agent": minor +"@web5/proxy-agent": minor +"@web5/identity-agent": minor --- Migrate `Events` interface to `Messages` interface for sync From 396ed91021cc17a8beeb0d814e9c07469cd2e13b Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 10:47:45 -0400 Subject: [PATCH 9/9] add api changeset --- .changeset/grumpy-carpets-buy.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/grumpy-carpets-buy.md diff --git a/.changeset/grumpy-carpets-buy.md b/.changeset/grumpy-carpets-buy.md new file mode 100644 index 000000000..257e63330 --- /dev/null +++ b/.changeset/grumpy-carpets-buy.md @@ -0,0 +1,5 @@ +--- +"@web5/api": minor +--- + +Update Agent to latest version along with dwn-sdk-js to v 0.4.0