From df7758be183951fa8664f57731f65bc7cce88f25 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 00:30:18 +0800 Subject: [PATCH 1/6] fix: process batch RPC request in parallel --- src/rpc/mod.rs | 24 ++++--- src/rpc/parallel_batch_layer.rs | 118 ++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 10 deletions(-) create mode 100644 src/rpc/parallel_batch_layer.rs diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f5af90855161..cf4f9f4ed2a4 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -12,6 +12,7 @@ mod filter_list; pub mod json_validator; mod log_layer; mod metrics_layer; +mod parallel_batch_layer; mod request; mod segregation_layer; mod set_extension_layer; @@ -28,6 +29,7 @@ pub use filter_list::FilterList; use futures::FutureExt as _; use jsonrpsee::server::ServerConfig; use log_layer::LogLayer; +use parallel_batch_layer::ParallelBatchLayer; use reflect::Ctx; pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -573,18 +575,18 @@ pub async fn start_rpc( let methods: Arc> = Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect()); + let server_config = ServerConfig::builder() + .max_request_body_size(MAX_REQUEST_BODY_SIZE) + // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` + .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) + .max_connections(default_max_connections()) + .set_id_provider(RandomHexStringIdProvider::new()) + .build(); + let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize; let per_conn = PerConnection { stop_handle: stop_handle.clone(), svc_builder: Server::builder() - .set_config( - ServerConfig::builder() - .max_request_body_size(MAX_REQUEST_BODY_SIZE) - // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` - .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) - .max_connections(default_max_connections()) - .set_id_provider(RandomHexStringIdProvider::new()) - .build(), - ) + .set_config(server_config.clone()) .set_http_middleware( tower::ServiceBuilder::new() .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new)) @@ -647,7 +649,9 @@ pub async fn start_rpc( keystore: keystore.clone(), }) .layer(LogLayer::default()) - .layer(MetricsLayer::default()); + .layer(MetricsLayer::default()) + // `ParallelBatchLayer` has to be the last layer + .layer(ParallelBatchLayer::new(max_response_body_size)); let mut jsonrpsee_svc = svc_builder .set_rpc_middleware(rpc_middleware) .build(methods, stop_handle); diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs new file mode 100644 index 000000000000..329f97a9a00b --- /dev/null +++ b/src/rpc/parallel_batch_layer.rs @@ -0,0 +1,118 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::prelude::*; +use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; +use jsonrpsee::{ + MethodResponse, + core::middleware::{Batch, BatchEntry, Notification}, + server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, +}; +use tower::Layer; + +/// Parallelize batch RPC requests that are processed in sequence by default +/// See +/// +/// Note that such parallelization is allowed as per the [`JSON-RPC` specification](https://www.jsonrpc.org/specification#:~:text=6%20Batch) +#[derive(Clone, derive_more::Constructor)] +pub(super) struct ParallelBatchLayer { + max_response_body_size: usize, +} + +impl Layer for ParallelBatchLayer { + type Service = ParallelBatchService; + + fn layer(&self, service: S) -> Self::Service { + ParallelBatchService { + service: service.into(), + max_response_body_size: self.max_response_body_size, + } + } +} + +#[derive(Clone)] +pub(super) struct ParallelBatchService { + service: Arc, + max_response_body_size: usize, +} + +impl RpcServiceT for ParallelBatchService +where + S: RpcServiceT< + MethodResponse = MethodResponse, + NotificationResponse = MethodResponse, + BatchResponse = MethodResponse, + > + Send + + Sync + + 'static, +{ + type MethodResponse = S::MethodResponse; + type NotificationResponse = S::NotificationResponse; + type BatchResponse = S::BatchResponse; + + fn call<'a>( + &self, + req: jsonrpsee::types::Request<'a>, + ) -> impl Future + Send + 'a { + self.service.call(req) + } + + // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. + let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); + let mut got_notification = false; + // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side + // See + let mut tasks = FuturesOrdered::new(); + for batch_entry in batch.into_iter() { + match batch_entry { + Ok(BatchEntry::Call(req)) => { + tasks.push_back(self.service.call(req).map(Some).boxed()); + } + Ok(BatchEntry::Notification(n)) => { + got_notification = true; + let service = self.service.shallow_clone(); + tasks.push_back( + async move { + service.notification(n).await; + None + } + .boxed(), + ); + } + Err(err) => { + let (err, id) = err.into_parts(); + let rp = MethodResponse::error(id, err); + tasks.push_back(async move { Some(rp) }.boxed()); + } + } + } + + async move { + while let Some(r) = tasks.next().await { + if let Some(rp) = r + && let Err(err) = batch_rp.append(rp) + { + return err; + } + } + + // If the batch is empty and we got a notification, we return an empty response. + if batch_rp.is_empty() && got_notification { + MethodResponse::notification() + } + // An empty batch is regarded as an invalid request here. + else { + MethodResponse::from_batch(batch_rp.finish()) + } + } + } + + fn notification<'a>( + &self, + n: Notification<'a>, + ) -> impl Future + Send + 'a { + self.service.notification(n) + } +} From bf427dfaf554f5eae177dc6d28008a2dedbc797b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:09:48 +0800 Subject: [PATCH 2/6] FOREST_RPC_BATCH_MAX_CONCURRENCY --- docs/docs/users/reference/env_variables.md | 1 + src/rpc/parallel_batch_layer.rs | 29 ++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index fa0fba485157..a7e59646fe26 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -69,6 +69,7 @@ process. | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` | positive integer | 32 | 32 | Maximum number of inbound chain exchange requests Forest will service concurrently. Excess requests are rejected with a `GoAway` response | | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` | positive integer | 4 | 4 | Per-peer cap on concurrent inbound chain exchange requests. Excess requests from a single peer are rejected with a `GoAway` response | | `FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES` | positive integer (bytes) | 10485760 (10 MiB) | 10485760 | Cap on the encoded byte size of a chain exchange response Forest serves to peers. Building stops as soon as the running encoded size would exceed this cap and the response is returned with `PartialResponse` status | +| `FOREST_RPC_BATCH_MAX_CONCURRENCY` | positive integer | 8 | 8 | max number of entries in an RPC batch request that can be processed in parallel | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs index 329f97a9a00b..69ee292ab901 100644 --- a/src/rpc/parallel_batch_layer.rs +++ b/src/rpc/parallel_batch_layer.rs @@ -1,6 +1,8 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::{num::NonZeroUsize, sync::LazyLock}; + use crate::prelude::*; use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; use jsonrpsee::{ @@ -8,6 +10,8 @@ use jsonrpsee::{ core::middleware::{Batch, BatchEntry, Notification}, server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, }; +use nonzero_ext::nonzero; +use tokio::sync::Semaphore; use tower::Layer; /// Parallelize batch RPC requests that are processed in sequence by default @@ -60,21 +64,42 @@ where // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. + const MAX_CONCURRENCY_ENV: &str = "FOREST_RPC_BATCH_MAX_CONCURRENCY"; + static MAX_CONCURRENCY: LazyLock = LazyLock::new(|| { + std::env::var(MAX_CONCURRENCY_ENV) + .ok() + .and_then(|i| i.parse().ok()) + .inspect(|i| { + tracing::info!( + "Max RPC batch concurrency is set to {i} by {MAX_CONCURRENCY_ENV}" + ) + }) + .unwrap_or(nonzero!(8usize)) + }); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENCY.get())); let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); let mut got_notification = false; // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side // See let mut tasks = FuturesOrdered::new(); for batch_entry in batch.into_iter() { + let service = self.service.shallow_clone(); + let semaphore = semaphore.shallow_clone(); match batch_entry { Ok(BatchEntry::Call(req)) => { - tasks.push_back(self.service.call(req).map(Some).boxed()); + tasks.push_back( + async move { + let _permit = semaphore.acquire().await; + service.call(req).map(Some).await + } + .boxed(), + ); } Ok(BatchEntry::Notification(n)) => { got_notification = true; - let service = self.service.shallow_clone(); tasks.push_back( async move { + let _permit = semaphore.acquire().await; service.notification(n).await; None } From 5628ce04385d42c0b48363c8faf1d89e9d8005a9 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:11:23 +0800 Subject: [PATCH 3/6] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b49e7e9787..35d28e00c6b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ - [`#7066`](https://github.com/ChainSafe/forest/pull/7066): Disable JSON-RPC HTTP response compression by default. Set `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` to a non-negative value (e.g. `1024`) to re-enable gzip compression of responses above that size. +- [`#7093`](https://github.com/ChainSafe/forest/pull/7093): Parallelize RPC batch request processing. + ### Removed ### Fixed From c86a3309e6d146862caa601c9f7246e8c3ae4d6e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:57:22 +0800 Subject: [PATCH 4/6] cover batch in unit test --- src/rpc/client.rs | 4 ++-- src/rpc/mod.rs | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 08e8ac8fb1e0..287fd4fd16bf 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) { /// Represents a single, perhaps persistent connection to a URL over which requests /// can be made using [`jsonrpsee`] primitives. -struct UrlClient { +pub struct UrlClient { url: Url, inner: UrlClientInner, } @@ -190,7 +190,7 @@ impl Debug for UrlClient { } impl UrlClient { - async fn new(url: Url, token: impl Into>) -> Result { + pub async fn new(url: Url, token: impl Into>) -> Result { const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves. let headers = match token.into() { Some(token) => HeaderMap::from_iter([( diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index cf4f9f4ed2a4..0105683f98e5 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -805,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR mod tests { use super::*; use crate::{ - db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion, + db::MemoryDB, + networks::NetworkChain, + rpc::{client::UrlClient, common::ShiftingVersion}, tool::offline_server::server::offline_rpc_state, }; - use jsonrpsee::server::stop_channel; + use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT}, + params::BatchRequestBuilder, + }, + server::stop_channel, + }; use std::net::{Ipv4Addr, SocketAddr}; use tokio::task::JoinSet; @@ -917,6 +925,29 @@ mod tests { drop(client); + // Sending a batch request + let client = UrlClient::new( + format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port()) + .parse() + .unwrap(), + None, + ) + .await + .unwrap(); + let mut batch_request_builder = BatchRequestBuilder::new(); + let empty_payload: [(); 0] = []; + batch_request_builder + .insert("Filecoin.Version", empty_payload) + .unwrap(); + batch_request_builder + .insert("eth_chainId", empty_payload) + .unwrap(); + let batch_response: BatchResponse = + client.batch_request(batch_request_builder).await.unwrap(); + assert_eq!(batch_response.len(), 2); + assert_eq!(batch_response.num_successful_calls(), 2); + assert_eq!(batch_response.num_failed_calls(), 0); + // Gracefully shutdown the RPC server println!("sending shutdown signal"); shutdown_send.send(()).await.unwrap(); From d274cea8130fc5028f23d2bd5ec91596ec0c3aa7 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 20:56:44 +0800 Subject: [PATCH 5/6] no FOREST_RPC_BATCH_MAX_CONCURRENCY --- CHANGELOG.md | 2 -- docs/docs/users/reference/env_variables.md | 1 - src/rpc/parallel_batch_layer.rs | 41 +++------------------- 3 files changed, 4 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35d28e00c6b5..e0b49e7e9787 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,8 +37,6 @@ - [`#7066`](https://github.com/ChainSafe/forest/pull/7066): Disable JSON-RPC HTTP response compression by default. Set `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` to a non-negative value (e.g. `1024`) to re-enable gzip compression of responses above that size. -- [`#7093`](https://github.com/ChainSafe/forest/pull/7093): Parallelize RPC batch request processing. - ### Removed ### Fixed diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index a7e59646fe26..fa0fba485157 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -69,7 +69,6 @@ process. | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` | positive integer | 32 | 32 | Maximum number of inbound chain exchange requests Forest will service concurrently. Excess requests are rejected with a `GoAway` response | | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` | positive integer | 4 | 4 | Per-peer cap on concurrent inbound chain exchange requests. Excess requests from a single peer are rejected with a `GoAway` response | | `FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES` | positive integer (bytes) | 10485760 (10 MiB) | 10485760 | Cap on the encoded byte size of a chain exchange response Forest serves to peers. Building stops as soon as the running encoded size would exceed this cap and the response is returned with `PartialResponse` status | -| `FOREST_RPC_BATCH_MAX_CONCURRENCY` | positive integer | 8 | 8 | max number of entries in an RPC batch request that can be processed in parallel | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs index 69ee292ab901..1c55217c0322 100644 --- a/src/rpc/parallel_batch_layer.rs +++ b/src/rpc/parallel_batch_layer.rs @@ -1,17 +1,12 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{num::NonZeroUsize, sync::LazyLock}; - -use crate::prelude::*; use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; use jsonrpsee::{ MethodResponse, core::middleware::{Batch, BatchEntry, Notification}, server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, }; -use nonzero_ext::nonzero; -use tokio::sync::Semaphore; use tower::Layer; /// Parallelize batch RPC requests that are processed in sequence by default @@ -28,7 +23,7 @@ impl Layer for ParallelBatchLayer { fn layer(&self, service: S) -> Self::Service { ParallelBatchService { - service: service.into(), + service, max_response_body_size: self.max_response_body_size, } } @@ -36,7 +31,7 @@ impl Layer for ParallelBatchLayer { #[derive(Clone)] pub(super) struct ParallelBatchService { - service: Arc, + service: S, max_response_body_size: usize, } @@ -64,47 +59,19 @@ where // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. - const MAX_CONCURRENCY_ENV: &str = "FOREST_RPC_BATCH_MAX_CONCURRENCY"; - static MAX_CONCURRENCY: LazyLock = LazyLock::new(|| { - std::env::var(MAX_CONCURRENCY_ENV) - .ok() - .and_then(|i| i.parse().ok()) - .inspect(|i| { - tracing::info!( - "Max RPC batch concurrency is set to {i} by {MAX_CONCURRENCY_ENV}" - ) - }) - .unwrap_or(nonzero!(8usize)) - }); - let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENCY.get())); let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); let mut got_notification = false; // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side // See let mut tasks = FuturesOrdered::new(); for batch_entry in batch.into_iter() { - let service = self.service.shallow_clone(); - let semaphore = semaphore.shallow_clone(); match batch_entry { Ok(BatchEntry::Call(req)) => { - tasks.push_back( - async move { - let _permit = semaphore.acquire().await; - service.call(req).map(Some).await - } - .boxed(), - ); + tasks.push_back(self.service.call(req).map(Some).boxed()); } Ok(BatchEntry::Notification(n)) => { got_notification = true; - tasks.push_back( - async move { - let _permit = semaphore.acquire().await; - service.notification(n).await; - None - } - .boxed(), - ); + tasks.push_back(self.service.notification(n).map(|_| None).boxed()); } Err(err) => { let (err, id) = err.into_parts(); From 4697043c3deb66f734667b6c262c3009c5beefb5 Mon Sep 17 00:00:00 2001 From: Hubert Date: Mon, 25 May 2026 14:20:46 +0200 Subject: [PATCH 6/6] feat: parallelism in batches (#7101) --- src/rpc/parallel_batch_layer.rs | 260 ++++++++++++++++++++++++++++---- 1 file changed, 232 insertions(+), 28 deletions(-) diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs index 1c55217c0322..c367de028d3f 100644 --- a/src/rpc/parallel_batch_layer.rs +++ b/src/rpc/parallel_batch_layer.rs @@ -1,18 +1,24 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; +use std::{borrow::Cow, sync::Arc}; + +use ahash::HashMap; use jsonrpsee::{ MethodResponse, core::middleware::{Batch, BatchEntry, Notification}, server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, + types::{ErrorCode, ErrorObject, Id, Request}, }; +use tokio::task::JoinSet; use tower::Layer; -/// Parallelize batch RPC requests that are processed in sequence by default -/// See +/// Parallelize batch RPC requests across the `tokio` worker pool. /// -/// Note that such parallelization is allowed as per the [`JSON-RPC` specification](https://www.jsonrpc.org/specification#:~:text=6%20Batch) +/// jsonrpsee processes batches sequentially by default. The +/// [JSON-RPC spec](https://www.jsonrpc.org/specification#batch) does not +/// require sequential processing or response ordering, but order is +/// preserved here to avoid surprising clients. #[derive(Clone, derive_more::Constructor)] pub(super) struct ParallelBatchLayer { max_response_body_size: usize, @@ -23,7 +29,7 @@ impl Layer for ParallelBatchLayer { fn layer(&self, service: S) -> Self::Service { ParallelBatchService { - service, + service: Arc::new(service), max_response_body_size: self.max_response_body_size, } } @@ -31,7 +37,7 @@ impl Layer for ParallelBatchLayer { #[derive(Clone)] pub(super) struct ParallelBatchService { - service: S, + service: Arc, max_response_body_size: usize, } @@ -49,53 +55,85 @@ where type NotificationResponse = S::NotificationResponse; type BatchResponse = S::BatchResponse; - fn call<'a>( - &self, - req: jsonrpsee::types::Request<'a>, - ) -> impl Future + Send + 'a { + fn call<'a>(&self, req: Request<'a>) -> impl Future + Send + 'a { self.service.call(req) } - // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { - // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. - let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); + let max = self.max_response_body_size; let mut got_notification = false; - // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side - // See - let mut tasks = FuturesOrdered::new(); - for batch_entry in batch.into_iter() { - match batch_entry { + // JoinSet aborts in-flight tasks on drop. + let mut join_set: JoinSet<(usize, Option)> = JoinSet::new(); + // Lets a panicked call task be turned into a per-entry error with the + // original request id. + let mut call_meta: HashMap)> = HashMap::default(); + let mut results: Vec<(usize, Option)> = Vec::new(); + + for (idx, entry) in batch.into_iter().enumerate() { + let service = Arc::clone(&self.service); + match entry { Ok(BatchEntry::Call(req)) => { - tasks.push_back(self.service.call(req).map(Some).boxed()); + let req_id = req.id().into_owned(); + let req = into_owned_request(req); + let handle = + join_set.spawn(async move { (idx, Some(service.call(req).await)) }); + call_meta.insert(handle.id(), (idx, req_id)); } Ok(BatchEntry::Notification(n)) => { got_notification = true; - tasks.push_back(self.service.notification(n).map(|_| None).boxed()); + let n = into_owned_notification(n); + join_set.spawn(async move { + service.notification(n).await; + (idx, None) + }); } Err(err) => { let (err, id) = err.into_parts(); - let rp = MethodResponse::error(id, err); - tasks.push_back(async move { Some(rp) }.boxed()); + results.push(( + idx, + Some(MethodResponse::error(id.into_owned(), err.into_owned())), + )); } } } async move { - while let Some(r) = tasks.next().await { - if let Some(rp) = r + results.reserve(join_set.len()); + while let Some(joined) = join_set.join_next_with_id().await { + match joined { + Ok((_, r)) => results.push(r), + Err(e) if e.is_panic() => { + if let Some((idx, req_id)) = call_meta.remove(&e.id()) { + tracing::error!(idx, "RPC call panicked in batch entry"); + let err = ErrorObject::owned::<()>( + ErrorCode::InternalError.code(), + "RPC handler panicked", + None, + ); + results.push((idx, Some(MethodResponse::error(req_id, err)))); + } else { + tracing::error!("RPC notification panicked in batch entry"); + } + } + Err(_) => unreachable!("JoinSet only cancels tasks on drop"), + } + } + results.sort_by_key(|(idx, _)| *idx); + + let mut batch_rp = BatchResponseBuilder::new_with_limit(max); + for (_, rp) in results { + if let Some(rp) = rp && let Err(err) = batch_rp.append(rp) { return err; } } - // If the batch is empty and we got a notification, we return an empty response. + // Empty builder + at least one notification is the spec's + // "no response" case for a notification-only batch. if batch_rp.is_empty() && got_notification { MethodResponse::notification() - } - // An empty batch is regarded as an invalid request here. - else { + } else { MethodResponse::from_batch(batch_rp.finish()) } } @@ -108,3 +146,169 @@ where self.service.notification(n) } } + +fn into_owned_request(req: Request<'_>) -> Request<'static> { + Request { + jsonrpc: req.jsonrpc, + id: req.id.into_owned(), + method: Cow::Owned(req.method.into_owned()), + params: req.params.map(|p| Cow::Owned(p.into_owned())), + extensions: req.extensions, + } +} + +fn into_owned_notification(n: Notification<'_>) -> Notification<'static> { + Notification { + jsonrpc: n.jsonrpc, + method: Cow::Owned(n.method.into_owned()), + params: n.params.map(|p| Cow::Owned(p.into_owned())), + extensions: n.extensions, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use jsonrpsee::core::middleware::BatchEntryErr; + use jsonrpsee::server::ResponsePayload; + use jsonrpsee::types::{Extensions, TwoPointZero}; + use std::time::Duration; + + const MAX_RESP: usize = 1024 * 1024; + + /// Method conventions used by tests: + /// "ok" – success response carrying the method name. + /// "slow:" – sleep, then succeed. + /// "panic" – panic inside the call task. + #[derive(Clone, Default)] + struct TestService; + + impl RpcServiceT for TestService { + type MethodResponse = MethodResponse; + type NotificationResponse = MethodResponse; + type BatchResponse = MethodResponse; + + fn call<'a>( + &self, + req: Request<'a>, + ) -> impl Future + Send + 'a { + let id = req.id().into_owned(); + let method = req.method_name().to_string(); + async move { + if method == "panic" { + panic!("test panic"); + } + if let Some(rest) = method.strip_prefix("slow:") { + let ms: u64 = rest.parse().unwrap(); + tokio::time::sleep(Duration::from_millis(ms)).await; + } + MethodResponse::response(id, ResponsePayload::success(method), MAX_RESP) + } + } + + // `async fn` form drops the explicit `'a` capture the trait wants, + // and the `manual_async_fn` lint fires on trivial `async {}` bodies. + #[expect(clippy::manual_async_fn, reason = "trait demands explicit 'a")] + fn batch<'a>( + &self, + _b: Batch<'a>, + ) -> impl Future + Send + 'a { + async { unreachable!("ParallelBatchLayer overrides this") } + } + + #[expect(clippy::manual_async_fn, reason = "trait demands explicit 'a")] + fn notification<'a>( + &self, + _n: Notification<'a>, + ) -> impl Future + Send + 'a { + async { MethodResponse::notification() } + } + } + + fn layer() -> ParallelBatchService { + ParallelBatchService { + service: Arc::new(TestService), + max_response_body_size: MAX_RESP, + } + } + + fn call(id: u64, method: &str) -> Request<'static> { + Request::owned(method.to_string(), None, Id::Number(id)) + } + + fn notification(method: &str) -> Notification<'static> { + Notification { + jsonrpc: TwoPointZero, + method: Cow::Owned(method.to_string()), + params: None, + extensions: Extensions::new(), + } + } + + fn as_array(rp: &MethodResponse) -> Vec { + serde_json::from_str::>(rp.as_json().get()).unwrap() + } + + #[tokio::test] + async fn preserves_order_under_heterogeneous_latency() { + let svc = layer(); + let batch = Batch::from(vec![ + Ok(BatchEntry::Call(call(1, "slow:50"))), + Ok(BatchEntry::Call(call(2, "ok"))), + Ok(BatchEntry::Call(call(3, "slow:25"))), + ]); + let arr = as_array(&svc.batch(batch).await); + assert_eq!(arr.len(), 3); + assert_eq!(arr[0]["id"], 1); + assert_eq!(arr[1]["id"], 2); + assert_eq!(arr[2]["id"], 3); + } + + #[tokio::test] + async fn panicked_call_yields_per_entry_error() { + let svc = layer(); + let batch = Batch::from(vec![ + Ok(BatchEntry::Call(call(1, "ok"))), + Ok(BatchEntry::Call(call(2, "panic"))), + Ok(BatchEntry::Call(call(3, "ok"))), + ]); + let arr = as_array(&svc.batch(batch).await); + assert_eq!(arr.len(), 3); + assert_eq!(arr[0]["id"], 1); + assert!(arr[0]["result"].is_string(), "first entry should succeed"); + assert_eq!(arr[1]["id"], 2); + assert!( + arr[1]["error"].is_object(), + "panicked entry must carry its own error" + ); + assert_eq!(arr[2]["id"], 3); + assert!(arr[2]["result"].is_string(), "third entry should succeed"); + } + + #[tokio::test] + async fn notification_only_batch_returns_notification() { + let svc = layer(); + let batch = Batch::from(vec![Ok(BatchEntry::Notification(notification("ok")))]); + let resp = svc.batch(batch).await; + assert!(resp.is_notification()); + } + + #[tokio::test] + async fn entry_err_preserves_index() { + let svc = layer(); + let batch = Batch::from(vec![ + Ok(BatchEntry::Call(call(1, "ok"))), + Err(BatchEntryErr::new( + Id::Number(2), + ErrorObject::from(ErrorCode::InvalidRequest), + )), + Ok(BatchEntry::Call(call(3, "ok"))), + ]); + let arr = as_array(&svc.batch(batch).await); + assert_eq!(arr.len(), 3); + assert_eq!(arr[0]["id"], 1); + assert_eq!(arr[1]["id"], 2); + assert!(arr[1]["error"].is_object()); + assert_eq!(arr[2]["id"], 3); + } +}