Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ export enum ReqRespType {
TxsByHash = 'txs_by_hash',
}

export const PING_PROTOCOL: string = '/aztec/ping/0.1.0';
export const STATUS_PROTOCOL: string = '/aztec/status/0.1.0';
export const PING_PROTOCOL = '/aztec/ping/0.1.0';
export const STATUS_PROTOCOL = '/aztec/status/0.1.0';

export type SubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL;

export type SubProtocolHandler = (msg: string) => Uint8Array;
47 changes: 17 additions & 30 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import { type Libp2p } from 'libp2p';
import { type Uint8ArrayList } from 'uint8arraylist';

import { pingHandler, statusHandler } from './handlers.js';
import { PING_PROTOCOL, STATUS_PROTOCOL } from './interface.js';
import { PING_PROTOCOL, STATUS_PROTOCOL, SubProtocol, SubProtocolHandler } from './interface.js';

/**
* A mapping from a protocol to a handler function
*/
const REQ_RESP_PROTOCOLS = {
const SUB_PROTOCOL_HANDLERS: Record<SubProtocol, SubProtocolHandler> = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
};
Expand All @@ -31,8 +31,8 @@ export class ReqResp {
*/
async start() {
// Register all protocol handlers
for (const protocol of Object.keys(REQ_RESP_PROTOCOLS)) {
await this.libp2p.handle(protocol, this.streamHandler);
for (const subProtocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as SubProtocol));
}
}

Expand All @@ -41,7 +41,7 @@ export class ReqResp {
*/
async stop() {
// Unregister all handlers
for (const protocol of Object.keys(REQ_RESP_PROTOCOLS)) {
for (const protocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.unhandle(protocol);
}
await this.libp2p.stop();
Expand All @@ -51,17 +51,17 @@ export class ReqResp {
/**
* Send a request to peers, returns the first response
*
* @param protocol - The protocol being requested
* @param subProtocol - The protocol being requested
* @param payload - The payload to send
* @returns - The response from the peer, otherwise undefined
*/
async sendRequest(protocol: string, payload: Buffer): Promise<Buffer | undefined> {
async sendRequest(subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
// Get active peers
const peers = this.libp2p.getPeers();

// Attempt to ask all of our peers
for (const peer of peers) {
const response = await this.sendRequestToPeer(peer, protocol, payload);
const response = await this.sendRequestToPeer(peer, subProtocol, payload);

// If we get a response, return it, otherwise we iterate onto the next peer
if (response) {
Expand All @@ -75,20 +75,15 @@ export class ReqResp {
* Sends a request to a specific peer
*
* @param peerId - The peer to send the request to
* @param protocol - The protocol to use to request
* @param subProtocol - The protocol to use to request
* @param payload - The payload to send
* @returns If the request is successful, the response is returned, otherwise undefined
*/
async sendRequestToPeer(peerId: PeerId, protocol: string, payload: Buffer): Promise<Buffer | undefined> {
async sendRequestToPeer(peerId: PeerId, subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
try {
const stream = await this.libp2p.dialProtocol(peerId, protocol);
const stream = await this.libp2p.dialProtocol(peerId, subProtocol);

const result = await pipe(
// Send message in two chunks - protocol && payload
[Buffer.from(protocol), Buffer.from(payload)],
stream,
this.readMessage,
);
const result = await pipe([payload], stream, this.readMessage);
return result;
} catch (e) {
this.logger.warn(`Failed to send request to peer ${peerId.publicKey}`);
Expand All @@ -114,22 +109,14 @@ export class ReqResp {
*
* @param param0 - The incoming stream data
*/
private async streamHandler({ stream }: IncomingStreamData) {
private async streamHandler(protocol: SubProtocol, { stream }: IncomingStreamData) {
try {
await pipe(
stream,
async function* (source: any) {
let protocol: string | undefined = undefined;
for await (const chunk of source) {
// The first message should contain the protocol, subsequent messages should contain the payload

const msg = Buffer.from(chunk.subarray()).toString();
if (!protocol) {
protocol = msg.toString();
} else {
const handler: any = REQ_RESP_PROTOCOLS[protocol];
yield handler(msg);
}
async function* (source) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray()).toString();
yield SUB_PROTOCOL_HANDLERS[protocol](msg);
}
},
stream,
Expand Down