diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index c9d0679e3f8e..61e16486d33d 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -4,6 +4,7 @@ import { bootstrap } from '@libp2p/bootstrap'; import { tcp } from '@libp2p/tcp'; import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p'; +import { type PeerManager } from '../service/peer_manager.js'; import { type P2PReqRespConfig } from '../service/reqresp/config.js'; import { pingHandler, statusHandler } from '../service/reqresp/handlers.js'; import { @@ -60,8 +61,8 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { * @param numberOfNodes - the number of nodes to create * @returns An array of the created nodes */ -export const createNodes = async (numberOfNodes: number): Promise => { - return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp())); +export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise => { + return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager))); }; // TODO: think about where else this can go @@ -79,13 +80,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise => { }; // Create a req resp node, exposing the underlying p2p node -export const createReqResp = async (): Promise => { +export const createReqResp = async (peerManager: PeerManager): Promise => { const p2p = await createLibp2pNode(); const config: P2PReqRespConfig = { overallRequestTimeoutMs: 4000, individualRequestTimeoutMs: 2000, }; - const req = new ReqResp(config, p2p); + const req = new ReqResp(config, p2p, peerManager); return { p2p, req, diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index fef7a9ae44c2..aa450fb1b446 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -109,7 +109,7 @@ export class LibP2PService implements P2PService { return this.peerManager.getPeerScore(peerId); }; this.node.services.pubsub.score.params.appSpecificWeight = 10; - this.reqresp = new ReqResp(config, node); + this.reqresp = new ReqResp(config, node, this.peerManager); this.blockReceivedCallback = (block: BlockProposal): Promise => { this.logger.verbose( diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts index e35a59d3a08a..813eb1ddbc9d 100644 --- a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts @@ -1,6 +1,9 @@ import { jest } from '@jest/globals'; import { type PeerId } from '@libp2p/interface'; +import { type MockProxy, mock } from 'jest-mock-extended'; +import { type PeerManager } from '../../peer_manager.js'; +import { PeerErrorSeverity } from '../../peer_scoring.js'; import { PING_PROTOCOL, type ReqRespSubProtocolRateLimits, TX_REQ_PROTOCOL } from '../interface.js'; import { RequestResponseRateLimiter } from './rate_limiter.js'; @@ -20,6 +23,7 @@ const makePeer = (id: string): PeerId => { describe('rate limiter', () => { let rateLimiter: RequestResponseRateLimiter; + let peerManager: MockProxy; beforeEach(() => { jest.useFakeTimers(); @@ -37,7 +41,10 @@ describe('rate limiter', () => { }, }, } as ReqRespSubProtocolRateLimits; // force type as we will not provide descriptions of all protocols - rateLimiter = new RequestResponseRateLimiter(config); + + peerManager = mock(); + + rateLimiter = new RequestResponseRateLimiter(peerManager, config); }); afterEach(() => { @@ -67,21 +74,25 @@ describe('rate limiter', () => { expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); } expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + + // Spy on the peer manager and check that penalizePeer is called + expect(peerManager.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); }); it('Should allow requests within the global limit', () => { // Initial burst + const falingPeer = makePeer('nolettoinno'); for (let i = 0; i < 10; i++) { expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); } - expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false); // Smooth requests for (let i = 0; i < 10; i++) { jest.advanceTimersByTime(100); expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); } - expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false); // Reset after quota has passed jest.advanceTimersByTime(1000); @@ -89,7 +100,7 @@ describe('rate limiter', () => { for (let i = 0; i < 10; i++) { expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); } - expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false); }); it('Should reset after quota has passed', () => { @@ -125,7 +136,7 @@ describe('rate limiter', () => { }, }, } as ReqRespSubProtocolRateLimits; - const multiProtocolRateLimiter = new RequestResponseRateLimiter(config); + const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerManager, config); const peerId = makePeer('peer1'); @@ -145,7 +156,7 @@ describe('rate limiter', () => { }); it('Should allow requests if no rate limiter is configured', () => { - const rateLimiter = new RequestResponseRateLimiter({} as ReqRespSubProtocolRateLimits); + const rateLimiter = new RequestResponseRateLimiter(peerManager, {} as ReqRespSubProtocolRateLimits); expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('peer1'))).toBe(true); }); diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts index d6bf348df42b..37b2909c9aee 100644 --- a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts @@ -5,6 +5,8 @@ */ import { type PeerId } from '@libp2p/interface'; +import { type PeerManager } from '../../peer_manager.js'; +import { PeerErrorSeverity } from '../../peer_scoring.js'; import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; @@ -69,6 +71,12 @@ interface PeerRateLimiter { lastAccess: number; } +enum RateLimitStatus { + Allowed, + DeniedGlobal, + DeniedPeer, +} + /** * SubProtocolRateLimiter: A rate limiter for managing request rates on a per-peer and global basis for a specific subprotocol. * @@ -98,9 +106,9 @@ export class SubProtocolRateLimiter { this.peerQuotaTimeMs = peerQuotaTimeMs; } - allow(peerId: PeerId): boolean { + allow(peerId: PeerId): RateLimitStatus { if (!this.globalLimiter.allow()) { - return false; + return RateLimitStatus.DeniedGlobal; } const peerIdStr = peerId.toString(); @@ -115,7 +123,11 @@ export class SubProtocolRateLimiter { } else { peerLimiter.lastAccess = Date.now(); } - return peerLimiter.limiter.allow(); + const peerLimitAllowed = peerLimiter.limiter.allow(); + if (!peerLimitAllowed) { + return RateLimitStatus.DeniedPeer; + } + return RateLimitStatus.Allowed; } cleanupInactivePeers() { @@ -138,14 +150,16 @@ export class SubProtocolRateLimiter { * - Initializes with a set of rate limit configurations for different subprotocols. * - Creates a separate SubProtocolRateLimiter for each configured subprotocol. * - When a request comes in, it routes the rate limiting decision to the appropriate subprotocol limiter. + * - Peers who exceed their peer rate limits will be penalised by the peer manager. * * Usage: * ``` + * const peerManager = new PeerManager(...); * const rateLimits = { * subprotocol1: { peerLimit: { quotaCount: 10, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 100, quotaTimeMs: 1000 } }, * subprotocol2: { peerLimit: { quotaCount: 5, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 50, quotaTimeMs: 1000 } } * }; - * const limiter = new RequestResponseRateLimiter(rateLimits); + * const limiter = new RequestResponseRateLimiter(peerManager, rateLimits); * * Note: Ensure to call `stop()` when shutting down to properly clean up all subprotocol limiters. */ @@ -154,7 +168,7 @@ export class RequestResponseRateLimiter { private cleanupInterval: NodeJS.Timeout | undefined = undefined; - constructor(rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { + constructor(private peerManager: PeerManager, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { this.subProtocolRateLimiters = new Map(); for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) { @@ -179,10 +193,19 @@ export class RequestResponseRateLimiter { allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): boolean { const limiter = this.subProtocolRateLimiters.get(subProtocol); if (!limiter) { - // TODO: maybe throw an error here if no rate limiter is configured? return true; } - return limiter.allow(peerId); + const rateLimitStatus = limiter.allow(peerId); + + switch (rateLimitStatus) { + case RateLimitStatus.DeniedPeer: + this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + return false; + case RateLimitStatus.DeniedGlobal: + return false; + default: + return true; + } } cleanupInactivePeers() { diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 7c57d3fa8459..e1c5f5ad102d 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -2,18 +2,26 @@ import { TxHash, mockTx } from '@aztec/circuit-types'; import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; import { MOCK_SUB_PROTOCOL_HANDLERS, connectToPeers, createNodes, startNodes, stopNodes } from '../../mocks/index.js'; +import { type PeerManager } from '../peer_manager.js'; import { PING_PROTOCOL, TX_REQ_PROTOCOL } from './interface.js'; // The Req Resp protocol should allow nodes to dial specific peers // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { + let peerManager: MockProxy; + + beforeEach(() => { + peerManager = mock(); + }); + it('Should perform a ping request', async () => { // Create two nodes // They need to discover each other - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; await startNodes(nodes); @@ -32,7 +40,7 @@ describe('ReqResp', () => { }); it('Should handle gracefully if a peer connected peer is offline', async () => { - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; const { req: ponger } = nodes[1]; @@ -53,7 +61,7 @@ describe('ReqResp', () => { }); it('Should request from a later peer if other peers are offline', async () => { - const nodes = await createNodes(4); + const nodes = await createNodes(peerManager, 4); await startNodes(nodes); await sleep(500); @@ -73,7 +81,7 @@ describe('ReqResp', () => { }); it('Should hit a rate limit if too many requests are made in quick succession', async () => { - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -110,7 +118,7 @@ describe('ReqResp', () => { return Promise.resolve(Uint8Array.from(Buffer.from(''))); }; - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -133,7 +141,7 @@ describe('ReqResp', () => { return Promise.resolve(Uint8Array.from(Buffer.from(''))); }; - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -147,7 +155,7 @@ describe('ReqResp', () => { }); it('Should hit individual timeout if nothing is returned over the stream', async () => { - const nodes = await createNodes(2); + const nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -175,7 +183,7 @@ describe('ReqResp', () => { }); it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { - const nodes = await createNodes(4); + const nodes = await createNodes(peerManager, 4); await startNodes(nodes); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 51f7a12eb8a3..39e5da0441f1 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -8,6 +8,7 @@ import { type Libp2p } from 'libp2p'; import { type Uint8ArrayList } from 'uint8arraylist'; import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; +import { type PeerManager } from '../peer_manager.js'; import { type P2PReqRespConfig } from './config.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -38,13 +39,13 @@ export class ReqResp { private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS; private rateLimiter: RequestResponseRateLimiter; - constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p) { + constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, peerManager: PeerManager) { this.logger = createDebugLogger('aztec:p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; - this.rateLimiter = new RequestResponseRateLimiter(); + this.rateLimiter = new RequestResponseRateLimiter(peerManager); } /**