From 7b885165eb46a5492620aad1392f4ee30dd0bbbc Mon Sep 17 00:00:00 2001 From: Phil Windle Date: Tue, 9 Jun 2026 17:04:12 +0000 Subject: [PATCH] refactor(prover-node): address review feedback on checkpoint-store redesign - prover-node: advance the local tips store only after block-stream handling succeeds, and propagate checkpoint registration failures so failed events are retried by the L2BlockStream - prover-node: skip checkpoints for epochs past their proof-submission window - prover-node: bound the publishing-service shutdown with a timeout - prover-node: expireEpoch fetches blocks via getBlocks({ epoch }) - session-manager: compare archiver coverage by content-addressed id (not number), reuse a live partial session whose content matches, and refuse to re-prove an epoch the proven chain already encompasses - session-manager: collapse the redundant proving/finalization delays into one - startProof: return the job id without awaiting completion (callers poll getJobs) - proof-publishing-service: wire the candidate deadline into l1-tx-utils - prover-node-publisher: drop the now-redundant wait-for-proven loop - orchestrator: abort the sub-tree when the chonk verifier proof fails, and cancel the proving state on sub-tree cancel for parity with the top tree - orchestrator: name BaseRollupHintsWithoutProofAndVK and correct the proving-scheduler throttle docs - e2e: anchor the mid-epoch reorg test on a fresh epoch and assert it exercises in-epoch checkpoint removal (N checkpoints, remove the last, prove N-1) --- ...epochs_optimistic_proving.parallel.test.ts | 48 ++++++- .../checkpoint-sub-tree-orchestrator.ts | 36 ++++- .../src/orchestrator/proving-scheduler.ts | 20 ++- yarn-project/prover-node/src/config.ts | 3 +- .../src/proof-publishing-service.ts | 2 + .../src/prover-node-publisher.test.ts | 76 ----------- .../prover-node/src/prover-node-publisher.ts | 58 +------- .../prover-node/src/prover-node.test.ts | 50 ++++--- yarn-project/prover-node/src/prover-node.ts | 118 ++++++++++------ .../prover-node/src/session-manager.test.ts | 128 +++++++++--------- .../prover-node/src/session-manager.ts | 94 ++++++++----- .../stdlib/src/interfaces/prover-node.test.ts | 4 +- .../stdlib/src/interfaces/prover-node.ts | 9 +- 13 files changed, 337 insertions(+), 309 deletions(-) diff --git a/yarn-project/end-to-end/src/e2e_epochs/epochs_optimistic_proving.parallel.test.ts b/yarn-project/end-to-end/src/e2e_epochs/epochs_optimistic_proving.parallel.test.ts index 969a545f2f34..27c0e2dd2e97 100644 --- a/yarn-project/end-to-end/src/e2e_epochs/epochs_optimistic_proving.parallel.test.ts +++ b/yarn-project/end-to-end/src/e2e_epochs/epochs_optimistic_proving.parallel.test.ts @@ -55,6 +55,23 @@ describe('e2e_epochs/epochs_optimistic_proving', () => { return BlockNumber(cp.startBlock + cp.blockCount - 1); }; + /** + * Returns the canonical checkpoint numbers that fall within `epoch`, considering checkpoints + * `1..upTo`. Retries until the archiver has indexed the whole range so the count is stable. + */ + const checkpointsInEpoch = async (epoch: EpochNumber, upTo: CheckpointNumber): Promise => { + const cps = await retryUntil( + async () => { + const all = await node.getCheckpoints(CheckpointNumber(1), Number(upTo)); + return all.length >= Number(upTo) ? all : undefined; + }, + `archiver indexes checkpoints up to ${upTo}`, + 30, + 0.2, + ); + return cps.filter(cp => getEpochAtSlot(cp.header.slotNumber, test.constants) === epoch).map(cp => cp.number); + }; + /** * Background sampler proving the prover-node works an epoch *optimistically* — i.e. it * spawns a checkpoint's sub-tree before the epoch is over on L1, not just after the @@ -463,6 +480,12 @@ describe('e2e_epochs/epochs_optimistic_proving', () => { }); it('removes a checkpoint mid-epoch via reorg and proves with survivors', async () => { + // Anchor on a freshly-started epoch so the checkpoints we reorg over (and the survivor) + // are guaranteed to live in the same epoch. Without this, setup landing near an epoch + // boundary could leave the survivor in the previous epoch, passing the test without + // actually exercising in-epoch checkpoint removal (see #22990). + await test.waitUntilNextEpochStarts(); + // Wait for 2 checkpoints mid-epoch. const initialCheckpoint = (await test.monitor.run(true)).checkpointNumber; const midCheckpoint = CheckpointNumber(initialCheckpoint + 2); @@ -470,6 +493,15 @@ describe('e2e_epochs/epochs_optimistic_proving', () => { const checkpointBeforeReorg = test.monitor.checkpointNumber; logger.info(`Reached checkpoint ${checkpointBeforeReorg}`); + // Capture the epoch we're reorging within so we can assert the survivor stays in it. + const epochBeforeReorg = await epochOfCheckpoint(checkpointBeforeReorg); + + // (1) The epoch must hold multiple checkpoints, with checkpointBeforeReorg as its latest — + // otherwise removing the last one wouldn't leave any in-epoch survivors to prove with. + const epochCheckpointsBeforeReorg = await checkpointsInEpoch(epochBeforeReorg, checkpointBeforeReorg); + expect(epochCheckpointsBeforeReorg.length).toBeGreaterThanOrEqual(2); + expect(epochCheckpointsBeforeReorg.at(-1)).toEqual(checkpointBeforeReorg); + // Stop block production so no replacement is proposed. await context.aztecNodeAdmin!.setConfig({ skipPublishingCheckpointsPercent: 100 }); @@ -478,7 +510,8 @@ describe('e2e_epochs/epochs_optimistic_proving', () => { await context.cheatCodes.eth.reorgWithReplacement(1); const afterReorgCheckpoint = (await test.monitor.run(true)).checkpointNumber; - expect(afterReorgCheckpoint).toBeLessThan(checkpointBeforeReorg); + // (2) The reorg removed exactly the last checkpoint, leaving N-1. + expect(afterReorgCheckpoint).toEqual(CheckpointNumber(checkpointBeforeReorg - 1)); logger.info(`After reorg: checkpoint ${afterReorgCheckpoint} (was ${checkpointBeforeReorg})`); // Verify node detects the reorg. @@ -489,12 +522,21 @@ describe('e2e_epochs/epochs_optimistic_proving', () => { 0.5, ); - // Wait for the epoch to end and proof to land with the surviving checkpoints. - // Use the surviving checkpoint to look up which epoch we're in. + // The survivor must still be in the epoch we reorged within — otherwise the reorg removed + // the only in-epoch checkpoint and the test isn't exercising mid-epoch removal. const currentEpoch = await epochOfCheckpoint(afterReorgCheckpoint); + expect(currentEpoch).toEqual(epochBeforeReorg); + + // The epoch now holds exactly N-1 checkpoints — the survivors of the removal. + const survivingCheckpoints = await checkpointsInEpoch(epochBeforeReorg, afterReorgCheckpoint); + expect(survivingCheckpoints.length).toEqual(epochCheckpointsBeforeReorg.length - 1); + expect(survivingCheckpoints.at(-1)).toEqual(afterReorgCheckpoint); + + // Wait for the epoch to end and proof to land with the surviving checkpoints. await test.waitUntilEpochStarts(currentEpoch + 1); const epochEndCheckpoint = (await test.monitor.run(true)).checkpointNumber; + // (3) The epoch proved up to and including the last surviving checkpoint (the (N-1)th). expect(epochEndCheckpoint).toEqual(afterReorgCheckpoint); await test.waitUntilProvenCheckpointNumber(epochEndCheckpoint, 240); diff --git a/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts index d29a98d094a4..55e22e1247cc 100644 --- a/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts @@ -8,6 +8,7 @@ import { } from '@aztec/constants'; import { BlockNumber, type EpochNumber } from '@aztec/foundation/branded-types'; import { Fr } from '@aztec/foundation/curves/bn254'; +import { AbortError } from '@aztec/foundation/error'; import type { LoggerBindings } from '@aztec/foundation/log'; import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; import type { SerialQueue } from '@aztec/foundation/queue'; @@ -83,6 +84,17 @@ export type SubTreeResult = { type TreeSnapshots = Map; +/** + * Base rollup hints as produced before proving: `PrivateBaseRollupHints` / `PublicBaseRollupHints` + * deliberately carry no recursive proof or verification key. The proof + VK are supplied later, when + * `TxProvingState.getBaseRollupTypeAndInputs` wraps these hints into the "with proof + VK" types — + * `PrivateTxBaseRollupPrivateInputs` (a `ChonkProofData`) or `PublicTxBaseRollupPrivateInputs` (a + * chonk-verifier proof + AVM proof). Those proofs are *required constructor arguments* of the wrapper + * types, so the only way to obtain a provable input is to populate them — they cannot be silently + * omitted. Naming the proof-less hints type here makes that boundary explicit at `prepareBaseRollupInputs`. + */ +type BaseRollupHintsWithoutProofAndVK = BaseRollupHints; + /** * Orchestrates block-level proving for a single checkpoint, stopping at the boundary * where checkpoint root rollup would otherwise begin. Used by the per-checkpoint @@ -457,6 +469,9 @@ export class CheckpointSubTreeOrchestrator extends ProvingScheduler { public cancel() { this.cancelled = true; this.resetSchedulerState(this.cancelJobsOnStop); + // Reject the proving state (and hence subTreeResult) so anyone awaiting the sub-tree result + // is released rather than hanging — matching TopTreeOrchestrator.cancel(). + this.provingState?.cancel(); for (const [blockNumber, db] of this.dbs.entries()) { void db.close().catch(err => this.logger.error(`Error closing db for block ${blockNumber}`, err)); @@ -556,9 +571,11 @@ export class CheckpointSubTreeOrchestrator extends ProvingScheduler { newL1ToL2MessageTreeSnapshot: AppendOnlyTreeSnapshot, startSpongeBlob: SpongeBlob, db: MerkleTreeWriteOperations, - ): Promise<[BaseRollupHints, TreeSnapshots]> { - // We build the base rollup inputs using a mock proof and verification key. - // These will be overwritten later once we have proven the chonk verifier circuit and any public kernels. + ): Promise<[BaseRollupHintsWithoutProofAndVK, TreeSnapshots]> { + // These hints deliberately carry no recursive proof or verification key — see + // BaseRollupHintsWithoutProofAndVK. The tx's proof + VK are attached later in + // TxProvingState.getBaseRollupTypeAndInputs from the proven chonk-verifier / kernel / AVM + // proofs, which are required there and so cannot be silently omitted. const start = performance.now(); const hints = await insertSideEffectsAndBuildBaseRollupHints( tx, @@ -660,9 +677,16 @@ export class CheckpointSubTreeOrchestrator extends ProvingScheduler { this.epochNumber, ), ); - void promise.then(handleResult).catch(() => { - // The cache self-cleans on rejection; a future call (replacement sub-tree - // for this tx) will see the miss and re-enqueue. No action needed here. + void promise.then(handleResult).catch(err => { + // The cache self-cleans on rejection, so a replacement sub-tree for this tx will see the + // miss and re-enqueue. But if this proving state is still active, the failure must abort + // it: otherwise the base rollup for this tx is never enqueued and the checkpoint (and + // epoch) orchestrators hang forever waiting for a proof that will never arrive. + if (err instanceof AbortError || !provingState.verifyState()) { + return; + } + this.logger.error(`Chonk verifier proof failed for tx ${txHash}`, err); + provingState.reject(`Chonk verifier proof failed for tx ${txHash}: ${err}`); }); } diff --git a/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts b/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts index 821c53d76e94..dd608474da94 100644 --- a/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts +++ b/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts @@ -19,10 +19,16 @@ export interface ProvingStateLike { * Common scheduling infrastructure shared by every orchestrator that drives broker * proving jobs: * - * - A shared `SerialQueue` (`deferredJobQueue`) acting as the enqueue-throttle. The - * queue is owned by the `ProverClient` and shared across every orchestrator (every - * sub-tree and top-tree across every concurrent epoch session), so the total rate - * of job submission to the broker is bounded once, not once-per-orchestrator. + * - A shared `SerialQueue` (`deferredJobQueue`) that serialises the *act of handing a + * job to the broker*, one initiation per event-loop tick. Each queue task kicks off + * `safeJob` (which submits to the broker) without awaiting it, then yields with + * `sleep(0)`; the next task therefore runs on the following macrotask. This does NOT + * cap how many broker jobs are concurrently in flight — the broker's own queue absorbs + * that. What it bounds is the burst rate: a sub-tree that synchronously discovers + * thousands of ready jobs can't flood the broker (and monopolise the event loop) in a + * single tick. The queue is owned by the `ProverClient` and shared across every + * orchestrator (every sub-tree and top-tree across every concurrent epoch session), so + * this pacing is applied once globally rather than once-per-orchestrator. * - A list of `AbortController`s (`pendingProvingJobs`) so a `cancel()` can abort * in-flight broker jobs when needed. * - A `deferredProving(state, request, callback, isCancelled?)` method that wraps @@ -143,9 +149,11 @@ export abstract class ProvingScheduler { }; void this.deferredJobQueue.put(async () => { + // Kick off the broker submission without awaiting it — awaiting here would serialise all + // proving (one job at a time) and kill parallelism. The `sleep(0)` yields the event loop + // so the next queued job initiates on the following macrotask, pacing bursts rather than + // bounding in-flight broker concurrency (the broker's own queue handles that). void safeJob(); - // Yield to the macrotask queue so Node has a chance to interleave other work - // between enqueues. await sleep(0); }); } diff --git a/yarn-project/prover-node/src/config.ts b/yarn-project/prover-node/src/config.ts index 61a39d5d90e2..304ce7a9c1a5 100644 --- a/yarn-project/prover-node/src/config.ts +++ b/yarn-project/prover-node/src/config.ts @@ -68,7 +68,8 @@ export const specificProverNodeConfigMappings: ConfigMappingsType { }, ); - it('waits until the proven checkpoint reaches the checkpoint before the proof start', async () => { - const checkpoints = Array.from({ length: 100 }, () => RootRollupPublicInputs.random()); - const fromCheckpoint = CheckpointNumber(33); - const toCheckpoint = CheckpointNumber(64); - - rollup.getTips - .mockResolvedValueOnce({ - pending: CheckpointNumber(65), - proven: CheckpointNumber(31), - }) - .mockResolvedValueOnce({ - pending: CheckpointNumber(65), - proven: CheckpointNumber(32), - }) - .mockResolvedValue({ - pending: CheckpointNumber(65), - proven: CheckpointNumber(32), - }); - rollup.getRollupConstants.mockResolvedValue({ - l1StartBlock: 0n, - l1GenesisTime: BigInt(Math.floor(Date.now() / 1000)), - slotDuration: 1, - epochDuration: 1, - proofSubmissionEpochs: 100, - targetCommitteeSize: 48, - rollupManaLimit: Number.MAX_SAFE_INTEGER, - }); - - rollup.getCheckpoint.mockImplementation((checkpointNumber: CheckpointNumber) => - Promise.resolve({ - archive: checkpoints[checkpointNumber - 1].endArchiveRoot, - attestationsHash: Buffer32.ZERO, - payloadDigest: Buffer32.ZERO, - headerHash: Buffer32.ZERO, - blobCommitmentsHash: Buffer32.ZERO, - outHash: '0x', - slotNumber: SlotNumber(0), - feeHeader: { - excessMana: 0n, - manaUsed: 0n, - ethPerFeeAsset: 0n, - congestionCost: 0n, - proverCost: 0n, - }, - }), - ); - - const ourPublicInputs = RootRollupPublicInputs.random(); - ourPublicInputs.previousArchiveRoot = checkpoints[fromCheckpoint - 2].endArchiveRoot; - ourPublicInputs.endArchiveRoot = checkpoints[toCheckpoint - 1].endArchiveRoot; - - const ourBatchedBlob = new BatchedBlob( - ourPublicInputs.blobPublicInputs.blobCommitmentsHash, - ourPublicInputs.blobPublicInputs.z, - ourPublicInputs.blobPublicInputs.y, - ourPublicInputs.blobPublicInputs.c, - ourPublicInputs.blobPublicInputs.c.negate(), - ); - - rollup.getEpochProofPublicInputs.mockResolvedValue(ourPublicInputs.toFields()); - - await publisher.submitEpochProof({ - epochNumber: EpochNumber(2), - fromCheckpoint, - toCheckpoint, - publicInputs: ourPublicInputs, - proof: Proof.empty(), - batchedBlobInputs: ourBatchedBlob, - attestations: [], - }); - - expect(rollup.getRollupConstants).toHaveBeenCalled(); - expect(rollup.getTips).toHaveBeenCalledTimes(3); - expect(l1Utils.sendAndMonitorTransaction).toHaveBeenCalled(); - }); - it('analyzeEpochProofSubmission validates, estimates, and does not send tx', async () => { const fromCheckpoint = 33; const toCheckpoint = 64; diff --git a/yarn-project/prover-node/src/prover-node-publisher.ts b/yarn-project/prover-node/src/prover-node-publisher.ts index 20f50e57f93d..67d9ff1d3ace 100644 --- a/yarn-project/prover-node/src/prover-node-publisher.ts +++ b/yarn-project/prover-node/src/prover-node-publisher.ts @@ -8,13 +8,11 @@ import { areArraysEqual } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; import { EthAddress } from '@aztec/foundation/eth-address'; import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; -import { retryUntil } from '@aztec/foundation/retry'; import type { Tuple } from '@aztec/foundation/serialize'; import { Timer } from '@aztec/foundation/timer'; import { RollupAbi } from '@aztec/l1-artifacts'; import type { PublisherConfig, TxSenderConfig } from '@aztec/sequencer-client'; import { CommitteeAttestation, CommitteeAttestationsAndSigners } from '@aztec/stdlib/block'; -import { getProofSubmissionDeadlineTimestamp } from '@aztec/stdlib/epoch-helpers'; import type { Proof } from '@aztec/stdlib/proofs'; import type { FeeRecipient, RootRollupPublicInputs } from '@aztec/stdlib/rollup'; import type { L1PublishProofStats } from '@aztec/stdlib/stats'; @@ -80,15 +78,12 @@ export class ProverNodePublisher { proof: Proof; batchedBlobInputs: BatchedBlob; attestations: ViemCommitteeAttestation[]; + /** Wall-clock deadline (proof-submission window end) past which the L1 tx should stop retrying. */ + deadline?: Date; }): Promise { const { epochNumber, fromCheckpoint, toCheckpoint } = args; const ctx = { epochNumber, fromCheckpoint, toCheckpoint }; - if (!(await this.waitUntilStartBuildsOnProven(args))) { - this.log.verbose('Timed out waiting for proven checkpoint to reach proof start', ctx); - return false; - } - const timer = new Timer(); // Validate epoch proof range and hashes are correct before submitting await this.validateEpochProofSubmission(args); @@ -132,49 +127,6 @@ export class ProverNodePublisher { return false; } - private async waitUntilStartBuildsOnProven(args: { epochNumber: EpochNumber; fromCheckpoint: CheckpointNumber }) { - const { epochNumber, fromCheckpoint } = args; - const provenCheckpoint = await this.getProvenCheckpoint(); - if (this.isStartBuildingOnProven(fromCheckpoint, provenCheckpoint)) { - return true; - } - - const timeout = await this.getSecondsUntilProofSubmissionWindowEnd(epochNumber); - this.log.info(`Waiting for proven checkpoint to reach proof start`, { - epochNumber, - fromCheckpoint, - provenCheckpoint, - timeout, - }); - - await retryUntil( - async () => { - const proven = await this.getProvenCheckpoint(); - this.log.verbose(`Proven checkpoint is at ${proven} (waiting for ${fromCheckpoint - 1})`, { epochNumber }); - return this.isStartBuildingOnProven(fromCheckpoint, proven) ? true : undefined; - }, - `proven checkpoint to reach ${fromCheckpoint - 1}`, - timeout, - 4, - ); - - return true; - } - - private async getProvenCheckpoint() { - return (await this.rollupContract.getTips()).proven; - } - - private isStartBuildingOnProven(fromCheckpoint: CheckpointNumber, provenCheckpoint: CheckpointNumber) { - return fromCheckpoint - 1 <= provenCheckpoint; - } - - private async getSecondsUntilProofSubmissionWindowEnd(epochNumber: EpochNumber) { - const deadline = getProofSubmissionDeadlineTimestamp(epochNumber, await this.rollupContract.getRollupConstants()); - const now = BigInt(Math.floor(Date.now() / 1000)); - return Math.max(Number(deadline - now), 0.001); - } - private async validateEpochProofSubmission(args: { fromCheckpoint: CheckpointNumber; toCheckpoint: CheckpointNumber; @@ -313,6 +265,7 @@ export class ProverNodePublisher { private async sendSubmitEpochProofTx(args: { fromCheckpoint: CheckpointNumber; toCheckpoint: CheckpointNumber; + deadline?: Date; publicInputs: RootRollupPublicInputs; proof: Proof; batchedBlobInputs: BatchedBlob; @@ -331,7 +284,10 @@ export class ProverNodePublisher { args: txArgs, }); try { - const { receipt } = await this.l1TxUtils.sendAndMonitorTransaction({ to: this.rollupContract.address, data }); + const { receipt } = await this.l1TxUtils.sendAndMonitorTransaction( + { to: this.rollupContract.address, data }, + { txTimeoutAt: args.deadline }, + ); if (receipt.status !== 'success') { const errorMsg = await this.l1TxUtils.tryGetErrorFromRevertedTx( data, diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index bb35bcdfa545..d3abd72b5cee 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -187,9 +187,11 @@ describe('ProverNode', () => { expect(reapSpy).not.toHaveBeenCalled(); }); - it('updates the tips store in finally even when the inner handler throws', async () => { + it('propagates a checkpoint registration failure and leaves the tips store unadvanced (A-1041)', async () => { setupNotFullyProven(); - // Make the checkpoint handler throw by having worldState.syncImmediate reject. + // Registration fails: worldState.syncImmediate (inside collectRegisterData) rejects. The + // failure propagates rather than being swallowed, so the checkpoint is never registered and + // the tips stay put for the L2BlockStream to retry. worldState.syncImmediate.mockRejectedValue(new Error('boom')); const event: L2BlockStreamEvent = { @@ -198,26 +200,42 @@ describe('ProverNode', () => { block: { number: BlockNumber(1), hash: '0x01' }, }; - // The handler swallows the inner error (logs warn), so this shouldn't throw. - await proverNode.handleBlockStreamEvent(event); + await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('boom'); - // Confirm the tipsStore observed the event despite the inner failure. - expect(await proverNode.getTipsStore().getL2BlockHash(1)).toBe('0x01'); - // Inner failure is swallowed: the store stays empty and the session manager is NOT - // notified — otherwise downstream would see a notification for content that was never - // actually registered. + // Tips left unadvanced; nothing was registered and the session manager wasn't notified. + expect(await proverNode.getTipsStore().getL2BlockHash(1)).toBeUndefined(); expect(proverNode.getCheckpointStore().listAll()).toHaveLength(0); expect(sessionManager.onCheckpointAdded).not.toHaveBeenCalled(); }); + it('leaves the tips store unadvanced when a handler propagates an error (A-1041)', async () => { + setupNotFullyProven(); + // Registration succeeds, but the expiry sweep throws — a failure that propagates before the + // tips-store update, so the error surfaces to the L2BlockStream and the tips stay put. + l2BlockSource.getSyncedL2SlotNumber.mockRejectedValue(new Error('archiver down')); + + const event: L2BlockStreamEvent = { + type: 'chain-checkpointed', + checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), + block: { number: BlockNumber(1), hash: '0x01' }, + }; + + await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('archiver down'); + + // Tips left unadvanced so the L2BlockStream re-emits this event on its next poll. + expect(await proverNode.getTipsStore().getL2BlockHash(1)).toBeUndefined(); + }); + // ---------------- handleCheckpointEvent gating ---------------- it('skips registration when the epoch is already fully proven on L1', async () => { - // Proven block sits at the last block of epoch 1 (epochDuration=1, slot=1). + // Proven block sits at the last block of epoch 1 (epochDuration=1, slot=1). Block 2 must be + // absent so isProvenBlockLastOfItsEpoch falls through to isEpochComplete and reports the + // proven tip as the epoch's last block. l2BlockSource.getBlockNumber.mockResolvedValue(BlockNumber(1)); - l2BlockSource.getBlockData.mockResolvedValue({ - header: { getSlot: () => SlotNumber(1) }, - } as any); + l2BlockSource.getBlockData.mockImplementation((query: any) => + Promise.resolve(Number(query.number) === 1 ? ({ header: { getSlot: () => SlotNumber(1) } } as any) : undefined), + ); l2BlockSource.isEpochComplete.mockResolvedValue(true); await proverNode.handleBlockStreamEvent({ @@ -246,9 +264,9 @@ describe('ProverNode', () => { // ---------------- forwarders ---------------- - it('startProof forwards to the session manager', async () => { - sessionManager.startProof.mockResolvedValue(undefined); - await proverNode.startProof(EpochNumber(5)); + it('startProof forwards to the session manager and returns the job id', async () => { + sessionManager.startProof.mockResolvedValue('job-5'); + await expect(proverNode.startProof(EpochNumber(5))).resolves.toBe('job-5'); expect(sessionManager.startProof).toHaveBeenCalledWith(EpochNumber(5)); }); diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 3dd8fda32d3a..22c28e8c5bd4 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -5,7 +5,7 @@ import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/br import { assertRequired, compact, pick } from '@aztec/foundation/collection'; import { memoize } from '@aztec/foundation/decorators'; import { createLogger } from '@aztec/foundation/log'; -import { DateProvider } from '@aztec/foundation/timer'; +import { DateProvider, executeTimeout } from '@aztec/foundation/timer'; import type { EpochProverFactory } from '@aztec/prover-client'; import { getLastSiblingPath } from '@aztec/prover-client/helpers'; import { ChonkCache } from '@aztec/prover-client/orchestrator'; @@ -20,7 +20,7 @@ import { import type { Checkpoint, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { ChainConfig } from '@aztec/stdlib/config'; import type { ContractDataSource } from '@aztec/stdlib/contract'; -import { type L1RollupConstants, getEpochAtSlot } from '@aztec/stdlib/epoch-helpers'; +import { type L1RollupConstants, getEpochAtSlot, getProofSubmissionDeadlineEpoch } from '@aztec/stdlib/epoch-helpers'; import { type EpochProverManager, type EpochProvingJobState, @@ -55,6 +55,13 @@ import { SessionManager } from './session-manager.js'; type ProverNodeOptions = SpecificProverNodeConfig & Partial; type DataStoreOptions = Pick & Pick; +/** + * Grace period for the proof-publishing service to settle during shutdown. The service waits for + * any in-flight L1 proof-submission tx to finish; that tx can take a long time to mine, so we cap + * the wait rather than letting `stop()` hang indefinitely. + */ +const PUBLISHING_SERVICE_STOP_TIMEOUT_MS = 30_000; + /** * An Aztec Prover Node is a standalone process that monitors the chain for new checkpoints, * starts proving them optimistically as they arrive, and submits epoch proofs to L1 once @@ -210,28 +217,26 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra // ---------------- L2BlockStream handler ---------------- public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { - try { - switch (event.type) { - case 'chain-checkpointed': - await this.handleCheckpointEvent(event.checkpoint); - break; - case 'chain-pruned': - await this.handlePruneEvent(event.checkpoint); - break; - case 'chain-proven': - this.publishingService?.onChainProven(BlockNumber(event.block.number)); - break; - case 'chain-finalized': - case 'blocks-added': - break; - } - // Expiry is driven by the archiver's latest synced L2 slot - await this.checkEpochExpiry(); - } finally { - // Update the local tips store *after* the handler ran. A throwing handler shouldn't - // leave the tipsStore claiming progress that didn't happen (A-1041). - await this.tipsStore.handleBlockStreamEvent(event); + switch (event.type) { + case 'chain-checkpointed': + await this.handleCheckpointEvent(event.checkpoint); + break; + case 'chain-pruned': + await this.handlePruneEvent(event.checkpoint); + break; + case 'chain-proven': + this.publishingService?.onChainProven(BlockNumber(event.block.number)); + break; + case 'chain-finalized': + case 'blocks-added': + break; } + // Expiry is driven by the archiver's latest synced L2 slot + await this.checkEpochExpiry(); + // Advance the local tips store only after the proving-side handling has succeeded. Any + // failure above propagates to the L2BlockStream (which logs and stops this poll pass) and + // skips this update, so the event is re-emitted on the next poll rather than skipped (A-1041). + await this.tipsStore.handleBlockStreamEvent(event); } /** Register a new checkpoint with the store and notify the session manager. */ @@ -246,20 +251,21 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra return; } + if (await this.isEpochPastProofSubmissionWindow(epochNumber, l1Constants)) { + this.log.debug( + `Skipping checkpoint ${checkpoint.number} for epoch ${epochNumber} past its proof-submission window`, + ); + return; + } + this.log.info(`New checkpoint ${checkpoint.number} for epoch ${epochNumber}`, { checkpointNumber: checkpoint.number, epochNumber, slotNumber, }); - try { - const registerData = await this.collectRegisterData(checkpoint, publishedCheckpoint.attestations); - await this.checkpointStore.addOrUpdate(checkpoint, registerData); - } catch (err) { - this.log.warn(`Could not register checkpoint ${checkpoint.number} for epoch ${epochNumber}`, err); - return; - } - + const registerData = await this.collectRegisterData(checkpoint, publishedCheckpoint.attestations); + await this.checkpointStore.addOrUpdate(checkpoint, registerData); await this.sessionManager?.onCheckpointAdded(epochNumber); } @@ -306,6 +312,25 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra await this.sessionManager?.onPrune(affectedEpochs); } + /** + * Returns true once the chain has advanced past the given epoch's proof-submission window. + * Used to ignore checkpoints whose epoch can no longer be proven in time — chiefly while the + * archiver replays old blocks after a restart. Compares the archiver's latest synced L2 slot + * against the epoch's submission-deadline epoch; conservatively returns false if the slot can't + * be read yet. + */ + private async isEpochPastProofSubmissionWindow( + epochNumber: EpochNumber, + l1Constants: L1RollupConstants, + ): Promise { + const latestSlot = await this.l2BlockSource.getSyncedL2SlotNumber(); + if (latestSlot === undefined) { + return false; + } + const latestEpoch = getEpochAtSlot(latestSlot, l1Constants); + return latestEpoch >= getProofSubmissionDeadlineEpoch(epochNumber, l1Constants); + } + /** * Compares the archiver's latest synced L2 slot against `lastExpiredEpoch` and, for each * newly-expired epoch, releases the chonk-cache entries for its blocks and reaps any @@ -341,16 +366,9 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra */ private async expireEpoch(epoch: EpochNumber): Promise { try { - const checkpoints = await this.l2BlockSource.getCheckpointsData({ epoch }); - if (checkpoints.length > 0) { - const firstBlock = checkpoints[0].startBlock; - const last = checkpoints[checkpoints.length - 1]; - const lastBlock = BlockNumber(last.startBlock + last.blockCount - 1); - const limit = lastBlock - firstBlock + 1; - const blocks = await this.l2BlockSource.getBlocks({ from: firstBlock, limit }); - if (blocks.length > 0) { - this.chonkCache.releaseForBlocks(blocks); - } + const blocks = await this.l2BlockSource.getBlocks({ epoch, onlyCheckpointed: true }); + if (blocks.length > 0) { + this.chonkCache.releaseForBlocks(blocks); } } catch (err) { this.log.warn(`Could not release chonk-cache entries for expired epoch ${epoch}`, err); @@ -360,11 +378,14 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra // ---------------- public API ---------------- - public async startProof(epochNumber: EpochNumber): Promise { + /** + * Schedules proving for the given epoch and returns the job id without waiting for completion. + */ + public async startProof(epochNumber: EpochNumber): Promise { if (!this.sessionManager) { throw new Error('ProverNode not started'); } - await this.sessionManager.startProof(epochNumber); + return await this.sessionManager.startProof(epochNumber); } // ---------------- Service lifecycle ---------------- @@ -409,7 +430,15 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra await this.sessionManager.stop(); } if (this.publishingService) { - await this.publishingService.stop(); + // Bound the wait: the publishing service blocks until any in-flight L1 proof-submission tx + // settles, which can outlast a reasonable shutdown window. On timeout we log and move on — + // the tx may still mine, but shutdown must not hang on it. + const publishingService = this.publishingService; + await executeTimeout( + () => publishingService.stop(), + PUBLISHING_SERVICE_STOP_TIMEOUT_MS, + 'prover-node publishing-service stop', + ).catch(err => this.log.warn(`Timed out stopping proof publishing service`, err)); } await this.checkpointStore.stop(); this.chonkCache.stop(); @@ -439,7 +468,6 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra maxPendingJobs: this.config.proverNodeMaxPendingJobs, tickIntervalMs: this.config.proverNodePollingIntervalMs, finalizationDelayMs: this.config.proverNodeEpochProvingDelayMs, - provingDelayMs: this.config.proverNodeEpochProvingDelayMs, }, onSessionFailed: async session => { await this.tryUploadSessionFailure(session); diff --git a/yarn-project/prover-node/src/session-manager.test.ts b/yarn-project/prover-node/src/session-manager.test.ts index 4c965fa734e1..c907cf8b3151 100644 --- a/yarn-project/prover-node/src/session-manager.test.ts +++ b/yarn-project/prover-node/src/session-manager.test.ts @@ -9,7 +9,7 @@ import type { EpochProvingJobState } from '@aztec/stdlib/interfaces/server'; import { mock } from 'jest-mock-extended'; import type { CheckpointStore } from './checkpoint-store.js'; -import type { CheckpointProver } from './job/checkpoint-prover.js'; +import { CheckpointProver } from './job/checkpoint-prover.js'; import { EpochSession, type SessionSpec } from './job/epoch-session.js'; import { ProverNodeJobMetrics } from './metrics.js'; import type { ProofPublishingService } from './proof-publishing-service.js'; @@ -112,10 +112,7 @@ describe('SessionManager', () => { it('does not open a full session when archiver checkpoints are not all in the store', async () => { const epoch = EpochNumber(3); l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([ - { checkpoint: { number: 1 } } as any, - { checkpoint: { number: 2 } } as any, - ]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6), archiverCp(2, 7)]); // Store only has checkpoint 1. store.listCanonicalInSlotRange.mockReturnValue([proverForCheckpoint(1, 6)]); await manager.onCheckpointAdded(epoch); @@ -128,10 +125,7 @@ describe('SessionManager', () => { // Two canonical checkpoints at distinct slots within epoch 3's range [6, 7]. const provers = [proverForCheckpoint(1, 6), proverForCheckpoint(2, 7)]; l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([ - { checkpoint: { number: 1 } } as any, - { checkpoint: { number: 2 } } as any, - ]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6), archiverCp(2, 7)]); store.listCanonicalInSlotRange.mockReturnValue(provers); await manager.onCheckpointAdded(epoch); @@ -178,7 +172,7 @@ describe('SessionManager', () => { l2BlockSource.getCheckpoints.mockImplementation(({ epoch }: { epoch: EpochNumber }) => Promise.resolve( (Number(epoch) === 3 ? epoch3Provers : Number(epoch) === 4 ? epoch4Provers : []).map( - p => ({ checkpoint: { number: p.checkpoint.number } }) as any, + p => ({ checkpoint: p.checkpoint }) as any, ), ), ); @@ -208,7 +202,7 @@ describe('SessionManager', () => { mockNextUnprovenSlot(2, 6); const provers = [proverForCheckpoint(1, 6)]; l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([{ checkpoint: { number: 1 } } as any]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6)]); store.listCanonicalInSlotRange.mockReturnValue(provers); await manager.onTick(); @@ -234,7 +228,7 @@ describe('SessionManager', () => { mockNextUnprovenSlot(2, 6); const provers = [proverForCheckpoint(1, 6)]; l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([{ checkpoint: { number: 1 } } as any]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6)]); store.listCanonicalInSlotRange.mockReturnValue(provers); await manager.onTick(); @@ -251,7 +245,7 @@ describe('SessionManager', () => { mockNextUnprovenSlot(2, 6); const provers = [proverForCheckpoint(1, 6)]; l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([{ checkpoint: { number: 1 } } as any]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6)]); store.listCanonicalInSlotRange.mockReturnValue(provers); await manager.onTick(); @@ -272,7 +266,7 @@ describe('SessionManager', () => { // and the next tick must try again rather than skip the epoch. mockNextUnprovenSlot(2, 6); l2BlockSource.isEpochComplete.mockResolvedValue(true); - l2BlockSource.getCheckpoints.mockResolvedValue([{ checkpoint: { number: 1 } } as any]); + l2BlockSource.getCheckpoints.mockResolvedValue([archiverCp(1, 6)]); store.listCanonicalInSlotRange.mockReturnValue([]); // store hasn't indexed it yet await manager.onTick(); @@ -405,8 +399,7 @@ describe('SessionManager', () => { expect(manager.getPartialSession(original.spec)).toBe(recreated as unknown as EpochSession); expect(stubs).toHaveLength(2); - // startProof was awaiting original.whenDone; cancel resolved it as 'cancelled' so the - // outer promise resolves cleanly without surfacing an unhandled rejection. + // startProof resolves with the scheduled job id as soon as the session is constructed. await startPromise; }); @@ -536,9 +529,9 @@ describe('SessionManager', () => { expect(partial.isTerminal()).toBe(false); expect(manager.getPartialSession(partial.spec)).toBe(partial as unknown as EpochSession); - // startProof awaits whenDone — settle the stub so the test can finish. - partial.terminate('completed'); + // startProof returns the job id without awaiting completion; await the resolved id. await done; + partial.terminate('completed'); }); it('startProof throws when the epoch has no canonical content', async () => { @@ -546,19 +539,32 @@ describe('SessionManager', () => { await expect(manager.startProof(EpochNumber(7))).rejects.toThrow(/No blocks found/); }); + it('startProof refuses to re-prove an epoch the proven chain already encompasses', async () => { + const epoch = EpochNumber(7); + // proverForCheckpoint builds a checkpoint whose single block number equals the checkpoint + // number (1 here). A proven tip at or beyond that block means the epoch is already proven. + store.listCanonicalForEpoch.mockResolvedValue([proverForCheckpoint(1, 14)]); + l2BlockSource.getBlockNumber.mockResolvedValue(BlockNumber(1)); + + await expect(manager.startProof(epoch)).rejects.toThrow(/already proven/i); + expect(stubs).toHaveLength(0); + }); + it('startProof dedupes against an existing full session with the same range', async () => { const epoch = EpochNumber(7); - const provers = [proverForCheckpoint(1, 14)]; + // Checkpoint at the epoch's last slot (15) so the partial range startProof derives ([14,15]) + // matches the full session's range — otherwise the dedup guard wouldn't fire. + const provers = [proverForCheckpoint(1, 15)]; await openCanonicalFullSession(epoch, provers); expect(stubs.length).toBe(1); const fullSession = stubs[0]; store.listCanonicalForEpoch.mockResolvedValue(provers); - const done = manager.startProof(epoch); + const doneId = await manager.startProof(epoch); fullSession.terminate('completed'); - await done; - // No new session opened; startProof just awaited the full's whenDone. + // No new session opened; startProof returned the existing full session's id. + expect(doneId).toBe(fullSession.uuid); expect(stubs.length).toBe(1); }); @@ -568,28 +574,19 @@ describe('SessionManager', () => { store.listCanonicalForEpoch.mockResolvedValue(canonical); store.listCanonicalInSlotRange.mockReturnValue(canonical); - const stubPromise = awaitNextStub(); - const first = manager.startProof(epoch); - const partial = await stubPromise; + const firstId = await manager.startProof(epoch); expect(stubs).toHaveLength(1); + const partial = stubs[0]; + expect(firstId).toBe(partial.uuid); - // Wait for first.startProof's final `await created.whenDone()` to land, then arm a - // fresh trigger for the next whenDone — which can only be second.startProof's dedup - // branch (`await existingPartial.whenDone()`). This guarantees the dedup check has - // fired before we terminate, removing the race that would otherwise let second fall - // through to construct a fresh stub. - await awaitNextWhenDoneCall(partial); - expect(partial.whenDoneCalls).toBe(1); - - const dedupAwaited = awaitNextWhenDoneCall(partial); - const second = manager.startProof(epoch); - await dedupAwaited; - expect(partial.whenDoneCalls).toBe(2); - - partial.terminate('completed'); - await Promise.all([first, second]); + // A second startProof for the same spec returns the existing partial's id without + // constructing a new session or cancelling the existing one. + const secondId = await manager.startProof(epoch); + expect(secondId).toBe(partial.uuid); expect(stubs).toHaveLength(1); // no second stub ever constructed expect(partial.cancelReasons).toEqual([]); // dedup path never cancels the existing partial + + partial.terminate('completed'); }); // ---------------- stop ---------------- @@ -604,7 +601,7 @@ describe('SessionManager', () => { l2BlockSource.getCheckpoints.mockImplementation(({ epoch }: { epoch: EpochNumber }) => Promise.resolve( (Number(epoch) === 3 ? epoch3Provers : Number(epoch) === 4 ? epoch4Provers : []).map( - p => ({ checkpoint: { number: p.checkpoint.number } }) as any, + p => ({ checkpoint: p.checkpoint }) as any, ), ), ); @@ -670,9 +667,7 @@ describe('SessionManager', () => { async function openCanonicalFullSession(epoch: EpochNumber, provers: CheckpointProver[]): Promise { l2BlockSource.isEpochComplete.mockResolvedValueOnce(true); - l2BlockSource.getCheckpoints.mockResolvedValueOnce( - provers.map(p => ({ checkpoint: { number: p.checkpoint.number } }) as any), - ); + l2BlockSource.getCheckpoints.mockResolvedValueOnce(provers.map(p => ({ checkpoint: p.checkpoint }) as any)); store.listCanonicalInSlotRange.mockReturnValueOnce(provers); await manager.onCheckpointAdded(epoch); } @@ -692,20 +687,6 @@ describe('SessionManager', () => { return promise; } - /** - * Single-shot trigger: resolves on the next `stub.whenDone()` invocation. Lets tests - * wait for an `await session.whenDone()` callsite (e.g. startProof's final await, or - * the dedup branch) to land without polling or sleeping. - */ - function awaitNextWhenDoneCall(stub: StubSession): Promise { - const { promise, resolve } = promiseWithResolvers(); - stub.onWhenDone = () => { - stub.onWhenDone = undefined; - resolve(); - }; - return promise; - } - /** * Mocks the chain tip that `nextUnprovenEpoch` reads: proven height = `provenBlock`, with the * first unproven block (`provenBlock + 1`) sitting at `firstUnprovenSlot` — or not yet mined @@ -752,10 +733,6 @@ type StubSession = { cancelBlocker?: Promise; /** Resolves the first time cancel() is invoked — tests use it to know when stop's cancel call lands. */ cancelStarted: ReturnType>; - /** Number of times whenDone() has been invoked. Lets tests deterministically detect dedup awaits. */ - whenDoneCalls: number; - /** Fires every time whenDone() is invoked — useful for "wait until the dedup branch awaits". */ - onWhenDone?: () => void; donePromise: Promise; resolveDone: (s: EpochProvingJobState) => void; terminate(state: EpochProvingJobState): void; @@ -783,7 +760,6 @@ function makeStubSession(spec: SessionSpec, provers: readonly CheckpointProver[] cancelled: false, cancelReasons: [], cancelStarted: promiseWithResolvers(), - whenDoneCalls: 0, donePromise: promise, resolveDone: resolve, terminate(state) { @@ -829,24 +805,42 @@ function makeStubSession(spec: SessionSpec, provers: readonly CheckpointProver[] return this.donePromise; }, whenDone() { - this.whenDoneCalls++; - this.onWhenDone?.(); return this.donePromise; }, }; return stub; } +/** + * Minimal checkpoint content carrying just enough for `CheckpointProver.idFor` (number, slot, + * archive root). The archive root is derived from (number, slot) so identical (number, slot) pairs + * produce identical content-addressed ids — letting archiver-side and store-side stubs match. + */ +function makeCheckpointContent(number: number, slot: number) { + return { + number, + header: { slotNumber: SlotNumber(slot) }, + archive: { root: { toString: () => `root-${number}-${slot}` } }, + blocks: [{ number }], + } as any; +} + function proverForCheckpoint(number: number, slot: number): CheckpointProver { + const checkpoint = makeCheckpointContent(number, slot); return { - id: `${number}:${slot}`, - checkpoint: { number, blocks: [{ number }] } as any, + id: CheckpointProver.idFor(checkpoint), + checkpoint, slotNumber: SlotNumber(slot), isPruned: () => false, isCancelled: () => false, } as unknown as CheckpointProver; } +/** Archiver-side PublishedCheckpoint stub whose content matches `proverForCheckpoint(number, slot)`. */ +function archiverCp(number: number, slot: number) { + return { checkpoint: makeCheckpointContent(number, slot) } as any; +} + function proverWithSlot(slot: number): CheckpointProver { return proverForCheckpoint(1, slot); } diff --git a/yarn-project/prover-node/src/session-manager.ts b/yarn-project/prover-node/src/session-manager.ts index 3df56a3801b9..3430815c205f 100644 --- a/yarn-project/prover-node/src/session-manager.ts +++ b/yarn-project/prover-node/src/session-manager.ts @@ -4,7 +4,6 @@ import type { EthAddress } from '@aztec/foundation/eth-address'; import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import { sleep } from '@aztec/foundation/sleep'; import type { DateProvider } from '@aztec/foundation/timer'; import type { EpochProverFactory } from '@aztec/prover-client'; import type { L2BlockSource } from '@aztec/stdlib/block'; @@ -18,7 +17,7 @@ import { import type { EpochProvingJobState } from '@aztec/stdlib/interfaces/server'; import type { CheckpointStore } from './checkpoint-store.js'; -import type { CheckpointProver } from './job/checkpoint-prover.js'; +import { CheckpointProver } from './job/checkpoint-prover.js'; import type { EpochProvingJobData } from './job/epoch-proving-job-data.js'; import { EpochSession, @@ -44,10 +43,8 @@ export type SessionManagerConfig = { maxPendingJobs: number; /** Interval at which the internal periodic tick fires `reconcile({ kind: 'tick' })`. */ tickIntervalMs: number; - /** Forwarded to every session. */ + /** Forwarded to every session: delay before top-tree proving, letting late reorgs settle. */ finalizationDelayMs: number | undefined; - /** Debug-only: delay before opening a full session once its epoch is ready to prove. */ - provingDelayMs?: number; }; export type SessionManagerDeps = { @@ -181,22 +178,28 @@ export class SessionManager { // ---------------- public API ---------------- /** - * Begins a proof attempt for the supplied epoch. Every session — full or partial — - * begins at the epoch's first slot; the partial's spec stops at the last canonical - * slot, while the full's stops at the epoch's last slot. Dedupes against any - * existing session covering the same range; awaits completion. + * Schedules a proof attempt for the supplied epoch and returns the job id without waiting for + * the proof to complete — proving can far outlast an HTTP request, so callers poll `getJobs()` + * for the outcome. Every session — full or partial — begins at the epoch's first slot; the + * partial's spec stops at the last canonical slot, while the full's stops at the epoch's last + * slot. Dedupes against any existing session covering the same range, returning its id. */ - public async startProof(epoch: EpochNumber): Promise { + public async startProof(epoch: EpochNumber): Promise { const canonical = await this.deps.checkpointStore.listCanonicalForEpoch(epoch); if (canonical.length === 0) { throw new EmptyEpochError(epoch); } + // Don't re-prove an epoch the L1 proven chain already encompasses — it was already proven + // (possibly by another prover node), so a fresh proof would be wasted work. + if (await this.isProvenChainEncompassing(canonical)) { + throw new EpochAlreadyProvenError(epoch); + } const l1Constants = await this.getL1Constants(); const [fromSlot] = getSlotRangeForEpoch(epoch, l1Constants); const toSlot = canonical[canonical.length - 1].slotNumber; const spec: SessionSpec = { kind: 'partial', epochNumber: epoch, fromSlot, toSlot }; - // Are there any sessions that cover this exact range already + // Reuse a session already covering this exact range rather than scheduling a duplicate. const existingFull = this.getFullSession(epoch); if ( existingFull && @@ -204,20 +207,19 @@ export class SessionManager { existingFull.getSpec().fromSlot === fromSlot && existingFull.getSpec().toSlot === toSlot ) { - await existingFull.whenDone(); - return; + return existingFull.getId(); } const existingPartial = this.getPartialSession(spec); if (existingPartial && !existingPartial.isTerminal()) { - await existingPartial.whenDone(); - return; + return existingPartial.getId(); } await this.scheduleReconcile({ kind: 'start-proof', spec }); const created = this.getPartialSession(spec); - if (created) { - await created.whenDone(); + if (!created) { + throw new Error(`Failed to schedule partial proof for epoch ${epoch}`); } + return created.getId(); } /** Stops the tick, drains the reconcile queue, and cancels every live session. */ @@ -327,16 +329,25 @@ export class SessionManager { } private openPartialSession(spec: SessionSpec): void { - if (this.getPartialSession(spec)?.isTerminal() === false) { + const canonical = this.deps.checkpointStore.listCanonicalInSlotRange(spec.fromSlot, spec.toSlot); + if (canonical.length === 0) { + return; + } + // Reuse a live partial session for this epoch whose checkpoint set already matches the + // canonical content — e.g. a repeated `startProof` with no new checkpoints mined since the + // last one. Reconstructing would re-prove identical content and burn a pending-job slot. + const existing = Array.from(this.partialSessions.values()).find( + s => + s.getSpec().epochNumber === spec.epochNumber && + !s.isTerminal() && + this.checkpointsMatch(s.getCheckpoints(), canonical), + ); + if (existing) { return; } if (this.atMaxSessionLimit()) { throw new Error(`Maximum pending proving jobs ${this.deps.config.maxPendingJobs} reached.`); } - const canonical = this.deps.checkpointStore.listCanonicalInSlotRange(spec.fromSlot, spec.toSlot); - if (canonical.length === 0) { - return; - } const session = this.constructSession(spec, canonical); this.partialSessions.set(specKey(spec), session); void this.runSession(session); @@ -383,16 +394,7 @@ export class SessionManager { } private async runSession(session: EpochSession): Promise { - // Debug-only knob — delay actual proving start without blocking the reconcile queue - // (the alternative, sleeping inside openFullSessionIfReady, would stall every other - // tick / checkpoint / prune for the configured duration). - if (this.deps.config.provingDelayMs) { - this.log.warn( - `Waiting ${this.deps.config.provingDelayMs}ms before proving epoch ${session.getSpec().epochNumber}`, - ); - await sleep(this.deps.config.provingDelayMs); - } - // A reconcile may have cancelled this session during the delay (content-change + // A reconcile may have cancelled this session before it starts (content-change // recreation). Don't proceed — start() would build a TopTreeJob that should never run. if (session.isTerminal()) { this.log.debug(`Skipping start for ${session.getId()}: already terminal (${session.getState()})`); @@ -506,8 +508,25 @@ export class SessionManager { if (storeCps.length < archiverCps.length) { return false; } - const storeNumbers = new Set(storeCps.map(p => p.checkpoint.number)); - return archiverCps.every(cp => storeNumbers.has(cp.checkpoint.number)); + // Compare by content-addressed id (number, slot, archive root) rather than checkpoint number: + // a reorg can keep the number while changing the checkpoint's post-state archive root. + const storeIds = new Set(storeCps.map(p => p.id)); + return archiverCps.every(cp => storeIds.has(CheckpointProver.idFor(cp.checkpoint))); + } + + /** + * Returns true if the L1 proven tip already covers every canonical checkpoint in the set — i.e. + * the epoch has already been fully proven, so there is no point starting a new proof for it. + * Conservatively returns false when nothing is proven yet. + */ + private async isProvenChainEncompassing(canonical: readonly CheckpointProver[]): Promise { + const provenBlock = await this.deps.l2BlockSource.getBlockNumber({ tag: 'proven' }); + if (!provenBlock || provenBlock <= 0) { + return false; + } + const lastCheckpoint = canonical[canonical.length - 1].checkpoint; + const lastBlock = lastCheckpoint.blocks[lastCheckpoint.blocks.length - 1].number; + return provenBlock >= lastBlock; } private async getL1Constants(): Promise { @@ -524,3 +543,10 @@ class EmptyEpochError extends Error { this.name = 'EmptyEpochError'; } } + +class EpochAlreadyProvenError extends Error { + constructor(epochNumber: EpochNumber) { + super(`Epoch ${epochNumber} is already proven on L1`); + this.name = 'EpochAlreadyProvenError'; + } +} diff --git a/yarn-project/stdlib/src/interfaces/prover-node.test.ts b/yarn-project/stdlib/src/interfaces/prover-node.test.ts index 43d1a26ab364..770412e760f9 100644 --- a/yarn-project/stdlib/src/interfaces/prover-node.test.ts +++ b/yarn-project/stdlib/src/interfaces/prover-node.test.ts @@ -94,8 +94,8 @@ class MockProverNode implements ProverNodeApi { ]); } - startProof(epochNumber: number): Promise { + startProof(epochNumber: number): Promise { expect(typeof epochNumber).toBe('number'); - return Promise.resolve(); + return Promise.resolve(`job-${epochNumber}`); } } diff --git a/yarn-project/stdlib/src/interfaces/prover-node.ts b/yarn-project/stdlib/src/interfaces/prover-node.ts index 49f23a342799..18f9a47a37db 100644 --- a/yarn-project/stdlib/src/interfaces/prover-node.ts +++ b/yarn-project/stdlib/src/interfaces/prover-node.ts @@ -34,7 +34,12 @@ export type EpochProvingJobTerminalState = (typeof EpochProvingJobTerminalState) export interface ProverNodeApi { getJobs(): Promise<{ uuid: string; status: EpochProvingJobState; epochNumber: number }[]>; - startProof(epochNumber: number): Promise; + /** + * Schedules proving for the given epoch and returns the job id immediately, without waiting for + * the proof to complete (proving can take far longer than an HTTP request). Poll `getJobs()` to + * track the returned job's progress. + */ + startProof(epochNumber: number): Promise; getL2Tips(): Promise; @@ -48,7 +53,7 @@ export const ProverNodeApiSchema: ApiSchemaFor = { output: z.array(z.object({ uuid: z.string(), status: z.enum(EpochProvingJobState), epochNumber: z.number() })), }), - startProof: z.function({ input: z.tuple([schemas.Integer]), output: z.void() }), + startProof: z.function({ input: z.tuple([schemas.Integer]), output: z.string() }), getL2Tips: z.function({ input: z.tuple([]), output: L2TipsSchema }),