diff --git a/yarn-project/p2p/src/service/reqresp/interface.ts b/yarn-project/p2p/src/service/reqresp/interface.ts index 3254ef89dcb0..fa66707e8be9 100644 --- a/yarn-project/p2p/src/service/reqresp/interface.ts +++ b/yarn-project/p2p/src/service/reqresp/interface.ts @@ -5,5 +5,9 @@ export enum ReqRespType { TxsByHash = 'txs_by_hash', } -export const PING_PROTOCOL: string = '/aztec/ping/0.1.0'; -export const STATUS_PROTOCOL: string = '/aztec/status/0.1.0'; +export const PING_PROTOCOL = '/aztec/ping/0.1.0'; +export const STATUS_PROTOCOL = '/aztec/status/0.1.0'; + +export type SubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL; + +export type SubProtocolHandler = (msg: string) => Uint8Array; diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 96226e425256..045157b99597 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -7,12 +7,12 @@ import { type Libp2p } from 'libp2p'; import { type Uint8ArrayList } from 'uint8arraylist'; import { pingHandler, statusHandler } from './handlers.js'; -import { PING_PROTOCOL, STATUS_PROTOCOL } from './interface.js'; +import { PING_PROTOCOL, STATUS_PROTOCOL, SubProtocol, SubProtocolHandler } from './interface.js'; /** * A mapping from a protocol to a handler function */ -const REQ_RESP_PROTOCOLS = { +const SUB_PROTOCOL_HANDLERS: Record = { [PING_PROTOCOL]: pingHandler, [STATUS_PROTOCOL]: statusHandler, }; @@ -31,8 +31,8 @@ export class ReqResp { */ async start() { // Register all protocol handlers - for (const protocol of Object.keys(REQ_RESP_PROTOCOLS)) { - await this.libp2p.handle(protocol, this.streamHandler); + for (const subProtocol of Object.keys(SUB_PROTOCOL_HANDLERS)) { + await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as SubProtocol)); } } @@ -41,7 +41,7 @@ export class ReqResp { */ async stop() { // Unregister all handlers - for (const protocol of Object.keys(REQ_RESP_PROTOCOLS)) { + for (const protocol of Object.keys(SUB_PROTOCOL_HANDLERS)) { await this.libp2p.unhandle(protocol); } await this.libp2p.stop(); @@ -51,17 +51,17 @@ export class ReqResp { /** * Send a request to peers, returns the first response * - * @param protocol - The protocol being requested + * @param subProtocol - The protocol being requested * @param payload - The payload to send * @returns - The response from the peer, otherwise undefined */ - async sendRequest(protocol: string, payload: Buffer): Promise { + async sendRequest(subProtocol: SubProtocol, payload: Buffer): Promise { // Get active peers const peers = this.libp2p.getPeers(); // Attempt to ask all of our peers for (const peer of peers) { - const response = await this.sendRequestToPeer(peer, protocol, payload); + const response = await this.sendRequestToPeer(peer, subProtocol, payload); // If we get a response, return it, otherwise we iterate onto the next peer if (response) { @@ -75,20 +75,15 @@ export class ReqResp { * Sends a request to a specific peer * * @param peerId - The peer to send the request to - * @param protocol - The protocol to use to request + * @param subProtocol - The protocol to use to request * @param payload - The payload to send * @returns If the request is successful, the response is returned, otherwise undefined */ - async sendRequestToPeer(peerId: PeerId, protocol: string, payload: Buffer): Promise { + async sendRequestToPeer(peerId: PeerId, subProtocol: SubProtocol, payload: Buffer): Promise { try { - const stream = await this.libp2p.dialProtocol(peerId, protocol); + const stream = await this.libp2p.dialProtocol(peerId, subProtocol); - const result = await pipe( - // Send message in two chunks - protocol && payload - [Buffer.from(protocol), Buffer.from(payload)], - stream, - this.readMessage, - ); + const result = await pipe([payload], stream, this.readMessage); return result; } catch (e) { this.logger.warn(`Failed to send request to peer ${peerId.publicKey}`); @@ -114,22 +109,14 @@ export class ReqResp { * * @param param0 - The incoming stream data */ - private async streamHandler({ stream }: IncomingStreamData) { + private async streamHandler(protocol: SubProtocol, { stream }: IncomingStreamData) { try { await pipe( stream, - async function* (source: any) { - let protocol: string | undefined = undefined; - for await (const chunk of source) { - // The first message should contain the protocol, subsequent messages should contain the payload - - const msg = Buffer.from(chunk.subarray()).toString(); - if (!protocol) { - protocol = msg.toString(); - } else { - const handler: any = REQ_RESP_PROTOCOLS[protocol]; - yield handler(msg); - } + async function* (source) { + for await (const chunkList of source) { + const msg = Buffer.from(chunkList.subarray()).toString(); + yield SUB_PROTOCOL_HANDLERS[protocol](msg); } }, stream,