From a4b5af277d8d87e70fc0d004fa912c84dd2ee480 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Mon, 13 Apr 2026 11:01:12 +0200 Subject: [PATCH 1/5] fix: run tipset validation on a blocking task --- src/daemon/mod.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 5dec2067c999..6fc3deed23be 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -176,15 +176,17 @@ async fn maybe_import_snapshot( .snapshot_head .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch()); assert!(current_height.is_positive()); - match validate_from.is_negative() { - // allow --height=-1000 to scroll back from the current head - true => ctx - .state_manager - .validate_range((current_height + validate_from)..=current_height)?, - false => ctx - .state_manager - .validate_range(validate_from..=current_height)?, - } + // allow --height=-1000 to scroll back from the current head + let start = if validate_from.is_negative() { + current_height + validate_from + } else { + validate_from + }; + // `validate_range` is CPU-bound (drives rayon-parallel VM execution) and + // can run for minutes. Safer to spawn it on a blocking thread. 
+ let state_manager = ctx.state_manager.clone(); + tokio::task::spawn_blocking(move || state_manager.validate_range(start..=current_height)) + .await??; } Ok(()) From 2c9b847bfcd527e613afbdb69ed1fb1dc7930287 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Mon, 13 Apr 2026 14:53:48 +0200 Subject: [PATCH 2/5] no parallel tipset validation --- src/state_manager/mod.rs | 85 +++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 9586618b0193..0e3b1bffc123 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -75,7 +75,6 @@ use itertools::Itertools as _; use nonzero_ext::nonzero; use num::BigInt; use num_traits::identities::Zero; -use rayon::prelude::ParallelBridge; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::ops::RangeInclusive; @@ -1645,8 +1644,10 @@ where /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset. /// - /// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work - /// of tipset validation. + /// Tipsets are processed sequentially. The compute-intensive work inside each + /// tipset (`bellperson` proof verification, FVM batch seal verification, etc.) + /// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces + /// some issues due to locks in the aforementioned crates. So don't do it. /// /// # What is validation? /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots. @@ -1662,10 +1663,6 @@ where /// - assert that they match /// /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions. - /// - /// # Known issues - /// This function is blocking, but we do observe threads waiting and synchronizing. - /// This is suspected to be due something in the VM or its `WASM` runtime. 
#[tracing::instrument(skip(self))] pub fn validate_range(&self, epochs: RangeInclusive<ChainEpoch>) -> anyhow::Result<()> { let heaviest = self.heaviest_tipset(); @@ -1852,44 +1849,42 @@ where DB: Blockstore + Send + Sync + 'static, T: Iterator + Send, { - use rayon::iter::ParallelIterator as _; - tipsets - .tuple_windows() - .par_bridge() - .try_for_each(|(child, parent)| { - info!(height = parent.epoch(), "compute parent state"); - let ExecutedTipset { - state_root: actual_state, - receipt_root: actual_receipt, - .. - } = apply_block_messages( - genesis_timestamp, - chain_index.shallow_clone(), - chain_config.shallow_clone(), - beacon.shallow_clone(), - engine, - parent, - NO_CALLBACK, - VMTrace::NotTraced, - ) - .context("couldn't compute tipset state")?; - let expected_receipt = child.min_ticket_block().message_receipts; - let expected_state = child.parent_state(); - match (expected_state, expected_receipt) == (&actual_state, actual_receipt) { - true => Ok(()), - false => { - error!( - height = child.epoch(), - ?expected_state, - ?expected_receipt, - ?actual_state, - ?actual_receipt, - "state mismatch" - ); - bail!("state mismatch"); - } - } - }) + // Validate one tipset at a time. Parallelizing the outer loop across tipsets + // might wedge the global rayon pool. + // Sequential outer iteration leaves the entire rayon pool free for that + // already-rich inner parallelism. + for (child, parent) in tipsets.tuple_windows() { + info!(height = parent.epoch(), "compute parent state"); + let ExecutedTipset { + state_root: actual_state, + receipt_root: actual_receipt, + .. 
+ } = apply_block_messages( + genesis_timestamp, + chain_index.shallow_clone(), + chain_config.shallow_clone(), + beacon.shallow_clone(), + engine, + parent, + NO_CALLBACK, + VMTrace::NotTraced, + ) + .context("couldn't compute tipset state")?; + let expected_receipt = child.min_ticket_block().message_receipts; + let expected_state = child.parent_state(); + if (expected_state, expected_receipt) != (&actual_state, actual_receipt) { + error!( + height = child.epoch(), + ?expected_state, + ?expected_receipt, + ?actual_state, + ?actual_receipt, + "state mismatch" + ); + bail!("state mismatch"); + } + } + Ok(()) } /// Shared context for creating VMs and preparing tipset state. From dea3da41d34cf31a46bdd317c83115f480f887b7 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Mon, 13 Apr 2026 16:12:27 +0200 Subject: [PATCH 3/5] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8404b43a573..7cfea4f891f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,8 @@ - [#6856](https://github.com/ChainSafe/forest/pull/6856): Return ethereum compatible error `BlockRangeExceeded` with code `-32005` when block range exceeds in the eth filter and logs API. +- [#6898](https://github.com/ChainSafe/forest/pull/6898): Fixed occassional lock contention during snapshot validation. + ## Forest v0.32.4 "Mild Inconvenience" This is a non-mandatory release for all node operators. It enables F3 finality resolution on ETH v1 RPC methods. 
From a41ef107a76bf7483da768e7a463aa199f79cb71 Mon Sep 17 00:00:00 2001 From: Hubert Date: Mon, 13 Apr 2026 16:38:41 +0200 Subject: [PATCH 4/5] Update CHANGELOG.md Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cfea4f891f4..2839e686f8fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,7 +55,7 @@ - [#6856](https://github.com/ChainSafe/forest/pull/6856): Return ethereum compatible error `BlockRangeExceeded` with code `-32005` when block range exceeds in the eth filter and logs API. -- [#6898](https://github.com/ChainSafe/forest/pull/6898): Fixed occassional lock contention during snapshot validation. +- [`#6893`](https://github.com/ChainSafe/forest/issues/6893): Fixed occasional lock contention during tipset validation. ## Forest v0.32.4 "Mild Inconvenience" From c45ef8179db8a46720b55c6f490457eec5c3ba7b Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Mon, 13 Apr 2026 17:56:42 +0200 Subject: [PATCH 5/5] unit tests for validation range --- src/daemon/mod.rs | 78 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 6fc3deed23be..46f6668150a9 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -175,23 +175,48 @@ async fn maybe_import_snapshot( .client .snapshot_head .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch()); - assert!(current_height.is_positive()); - // allow --height=-1000 to scroll back from the current head - let start = if validate_from.is_negative() { - current_height + validate_from - } else { - validate_from - }; + + let validation_range = validation_range(current_height, validate_from)?; // `validate_range` is CPU-bound (drives rayon-parallel VM execution) and // can run for minutes. Safer to spawn it on a blocking thread. 
let state_manager = ctx.state_manager.clone(); - tokio::task::spawn_blocking(move || state_manager.validate_range(start..=current_height)) + tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range)) .await??; } Ok(()) } +/// Returns the range of epochs to validate. This includes special handling for negative `from` +/// values, which are interpreted as offsets from the current epoch. +fn validation_range( + current: ChainEpoch, + from: ChainEpoch, +) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> { + anyhow::ensure!( + current.is_positive(), + "current head epoch {current} is invalid" + ); + + // Negative values scroll back from the current head (e.g. --height=-1000). + // `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or + // wrapping to a huge positive (which would silently produce an empty range). + let start = if from.is_negative() { + current.saturating_add(from).max(0) + } else { + from + }; + + // An absolute `--height` past the head would otherwise produce an empty + // range and silently succeed without validating anything. 
+ anyhow::ensure!( + start <= current, + "requested validation start epoch {start} is beyond the current head at epoch {current}", + ); + + Ok(start..=current) +} + async fn maybe_start_metrics_service( services: &mut JoinSet>, config: &Config, @@ -822,3 +847,40 @@ where { tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") }) } + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + #[rstest] + #[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!( + "current head epoch 0 is invalid" + )))] + #[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!( + "current head epoch -1 is invalid" + )))] + #[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!( + "requested validation start epoch 11 is beyond the current head at epoch 10" + )))] + #[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))] + #[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))] + #[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))] + #[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))] + fn test_validation_range( + #[case] current: ChainEpoch, + #[case] from: ChainEpoch, + #[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>, + ) { + let result = validation_range(current, from); + match expected { + Ok(expected_range) => { + assert_eq!(result.unwrap(), expected_range); + } + Err(_) => { + assert!(result.is_err()); + } + } + } +}