From 032de7295a08b64943ec91e187b24b3af6b0a70f Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 21:35:40 +0000 Subject: [PATCH 1/7] feat: batch connection sampler --- .../batch_connection_sampler.test.ts | 150 ++++++++++++++++++ .../batch_connection_sampler.ts | 94 +++++++++++ .../connection-sampler/connection_sampler.ts | 19 +++ 3 files changed, 263 insertions(+) create mode 100644 yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts create mode 100644 yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts new file mode 100644 index 000000000000..8e7ba01d2aae --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts @@ -0,0 +1,150 @@ +import { describe, expect, it, jest } from '@jest/globals'; +import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; +import { Libp2p } from 'libp2p'; + +import { BatchConnectionSampler } from './batch_connection_sampler.js'; +import { ConnectionSampler } from './connection_sampler.js'; +import { RandomSampler } from './connection_sampler.js'; + +describe('BatchConnectionSampler', () => { + const mockRandomSampler = { + random: jest.fn(), + } as jest.Mocked; + + let peers: Awaited>[]; + let libp2p: jest.Mocked; + let connectionSampler: ConnectionSampler; + + beforeEach(async () => { + jest.clearAllMocks(); + + // Create a set of test peers + peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId())); + + // Mock libp2p to return our test peers + libp2p = { + getPeers: jest.fn().mockReturnValue(peers), + } as unknown as jest.Mocked; + + // Create a real connection sampler with mocked random sampling + connectionSampler = new ConnectionSampler(libp2p, 1000, mockRandomSampler); + }); + + afterEach(async () => { + await connectionSampler.stop(); + }); + + it('initializes with correct number of peers and request distribution', async () => { + // Mock random to return sequential indices + mockRandomSampler.random.mockImplementation(_ => 0); + + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); + + expect(sampler.activePeerCount).toBe(3); + expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 + expect(mockRandomSampler.random).toHaveBeenCalledTimes(3); + }); + + it('assigns requests to peers deterministically with wraparound', async () => { + // Mock to return first two peers + let callCount = 0; + mockRandomSampler.random.mockImplementation(() => callCount++ % 2); + + // With 5 requests and 2 peers: + // floor(5/2) = 2 requests per peer + // Peer 0: 0,1,4 (gets extra from wraparound) + // Peer 1: 2,3 + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); + const assignments = new Array(5).fill(0).map((_, i) => sampler.getPeerForRequest(i)); + + // First peer gets first bucket and wraparound + expect(assignments[0]).toBe(peers[0]); // First bucket + expect(assignments[1]).toBe(peers[0]); // First bucket + expect(assignments[4]).toBe(peers[0]); // Wraparound + + // Second peer gets middle bucket + expect(assignments[2]).toBe(peers[1]); + expect(assignments[3]).toBe(peers[1]); + }); + + it('handles peer removal and replacement', async () => { + let callCount = 0; + mockRandomSampler.random.mockImplementation(max => { + if (callCount < 2) return callCount++; // Return 0, then 1 for initial peers + return 2; // Return index 2 for replacement peer + }); + + // With 4 requests and 2 peers: + // floor(4/2) = 2 requests per peer + // Initial distribution: + // Peer 0: 0,1 + // Peer 1: 2,3 + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); + + const initialPeer = sampler.getPeerForRequest(0); + expect(initialPeer).toBe(peers[0]); + + sampler.removePeerAndReplace(peers[0]); + + // After replacement: + // Replacement peer should handle the same bucket + const newPeer = sampler.getPeerForRequest(0); + expect(newPeer).toBe(peers[2]); + expect(sampler.getPeerForRequest(1)).toBe(peers[2]); + + // Other peer's bucket remains unchanged + expect(sampler.getPeerForRequest(2)).toBe(peers[1]); + expect(sampler.getPeerForRequest(3)).toBe(peers[1]); + }); + + it('distributes requests according to documentation example', async () => { + let callCount = 0; + mockRandomSampler.random.mockImplementation(() => { + if (callCount < 3) return callCount++; + return 0; + }); + + // Example from doc comment: + // Peers: [P1] [P2] [P3] + // Requests: 0,1,2,9 | 3,4,5 | 6,7,8 + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); + + expect(sampler.activePeerCount).toBe(3); + expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 + + // P1's bucket (0-2) plus wraparound (9) + expect(sampler.getPeerForRequest(0)).toBe(peers[0]); + expect(sampler.getPeerForRequest(1)).toBe(peers[0]); + expect(sampler.getPeerForRequest(2)).toBe(peers[0]); + expect(sampler.getPeerForRequest(9)).toBe(peers[0]); // Wraparound + + // P2's bucket (3-5) + expect(sampler.getPeerForRequest(3)).toBe(peers[1]); + expect(sampler.getPeerForRequest(4)).toBe(peers[1]); + expect(sampler.getPeerForRequest(5)).toBe(peers[1]); + + // P3's bucket (6-8) + expect(sampler.getPeerForRequest(6)).toBe(peers[2]); + expect(sampler.getPeerForRequest(7)).toBe(peers[2]); + expect(sampler.getPeerForRequest(8)).toBe(peers[2]); + }); + + it('handles edge cases', async () => { + mockRandomSampler.random.mockImplementation(() => 0); + libp2p.getPeers.mockReturnValue([]); + + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); + expect(sampler.activePeerCount).toBe(0); + expect(sampler.getPeerForRequest(0)).toBeUndefined(); + + let i = 0; + mockRandomSampler.random.mockImplementation(() => i++ % 3); + + libp2p.getPeers.mockReturnValue(peers); + const samplerWithMorePeers = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 3); + expect(samplerWithMorePeers.requestsPerBucket).toBe(1); // floor(2/3) = 0 + // First two requests go to first two peers + expect(samplerWithMorePeers.getPeerForRequest(0)).toBe(peers[0]); + expect(samplerWithMorePeers.getPeerForRequest(1)).toBe(peers[1]); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts new file mode 100644 index 000000000000..e1cfcd0aba4f --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -0,0 +1,94 @@ +import { createLogger } from '@aztec/foundation/log'; + +import { type PeerId } from '@libp2p/interface'; + +import { ConnectionSampler } from './connection_sampler.js'; + +/** + * Manages batches of peers for parallel request processing. + * Tracks active peers and provides deterministic peer assignment for requests. + * + * Example with 3 peers and 10 requests: + * + * Peers: [P1] [P2] [P3] + * ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ + * Requests: 0,1,2,9 | 3,4,5 | 6,7,8 + * + * Each peer handles a bucket of consecutive requests. + * If a peer fails, it is replaced while maintaining the same bucket. + */ +export class BatchConnectionSampler { + private readonly logger = createLogger('p2p:reqresp:batch-connection-sampler'); + private readonly batch: PeerId[] = []; + private readonly requestsPerPeer: number; + + constructor( + private readonly connectionSampler: ConnectionSampler, + private readonly batchSize: number, + private readonly maxPeers: number, + ) { + // Calculate how many requests each peer should handle, cannot be 0 + this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers)); + + // Sample initial peers + this.batch = this.connectionSampler.samplePeersBatch(maxPeers); + } + + /** + * Gets the peer responsible for handling a specific request index + * + * @param index - The request index + * @returns The peer assigned to handle this request + */ + getPeerForRequest(index: number): PeerId | undefined { + if (this.batch.length === 0) return undefined; + + // Calculate which peer bucket this index belongs to + const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length; + return this.batch[peerIndex]; + } + + /** + * Removes a peer and replaces it with a new one, maintaining the same position + * in the batch array to keep request distribution consistent + * + * @param peerId - The peer to remove and replace + */ + removePeerAndReplace(peerId: PeerId): void { + const index = this.batch.findIndex(p => p === peerId); + if (index !== -1) { + const newPeer = this.addReplacement(); + if (newPeer) { + this.batch[index] = newPeer; + this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); + } else { + // If we couldn't get a replacement, remove the peer and compact the array + this.batch.splice(index, 1); + this.logger.trace(`Removed peer ${peerId}`, { peerId }); + } + } + } + + /** + * Adds a new peer + * + * @returns The new peer if successful, undefined otherwise + */ + private addReplacement(): PeerId | undefined { + return this.connectionSampler.getPeer(); + } + + /** + * Gets the number of active peers + */ + get activePeerCount(): number { + return this.batch.length; + } + + /** + * Gets the number of requests each peer is assigned to handle + */ + get requestsPerBucket(): number { + return this.requestsPerPeer; + } +} diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index a44164eed09f..f386b491cc58 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -72,6 +72,25 @@ export class ConnectionSampler { return peers[randomIndex]; } + /** + * Samples a batch of peers from the libp2p node + * + * @param maxPeers - The maximum number of peers to sample + * @returns The sampled peers + */ + samplePeersBatch(maxPeers: number): PeerId[] { + const peers = []; + for (let i = 0; i < maxPeers; i++) { + const peer = this.getPeer(); + // Can be undefined if we have no peers + if (peer) { + peers.push(peer); + } + } + this.logger.trace(`Batch sampled ${peers.length} peers`, { peers }); + return peers; + } + // Set of passthrough functions to keep track of active connections /** From 91f0f560f3cd8024e8580d1fb77eff3b431fa117 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 01:02:17 +0000 Subject: [PATCH 2/7] feat: batch request response requests across multiple peers --- .../batch_connection_sampler.test.ts | 36 ++-- .../batch_connection_sampler.ts | 48 +++--- .../connection_sampler.test.ts | 91 ++++++++++ .../connection-sampler/connection_sampler.ts | 63 ++++--- .../p2p/src/services/reqresp/reqresp.test.ts | 56 +++++++ .../p2p/src/services/reqresp/reqresp.ts | 157 +++++++++++++++++- 6 files changed, 387 insertions(+), 64 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts index 8e7ba01d2aae..661042a24d78 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts @@ -1,10 +1,9 @@ import { describe, expect, it, jest } from '@jest/globals'; import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; -import { Libp2p } from 'libp2p'; +import { type Libp2p } from 'libp2p'; import { BatchConnectionSampler } from './batch_connection_sampler.js'; -import { ConnectionSampler } from './connection_sampler.js'; -import { RandomSampler } from './connection_sampler.js'; +import { ConnectionSampler, type RandomSampler } from './connection_sampler.js'; describe('BatchConnectionSampler', () => { const mockRandomSampler = { @@ -34,7 +33,7 @@ describe('BatchConnectionSampler', () => { await connectionSampler.stop(); }); - it('initializes with correct number of peers and request distribution', async () => { + it('initializes with correct number of peers and request distribution', () => { // Mock random to return sequential indices mockRandomSampler.random.mockImplementation(_ => 0); @@ -42,10 +41,9 @@ describe('BatchConnectionSampler', () => { expect(sampler.activePeerCount).toBe(3); expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 - expect(mockRandomSampler.random).toHaveBeenCalledTimes(3); }); - it('assigns requests to peers deterministically with wraparound', async () => { + it('assigns requests to peers deterministically with wraparound', () => { // Mock to return first two peers let callCount = 0; mockRandomSampler.random.mockImplementation(() => callCount++ % 2); @@ -67,10 +65,8 @@ describe('BatchConnectionSampler', () => { expect(assignments[3]).toBe(peers[1]); }); - it('handles peer removal and replacement', async () => { - let callCount = 0; - mockRandomSampler.random.mockImplementation(max => { - if (callCount < 2) return callCount++; // Return 0, then 1 for initial peers + it('handles peer removal and replacement', () => { + mockRandomSampler.random.mockImplementation(_ => { return 2; // Return index 2 for replacement peer }); @@ -97,10 +93,12 @@ describe('BatchConnectionSampler', () => { expect(sampler.getPeerForRequest(3)).toBe(peers[1]); }); - it('distributes requests according to documentation example', async () => { + it('distributes requests according to documentation example', () => { let callCount = 0; mockRandomSampler.random.mockImplementation(() => { - if (callCount < 3) return callCount++; + if (callCount < 3) { + return callCount++; + } return 0; }); @@ -129,7 +127,19 @@ describe('BatchConnectionSampler', () => { expect(sampler.getPeerForRequest(8)).toBe(peers[2]); }); - it('handles edge cases', async () => { + it('same number of requests per peers', () => { + let callCount = 0; + mockRandomSampler.random.mockImplementation(() => callCount++ % 2); + + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2); + expect(sampler.requestsPerBucket).toBe(1); + expect(sampler.activePeerCount).toBe(2); + + expect(sampler.getPeerForRequest(0)).toBe(peers[0]); + expect(sampler.getPeerForRequest(1)).toBe(peers[1]); + }); + + it('handles edge cases, 0 peers, smaller batch than max peers', () => { mockRandomSampler.random.mockImplementation(() => 0); libp2p.getPeers.mockReturnValue([]); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index e1cfcd0aba4f..76fc6770d60c 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -2,7 +2,7 @@ import { createLogger } from '@aztec/foundation/log'; import { type PeerId } from '@libp2p/interface'; -import { ConnectionSampler } from './connection_sampler.js'; +import { type ConnectionSampler } from './connection_sampler.js'; /** * Manages batches of peers for parallel request processing. @@ -22,11 +22,14 @@ export class BatchConnectionSampler { private readonly batch: PeerId[] = []; private readonly requestsPerPeer: number; - constructor( - private readonly connectionSampler: ConnectionSampler, - private readonly batchSize: number, - private readonly maxPeers: number, - ) { + constructor(private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number) { + if (maxPeers <= 0) { + throw new Error('Max peers cannot be 0'); + } + if (batchSize <= 0) { + throw new Error('Batch size cannot be 0'); + } + // Calculate how many requests each peer should handle, cannot be 0 this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers)); @@ -41,7 +44,9 @@ export class BatchConnectionSampler { * @returns The peer assigned to handle this request */ getPeerForRequest(index: number): PeerId | undefined { - if (this.batch.length === 0) return undefined; + if (this.batch.length === 0) { + return undefined; + } // Calculate which peer bucket this index belongs to const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length; @@ -56,26 +61,17 @@ export class BatchConnectionSampler { */ removePeerAndReplace(peerId: PeerId): void { const index = this.batch.findIndex(p => p === peerId); - if (index !== -1) { - const newPeer = this.addReplacement(); - if (newPeer) { - this.batch[index] = newPeer; - this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); - } else { - // If we couldn't get a replacement, remove the peer and compact the array - this.batch.splice(index, 1); - this.logger.trace(`Removed peer ${peerId}`, { peerId }); - } - } - } + if (index === -1) return; - /** - * Adds a new peer - * - * @returns The new peer if successful, undefined otherwise - */ - private addReplacement(): PeerId | undefined { - return this.connectionSampler.getPeer(); + const newPeer = this.connectionSampler.getPeer(); + if (newPeer) { + this.batch[index] = newPeer; + this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); + } else { + // If we couldn't get a replacement, remove the peer and compact the array + this.batch.splice(index, 1); + this.logger.trace(`Removed peer ${peerId}`, { peerId }); + } } /** diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts index 04a975e1d05c..59a6ec4ff09d 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts @@ -167,4 +167,95 @@ describe('ConnectionSampler', () => { expect((sampler as any).streams.size).toBe(0); }); }); + + describe('samplePeersBatch', () => { + beforeEach(async () => { + // Create test peers + peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId())); + + // Mock libp2p + mockLibp2p = { + getPeers: jest.fn().mockReturnValue(peers), + dialProtocol: jest.fn(), + }; + + mockRandomSampler = mock(); + sampler = new ConnectionSampler(mockLibp2p, 1000, mockRandomSampler); + }); + + it('prioritizes peers without active connections', () => { + // Set up some peers with active connections + sampler['activeConnectionsCount'].set(peers[3], 1); + sampler['activeConnectionsCount'].set(peers[4], 2); + + // Sample 3 peers + const sampledPeers = sampler.samplePeersBatch(3); + + // Should get peers[0,1,2] first as they have no connections + expect(sampledPeers).toHaveLength(3); + expect(sampledPeers).toContain(peers[0]); + expect(sampledPeers).toContain(peers[1]); + expect(sampledPeers).toContain(peers[2]); + // Should not include peers with active connections when enough peers without connections exist + expect(sampledPeers).not.toContain(peers[3]); + expect(sampledPeers).not.toContain(peers[4]); + }); + + it('falls back to peers with connections when needed', () => { + // Set up most peers with active connections + sampler['activeConnectionsCount'].set(peers[1], 1); + sampler['activeConnectionsCount'].set(peers[2], 1); + sampler['activeConnectionsCount'].set(peers[3], 1); + sampler['activeConnectionsCount'].set(peers[4], 1); + + mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer + + const sampledPeers = sampler.samplePeersBatch(3); + + // Should get peers[0] first (no connections), then some with connections + expect(sampledPeers).toHaveLength(3); + expect(sampledPeers[0]).toBe(peers[0]); // The only peer without connections + expect(sampledPeers.slice(1)).toEqual(expect.arrayContaining([peers[1]])); // Should include some peers with connections + }); + + it('handles case when all peers have active connections', () => { + // Set up all peers with active connections + peers.forEach(peer => sampler['activeConnectionsCount'].set(peer, 1)); + + mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer + + const sampledPeers = sampler.samplePeersBatch(3); + + expect(sampledPeers).toHaveLength(3); + expect(sampledPeers).toEqual(expect.arrayContaining([peers[0], peers[1], peers[2]])); + }); + + it('handles case when fewer peers available than requested', () => { + // Mock libp2p to return fewer peers + const fewPeers = peers.slice(0, 2); + mockLibp2p.getPeers.mockReturnValue(fewPeers); + + const sampledPeers = sampler.samplePeersBatch(5); + + expect(sampledPeers).toHaveLength(2); // Should only return available peers + expect(sampledPeers).toEqual(expect.arrayContaining(fewPeers)); + }); + + it('handles case when no peers available', () => { + mockLibp2p.getPeers.mockReturnValue([]); + + const sampledPeers = sampler.samplePeersBatch(3); + + expect(sampledPeers).toHaveLength(0); + }); + + it('returns exactly the number of peers requested when available', () => { + const sampledPeers = sampler.samplePeersBatch(3); + + expect(sampledPeers).toHaveLength(3); + // Verify all peers are unique + const uniquePeers = new Set(sampledPeers); + expect(uniquePeers.size).toBe(3); + }); + }); }); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index f386b491cc58..468404deb524 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -1,4 +1,5 @@ import { createLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; @@ -30,15 +31,18 @@ export class ConnectionSampler { private readonly activeConnectionsCount: Map = new Map(); private readonly streams: Map = new Map(); + // Serial queue to ensure that we only dial one peer at a time + private dialQueue: SerialQueue = new SerialQueue(); + constructor( private readonly libp2p: Libp2p, private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute - - // Random sampler provided so that it can be mocked - private readonly sampler: RandomSampler = new RandomSampler(), + private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing ) { this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs); this.cleanupJob.start(); + + this.dialQueue.start(); } /** @@ -46,18 +50,18 @@ export class ConnectionSampler { */ async stop() { await this.cleanupJob?.stop(); + await this.dialQueue.end(); // Close all active streams const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId)); - await Promise.all(closePromises); } getPeer(): PeerId { const peers = this.libp2p.getPeers(); - let randomIndex = this.sampler.random(peers.length); let attempts = 0; + // If the active connections count is greater than 0, then we already have a connection open // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) { @@ -73,22 +77,41 @@ export class ConnectionSampler { } /** - * Samples a batch of peers from the libp2p node + * Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections * - * @param maxPeers - The maximum number of peers to sample - * @returns The sampled peers + * @param numberToSample - The number of peers to sample + * @returns Array of unique sampled peers, prioritizing those without active connections */ - samplePeersBatch(maxPeers: number): PeerId[] { - const peers = []; - for (let i = 0; i < maxPeers; i++) { - const peer = this.getPeer(); - // Can be undefined if we have no peers - if (peer) { - peers.push(peer); + samplePeersBatch(numberToSample: number): PeerId[] { + const peers = this.libp2p.getPeers(); + const sampledPeers: PeerId[] = []; + const peersWithConnections: PeerId[] = []; // Hold onto peers with active connections incase we need to sample more + + for (const peer of peers) { + const activeConnections = this.activeConnectionsCount.get(peer) ?? 0; + if (activeConnections === 0) { + if (sampledPeers.push(peer) === numberToSample) { + return sampledPeers; + } + } else { + peersWithConnections.push(peer); } } - this.logger.trace(`Batch sampled ${peers.length} peers`, { peers }); - return peers; + + // If we still need more peers, sample from those with connections + while (sampledPeers.length < numberToSample && peersWithConnections.length > 0) { + const randomIndex = this.sampler.random(peersWithConnections.length); + const [peer] = peersWithConnections.splice(randomIndex, 1); + sampledPeers.push(peer); + } + + this.logger.trace(`Batch sampled ${sampledPeers.length} unique peers`, { + peers: sampledPeers, + withoutConnections: sampledPeers.length - peersWithConnections.length, + withConnections: peersWithConnections.length, + }); + + return sampledPeers; } // Set of passthrough functions to keep track of active connections @@ -101,9 +124,11 @@ export class ConnectionSampler { * @returns The stream */ async dialProtocol(peerId: PeerId, protocol: string): Promise { - const stream = await this.libp2p.dialProtocol(peerId, protocol); - this.streams.set(stream.id, { stream, peerId }); + // Dialling at the same time can cause race conditions where two different streams + // end up with the same id, hence a serial queue + const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol)); + this.streams.set(stream.id, { stream, peerId }); const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1; this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 5758b7a58bd5..cb66b5dcbfdd 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -337,4 +337,60 @@ describe('ReqResp', () => { expect(response).toEqual(Buffer.from([0x0])); }); }); + + describe('Batch requests', () => { + it('should send a batch request between many peers', async () => { + const batchSize = 9; + nodes = await createNodes(peerScoring, 3); + + await startNodes(nodes); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const sendRequestToPeerSpy = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); + + const requests = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`ping`))); + const expectResponses = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`pong`))); + + const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests); + expect(res).toEqual(expectResponses); + + // Expect one request to have been sent to each peer + expect(sendRequestToPeerSpy).toHaveBeenCalledTimes(batchSize); + expect(sendRequestToPeerSpy).toHaveBeenCalledWith( + expect.objectContaining({ + publicKey: nodes[1].p2p.peerId.publicKey, + }), + ReqRespSubProtocol.PING, + Buffer.from('ping'), + ); + expect(sendRequestToPeerSpy).toHaveBeenCalledWith( + expect.objectContaining({ + publicKey: nodes[2].p2p.peerId.publicKey, + }), + ReqRespSubProtocol.PING, + Buffer.from('ping'), + ); + }); + + it('should stop after max retry attempts', async () => { + const batchSize = 12; + nodes = await createNodes(peerScoring, 3); + + await startNodes(nodes); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const requests = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`ping`))); + // We will fail two of the responses - due to hitting the ping rate limit on the responding nodes + const expectResponses = Array.from({ length: batchSize - 2 }, _ => + RequestableBuffer.fromBuffer(Buffer.from(`pong`)), + ); + + const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests); + expect(res).toEqual(expectResponses); + }); + }); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 604c5eaf3cde..19c4345ea9b4 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -16,6 +16,7 @@ import { import { SnappyTransform } from '../encoding.js'; import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; +import { BatchConnectionSampler } from './connection-sampler/batch_connection_sampler.js'; import { ConnectionSampler } from './connection-sampler/connection_sampler.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -123,8 +124,8 @@ export class ReqResp { * If no response is received from any peer, it returns undefined. * * The method performs the following steps: - * - Iterates over all active peers. - * - Opens a stream with each peer using the specified sub-protocol. + * - Sample a peer to send the request to. + * - Opens a stream with the peer using the specified sub-protocol. * * When a response is received, it is validated using the given sub protocols response validator. * To see the interface for the response validator - see `interface.ts` @@ -142,13 +143,18 @@ export class ReqResp { subProtocol: SubProtocol, request: InstanceType, ): Promise | undefined> { - const requestFunction = async () => { - const responseValidator = this.subProtocolValidators[subProtocol]; - const requestBuffer = request.toBuffer(); + const responseValidator = this.subProtocolValidators[subProtocol]; + const requestBuffer = request.toBuffer(); + const requestFunction = async () => { // Attempt to ask all of our peers, but sampled in a random order // This function is wrapped in a timeout, so we will exit the loop if we have not received a response const numberOfPeers = this.libp2p.getPeers().length; + if (numberOfPeers === 0) { + this.logger.debug('No active peers to send requests to'); + return undefined; + } + for (let i = 0; i < numberOfPeers; i++) { // Sample a peer to make a request to const peer = this.connectionSampler.getPeer(); @@ -182,6 +188,145 @@ export class ReqResp { } } + /** + * Request multiple messages over the same sub protocol, balancing the requests across peers. + * + * @devnote + * - The function prioritizes sending requests to free peers using a batch sampling strategy. + * - If a peer fails to respond or returns an invalid response, it is removed from the sampling pool and replaced. + * - The function stops retrying once all requests are processed, no active peers remain, or the maximum retry attempts are reached. + * - Responses are validated using a custom validator for the sub-protocol.* + * + * Requests are sent in parallel to each peer, but multiple requests are sent to the same peer in series + * - If a peer fails to respond or returns an invalid response, it is removed from the sampling pool and replaced. + * - The function stops retrying once all requests are processed, no active peers remain, or the maximum retry attempts are reached. + * - Responses are validated using a custom validator for the sub-protocol.* + * + * @param subProtocol + * @param requests + * @param timeoutMs + * @param maxPeers + * @returns + * + * @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`). + */ + async sendBatchRequest( + subProtocol: SubProtocol, + requests: InstanceType[], + timeoutMs = 10000, + maxPeers = Math.min(10, requests.length), + maxRetryAttempts = 3, + ): Promise[]> { + const responseValidator = this.subProtocolValidators[subProtocol]; + const responses: InstanceType[] = new Array(requests.length); + const requestBuffers = requests.map(req => req.toBuffer()); + + const requestFunction = async () => { + // Track which requests still need to be processed + const pendingRequestIndices = new Set(requestBuffers.map((_, i) => i)); + + // Create batch sampler with the total number of requests and max peers + const batchSampler = new BatchConnectionSampler(this.connectionSampler, requests.length, maxPeers); + + if (batchSampler.activePeerCount === 0) { + this.logger.debug('No active peers to send requests to'); + return []; + } + + // This is where it gets fun + // The outer loop is the retry loop, we will continue to retry until we process all indices we have + // not received a response for, or we have reached the max retry attempts + + // The inner loop is the batch loop, we will process all requests for each peer in parallel + // We will then process the results of the requests, and resample any peers that failed to respond + // We will continue to retry until we have processed all indices, or we have reached the max retry attempts + + let retryAttempts = 0; + while (pendingRequestIndices.size > 0 && batchSampler.activePeerCount > 0 && retryAttempts < maxRetryAttempts) { + // Process requests in parallel for each available peer + const requestBatches = new Map(); + + // Group requests by peer + for (const requestIndex of pendingRequestIndices) { + const peer = batchSampler.getPeerForRequest(requestIndex); + if (!peer) { + break; + } + + if (!requestBatches.has(peer)) { + requestBatches.set(peer, []); + } + requestBatches.get(peer)!.push(requestIndex); + } + + // Make parallel requests for each peer's batch + // A batch entry will look something like this: + // PeerId0: [0, 1, 2, 3] + // PeerId1: [0, 1, 2, 3] + + // Peer Id 0 will send requests 0, 1, 2, 3 in serial + // while simultaneously Peer Id 1 will send requests 0, 1, 2, 3 in serial + + const batchResults = await Promise.all( + Array.from(requestBatches.entries()).map(async ([peer, indices]) => { + try { + // Requests all going to the same peer are sent synchronously + const peerResults: { index: number; response: InstanceType }[] = + []; + for (const index of indices) { + const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]); + + if (response && response.length > 0) { + const object = subProtocolMap[subProtocol].response.fromBuffer(response); + const isValid = await responseValidator(requests[index], object, peer); + + if (isValid) { + peerResults.push({ index, response: object }); + } + } + } + + return { peer, results: peerResults }; + } catch (error) { + this.logger.debug(`Failed batch request to peer ${peer.toString()}:`, error); + batchSampler.removePeerAndReplace(peer); + return { peer, results: [] }; + } + }), + ); + + // Process results + for (const { results } of batchResults) { + for (const { index, response } of results) { + if (response) { + responses[index] = response; + pendingRequestIndices.delete(index); + } + } + } + + retryAttempts++; + } + + if (retryAttempts >= maxRetryAttempts) { + this.logger.debug(`Max retry attempts ${maxRetryAttempts} reached for batch request`); + } + + return responses; + }; + + try { + return await executeTimeout[]>( + requestFunction, + timeoutMs, + () => new CollectiveReqRespTimeoutError(), + ); + } catch (e: any) { + this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`); + return []; + } + } + /** * Sends a request to a specific peer * @@ -214,7 +359,6 @@ export class ReqResp { let stream: Stream | undefined; try { stream = await this.connectionSampler.dialProtocol(peerId, subProtocol); - this.logger.trace(`Stream opened with ${peerId.toString()} for ${subProtocol}`); // Open the stream with a timeout const result = await executeTimeout( @@ -227,6 +371,7 @@ export class ReqResp { } catch (e: any) { this.handleResponseError(e, peerId, subProtocol); } finally { + // Only close the stream if we created it if (stream) { try { await this.connectionSampler.close(stream.id); From 4c3e424e4c9e734750fc41f21c155f8e8b15c8d7 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 01:25:17 +0000 Subject: [PATCH 3/7] fmt --- .../reqresp/connection-sampler/batch_connection_sampler.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index 76fc6770d60c..b0839c46ba43 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -61,7 +61,9 @@ export class BatchConnectionSampler { */ removePeerAndReplace(peerId: PeerId): void { const index = this.batch.findIndex(p => p === peerId); - if (index === -1) return; + if (index === -1) { + return; + } const newPeer = this.connectionSampler.getPeer(); if (newPeer) { From a5a2b76d86fcbffcddc50f006643a4d91af5b6bf Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 14:17:46 +0000 Subject: [PATCH 4/7] fix: sample without replacement in reqresp --- .../batch_connection_sampler.ts | 3 ++- .../connection_sampler.test.ts | 19 +++++++++++++++++-- .../connection-sampler/connection_sampler.ts | 18 ++++++++++++++---- .../p2p/src/services/reqresp/reqresp.test.ts | 5 ++++- .../p2p/src/services/reqresp/reqresp.ts | 5 ++++- 5 files changed, 41 insertions(+), 9 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index b0839c46ba43..6e699704e9bb 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -65,7 +65,8 @@ export class BatchConnectionSampler { return; } - const newPeer = this.connectionSampler.getPeer(); + const excluding = new Map([[peerId, true]]); + const newPeer = this.connectionSampler.getPeer(excluding); if (newPeer) { this.batch[index] = newPeer; this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts index 59a6ec4ff09d..37d9713c1d25 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts @@ -11,6 +11,7 @@ describe('ConnectionSampler', () => { let sampler: ConnectionSampler; let mockLibp2p: any; let peers: PeerId[]; + let excluding: Map; let mockRandomSampler: MockProxy; beforeEach(async () => { @@ -27,6 +28,7 @@ describe('ConnectionSampler', () => { mockRandomSampler.random.mockReturnValue(0); sampler = new ConnectionSampler(mockLibp2p, 500, mockRandomSampler); + excluding = new Map(); }); afterEach(async () => { @@ -35,7 +37,7 @@ describe('ConnectionSampler', () => { describe('getPeer', () => { it('returns a random peer from the list', () => { - const peer = sampler.getPeer(); + const peer = sampler.getPeer(excluding); expect(peers).toContain(peer); }); @@ -52,10 +54,23 @@ describe('ConnectionSampler', () => { // Force Math.random to return values that would select the first two peers mockRandomSampler.random.mockReturnValueOnce(0).mockReturnValueOnce(1).mockReturnValueOnce(2); - const selectedPeer = sampler.getPeer(); + const selectedPeer = sampler.getPeer(excluding); // Should select peers[2] as it has no active connections expect(selectedPeer).toBe(peers[2]); }); + + it('should not sample a peer that is being excluded', () => { + // Sample the excluded peer multiple times, but it should not be selected + mockRandomSampler.random + .mockReturnValueOnce(0) + .mockReturnValueOnce(0) + .mockReturnValueOnce(0) + .mockReturnValueOnce(1); + + excluding.set(peers[0], true); + const selectedPeer = sampler.getPeer(excluding); + expect(selectedPeer).toBe(peers[1]); + }); }); describe('connection management', () => { diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index 468404deb524..b0bc3ee48641 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -57,14 +57,24 @@ export class ConnectionSampler { await Promise.all(closePromises); } - getPeer(): PeerId { + /** + * + * @param excluding - The peers to exclude from the sampling + * This is to prevent sampling with replacement + * @returns + */ + getPeer(excluding?: Map): PeerId { const peers = this.libp2p.getPeers(); let randomIndex = this.sampler.random(peers.length); let attempts = 0; - // If the active connections count is greater than 0, then we already have a connection open - // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times - while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) { + // Keep sampling while: + // - we haven't exceeded max attempts AND + // - either the peer has active connections OR is in the exclusion list + while ( + attempts < MAX_SAMPLE_ATTEMPTS && + ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || (excluding?.get(peers[randomIndex]) ?? false)) + ) { randomIndex = this.sampler.random(peers.length); attempts++; } diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index cb66b5dcbfdd..6b9eb3554c92 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -1,4 +1,5 @@ import { PeerErrorSeverity, TxHash, mockTx } from '@aztec/circuit-types'; +import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; @@ -28,6 +29,7 @@ describe('ReqResp', () => { let peerManager: MockProxy; let peerScoring: MockProxy; let nodes: ReqRespNode[]; + let logger = createLogger('test:reqresp.test.ts'); beforeEach(() => { peerScoring = mock(); @@ -78,7 +80,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); }); - it('should request from a later peer if other peers are offline', async () => { + it.only('should request from a later peer if other peers are offline', async () => { nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -96,6 +98,7 @@ describe('ReqResp', () => { if (!res) { // The peer chosen is randomly selected, and the node above wont respond, so if // we wait and try again, there will only be one node to chose from + logger.debug('No response from node, retrying'); await sleep(500); res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); } diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 19c4345ea9b4..5bfe0225ce8b 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -150,14 +150,17 @@ export class ReqResp { // Attempt to ask all of our peers, but sampled in a random order // This function is wrapped in a timeout, so we will exit the loop if we have not received a response const numberOfPeers = this.libp2p.getPeers().length; + if (numberOfPeers === 0) { this.logger.debug('No active peers to send requests to'); return undefined; } + let attemptedPeers: Map = new Map(); for (let i = 0; i < numberOfPeers; i++) { // Sample a peer to make a request to - const peer = this.connectionSampler.getPeer(); + const peer = this.connectionSampler.getPeer(attemptedPeers); + attemptedPeers.set(peer, true); this.logger.trace(`Sending request to peer: ${peer.toString()}`); const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer); From e8d51bb98229d18cf92d03499b7aadff51385016 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 15:29:46 +0000 Subject: [PATCH 5/7] fmt --- yarn-project/p2p/src/services/reqresp/reqresp.test.ts | 4 ++-- yarn-project/p2p/src/services/reqresp/reqresp.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 6b9eb3554c92..2fbdfce7456a 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -29,7 +29,7 @@ describe('ReqResp', () => { let peerManager: MockProxy; let peerScoring: MockProxy; let nodes: ReqRespNode[]; - let logger = createLogger('test:reqresp.test.ts'); + const logger = createLogger('test:reqresp.test.ts'); beforeEach(() => { peerScoring = mock(); @@ -80,7 +80,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); }); - it.only('should request from a later peer if other peers are offline', async () => { + it('should request from a later peer if other peers are offline', async () => { nodes = await createNodes(peerScoring, 4); await startNodes(nodes); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 5bfe0225ce8b..8ed753331be5 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -156,7 +156,7 @@ export class ReqResp { return undefined; } - let attemptedPeers: Map = new Map(); + const attemptedPeers: Map = new Map(); for (let i = 0; i < numberOfPeers; i++) { // Sample a peer to make a request to const peer = this.connectionSampler.getPeer(attemptedPeers); From e99d930158cb0e626ab524b00bcc19f70500a35c Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Mon, 20 Jan 2025 20:40:27 +0800 Subject: [PATCH 6/7] feat(p2p): reqresp spans (#11335) --- .../connection-sampler/connection_sampler.ts | 2 +- .../p2p/src/services/reqresp/metrics.ts | 57 +++++++++++++++++++ .../p2p/src/services/reqresp/reqresp.ts | 44 ++++++++++++-- .../telemetry-client/src/attributes.ts | 2 + yarn-project/telemetry-client/src/metrics.ts | 5 ++ 5 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/metrics.ts diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index b0bc3ee48641..c31af230232a 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -171,7 +171,7 @@ export class ConnectionSampler { await stream?.close(); } catch (error) { - this.logger.error(`Failed to close connection to peer ${streamId}`, { error }); + this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`); } finally { this.streams.delete(streamId); } diff --git a/yarn-project/p2p/src/services/reqresp/metrics.ts b/yarn-project/p2p/src/services/reqresp/metrics.ts new file mode 100644 index 000000000000..e32b4cdb4f4b --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/metrics.ts @@ -0,0 +1,57 @@ +// Request response metrics +import { Attributes, Metrics, ValueType } from '@aztec/telemetry-client'; +import { type TelemetryClient, type Tracer, type UpDownCounter } from '@aztec/telemetry-client'; + +export class ReqRespMetrics { + public readonly tracer: Tracer; + + private readonly sentRequests: UpDownCounter; + private readonly receivedRequests: UpDownCounter; + + private readonly failedOutboundRequests: UpDownCounter; + private readonly failedInboundRequests: UpDownCounter; + + constructor(readonly telemetryClient: TelemetryClient, name = 'ReqResp') { + this.tracer = telemetryClient.getTracer(name); + + const meter = telemetryClient.getMeter(name); + this.sentRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_SENT_REQUESTS, { + description: 'Number of requests sent to peers', + unit: 'requests', + valueType: ValueType.INT, + }); + this.receivedRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_RECEIVED_REQUESTS, { + description: 'Number of requests received from peers', + unit: 'requests', + valueType: ValueType.INT, + }); + + this.failedOutboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS, { + description: 'Number of failed outbound requests - nodes not getting valid responses', + unit: 'requests', + valueType: ValueType.INT, + }); + + this.failedInboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_INBOUND_REQUESTS, { + description: 'Number of failed inbound requests - node failing to respond to requests', + unit: 'requests', + valueType: ValueType.INT, + }); + } + + public recordRequestSent(protocol: string) { + this.sentRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordRequestReceived(protocol: string) { + this.receivedRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordRequestError(protocol: string) { + this.failedOutboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordResponseError(protocol: string) { + this.failedInboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } +} diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 8ed753331be5..c77f7d13b996 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -2,6 +2,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { executeTimeout } from '@aztec/foundation/timer'; +import { Attributes, type TelemetryClient, getTelemetryClient, trackSpan } from '@aztec/telemetry-client'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; @@ -27,6 +28,7 @@ import { type SubProtocolMap, subProtocolMap, } from './interface.js'; +import { ReqRespMetrics } from './metrics.js'; import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js'; /** @@ -53,13 +55,19 @@ export class ReqResp { private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS; private subProtocolValidators: ReqRespSubProtocolValidators = DEFAULT_SUB_PROTOCOL_VALIDATORS; + private connectionSampler: ConnectionSampler; private rateLimiter: RequestResponseRateLimiter; private snappyTransform: SnappyTransform; - private connectionSampler: ConnectionSampler; + private metrics: ReqRespMetrics; - constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) { + constructor( + config: P2PReqRespConfig, + private libp2p: Libp2p, + private peerScoring: PeerScoring, + telemetryClient: TelemetryClient = getTelemetryClient(), + ) { this.logger = createLogger('p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; @@ -71,6 +79,11 @@ export class ReqResp { this.connectionSampler = new ConnectionSampler(libp2p); this.snappyTransform = new SnappyTransform(); + this.metrics = new ReqRespMetrics(telemetryClient); + } + + get tracer() { + return this.metrics.tracer; } /** @@ -97,6 +110,9 @@ export class ReqResp { } // Close all active connections + await this.connectionSampler.stop(); + this.logger.debug('ReqResp: Connection sampler stopped'); + const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close()); await Promise.all(closeStreamPromises); this.logger.debug('ReqResp: All active streams closed'); @@ -104,9 +120,6 @@ export class ReqResp { this.rateLimiter.stop(); this.logger.debug('ReqResp: Rate limiter stopped'); - await this.connectionSampler.stop(); - this.logger.debug('ReqResp: Connection sampler stopped'); - // NOTE: We assume libp2p instance is managed by the caller } @@ -213,6 +226,13 @@ export class ReqResp { * * @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`). */ + @trackSpan( + 'ReqResp.sendBatchRequest', + (subProtocol: ReqRespSubProtocol, requests: InstanceType[]) => ({ + [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol, + [Attributes.P2P_REQ_RESP_BATCH_REQUESTS_COUNT]: requests.length, + }), + ) async sendBatchRequest( subProtocol: SubProtocol, requests: InstanceType[], @@ -354,6 +374,10 @@ export class ReqResp { * If the stream is not closed by the dialled peer, and a timeout occurs, then * the stream is closed on the requester's end and sender (us) updates its peer score */ + @trackSpan('ReqResp.sendRequestToPeer', (peerId: PeerId, subProtocol: ReqRespSubProtocol, _: Buffer) => ({ + [Attributes.P2P_ID]: peerId.toString(), + [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol, + })) public async sendRequestToPeer( peerId: PeerId, subProtocol: ReqRespSubProtocol, @@ -361,6 +385,8 @@ export class ReqResp { ): Promise { let stream: Stream | undefined; try { + this.metrics.recordRequestSent(subProtocol); + stream = await this.connectionSampler.dialProtocol(peerId, subProtocol); // Open the stream with a timeout @@ -372,6 +398,7 @@ export class ReqResp { return result; } catch (e: any) { + this.metrics.recordRequestError(subProtocol); this.handleResponseError(e, peerId, subProtocol); } finally { // Only close the stream if we created it @@ -479,7 +506,13 @@ export class ReqResp { * We check rate limits for each peer, note the peer will be penalised within the rate limiter implementation * if they exceed their peer specific limits. */ + @trackSpan('ReqResp.streamHandler', (protocol: ReqRespSubProtocol, { connection }: IncomingStreamData) => ({ + [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol, + [Attributes.P2P_ID]: connection.remotePeer.toString(), + })) private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) { + this.metrics.recordRequestReceived(protocol); + // Store a reference to from this for the async generator if (!this.rateLimiter.allow(protocol, connection.remotePeer)) { this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`); @@ -506,6 +539,7 @@ export class ReqResp { ); } catch (e: any) { this.logger.warn(e); + this.metrics.recordResponseError(protocol); } finally { await stream.close(); } diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 15fdeafa62fa..3df53eaf7b6e 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -84,6 +84,8 @@ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id'; export const PROOF_TIMED_OUT = 'aztec.proof.timed_out'; export const P2P_ID = 'aztec.p2p.id'; +export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol'; +export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_requests_count'; export const POOL_NAME = 'aztec.pool.name'; export const SEQUENCER_STATE = 'aztec.sequencer.state'; diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index f755bbde8f45..5314a2039680 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -71,6 +71,11 @@ export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent'; export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received'; +export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests'; +export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_requests'; +export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests'; +export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests'; + export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count'; From e16757b4ade59223624600772519bee3e2dd4326 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:03:07 +0000 Subject: [PATCH 7/7] fix: test case where no peer is sampled --- .../batch_connection_sampler.test.ts | 18 ++++++++++++++++++ .../batch_connection_sampler.ts | 1 + .../connection_sampler.test.ts | 6 ++++++ .../connection-sampler/connection_sampler.ts | 13 ++++++++++--- .../p2p/src/services/reqresp/reqresp.ts | 9 +++++++-- 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts index 661042a24d78..4908ed6c97e9 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts @@ -93,6 +93,24 @@ describe('BatchConnectionSampler', () => { expect(sampler.getPeerForRequest(3)).toBe(peers[1]); }); + it('handles peer removal and replacement - no replacement available', () => { + mockRandomSampler.random.mockImplementation(() => 2); + const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); + + expect(sampler.activePeerCount).toBe(2); + expect(sampler.getPeerForRequest(0)).toBe(peers[0]); + + // Will sample no peers + libp2p.getPeers.mockReturnValue([]); + + // Remove peer 0, its requests will be distributed to peer 1 + sampler.removePeerAndReplace(peers[0]); + // Decrease the number of active peers + expect(sampler.activePeerCount).toBe(1); + + expect(sampler.getPeerForRequest(0)).toBe(peers[1]); + }); + it('distributes requests according to documentation example', () => { let callCount = 0; mockRandomSampler.random.mockImplementation(() => { diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index 6e699704e9bb..665d706a01fd 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -67,6 +67,7 @@ export class BatchConnectionSampler { const excluding = new Map([[peerId, true]]); const newPeer = this.connectionSampler.getPeer(excluding); + if (newPeer) { this.batch[index] = newPeer; this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts index 37d9713c1d25..b718c835390f 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts @@ -41,6 +41,12 @@ describe('ConnectionSampler', () => { expect(peers).toContain(peer); }); + it('returns undefined if no peers are available', () => { + mockLibp2p.getPeers.mockReturnValue([]); + const peer = sampler.getPeer(excluding); + expect(peer).toBeUndefined(); + }); + it('attempts to find peer with no active connections', async () => { // Setup: Create active connection to first two peers const mockStream1: Partial = { id: '1', close: jest.fn() } as Partial; diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index c31af230232a..4c18816330ab 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -26,7 +26,7 @@ export class RandomSampler { */ export class ConnectionSampler { private readonly logger = createLogger('p2p:reqresp:connection-sampler'); - private cleanupJob?: RunningPromise; + private cleanupJob: RunningPromise; private readonly activeConnectionsCount: Map = new Map(); private readonly streams: Map = new Map(); @@ -49,12 +49,14 @@ export class ConnectionSampler { * Stops the cleanup job and closes all active connections */ async stop() { - await this.cleanupJob?.stop(); + this.logger.info('Stopping connection sampler'); + await this.cleanupJob.stop(); await this.dialQueue.end(); // Close all active streams const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId)); await Promise.all(closePromises); + this.logger.info('Connection sampler stopped'); } /** @@ -63,8 +65,13 @@ export class ConnectionSampler { * This is to prevent sampling with replacement * @returns */ - getPeer(excluding?: Map): PeerId { + getPeer(excluding?: Map): PeerId | undefined { const peers = this.libp2p.getPeers(); + + if (peers.length === 0) { + return undefined; + } + let randomIndex = this.sampler.random(peers.length); let attempts = 0; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index c77f7d13b996..ff2f01195cd9 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -173,6 +173,11 @@ export class ReqResp { for (let i = 0; i < numberOfPeers; i++) { // Sample a peer to make a request to const peer = this.connectionSampler.getPeer(attemptedPeers); + if (!peer) { + this.logger.debug('No peers available to send requests to'); + return undefined; + } + attemptedPeers.set(peer, true); this.logger.trace(`Sending request to peer: ${peer.toString()}`); @@ -285,10 +290,10 @@ export class ReqResp { // Make parallel requests for each peer's batch // A batch entry will look something like this: // PeerId0: [0, 1, 2, 3] - // PeerId1: [0, 1, 2, 3] + // PeerId1: [4, 5, 6, 7] // Peer Id 0 will send requests 0, 1, 2, 3 in serial - // while simultaneously Peer Id 1 will send requests 0, 1, 2, 3 in serial + // while simultaneously Peer Id 1 will send requests 4, 5, 6, 7 in serial const batchResults = await Promise.all( Array.from(requestBatches.entries()).map(async ([peer, indices]) => {