diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 08e8ac8fb1e..287fd4fd16b 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) { /// Represents a single, perhaps persistent connection to a URL over which requests /// can be made using [`jsonrpsee`] primitives. -struct UrlClient { +pub struct UrlClient { url: Url, inner: UrlClientInner, } @@ -190,7 +190,7 @@ impl Debug for UrlClient { } impl UrlClient { - async fn new(url: Url, token: impl Into>) -> Result { + pub async fn new(url: Url, token: impl Into>) -> Result { const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves. let headers = match token.into() { Some(token) => HeaderMap::from_iter([( diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f5af9085516..0105683f98e 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -12,6 +12,7 @@ mod filter_list; pub mod json_validator; mod log_layer; mod metrics_layer; +mod parallel_batch_layer; mod request; mod segregation_layer; mod set_extension_layer; @@ -28,6 +29,7 @@ pub use filter_list::FilterList; use futures::FutureExt as _; use jsonrpsee::server::ServerConfig; use log_layer::LogLayer; +use parallel_batch_layer::ParallelBatchLayer; use reflect::Ctx; pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -573,18 +575,18 @@ pub async fn start_rpc( let methods: Arc> = Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect()); + let server_config = ServerConfig::builder() + .max_request_body_size(MAX_REQUEST_BODY_SIZE) + // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` + .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) + .max_connections(default_max_connections()) + .set_id_provider(RandomHexStringIdProvider::new()) + .build(); + let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize; let per_conn = PerConnection { stop_handle: 
stop_handle.clone(), svc_builder: Server::builder() - .set_config( - ServerConfig::builder() - .max_request_body_size(MAX_REQUEST_BODY_SIZE) - // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` - .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) - .max_connections(default_max_connections()) - .set_id_provider(RandomHexStringIdProvider::new()) - .build(), - ) + .set_config(server_config.clone()) .set_http_middleware( tower::ServiceBuilder::new() .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new)) @@ -647,7 +649,9 @@ pub async fn start_rpc( keystore: keystore.clone(), }) .layer(LogLayer::default()) - .layer(MetricsLayer::default()); + .layer(MetricsLayer::default()) + // `ParallelBatchLayer` has to be the last layer + .layer(ParallelBatchLayer::new(max_response_body_size)); let mut jsonrpsee_svc = svc_builder .set_rpc_middleware(rpc_middleware) .build(methods, stop_handle); @@ -801,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR mod tests { use super::*; use crate::{ - db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion, + db::MemoryDB, + networks::NetworkChain, + rpc::{client::UrlClient, common::ShiftingVersion}, tool::offline_server::server::offline_rpc_state, }; - use jsonrpsee::server::stop_channel; + use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT}, + params::BatchRequestBuilder, + }, + server::stop_channel, + }; use std::net::{Ipv4Addr, SocketAddr}; use tokio::task::JoinSet; @@ -913,6 +925,29 @@ mod tests { drop(client); + // Sending a batch request + let client = UrlClient::new( + format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port()) + .parse() + .unwrap(), + None, + ) + .await + .unwrap(); + let mut batch_request_builder = BatchRequestBuilder::new(); + let empty_payload: [(); 0] = []; + batch_request_builder + .insert("Filecoin.Version", empty_payload) + .unwrap(); + batch_request_builder + .insert("eth_chainId", 
empty_payload) + .unwrap(); + let batch_response: BatchResponse = + client.batch_request(batch_request_builder).await.unwrap(); + assert_eq!(batch_response.len(), 2); + assert_eq!(batch_response.num_successful_calls(), 2); + assert_eq!(batch_response.num_failed_calls(), 0); + // Gracefully shutdown the RPC server println!("sending shutdown signal"); shutdown_send.send(()).await.unwrap(); diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs new file mode 100644 index 00000000000..c367de028d3 --- /dev/null +++ b/src/rpc/parallel_batch_layer.rs @@ -0,0 +1,314 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::{borrow::Cow, sync::Arc}; + +use ahash::HashMap; +use jsonrpsee::{ + MethodResponse, + core::middleware::{Batch, BatchEntry, Notification}, + server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, + types::{ErrorCode, ErrorObject, Id, Request}, +}; +use tokio::task::JoinSet; +use tower::Layer; + +/// Parallelize batch RPC requests across the `tokio` worker pool. +/// +/// jsonrpsee processes batches sequentially by default. The +/// [JSON-RPC spec](https://www.jsonrpc.org/specification#batch) does not +/// require sequential processing or response ordering, but order is +/// preserved here to avoid surprising clients. 
+#[derive(Clone, derive_more::Constructor)]
+pub(super) struct ParallelBatchLayer {
+    /// Limit handed to [`BatchResponseBuilder::new_with_limit`] when the
+    /// batch response is assembled.
+    max_response_body_size: usize,
+}
+
+impl<S> Layer<S> for ParallelBatchLayer {
+    type Service = ParallelBatchService<S>;
+
+    /// Wraps `service` in an [`Arc`] so that every spawned batch entry can
+    /// hold its own handle to the inner service.
+    fn layer(&self, service: S) -> Self::Service {
+        ParallelBatchService {
+            service: Arc::new(service),
+            max_response_body_size: self.max_response_body_size,
+        }
+    }
+}
+
+/// RPC middleware that forwards single calls and notifications untouched and
+/// fans batch entries out onto a [`JoinSet`].
+pub(super) struct ParallelBatchService<S> {
+    service: Arc<S>,
+    max_response_body_size: usize,
+}
+
+// Implemented by hand: `#[derive(Clone)]` would require `S: Clone`, although
+// only the `Arc` handle is ever cloned.
+impl<S> Clone for ParallelBatchService<S> {
+    fn clone(&self) -> Self {
+        Self {
+            service: Arc::clone(&self.service),
+            max_response_body_size: self.max_response_body_size,
+        }
+    }
+}
+
+impl<S> RpcServiceT for ParallelBatchService<S>
+where
+    S: RpcServiceT<
+            MethodResponse = MethodResponse,
+            NotificationResponse = MethodResponse,
+            BatchResponse = MethodResponse,
+        > + Send
+        + Sync
+        + 'static,
+{
+    type MethodResponse = S::MethodResponse;
+    type NotificationResponse = S::NotificationResponse;
+    type BatchResponse = S::BatchResponse;
+
+    /// Single calls are delegated to the inner service unchanged.
+    fn call<'a>(
+        &self,
+        req: Request<'a>,
+    ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
+        self.service.call(req)
+    }
+
+    /// Runs every entry of `batch` as its own task and assembles the
+    /// responses in the order the entries were submitted.
+    ///
+    /// * Calls and notifications are spawned onto a [`JoinSet`]; the tasks are
+    ///   spawned when this method is called, not when the returned future is
+    ///   first polled.
+    /// * Entries that already failed to parse are answered with their error.
+    /// * A call task that panics or is cancelled is answered with an
+    ///   internal error carrying the original request id; the remaining
+    ///   entries are unaffected.
+    /// * If appending a response exceeds the size limit, the error response
+    ///   produced by [`BatchResponseBuilder::append`] is returned instead of
+    ///   the batch.
+    fn batch<'a>(
+        &self,
+        batch: Batch<'a>,
+    ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
+        let max = self.max_response_body_size;
+        let mut got_notification = false;
+        // JoinSet aborts in-flight tasks on drop.
+        let mut join_set: JoinSet<(usize, Option<MethodResponse>)> = JoinSet::new();
+        // Lets a failed call task be turned into a per-entry error with the
+        // original request id.
+        let mut call_meta: HashMap<tokio::task::Id, (usize, Id<'static>)> = HashMap::default();
+        let mut results: Vec<(usize, Option<MethodResponse>)> = Vec::new();
+
+        for (idx, entry) in batch.into_iter().enumerate() {
+            let service = Arc::clone(&self.service);
+            match entry {
+                Ok(BatchEntry::Call(req)) => {
+                    let req_id = req.id().into_owned();
+                    // Spawned tasks must be `'static`, so detach the request
+                    // from the borrowed input buffer first.
+                    let req = into_owned_request(req);
+                    let handle =
+                        join_set.spawn(async move { (idx, Some(service.call(req).await)) });
+                    call_meta.insert(handle.id(), (idx, req_id));
+                }
+                Ok(BatchEntry::Notification(n)) => {
+                    got_notification = true;
+                    let n = into_owned_notification(n);
+                    join_set.spawn(async move {
+                        service.notification(n).await;
+                        (idx, None)
+                    });
+                }
+                Err(err) => {
+                    let (err, id) = err.into_parts();
+                    results.push((
+                        idx,
+                        Some(MethodResponse::error(id.into_owned(), err.into_owned())),
+                    ));
+                }
+            }
+        }
+
+        async move {
+            results.reserve(join_set.len());
+            while let Some(joined) = join_set.join_next_with_id().await {
+                match joined {
+                    Ok((_, r)) => results.push(r),
+                    // A task can fail by panicking or by being cancelled (the
+                    // latter happens when the runtime shuts down while the
+                    // batch is in flight). Neither may take down the whole
+                    // batch, so both become a per-entry internal error.
+                    Err(e) => {
+                        let panicked = e.is_panic();
+                        match call_meta.remove(&e.id()) {
+                            Some((idx, req_id)) => {
+                                let message = if panicked {
+                                    tracing::error!(idx, "RPC call panicked in batch entry");
+                                    "RPC handler panicked"
+                                } else {
+                                    tracing::error!(idx, "RPC call cancelled in batch entry");
+                                    "RPC handler cancelled"
+                                };
+                                let err = ErrorObject::owned::<()>(
+                                    ErrorCode::InternalError.code(),
+                                    message,
+                                    None,
+                                );
+                                results.push((idx, Some(MethodResponse::error(req_id, err))));
+                            }
+                            // Notifications have no id and get no response.
+                            None if panicked => {
+                                tracing::error!("RPC notification panicked in batch entry");
+                            }
+                            None => {
+                                tracing::error!("RPC notification cancelled in batch entry");
+                            }
+                        }
+                    }
+                }
+            }
+            // Tasks finish in arbitrary order; restore submission order.
+            // Indices are unique, so an unstable sort gives the same result.
+            results.sort_unstable_by_key(|(idx, _)| *idx);
+
+            let mut batch_rp = BatchResponseBuilder::new_with_limit(max);
+            for (_, rp) in results {
+                if let Some(rp) = rp
+                    && let Err(err) = batch_rp.append(rp)
+                {
+                    return err;
+                }
+            }
+
+            // Empty builder + at least one notification is the spec's
+            // "no response" case for a notification-only batch.
+            if batch_rp.is_empty() && got_notification {
+                MethodResponse::notification()
+            } else {
+                MethodResponse::from_batch(batch_rp.finish())
+            }
+        }
+    }
+
+    /// Stand-alone notifications are delegated to the inner service unchanged.
+    fn notification<'a>(
+        &self,
+        n: Notification<'a>,
+    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
+        self.service.notification(n)
+    }
+}
+
+/// Converts a borrowed request into one that owns its id, method and params,
+/// so it can be moved into a spawned task.
+fn into_owned_request(req: Request<'_>) -> Request<'static> {
+    Request {
+        jsonrpc: req.jsonrpc,
+        id: req.id.into_owned(),
+        method: Cow::Owned(req.method.into_owned()),
+        params: req.params.map(|p| Cow::Owned(p.into_owned())),
+        extensions: req.extensions,
+    }
+}
+
+/// Converts a borrowed notification into one that owns its method and params,
+/// so it can be moved into a spawned task.
+fn into_owned_notification(n: Notification<'_>) -> Notification<'static> {
+    Notification {
+        jsonrpc: n.jsonrpc,
+        method: Cow::Owned(n.method.into_owned()),
+        params: n.params.map(|p| Cow::Owned(p.into_owned())),
+        extensions: n.extensions,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use jsonrpsee::core::middleware::BatchEntryErr;
+    use jsonrpsee::server::ResponsePayload;
+    use jsonrpsee::types::{Extensions, TwoPointZero};
+    use std::time::Duration;
+
+    const MAX_RESP: usize = 1024 * 1024;
+
+    /// Method conventions used by tests:
+    ///   "ok"          – success response carrying the method name.
+    ///   "slow:<ms>"   – sleep for `<ms>` milliseconds, then succeed.
+    ///   "panic"       – panic inside the call task.
+    #[derive(Clone, Default)]
+    struct TestService;
+
+    impl RpcServiceT for TestService {
+        type MethodResponse = MethodResponse;
+        type NotificationResponse = MethodResponse;
+        type BatchResponse = MethodResponse;
+
+        fn call<'a>(
+            &self,
+            req: Request<'a>,
+        ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
+            let id = req.id().into_owned();
+            let method = req.method_name().to_string();
+            async move {
+                if method == "panic" {
+                    panic!("test panic");
+                }
+                if let Some(rest) = method.strip_prefix("slow:") {
+                    let ms: u64 = rest.parse().unwrap();
+                    tokio::time::sleep(Duration::from_millis(ms)).await;
+                }
+                MethodResponse::response(id, ResponsePayload::success(method), MAX_RESP)
+            }
+        }
+
+        // `async fn` form drops the explicit `'a` capture the trait wants,
+        // and the `manual_async_fn` lint fires on trivial `async {}` bodies.
+        #[expect(clippy::manual_async_fn, reason = "trait demands explicit 'a")]
+        fn batch<'a>(
+            &self,
+            _b: Batch<'a>,
+        ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
+            async { unreachable!("ParallelBatchLayer overrides this") }
+        }
+
+        #[expect(clippy::manual_async_fn, reason = "trait demands explicit 'a")]
+        fn notification<'a>(
+            &self,
+            _n: Notification<'a>,
+        ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
+            async { MethodResponse::notification() }
+        }
+    }
+
+    /// Service under test, wrapping [`TestService`].
+    fn layer() -> ParallelBatchService<TestService> {
+        ParallelBatchService {
+            service: Arc::new(TestService),
+            max_response_body_size: MAX_RESP,
+        }
+    }
+
+    /// Builds a parameterless call with a numeric id.
+    fn call(id: u64, method: &str) -> Request<'static> {
+        Request::owned(method.to_string(), None, Id::Number(id))
+    }
+
+    /// Builds a parameterless notification.
+    fn notification(method: &str) -> Notification<'static> {
+        Notification {
+            jsonrpc: TwoPointZero,
+            method: Cow::Owned(method.to_string()),
+            params: None,
+            extensions: Extensions::new(),
+        }
+    }
+
+    /// Parses a batch response into its JSON array of per-entry responses.
+    fn as_array(rp: &MethodResponse) -> Vec<serde_json::Value> {
+        serde_json::from_str::<Vec<serde_json::Value>>(rp.as_json().get()).unwrap()
+    }
+
+    #[tokio::test]
+    async fn preserves_order_under_heterogeneous_latency() {
+        let svc = layer();
+        let batch = Batch::from(vec![
+            Ok(BatchEntry::Call(call(1, "slow:50"))),
+            Ok(BatchEntry::Call(call(2, "ok"))),
+            Ok(BatchEntry::Call(call(3, "slow:25"))),
+        ]);
+        let arr = as_array(&svc.batch(batch).await);
+        assert_eq!(arr.len(), 3);
+        assert_eq!(arr[0]["id"], 1);
+        assert_eq!(arr[1]["id"], 2);
+        assert_eq!(arr[2]["id"], 3);
+    }
+
+    #[tokio::test]
+    async fn panicked_call_yields_per_entry_error() {
+        let svc = layer();
+        let batch = Batch::from(vec![
+            Ok(BatchEntry::Call(call(1, "ok"))),
+            Ok(BatchEntry::Call(call(2, "panic"))),
+            Ok(BatchEntry::Call(call(3, "ok"))),
+        ]);
+        let arr = as_array(&svc.batch(batch).await);
+        assert_eq!(arr.len(), 3);
+        assert_eq!(arr[0]["id"], 1);
+        assert!(arr[0]["result"].is_string(), "first entry should succeed");
+        assert_eq!(arr[1]["id"], 2);
+        assert!(
+            arr[1]["error"].is_object(),
+            "panicked entry must carry its own error"
+        );
+        assert_eq!(arr[2]["id"], 3);
+        assert!(arr[2]["result"].is_string(), "third entry should succeed");
+    }
+
+    #[tokio::test]
+    async fn notification_only_batch_returns_notification() {
+        let svc = layer();
+        let batch = Batch::from(vec![Ok(BatchEntry::Notification(notification("ok")))]);
+        let resp = svc.batch(batch).await;
+        assert!(resp.is_notification());
+    }
+
+    #[tokio::test]
+    async fn entry_err_preserves_index() {
+        let svc = layer();
+        let batch = Batch::from(vec![
+            Ok(BatchEntry::Call(call(1, "ok"))),
+            Err(BatchEntryErr::new(
+                Id::Number(2),
+                ErrorObject::from(ErrorCode::InvalidRequest),
+            )),
+            Ok(BatchEntry::Call(call(3, "ok"))),
+        ]);
+        let arr = as_array(&svc.batch(batch).await);
+        assert_eq!(arr.len(), 3);
+        assert_eq!(arr[0]["id"], 1);
+        assert_eq!(arr[1]["id"], 2);
+        assert!(arr[1]["error"].is_object());
+        assert_eq!(arr[2]["id"], 3);
+    }
+}