From 7856a6c51b0e9d62fec7f4eb014d70410bd70f18 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Tue, 26 May 2026 11:59:51 +0200 Subject: [PATCH 1/2] fix: execute-tipset-traces-parallelize --- src/rpc/methods/eth.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 560573915e6..717837eee45 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3483,9 +3483,24 @@ async fn execute_tipset_traces( let state = ctx.state_manager.get_state_tree(&state_root)?; - let mut entries = Vec::new(); - for (msg_idx, ir) in non_system_traces_with_positions(raw_traces) { - let tx_hash = EthGetTransactionHashByCid::handle(ctx.clone(), (ir.msg_cid,), ext).await?; + // Resolve every non-system message's tx hash in parallel. Each lookup is + // an independent DB read; running them sequentially adds O(N) await + // latency to every trace_block response. + type TxHashTask = Result<(i64, ApiInvocResult, Option), ServerError>; + let raw: Vec<(i64, ApiInvocResult)> = non_system_traces_with_positions(raw_traces).collect(); + let mut entries: Vec = Vec::with_capacity(raw.len()); + let mut join_set: tokio::task::JoinSet = tokio::task::JoinSet::new(); + for (msg_idx, ir) in raw { + let ctx = ctx.clone(); + let ext = ext.clone(); + join_set.spawn(async move { + let tx_hash = + EthGetTransactionHashByCid::handle(ctx, (ir.msg_cid,), &ext).await?; + Ok((msg_idx, ir, tx_hash)) + }); + } + while let Some(joined) = join_set.join_next().await { + let (msg_idx, ir, tx_hash) = joined.context("trace tx-hash task panicked")??; let tx_hash = tx_hash .with_context(|| format!("cannot find transaction hash for cid {}", ir.msg_cid))?; entries.push(trace::TipsetTraceEntry { @@ -3494,6 +3509,7 @@ async fn execute_tipset_traces( invoc_result: ir, }); } + entries.sort_by_key(|e| e.msg_position); Ok((state, entries)) } From a27ce1ff7a72952f628f6486ccc33c85d4205c00 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 27 May 2026 05:42:00 +0800 Subject: [PATCH 2/2] refactor --- src/rpc/methods/eth.rs | 94 +++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index aa3c1dcf794..cde031f5ceb 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2597,21 +2597,9 @@ async fn get_eth_transaction_by_hash( } pub enum EthGetTransactionHashByCid {} -impl RpcMethod<1> for EthGetTransactionHashByCid { - const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid"; - const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid"); - const PARAM_NAMES: [&'static str; 1] = ["cid"]; - const API_PATHS: BitFlags = ApiPaths::all_with_v2(); - const PERMISSION: Permission = Permission::Read; - type Params = (Cid,); - type Ok = Option; - - async fn handle( - ctx: Ctx, - (cid,): Self::Params, - _: &http::Extensions, - ) -> Result { +impl EthGetTransactionHashByCid { + fn run(ctx: &Ctx, cid: Cid) -> anyhow::Result> { let smsgs_result: Result, crate::chain::Error> = crate::chain::messages_from_cids(ctx.db(), &[cid]); if let Ok(smsgs) = smsgs_result @@ -2638,6 +2626,25 @@ impl RpcMethod<1> for EthGetTransactionHashByCid { } } +impl RpcMethod<1> for EthGetTransactionHashByCid { + const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid"; + const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid"); + const PARAM_NAMES: [&'static str; 1] = ["cid"]; + const API_PATHS: BitFlags = ApiPaths::all_with_v2(); + const PERMISSION: Permission = Permission::Read; + + type Params = (Cid,); + type Ok = Option; + + async fn handle( + ctx: Ctx, + (cid,): Self::Params, + _: &http::Extensions, + ) -> Result { + Ok(Self::run(&ctx, cid)?) + } +} + pub enum EthCall {} impl RpcMethod<2> for EthCall { const NAME: &'static str = "Filecoin.EthCall"; @@ -3476,7 +3483,7 @@ impl RpcMethod<1> for EthTraceBlock { let ts = resolver .tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder) .await?; - eth_trace_block(&ctx, &ts, ext).await + eth_trace_block(&ctx, &ts).await } } @@ -3484,7 +3491,6 @@ impl RpcMethod<1> for EthTraceBlock { async fn execute_tipset_traces( ctx: &Ctx, ts: &Tipset, - ext: &http::Extensions, ) -> Result<(StateTree, Vec), ServerError> { let (state_root, raw_traces) = { let sm = ctx.state_manager.shallow_clone(); @@ -3497,30 +3503,30 @@ async fn execute_tipset_traces( let state = ctx.state_manager.get_state_tree(&state_root)?; // Resolve every non-system message's tx hash in parallel. Each lookup is - // an independent DB read; running them sequentially adds O(N) await + // an independent DB read; running them sequentially adds O(N) IO // latency to every trace_block response. - type TxHashTask = Result<(i64, ApiInvocResult, Option), ServerError>; let raw: Vec<(i64, ApiInvocResult)> = non_system_traces_with_positions(raw_traces).collect(); let mut entries: Vec = Vec::with_capacity(raw.len()); - let mut join_set: tokio::task::JoinSet = tokio::task::JoinSet::new(); - for (msg_idx, ir) in raw { - let ctx = ctx.clone(); - let ext = ext.clone(); - join_set.spawn(async move { - let tx_hash = - EthGetTransactionHashByCid::handle(ctx, (ir.msg_cid,), &ext).await?; - Ok((msg_idx, ir, tx_hash)) + let mut join_set = tokio::task::JoinSet::new(); + for (msg_position, invoc_result) in raw { + let ctx = ctx.shallow_clone(); + join_set.spawn_blocking(move || { + let tx_hash = EthGetTransactionHashByCid::run(&ctx, invoc_result.msg_cid)? + .with_context(|| { + format!( + "cannot find transaction hash for cid {}", + invoc_result.msg_cid + ) + })?; + anyhow::Ok(trace::TipsetTraceEntry { + tx_hash, + msg_position, + invoc_result, + }) }); } while let Some(joined) = join_set.join_next().await { - let (msg_idx, ir, tx_hash) = joined.context("trace tx-hash task panicked")??; - let tx_hash = tx_hash - .with_context(|| format!("cannot find transaction hash for cid {}", ir.msg_cid))?; - entries.push(trace::TipsetTraceEntry { - tx_hash, - msg_position: msg_idx, - invoc_result: ir, - }); + entries.push(joined.context("trace tx-hash task panicked")??); } entries.sort_by_key(|e| e.msg_position); @@ -3540,12 +3546,8 @@ fn non_system_traces_with_positions( .map(|(idx, ir)| (idx as i64, ir)) } -async fn eth_trace_block( - ctx: &Ctx, - ts: &Tipset, - ext: &http::Extensions, -) -> Result, ServerError> { - let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?; +async fn eth_trace_block(ctx: &Ctx, ts: &Tipset) -> Result, ServerError> { + let (state, entries) = execute_tipset_traces(ctx, ts).await?; let block_hash: EthHash = ts.key().cid()?.into(); let mut all_traces = vec![]; @@ -3583,13 +3585,12 @@ impl RpcMethod<2> for EthDebugTraceTransaction { ext: &http::Extensions, ) -> Result { let opts = opts.unwrap_or_default(); - debug_trace_transaction(ctx, ext, Self::api_path(ext)?, tx_hash, opts).await + debug_trace_transaction(ctx, Self::api_path(ext)?, tx_hash, opts).await } } async fn debug_trace_transaction( ctx: Ctx, - ext: &http::Extensions, api_path: ApiPaths, tx_hash: String, opts: GethDebugTracingOptions, @@ -3669,7 +3670,7 @@ async fn debug_trace_transaction( return Ok(GethTrace::PreState(frame)); } - let (state, entries) = execute_tipset_traces(&ctx, &ts, ext).await?; + let (state, entries) = execute_tipset_traces(&ctx, &ts).await?; let entry = entries .into_iter() .find(|e| e.tx_hash == eth_hash) @@ -3871,7 +3872,7 @@ impl RpcMethod<1> for EthTraceTransaction { .tipset_by_block_number_or_hash(eth_txn.block_number, ResolveNullTipset::TakeOlder) .await?; - let traces = eth_trace_block(&ctx, &ts, ext) + let traces = eth_trace_block(&ctx, &ts) .await? .into_iter() .filter(|trace| trace.transaction_hash == eth_hash) @@ -3912,16 +3913,15 @@ impl RpcMethod<2> for EthTraceReplayBlockTransactions { .tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder) .await?; - eth_trace_replay_block_transactions(&ctx, &ts, ext).await + eth_trace_replay_block_transactions(&ctx, &ts).await } } async fn eth_trace_replay_block_transactions( ctx: &Ctx, ts: &Tipset, - ext: &http::Extensions, ) -> Result, ServerError> { - let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?; + let (state, entries) = execute_tipset_traces(ctx, ts).await?; let mut all_traces = vec![]; for entry in entries {