Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 53 additions & 52 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2597,29 +2597,16 @@ async fn get_eth_transaction_by_hash(
}

pub enum EthGetTransactionHashByCid {}
impl RpcMethod<1> for EthGetTransactionHashByCid {
const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid";
const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid");
const PARAM_NAMES: [&'static str; 1] = ["cid"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
const PERMISSION: Permission = Permission::Read;

type Params = (Cid,);
type Ok = Option<EthHash>;

async fn handle(
ctx: Ctx,
(cid,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
impl EthGetTransactionHashByCid {
fn run(db: &DbImpl, eth_chain_id: EthChainIdType, cid: Cid) -> anyhow::Result<Option<EthHash>> {
let smsgs_result: Result<Vec<SignedMessage>, crate::chain::Error> =
crate::chain::messages_from_cids(ctx.db(), &[cid]);
crate::chain::messages_from_cids(db, &[cid]);
if let Ok(smsgs) = smsgs_result
&& let Some(smsg) = smsgs.first()
{
let hash = if smsg.is_delegated() {
let chain_id = ctx.chain_config().eth_chain_id;
let (_, tx) = eth_tx_from_signed_eth_message(smsg, chain_id)?;
let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?;
tx.eth_hash()?.into()
} else if smsg.is_secp256k1() {
smsg.cid().into()
Expand All @@ -2629,7 +2616,7 @@ impl RpcMethod<1> for EthGetTransactionHashByCid {
return Ok(Some(hash));
}

let msg_result = crate::chain::get_chain_message(ctx.db(), &cid);
let msg_result = crate::chain::get_chain_message(db, &cid);
if let Ok(msg) = msg_result {
return Ok(Some(msg.cid().into()));
}
Expand All @@ -2638,6 +2625,25 @@ impl RpcMethod<1> for EthGetTransactionHashByCid {
}
}

impl RpcMethod<1> for EthGetTransactionHashByCid {
const NAME: &'static str = "Filecoin.EthGetTransactionHashByCid";
const NAME_ALIAS: Option<&'static str> = Some("eth_getTransactionHashByCid");
const PARAM_NAMES: [&'static str; 1] = ["cid"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
const PERMISSION: Permission = Permission::Read;

type Params = (Cid,);
type Ok = Option<EthHash>;

async fn handle(
ctx: Ctx,
(cid,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
Ok(Self::run(ctx.db(), ctx.chain_config().eth_chain_id, cid)?)
}
}

pub enum EthCall {}
impl RpcMethod<2> for EthCall {
const NAME: &'static str = "Filecoin.EthCall";
Expand Down Expand Up @@ -3476,44 +3482,45 @@ impl RpcMethod<1> for EthTraceBlock {
let ts = resolver
.tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder)
.await?;
eth_trace_block(&ctx, &ts, ext).await
eth_trace_block(&ctx, &ts).await
}
}

/// Replays a tipset and resolves every non-system transaction into a [`trace::TipsetTraceEntry`].
async fn execute_tipset_traces(
ctx: &Ctx,
ts: &Tipset,
ext: &http::Extensions,
) -> Result<(StateTree<DbImpl>, Vec<trace::TipsetTraceEntry>), ServerError> {
let (state_root, raw_traces) = ctx.state_manager.execution_trace(ts).await?;
let state = ctx.state_manager.get_state_tree(&state_root)?;

// Resolve every non-system message's tx hash in parallel. Each lookup is
// an independent DB read; running them sequentially adds O(N) await
// an independent DB read; running them sequentially adds O(N) IO
// latency to every trace_block response.
type TxHashTask = Result<(i64, Arc<ApiInvocResult>, Option<EthHash>), ServerError>;
let raw: Vec<(i64, Arc<ApiInvocResult>)> =
non_system_traces_with_positions(raw_traces).collect();
let raw = non_system_traces_with_positions(raw_traces).collect_vec();
let mut entries: Vec<trace::TipsetTraceEntry> = Vec::with_capacity(raw.len());
let mut join_set: tokio::task::JoinSet<TxHashTask> = tokio::task::JoinSet::new();
for (msg_idx, ir) in raw {
let ctx = ctx.clone();
let ext = ext.clone();
join_set.spawn(async move {
let tx_hash = EthGetTransactionHashByCid::handle(ctx, (ir.msg_cid,), &ext).await?;
Ok((msg_idx, ir, tx_hash))
let mut join_set = tokio::task::JoinSet::new();
let db = ctx.db();
let eth_chain_id = ctx.chain_config().eth_chain_id;
for (msg_position, invoc_result) in raw {
let db = db.shallow_clone();
join_set.spawn_blocking(move || {
let tx_hash = EthGetTransactionHashByCid::run(&db, eth_chain_id, invoc_result.msg_cid)?
.with_context(|| {
format!(
"cannot find transaction hash for cid {}",
invoc_result.msg_cid
)
})?;
anyhow::Ok(trace::TipsetTraceEntry {
tx_hash,
msg_position,
invoc_result,
})
});
}
while let Some(joined) = join_set.join_next().await {
let (msg_idx, ir, tx_hash) = joined.context("trace tx-hash task panicked")??;
let tx_hash = tx_hash
.with_context(|| format!("cannot find transaction hash for cid {}", ir.msg_cid))?;
entries.push(trace::TipsetTraceEntry {
tx_hash,
msg_position: msg_idx,
invoc_result: ir,
});
entries.push(joined.context("trace tx-hash task panicked")??);
}
entries.sort_by_key(|e| e.msg_position);

Expand All @@ -3533,12 +3540,8 @@ fn non_system_traces_with_positions(
.map(|(idx, ir)| (idx as i64, ir))
}

async fn eth_trace_block(
ctx: &Ctx,
ts: &Tipset,
ext: &http::Extensions,
) -> Result<Vec<EthBlockTrace>, ServerError> {
let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?;
async fn eth_trace_block(ctx: &Ctx, ts: &Tipset) -> Result<Vec<EthBlockTrace>, ServerError> {
let (state, entries) = execute_tipset_traces(ctx, ts).await?;
let block_hash: EthHash = ts.key().cid()?.into();
let mut all_traces = vec![];

Expand Down Expand Up @@ -3576,13 +3579,12 @@ impl RpcMethod<2> for EthDebugTraceTransaction {
ext: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let opts = opts.unwrap_or_default();
debug_trace_transaction(ctx, ext, Self::api_path(ext)?, tx_hash, opts).await
debug_trace_transaction(ctx, Self::api_path(ext)?, tx_hash, opts).await
}
}

async fn debug_trace_transaction(
ctx: Ctx,
ext: &http::Extensions,
api_path: ApiPaths,
tx_hash: String,
opts: GethDebugTracingOptions,
Expand Down Expand Up @@ -3662,7 +3664,7 @@ async fn debug_trace_transaction(
return Ok(GethTrace::PreState(frame));
}

let (state, entries) = execute_tipset_traces(&ctx, &ts, ext).await?;
let (state, entries) = execute_tipset_traces(&ctx, &ts).await?;
let entry = entries
.into_iter()
.find(|e| e.tx_hash == eth_hash)
Expand Down Expand Up @@ -3865,7 +3867,7 @@ impl RpcMethod<1> for EthTraceTransaction {
.tipset_by_block_number_or_hash(eth_txn.block_number, ResolveNullTipset::TakeOlder)
.await?;

let traces = eth_trace_block(&ctx, &ts, ext)
let traces = eth_trace_block(&ctx, &ts)
.await?
.into_iter()
.filter(|trace| trace.transaction_hash == eth_hash)
Expand Down Expand Up @@ -3906,16 +3908,15 @@ impl RpcMethod<2> for EthTraceReplayBlockTransactions {
.tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder)
.await?;

eth_trace_replay_block_transactions(&ctx, &ts, ext).await
eth_trace_replay_block_transactions(&ctx, &ts).await
}
}

async fn eth_trace_replay_block_transactions(
ctx: &Ctx,
ts: &Tipset,
ext: &http::Extensions,
) -> Result<Vec<EthReplayBlockTransactionTrace>, ServerError> {
let (state, entries) = execute_tipset_traces(ctx, ts, ext).await?;
let (state, entries) = execute_tipset_traces(ctx, ts).await?;

let mut all_traces = vec![];
for entry in entries {
Expand Down
Loading