Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {

/// Represents a single, perhaps persistent connection to a URL over which requests
/// can be made using [`jsonrpsee`] primitives.
struct UrlClient {
pub struct UrlClient {
url: Url,
inner: UrlClientInner,
}
Expand All @@ -190,7 +190,7 @@ impl Debug for UrlClient {
}

impl UrlClient {
async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
pub async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves.
let headers = match token.into() {
Some(token) => HeaderMap::from_iter([(
Expand Down
59 changes: 47 additions & 12 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod filter_list;
pub mod json_validator;
mod log_layer;
mod metrics_layer;
mod parallel_batch_layer;
mod request;
mod segregation_layer;
mod set_extension_layer;
Expand All @@ -28,6 +29,7 @@ pub use filter_list::FilterList;
use futures::FutureExt as _;
use jsonrpsee::server::ServerConfig;
use log_layer::LogLayer;
use parallel_batch_layer::ParallelBatchLayer;
use reflect::Ctx;
pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
pub use request::Request;
Expand Down Expand Up @@ -573,18 +575,18 @@ pub async fn start_rpc(
let methods: Arc<HashMap<ApiPaths, Methods>> =
Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());

let server_config = ServerConfig::builder()
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
.max_connections(default_max_connections())
.set_id_provider(RandomHexStringIdProvider::new())
.build();
let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize;
let per_conn = PerConnection {
stop_handle: stop_handle.clone(),
svc_builder: Server::builder()
.set_config(
ServerConfig::builder()
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
.max_connections(default_max_connections())
.set_id_provider(RandomHexStringIdProvider::new())
.build(),
)
.set_config(server_config.clone())
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
Expand Down Expand Up @@ -647,7 +649,9 @@ pub async fn start_rpc(
keystore: keystore.clone(),
})
.layer(LogLayer::default())
.layer(MetricsLayer::default());
.layer(MetricsLayer::default())
// `ParallelBatchLayer` has to be the last layer
.layer(ParallelBatchLayer::new(max_response_body_size));
let mut jsonrpsee_svc = svc_builder
.set_rpc_middleware(rpc_middleware)
.build(methods, stop_handle);
Expand Down Expand Up @@ -801,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR
mod tests {
use super::*;
use crate::{
db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
db::MemoryDB,
networks::NetworkChain,
rpc::{client::UrlClient, common::ShiftingVersion},
tool::offline_server::server::offline_rpc_state,
};
use jsonrpsee::server::stop_channel;
use jsonrpsee::{
core::{
client::{BatchResponse, ClientT},
params::BatchRequestBuilder,
},
server::stop_channel,
};
use std::net::{Ipv4Addr, SocketAddr};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -913,6 +925,29 @@ mod tests {

drop(client);

// Sending a batch request
let client = UrlClient::new(
format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port())
.parse()
.unwrap(),
None,
)
.await
.unwrap();
let mut batch_request_builder = BatchRequestBuilder::new();
let empty_payload: [(); 0] = [];
batch_request_builder
.insert("Filecoin.Version", empty_payload)
.unwrap();
batch_request_builder
.insert("eth_chainId", empty_payload)
.unwrap();
let batch_response: BatchResponse<serde_json::Value> =
client.batch_request(batch_request_builder).await.unwrap();
assert_eq!(batch_response.len(), 2);
assert_eq!(batch_response.num_successful_calls(), 2);
assert_eq!(batch_response.num_failed_calls(), 0);

// Gracefully shutdown the RPC server
println!("sending shutdown signal");
shutdown_send.send(()).await.unwrap();
Expand Down
Loading
Loading