Skip to content

Commit cf5506d

Browse files
committed
ProofAggregationCoordinatorService: Add poller for proof response
1 parent 02862e9 commit cf5506d

File tree

4 files changed

+123
-23
lines changed

4 files changed

+123
-23
lines changed

coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/conflation/ConflationApp.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,6 @@ class ConflationApp(
383383
),
384384
consecutiveProvenBlobsProvider = maxBlobEndBlockNumberTracker,
385385
proofAggregationClient = proverClientFactory.proofAggregationProverClient(),
386-
l2EthApiClient = l2EthClient,
387-
l2MessageService = l2MessageService,
388386
noL2ActivityTimeout = configs.conflation.conflationDeadlineLastBlockConfirmationDelay,
389387
waitForNoL2ActivityToTriggerAggregation =
390388
configs.conflation.proofAggregation.waitForNoL2ActivityToTriggerAggregation,
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package net.consensys.zkevm.ethereum.coordination.aggregation
2+
3+
import io.vertx.core.Vertx
4+
import linea.timer.TimerSchedule
5+
import linea.timer.VertxPeriodicPollingService
6+
import net.consensys.zkevm.coordinator.clients.ProofAggregationProverClientV2
7+
import net.consensys.zkevm.domain.Aggregation
8+
import net.consensys.zkevm.domain.AggregationProofIndex
9+
import org.apache.logging.log4j.Logger
10+
import tech.pegasys.teku.infrastructure.async.SafeFuture
11+
import java.util.concurrent.ConcurrentLinkedDeque
12+
import kotlin.time.Duration.Companion.milliseconds
13+
14+
class AggregationProofPoller(
15+
private val aggregationProofClient: ProofAggregationProverClientV2,
16+
private val aggregationProofHandler: AggregationProofHandler,
17+
private val log: Logger,
18+
vertx: Vertx,
19+
pollingIntervalMs: Long = 100.milliseconds.inWholeMilliseconds,
20+
name: String = "AggregationProofPoller",
21+
timerSchedule: TimerSchedule = TimerSchedule.FIXED_DELAY,
22+
) : VertxPeriodicPollingService(
23+
vertx = vertx,
24+
pollingIntervalMs = pollingIntervalMs,
25+
log = log,
26+
name = name,
27+
timerSchedule = timerSchedule,
28+
) {
29+
data class ProofInProgress(
30+
val proofIndex: AggregationProofIndex,
31+
val unProvenAggregation: Aggregation,
32+
)
33+
34+
private val proofRequestsInProgress = ConcurrentLinkedDeque<ProofInProgress>()
35+
36+
@Synchronized
37+
fun addProofRequestsInProgressForPolling(proofIndex: AggregationProofIndex, unProvenAggregation: Aggregation) {
38+
proofRequestsInProgress.add(ProofInProgress(proofIndex, unProvenAggregation))
39+
}
40+
41+
override fun action(): SafeFuture<*> {
42+
return if (proofRequestsInProgress.isNotEmpty()) {
43+
val proofInProgress = proofRequestsInProgress.peekFirst()
44+
aggregationProofClient.findProofResponse(proofInProgress.proofIndex)
45+
.thenCompose { proofResponse ->
46+
if (proofResponse != null) {
47+
log.info(
48+
"aggregation proof generated: aggregation={}",
49+
proofInProgress.unProvenAggregation.intervalString(),
50+
)
51+
52+
val provenAggregation = proofInProgress.unProvenAggregation.copy(aggregationProof = proofResponse)
53+
aggregationProofHandler.acceptNewAggregation(provenAggregation)
54+
.thenApply {
55+
proofRequestsInProgress.remove(proofInProgress)
56+
}
57+
} else {
58+
SafeFuture.completedFuture(Unit)
59+
}
60+
}
61+
} else {
62+
SafeFuture.completedFuture(Unit)
63+
}
64+
}
65+
}

coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/aggregation/ProofAggregationCoordinatorService.kt

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,19 @@ package net.consensys.zkevm.ethereum.coordination.aggregation
22

33
import io.vertx.core.Vertx
44
import linea.LongRunningService
5-
import linea.contract.l2.L2MessageServiceSmartContractClientReadOnly
65
import linea.domain.BlockIntervals
76
import linea.domain.toBlockIntervalsString
8-
import linea.ethapi.EthApiClient
97
import linea.timer.TimerSchedule
108
import linea.timer.VertxPeriodicPollingService
119
import net.consensys.linea.async.AsyncRetryer
1210
import net.consensys.linea.metrics.LineaMetricsCategory
1311
import net.consensys.linea.metrics.MetricsFacade
1412
import net.consensys.zkevm.coordinator.clients.ProofAggregationProverClientV2
1513
import net.consensys.zkevm.domain.Aggregation
14+
import net.consensys.zkevm.domain.AggregationProofIndex
1615
import net.consensys.zkevm.domain.BlobAndBatchCounters
1716
import net.consensys.zkevm.domain.BlobsToAggregate
1817
import net.consensys.zkevm.domain.CompressionProofIndex
19-
import net.consensys.zkevm.domain.ProofToFinalize
2018
import net.consensys.zkevm.domain.ProofsToAggregate
2119
import net.consensys.zkevm.ethereum.coordination.blockcreation.SafeBlockProvider
2220
import org.apache.logging.log4j.LogManager
@@ -53,6 +51,13 @@ class ProofAggregationCoordinatorService(
5351
val proofGenerationRetryBackoffDelay: Duration,
5452
)
5553

54+
internal val aggregationProofPoller: AggregationProofPoller = AggregationProofPoller(
55+
aggregationProofClient = proofAggregationClient,
56+
aggregationProofHandler = aggregationProofHandler,
57+
log = log,
58+
vertx = vertx,
59+
)
60+
5661
private val pendingBlobs = ConcurrentLinkedQueue<BlobAndBatchCounters>()
5762
private val aggregationSizeInBlocksHistogram =
5863
metricsFacade.createHistogram(
@@ -186,28 +191,25 @@ class ProofAggregationCoordinatorService(
186191
)
187192
},
188193
) {
189-
log.debug("requesting aggregation proof: aggregation={}", blobsToAggregate.intervalString())
194+
log.debug("creating aggregation proof request: aggregation={}", blobsToAggregate.intervalString())
190195
aggregationProofCreation(blockIntervals, compressionProofIndexes)
191196
}
192-
.thenPeek {
193-
log.info("aggregation proof generated: aggregation={}", blobsToAggregate.intervalString())
194-
}
195-
.thenCompose { aggregationProof ->
196-
val aggregation =
197+
.thenApply { aggregationProofIndex ->
198+
val unProvenAggregation =
197199
Aggregation(
198200
startBlockNumber = blobsToAggregate.startBlockNumber,
199201
endBlockNumber = blobsToAggregate.endBlockNumber,
200202
batchCount = batchCount.toULong(),
201-
aggregationProof = aggregationProof,
203+
aggregationProof = null,
202204
)
203-
aggregationProofHandler.acceptNewAggregation(aggregation)
205+
aggregationProofPoller.addProofRequestsInProgressForPolling(aggregationProofIndex, unProvenAggregation)
204206
}
205207
}
206208

207209
private fun aggregationProofCreation(
208210
executionProofsIndexes: BlockIntervals,
209211
compressionProofIndexes: List<CompressionProofIndex>,
210-
): SafeFuture<ProofToFinalize> {
212+
): SafeFuture<AggregationProofIndex> {
211213
val blobsToAggregate = executionProofsIndexes.toBlockInterval()
212214
return aggregationL2StateProvider
213215
.getAggregationL2State(blockNumber = blobsToAggregate.startBlockNumber.toLong() - 1)
@@ -236,10 +238,10 @@ class ProofAggregationCoordinatorService(
236238
parentAggregationLastFtxRollingHash = rollingInfo.parentAggregationLastFtxRollingHash,
237239
)
238240
}
239-
.thenCompose(proofAggregationClient::requestProof)
241+
.thenCompose(proofAggregationClient::createProofRequest)
240242
.whenException {
241243
log.debug(
242-
"Error getting aggregation proof: aggregation={} errorMessage={}",
244+
"Error creating aggregation proof request: aggregation={} errorMessage={}",
243245
executionProofsIndexes.toBlockInterval().intervalString(),
244246
it.message,
245247
it,
@@ -263,8 +265,6 @@ class ProofAggregationCoordinatorService(
263265
aggregationL2StateProvider: AggregationL2StateProvider,
264266
consecutiveProvenBlobsProvider: ConsecutiveProvenBlobsProvider,
265267
proofAggregationClient: ProofAggregationProverClientV2,
266-
l2EthApiClient: EthApiClient,
267-
l2MessageService: L2MessageServiceSmartContractClientReadOnly,
268268
noL2ActivityTimeout: Duration,
269269
waitForNoL2ActivityToTriggerAggregation: Boolean,
270270
targetEndBlockNumbers: List<ULong>,
@@ -352,4 +352,16 @@ class ProofAggregationCoordinatorService(
352352
override fun handleError(error: Throwable) {
353353
log.error("Error polling blobs for aggregation: errorMessage={}", error.message, error)
354354
}
355+
356+
override fun start(): SafeFuture<Unit> {
357+
return aggregationProofPoller.start().thenCompose {
358+
super.start()
359+
}
360+
}
361+
362+
override fun stop(): SafeFuture<Unit> {
363+
return super.stop().thenCompose {
364+
aggregationProofPoller.stop()
365+
}
366+
}
355367
}

coordinator/core/src/test/kotlin/net/consensys/zkevm/ethereum/coordination/aggregation/ProofAggregationCoordinatorServiceTest.kt

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import net.consensys.linea.metrics.MetricsFacade
88
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
99
import net.consensys.zkevm.coordinator.clients.ProofAggregationProverClientV2
1010
import net.consensys.zkevm.domain.Aggregation
11+
import net.consensys.zkevm.domain.AggregationProofIndex
1112
import net.consensys.zkevm.domain.BlobAndBatchCounters
1213
import net.consensys.zkevm.domain.BlobCounters
1314
import net.consensys.zkevm.domain.BlobsToAggregate
@@ -98,6 +99,8 @@ class ProofAggregationCoordinatorServiceTest {
9899
metricsFacade = metricsFacade,
99100
invalidityProofProvider = mockInvalidityProofProvider,
100101
)
102+
proofAggregationCoordinatorService.aggregationProofPoller.start()
103+
101104
verify(mockAggregationCalculator).onAggregation(proofAggregationCoordinatorService)
102105

103106
val blob1 = listOf(createBlob(11u, 19u), createBlob(20u, 33u), createBlob(34u, 41u))
@@ -242,6 +245,11 @@ class ProofAggregationCoordinatorServiceTest {
242245
batchCount = compressionBlobs1.sumOf { it.blobCounters.numberOfBatches }.toULong(),
243246
aggregationProof = aggregationProof1,
244247
)
248+
val aggregation1ProofIndex = AggregationProofIndex(
249+
startBlockNumber = aggregation1.startBlockNumber,
250+
endBlockNumber = aggregation1.endBlockNumber,
251+
hash = Random.nextBytes(32),
252+
)
245253

246254
val aggregation2 =
247255
Aggregation(
@@ -250,18 +258,33 @@ class ProofAggregationCoordinatorServiceTest {
250258
batchCount = compressionBlobs2.sumOf { it.blobCounters.numberOfBatches }.toULong(),
251259
aggregationProof = aggregationProof2,
252260
)
261+
val aggregation2ProofIndex = AggregationProofIndex(
262+
startBlockNumber = aggregation2.startBlockNumber,
263+
endBlockNumber = aggregation2.endBlockNumber,
264+
hash = Random.nextBytes(32),
265+
)
253266

254-
whenever(mockProofAggregationClient.requestProof(any()))
267+
whenever(mockProofAggregationClient.createProofRequest(any()))
255268
.thenAnswer {
256269
if (it.getArgument<ProofsToAggregate>(0) == proofsToAggregate1) {
257-
SafeFuture.completedFuture(aggregationProof1)
270+
SafeFuture.completedFuture(aggregation1ProofIndex)
258271
} else if (it.getArgument<ProofsToAggregate>(0) == proofsToAggregate2) {
259-
SafeFuture.completedFuture(aggregationProof2)
272+
SafeFuture.completedFuture(aggregation2ProofIndex)
260273
} else {
261274
throw IllegalStateException()
262275
}
263276
}
264277

278+
whenever(mockProofAggregationClient.findProofResponse(any<AggregationProofIndex>()))
279+
.thenAnswer {
280+
val proofIndex = it.getArgument<AggregationProofIndex>(0)
281+
when (proofIndex) {
282+
aggregation1ProofIndex -> SafeFuture.completedFuture(aggregationProof1)
283+
aggregation2ProofIndex -> SafeFuture.completedFuture(aggregationProof2)
284+
else -> SafeFuture.completedFuture(null)
285+
}
286+
}
287+
265288
whenever(
266289
mockAggregationsRepository.findHighestConsecutiveEndBlockNumber(
267290
aggregation1.startBlockNumber.toLong(),
@@ -301,7 +324,8 @@ class ProofAggregationCoordinatorServiceTest {
301324
assertThat(meterRegistry.summary("aggregation.blocks.size").max()).isEqualTo(23.0)
302325
assertThat(meterRegistry.summary("aggregation.batches.size").max()).isEqualTo(6.0)
303326
assertThat(meterRegistry.summary("aggregation.blobs.size").max()).isEqualTo(2.0)
304-
verify(mockProofAggregationClient).requestProof(proofsToAggregate1)
327+
verify(mockProofAggregationClient).createProofRequest(proofsToAggregate1)
328+
verify(mockProofAggregationClient).findProofResponse(aggregation1ProofIndex)
305329
verify(mockAggregationsRepository).saveNewAggregation(aggregation1)
306330
assertThat(provenAggregation).isEqualTo(aggregation1.endBlockNumber)
307331
}
@@ -319,7 +343,8 @@ class ProofAggregationCoordinatorServiceTest {
319343
assertThat(meterRegistry.summary("aggregation.blocks.size").max()).isEqualTo(27.0)
320344
assertThat(meterRegistry.summary("aggregation.batches.size").max()).isEqualTo(6.0)
321345
assertThat(meterRegistry.summary("aggregation.blobs.size").max()).isEqualTo(2.0)
322-
verify(mockProofAggregationClient).requestProof(proofsToAggregate2)
346+
verify(mockProofAggregationClient).createProofRequest(proofsToAggregate2)
347+
verify(mockProofAggregationClient).findProofResponse(aggregation2ProofIndex)
323348
verify(mockAggregationsRepository).saveNewAggregation(aggregation2)
324349
assertThat(provenAggregation).isEqualTo(aggregation2.endBlockNumber)
325350
}

0 commit comments

Comments
 (0)