diff --git a/CHANGELOG.md b/CHANGELOG.md index c0d74847c4d..a84ea2e5006 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,8 @@ - [#7227](https://github.com/ChainSafe/forest/issues/7227): Fixed invalid `Filecoin.GasEstimateGasPremium` and `Filecoin.GasEstimateFeeCap` responses that were returning a fraction instead of an integer. +- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` now re-emits the logs of reorg-reverted tipsets with `removed: true`, ahead of the logs of the replacing tipsets. + ## Forest v0.33.7 "Shimmergloom" ### Added diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c8ed446258b..82342132afc 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -602,6 +602,7 @@ fn maybe_start_rpc_service( bad_blocks, sync_status, eth_event_handler, + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown, diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 852a83970e4..f4ee7c3ac4d 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1578,6 +1578,14 @@ impl Clone for PathChange { } } +impl PathChange { + pub fn tipset(&self) -> &T { + match self { + Self::Revert(ts) | Self::Apply(ts) => ts, + } + } +} + impl HasLotusJson for PathChange { type LotusJson = PathChange<::LotusJson>; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 2c7d8bed3ba..afffb1ef6ca 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -35,10 +35,13 @@ use crate::rpc::{ error::ServerError, eth::{ errors::EthErrors, - filter::{SkipEvent, event::EventFilter, mempool::MempoolFilter, tipset::TipSetFilter}, + filter::{ + EventRevertStatus, SkipEvent, event::EventFilter, mempool::MempoolFilter, + tipset::TipSetFilter, + }, utils::decode_revert_reason, }, - methods::chain::ChainGetTipSetV2, + methods::chain::{ChainGetTipSetV2, PathChange}, state::ApiInvocResult, types::{ApiTipsetKey, EventEntry, MessageLookup}, }; @@ -1454,17 +1457,35 @@ pub async fn eth_logs_for_block_and_transaction( eth_filter_logs_from_events(ctx, &events) } -pub async fn eth_logs_with_filter( +/// Collects the logs produced by a single chain head change, for the logs +/// subscription. +pub(in crate::rpc) async fn eth_logs_for_head_change( ctx: &Ctx, - ts: &Tipset, - spec: Option, + change: &PathChange, ) -> anyhow::Result> { + let (receipt_ts, revert_status) = match change { + PathChange::Revert(ts) => (ts, EventRevertStatus::Reverted), + PathChange::Apply(ts) => (ts, EventRevertStatus::Applied), + }; + // Genesis carries no events and has no parent message tipset to load. + if receipt_ts.epoch() == 0 { + return Ok(vec![]); + } + let msg_ts = ctx + .chain_index() + .load_required_tipset(receipt_ts.parents())?; + let executed_ts = ctx + .state_manager + .load_executed_tipset_with_receipt(&msg_ts, receipt_ts) + .await?; let mut events = vec![]; - EthEventHandler::collect_events( + EthEventHandler::collect_events_from_messages( &ctx.state_manager, - ts, - spec.as_ref(), + &msg_ts, + &executed_ts.executed_messages, + None::<&ParsedFilter>, SkipEvent::OnUnresolvedAddress, + revert_status, &mut events, ) .await?; @@ -3147,6 +3168,11 @@ pub struct CollectedEvent { pub(crate) msg_cid: Cid, } +/// Positions `(message index, event index)` of collected events, grouped by tipset. +/// Identifies events without retaining their entry payloads; grouping by tipset lets +/// membership checks borrow the tipset key and stores each distinct key only once. +pub type SeenEventPositions = HashMap>; + fn match_key(key: &str) -> Option { match key.get(0..2) { Some("t1") => Some(0), @@ -3410,6 +3436,54 @@ impl RpcMethod<1> for EthGetLogs { } } +/// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event +/// filters: collects the filter's full result set from the canonical chain, returns only the +/// events that were not present in the previous poll. +async fn poll_event_filter( + ctx: &Ctx, + event_filter: &EventFilter, +) -> anyhow::Result> { + let events = ctx + .eth_event_handler + .get_events_for_parsed_filter( + ctx, + &Arc::new(event_filter.into()), + SkipEvent::OnUnresolvedAddress, + ) + .await?; + let mut seen_positions = SeenEventPositions::default(); + let mut recent_events = Vec::new(); + for event in events { + let position = (event.msg_idx, event.event_idx); + let already_seen = event_filter + .seen_positions + .get(&event.tipset_key) + .is_some_and(|positions| positions.contains(&position)); + match seen_positions.get_mut(&event.tipset_key) { + Some(positions) => { + positions.insert(position); + } + None => { + seen_positions.insert(event.tipset_key.clone(), HashSet::from_iter([position])); + } + } + if !already_seen { + recent_events.push(event); + } + } + if let Some(store) = &ctx.eth_event_handler.filter_store { + store.update(Arc::new(EventFilter { + id: event_filter.id.clone(), + tipsets: event_filter.tipsets.clone(), + addresses: event_filter.addresses.clone(), + keys_with_codec: event_filter.keys_with_codec.clone(), + max_results: event_filter.max_results, + seen_positions, + })); + } + Ok(recent_events) +} + pub enum EthGetFilterLogs {} impl RpcMethod<1> for EthGetFilterLogs { const NAME: &'static str = "Filecoin.EthGetFilterLogs"; @@ -3432,28 +3506,7 @@ impl RpcMethod<1> for EthGetFilterLogs { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } } @@ -3483,28 +3536,7 @@ impl RpcMethod<1> for EthGetFilterChanges { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } if let Some(tipset_filter) = filter.as_any().downcast_ref::() { diff --git a/src/rpc/methods/eth/filter/event.rs b/src/rpc/methods/eth/filter/event.rs index 605ba069c94..14b871bd358 100644 --- a/src/rpc/methods/eth/filter/event.rs +++ b/src/rpc/methods/eth/filter/event.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets}; -use crate::rpc::eth::{CollectedEvent, FilterID, filter::Filter}; +use crate::rpc::eth::{FilterID, SeenEventPositions, filter::Filter}; use crate::shim::address::Address; use ahash::HashMap; use anyhow::Result; @@ -22,8 +22,8 @@ pub struct EventFilter { pub keys_with_codec: HashMap>, // Maximum number of results to collect pub max_results: usize, - // Collected events - pub collected: Vec, + // Positions of the events returned by the last poll, used to compute the next poll's delta + pub seen_positions: SeenEventPositions, } impl From<&EventFilter> for ParsedFilter { @@ -72,7 +72,7 @@ impl EventFilterManager { addresses: pf.addresses, keys_with_codec: pf.keys, max_results: self.max_filter_results, - collected: vec![], + seen_positions: Default::default(), }); self.filters.write().insert(id, filter.clone()); diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index b97a14bf7a3..da2dd8e23f1 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -29,7 +29,6 @@ use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; use crate::prelude::*; -use crate::rpc::eth::EVM_WORD_LENGTH; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; use crate::rpc::eth::filter::mempool::*; @@ -130,6 +129,12 @@ pub enum SkipEvent { Never, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum EventRevertStatus { + Applied, + Reverted, +} + impl EthEventHandler { pub fn new() -> Self { let config = EventsConfig::default(); @@ -341,12 +346,33 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { - let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); - let height = tipset.epoch(); - let tipset_key = tipset.key(); let ExecutedTipset { executed_messages, .. } = state_manager.load_executed_tipset_for_rpc(tipset).await?; + Self::collect_events_from_messages( + state_manager, + tipset, + &executed_messages, + spec, + skip_event, + EventRevertStatus::Applied, + collected_events, + ) + .await + } + + pub async fn collect_events_from_messages( + state_manager: &StateManager, + tipset: &Tipset, + executed_messages: &[ExecutedMessage], + spec: Option<&impl Matcher>, + skip_event: SkipEvent, + revert_status: EventRevertStatus, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); + let height = tipset.epoch(); + let tipset_key = tipset.key(); let mut resolved_id_addrs = HashMap::default(); let mut event_count = 0; for ( @@ -417,7 +443,7 @@ impl EthEventHandler { entries, emitter_addr: resolved, event_idx, - reverted: false, + reverted: matches!(revert_status, EventRevertStatus::Reverted), height, tipset_key: tipset_key.clone(), msg_idx: msg_idx as u64, @@ -565,50 +591,6 @@ impl EthFilterSpec { } } -impl Matcher for EthFilterSpec { - fn matches( - &self, - emitter_addr: &crate::shim::address::Address, - entries: &[Entry], - ) -> anyhow::Result { - fn get_word(value: &[u8]) -> Option<&[u8; EVM_WORD_LENGTH]> { - value.get(..EVM_WORD_LENGTH)?.try_into().ok() - } - - let eth_emitter_addr = EthAddress::from_filecoin_address(emitter_addr)?; - - let match_addr = match self.address { - Some(ref address_list) => { - if address_list.is_empty() { - true - } else { - address_list.iter().any(|other| other == ð_emitter_addr) - } - } - None => true, - }; - - let match_topics = if let Some(spec) = self.topics.as_ref() { - entries.iter().enumerate().all(|(i, entry)| { - if let Some(slice) = get_word(entry.value()) { - let hash: EthHash = (*slice).into(); - match spec.0.get(i) { - Some(EthHashList::List(vec)) => vec.contains(&hash), - Some(EthHashList::Single(Some(h))) => h == &hash, - _ => true, /* wildcard */ - } - } else { - // Drop events with mis-sized topics - false - } - }) - } else { - true - }; - Ok(match_addr && match_topics) - } -} - // TODO(forest): https://github.com/ChainSafe/forest/issues/6411 fn parse_block_range( heaviest: ChainEpoch, @@ -883,19 +865,6 @@ mod tests { ); } - #[test] - fn test_empty_address_list() { - let empty_list_spec = EthFilterSpec { - address: Some(vec![].into()), // Empty list, not None - ..Default::default() - }; - - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Updated to match Lotus behavior: empty list = wildcard (matches all) - assert!(empty_list_spec.matches(&addr, &[]).unwrap()); - } - #[test] fn test_parse_eth_filter_spec_with_none_address() { let eth_filter_spec = EthFilterSpec { @@ -935,51 +904,6 @@ mod tests { ); } - #[test] - fn test_lotus_compatible_address_behavior() { - // Test the Lotus-compatible behavior: empty list = wildcard - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Case 1: None (omitted) = wildcard - let none_spec = EthFilterSpec { - address: None, - ..Default::default() - }; - assert!( - none_spec.matches(&addr, &[]).unwrap(), - "None should match all addresses" - ); - - // Case 2: Empty list = wildcard (Lotus behavior) - let empty_spec = EthFilterSpec { - address: Some(vec![].into()), - ..Default::default() - }; - assert!( - empty_spec.matches(&addr, &[]).unwrap(), - "Empty list should match all addresses (Lotus compatible)" - ); - - // Case 3: Specific address = only that address - let eth_addr = EthAddress::from_filecoin_address(&addr).unwrap(); - let specific_spec = EthFilterSpec { - address: Some(vec![eth_addr].into()), - ..Default::default() - }; - assert!( - specific_spec.matches(&addr, &[]).unwrap(), - "Specific address should match itself" - ); - - // Case 4: Different address = no match - let different_addr = - Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - assert!( - !specific_spec.matches(&different_addr, &[]).unwrap(), - "Specific address should not match different address" - ); - } - #[test] fn test_eth_filter_spec_default_has_none_values() { let default_spec = EthFilterSpec::default(); @@ -1005,14 +929,6 @@ mod tests { default_spec.block_hash.is_none(), "Default EthFilterSpec should have None block_hash" ); - - // Verify that the default spec matches any address (wildcard behavior) - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - - // Test with no entries - assert!(default_spec.matches(&addr0, &[]).unwrap()); - assert!(default_spec.matches(&addr1, &[]).unwrap()); } #[test] @@ -1345,198 +1261,6 @@ mod tests { } } - #[test] - fn test_do_match_address() { - let empty_spec = EthFilterSpec::default(); - - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let eth_addr0 = EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(); - - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - let eth_addr1 = EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - // Matching an empty spec - assert!(empty_spec.matches(&addr0, &[]).unwrap()); - - assert!(empty_spec.matches(&addr0, &entries0).unwrap()); - - // Matching the given address 0 - let spec0 = EthFilterSpec { - address: Some(vec![eth_addr0].into()), - ..Default::default() - }; - - assert!(spec0.matches(&addr0, &[]).unwrap()); - - assert!(!spec0.matches(&addr1, &[]).unwrap()); - - // Matching the given address 0 or 1 - let spec1 = EthFilterSpec { - address: Some(vec![eth_addr0, eth_addr1].into()), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &[]).unwrap()); - - assert!(spec1.matches(&addr1, &[]).unwrap()); - } - - #[test] - fn test_do_match_topic() { - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - let topic0 = - EthHash::from_str("0xe24720f45cb74f2d55f1deebb6098f50f10b511dab8a7d47c4819a08dcd0b895") - .unwrap(); - - let topic1 = - EthHash::from_str("0x7404e3d104ea7841c3d9e6fd20adfe99b4ad586bc08d8f3bd3afef894cf184de") - .unwrap(); - - let topic2 = - EthHash::from_str("0x000000000000000000000000d0fb381fc644cdd5d694d35e1afb445527b9244b") - .unwrap(); - - let topic3 = - EthHash::from_str("0x00000000000000000000000092c3b379c217fdf8603884770e83fded7b7410f8") - .unwrap(); - - let spec1 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(None)])), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(None), - EthHashList::Single(None), - ])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic0))])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec3 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic0])])), - ..Default::default() - }; - - assert!(spec3.matches(&addr0, &entries0).unwrap()); - - let spec4 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic1, topic0])])), - ..Default::default() - }; - - assert!(spec4.matches(&addr0, &entries0).unwrap()); - - let spec5 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic1))])), - ..Default::default() - }; - - assert!(!spec5.matches(&addr0, &entries0).unwrap()); - - let spec6 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic2, topic3])])), - ..Default::default() - }; - - assert!(!spec6.matches(&addr0, &entries0).unwrap()); - - let spec7 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic1)), - ])), - ..Default::default() - }; - - assert!(!spec7.matches(&addr0, &entries0).unwrap()); - - let spec8 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic0)), - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic3)), - ])), - ..Default::default() - }; - - assert!(!spec8.matches(&addr0, &entries0).unwrap()); - } - #[test] fn test_parsed_filter_match_address() { // Note that all the following addresses and topics (base64-encoded strings) come from real data on Calibnet, @@ -1743,12 +1467,6 @@ mod tests { assert!(!filter3.matches(&addr0, &entries0).unwrap()); } - #[test] - fn test_eth_filter_spec_msg_cid_filter_default_none() { - let spec = EthFilterSpec::default(); - assert!(spec.msg_cid_filter().is_none()); - } - #[test] fn test_parsed_filter_msg_cid_filter_returns_field() { let pf_none = ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(0..=0)); @@ -1794,4 +1512,73 @@ mod tests { assert!(ensure_filter_cap(100, 3, 101).is_err()); } + + #[tokio::test] + async fn test_collect_events_from_messages_sets_revert_status() { + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; + use crate::chain::ChainStore; + use crate::db::MemoryDB; + use crate::message::ChainMessage; + use crate::networks::ChainConfig; + use crate::shim::executor::Receipt; + use crate::shim::message::Message; + + let db = Arc::new(MemoryDB::default()); + let genesis_header = CachingBlockHeader::new(RawBlockHeader { + miner_address: Address::new_id(0), + // A zero genesis timestamp is rejected by the beacon schedule. + timestamp: 7777, + ..Default::default() + }); + let chain_store = + ChainStore::new(db, Arc::new(ChainConfig::default()), genesis_header).unwrap(); + let tipset = chain_store.heaviest_tipset(); + let state_manager = StateManager::new(chain_store).unwrap(); + + let event = StampedEvent::V4(fvm_shared4::event::StampedEvent::new( + 1234, + fvm_shared4::event::ActorEvent { + entries: vec![fvm_shared4::event::Entry { + flags: Flags::FLAG_INDEXED_ALL, + key: "t1".into(), + codec: IPLD_RAW, + value: vec![0xab; 32], + }], + }, + )); + let executed_messages = vec![ExecutedMessage { + message: ChainMessage::Unsigned(Message::default().into()), + receipt: Receipt::V4(fvm_shared4::receipt::Receipt { + exit_code: fvm_shared4::error::ExitCode::OK, + return_data: Default::default(), + gas_used: 0, + events_root: None, + }), + events: Some(vec![event]), + }]; + + for (revert_status, expected_reverted) in [ + (EventRevertStatus::Applied, false), + (EventRevertStatus::Reverted, true), + ] { + let mut events = vec![]; + EthEventHandler::collect_events_from_messages( + &state_manager, + &tipset, + &executed_messages, + None::<&ParsedFilter>, + // The test genesis has no state tree, so the emitter cannot be resolved; + // fall back to its ID address instead of skipping the event. + SkipEvent::Never, + revert_status, + &mut events, + ) + .await + .unwrap(); + assert_eq!(events.len(), 1); + let event = events.first().unwrap(); + assert_eq!(event.reverted, expected_reverted); + assert_eq!(event.emitter_addr, Address::new_id(1234)); + } + } } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index a1e47b64fe7..37d505da672 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -64,15 +64,23 @@ use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; -use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; +use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; use crate::rpc::eth::{ - Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, + Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change, + eth_tx_hash_from_signed_message, }; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; use std::sync::Arc; +use tokio::sync::broadcast; + +/// A cap on the number of in-flight per-tipset log batches in the shared logs feed. +const LOGS_FEED_CAP: usize = 256; + +/// Sender half of the shared logs feed; see [`RPCState::eth_logs_feed`]. +pub type LogsFeed = broadcast::Sender>>; #[derive(derive_more::Constructor)] pub struct EthPubSub { @@ -147,25 +155,91 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { tokio::spawn(pipe_stream_to_sink(stream, sink)); } -fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { - let stream = head_message_tipsets(&ctx) - .filter_map(move |ts| { - let ctx = ctx.shallow_clone(); - let filter = filter.clone(); - async move { - eth_logs_with_filter(&ctx, &ts, filter) - .await - .inspect_err(|e| { - tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()) - }) - .ok() +/// Drives the shared logs feed for every chain head change, collects the Ethereum logs of the affected tipsets +async fn run_logs_feed(ctx: Arc, feed: LogsFeed) { + let mut head_changes = subscription_stream(ctx.chain_store().subscribe_head_changes()); + while let Some(changes) = head_changes.next().await { + // Collecting events is not free; skip the work entirely while no subscription is live. + if feed.receiver_count() == 0 { + continue; + } + for change in changes.into_change_vec() { + match eth_logs_for_head_change(&ctx, &change).await { + Ok(logs) => { + if !logs.is_empty() { + let _ = feed.send(Arc::new(logs)); + } + } + Err(e) => { + tracing::error!( + "Failed to collect logs for head change {}: {e:#}", + change.tipset().key() + ); + } } + } + } +} + +/// Returns a receiver of the shared logs feed, starting the feed task on first use. +fn subscribe_logs_feed(ctx: &Arc) -> broadcast::Receiver>> { + ctx.eth_logs_feed + .get_or_init(|| { + let (tx, _) = broadcast::channel(LOGS_FEED_CAP); + tokio::spawn(run_logs_feed(ctx.clone(), tx.clone())); + tx + }) + .subscribe() +} + +fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { + let rx = subscribe_logs_feed(&ctx); + let stream = subscription_stream(rx) + .flat_map(move |logs| { + let matched: Vec = logs + .iter() + .filter(|log| filter.as_ref().is_none_or(|spec| log_matches(spec, log))) + .cloned() + .collect(); + futures::stream::iter(matched) }) - .flat_map(futures::stream::iter) .boxed(); tokio::spawn(pipe_stream_to_sink(stream, sink)); } +/// Whether `log` matches the filter `spec` +/// +/// - `address`: an absent or empty list matches any address; otherwise `logs` address must be +/// in the list. +/// - `topics`: each position is a set of accepted hashes. +/// An empty set (a `null` or empty-list position) is a wildcard that imposes no constraint, even +/// past the log's topics; a non-empty set requires `log` to have a topic at that index within the +/// set. +fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { + let address_ok = spec + .address + .as_ref() + .is_none_or(|list| list.is_empty() || list.contains(&log.address)); + + let topics_ok = spec.topics.as_ref().is_none_or(|EthTopicSpec(positions)| { + positions.iter().enumerate().all(|(i, position)| { + // A position is a set of accepted hashes; an empty set is a wildcard. + let accepted: &[EthHash] = match position { + EthHashList::List(hashes) => hashes, + EthHashList::Single(Some(hash)) => std::slice::from_ref(hash), + EthHashList::Single(None) => &[], + }; + accepted.is_empty() + || log + .topics + .get(i) + .is_some_and(|topic| accepted.contains(topic)) + }) + }); + + address_ok && topics_ok +} + fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { let mpool_rx = ctx.mpool.subscribe_to_updates(); let eth_chain_id = ctx.chain_config().eth_chain_id; @@ -218,3 +292,194 @@ where } tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id()); } + +#[cfg(test)] +mod tests { + use super::*; + use crate::rpc::eth::{EthAddress, EthHash}; + use std::str::FromStr as _; + + fn eth_log(address: &EthAddress, topics: Vec) -> EthLog { + EthLog { + address: *address, + topics, + ..Default::default() + } + } + + fn address_0() -> EthAddress { + EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap() + } + + fn address_1() -> EthAddress { + EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap() + } + + fn topic(byte: u8) -> EthHash { + EthHash(ethereum_types::H256::from_slice(&[byte; 32])) + } + + #[test] + fn log_matches_address() { + let log = eth_log(&address_0(), vec![]); + + // Absent and empty address lists are wildcards (Lotus/go-ethereum behavior). + assert!(log_matches(&EthFilterSpec::default(), &log)); + let empty = EthFilterSpec { + address: Some(vec![].into()), + ..Default::default() + }; + assert!(log_matches(&empty, &log)); + + let specific = EthFilterSpec { + address: Some(vec![address_0()].into()), + ..Default::default() + }; + assert!(log_matches(&specific, &log)); + assert!(!log_matches(&specific, ð_log(&address_1(), vec![]))); + + // Any address in the list may match. + let either = EthFilterSpec { + address: Some(vec![address_0(), address_1()].into()), + ..Default::default() + }; + assert!(log_matches(&either, &log)); + assert!(log_matches(&either, ð_log(&address_1(), vec![]))); + } + + #[test] + fn log_matches_topics() { + let log = eth_log(&address_0(), vec![topic(1), topic(2)]); + + let with_topics = |positions: Vec| EthFilterSpec { + topics: Some(EthTopicSpec(positions)), + ..Default::default() + }; + + // Wildcards: null position, empty list position, fewer positions than topics. + assert!(log_matches(&with_topics(vec![]), &log)); + assert!(log_matches( + &with_topics(vec![EthHashList::Single(None)]), + &log + )); + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![])]), + &log + )); + + // Value in the first position. + assert!(log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(1)))]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(2)))]), + &log + )); + + // OR within a position. + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![topic(9), topic(1)])]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::List(vec![topic(8), topic(9)])]), + &log + )); + + // AND across positions. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + ]), + &log + )); + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(9))), + ]), + &log + )); + + // A trailing wildcard position imposes no constraint, even past the log's topics — + // matching Anvil (reth), Lotus, and Forest's eth_getLogs. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + EthHashList::Single(None), + ]), + &log + )); + } + + #[test] + fn log_matches_trailing_wildcard_past_topics() { + // A log with a single topic, e.g. a no-indexed-arg event: topics = [signature]. + // These assertions mirror the empirically-confirmed Anvil (reth) behaviour. + let log = eth_log(&address_0(), vec![topic(1)]); + let with_topics = |positions: Vec| EthFilterSpec { + topics: Some(EthTopicSpec(positions)), + ..Default::default() + }; + + // [sig, null]: trailing wildcard does not require a second topic -> matches. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(None), + ]), + &log + )); + // [sig, []]: an empty list is also a wildcard -> matches. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::List(vec![]), + ]), + &log + )); + // [sig, value]: a constrained second position with no topic to match -> no match. + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + ]), + &log + )); + // Many trailing wildcards are still ignored. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(None), + EthHashList::List(vec![]), + ]), + &log + )); + } + + #[test] + fn log_matches_address_and_topics_combined() { + let log = eth_log(&address_0(), vec![topic(1)]); + let spec = EthFilterSpec { + address: Some(vec![address_0()].into()), + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(1)))])), + ..Default::default() + }; + assert!(log_matches(&spec, &log)); + + let wrong_address = EthFilterSpec { + address: Some(vec![address_1()].into()), + ..spec.clone() + }; + assert!(!log_matches(&wrong_address, &log)); + + let wrong_topic = EthFilterSpec { + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(9)))])), + ..spec + }; + assert!(!log_matches(&wrong_topic, &log)); + } +} diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 1591a9cf25f..90edda30f61 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -237,6 +237,7 @@ mod tests { bad_blocks: Some(Default::default()), sync_status: Default::default(), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown: mpsc::channel(1).0, // dummy for tests diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 44539798a8a..f64abfd8a71 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -501,6 +501,7 @@ pub struct RPCState { pub bad_blocks: Option, pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, + pub eth_logs_feed: std::sync::OnceLock, pub sync_network_context: SyncNetworkContext, pub tipset_send: flume::Sender, pub start_time: chrono::DateTime, diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index e05ae4c1aa7..7dbfe8126d4 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -32,22 +32,48 @@ impl StateManager { } } - /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. - pub async fn load_executed_tipset_for_rpc( - &self, - ts: &Tipset, - ) -> anyhow::Result { + /// State recomputation policy for RPC methods: recomputation is disabled unless explicitly + /// enabled via the environment. + fn rpc_state_recompute_policy() -> StateRecomputePolicy { crate::def_is_env_truthy!( enable_state_computation, "FOREST_ETH_RPC_COMPUTE_STATE_ON_INDEX_MISS" ); - let policy = if enable_state_computation() { + + if enable_state_computation() { StateRecomputePolicy::Allowed } else { StateRecomputePolicy::Disallowed - }; + } + } + + /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. + pub async fn load_executed_tipset_for_rpc( + &self, + ts: &Tipset, + ) -> anyhow::Result { + self.load_executed_tipset_with_cache(ts, Self::rpc_state_recompute_policy()) + .await + } - self.load_executed_tipset_with_cache(ts, policy).await + /// Load an executed tipset using an explicitly provided receipt (child) tipset instead of + /// resolving the child on the current heaviest chain. This is required when serving events + /// for tipsets that are no longer canonical. + pub async fn load_executed_tipset_with_receipt( + &self, + msg_ts: &Tipset, + receipt_ts: &Tipset, + ) -> anyhow::Result { + self.cache + .get_or_insert_async(msg_ts.key(), async move { + self.load_executed_tipset_inner( + msg_ts, + Some(receipt_ts), + Self::rpc_state_recompute_policy(), + ) + .await + }) + .await } /// Load an executed tipset, including state root, message receipts and events with caching. diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 41993ba87c0..2bacc33897e 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -106,6 +106,7 @@ pub async fn offline_rpc_state( bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index ae3f6460526..43395b6f6bc 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -144,6 +144,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 8df22c54264..431a8ce8bed 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -13,10 +13,12 @@ use crate::shim::{address::Address, message::Message}; use anyhow::{Context, ensure}; use cbor4ii::core::Value; use cid::Cid; +use ethereum_types::H256; use futures::{SinkExt, StreamExt}; use serde_json::json; use tokio::time::Duration; use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use tokio_util::sync::CancellationToken; use std::io::{self, Write}; use std::pin::Pin; @@ -575,59 +577,270 @@ struct LogView { transaction_hash: EthHash, } +/// One opened logs subscription paired with the events it is expected to deliver. +struct CaseSub { + label: &'static str, + ws: EthSubStream, + sub_id: serde_json::Value, + /// Event signatures (each log's `topic[0]`) this filter should deliver for our mint. + expected: Vec, +} + +/// Read notifications until one for our mint tx (`our_tx`) arrives; unrelated logs are +/// skipped. Returns `None` if the `timeout` window elapses (or the socket closes) first. +async fn next_our_log( + ws: &mut EthSubStream, + sub_id: &serde_json::Value, + our_tx: &EthHash, + timeout: Duration, +) -> anyhow::Result> { + while let Ok(payload) = next_subscription_payload(ws, sub_id, timeout).await { + ensure!( + payload.is_object(), + "logs must yield a single log object, got: {payload}" + ); + let log: LogView = + serde_json::from_value(payload).context("logs payload is not an Eth log")?; + if &log.transaction_hash == our_tx { + return Ok(Some(log)); + } + } + Ok(None) +} + +/// Backstop for a single subscription read. The coordinator's cancellation normally ends a +/// drain first; this only bounds a read if the mint never executes. +const LOGS_DELIVERY_TIMEOUT: Duration = Duration::from_secs(100); + +/// Drain one case subscription until the coordinator signals `stop` (a settle window after the mint's receipt +/// lands), then assert the set of event signatures it delivered for our mint equals +/// `expected`. An empty `expected` asserts the filter delivered nothing. +async fn verify_case( + label: &str, + ws: &mut EthSubStream, + sub_id: &serde_json::Value, + our_tx: &EthHash, + expected: &[EthHash], + stop: &CancellationToken, +) -> anyhow::Result<()> { + let mut got = Vec::new(); + loop { + tokio::select! { + _ = stop.cancelled() => break, + log = next_our_log(ws, sub_id, our_tx, LOGS_DELIVERY_TIMEOUT) => { + if let Some(log) = log? { + got.push(*log.topics.first().context("log is missing topic[0]")?); + } + } + } + } + // Reorgs re-deliver the same event (apply -> revert -> re-apply); assert the *set* of + // event signatures delivered, not the count. + got.sort(); + got.dedup(); + let mut want = expected.to_vec(); + want.sort(); + ensure!(got == want, "{label}: expected {want:?}, got {got:?}"); + Ok(()) +} + fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { RpcTestScenario::basic(move |client| { + // Once the txn lands, how long to let the log feed push to every subscription before we stop draining. + const SETTLE: Duration = Duration::from_secs(10); + let tx = tx.clone(); async move { - let filter = json!({ - "address": [], - "topics": [tx.topic.to_string()], - }); - let (mut ws_stream, subscription_id) = - open_eth_subscription(&client, SubscriptionKind::Logs, Some(filter)).await?; + // A valid address that is not the contract. + const WRONG_ADDRESS: &str = "0x000000000000000000000000000000000000dead"; + // The suite already passes Mint's signature as `topic`, so reuse it. + // Transfer's signature isn't passed in derive it from the event string. + let mint_topic = tx.topic; + let transfer = EthHash(H256::from(crate::utils::encoding::keccak_256( + b"Transfer(address,address,uint256)", + ))); + let contract = EthAddress::from_filecoin_address(&tx.to)?; + + // The values we filter on come straight from the `mint(address,uint256)` + // calldata (`--payload`): a 4-byte selector, then the 32-byte `to` and `amount` + // words. + let v_to = EthHash(H256::from_slice( + tx.payload + .get(4..36) + .context("mint calldata missing `to`")?, + )); + let v_amount = EthHash(H256::from_slice( + tx.payload + .get(36..68) + .context("mint calldata missing `amount`")?, + )); + let v_zero = EthHash::default(); // address(0) + + // Contract address as a `0x..` string, for embedding in filters. + let c = format!("{:#x}", contract.0); + + // Every topic case scopes to the contract address; only `topics` varies. + let by_topics = + |topics: serde_json::Value| json!({ "address": [c.as_str()], "topics": topics }); + + // Expected event sets, named for the assertion site. + let both_topics = vec![mint_topic, transfer]; + let only_mint_topic = vec![mint_topic]; + let only_transfer_topic = vec![transfer]; + let none: Vec = vec![]; + + // (label, filter, expected events), grouped by the dimension it exercises. + let specs: Vec<(&'static str, serde_json::Value, Vec)> = vec![ + // --- address --- + ( + "address empty list (wildcard)", + json!({ "address": [], "topics": null }), + both_topics.clone(), + ), + ( + "address [contract, other]", + json!({ "address": [c.as_str(), WRONG_ADDRESS], "topics": null }), + both_topics.clone(), + ), + ( + "address non-matching", + json!({ "address": [WRONG_ADDRESS], "topics": null }), + none.clone(), + ), + // --- topic[0] = event signature --- + ( + "topic0 = Mint", + by_topics(json!([mint_topic.to_string()])), + only_mint_topic.clone(), + ), + ( + "topic0 = Transfer", + by_topics(json!([transfer.to_string()])), + only_transfer_topic.clone(), + ), + ( + "topic0 OR [Mint, Transfer]", + by_topics(json!([[mint_topic.to_string(), transfer.to_string()]])), + both_topics.clone(), + ), + ( + "topic0 empty-list wildcard", + by_topics(json!([[]])), + both_topics.clone(), + ), + ( + "topic0 null wildcard", + by_topics(json!([null])), + both_topics.clone(), + ), + // --- trailing wildcard / positions past the log's topics --- + ( + "Mint + trailing null", + by_topics(json!([mint_topic.to_string(), null])), + only_mint_topic.clone(), + ), + ( + "Mint + null past topics", + by_topics(json!([mint_topic.to_string(), v_to.to_string(), null])), + only_mint_topic.clone(), + ), + ( + "Mint + value past topics (no match)", + by_topics(json!([ + mint_topic.to_string(), + v_to.to_string(), + v_to.to_string() + ])), + none.clone(), + ), + // --- indexed values: AND across positions, OR within, positional --- + ( + "topic1 = to (only Mint)", + by_topics(json!([null, v_to.to_string()])), + only_mint_topic.clone(), + ), + ( + "topic1 = 0x0 (only Transfer)", + by_topics(json!([null, v_zero.to_string()])), + only_transfer_topic.clone(), + ), + ( + "topic2 = to (only Transfer)", + by_topics(json!([null, null, v_to.to_string()])), + only_transfer_topic.clone(), + ), + ( + "Transfer AND from=0 AND to", + by_topics(json!([ + transfer.to_string(), + v_zero.to_string(), + v_to.to_string() + ])), + only_transfer_topic.clone(), + ), + ( + "Transfer AND topic1=to (mismatch)", + by_topics(json!([transfer.to_string(), v_to.to_string()])), + none.clone(), + ), + ( + "(Mint|Transfer) AND topic1=to", + by_topics(json!([ + [mint_topic.to_string(), transfer.to_string()], + v_to.to_string() + ])), + only_mint_topic.clone(), + ), + // --- data is never matched as a topic --- + ( + "topic1 = amount (in data, no match)", + by_topics(json!([null, v_amount.to_string()])), + none.clone(), + ), + ]; + let mut subs = Vec::with_capacity(specs.len()); + for (label, filter, expected) in specs { + let (ws, sub_id) = + open_eth_subscription(&client, SubscriptionKind::Logs, Some(filter)).await?; + subs.push(CaseSub { + label, + ws, + sub_id, + expected, + }); + } - // Emit the event on-chain and remember the exact tx that produced it, - // so we can confirm the log we receive is `ours` + // Emit the operator's `mint` tx; it produces the Mint + Transfer logs. let cid = invoke_contract(&client, &tx).await?; let tx_hash = client .call(EthGetTransactionHashByCid::request((cid,))?) .await? .context("no Eth transaction hash for CID")?; - - // Logs are delivered when the tipset holding the event is applied, - // which can take a few epochs (~30s each) on calibnet - let watch = async { - loop { - let payload = next_subscription_payload( - &mut ws_stream, - &subscription_id, - Duration::from_secs(300), - ) - .await?; - // A logs notification is a single log object (one per log, - // matching geth/reth/Lotus) — not an array. - anyhow::ensure!( - payload.is_object(), - "logs must yield a single log object, got: {payload}" - ); - let log: LogView = serde_json::from_value(payload) - .context("logs payload is not an Eth log")?; - // Identity: the log must carry our event topic and `our` tx hash. - if log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) { - break; - } + // Drain every subscription concurrently so none sits idle while we wait. + // All subs receive the same logs at once, so the coordinator just waits for the mint to execute, + // and then we wait for some `SETTLE` time and then stop as soon as the logs are delivered. + let stop = CancellationToken::new(); + let coordinator = async { + let executed = wait_pending_message(&client, cid).await; + tokio::time::sleep(SETTLE).await; + stop.cancel(); + executed + }; + let drains = + futures::future::join_all(subs.iter_mut().map(|c| { + verify_case(c.label, &mut c.ws, &c.sub_id, &tx_hash, &c.expected, &stop) + })); + let (executed, case_results) = tokio::join!(coordinator, drains); + let outcome = executed.and_then(|()| { + for result in case_results { + result?; } anyhow::Ok(()) - }; - let outcome = tokio::time::timeout(Duration::from_secs(300), watch) - .await - .unwrap_or_else(|_| { - Err(anyhow::anyhow!( - "timed out waiting for our logs notification" - )) - }); + }); - let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + for sub in &mut subs { + let _ = close_eth_subscription(&mut sub.ws, &sub.sub_id).await; + } outcome } }) @@ -959,7 +1172,7 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid ), with_methods!( - eth_subscribe_logs(tx.clone()).name("eth_subscribe logs works"), + eth_subscribe_logs(tx.clone()).name("eth_subscribe logs filter matrix"), EthSubscribe, EthUnsubscribe, EthGetTransactionHashByCid diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index ecd91281b81..7607a50893d 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -173,6 +173,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown,