From b179a477b579dd37ea8a2ee24c4ccaaa5770de8e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 26 May 2026 06:34:35 +0800 Subject: [PATCH 1/4] fix: tokio::spawn futures for FuturesOrdered --- src/rpc/methods/eth.rs | 35 +++++++++++++++--------------- src/rpc/methods/eth/filter/mod.rs | 26 +++++++++++++++------- src/rpc/methods/misc.rs | 6 ++--- src/rpc/methods/state.rs | 5 +++-- src/utils/proofs_api/paramfetch.rs | 15 +++++++------ 5 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 560573915e60..55ccc72f7391 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3259,17 +3259,18 @@ impl RpcMethod<1> for EthGetLogs { (eth_filter,): Self::Params, _: &http::Extensions, ) -> Result { - let pf = ctx - .eth_event_handler - .parse_eth_filter_spec(&ctx, ð_filter) - .map_err(|e| { - if e.downcast_ref::() - .is_some_and(|eth_err| matches!(eth_err, EthErrors::BlockRangeExceeded { .. })) - { - return e; - } - e.context("failed to parse events for filter") - })?; + let pf = Arc::new( + ctx.eth_event_handler + .parse_eth_filter_spec(&ctx, ð_filter) + .map_err(|e| { + if e.downcast_ref::().is_some_and(|eth_err| { + matches!(eth_err, EthErrors::BlockRangeExceeded { .. }) + }) { + return e; + } + e.context("failed to parse events for filter") + })?, + ); let events = ctx .eth_event_handler .get_events_for_parsed_filter(&ctx, &pf, SkipEvent::OnUnresolvedAddress) @@ -3302,7 +3303,7 @@ impl RpcMethod<1> for EthGetFilterLogs { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &event_filter.into(), + &Arc::new(event_filter.into()), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3353,7 +3354,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &event_filter.into(), + &Arc::new(event_filter.into()), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3378,7 +3379,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( + &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( // heaviest tipset doesn't have events because its messages haven't been executed yet RangeInclusive::new( tipset_filter @@ -3387,7 +3388,7 @@ impl RpcMethod<1> for EthGetFilterChanges { // Use -1 to indicate that the range extends until the latest available tipset. -1, ), - )), + ))), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3410,7 +3411,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( + &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( // heaviest tipset doesn't have events because its messages haven't been executed yet RangeInclusive::new( mempool_filter @@ -3419,7 +3420,7 @@ impl RpcMethod<1> for EthGetFilterChanges { // Use -1 to indicate that the range extends until the latest available tipset. -1, ), - )), + ))), SkipEvent::OnUnresolvedAddress, ) .await?; diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index caf8ebc16a8a..c77bfbd839ba 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -28,6 +28,7 @@ use crate::blocks::Tipset; use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; +use crate::prelude::*; use crate::rpc::eth::EVM_WORD_LENGTH; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; @@ -44,8 +45,7 @@ use crate::state_manager::StateManager; use crate::state_manager::{ExecutedMessage, ExecutedTipset}; use crate::utils::misc::env::env_or_default; use ahash::AHashMap as HashMap; -use anyhow::{Context, Error, anyhow, bail, ensure}; -use cid::Cid; +use anyhow::{Error, anyhow, bail, ensure}; use futures::{TryStreamExt as _, stream::FuturesOrdered}; use fvm_ipld_encoding::IPLD_RAW; use serde::*; @@ -58,6 +58,7 @@ use store::*; /// Implementors of this trait define custom logic to determine whether an event matches the filtering criteria /// based on the event emitter's address and its associated entries. /// +#[auto_impl::auto_impl(&, Arc)] pub trait Matcher { /// # Parameters /// - `emitter_addr`: The address of the Actor that emitted the event, along with the associated event entries. @@ -291,22 +292,31 @@ impl EthEventHandler { pub async fn collect_events_for_tipsets( ctx: &Ctx, tipsets: impl Iterator, - spec: Option<&impl Matcher>, + spec: Option<&(impl Matcher + ShallowClone + Send + Sync + 'static)>, skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { let mut tasks = FuturesOrdered::new(); for tipset in tipsets { - tasks.push_back(async move { + let state_manager = ctx.state_manager.shallow_clone(); + let spec = spec.as_ref().map(|v| v.shallow_clone()); + tasks.push_back(tokio::spawn(async move { let mut events = vec![]; - Self::collect_events(&ctx.state_manager, &tipset, spec, skip_event, &mut events) - .await?; + Self::collect_events( + &state_manager, + &tipset, + spec.as_ref(), + skip_event, + &mut events, + ) + .await?; anyhow::Ok(events) - }); + })); } let max_filter_results = ctx.eth_event_handler.max_filter_results; let mut tipsets_contributing = 0usize; while let Some(events) = tasks.try_next().await? { + let events = events?; if !events.is_empty() { tipsets_contributing += 1; } @@ -434,7 +444,7 @@ impl EthEventHandler { pub async fn get_events_for_parsed_filter( &self, ctx: &Ctx, - pf: &ParsedFilter, + pf: &Arc, skip_event: SkipEvent, ) -> anyhow::Result> { let mut collected_events = vec![]; diff --git a/src/rpc/methods/misc.rs b/src/rpc/methods/misc.rs index e456009c5204..deac177e441a 100644 --- a/src/rpc/methods/misc.rs +++ b/src/rpc/methods/misc.rs @@ -3,11 +3,11 @@ use std::collections::BTreeMap; -use cid::Cid; use enumflags2::BitFlags; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::prelude::*; use crate::rpc::eth::CollectedEvent; use crate::rpc::eth::filter::{ParsedFilter, SkipEvent}; use crate::{ @@ -35,11 +35,11 @@ impl RpcMethod<1> for GetActorEventsRaw { _: &http::Extensions, ) -> Result { if let Some(filter) = filter { - let parsed_filter = ParsedFilter::from_actor_event_filter( + let parsed_filter = Arc::new(ParsedFilter::from_actor_event_filter( ctx.chain_store().heaviest_tipset().epoch(), ctx.eth_event_handler.max_filter_height_range, filter, - )?; + )?); let events = ctx .eth_event_handler .get_events_for_parsed_filter(&ctx, &parsed_filter, SkipEvent::Never) diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index e16367ba31f9..7a6b15cdca95 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -1594,7 +1594,7 @@ impl RpcMethod<3> for ForestStateCompute { { let chain_store = ctx.chain_store().shallow_clone(); let network_context = ctx.sync_network_context.shallow_clone(); - futures.push_front(async move { + futures.push_front(tokio::spawn(async move { if crate::chain_sync::load_full_tipset(&chain_store, ts.key()).is_err() { // Backfill full tipset from the network const MAX_RETRIES: usize = 5; @@ -1612,11 +1612,12 @@ impl RpcMethod<3> for ForestStateCompute { fts.persist(chain_store.db())?; } anyhow::Ok(ts) - }); + })); } let mut results = Vec::with_capacity(n_epochs as _); while let Some(ts) = futures.try_next().await? { + let ts = ts?; let epoch = ts.epoch(); let tipset_key = ts.key().clone(); if !force_recompute { diff --git a/src/utils/proofs_api/paramfetch.rs b/src/utils/proofs_api/paramfetch.rs index d16833de36b8..00d34a7072e8 100644 --- a/src/utils/proofs_api/paramfetch.rs +++ b/src/utils/proofs_api/paramfetch.rs @@ -19,10 +19,11 @@ use crate::{ }; use anyhow::{Context, bail}; use backon::{ExponentialBuilder, Retryable}; -use futures::{AsyncWriteExt, TryStreamExt, stream::FuturesUnordered}; +use futures::{AsyncWriteExt, TryStreamExt}; use tokio::{ fs::{self}, sync::Mutex, + task::JoinSet, }; use tracing::{debug, info, warn}; @@ -95,7 +96,7 @@ pub async fn get_params( let params: ParameterMap = serde_json::from_str(param_json)?; - FuturesUnordered::from_iter( + JoinSet::from_iter( params .into_iter() .filter(|(name, info)| match storage_size { @@ -105,13 +106,13 @@ pub async fn get_params( } SectorSizeOpt::All => true, }) - .map(|(name, info)| async move { - let data_dir_clone = data_dir.to_owned(); - fetch_verify_params(&data_dir_clone, &name, Arc::new(info)).await + .map(|(name, info)| { + let data_dir = data_dir.to_owned(); + async move { fetch_verify_params(&data_dir, &name, Arc::new(info)).await } }), ) - .try_collect::>() - .await?; + .join_all() + .await; Ok(()) } From 34c56537e8894b5bc5ce699e1bcf5ca6b5469f48 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 26 May 2026 08:32:35 +0800 Subject: [PATCH 2/4] abort on error --- src/rpc/methods/eth/filter/mod.rs | 8 ++++++-- src/utils/mod.rs | 1 + src/utils/proofs_api/paramfetch.rs | 10 ++++++---- src/utils/task/mod.rs | 16 ++++++++++++++++ 4 files changed, 29 insertions(+), 6 deletions(-) create mode 100644 src/utils/task/mod.rs diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index c77bfbd839ba..9555db7b1bc9 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -44,6 +44,7 @@ use crate::shim::executor::{Entry, StampedEvent}; use crate::state_manager::StateManager; use crate::state_manager::{ExecutedMessage, ExecutedTipset}; use crate::utils::misc::env::env_or_default; +use crate::utils::task::AbortHandles; use ahash::AHashMap as HashMap; use anyhow::{Error, anyhow, bail, ensure}; use futures::{TryStreamExt as _, stream::FuturesOrdered}; @@ -297,10 +298,11 @@ impl EthEventHandler { collected_events: &mut Vec, ) -> anyhow::Result<()> { let mut tasks = FuturesOrdered::new(); + let mut abort_handles = AbortHandles::default(); for tipset in tipsets { let state_manager = ctx.state_manager.shallow_clone(); let spec = spec.as_ref().map(|v| v.shallow_clone()); - tasks.push_back(tokio::spawn(async move { + let task = tokio::spawn(async move { let mut events = vec![]; Self::collect_events( &state_manager, @@ -311,7 +313,9 @@ impl EthEventHandler { ) .await?; anyhow::Ok(events) - })); + }); + abort_handles.push(task.abort_handle()); + tasks.push_back(task); } let max_filter_results = ctx.eth_event_handler.max_filter_results; let mut tipsets_contributing = 0usize; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 5c0dce2a7f82..5809bafc040b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -18,6 +18,7 @@ pub mod proofs_api; pub mod rand; pub mod reqwest_resume; mod shallow_clone; +pub mod task; pub use shallow_clone::ShallowClone; #[cfg(feature = "sqlite")] pub mod sqlite; diff --git a/src/utils/proofs_api/paramfetch.rs b/src/utils/proofs_api/paramfetch.rs index 00d34a7072e8..51f16bd8ca1e 100644 --- a/src/utils/proofs_api/paramfetch.rs +++ b/src/utils/proofs_api/paramfetch.rs @@ -96,7 +96,7 @@ pub async fn get_params( let params: ParameterMap = serde_json::from_str(param_json)?; - JoinSet::from_iter( + let mut tasks = JoinSet::from_iter( params .into_iter() .filter(|(name, info)| match storage_size { @@ -110,9 +110,11 @@ pub async fn get_params( let data_dir = data_dir.to_owned(); async move { fetch_verify_params(&data_dir, &name, Arc::new(info)).await } }), - ) - .join_all() - .await; + ); + + while let Some(task) = tasks.join_next().await { + task??; + } Ok(()) } diff --git a/src/utils/task/mod.rs b/src/utils/task/mod.rs new file mode 100644 index 000000000000..d4bb5ffaec87 --- /dev/null +++ b/src/utils/task/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use tokio::task::AbortHandle; + +/// Holds a collection of [`AbortHandle`] and aborts them automatically on drop +#[derive(Default, derive_more::Deref, derive_more::DerefMut)] +pub struct AbortHandles(Vec); + +impl Drop for AbortHandles { + fn drop(&mut self) { + for handle in self.0.iter() { + handle.abort(); + } + } +} From bdb1f9318d0ee3e309858480286560a4071d2465 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 26 May 2026 17:02:05 +0800 Subject: [PATCH 3/4] revert changes in paramfetch --- src/utils/proofs_api/paramfetch.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/utils/proofs_api/paramfetch.rs b/src/utils/proofs_api/paramfetch.rs index 51f16bd8ca1e..d16833de36b8 100644 --- a/src/utils/proofs_api/paramfetch.rs +++ b/src/utils/proofs_api/paramfetch.rs @@ -19,11 +19,10 @@ use crate::{ }; use anyhow::{Context, bail}; use backon::{ExponentialBuilder, Retryable}; -use futures::{AsyncWriteExt, TryStreamExt}; +use futures::{AsyncWriteExt, TryStreamExt, stream::FuturesUnordered}; use tokio::{ fs::{self}, sync::Mutex, - task::JoinSet, }; use tracing::{debug, info, warn}; @@ -96,7 +95,7 @@ pub async fn get_params( let params: ParameterMap = serde_json::from_str(param_json)?; - let mut tasks = JoinSet::from_iter( + FuturesUnordered::from_iter( params .into_iter() .filter(|(name, info)| match storage_size { @@ -106,15 +105,13 @@ pub async fn get_params( } SectorSizeOpt::All => true, }) - .map(|(name, info)| { - let data_dir = data_dir.to_owned(); - async move { fetch_verify_params(&data_dir, &name, Arc::new(info)).await } + .map(|(name, info)| async move { + let data_dir_clone = data_dir.to_owned(); + fetch_verify_params(&data_dir_clone, &name, Arc::new(info)).await }), - ); - - while let Some(task) = tasks.join_next().await { - task??; - } + ) + .try_collect::>() + .await?; Ok(()) } From 8347575bfe08b89505ba6658222702e0b75aa9a7 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 26 May 2026 17:18:57 +0800 Subject: [PATCH 4/4] Update src/utils/task/mod.rs Co-authored-by: Hubert --- src/utils/task/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/task/mod.rs b/src/utils/task/mod.rs index d4bb5ffaec87..23141b8f311b 100644 --- a/src/utils/task/mod.rs +++ b/src/utils/task/mod.rs @@ -9,7 +9,7 @@ pub struct AbortHandles(Vec); impl Drop for AbortHandles { fn drop(&mut self) { - for handle in self.0.iter() { + for handle in self.iter() { handle.abort(); } }