diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 05a9508dd6d4..a5a0d3027f81 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3271,17 +3271,18 @@ impl RpcMethod<1> for EthGetLogs { (eth_filter,): Self::Params, _: &http::Extensions, ) -> Result { - let pf = ctx - .eth_event_handler - .parse_eth_filter_spec(&ctx, ð_filter) - .map_err(|e| { - if e.downcast_ref::() - .is_some_and(|eth_err| matches!(eth_err, EthErrors::BlockRangeExceeded { .. })) - { - return e; - } - e.context("failed to parse events for filter") - })?; + let pf = Arc::new( + ctx.eth_event_handler + .parse_eth_filter_spec(&ctx, ð_filter) + .map_err(|e| { + if e.downcast_ref::().is_some_and(|eth_err| { + matches!(eth_err, EthErrors::BlockRangeExceeded { .. }) + }) { + return e; + } + e.context("failed to parse events for filter") + })?, + ); let events = ctx .eth_event_handler .get_events_for_parsed_filter(&ctx, &pf, SkipEvent::OnUnresolvedAddress) @@ -3314,7 +3315,7 @@ impl RpcMethod<1> for EthGetFilterLogs { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &event_filter.into(), + &Arc::new(event_filter.into()), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3365,7 +3366,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &event_filter.into(), + &Arc::new(event_filter.into()), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3390,7 +3391,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( + &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( // heaviest tipset doesn't have events because its messages haven't been executed yet RangeInclusive::new( tipset_filter @@ -3399,7 +3400,7 @@ impl RpcMethod<1> for EthGetFilterChanges { // Use -1 to indicate that the range extends until the latest available tipset. -1, ), - )), + ))), SkipEvent::OnUnresolvedAddress, ) .await?; @@ -3422,7 +3423,7 @@ impl RpcMethod<1> for EthGetFilterChanges { .eth_event_handler .get_events_for_parsed_filter( &ctx, - &ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( + &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( // heaviest tipset doesn't have events because its messages haven't been executed yet RangeInclusive::new( mempool_filter @@ -3431,7 +3432,7 @@ impl RpcMethod<1> for EthGetFilterChanges { // Use -1 to indicate that the range extends until the latest available tipset. -1, ), - )), + ))), SkipEvent::OnUnresolvedAddress, ) .await?; diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index caf8ebc16a8a..9555db7b1bc9 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -28,6 +28,7 @@ use crate::blocks::Tipset; use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; +use crate::prelude::*; use crate::rpc::eth::EVM_WORD_LENGTH; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; @@ -43,9 +44,9 @@ use crate::shim::executor::{Entry, StampedEvent}; use crate::state_manager::StateManager; use crate::state_manager::{ExecutedMessage, ExecutedTipset}; use crate::utils::misc::env::env_or_default; +use crate::utils::task::AbortHandles; use ahash::AHashMap as HashMap; -use anyhow::{Context, Error, anyhow, bail, ensure}; -use cid::Cid; +use anyhow::{Error, anyhow, bail, ensure}; use futures::{TryStreamExt as _, stream::FuturesOrdered}; use fvm_ipld_encoding::IPLD_RAW; use serde::*; @@ -58,6 +59,7 @@ use store::*; /// Implementors of this trait define custom logic to determine whether an event matches the filtering criteria /// based on the event emitter's address and its associated entries. /// +#[auto_impl::auto_impl(&, Arc)] pub trait Matcher { /// # Parameters /// - `emitter_addr`: The address of the Actor that emitted the event, along with the associated event entries. @@ -291,22 +293,34 @@ impl EthEventHandler { pub async fn collect_events_for_tipsets( ctx: &Ctx, tipsets: impl Iterator, - spec: Option<&impl Matcher>, + spec: Option<&(impl Matcher + ShallowClone + Send + Sync + 'static)>, skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { let mut tasks = FuturesOrdered::new(); + let mut abort_handles = AbortHandles::default(); for tipset in tipsets { - tasks.push_back(async move { + let state_manager = ctx.state_manager.shallow_clone(); + let spec = spec.as_ref().map(|v| v.shallow_clone()); + let task = tokio::spawn(async move { let mut events = vec![]; - Self::collect_events(&ctx.state_manager, &tipset, spec, skip_event, &mut events) - .await?; + Self::collect_events( + &state_manager, + &tipset, + spec.as_ref(), + skip_event, + &mut events, + ) + .await?; anyhow::Ok(events) }); + abort_handles.push(task.abort_handle()); + tasks.push_back(task); } let max_filter_results = ctx.eth_event_handler.max_filter_results; let mut tipsets_contributing = 0usize; while let Some(events) = tasks.try_next().await? { + let events = events?; if !events.is_empty() { tipsets_contributing += 1; } @@ -434,7 +448,7 @@ impl EthEventHandler { pub async fn get_events_for_parsed_filter( &self, ctx: &Ctx, - pf: &ParsedFilter, + pf: &Arc, skip_event: SkipEvent, ) -> anyhow::Result> { let mut collected_events = vec![]; diff --git a/src/rpc/methods/misc.rs b/src/rpc/methods/misc.rs index e456009c5204..deac177e441a 100644 --- a/src/rpc/methods/misc.rs +++ b/src/rpc/methods/misc.rs @@ -3,11 +3,11 @@ use std::collections::BTreeMap; -use cid::Cid; use enumflags2::BitFlags; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::prelude::*; use crate::rpc::eth::CollectedEvent; use crate::rpc::eth::filter::{ParsedFilter, SkipEvent}; use crate::{ @@ -35,11 +35,11 @@ impl RpcMethod<1> for GetActorEventsRaw { _: &http::Extensions, ) -> Result { if let Some(filter) = filter { - let parsed_filter = ParsedFilter::from_actor_event_filter( + let parsed_filter = Arc::new(ParsedFilter::from_actor_event_filter( ctx.chain_store().heaviest_tipset().epoch(), ctx.eth_event_handler.max_filter_height_range, filter, - )?; + )?); let events = ctx .eth_event_handler .get_events_for_parsed_filter(&ctx, &parsed_filter, SkipEvent::Never) diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index 9cb49456fc01..1a3f45b0181a 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -1612,7 +1612,7 @@ impl RpcMethod<3> for ForestStateCompute { { let chain_store = ctx.chain_store().shallow_clone(); let network_context = ctx.sync_network_context.shallow_clone(); - futures.push_front(async move { + futures.push_front(tokio::spawn(async move { if crate::chain_sync::load_full_tipset(&chain_store, ts.key()).is_err() { // Backfill full tipset from the network const MAX_RETRIES: usize = 5; @@ -1630,11 +1630,12 @@ impl RpcMethod<3> for ForestStateCompute { fts.persist(chain_store.db())?; } anyhow::Ok(ts) - }); + })); } let mut results = Vec::with_capacity(n_epochs as _); while let Some(ts) = futures.try_next().await? { + let ts = ts?; let epoch = ts.epoch(); let tipset_key = ts.key().clone(); if !force_recompute { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 5c0dce2a7f82..5809bafc040b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -18,6 +18,7 @@ pub mod proofs_api; pub mod rand; pub mod reqwest_resume; mod shallow_clone; +pub mod task; pub use shallow_clone::ShallowClone; #[cfg(feature = "sqlite")] pub mod sqlite; diff --git a/src/utils/task/mod.rs b/src/utils/task/mod.rs new file mode 100644 index 000000000000..23141b8f311b --- /dev/null +++ b/src/utils/task/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use tokio::task::AbortHandle; + +/// Holds a collection of [`AbortHandle`] and aborts them automatically on drop +#[derive(Default, derive_more::Deref, derive_more::DerefMut)] +pub struct AbortHandles(Vec); + +impl Drop for AbortHandles { + fn drop(&mut self) { + for handle in self.iter() { + handle.abort(); + } + } +}