diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index b2991c4a5d11..cbb8f7758b30 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -44,7 +44,6 @@ use std::sync::OnceLock; use std::time::{Duration, Instant}; use tokio::sync::broadcast::error::RecvError; use tokio::{ - net::TcpListener, signal::{ ctrl_c, unix::{SignalKind, signal}, @@ -226,10 +225,8 @@ async fn maybe_start_metrics_service( ctx: &AppContext, ) -> anyhow::Result<()> { if config.client.enable_metrics_endpoint { - // Start Prometheus server port - let prometheus_listener = TcpListener::bind(config.client.metrics_address) - .await - .with_context(|| format!("could not bind to {}", config.client.metrics_address))?; + let prometheus_listener = + crate::utils::net::bind_tcp_listener(config.client.metrics_address, 0).await?; info!( "Prometheus server started at {}", config.client.metrics_address @@ -382,7 +379,7 @@ async fn maybe_start_health_check_service( }; let healthcheck_address = forest_state.config.client.healthcheck_address; info!("Healthcheck endpoint will listen at {healthcheck_address}"); - let listener = tokio::net::TcpListener::bind(healthcheck_address).await?; + let listener = crate::utils::net::bind_tcp_listener(healthcheck_address, 0).await?; services.spawn(async move { crate::health::init_healthcheck_server(forest_state, listener) .await @@ -472,12 +469,11 @@ fn maybe_start_rpc_service( let mpool_locker = MpoolLocker::new(); let temp_dir = Arc::new(ctx.temp_dir.clone()); async move { - let rpc_listener = tokio::net::TcpListener::bind(rpc_address) - .await - .map_err(|e| { - anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}") - }) - .unwrap(); + let rpc_listener = crate::utils::net::bind_tcp_listener( + rpc_address, + crate::rpc::default_max_connections(), + ) + .await?; start_rpc( RPCState { state_manager, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 1d56d72c5310..271349fbbff4 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -458,14 +458,20 @@ static DEFAULT_REQUEST_TIMEOUT: LazyLock = LazyLock::new(|| { .unwrap_or(Duration::from_secs(60)) }); -/// Default maximum connections for the RPC server. This needs to be high enough to -/// accommodate the regular usage for RPC providers. -static DEFAULT_MAX_CONNECTIONS: LazyLock = LazyLock::new(|| { - env::var("FOREST_RPC_MAX_CONNECTIONS") - .ok() - .and_then(|it| it.parse().ok()) - .unwrap_or(1000) -}); +/// Maximum concurrent connections accepted by the RPC server. +/// +/// Configurable via `FOREST_RPC_MAX_CONNECTIONS`. The value also bounds the +/// TCP listen backlog so that bursts of connection attempts do not get +/// silently dropped by the kernel. +pub fn default_max_connections() -> u32 { + static VALUE: LazyLock = LazyLock::new(|| { + env::var("FOREST_RPC_MAX_CONNECTIONS") + .ok() + .and_then(|it| it.parse().ok()) + .unwrap_or(1000) + }); + *VALUE +} const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024; const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; @@ -568,7 +574,7 @@ where // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` .max_request_body_size(MAX_REQUEST_BODY_SIZE) .max_response_body_size(MAX_RESPONSE_BODY_SIZE) - .max_connections(*DEFAULT_MAX_CONNECTIONS) + .max_connections(default_max_connections()) .set_id_provider(RandomHexStringIdProvider::new()) .build(), ) diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 4d99a6c6eba0..87ff4c75b4f1 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -232,7 +232,9 @@ where { info!("Starting offline RPC Server"); let rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), rpc_port); - let rpc_listener = tokio::net::TcpListener::bind(rpc_address).await?; + let rpc_listener = + crate::utils::net::bind_tcp_listener(rpc_address, crate::rpc::default_max_connections()) + .await?; let mut terminate = signal(SignalKind::terminate())?; let (stop_handle, server_handle) = stop_channel(); let result = tokio::select! { diff --git a/src/utils/net.rs b/src/utils/net.rs index 240ea11a1c08..9737cdb9ff5f 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -6,13 +6,16 @@ pub use download_file::*; use crate::utils::io::WithProgress; use crate::utils::reqwest_resume; +use anyhow::Context as _; use cid::Cid; use futures::{AsyncWriteExt, TryStreamExt}; use reqwest::Response; +use std::net::SocketAddr; use std::path::Path; use std::sync::{Arc, LazyLock}; use tap::Pipe; use tokio::io::AsyncBufRead; +use tokio::net::TcpListener; use tokio_util::{ compat::TokioAsyncReadCompatExt, either::Either::{Left, Right}, @@ -20,6 +23,36 @@ use tokio_util::{ use tracing::info; use url::Url; +/// Minimum listen backlog applied by [`bind_tcp_listener`]. +/// +/// `tokio::net::TcpListener::bind` (via `mio` and the Rust standard library) +/// uses a fixed backlog of 128, which is too small to absorb bursts of +/// simultaneous connection attempts: when the accept queue overflows, the +/// kernel silently drops the completed handshakes and clients only retry +/// after `TCP_RTO_MIN` (~1s on Linux). The kernel further clamps the +/// requested backlog to `/proc/sys/net/core/somaxconn`, so it is safe to +/// ask for a large value. +const MIN_LISTEN_BACKLOG: u32 = 1024; + +/// Bind a TCP listener with an explicit listen backlog, floored at +/// [`MIN_LISTEN_BACKLOG`]. Use this for any externally-facing listener that +/// might face a burst of simultaneous connection attempts. +pub async fn bind_tcp_listener(addr: SocketAddr, backlog: u32) -> anyhow::Result { + let socket = if addr.is_ipv6() { + tokio::net::TcpSocket::new_v6() + } else { + tokio::net::TcpSocket::new_v4() + } + .with_context(|| format!("could not create TCP socket for {addr}"))?; + let _ = socket.set_reuseaddr(true); + socket + .bind(addr) + .with_context(|| format!("could not bind to {addr}"))?; + socket + .listen(backlog.max(MIN_LISTEN_BACKLOG)) + .with_context(|| format!("could not listen on {addr}")) +} + pub fn global_http_client() -> reqwest::Client { static CLIENT: LazyLock = LazyLock::new(reqwest::Client::new); CLIENT.clone()