Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ import {
FullNodeCheckpointsBuilder,
NodeKeystoreAdapter,
ValidatorClient,
createBlockProposalHandler,
createProposalHandler,
createValidatorClient,
} from '@aztec/validator-client';
import { createWorldStateSynchronizer } from '@aztec/world-state';
Expand Down Expand Up @@ -390,19 +390,21 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
}
}

// If there's no validator client, create a BlockProposalHandler to handle block proposals
// If there's no validator client, create a ProposalHandler to handle block and checkpoint proposals
// for monitoring or reexecution. Reexecution (default) allows us to follow the pending chain,
// while non-reexecution is used for validating the proposals and collecting their txs.
// Checkpoint proposals are handled if the blob client can upload blobs.
if (!validatorClient) {
const reexecute = !!config.alwaysReexecuteBlockProposals;
log.info(`Setting up block proposal handler` + (reexecute ? ' with reexecution of proposals' : ''));
createBlockProposalHandler(config, {
log.info(`Setting up proposal handler` + (reexecute ? ' with reexecution of proposals' : ''));
createProposalHandler(config, {
checkpointsBuilder: validatorCheckpointsBuilder,
worldState: worldStateSynchronizer,
epochCache,
blockSource: archiver,
l1ToL2MessageSource: archiver,
p2pClient,
blobClient,
dateProvider,
telemetry,
}).register(p2pClient, reexecute);
Expand Down
38 changes: 38 additions & 0 deletions yarn-project/foundation/src/collection/array.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
mean,
median,
partition,
partitionAsync,
removeArrayPaddingEnd,
stdDev,
times,
Expand Down Expand Up @@ -380,3 +381,40 @@ describe('partition', () => {
expect(odd).toEqual([{ a: 1 }, { a: 3 }]);
});
});

describe('partitionAsync', () => {
it('partitions an array into pass and fail arrays based on the predicate', async () => {
const input = [1, 2, 3, 4, 5];
const [even, odd] = await partitionAsync(input, x => Promise.resolve(x % 2 === 0));
expect(even).toEqual([2, 4]);
expect(odd).toEqual([1, 3, 5]);
});

it('returns all items in the first array if all pass the predicate', async () => {
const input = [2, 4, 6];
const [pass, fail] = await partitionAsync(input, x => Promise.resolve(x % 2 === 0));
expect(pass).toEqual([2, 4, 6]);
expect(fail).toEqual([]);
});

it('returns all items in the second array if none pass the predicate', async () => {
const input = [1, 3, 5];
const [pass, fail] = await partitionAsync(input, x => Promise.resolve(x % 2 === 0));
expect(pass).toEqual([]);
expect(fail).toEqual([1, 3, 5]);
});

it('handles an empty array', async () => {
const input: number[] = [];
const [pass, fail] = await partitionAsync(input, x => Promise.resolve(x > 0));
expect(pass).toEqual([]);
expect(fail).toEqual([]);
});

it('works with objects and custom predicates', async () => {
const input = [{ a: 1 }, { a: 2 }, { a: 3 }];
const [even, odd] = await partitionAsync(input, obj => Promise.resolve(obj.a % 2 === 0));
expect(even).toEqual([{ a: 2 }]);
expect(odd).toEqual([{ a: 1 }, { a: 3 }]);
});
});
14 changes: 14 additions & 0 deletions yarn-project/foundation/src/collection/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,17 @@ export function partition<T>(items: T[], predicate: (item: T) => boolean): [T[],
}
return [pass, fail];
}

/** Partitions the given iterable into two arrays based on the predicate. */
export async function partitionAsync<T>(items: T[], predicate: (item: T) => Promise<boolean>): Promise<[T[], T[]]> {
const pass: T[] = [];
const fail: T[] = [];
for (const item of items) {
if (await predicate(item)) {
pass.push(item);
} else {
fail.push(item);
}
}
return [pass, fail];
}
13 changes: 10 additions & 3 deletions yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import { AttestationPool, type AttestationPoolApi } from '../mem_pools/attestati
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
import { AztecKVTxPoolV2 } from '../mem_pools/tx_pool_v2/tx_pool_v2.js';
import { createTxValidatorForTransactionsEnteringPendingTxPool } from '../msg_validators/index.js';
import {
createTxValidatorForReqResponseReceivedTxs,
createTxValidatorForTransactionsEnteringPendingTxPool,
} from '../msg_validators/index.js';
import { DummyP2PService } from '../services/dummy_service.js';
import { LibP2PService } from '../services/index.js';
import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js';
Expand Down Expand Up @@ -130,9 +133,12 @@ export async function createP2PClient(
telemetry,
);

const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs(proofVerifier, config);
const nodeSources = [
...createNodeRpcTxSources(config.txCollectionNodeRpcUrls, config),
...(deps.rpcTxProviders ?? []).map((node, i) => new NodeRpcTxSource(node, `node-rpc-provider-${i}`)),
...createNodeRpcTxSources(config.txCollectionNodeRpcUrls, txValidatorForTxCollection, config),
...(deps.rpcTxProviders ?? []).map(
(node, i) => new NodeRpcTxSource(node, txValidatorForTxCollection, `node-rpc-provider-${i}`),
),
...(deps.txCollectionNodeSources ?? []),
];
if (nodeSources.length > 0) {
Expand All @@ -144,6 +150,7 @@ export async function createP2PClient(
const fileStoreSources = await createFileStoreTxSources(
config.txCollectionFileStoreUrls,
txFileStoreBasePath,
txValidatorForTxCollection,
logger.createChild('file-store-tx-source'),
telemetry,
);
Expand Down
74 changes: 43 additions & 31 deletions yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { partitionAsync } from '@aztec/foundation/collection';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { Timer } from '@aztec/foundation/timer';
import { type ReadOnlyFileStore, createReadOnlyFileStore } from '@aztec/stdlib/file-store';
import { Tx, type TxHash } from '@aztec/stdlib/tx';
import { Tx, type TxHash, type TxValidator } from '@aztec/stdlib/tx';
import {
type Histogram,
Metrics,
Expand All @@ -23,6 +24,7 @@ export class FileStoreTxSource implements TxSource {
private readonly fileStore: ReadOnlyFileStore,
private readonly baseUrl: string,
private readonly basePath: string,
private readonly txValidator: TxValidator,
private readonly log: Logger,
telemetry: TelemetryClient,
) {
Expand All @@ -44,6 +46,7 @@ export class FileStoreTxSource implements TxSource {
public static async create(
url: string,
basePath: string,
txValidator: TxValidator,
log: Logger = createLogger('p2p:file_store_tx_source'),
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<FileStoreTxSource | undefined> {
Expand All @@ -53,7 +56,7 @@ export class FileStoreTxSource implements TxSource {
log.warn(`Failed to create file store for URL: ${url}`);
return undefined;
}
return new FileStoreTxSource(fileStore, url, basePath, log, telemetry);
return new FileStoreTxSource(fileStore, url, basePath, txValidator, log, telemetry);
} catch (err) {
log.warn(`Error creating file store for URL: ${url}`, { error: err });
return undefined;
Expand All @@ -65,35 +68,41 @@ export class FileStoreTxSource implements TxSource {
}

public async getTxsByHash(txHashes: TxHash[]): Promise<TxSourceCollectionResult> {
const invalidTxHashes: string[] = [];
const results = await Promise.all(
txHashes.map(async txHash => {
const path = `${this.basePath}/txs/${txHash.toString()}.bin`;
const timer = new Timer();
try {
const buffer = await this.fileStore.read(path);
const tx = Tx.fromBuffer(buffer);
return { tx, downloadDuration: timer.ms(), downloadSize: buffer.length };
} catch {
this.downloadsFailed.add(1);
return undefined;
}
}),
);

const txs = results.filter(tx => tx !== undefined);
const [validTxs, invalidTxs] = await partitionAsync(
txs,
async ({ tx, downloadDuration, downloadSize }): Promise<boolean> => {
const valid = await this.txValidator.validateTx(tx);
if (valid.result === 'valid') {
this.downloadsSuccess.add(1);
this.downloadDuration.record(Math.ceil(downloadDuration));
this.downloadSize.record(downloadSize);
return true;
} else {
this.downloadsFailed.add(1);
return false;
}
},
);

return {
validTxs: (
await Promise.all(
txHashes.map(async txHash => {
const path = `${this.basePath}/txs/${txHash.toString()}.bin`;
const timer = new Timer();
try {
const buffer = await this.fileStore.read(path);
const tx = Tx.fromBuffer(buffer);
if ((await tx.validateTxHash()) && txHash.equals(tx.txHash)) {
this.downloadsSuccess.add(1);
this.downloadDuration.record(Math.ceil(timer.ms()));
this.downloadSize.record(buffer.length);
return tx;
} else {
invalidTxHashes.push(tx.txHash.toString());
this.downloadsFailed.add(1);
return undefined;
}
} catch {
// Tx not found or error reading - return undefined
this.downloadsFailed.add(1);
return undefined;
}
}),
)
).filter(tx => tx !== undefined),
invalidTxHashes: invalidTxHashes,
validTxs: validTxs.map(({ tx }) => tx),
invalidTxHashes: invalidTxs.map(({ tx }) => tx.getTxHash().toString()),
};
}
}
Expand All @@ -109,9 +118,12 @@ export class FileStoreTxSource implements TxSource {
export async function createFileStoreTxSources(
urls: string[],
basePath: string,
txValidator: TxValidator,
log: Logger = createLogger('p2p:file_store_tx_source'),
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<FileStoreTxSource[]> {
const sources = await Promise.all(urls.map(url => FileStoreTxSource.create(url, basePath, log, telemetry)));
const sources = await Promise.all(
urls.map(url => FileStoreTxSource.create(url, basePath, txValidator, log, telemetry)),
);
return sources.filter((s): s is FileStoreTxSource => s !== undefined);
}
62 changes: 62 additions & 0 deletions yarn-project/p2p/src/services/tx_collection/tx_source.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { AztecNode } from '@aztec/stdlib/interfaces/client';
import { Tx, type TxValidator } from '@aztec/stdlib/tx';

import { type MockProxy, mock } from 'jest-mock-extended';

import { NodeRpcTxSource } from './tx_source.js';

describe('NodeRpcTxSource', () => {
let mockClient: MockProxy<Pick<AztecNode, 'getTxsByHash'>>;
let mockValidator: MockProxy<TxValidator>;

const makeTx = async () => {
const tx = Tx.random();
await tx.recomputeHash();
return tx;
};

beforeEach(() => {
mockClient = mock<Pick<AztecNode, 'getTxsByHash'>>();
mockValidator = mock<TxValidator>();
mockValidator.validateTx.mockResolvedValue({ result: 'valid' });
});

const createSource = () => new NodeRpcTxSource(mockClient, mockValidator, 'test');

it('returns valid txs when validator accepts', async () => {
const tx1 = await makeTx();
const tx2 = await makeTx();
mockClient.getTxsByHash.mockResolvedValue([tx1, tx2]);

const result = await createSource().getTxsByHash([tx1.getTxHash(), tx2.getTxHash()]);

expect(result.validTxs).toHaveLength(2);
expect(result.invalidTxHashes).toHaveLength(0);
});

it('returns invalid tx hashes when validator rejects', async () => {
const tx1 = await makeTx();
const tx2 = await makeTx();
mockClient.getTxsByHash.mockResolvedValue([tx1, tx2]);
mockValidator.validateTx.mockResolvedValue({ result: 'invalid', reason: ['bad'] });

const result = await createSource().getTxsByHash([tx1.getTxHash(), tx2.getTxHash()]);

expect(result.validTxs).toHaveLength(0);
expect(result.invalidTxHashes).toEqual([tx1.getTxHash().toString(), tx2.getTxHash().toString()]);
});

it('partitions txs based on validator result', async () => {
const tx1 = await makeTx();
const tx2 = await makeTx();
mockClient.getTxsByHash.mockResolvedValue([tx1, tx2]);
mockValidator.validateTx
.mockResolvedValueOnce({ result: 'valid' })
.mockResolvedValueOnce({ result: 'invalid', reason: ['bad'] });

const result = await createSource().getTxsByHash([tx1.getTxHash(), tx2.getTxHash()]);

expect(result.validTxs).toEqual([tx1]);
expect(result.invalidTxHashes).toEqual([tx2.getTxHash().toString()]);
});
});
15 changes: 8 additions & 7 deletions yarn-project/p2p/src/services/tx_collection/tx_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree';
import { protocolContractsHash } from '@aztec/protocol-contracts';
import type { ChainConfig } from '@aztec/stdlib/config';
import { type AztecNode, createAztecNodeClient } from '@aztec/stdlib/interfaces/client';
import type { Tx, TxHash } from '@aztec/stdlib/tx';
import type { Tx, TxHash, TxValidator } from '@aztec/stdlib/tx';
import { type ComponentsVersions, getComponentsVersionsFromConfig } from '@aztec/stdlib/versioning';
import { makeTracedFetch } from '@aztec/telemetry-client';

Expand All @@ -16,12 +16,13 @@ export interface TxSource {
export class NodeRpcTxSource implements TxSource {
constructor(
private readonly client: Pick<AztecNode, 'getTxsByHash'>,
private readonly txValidator: TxValidator,
private readonly info: string,
) {}

public static fromUrl(nodeUrl: string, versions: ComponentsVersions): NodeRpcTxSource {
public static fromUrl(nodeUrl: string, txValidator: TxValidator, versions: ComponentsVersions): NodeRpcTxSource {
const client = createAztecNodeClient(nodeUrl, versions, makeTracedFetch([1, 2, 3], false));
return new NodeRpcTxSource(client, nodeUrl);
return new NodeRpcTxSource(client, txValidator, nodeUrl);
}

public getInfo() {
Expand All @@ -38,8 +39,8 @@ export class NodeRpcTxSource implements TxSource {
const invalidTxHashes: string[] = [];
await Promise.all(
txs.map(async tx => {
const isValid = await tx.validateTxHash();
if (isValid) {
const validation = await this.txValidator.validateTx(tx);
if (validation.result === 'valid') {
validTxs.push(tx);
} else {
invalidTxHashes.push(tx.getTxHash().toString());
Expand All @@ -50,7 +51,7 @@ export class NodeRpcTxSource implements TxSource {
}
}

export function createNodeRpcTxSources(urls: string[], chainConfig: ChainConfig) {
export function createNodeRpcTxSources(urls: string[], txValidator: TxValidator, chainConfig: ChainConfig) {
const versions = getComponentsVersionsFromConfig(chainConfig, protocolContractsHash, getVKTreeRoot());
return urls.map(url => NodeRpcTxSource.fromUrl(url, versions));
return urls.map(url => NodeRpcTxSource.fromUrl(url, txValidator, versions));
}
Loading
Loading