Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ where
}

// This channel coordinates flashblock building
let (fb_cancel_token_rx, mut fb_cancel_token_tx) =
let (fb_cancel_token_tx, mut fb_cancel_token_rx) =
mpsc::channel((self.config.flashblocks_per_block() + 1) as usize);
self.spawn_timer_task(
block_cancel.clone(),
fb_cancel_token_rx,
fb_cancel_token_tx,
first_flashblock_offset,
);
// Process flashblocks in a blocking loop
Expand All @@ -393,7 +393,7 @@ where
// Cancellation of this token means that we need to stop building flashblock.
// If channel return None it means that we built all flashblock or parent_token got cancelled
let fb_cancel_token =
tokio::task::block_in_place(|| fb_cancel_token_tx.blocking_recv()).flatten();
tokio::task::block_in_place(|| fb_cancel_token_rx.blocking_recv()).flatten();

match fb_cancel_token {
Some(cancel_token) => {
Expand Down Expand Up @@ -499,6 +499,8 @@ where
// Track invalid/bad block
ctx.metrics.invalid_blocks_count.increment(1);
error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), ctx.flashblock_index(), err);
// Signal cancellation of the block building job
block_cancel.cancel();
// Return the error
return Err(err);
}
Expand Down Expand Up @@ -616,7 +618,7 @@ where
pub fn spawn_timer_task(
&self,
block_cancel: CancellationToken,
flashblock_cancel_token_rx: Sender<Option<CancellationToken>>,
fb_cancel_token_tx: Sender<Option<CancellationToken>>,
first_flashblock_offset: Duration,
) {
let interval = self.config.specific.interval;
Expand All @@ -627,7 +629,7 @@ where
let mut timer = tokio::time::interval(first_flashblock_offset);
timer.tick().await;
let child_token = block_cancel.child_token();
flashblock_cancel_token_rx
fb_cancel_token_tx
.send(Some(child_token.clone()))
.await?;
timer.tick().await;
Expand All @@ -640,7 +642,7 @@ where
// Initiate fb job
let child_token = block_cancel.child_token();
debug!(target: "payload_builder", "Sending child cancel token to execution loop");
flashblock_cancel_token_rx
fb_cancel_token_tx
.send(Some(child_token.clone()))
.await?;
timer.tick().await;
Expand Down
Loading