From c33fd45a253d93daa86db1c6f7168bcca9bc2108 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 18 Jul 2025 21:16:33 +0530 Subject: [PATCH 1/3] refactor: eth subscribe implementation --- Cargo.lock | 15 ++ Cargo.toml | 2 +- src/rpc/methods/eth.rs | 1 + src/rpc/methods/eth/pubsub.rs | 219 +++++++++------------------- src/rpc/methods/eth/pubsub_trait.rs | 43 ++++++ src/rpc/methods/eth/types.rs | 2 +- src/rpc/mod.rs | 28 +--- 7 files changed, 137 insertions(+), 173 deletions(-) create mode 100644 src/rpc/methods/eth/pubsub_trait.rs diff --git a/Cargo.lock b/Cargo.lock index 13e49335790b..18c28de17c82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4831,10 +4831,12 @@ checksum = "1fba77a59c4c644fd48732367624d1bcf6f409f9c9a286fbc71d2f1fc0b2ea16" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", + "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-ws-client", "tokio", + "tracing", ] [[package]] @@ -4910,6 +4912,19 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fa4f5daed39f982a1bb9d15449a28347490ad42b212f8eaa2a2a344a0dce9e9" +dependencies = [ + "heck 0.5.0", + "proc-macro-crate 3.3.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "jsonrpsee-server" version = "0.25.1" diff --git a/Cargo.toml b/Cargo.toml index e7c77be8a576..53a094623f12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ integer-encoding = "4.0" ipld-core = { version = "0.4", features = ["serde", "arb"] } is-terminal = "0.4" itertools = "0.14" -jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] } +jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] } jsonwebtoken = "9" keccak-hash = "0.11" kubert-prometheus-process = "0.2" diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index f3856ca68660..d8b4c4b799c6 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -5,6 +5,7 @@ pub(crate) mod errors; mod eth_tx; pub mod filter; pub mod pubsub; +pub(crate) mod pubsub_trait; mod trace; pub mod types; mod utils; diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index eba6c267d83b..c1f56798201e 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,172 +59,89 @@ //! ``` //! -use std::fmt; - +use crate::rpc::eth::pubsub_trait::{ + EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams, +}; +use crate::rpc::{RPCState, chain}; use fvm_ipld_blockstore::Blockstore; -use serde::de::{self, Deserializer, SeqAccess, Visitor}; -use serde::{Deserialize, Serialize}; +use jsonrpsee::PendingSubscriptionSink; +use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use std::sync::Arc; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; -use crate::rpc::Ctx; -use crate::rpc::eth::types::EthAddressList; -use crate::rpc::eth::{EthFilterSpec, EthTopicSpec}; - -pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; - -const NEW_HEADS: &str = "newHeads"; -const PENDING_TRANSACTIONS: &str = "pendingTransactions"; -const LOGS: &str = "logs"; - -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct LogFilter { - pub address: EthAddressList, - pub topics: Option, +pub struct EthPubSub { + ctx: Arc>, } -#[derive(Debug)] -enum Subscription { - NewHeads, - PendingTransactions, - Logs(Option), +impl EthPubSub { + pub fn new(ctx: Arc>) -> Self { + Self { ctx } + } } -impl<'de> Deserialize<'de> for Subscription { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct SubscriptionVisitor; - - impl<'de> Visitor<'de> for SubscriptionVisitor { - type Value = Subscription; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#) +#[async_trait::async_trait] +impl EthPubSubApiServer for EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn subscribe( + &self, + pending: PendingSubscriptionSink, + kind: SubscriptionKind, + params: Option, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + let ctx = self.ctx.clone(); + + match kind { + SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, + SubscriptionKind::PendingTransactions => { + // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 + return Err(SubscriptionError::from( + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, + "pendingTransactions subscription not yet implemented", + None::<()>, + ), + )); } - - fn visit_seq(self, mut seq: V) -> Result - where - V: SeqAccess<'de>, - { - let event_type: String = seq - .next_element()? - .ok_or_else(|| de::Error::invalid_length(0, &self))?; - - match event_type.as_str() { - NEW_HEADS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::NewHeads) - } - PENDING_TRANSACTIONS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::PendingTransactions) - } - LOGS => Ok(Subscription::Logs(seq.next_element()?)), - _ => Err(de::Error::unknown_variant( - &event_type, - &[NEW_HEADS, PENDING_TRANSACTIONS, LOGS], - )), - } + SubscriptionKind::Logs => { + let filter = params.and_then(|p| p.filter); + self.handle_logs_subscription(sink, ctx, filter).await } } - deserializer.deserialize_seq(SubscriptionVisitor) + Ok(()) } } -pub async fn eth_subscribe( - params: jsonrpsee::types::Params<'static>, - pending: jsonrpsee::core::server::PendingSubscriptionSink, - ctx: Ctx, - _ext: http::Extensions, -) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let subscription: Subscription = match params.parse() { - Ok(sub) => sub, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - - tracing::trace!("Subscribing to event: {:?}", subscription); - - match subscription { - Subscription::NewHeads => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - // Spawn newHeads task - let (new_heads, handle) = crate::rpc::new_heads(&ctx); - - tokio::spawn(async move { - tracing::trace!( - "Subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(new_heads, sink, handle).await; - }); - } - Subscription::Logs(filter) => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - let filter_spec: Option = filter.map(Into::into); - - // Spawn logs task - let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec); - - tokio::spawn(async move { - tracing::trace!( - "Logs subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(logs, sink, handle).await; - }); - } - Subscription::PendingTransactions => { - // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - )) - .await; - return Ok(()); - } +impl EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn handle_new_heads_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + ) { + let (subscriber, handle) = chain::new_heads(&ctx); + tokio::spawn(async move { + handle_subscription(subscriber, accepted_sink, handle).await; + }); } - Ok(()) + async fn handle_logs_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + filter_spec: Option, + ) { + let filter_spec = filter_spec.map(Into::into); + let (logs, handle) = chain::logs(&ctx, filter_spec); + tokio::spawn(async move { + handle_subscription(logs, accepted_sink, handle).await; + }); + } } async fn handle_subscription( @@ -266,5 +183,5 @@ async fn handle_subscription( } handle.abort(); - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id()); } diff --git a/src/rpc/methods/eth/pubsub_trait.rs b/src/rpc/methods/eth/pubsub_trait.rs new file mode 100644 index 000000000000..286383653e31 --- /dev/null +++ b/src/rpc/methods/eth/pubsub_trait.rs @@ -0,0 +1,43 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT +// src/rpc/methods/eth/traits.rs +use crate::rpc::eth::types::{EthAddressList, EthTopicSpec}; +use jsonrpsee::proc_macros::rpc; +use serde::{Deserialize, Serialize}; + +#[rpc(server, namespace = "eth")] +pub trait EthPubSubApi { + /// Subscribe to Ethereum events + #[subscription( + name = "subscribe" => "subscription", + unsubscribe = "unsubscribe", + item = serde_json::Value + )] + async fn subscribe( + &self, + kind: SubscriptionKind, + params: Option, + ) -> jsonrpsee::core::SubscriptionResult; +} + +// Keep the existing types but make them more structured +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum SubscriptionKind { + NewHeads, + PendingTransactions, + Logs, +} + +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct LogFilter { + pub address: EthAddressList, + pub topics: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubscriptionParams { + #[serde(flatten)] + pub filter: Option, +} diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index cb6d94e466e7..29087f9a67cb 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -3,7 +3,7 @@ use super::*; use crate::blocks::CachingBlockHeader; -use crate::rpc::eth::pubsub::LogFilter; +use crate::rpc::eth::pubsub_trait::LogFilter; use anyhow::ensure; use ipld_core::serde::SerdeError; use jsonrpsee::core::traits::IdProvider; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 3a09f47c956a..e4e1184b38dc 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,6 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer; mod auth_layer; mod channel; mod client; @@ -12,7 +13,6 @@ mod request; mod segregation_layer; mod set_extension_layer; -use crate::rpc::chain::new_heads; use crate::rpc::eth::types::RandomHexStringIdProvider; use crate::shim::clock::ChainEpoch; pub use client::Client; @@ -404,10 +404,7 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; -use crate::rpc::eth::{ - EthSubscribe, EthUnsubscribe, - pubsub::{ETH_SUBSCRIPTION, eth_subscribe}, -}; +use crate::rpc::eth::pubsub::EthPubSub; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; @@ -515,20 +512,6 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); - // Register `Filecoin.EthSubscribe` and related methods. - module.register_subscription( - EthSubscribe::NAME, - ETH_SUBSCRIPTION, - EthUnsubscribe::NAME, - eth_subscribe, - )?; - if let Some(alias) = EthSubscribe::NAME_ALIAS { - module.register_alias(alias, EthSubscribe::NAME)?; - } - if let Some(alias) = EthUnsubscribe::NAME_ALIAS { - module.register_alias(alias, EthUnsubscribe::NAME)?; - } - let mut pubsub_module = FilRpcModule::default(); pubsub_module.register_channel("Filecoin.ChainNotify", { @@ -662,7 +645,7 @@ fn create_module(state: Arc>) -> RpcModule> where DB: Blockstore + Send + Sync + 'static, { - let mut module = RpcModule::from_arc(state); + let mut module = RpcModule::from_arc(state.clone()); macro_rules! register { ($ty:ty) => { // Register only non-subscription RPC methods. @@ -675,6 +658,11 @@ where }; } for_each_rpc_method!(register); + + let eth_pubsub = EthPubSub::new(state.clone()); + module + .merge(eth_pubsub.into_rpc()) + .expect("Could not merge eth pubsub module"); module } From fb42c44bd1047c38be975c109f805a39eb5439f2 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 29 Jul 2025 12:09:33 +0530 Subject: [PATCH 2/3] add aliases for subscribe and unsubcribe API --- src/rpc/methods/eth/pubsub_trait.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rpc/methods/eth/pubsub_trait.rs b/src/rpc/methods/eth/pubsub_trait.rs index 286383653e31..915f819f3d8a 100644 --- a/src/rpc/methods/eth/pubsub_trait.rs +++ b/src/rpc/methods/eth/pubsub_trait.rs @@ -1,6 +1,6 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -// src/rpc/methods/eth/traits.rs + use crate::rpc::eth::types::{EthAddressList, EthTopicSpec}; use jsonrpsee::proc_macros::rpc; use serde::{Deserialize, Serialize}; @@ -10,7 +10,9 @@ pub trait EthPubSubApi { /// Subscribe to Ethereum events #[subscription( name = "subscribe" => "subscription", + aliases = ["Filecoin.EthSubscribe"], unsubscribe = "unsubscribe", + unsubscribe_aliases = ["Filecoin.EthUnsubscribe"], item = serde_json::Value )] async fn subscribe( From 55f71381e2bf2eaa7e9e55b46224c6164a4304cb Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 29 Jul 2025 17:02:03 +0530 Subject: [PATCH 3/3] address comments --- src/rpc/methods/eth/pubsub.rs | 1 - src/rpc/methods/eth/pubsub_trait.rs | 1 - src/rpc/mod.rs | 11 +++++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index c1f56798201e..ad614d91a4b8 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -96,7 +96,6 @@ where match kind { SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, SubscriptionKind::PendingTransactions => { - // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 return Err(SubscriptionError::from( jsonrpsee::types::ErrorObjectOwned::owned( jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, diff --git a/src/rpc/methods/eth/pubsub_trait.rs b/src/rpc/methods/eth/pubsub_trait.rs index 915f819f3d8a..39bb57f4f0b1 100644 --- a/src/rpc/methods/eth/pubsub_trait.rs +++ b/src/rpc/methods/eth/pubsub_trait.rs @@ -22,7 +22,6 @@ pub trait EthPubSubApi { ) -> jsonrpsee::core::SubscriptionResult; } -// Keep the existing types but make them more structured #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum SubscriptionKind { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index e4e1184b38dc..c14634d11850 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -512,6 +512,10 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); + // register eth subscription APIs + let eth_pubsub = EthPubSub::new(state.clone()); + module.merge(eth_pubsub.into_rpc())?; + let mut pubsub_module = FilRpcModule::default(); pubsub_module.register_channel("Filecoin.ChainNotify", { @@ -645,7 +649,7 @@ fn create_module(state: Arc>) -> RpcModule> where DB: Blockstore + Send + Sync + 'static, { - let mut module = RpcModule::from_arc(state.clone()); + let mut module = RpcModule::from_arc(state); macro_rules! register { ($ty:ty) => { // Register only non-subscription RPC methods. @@ -658,11 +662,6 @@ where }; } for_each_rpc_method!(register); - - let eth_pubsub = EthPubSub::new(state.clone()); - module - .merge(eth_pubsub.into_rpc()) - .expect("Could not merge eth pubsub module"); module }