Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3271,17 +3271,18 @@ impl RpcMethod<1> for EthGetLogs {
(eth_filter,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let pf = ctx
.eth_event_handler
.parse_eth_filter_spec(&ctx, &eth_filter)
.map_err(|e| {
if e.downcast_ref::<EthErrors>()
.is_some_and(|eth_err| matches!(eth_err, EthErrors::BlockRangeExceeded { .. }))
{
return e;
}
e.context("failed to parse events for filter")
})?;
let pf = Arc::new(
ctx.eth_event_handler
.parse_eth_filter_spec(&ctx, &eth_filter)
.map_err(|e| {
if e.downcast_ref::<EthErrors>().is_some_and(|eth_err| {
matches!(eth_err, EthErrors::BlockRangeExceeded { .. })
}) {
return e;
}
e.context("failed to parse events for filter")
})?,
);
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(&ctx, &pf, SkipEvent::OnUnresolvedAddress)
Expand Down Expand Up @@ -3314,7 +3315,7 @@ impl RpcMethod<1> for EthGetFilterLogs {
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&event_filter.into(),
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
Expand Down Expand Up @@ -3365,7 +3366,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&event_filter.into(),
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
Expand All @@ -3390,7 +3391,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
&Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
// heaviest tipset doesn't have events because its messages haven't been executed yet
RangeInclusive::new(
tipset_filter
Expand All @@ -3399,7 +3400,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
// Use -1 to indicate that the range extends until the latest available tipset.
-1,
),
)),
))),
SkipEvent::OnUnresolvedAddress,
)
.await?;
Expand All @@ -3422,7 +3423,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
&Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
// heaviest tipset doesn't have events because its messages haven't been executed yet
RangeInclusive::new(
mempool_filter
Expand All @@ -3431,7 +3432,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
// Use -1 to indicate that the range extends until the latest available tipset.
-1,
),
)),
))),
SkipEvent::OnUnresolvedAddress,
)
.await?;
Expand Down
28 changes: 21 additions & 7 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::blocks::Tipset;
use crate::blocks::TipsetKey;
use crate::chain::index::ResolveNullTipset;
use crate::cli_shared::cli::EventsConfig;
use crate::prelude::*;
use crate::rpc::eth::EVM_WORD_LENGTH;
use crate::rpc::eth::errors::EthErrors;
use crate::rpc::eth::filter::event::*;
Expand All @@ -43,9 +44,9 @@ use crate::shim::executor::{Entry, StampedEvent};
use crate::state_manager::StateManager;
use crate::state_manager::{ExecutedMessage, ExecutedTipset};
use crate::utils::misc::env::env_or_default;
use crate::utils::task::AbortHandles;
use ahash::AHashMap as HashMap;
use anyhow::{Context, Error, anyhow, bail, ensure};
use cid::Cid;
use anyhow::{Error, anyhow, bail, ensure};
use futures::{TryStreamExt as _, stream::FuturesOrdered};
use fvm_ipld_encoding::IPLD_RAW;
use serde::*;
Expand All @@ -58,6 +59,7 @@ use store::*;
/// Implementors of this trait define custom logic to determine whether an event matches the filtering criteria
/// based on the event emitter's address and its associated entries.
///
#[auto_impl::auto_impl(&, Arc)]
pub trait Matcher {
/// # Parameters
/// - `emitter_addr`: The address of the Actor that emitted the event, along with the associated event entries.
Expand Down Expand Up @@ -291,22 +293,34 @@ impl EthEventHandler {
pub async fn collect_events_for_tipsets(
ctx: &Ctx,
tipsets: impl Iterator<Item = Tipset>,
spec: Option<&impl Matcher>,
spec: Option<&(impl Matcher + ShallowClone + Send + Sync + 'static)>,
skip_event: SkipEvent,
collected_events: &mut Vec<CollectedEvent>,
) -> anyhow::Result<()> {
let mut tasks = FuturesOrdered::new();
let mut abort_handles = AbortHandles::default();
for tipset in tipsets {
tasks.push_back(async move {
let state_manager = ctx.state_manager.shallow_clone();
let spec = spec.as_ref().map(|v| v.shallow_clone());
let task = tokio::spawn(async move {
let mut events = vec![];
Self::collect_events(&ctx.state_manager, &tipset, spec, skip_event, &mut events)
.await?;
Self::collect_events(
&state_manager,
&tipset,
spec.as_ref(),
skip_event,
&mut events,
)
.await?;
anyhow::Ok(events)
});
abort_handles.push(task.abort_handle());
tasks.push_back(task);
}
let max_filter_results = ctx.eth_event_handler.max_filter_results;
let mut tipsets_contributing = 0usize;
while let Some(events) = tasks.try_next().await? {
let events = events?;
if !events.is_empty() {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
tipsets_contributing += 1;
}
Expand Down Expand Up @@ -434,7 +448,7 @@ impl EthEventHandler {
pub async fn get_events_for_parsed_filter(
&self,
ctx: &Ctx,
pf: &ParsedFilter,
pf: &Arc<ParsedFilter>,
skip_event: SkipEvent,
) -> anyhow::Result<Vec<CollectedEvent>> {
let mut collected_events = vec![];
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/methods/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

use std::collections::BTreeMap;

use cid::Cid;
use enumflags2::BitFlags;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::prelude::*;
use crate::rpc::eth::CollectedEvent;
use crate::rpc::eth::filter::{ParsedFilter, SkipEvent};
use crate::{
Expand Down Expand Up @@ -35,11 +35,11 @@ impl RpcMethod<1> for GetActorEventsRaw {
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
if let Some(filter) = filter {
let parsed_filter = ParsedFilter::from_actor_event_filter(
let parsed_filter = Arc::new(ParsedFilter::from_actor_event_filter(
ctx.chain_store().heaviest_tipset().epoch(),
ctx.eth_event_handler.max_filter_height_range,
filter,
)?;
)?);
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(&ctx, &parsed_filter, SkipEvent::Never)
Expand Down
5 changes: 3 additions & 2 deletions src/rpc/methods/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ impl RpcMethod<3> for ForestStateCompute {
{
let chain_store = ctx.chain_store().shallow_clone();
let network_context = ctx.sync_network_context.shallow_clone();
futures.push_front(async move {
futures.push_front(tokio::spawn(async move {
Comment thread
hanabi1224 marked this conversation as resolved.
if crate::chain_sync::load_full_tipset(&chain_store, ts.key()).is_err() {
// Backfill full tipset from the network
const MAX_RETRIES: usize = 5;
Expand All @@ -1630,11 +1630,12 @@ impl RpcMethod<3> for ForestStateCompute {
fts.persist(chain_store.db())?;
}
anyhow::Ok(ts)
});
}));
}

let mut results = Vec::with_capacity(n_epochs as _);
while let Some(ts) = futures.try_next().await? {
let ts = ts?;
let epoch = ts.epoch();
let tipset_key = ts.key().clone();
if !force_recompute {
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod proofs_api;
pub mod rand;
pub mod reqwest_resume;
mod shallow_clone;
pub mod task;
pub use shallow_clone::ShallowClone;
#[cfg(feature = "sqlite")]
pub mod sqlite;
Expand Down
16 changes: 16 additions & 0 deletions src/utils/task/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use tokio::task::AbortHandle;

/// Holds a collection of [`AbortHandle`] and aborts them automatically on drop
#[derive(Default, derive_more::Deref, derive_more::DerefMut)]
pub struct AbortHandles(Vec<AbortHandle>);

impl Drop for AbortHandles {
fn drop(&mut self) {
for handle in self.iter() {
handle.abort();
}
}
}
Loading