Skip to content

Commit d95fa03

Browse files
authored
Feat: replace BlockCell with watch channel (#397)
* make PayloadBuilder try_build async * replace BlockCell with oneshot channel * replace best payload oneshot to watch channel * test: ResolvePayload fut
1 parent 0de1a59 commit d95fa03

2 files changed

Lines changed: 133 additions & 181 deletions

File tree

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
config::FlashBlocksConfigExt,
1111
timing::FlashblockScheduler,
1212
},
13-
generator::{BlockCell, BuildArguments, PayloadBuilder},
13+
generator::{BuildArguments, PayloadBuilder},
1414
},
1515
gas_limiter::AddressGasLimiter,
1616
metrics::OpRBuilderMetrics,
@@ -52,7 +52,7 @@ use reth_transaction_pool::TransactionPool;
5252
use reth_trie::{HashedPostState, updates::TrieUpdates};
5353
use revm::Database;
5454
use std::{collections::BTreeMap, sync::Arc, time::Instant};
55-
use tokio::sync::mpsc;
55+
use tokio::sync::{mpsc, watch};
5656
use tokio_util::sync::CancellationToken;
5757
use tracing::{debug, error, info, metadata::Level, span, warn};
5858

@@ -401,7 +401,7 @@ where
401401
async fn build_payload(
402402
&self,
403403
args: BuildArguments<OpPayloadBuilderAttributes<OpTransactionSigned>, OpBuiltPayload>,
404-
best_payload: BlockCell<OpBuiltPayload>,
404+
best_payload_tx: watch::Sender<Option<OpBuiltPayload>>,
405405
) -> Result<(), PayloadBuilderError> {
406406
let block_build_start_time = Instant::now();
407407
let BuildArguments {
@@ -499,7 +499,7 @@ where
499499
"Failed to send updated payload"
500500
);
501501
}
502-
best_payload.set(payload);
502+
best_payload_tx.send_replace(Some(payload));
503503

504504
info!(
505505
target: "payload_builder",
@@ -672,7 +672,6 @@ where
672672
&state_provider,
673673
&mut best_txs,
674674
&block_cancel,
675-
&best_payload,
676675
);
677676

678677
let cache = std::mem::take(&mut state.cache);
@@ -684,7 +683,10 @@ where
684683
transition = new_transition;
685684

686685
let next_flashblock_state = match build_result {
687-
Ok(Some(next_flashblock_state)) => next_flashblock_state,
686+
Ok(Some((next_flashblock_state, new_payload))) => {
687+
best_payload_tx.send_replace(Some(new_payload));
688+
next_flashblock_state
689+
}
688690
Ok(None) => {
689691
self.record_flashblocks_metrics(
690692
&ctx,
@@ -726,8 +728,7 @@ where
726728
state_provider: impl reth::providers::StateProvider + Clone,
727729
best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>,
728730
block_cancel: &CancellationToken,
729-
best_payload: &BlockCell<OpBuiltPayload>,
730-
) -> eyre::Result<Option<FlashblocksState>> {
731+
) -> eyre::Result<Option<(FlashblocksState, OpBuiltPayload)>> {
731732
let flashblock_index = fb_state.flashblock_index();
732733
let mut target_gas_for_batch = fb_state.target_gas_for_batch();
733734
let mut target_da_for_batch = fb_state.target_da_for_batch();
@@ -893,8 +894,6 @@ where
893894
"Failed to send updated payload"
894895
);
895896
}
896-
best_payload.set(new_payload);
897-
898897
// Record flashblock build duration
899898
ctx.metrics
900899
.flashblock_build_duration
@@ -944,7 +943,7 @@ where
944943
"Flashblock built"
945944
);
946945

947-
Ok(Some(next_flashblock_state))
946+
Ok(Some((next_flashblock_state, new_payload)))
948947
}
949948
}
950949
}
@@ -998,9 +997,9 @@ where
998997
async fn try_build(
999998
&self,
1000999
args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
1001-
best_payload: BlockCell<Self::BuiltPayload>,
1000+
best_payload_tx: watch::Sender<Option<Self::BuiltPayload>>,
10021001
) -> Result<(), PayloadBuilderError> {
1003-
self.build_payload(args, best_payload).await
1002+
self.build_payload(args, best_payload_tx).await
10041003
}
10051004
}
10061005

0 commit comments

Comments
 (0)