From 4b5a440b63cc8fc25f933afa5c9190b71a3bf363 Mon Sep 17 00:00:00 2001 From: fcarreiro Date: Wed, 6 May 2026 16:43:58 +0000 Subject: [PATCH] feat(p2p): tx validation cache --- yarn-project/foundation/src/config/env_var.ts | 1 + yarn-project/p2p/src/client/factory.ts | 14 +- yarn-project/p2p/src/config.ts | 8 + .../tx_validator/cached_tx_validator.test.ts | 71 +++++++ .../tx_validator/cached_tx_validator.ts | 32 ++++ .../tx_validator/data_validator.ts | 2 + .../msg_validators/tx_validator/factory.ts | 30 ++- .../src/msg_validators/tx_validator/index.ts | 2 + .../tx_validator/tx_proof_validator.ts | 2 + .../tx_validator/tx_validation_cache.test.ts | 177 ++++++++++++++++++ .../tx_validator/tx_validation_cache.ts | 171 +++++++++++++++++ .../p2p/src/services/libp2p/libp2p_service.ts | 23 ++- 12 files changed, 518 insertions(+), 15 deletions(-) create mode 100644 yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.test.ts create mode 100644 yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.ts create mode 100644 yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.test.ts create mode 100644 yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.ts diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 4eb5634ea0a4..041f8d085169 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -158,6 +158,7 @@ export type EnvVar = | 'P2P_TX_POOL_DELETE_TXS_AFTER_REORG' | 'P2P_MIN_TX_POOL_AGE_MS' | 'P2P_RPC_PRICE_BUMP_PERCENTAGE' + | 'P2P_TX_VALIDATION_CACHE_SIZE' | 'DEBUG_P2P_INSTRUMENT_MESSAGES' | 'PEER_ID_PRIVATE_KEY' | 'PEER_ID_PRIVATE_KEY_PATH' diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 8080d68ae005..9c2985cad238 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -24,6 +24,7 @@ import { createTxValidatorForTransactionsEnteringPendingTxPool, getDefaultAllowedSetupFunctions, } from '../msg_validators/index.js'; +import { TxValidationCache } from '../msg_validators/tx_validator/tx_validation_cache.js'; import { DummyP2PService } from '../services/dummy_service.js'; import { LibP2PService } from '../services/index.js'; import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js'; @@ -137,6 +138,9 @@ export async function createP2PClient( attestationPool: deps.attestationPool ?? new AttestationPool(attestationStore, telemetry), }; + const txValidationCache = + config.txValidationCacheSize > 0 ? new TxValidationCache(config.txValidationCacheSize) : undefined; + const p2pService = await createP2PService( config, archiver, @@ -151,9 +155,15 @@ export async function createP2PClient( packageVersion, logger.createChild('libp2p_service'), telemetry, + txValidationCache, ); - const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs(proofVerifier, config); + const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs( + proofVerifier, + config, + /*bindings=*/ undefined, + txValidationCache, + ); const nodeSources = [ ...createNodeRpcTxSources(config.txCollectionNodeRpcUrls, txValidatorForTxCollection, config), ...(deps.rpcTxProviders ?? []).map( @@ -230,6 +240,7 @@ async function createP2PService( packageVersion: string, logger: Logger, telemetry: TelemetryClient, + txValidationCache?: TxValidationCache, ) { if (!config.p2pEnabled) { logger.verbose('P2P is disabled. Using dummy P2P service.'); @@ -253,6 +264,7 @@ async function createP2PService( blockMinFeesProvider, telemetry, logger: logger.createChild(`libp2p_service`), + txValidationCache, }); return p2pService; diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index c675e73f5284..97a65a9e531d 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -184,6 +184,9 @@ export interface P2PConfig /** The node's seen message ID cache size */ seenMessageCacheSize: number; + /** Maximum number of (validator, tx) pairs to keep in the tx validation LRU cache. */ + txValidationCacheSize: number; + /** True to disable the status handshake on peer connected. */ p2pDisableStatusHandshake?: boolean; @@ -471,6 +474,11 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The number of messages to keep in the seen message cache', ...numberConfigHelper(100_000), // 100K }, + txValidationCacheSize: { + env: 'P2P_TX_VALIDATION_CACHE_SIZE', + description: 'Maximum number of (validator, tx) pairs to keep in the tx validation LRU cache.', + ...numberConfigHelper(10_000), + }, p2pDisableStatusHandshake: { env: 'P2P_DISABLE_STATUS_HANDSHAKE', description: 'True to disable the status handshake on peer connected.', diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.test.ts b/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.test.ts new file mode 100644 index 000000000000..3ab1e58a9e58 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.test.ts @@ -0,0 +1,71 @@ +import { mockTx } from '@aztec/stdlib/testing'; +import type { AnyTx, TxValidationResult, TxValidator } from '@aztec/stdlib/tx'; + +import { jest } from '@jest/globals'; + +import { CachedTxValidator } from './cached_tx_validator.js'; +import type { ITxValidationCache } from './tx_validation_cache.js'; + +describe('CachedTxValidator', () => { + class TestValidator implements TxValidator { + public readonly identifier = Symbol('TestValidator'); + + constructor(private readonly validateImpl: (tx: AnyTx) => Promise) {} + + public validateTx(tx: AnyTx): Promise { + return this.validateImpl(tx); + } + } + + class TestTxValidatorCache implements ITxValidationCache { + public readonly getOrValidate: jest.MockedFunction; + + constructor(impl?: ITxValidationCache['getOrValidate']) { + this.getOrValidate = jest.fn(impl ?? ((_s, _h, validate) => validate())); + } + } + + it('returns inner validator unchanged when cache is not provided', () => { + const inner = new TestValidator(() => Promise.resolve({ result: 'valid' })); + + const wrapped = CachedTxValidator.new(inner, undefined); + + expect(wrapped).toBe(inner); + }); + + it('delegates validation to cache.getOrValidate using validator identifier and tx hash', async () => { + const tx = await mockTx(1); + const validate = jest.fn<(tx: AnyTx) => Promise>().mockResolvedValue({ result: 'valid' }); + const inner = new TestValidator(txArg => validate(txArg)); + const cache = new TestTxValidatorCache(); + + const wrapped = CachedTxValidator.new(inner, cache); + await wrapped.validateTx(tx); + + expect(cache.getOrValidate).toHaveBeenCalledTimes(1); + expect(cache.getOrValidate).toHaveBeenCalledWith(inner.identifier, tx.getTxHash(), expect.any(Function)); + expect(validate).toHaveBeenCalledTimes(1); + }); + + it('returns the value produced by cache.getOrValidate', async () => { + const tx = await mockTx(2); + const result: TxValidationResult = { result: 'invalid', reason: ['cache-hit'] }; + const validate = jest.fn<(tx: AnyTx) => Promise>().mockResolvedValue({ result: 'valid' }); + const inner = new TestValidator(txArg => validate(txArg)); + const cache = new TestTxValidatorCache(() => Promise.resolve(result)); + + const wrapped = CachedTxValidator.new(inner, cache); + + await expect(wrapped.validateTx(tx)).resolves.toEqual(result); + expect(validate).not.toHaveBeenCalled(); + }); + + it('propagates rejections from cache.getOrValidate', async () => { + const tx = await mockTx(3); + const error = new Error('cache failed'); + const cache = new TestTxValidatorCache(() => Promise.reject(error)); + const wrapped = CachedTxValidator.new(new TestValidator(() => Promise.resolve({ result: 'valid' })), cache); + + await expect(wrapped.validateTx(tx)).rejects.toThrow(error.message); + }); +}); diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.ts new file mode 100644 index 000000000000..d10c64eb5b32 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/cached_tx_validator.ts @@ -0,0 +1,32 @@ +import type { AnyTx, TxValidationResult, TxValidator } from '@aztec/stdlib/tx'; +import { getTxHash } from '@aztec/stdlib/tx'; + +import type { ITxValidationCache } from './tx_validation_cache.js'; + +/** Wraps a {@link TxValidator} to cache its results in a shared {@link ITxValidationCache}. */ +export class CachedTxValidator implements TxValidator { + constructor( + private readonly inner: TxValidator, + private readonly validatorSymbol: symbol, + private readonly cache: ITxValidationCache, + ) {} + + public static new( + inner: TxValidator & { identifier: symbol }, + cache?: ITxValidationCache, + ): TxValidator { + return CachedTxValidator.newWithIdentifier(inner, inner.identifier, cache); + } + + public static newWithIdentifier( + inner: TxValidator, + identifier: symbol, + cache?: ITxValidationCache, + ): TxValidator { + return cache ? new CachedTxValidator(inner, identifier, cache) : inner; + } + + public validateTx(tx: T): Promise { + return this.cache.getOrValidate(this.validatorSymbol, getTxHash(tx), () => this.inner.validateTx(tx)); + } +} diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts index 7c284b6d0ce3..d1dcd7c75ead 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts @@ -20,6 +20,8 @@ import { } from '@aztec/stdlib/tx'; export class DataTxValidator implements TxValidator { + public readonly identifier: symbol = Symbol('DataTxValidator'); + #log: Logger; constructor(bindings?: LoggerBindings) { diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/factory.ts b/yarn-project/p2p/src/msg_validators/tx_validator/factory.ts index f6ad249e4566..8662cab65627 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/factory.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/factory.ts @@ -53,6 +53,7 @@ import type { TxMetaData } from '../../mem_pools/tx_pool_v2/tx_metadata.js'; import { AggregateTxValidator } from './aggregate_tx_validator.js'; import { ArchiveCache } from './archive_cache.js'; import { type ArchiveSource, BlockHeaderTxValidator } from './block_header_validator.js'; +import { CachedTxValidator } from './cached_tx_validator.js'; import { ContractInstanceTxValidator } from './contract_instance_validator.js'; import { DataTxValidator } from './data_validator.js'; import { DoubleSpendTxValidator, type NullifierSource } from './double_spend_validator.js'; @@ -64,6 +65,7 @@ import { SizeTxValidator } from './size_validator.js'; import { TimestampTxValidator } from './timestamp_validator.js'; import { TxPermittedValidator } from './tx_permitted_validator.js'; import { TxProofValidator } from './tx_proof_validator.js'; +import { TxValidationCache } from './tx_validation_cache.js'; /** * A validator paired with a peer penalty severity. @@ -99,6 +101,7 @@ export function createFirstStageTxValidationsForGossipedTransactions( allowedInSetup: AllowedElement[] = [], bindings?: LoggerBindings, gasLimitOpts?: { rollupManaLimit?: number; maxBlockL2Gas?: number; maxBlockDAGas?: number }, + cache?: TxValidationCache, ): Record { const merkleTree = worldStateSynchronizer.getCommitted(); @@ -165,7 +168,7 @@ export function createFirstStageTxValidationsForGossipedTransactions( severity: PeerErrorSeverity.MidToleranceError, }, dataValidator: { - validator: new DataTxValidator(bindings), + validator: CachedTxValidator.new(new DataTxValidator(bindings), cache), severity: PeerErrorSeverity.MidToleranceError, }, contractInstanceValidator: { @@ -185,10 +188,11 @@ export function createFirstStageTxValidationsForGossipedTransactions( export function createSecondStageTxValidationsForGossipedTransactions( proofVerifier: ClientProtocolCircuitVerifier, bindings?: LoggerBindings, + cache?: TxValidationCache, ): Record { return { proofValidator: { - validator: new TxProofValidator(proofVerifier, bindings), + validator: CachedTxValidator.new(new TxProofValidator(proofVerifier, bindings), cache), severity: PeerErrorSeverity.LowToleranceError, }, }; @@ -210,8 +214,9 @@ function createTxValidatorForMinimumTxIntegrityChecks( rollupVersion: number; }, bindings?: LoggerBindings, + cache?: TxValidationCache, ): TxValidator { - return new AggregateTxValidator( + const aggregate = new AggregateTxValidator( new MetadataTxValidator( { l1ChainId: new Fr(l1ChainId), @@ -222,10 +227,14 @@ function createTxValidatorForMinimumTxIntegrityChecks( bindings, ), new SizeTxValidator(bindings), - new DataTxValidator(bindings), + CachedTxValidator.new(new DataTxValidator(bindings), cache), new ContractInstanceTxValidator(bindings), - new TxProofValidator(verifier, bindings), + CachedTxValidator.new(new TxProofValidator(verifier, bindings), cache), ); + + // This validator is not state-dependent so we can cache it. + const identifier = Symbol('TxValidatorForMinimumTxIntegrityChecks'); + return CachedTxValidator.newWithIdentifier(aggregate, identifier, cache); } /** @@ -244,8 +253,9 @@ export function createTxValidatorForReqResponseReceivedTxs( rollupVersion: number; }, bindings?: LoggerBindings, + cache?: TxValidationCache, ): TxValidator { - return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings); + return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings, cache); } /** @@ -263,8 +273,9 @@ export function createTxValidatorForBlockProposalReceivedTxs( rollupVersion: number; }, bindings?: LoggerBindings, + cache?: TxValidationCache, ): TxValidator { - return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings); + return createTxValidatorForMinimumTxIntegrityChecks(verifier, { l1ChainId, rollupVersion }, bindings, cache); } /** @@ -303,6 +314,7 @@ export function createTxValidatorForAcceptingTxsOverRPC( maxBlockDAGas?: number; }, bindings?: LoggerBindings, + cache?: TxValidationCache, ): TxValidator { const validators: TxValidator[] = [ new TxPermittedValidator(txsPermitted, bindings), @@ -326,7 +338,7 @@ export function createTxValidatorForAcceptingTxsOverRPC( new PhasesTxValidator(contractDataSource, setupAllowList, timestamp, bindings), new BlockHeaderTxValidator(new ArchiveCache(db), bindings), new DoubleSpendTxValidator(new NullifierCache(db), bindings), - new DataTxValidator(bindings), + CachedTxValidator.new(new DataTxValidator(bindings), cache), new ContractInstanceTxValidator(bindings), ]; @@ -341,7 +353,7 @@ export function createTxValidatorForAcceptingTxsOverRPC( } if (verifier) { - validators.push(new TxProofValidator(verifier, bindings)); + validators.push(CachedTxValidator.new(new TxProofValidator(verifier, bindings), cache)); } return new AggregateTxValidator(...validators); diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/index.ts b/yarn-project/p2p/src/msg_validators/tx_validator/index.ts index 893796772a3b..4a21303b725e 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/index.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/index.ts @@ -14,3 +14,5 @@ export * from './tx_permitted_validator.js'; export * from './timestamp_validator.js'; export * from './size_validator.js'; export * from './factory.js'; +export * from './tx_validation_cache.js'; +export * from './cached_tx_validator.js'; diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts index eae4cbf33c3c..d461097a738e 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts @@ -3,6 +3,8 @@ import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/ser import { TX_ERROR_INVALID_PROOF, Tx, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx'; export class TxProofValidator implements TxValidator { + public readonly identifier: symbol = Symbol('TxProofValidator'); + #log: Logger; constructor( diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.test.ts b/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.test.ts new file mode 100644 index 000000000000..8fb890cd98e4 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.test.ts @@ -0,0 +1,177 @@ +import { mockTx } from '@aztec/stdlib/testing'; +import type { TxHash, TxValidationResult } from '@aztec/stdlib/tx'; + +import { jest } from '@jest/globals'; + +import { TxValidationCache } from './tx_validation_cache.js'; + +describe('TxValidationCache', () => { + const validatorA = Symbol('validatorA'); + const validatorB = Symbol('validatorB'); + + let cache: TxValidationCache; + let txHash: TxHash; + + beforeEach(async () => { + cache = new TxValidationCache(100); + txHash = (await mockTx(1)).getTxHash(); + }); + + describe('get / set', () => { + it('returns undefined on a cache miss', () => { + expect(cache.get(validatorA, txHash)).toBeUndefined(); + }); + + it('returns the stored promise on a cache hit', async () => { + const result: TxValidationResult = { result: 'valid' }; + cache.set(validatorA, txHash, Promise.resolve(result)); + + await expect(cache.get(validatorA, txHash)).resolves.toEqual(result); + }); + + it('does not share entries across different validator symbols', () => { + cache.set(validatorA, txHash, Promise.resolve({ result: 'valid' })); + + expect(cache.get(validatorB, txHash)).toBeUndefined(); + }); + + it('does not share entries across different tx hashes', async () => { + const otherHash = (await mockTx(2)).getTxHash(); + cache.set(validatorA, txHash, Promise.resolve({ result: 'valid' })); + + expect(cache.get(validatorA, otherHash)).toBeUndefined(); + }); + }); + + describe('LRU eviction', () => { + it('evicts the least-recently-used entry when the cache is full', async () => { + const smallCache = new TxValidationCache(2); + const hash1 = (await mockTx(10)).getTxHash(); + const hash2 = (await mockTx(11)).getTxHash(); + const hash3 = (await mockTx(12)).getTxHash(); + const result: TxValidationResult = { result: 'valid' }; + + smallCache.set(validatorA, hash1, Promise.resolve(result)); + smallCache.set(validatorA, hash2, Promise.resolve(result)); + // hash1 is now the LRU entry; adding hash3 should evict it + smallCache.set(validatorA, hash3, Promise.resolve(result)); + + expect(smallCache.get(validatorA, hash1)).toBeUndefined(); + expect(smallCache.get(validatorA, hash2)).toBeDefined(); + expect(smallCache.get(validatorA, hash3)).toBeDefined(); + }); + + it('refreshes recency on get so that accessed entries are not evicted first', async () => { + const smallCache = new TxValidationCache(2); + const hash1 = (await mockTx(20)).getTxHash(); + const hash2 = (await mockTx(21)).getTxHash(); + const hash3 = (await mockTx(22)).getTxHash(); + const result: TxValidationResult = { result: 'valid' }; + + smallCache.set(validatorA, hash1, Promise.resolve(result)); + smallCache.set(validatorA, hash2, Promise.resolve(result)); + // Access hash1 so hash2 becomes the LRU entry + void smallCache.get(validatorA, hash1); + smallCache.set(validatorA, hash3, Promise.resolve(result)); + + expect(smallCache.get(validatorA, hash1)).toBeDefined(); + expect(smallCache.get(validatorA, hash2)).toBeUndefined(); + expect(smallCache.get(validatorA, hash3)).toBeDefined(); + }); + + it('throws when constructed with maxSize < 1', () => { + expect(() => new TxValidationCache(0)).toThrow(); + }); + }); + + describe('getOrValidate', () => { + it('calls validate and caches the result on a miss', async () => { + const expected: TxValidationResult = { result: 'invalid', reason: ['bad'] }; + const validate = jest.fn<() => Promise>().mockResolvedValue(expected); + + await expect(cache.getOrValidate(validatorA, txHash, validate)).resolves.toEqual(expected); + expect(validate).toHaveBeenCalledTimes(1); + }); + + it('returns the cached promise on a hit without calling validate again', async () => { + const expected: TxValidationResult = { result: 'valid' }; + const validate = jest.fn<() => Promise>().mockResolvedValue(expected); + + await cache.getOrValidate(validatorA, txHash, validate); + await expect(cache.getOrValidate(validatorA, txHash, validate)).resolves.toEqual(expected); + expect(validate).toHaveBeenCalledTimes(1); + }); + + it('coalesces concurrent in-flight validations for the same key into a single call', async () => { + const expected: TxValidationResult = { result: 'invalid', reason: ['bad proof'] }; + + let resolveValidation!: (v: TxValidationResult) => void; + const inFlight = new Promise(resolve => { + resolveValidation = resolve; + }); + const validate = jest.fn<() => Promise>().mockReturnValue(inFlight); + + const first = cache.getOrValidate(validatorA, txHash, validate); + const second = cache.getOrValidate(validatorA, txHash, validate); + const third = cache.getOrValidate(validatorA, txHash, validate); + + expect(validate).toHaveBeenCalledTimes(1); + + resolveValidation(expected); + + await expect(first).resolves.toEqual(expected); + await expect(second).resolves.toEqual(expected); + await expect(third).resolves.toEqual(expected); + }); + + it('scopes validation results by validator symbol', async () => { + const resultA: TxValidationResult = { result: 'valid' }; + const resultB: TxValidationResult = { result: 'invalid', reason: ['nope'] }; + + const validateA = jest.fn<() => Promise>().mockResolvedValue(resultA); + const validateB = jest.fn<() => Promise>().mockResolvedValue(resultB); + + await expect(cache.getOrValidate(validatorA, txHash, validateA)).resolves.toEqual(resultA); + await expect(cache.getOrValidate(validatorB, txHash, validateB)).resolves.toEqual(resultB); + + expect(validateA).toHaveBeenCalledTimes(1); + expect(validateB).toHaveBeenCalledTimes(1); + }); + + it('evicts a rejected validation so a later call retries and can succeed', async () => { + const error = new Error('temporary failure'); + const success: TxValidationResult = { result: 'valid' }; + const validate = jest + .fn<() => Promise>() + .mockRejectedValueOnce(error) + .mockResolvedValueOnce(success); + + await expect(cache.getOrValidate(validatorA, txHash, validate)).rejects.toThrow(error.message); + await expect(cache.getOrValidate(validatorA, txHash, validate)).resolves.toEqual(success); + }); + + it('evicts rejected in-flight promises and retries on the next call', async () => { + const error = new Error('downstream unavailable'); + const success: TxValidationResult = { result: 'invalid', reason: ['bad tx'] }; + + let rejectValidation!: (err: Error) => void; + const firstInFlight = new Promise((_, reject) => { + rejectValidation = reject; + }); + + const validate = jest + .fn<() => Promise>() + .mockReturnValueOnce(firstInFlight) + .mockResolvedValueOnce(success); + + const first = cache.getOrValidate(validatorA, txHash, validate); + const second = cache.getOrValidate(validatorA, txHash, validate); + + rejectValidation(error); + await expect(first).rejects.toThrow(error.message); + await expect(second).rejects.toThrow(error.message); + + await expect(cache.getOrValidate(validatorA, txHash, validate)).resolves.toEqual(success); + }); + }); +}); diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.ts b/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.ts new file mode 100644 index 000000000000..e6ca2a0a8034 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/tx_validation_cache.ts @@ -0,0 +1,171 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; +import type { TxHash, TxValidationResult } from '@aztec/stdlib/tx'; + +/** + * Minimal interface consumed by {@link CachedTxValidator}. + * Keeping the dependency on an interface lets callers (and tests) substitute any cache implementation. + */ +export interface ITxValidationCache { + /** Returns the cached promise if present, otherwise calls `validate`, caches its promise, and returns it. */ + getOrValidate( + validatorSymbol: symbol, + txHash: TxHash, + validate: () => Promise, + ): Promise; +} + +/** Node in the doubly-linked list used for LRU ordering. Head = least recent, tail = most recent. */ +type LruNode = { + key: string; + prev: LruNode | undefined; + next: LruNode | undefined; +}; + +/** + * Caches per-validator tx validation results to avoid redundant work across repeated validation calls. + * + * The cache key is composed from the validator symbol and tx hash, ensuring results are + * scoped to the specific validator that produced them. + * + * Promises are stored before they are awaited, so concurrent calls for the same pair share + * a single in-flight validation rather than launching duplicate work. + * + * Entries are evicted in least-recently-used order once the cache reaches `maxSize`. + */ +export class TxValidationCache { + #log: Logger; + + private readonly values = new Map>(); + private readonly nodes = new Map(); + private head: LruNode | undefined; + private tail: LruNode | undefined; + + constructor(private readonly maxSize: number) { + if (maxSize < 1) { + throw new Error('TxValidationCache maxSize must be at least 1'); + } + this.#log = createLogger('p2p:tx_validation_cache'); + } + + private key(validatorSymbol: symbol, txHash: TxHash): string { + return `${Symbol.keyFor(validatorSymbol) ?? validatorSymbol.toString()}:${txHash.toString()}`; + } + + /** Returns the cached promise for the given validator and tx, or undefined if not cached. Refreshes recency. */ + public get(validatorSymbol: symbol, txHash: TxHash): Promise | undefined { + const k = this.key(validatorSymbol, txHash); + const node = this.nodes.get(k); + if (!node) { + return undefined; + } + this.moveToTail(node); + return this.values.get(k); + } + + /** Stores a validation promise for the given validator and tx, evicting the LRU entry if at capacity. */ + public set(validatorSymbol: symbol, txHash: TxHash, result: Promise): void { + const k = this.key(validatorSymbol, txHash); + const existing = this.nodes.get(k); + if (existing) { + this.values.set(k, result); + this.moveToTail(existing); + return; + } + + if (this.values.size >= this.maxSize) { + this.evictHead(); + } + + const node: LruNode = { key: k, prev: this.tail, next: undefined }; + if (this.tail) { + this.tail.next = node; + } else { + this.head = node; + } + this.tail = node; + this.nodes.set(k, node); + this.values.set(k, result); + } + + /** Removes a cached validation promise for the given validator and tx. */ + public delete(validatorSymbol: symbol, txHash: TxHash): void { + const k = this.key(validatorSymbol, txHash); + const node = this.nodes.get(k); + if (!node) { + return; + } + this.unlink(node); + this.nodes.delete(k); + this.values.delete(k); + } + + /** + * Returns the cached promise if present, otherwise calls `validate`, stores its promise + * immediately (before awaiting), and returns it. + */ + public async getOrValidate( + validatorSymbol: symbol, + txHash: TxHash, + validate: () => Promise, + ): Promise { + const cached = this.get(validatorSymbol, txHash); + if (cached !== undefined) { + // If the promise is already resolved, log the result. + const result: string = await Promise.race([cached.then(p => p.result), Promise.resolve('')]); + this.#log.debug( + `Returning cached result '${result}' for validator ${validatorSymbol.toString()} and tx ${txHash.toString()}`, + ); + return cached; + } + const promise = validate().catch(err => { + // Evict failed validations so the next call retries instead of reusing a rejected promise. + this.delete(validatorSymbol, txHash); + throw err; + }); + this.set(validatorSymbol, txHash, promise); + return promise; + } + + private moveToTail(node: LruNode): void { + if (node === this.tail) { + return; + } + this.unlink(node); + node.prev = this.tail; + node.next = undefined; + if (this.tail) { + this.tail.next = node; + } + this.tail = node; + } + + private unlink(node: LruNode): void { + if (node.prev) { + node.prev.next = node.next; + } else { + this.head = node.next; + } + if (node.next) { + node.next.prev = node.prev; + } else { + this.tail = node.prev; + } + node.prev = undefined; + node.next = undefined; + } + + private evictHead(): void { + const oldHead = this.head; + if (!oldHead) { + return; + } + this.head = oldHead.next; + if (this.head) { + this.head.prev = undefined; + } else { + this.tail = undefined; + } + this.nodes.delete(oldHead.key); + this.values.delete(oldHead.key); + } +} diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 980d632a6d28..34881b898dd5 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -75,6 +75,7 @@ import { createTxValidatorForBlockProposalReceivedTxs, createTxValidatorForReqResponseReceivedTxs, } from '../../msg_validators/tx_validator/factory.js'; +import { TxValidationCache } from '../../msg_validators/tx_validator/tx_validation_cache.js'; import { GossipSubEvent } from '../../types/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { getVersions } from '../../versioning.js'; @@ -198,6 +199,7 @@ export class LibP2PService extends WithTracer implements P2PService { private blockMinFeesProvider: BlockMinFeesProvider, telemetry: TelemetryClient, logger: Logger = createLogger('p2p:libp2p_service'), + private txValidationCache?: TxValidationCache, ) { super(telemetry, 'LibP2PService'); this.telemetry = telemetry; @@ -297,6 +299,7 @@ export class LibP2PService extends WithTracer implements P2PService { telemetry: TelemetryClient; logger: Logger; packageVersion: string; + txValidationCache?: TxValidationCache; }, ) { const { @@ -310,6 +313,7 @@ export class LibP2PService extends WithTracer implements P2PService { telemetry, logger, packageVersion, + txValidationCache, } = deps; const { p2pPort, maxPeerCount, listenAddress } = config; const bindAddrTcp = convertToMultiaddr(listenAddress, p2pPort, 'tcp'); @@ -516,6 +520,7 @@ export class LibP2PService extends WithTracer implements P2PService { blockMinFeesProvider, telemetry, logger, + txValidationCache, ); } @@ -1630,10 +1635,12 @@ export class LibP2PService extends WithTracer implements P2PService { } protected createRequestedTxValidator(): TxValidator { - return createTxValidatorForReqResponseReceivedTxs(this.proofVerifier, { - l1ChainId: this.config.l1ChainId, - rollupVersion: this.config.rollupVersion, - }); + return createTxValidatorForReqResponseReceivedTxs( + this.proofVerifier, + { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, + this.logger.getBindings(), + this.txValidationCache, + ); } private getGasFees(): Promise { @@ -1661,6 +1668,7 @@ export class LibP2PService extends WithTracer implements P2PService { this.proofVerifier, { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, this.logger.getBindings(), + this.txValidationCache, ); const results = await Promise.all( @@ -1704,12 +1712,17 @@ export class LibP2PService extends WithTracer implements P2PService { maxBlockL2Gas: this.config.validateMaxL2BlockGas, maxBlockDAGas: this.config.validateMaxDABlockGas, }, + this.txValidationCache, ); } /** Creates the second stage (expensive proof verification) validators for gossiped transactions. */ protected createSecondStageMessageValidators(): Record { - return createSecondStageTxValidationsForGossipedTransactions(this.proofVerifier, this.logger.getBindings()); + return createSecondStageTxValidationsForGossipedTransactions( + this.proofVerifier, + this.logger.getBindings(), + this.txValidationCache, + ); } /**