Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use std::sync::OnceLock;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::error::RecvError;
use tokio::{
net::TcpListener,
signal::{
ctrl_c,
unix::{SignalKind, signal},
Expand Down Expand Up @@ -226,10 +225,8 @@ async fn maybe_start_metrics_service(
ctx: &AppContext,
) -> anyhow::Result<()> {
if config.client.enable_metrics_endpoint {
// Start Prometheus server port
let prometheus_listener = TcpListener::bind(config.client.metrics_address)
.await
.with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
let prometheus_listener =
crate::utils::net::bind_tcp_listener(config.client.metrics_address, 0).await?;
info!(
"Prometheus server started at {}",
config.client.metrics_address
Expand Down Expand Up @@ -382,7 +379,7 @@ async fn maybe_start_health_check_service(
};
let healthcheck_address = forest_state.config.client.healthcheck_address;
info!("Healthcheck endpoint will listen at {healthcheck_address}");
let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
let listener = crate::utils::net::bind_tcp_listener(healthcheck_address, 0).await?;
services.spawn(async move {
crate::health::init_healthcheck_server(forest_state, listener)
.await
Expand Down Expand Up @@ -472,12 +469,11 @@ fn maybe_start_rpc_service(
let mpool_locker = MpoolLocker::new();
let temp_dir = Arc::new(ctx.temp_dir.clone());
async move {
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
.await
.map_err(|e| {
anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
})
.unwrap();
let rpc_listener = crate::utils::net::bind_tcp_listener(
rpc_address,
crate::rpc::default_max_connections(),
)
.await?;
start_rpc(
RPCState {
state_manager,
Expand Down
24 changes: 15 additions & 9 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,20 @@ static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
.unwrap_or(Duration::from_secs(60))
});

/// Default maximum connections for the RPC server. This needs to be high enough to
/// accommodate the regular usage for RPC providers.
static DEFAULT_MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
env::var("FOREST_RPC_MAX_CONNECTIONS")
.ok()
.and_then(|it| it.parse().ok())
.unwrap_or(1000)
});
/// Maximum concurrent connections accepted by the RPC server.
///
/// Configurable via `FOREST_RPC_MAX_CONNECTIONS`. The value also bounds the
/// TCP listen backlog so that bursts of connection attempts do not get
/// silently dropped by the kernel.
pub fn default_max_connections() -> u32 {
static VALUE: LazyLock<u32> = LazyLock::new(|| {
env::var("FOREST_RPC_MAX_CONNECTIONS")
.ok()
.and_then(|it| it.parse().ok())
.unwrap_or(1000)
});
Comment thread
LesnyRumcajs marked this conversation as resolved.
*VALUE
}

const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
Expand Down Expand Up @@ -568,7 +574,7 @@ where
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
.max_response_body_size(MAX_RESPONSE_BODY_SIZE)
.max_connections(*DEFAULT_MAX_CONNECTIONS)
.max_connections(default_max_connections())
.set_id_provider(RandomHexStringIdProvider::new())
.build(),
)
Expand Down
4 changes: 3 additions & 1 deletion src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ where
{
info!("Starting offline RPC Server");
let rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), rpc_port);
let rpc_listener = tokio::net::TcpListener::bind(rpc_address).await?;
let rpc_listener =
crate::utils::net::bind_tcp_listener(rpc_address, crate::rpc::default_max_connections())
.await?;
let mut terminate = signal(SignalKind::terminate())?;
let (stop_handle, server_handle) = stop_channel();
let result = tokio::select! {
Expand Down
33 changes: 33 additions & 0 deletions src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,53 @@ pub use download_file::*;

use crate::utils::io::WithProgress;
use crate::utils::reqwest_resume;
use anyhow::Context as _;
use cid::Cid;
use futures::{AsyncWriteExt, TryStreamExt};
use reqwest::Response;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::{Arc, LazyLock};
use tap::Pipe;
use tokio::io::AsyncBufRead;
use tokio::net::TcpListener;
use tokio_util::{
compat::TokioAsyncReadCompatExt,
either::Either::{Left, Right},
};
use tracing::info;
use url::Url;

/// Minimum listen backlog applied by [`bind_tcp_listener`].
///
/// `tokio::net::TcpListener::bind` (via `mio` and the Rust standard library)
/// uses a fixed backlog of 128, which is too small to absorb bursts of
/// simultaneous connection attempts: when the accept queue overflows, the
/// kernel silently drops the completed handshakes and clients only retry
/// after `TCP_RTO_MIN` (~1s on Linux). The kernel further clamps the
/// requested backlog to `/proc/sys/net/core/somaxconn`, so it is safe to
/// ask for a large value.
const MIN_LISTEN_BACKLOG: u32 = 1024;

/// Bind a TCP listener with an explicit listen backlog, floored at
/// [`MIN_LISTEN_BACKLOG`]. Use this for any externally-facing listener that
/// might face a burst of simultaneous connection attempts.
pub async fn bind_tcp_listener(addr: SocketAddr, backlog: u32) -> anyhow::Result<TcpListener> {
let socket = if addr.is_ipv6() {
tokio::net::TcpSocket::new_v6()
} else {
tokio::net::TcpSocket::new_v4()
}
.with_context(|| format!("could not create TCP socket for {addr}"))?;
let _ = socket.set_reuseaddr(true);
socket
.bind(addr)
.with_context(|| format!("could not bind to {addr}"))?;
socket
.listen(backlog.max(MIN_LISTEN_BACKLOG))
.with_context(|| format!("could not listen on {addr}"))
}

pub fn global_http_client() -> reqwest::Client {
static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
CLIENT.clone()
Expand Down
Loading