diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 25be8bfd7f7..3aadb76ac72 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2597,29 +2597,16 @@ async fn get_eth_transaction_by_hash( } pub enum EthGetTransactionHashByCid {} -impl RpcMethod<1> for EthGetTransactionHashByCid { - const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid"; - const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid"); - const PARAM_NAMES: [&'static str; 1] = ["cid"]; - const API_PATHS: BitFlags = ApiPaths::all_with_v2(); - const PERMISSION: Permission = Permission::Read; - type Params = (Cid,); - type Ok = Option; - - async fn handle( - ctx: Ctx, - (cid,): Self::Params, - _: &http::Extensions, - ) -> Result { +impl EthGetTransactionHashByCid { + fn run(db: &DbImpl, eth_chain_id: EthChainIdType, cid: Cid) -> anyhow::Result> { let smsgs_result: Result, crate::chain::Error> = - crate::chain::messages_from_cids(ctx.db(), &[cid]); + crate::chain::messages_from_cids(db, &[cid]); if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { let hash = if smsg.is_delegated() { - let chain_id = ctx.chain_config().eth_chain_id; - let (_, tx) = eth_tx_from_signed_eth_message(smsg, chain_id)?; + let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?; tx.eth_hash()?.into() } else if smsg.is_secp256k1() { smsg.cid().into() @@ -2629,7 +2616,7 @@ impl RpcMethod<1> for EthGetTransactionHashByCid { return Ok(Some(hash)); } - let msg_result = crate::chain::get_chain_message(ctx.db(), &cid); + let msg_result = crate::chain::get_chain_message(db, &cid); if let Ok(msg) = msg_result { return Ok(Some(msg.cid().into())); } @@ -2638,6 +2625,25 @@ impl RpcMethod<1> for EthGetTransactionHashByCid { } } +impl RpcMethod<1> for EthGetTransactionHashByCid { + const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid"; + const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid"); + const PARAM_NAMES: [&'static str; 1] = ["cid"]; + const API_PATHS: BitFlags = ApiPaths::all_with_v2(); + const PERMISSION: Permission = Permission::Read; + + type Params = (Cid,); + type Ok = Option; + + async fn handle( + ctx: Ctx, + (cid,): Self::Params, + _: &http::Extensions, + ) -> Result { + Ok(Self::run(ctx.db(), ctx.chain_config().eth_chain_id, cid)?) + } +} + pub enum EthCall {} impl RpcMethod<2> for EthCall { const NAME: &'static str = "Filecoin.EthCall"; @@ -3476,7 +3482,7 @@ impl RpcMethod<1> for EthTraceBlock { let ts = resolver .tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder) .await?; - eth_trace_block(&ctx, &ts, ext).await + eth_trace_block(&ctx, &ts).await } } @@ -3484,36 +3490,37 @@ impl RpcMethod<1> for EthTraceBlock { async fn execute_tipset_traces( ctx: &Ctx, ts: &Tipset, - ext: &http::Extensions, ) -> Result<(StateTree, Vec), ServerError> { let (state_root, raw_traces) = ctx.state_manager.execution_trace(ts).await?; let state = ctx.state_manager.get_state_tree(&state_root)?; // Resolve every non-system message's tx hash in parallel. Each lookup is - // an independent DB read; running them sequentially adds O(N) await + // an independent DB read; running them sequentially adds O(N) IO // latency to every trace_block response. - type TxHashTask = Result<(i64, Arc, Option), ServerError>; - let raw: Vec<(i64, Arc)> = - non_system_traces_with_positions(raw_traces).collect(); + let raw = non_system_traces_with_positions(raw_traces).collect_vec(); let mut entries: Vec = Vec::with_capacity(raw.len()); - let mut join_set: tokio::task::JoinSet = tokio::task::JoinSet::new(); - for (msg_idx, ir) in raw { - let ctx = ctx.clone(); - let ext = ext.clone(); - join_set.spawn(async move { - let tx_hash = EthGetTransactionHashByCid::handle(ctx, (ir.msg_cid,), &ext).await?; - Ok((msg_idx, ir, tx_hash)) + let mut join_set = tokio::task::JoinSet::new(); + let db = ctx.db(); + let eth_chain_id = ctx.chain_config().eth_chain_id; + for (msg_position, invoc_result) in raw { + let db = db.shallow_clone(); + join_set.spawn_blocking(move || { + let tx_hash = EthGetTransactionHashByCid::run(&db, eth_chain_id, invoc_result.msg_cid)? + .with_context(|| { + format!( + "cannot find transaction hash for cid {}", + invoc_result.msg_cid + ) + })?; + anyhow::Ok(trace::TipsetTraceEntry { + tx_hash, + msg_position, + invoc_result, + }) }); } while let Some(joined) = join_set.join_next().await { - let (msg_idx, ir, tx_hash) = joined.context("trace tx-hash task panicked")??; - let tx_hash = tx_hash - .with_context(|| format!("cannot find transaction hash for cid {}", ir.msg_cid))?; - entries.push(trace::TipsetTraceEntry { - tx_hash, - msg_position: msg_idx, - invoc_result: ir, - }); + entries.push(joined.context("trace tx-hash task panicked")??); } entries.sort_by_key(|e| e.msg_position); @@ -3533,12 +3540,8 @@ fn non_system_traces_with_positions( .map(|(idx, ir)| (idx as i64, ir)) } -async fn eth_trace_block( - ctx: &Ctx, - ts: &Tipset, - ext: &http::Extensions, -) -> Result, ServerError> { - let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?; +async fn eth_trace_block(ctx: &Ctx, ts: &Tipset) -> Result, ServerError> { + let (state, entries) = execute_tipset_traces(ctx, ts).await?; let block_hash: EthHash = ts.key().cid()?.into(); let mut all_traces = vec![]; @@ -3576,13 +3579,12 @@ impl RpcMethod<2> for EthDebugTraceTransaction { ext: &http::Extensions, ) -> Result { let opts = opts.unwrap_or_default(); - debug_trace_transaction(ctx, ext, Self::api_path(ext)?, tx_hash, opts).await + debug_trace_transaction(ctx, Self::api_path(ext)?, tx_hash, opts).await } } async fn debug_trace_transaction( ctx: Ctx, - ext: &http::Extensions, api_path: ApiPaths, tx_hash: String, opts: GethDebugTracingOptions, @@ -3662,7 +3664,7 @@ async fn debug_trace_transaction( return Ok(GethTrace::PreState(frame)); } - let (state, entries) = execute_tipset_traces(&ctx, &ts, ext).await?; + let (state, entries) = execute_tipset_traces(&ctx, &ts).await?; let entry = entries .into_iter() .find(|e| e.tx_hash == eth_hash) @@ -3865,7 +3867,7 @@ impl RpcMethod<1> for EthTraceTransaction { .tipset_by_block_number_or_hash(eth_txn.block_number, ResolveNullTipset::TakeOlder) .await?; - let traces = eth_trace_block(&ctx, &ts, ext) + let traces = eth_trace_block(&ctx, &ts) .await? .into_iter() .filter(|trace| trace.transaction_hash == eth_hash) @@ -3906,16 +3908,15 @@ impl RpcMethod<2> for EthTraceReplayBlockTransactions { .tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder) .await?; - eth_trace_replay_block_transactions(&ctx, &ts, ext).await + eth_trace_replay_block_transactions(&ctx, &ts).await } } async fn eth_trace_replay_block_transactions( ctx: &Ctx, ts: &Tipset, - ext: &http::Extensions, ) -> Result, ServerError> { - let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?; + let (state, entries) = execute_tipset_traces(ctx, ts).await?; let mut all_traces = vec![]; for entry in entries {