Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 8 additions & 22 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_sto

interface MockRollupContractRead {
archiveAt: (args: readonly [bigint]) => Promise<`0x${string}`>;
getProvenBlockNumber: () => Promise<bigint>;
}

describe('Archiver', () => {
Expand All @@ -40,9 +41,9 @@ describe('Archiver', () => {
let publicClient: MockProxy<PublicClient<HttpTransport, Chain>>;
let instrumentation: MockProxy<ArchiverInstrumentation>;
let archiverStore: ArchiverDataStore;
let proverId: Fr;
let now: number;

let rollupRead: MockProxy<MockRollupContractRead>;
let archiver: Archiver;
let blocks: L2Block[];

Expand All @@ -56,7 +57,6 @@ describe('Archiver', () => {

instrumentation = mock({ isEnabled: () => true });
archiverStore = new MemoryArchiverStore(1000);
proverId = Fr.random();

archiver = new Archiver(
publicClient,
Expand All @@ -70,9 +70,11 @@ describe('Archiver', () => {

blocks = blockNumbers.map(x => L2Block.random(x, 4, x, x + 1, 2, 2));

((archiver as any).rollup as any).read = mock<MockRollupContractRead>({
rollupRead = mock<MockRollupContractRead>({
archiveAt: (args: readonly [bigint]) => Promise.resolve(blocks[Number(args[0] - 1n)].archive.root.toString()),
});

((archiver as any).rollup as any).read = rollupRead;
});

afterEach(async () => {
Expand All @@ -91,9 +93,10 @@ describe('Archiver', () => {
mockGetLogs({
messageSent: [makeMessageSentEvent(98n, 1n, 0n), makeMessageSentEvent(99n, 1n, 1n)],
L2BlockProposed: [makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString())],
proofVerified: [makeProofVerifiedEvent(102n, 1n, proverId)],
});

rollupRead.getProvenBlockNumber.mockResolvedValueOnce(1n);

mockGetLogs({
messageSent: [
makeMessageSentEvent(2504n, 2n, 0n),
Expand Down Expand Up @@ -175,11 +178,6 @@ describe('Archiver', () => {
// Check getting only proven blocks
expect((await archiver.getBlocks(1, 100)).map(b => b.number)).toEqual([1, 2, 3]);
expect((await archiver.getBlocks(1, 100, true)).map(b => b.number)).toEqual([1]);

// Check instrumentation of proven blocks
expect(instrumentation.processProofsVerified).toHaveBeenCalledWith([
{ delay: 1000n, l1BlockNumber: 102n, l2BlockNumber: 1n, proverId: proverId.toString() },
]);
}, 10_000);

it('does not sync past current block number', async () => {
Expand Down Expand Up @@ -259,12 +257,10 @@ describe('Archiver', () => {
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEvent>[];
L2BlockProposed?: ReturnType<typeof makeL2BlockProposedEvent>[];
proofVerified?: ReturnType<typeof makeProofVerifiedEvent>[];
}) => {
publicClient.getLogs
.mockResolvedValueOnce(logs.messageSent ?? [])
.mockResolvedValueOnce(logs.L2BlockProposed ?? [])
.mockResolvedValueOnce(logs.proofVerified ?? []);
.mockResolvedValueOnce(logs.L2BlockProposed ?? []);
};
});

Expand Down Expand Up @@ -300,16 +296,6 @@ function makeMessageSentEvent(l1BlockNum: bigint, l2BlockNumber: bigint, index:
} as Log<bigint, number, false, undefined, true, typeof InboxAbi, 'MessageSent'>;
}

function makeProofVerifiedEvent(l1BlockNum: bigint, l2BlockNumber: bigint, proverId: Fr) {
return {
blockNumber: l1BlockNum,
args: {
blockNumber: l2BlockNumber,
proverId: proverId.toString(),
},
} as Log<bigint, number, false, undefined, true, typeof RollupAbi, 'L2ProofVerified'>;
}

/**
* Makes a fake rollup tx for testing purposes.
* @param block - The L2Block.
Expand Down
163 changes: 44 additions & 119 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
import { createEthereumChain } from '@aztec/ethereum';
import { type ContractArtifact } from '@aztec/foundation/abi';
import { type AztecAddress } from '@aztec/foundation/aztec-address';
import { compactArray, unique } from '@aztec/foundation/collection';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
Expand Down Expand Up @@ -55,12 +54,7 @@ import {

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import {
getL1BlockTime,
retrieveBlockFromRollup,
retrieveL1ToL2Messages,
retrieveL2ProofVerifiedEvents,
} from './data_retrieval.js';
import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import { ArchiverInstrumentation } from './instrumentation.js';
import { type SingletonDataRetrieval } from './structs/data_retrieval.js';

Expand Down Expand Up @@ -206,31 +200,12 @@ export class Archiver implements ArchiveSource {
* This code does not handle reorgs.
*/
const {
blockBodiesSynchedTo = this.l1StartBlock,
blocksSynchedTo = this.l1StartBlock,
messagesSynchedTo = this.l1StartBlock,
provenLogsSynchedTo = this.l1StartBlock,
} = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (
currentL1BlockNumber <= blocksSynchedTo &&
currentL1BlockNumber <= messagesSynchedTo &&
currentL1BlockNumber <= blockBodiesSynchedTo &&
currentL1BlockNumber <= provenLogsSynchedTo
) {
// chain hasn't moved forward
// or it's been rolled back
this.log.debug(`Nothing to sync`, {
currentL1BlockNumber,
blocksSynchedTo,
messagesSynchedTo,
provenLogsSynchedTo,
blockBodiesSynchedTo,
});
return;
}

// ********** Ensuring Consistency of data pulled from L1 **********

/**
Expand All @@ -250,9 +225,24 @@ export class Archiver implements ArchiveSource {
* in future but for the time being it should give us the guarantees that we need
*/

await this.updateLastProvenL2Block(provenLogsSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L1 block **********

await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L2 block **********
await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber);
}

private async handleL1ToL2Messages(
blockUntilSynced: boolean,
messagesSynchedTo: bigint,
currentL1BlockNumber: bigint,
) {
if (currentL1BlockNumber <= messagesSynchedTo) {
return;
}

const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(
this.inbox,
Expand All @@ -262,14 +252,34 @@ export class Archiver implements ArchiveSource {
);

if (retrievedL1ToL2Messages.retrievedData.length !== 0) {
await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);

this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 -> L2 messages between L1 blocks ${
messagesSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);
}
}

await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);
private async updateLastProvenL2Block(provenSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= provenSynchedTo) {
return;
}

const provenBlockNumber = await this.rollup.read.getProvenBlockNumber();
if (provenBlockNumber) {
await this.store.setProvenL2BlockNumber({
retrievedData: Number(provenBlockNumber),
lastProcessedL1BlockNumber: currentL1BlockNumber,
});
}
}

private async handleL2blocks(blockUntilSynced: boolean, blocksSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= blocksSynchedTo) {
return;
}

this.log.debug(`Retrieving blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlocks = await retrieveBlockFromRollup(
Expand All @@ -281,8 +291,6 @@ export class Archiver implements ArchiveSource {
this.log,
);

// Add the body

(retrievedBlocks.length ? this.log.verbose : this.log.debug)(
`Retrieved ${retrievedBlocks.length || 'no'} new L2 blocks between L1 blocks ${
blocksSynchedTo + 1n
Expand All @@ -298,13 +306,16 @@ export class Archiver implements ArchiveSource {
.join(',')} with last processed L1 block ${lastProcessedL1BlockNumber}`,
);

// If we actually received something, we will use it.
if (retrievedBlocks.length > 0) {
await Promise.all(
retrievedBlocks.map(block => {
const noteEncryptedLogs = block.data.body.noteEncryptedLogs;
const encryptedLogs = block.data.body.encryptedLogs;
const unencryptedLogs = block.data.body.unencryptedLogs;
return this.store.addLogs(noteEncryptedLogs, encryptedLogs, unencryptedLogs, block.data.number);
return this.store.addLogs(
block.data.body.noteEncryptedLogs,
block.data.body.encryptedLogs,
block.data.body.unencryptedLogs,
block.data.number,
);
}),
);

Expand All @@ -321,10 +332,6 @@ export class Archiver implements ArchiveSource {
);

const timer = new Timer();
await this.store.addBlockBodies({
lastProcessedL1BlockNumber: lastProcessedL1BlockNumber,
retrievedData: retrievedBlocks.map(b => b.data.body),
});
await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
Expand All @@ -334,93 +341,11 @@ export class Archiver implements ArchiveSource {
this.log.verbose(`Processed ${retrievedBlocks.length} new L2 blocks up to ${lastL2BlockNumber}`);
}

// Fetch the logs for proven blocks in the block range and update the last proven block number.
if (currentL1BlockNumber > provenLogsSynchedTo) {
await this.updateLastProvenL2Block(provenLogsSynchedTo + 1n, currentL1BlockNumber);
}

if (retrievedBlocks.length > 0 || blockUntilSynced) {
(blockUntilSynced ? this.log.info : this.log.verbose)(`Synced to L1 block ${currentL1BlockNumber}`);
}
}

private async updateLastProvenL2Block(fromBlock: bigint, toBlock: bigint) {
const logs = await retrieveL2ProofVerifiedEvents(this.publicClient, this.rollupAddress, fromBlock, toBlock);
const lastLog = logs[logs.length - 1];
if (!lastLog) {
return;
}

const provenBlockNumber = lastLog.l2BlockNumber;
if (!provenBlockNumber) {
throw new Error(`Missing argument blockNumber from L2ProofVerified event`);
}

await this.emitProofVerifiedMetrics(logs);

const currentProvenBlockNumber = await this.store.getProvenL2BlockNumber();
if (provenBlockNumber > currentProvenBlockNumber) {
// Update the last proven block number
this.log.verbose(`Updated last proven block number from ${currentProvenBlockNumber} to ${provenBlockNumber}`);
await this.store.setProvenL2BlockNumber({
retrievedData: Number(provenBlockNumber),
lastProcessedL1BlockNumber: lastLog.l1BlockNumber,
});
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
} else {
// We set the last processed L1 block number to the last L1 block number in the range to avoid duplicate processing
await this.store.setProvenL2BlockNumber({
retrievedData: Number(currentProvenBlockNumber),
lastProcessedL1BlockNumber: lastLog.l1BlockNumber,
});
}
}

/**
* Emits as metrics the block number proven, who proved it, and how much time passed since it was submitted.
* @param logs - The ProofVerified logs to emit metrics for, as collected from `retrieveL2ProofVerifiedEvents`.
**/
private async emitProofVerifiedMetrics(logs: { l1BlockNumber: bigint; l2BlockNumber: bigint; proverId: Fr }[]) {
if (!logs.length || !this.instrumentation.isEnabled()) {
return;
}

const l1BlockTimes = new Map(
await Promise.all(
unique(logs.map(log => log.l1BlockNumber)).map(
async blockNumber => [blockNumber, await getL1BlockTime(this.publicClient, blockNumber)] as const,
),
),
);

// Collect L2 block times for all the blocks verified, this is the time in which the block proven was
// originally submitted to L1, using the L1 timestamp of the transaction.
const getL2BlockTime = async (blockNumber: bigint) =>
(await this.store.getBlocks(Number(blockNumber), 1))[0]?.l1.timestamp;

const l2BlockTimes = new Map(
await Promise.all(
unique(logs.map(log => log.l2BlockNumber)).map(
async blockNumber => [blockNumber, await getL2BlockTime(blockNumber)] as const,
),
),
);

// Emit the prover id and the time difference between block submission and proof.
this.instrumentation.processProofsVerified(
compactArray(
logs.map(log => {
const l1BlockTime = l1BlockTimes.get(log.l1BlockNumber)!;
const l2BlockTime = l2BlockTimes.get(log.l2BlockNumber);
if (!l2BlockTime) {
return undefined;
}
return { ...log, delay: l1BlockTime - l2BlockTime, proverId: log.proverId.toString() };
}),
),
);
}

/**
* Extracts and stores contract classes out of ContractClassRegistered events emitted by the class registerer contract.
* @param allLogs - All logs emitted in a bunch of blocks.
Expand Down
18 changes: 0 additions & 18 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
type Body,
type EncryptedL2BlockL2Logs,
type EncryptedNoteL2BlockL2Logs,
type FromLogType,
Expand Down Expand Up @@ -33,8 +32,6 @@ import { type L1Published } from './structs/published.js';
export type ArchiverL1SynchPoint = {
/** Number of the last L1 block that added a new L2 block metadata. */
blocksSynchedTo?: bigint;
/** Number of the last L1 block that added a new L2 block body. */
blockBodiesSynchedTo?: bigint;
/** Number of the last L1 block that added L1 -> L2 messages from the Inbox. */
messagesSynchedTo?: bigint;
/** Number of the last L1 block that added a new proven block. */
Expand All @@ -53,21 +50,6 @@ export interface ArchiverDataStore {
*/
addBlocks(blocks: L1Published<L2Block>[]): Promise<boolean>;

/**
* Append new block bodies to the store's list.
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean>;

/**
* Gets block bodies that have the same txsEffectsHashes as we supply.
*
* @param txsEffectsHashes - A list of txsEffectsHashes.
* @returns The requested L2 block bodies
*/
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]>;

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
Expand Down
Loading