Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions yarn-project/circuit-types/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ export class BlockAttestation extends Gossipable {
return this.payload.archive;
}

get slotNumber(): Fr {
return this.payload.header.globalVariables.slotNumber;
}

get blockNumber(): Fr {
return this.payload.header.globalVariables.blockNumber;
}

/**Get sender
*
* Lazily evaluate and cache the sender of the attestation
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/circuit-types/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export class BlockProposal extends Gossipable {
return this.payload.header.globalVariables.slotNumber;
}

get blockNumber(): Fr {
return this.payload.header.globalVariables.blockNumber;
}

static async createProposalFromSigner(
payload: ConsensusPayload,
payloadSigner: (payload: Buffer32) => Promise<Signature>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ export class KvAttestationPool implements AttestationPool {

public async addAttestations(attestations: BlockAttestation[]): Promise<void> {
for (const attestation of attestations) {
const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString();
const slotNumber = attestation.payload.header.globalVariables.slotNumber;
const proposalId = attestation.archive.toString();
const address = (await attestation.getSender()).toString();

// Index the proposalId in the slot map
await this.attestations.set(slotNumber, proposalId);
await this.attestations.set(slotNumber.toString(), proposalId);

// Store the actual attestation in the proposal map
const proposalMap = this.getProposalMap(slotNumber, proposalId);
const proposalMap = this.getProposalMap(slotNumber.toString(), proposalId);
await proposalMap.set(address, attestation.toBuffer());

this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`);
this.log.verbose(`Added attestation for slot ${slotNumber.toNumber()} from ${address}`, {
slotNumber: slotNumber.toNumber(),
});
}

this.metrics.recordAddedObjects(attestations.length);
Expand Down
84 changes: 58 additions & 26 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
this.blockProposalValidator = new BlockProposalValidator(epochCache);
this.epochProofQuoteValidator = new EpochProofQuoteValidator(epochCache);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
`[WARNING] handler not yet registered: Block received callback not set. Received block ${block.p2pMessageIdentifier()} from peer.`,
this.blockReceivedCallback = async (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.warn(
`Handler not yet registered: Block received callback not set. Received block for slot ${block.slotNumber.toNumber()} from peer.`,
{ p2pMessageIdentifier: await block.p2pMessageIdentifier() },
);
return Promise.resolve(undefined);
return undefined;
};
}

Expand Down Expand Up @@ -472,7 +473,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
}
if (message.topic == EpochProofQuote.p2pTopic) {
const epochProofQuote = EpochProofQuote.fromBuffer(Buffer.from(message.data));
this.processEpochProofQuoteFromPeer(epochProofQuote);
await this.processEpochProofQuoteFromPeer(epochProofQuote);
}

return;
Expand All @@ -483,14 +484,22 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
*
* @param attestation - The attestation to process.
*/
@trackSpan('Libp2pService.processAttestationFromPeer', attestation => ({
@trackSpan('Libp2pService.processAttestationFromPeer', async attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
[Attributes.P2P_ID]: await attestation.p2pMessageIdentifier().then(i => i.toString()),
}))
private async processAttestationFromPeer(attestation: BlockAttestation): Promise<void> {
this.logger.debug(`Received attestation ${attestation.p2pMessageIdentifier()} from external peer.`);
this.logger.debug(
`Received attestation for block ${attestation.blockNumber.toNumber()} slot ${attestation.slotNumber.toNumber()} from external peer.`,
{
p2pMessageIdentifier: await attestation.p2pMessageIdentifier(),
slot: attestation.slotNumber.toNumber(),
archive: attestation.archive.toString(),
block: attestation.blockNumber.toNumber(),
},
);
await this.mempools.attestationPool!.addAttestations([attestation]);
}

Expand All @@ -501,49 +510,72 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* @param block - The block to process.
*/
// REVIEW: callback pattern https://github.com/AztecProtocol/aztec-packages/issues/7963
@trackSpan('Libp2pService.processBlockFromPeer', block => ({
[Attributes.BLOCK_NUMBER]: block.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: block.payload.header.globalVariables.slotNumber.toNumber(),
@trackSpan('Libp2pService.processBlockFromPeer', async block => ({
[Attributes.BLOCK_NUMBER]: block.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: block.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.P2P_ID]: block.p2pMessageIdentifier().toString(),
[Attributes.P2P_ID]: await block.p2pMessageIdentifier().then(i => i.toString()),
}))
private async processBlockFromPeer(block: BlockProposal): Promise<void> {
this.logger.verbose(`Received block ${block.p2pMessageIdentifier()} from external peer.`);
this.logger.verbose(
`Received block ${block.blockNumber.toNumber()} for slot ${block.slotNumber.toNumber()} from external peer.`,
{
p2pMessageIdentifier: await block.p2pMessageIdentifier(),
slot: block.slotNumber.toNumber(),
archive: block.archive.toString(),
block: block.blockNumber.toNumber(),
},
);
const attestation = await this.blockReceivedCallback(block);

// TODO: fix up this pattern - the abstraction is not nice
// The attestation can be undefined if no handler is registered / the validator deems the block invalid
if (attestation != undefined) {
this.logger.verbose(`Broadcasting attestation ${attestation.p2pMessageIdentifier()}`);
this.broadcastAttestation(attestation);
this.logger.verbose(
`Broadcasting attestation for block ${attestation.blockNumber.toNumber()} slot ${attestation.slotNumber.toNumber()}`,
{
p2pMessageIdentifier: await attestation.p2pMessageIdentifier(),
slot: attestation.slotNumber.toNumber(),
archive: attestation.archive.toString(),
block: attestation.blockNumber.toNumber(),
},
);
await this.broadcastAttestation(attestation);
}
}

/**
* Broadcast an attestation to all peers.
* @param attestation - The attestation to broadcast.
*/
@trackSpan('Libp2pService.broadcastAttestation', attestation => ({
@trackSpan('Libp2pService.broadcastAttestation', async attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
[Attributes.P2P_ID]: await attestation.p2pMessageIdentifier().then(i => i.toString()),
}))
private broadcastAttestation(attestation: BlockAttestation): void {
this.propagate(attestation);
private async broadcastAttestation(attestation: BlockAttestation) {
await this.propagate(attestation);
}

private processEpochProofQuoteFromPeer(epochProofQuote: EpochProofQuote): void {
this.logger.verbose(`Received epoch proof quote ${epochProofQuote.p2pMessageIdentifier()} from external peer.`);
private async processEpochProofQuoteFromPeer(epochProofQuote: EpochProofQuote) {
const epoch = epochProofQuote.payload.epochToProve;
const prover = epochProofQuote.payload.prover.toString();
const p2pMessageIdentifier = await epochProofQuote.p2pMessageIdentifier();
this.logger.verbose(
`Received epoch proof quote ${p2pMessageIdentifier} by prover ${prover} for epoch ${epoch} from external peer.`,
{ quote: epochProofQuote.toInspect(), p2pMessageIdentifier },
);
this.mempools.epochProofQuotePool.addQuote(epochProofQuote);
}

/**
* Propagates provided message to peers.
* @param message - The message to propagate.
*/
public propagate<T extends Gossipable>(message: T): void {
this.logger.trace(`[${message.p2pMessageIdentifier()}] queued`);
public async propagate<T extends Gossipable>(message: T) {
const p2pMessageIdentifier = await message.p2pMessageIdentifier();
this.logger.trace(`Message ${p2pMessageIdentifier} queued`, { p2pMessageIdentifier });
void this.jobQueue.put(async () => {
await this.sendToPeers(message);
});
Expand Down Expand Up @@ -866,11 +898,11 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
private async sendToPeers<T extends Gossipable>(message: T) {
const parent = message.constructor as typeof Gossipable;

const identifier = message.p2pMessageIdentifier().toString();
this.logger.trace(`Sending message ${identifier}`);
const identifier = await message.p2pMessageIdentifier().then(i => i.toString());
this.logger.trace(`Sending message ${identifier}`, { p2pMessageIdentifier: identifier });

const recipientsNum = await this.publishToTopic(parent.p2pTopic, message.toBuffer());
this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`);
this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`, { p2pMessageIdentifier: identifier });
}

// Libp2p seems to hang sometimes if new peers are initiating connections.
Expand Down
5 changes: 4 additions & 1 deletion yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ export class EpochProvingJob implements Traceable {
throw new Error('Failed to submit epoch proof to L1');
}

this.log.info(`Submitted proof for epoch`, { epochNumber, uuid: this.uuid });
this.log.info(`Submitted proof for epoch ${epochNumber} (blocks ${fromBlock} to ${toBlock})`, {
epochNumber,
uuid: this.uuid,
});
this.state = 'completed';
this.metrics.recordProvingJob(executionTime, timer.ms(), epochSizeBlocks, epochSizeTxs);
} catch (err: any) {
Expand Down