From 2f94f83cf148e0828f80fffeeef4ee10175b211f Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 5 Mar 2026 15:10:39 +0100 Subject: [PATCH 1/5] Small improvements --- crates/wasi-http/src/body.rs | 10 ++++++---- crates/wasi-http/src/http_impl.rs | 7 +++++-- crates/wasi-http/src/types_impl.rs | 3 +-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index fe660a880b6d..ea04e321c013 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -224,11 +224,13 @@ impl HostIncomingBodyStream { self.state = IncomingBodyStreamState::Closed; } - // No more frames are going to be received again, so drop the `body` - // and the `tx` channel we'd send the body back onto because it's - // not needed as frames are done. + // No more frames are going to be received again, so send an + // explicit EOF (no trailers) and close the stream. None => { - self.state = IncomingBodyStreamState::Closed; + let prev = mem::replace(&mut self.state, IncomingBodyStreamState::Closed); + if let IncomingBodyStreamState::Open { body: _, tx } = prev { + let _ = tx.send(StreamEnd::Trailers(None)); + } } } } diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index ac952d7ead93..29e8e65547c5 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -24,7 +24,7 @@ where request_id: Resource, options: Option>, ) -> crate::HttpResult> { - let opts = options.and_then(|opts| self.table().get(&opts).ok()); + let opts = options.map(|opts| self.table().get(&opts)).transpose()?; let connect_timeout = opts .and_then(|opts| opts.connect_timeout) @@ -65,7 +65,10 @@ where Scheme::Other(_) => return Err(types::ErrorCode::HttpProtocolError.into()), }; - let authority = req.authority.unwrap_or_else(String::new); + let authority = match req.authority { + Some(a) if !a.is_empty() => a, + _ => return Err(types::ErrorCode::HttpRequestUriInvalid.into()), + }; 
builder = builder.header(hyper::header::HOST, &authority); diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 368de2b2a13b..6b26e2ac1d87 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -919,14 +919,13 @@ where id: Resource, ts: Option>, ) -> crate::HttpResult<()> { - let body = self.table().delete(id)?; - let ts = if let Some(ts) = ts { Some(move_fields(self.table(), ts)?) } else { None }; + let body = self.table().delete(id)?; body.finish(ts)?; Ok(()) } From d5b1f047c2de4d2a4eaaa6fe16a7c8ae56adc8d7 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 5 Mar 2026 16:37:15 +0100 Subject: [PATCH 2/5] Error propagation --- crates/wasi-http/src/body.rs | 52 +++++++++++++++++++++++++++--- crates/wasi-http/src/types.rs | 35 +++++++++++++------- crates/wasi-http/src/types_impl.rs | 2 +- 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index ea04e321c013..abfb0eb678e5 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -1,7 +1,7 @@ //! Implementation of the `wasi:http/types` interface's various body types. use crate::bindings::http::types; -use crate::types::FieldMap; +use crate::types::{ConnWorkerErrorReceiver, FieldMap}; use bytes::Bytes; use http_body::{Body, Frame}; use http_body_util::BodyExt; @@ -30,6 +30,8 @@ pub struct HostIncomingBody { /// This ensures that if the parent of this body is dropped before the body /// then the backing data behind this worker is kept alive. worker: Option>, + /// Receives errors from the connection worker task, if any. 
+ worker_error_receiver: Option, } impl HostIncomingBody { @@ -39,18 +41,33 @@ impl HostIncomingBody { between_bytes_timeout: Duration, field_size_limit: usize, ) -> HostIncomingBody { - let body = BodyWithTimeout::new(body, between_bytes_timeout); + let body = BodyWithTimeout::new(body, between_bytes_timeout, None); HostIncomingBody { body: IncomingBodyState::Start(body), field_size_limit, worker: None, + worker_error_receiver: None, } } /// Retain a worker task that needs to be kept alive while this body is being read. - pub fn retain_worker(&mut self, worker: AbortOnDropJoinHandle<()>) { + /// + /// If a `worker_error_receiver` is provided, connection worker errors will + /// be surfaced through the body stream during reads. + pub fn retain_worker( + &mut self, + worker: AbortOnDropJoinHandle<()>, + worker_error_receiver: Option, + ) { assert!(self.worker.is_none()); self.worker = Some(worker); + if let Some(rx) = worker_error_receiver { + self.worker_error_receiver = Some(rx.clone()); + // Also propagate to the body if it hasn't been taken yet + if let IncomingBodyState::Start(body) = &mut self.body { + body.worker_error_receiver = Some(rx); + } + } } /// Create a new `HostIncomingBody` that always fails with the given error. @@ -59,6 +76,7 @@ impl HostIncomingBody { body: IncomingBodyState::Failing(Arc::from(error)), field_size_limit: 0, worker: None, + worker_error_receiver: None, } } @@ -118,10 +136,16 @@ struct BodyWithTimeout { /// Maximal duration between when a frame is first requested and when it's /// allowed to arrive. between_bytes_timeout: Duration, + /// Receives errors from the connection worker task. 
+ worker_error_receiver: Option, } impl BodyWithTimeout { - fn new(inner: HyperIncomingBody, between_bytes_timeout: Duration) -> BodyWithTimeout { + fn new( + inner: HyperIncomingBody, + between_bytes_timeout: Duration, + worker_error_receiver: Option, + ) -> BodyWithTimeout { BodyWithTimeout { inner, between_bytes_timeout, @@ -129,6 +153,7 @@ impl BodyWithTimeout { timeout: Box::pin(wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| { tokio::time::sleep(Duration::new(0, 0)) })), + worker_error_receiver, } } } @@ -163,6 +188,25 @@ impl Body for BodyWithTimeout { // arrives then the sleep timer will be reset on the next frame. let result = Pin::new(&mut me.inner).poll_frame(cx); me.reset_sleep = result.is_ready(); + + // At end-of-stream (EOF or trailers), check if the connection worker + // reported an error. This surfaces connection-level failures that + // might not otherwise propagate through the body stream (e.g. the + // connection was reset after all data frames were sent but before + // a clean shutdown). 
+ let is_end_of_stream = match &result { + Poll::Ready(None) => true, + Poll::Ready(Some(Ok(frame))) if !frame.is_data() => true, + _ => false, + }; + if is_end_of_stream { + if let Some(rx) = me.worker_error_receiver.take() { + if let Some(err) = rx.borrow().as_ref() { + return Poll::Ready(Some(Err(err.as_ref().clone()))); + } + } + } + result } } diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 67f2a5db1d15..d4f5863c48f2 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -12,6 +12,8 @@ use hyper::body::Body; use std::any::Any; use std::fmt; use std::time::Duration; +use std::sync::Arc; +use tokio::sync::watch; use wasmtime::component::{Resource, ResourceTable}; use wasmtime::{Result, bail}; use wasmtime_wasi::p2::Pollable; @@ -418,7 +420,7 @@ pub async fn default_send_request_handler( } })?; - let (mut sender, worker) = if use_tls { + let (mut sender, worker, worker_err_rx) = if use_tls { use rustls::pki_types::ServerName; // derived from https://github.com/rustls/rustls/blob/main/examples/src/bin/simpleclient.rs @@ -451,16 +453,15 @@ pub async fn default_send_request_handler( .map_err(|_| types::ErrorCode::ConnectionTimeout)? .map_err(hyper_request_error)?; + let (err_tx, err_rx) = watch::channel(None); let worker = wasmtime_wasi::runtime::spawn(async move { - match conn.await { - Ok(()) => {} - // TODO: shouldn't throw away this error and ideally should - // surface somewhere. - Err(e) => tracing::warn!("dropping error {e}"), + if let Err(e) = conn.await { + tracing::debug!("hyper connection worker error: {e:?}"); + let _ = err_tx.send(Some(Arc::new(hyper_request_error(e)))); } }); - (sender, worker) + (sender, worker, err_rx) } else { let tcp_stream = TokioIo::new(tcp_stream); let (sender, conn) = timeout( @@ -472,15 +473,15 @@ pub async fn default_send_request_handler( .map_err(|_| types::ErrorCode::ConnectionTimeout)? 
.map_err(hyper_request_error)?; + let (err_tx, err_rx) = watch::channel(None); let worker = wasmtime_wasi::runtime::spawn(async move { - match conn.await { - Ok(()) => {} - // TODO: same as above, shouldn't throw this error away. - Err(e) => tracing::warn!("dropping error {e}"), + if let Err(e) = conn.await { + tracing::debug!("hyper connection worker error: {e:?}"); + let _ = err_tx.send(Some(Arc::new(hyper_request_error(e)))); } }); - (sender, worker) + (sender, worker, err_rx) }; // at this point, the request contains the scheme and the authority, but @@ -507,6 +508,7 @@ pub async fn default_send_request_handler( resp, worker: Some(worker), between_bytes_timeout, + worker_error_receiver: Some(worker_err_rx), }) } @@ -828,6 +830,13 @@ impl AsRef for FieldMap { pub type FutureIncomingResponseHandle = AbortOnDropJoinHandle>>; +/// A shared receiver for connection worker errors. +/// +/// The connection worker task sets this if the hyper connection driver +/// encounters an error. The body stream checks it while reading to +/// surface connection-level failures to the guest. +pub type ConnWorkerErrorReceiver = watch::Receiver>>; + /// A response that is in the process of being received. #[derive(Debug)] pub struct IncomingResponse { @@ -837,6 +846,8 @@ pub struct IncomingResponse { pub worker: Option>, /// The timeout between chunks of the response. pub between_bytes_timeout: std::time::Duration, + /// Receives connection worker errors, if any. + pub worker_error_receiver: Option, } /// The concrete type behind a `wasi:http/types.future-incoming-response` resource. 
diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 6b26e2ac1d87..747cd43472ee 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -880,7 +880,7 @@ where let mut body = HostIncomingBody::new(body, resp.between_bytes_timeout, field_size_limit); if let Some(worker) = resp.worker { - body.retain_worker(worker); + body.retain_worker(worker, resp.worker_error_receiver); } body }), From a486dd6ebfd6470e965a6d0d7c5398cf7d4d66cd Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 9 Mar 2026 08:32:57 +0100 Subject: [PATCH 3/5] Connection pool --- Cargo.lock | 47 +++ Cargo.toml | 2 + crates/wasi-http/Cargo.toml | 4 +- crates/wasi-http/src/body.rs | 15 + crates/wasi-http/src/lib.rs | 3 + crates/wasi-http/src/types.rs | 511 ++++++++++++++++++++++++- crates/wasi-http/src/types_impl.rs | 2 + crates/wasi-http/tests/all/main.rs | 2 + crates/wasi-http/tests/all/p2.rs | 14 +- crates/wasi-http/tests/all/pool.rs | 580 +++++++++++++++++++++++++++++ 10 files changed, 1172 insertions(+), 8 deletions(-) create mode 100644 crates/wasi-http/tests/all/pool.rs diff --git a/Cargo.lock b/Cargo.lock index 47a51a41d4ce..d3750e608f64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1823,6 +1823,45 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + +[[package]] +name = "hyper-util" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + 
"libc", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -3909,6 +3948,12 @@ dependencies = [ "zip", ] +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.41" @@ -5096,6 +5141,8 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-rustls", + "hyper-util", "rustls", "sha2", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index dc159f65f593..b9a7f3551158 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -428,6 +428,8 @@ tokio-openssl = "0.6.5" openssl = "0.10.73" openssl-sys = "0.9.109" webpki-roots = "0.26.0" +hyper-util = { version = "0.1", default-features = false } +hyper-rustls = { version = "0.26", default-features = false } itertools = "0.14.0" base64 = "0.22.1" termcolor = "1.4.1" diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index c769cb9b2e3c..be9e3fa0292a 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -16,7 +16,7 @@ all-features = true [features] default = ["default-send-request"] -default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"] +default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots", "dep:hyper-util", "dep:hyper-rustls"] p3 = ["wasmtime-wasi/p3", "dep:tokio-util"] component-model-async = ["futures/alloc", "wasmtime/component-model-async"] @@ -41,6 +41,8 @@ wasmtime = { workspace = true, features = ['component-model'] } tokio-rustls = { workspace = true, optional = true } rustls = { workspace = true, optional = true } webpki-roots = { workspace = true, optional = true } +hyper-util = { workspace = true, features = ["http1", "client-legacy", "tokio"], optional = true } +hyper-rustls = { workspace = true, features = ["http1", "webpki-roots"], optional = true } 
[dev-dependencies] wasmtime-wasi-http = { path = ".", features = ['default-send-request'] } diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index abfb0eb678e5..bf20ab1c7b8b 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -32,6 +32,10 @@ pub struct HostIncomingBody { worker: Option>, /// Receives errors from the connection worker task, if any. worker_error_receiver: Option, + /// Connection concurrency permits held while this body is being read. + /// Released when the body is dropped, allowing queued requests to proceed. + #[cfg(feature = "default-send-request")] + connection_permits: Option, } impl HostIncomingBody { @@ -47,6 +51,8 @@ impl HostIncomingBody { field_size_limit, worker: None, worker_error_receiver: None, + #[cfg(feature = "default-send-request")] + connection_permits: None, } } @@ -70,6 +76,13 @@ impl HostIncomingBody { } } + /// Retain connection concurrency permits that should be held while this body + /// is being read. The permits are released when the body is dropped. + #[cfg(feature = "default-send-request")] + pub fn retain_connection_permits(&mut self, permits: Option) { + self.connection_permits = permits; + } + /// Create a new `HostIncomingBody` that always fails with the given error. 
pub fn failing(error: String) -> HostIncomingBody { HostIncomingBody { @@ -77,6 +90,8 @@ impl HostIncomingBody { field_size_limit: 0, worker: None, worker_error_receiver: None, + #[cfg(feature = "default-send-request")] + connection_permits: None, } } diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index ab8363fe5ca8..8789b0237833 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -280,6 +280,9 @@ pub use crate::types::{ DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx, WasiHttpImpl, WasiHttpView, }; +#[cfg(feature = "default-send-request")] +#[doc(inline)] +pub use crate::types::{HttpConnectionPool, HttpConnectionPoolConfig}; use http::header::CONTENT_LENGTH; use wasmtime::component::HasData; diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index d4f5863c48f2..01ad6c80c2cf 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -23,7 +23,10 @@ use wasmtime_wasi::runtime::AbortOnDropJoinHandle; use { crate::io::TokioIo, crate::{error::dns_error, hyper_request_error}, + std::collections::HashMap, + std::sync::Weak, tokio::net::TcpStream, + tokio::sync::{OwnedSemaphorePermit, Semaphore}, tokio::time::timeout, }; @@ -42,6 +45,12 @@ const DEFAULT_FIELD_SIZE_LIMIT: usize = 128 * 1024; pub struct WasiHttpCtx { /// The maximum size for any fields resources created by this context. pub field_size_limit: usize, + /// Optional HTTP connection pool for reusing outgoing connections. + /// + /// When `Some`, `default_send_request` will use pooled connections + /// instead of creating a new TCP+TLS connection per request. 
+ #[cfg(feature = "default-send-request")] + pub connection_pool: Option, } impl WasiHttpCtx { @@ -49,6 +58,8 @@ impl WasiHttpCtx { pub fn new() -> Self { Self { field_size_limit: DEFAULT_FIELD_SIZE_LIMIT, + #[cfg(feature = "default-send-request")] + connection_pool: None, } } @@ -64,6 +75,150 @@ impl WasiHttpCtx { } } +/// Configuration for the HTTP connection pool. +#[cfg(feature = "default-send-request")] +#[derive(Clone, Debug)] +pub struct HttpConnectionPoolConfig { + /// Maximum number of idle connections per host. Default: 8. + pub max_idle_per_host: usize, + /// How long idle connections remain in the pool before being closed. Default: 90 seconds. + pub idle_timeout: Duration, + /// Timeout for establishing new TCP connections. Default: 30 seconds. + /// + /// This is a global setting applied to all connections created by the pool. + /// Per-request `connect_timeout` from `OutgoingRequestConfig` is not applied + /// when using the pool. + pub connect_timeout: Duration, + /// Maximum number of concurrent in-flight connections per host. Default: 20. + /// + /// When this limit is reached, new requests to the same host will wait + /// for an existing request to complete before proceeding. This prevents + /// overwhelming targets (e.g. Cloudflare) with too many simultaneous + /// TCP/TLS handshakes. + pub max_connections_per_host: usize, + /// Maximum total number of concurrent in-flight connections across all hosts. Default: 200. + /// + /// This prevents exhausting OS resources (file descriptors, ports, memory) + /// when connecting to many different hosts simultaneously. + pub max_total_connections: usize, + /// Maximum number of distinct host entries tracked in the per-host semaphore map. Default: 1024. + /// + /// When this limit is exceeded, stale entries (hosts with no active connections) + /// are opportunistically cleaned up. 
+ pub max_host_entries: usize, +} + +#[cfg(feature = "default-send-request")] +impl Default for HttpConnectionPoolConfig { + fn default() -> Self { + Self { + max_idle_per_host: 8, + idle_timeout: Duration::from_secs(90), + connect_timeout: Duration::from_secs(30), + max_connections_per_host: 20, + max_total_connections: 200, + max_host_entries: 1024, + } + } +} + +/// A shared HTTP connection pool backed by `hyper-util`'s legacy client. +/// +/// This pool reuses TCP and TLS connections across requests to the same host, +/// reducing connection establishment overhead. It is `Clone`-able and can be +/// shared across multiple workers or contexts. +/// +/// In addition to connection reuse, the pool enforces concurrency limits: +/// - A per-host limit prevents overwhelming individual targets with too many +/// simultaneous connections (e.g. triggering Cloudflare rate limiting). +/// - A global limit prevents exhausting OS resources (file descriptors, ports). +#[cfg(feature = "default-send-request")] +#[derive(Clone)] +pub struct HttpConnectionPool { + client: hyper_util::client::legacy::Client< + hyper_rustls::HttpsConnector, + HyperOutgoingBody, + >, + global_semaphore: Arc, + host_semaphores: Arc>>>, + max_connections_per_host: usize, + max_host_entries: usize, +} + +#[cfg(feature = "default-send-request")] +impl fmt::Debug for HttpConnectionPool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HttpConnectionPool").finish_non_exhaustive() + } +} + +#[cfg(feature = "default-send-request")] +impl HttpConnectionPool { + /// Create a new connection pool with the given configuration. 
+ pub fn new(config: HttpConnectionPoolConfig) -> Self { + let root_cert_store = rustls::RootCertStore { + roots: webpki_roots::TLS_SERVER_ROOTS.into(), + }; + let mut tls_config = rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + tls_config.alpn_protocols = vec![b"http/1.1".to_vec()]; + + let mut http_connector = hyper_util::client::legacy::connect::HttpConnector::new(); + http_connector.enforce_http(false); + http_connector.set_connect_timeout(Some(config.connect_timeout)); + + // Construct the HttpsConnector directly via From<(H, C)> instead of + // using HttpsConnectorBuilder. The builder's enable_http1() leaves + // alpn_protocols empty, which causes HandshakeFailure with servers that + // require ALPN. The builder also asserts alpn_protocols is empty on + // input, so we cannot pre-set it. The From impl is equivalent to + // .https_or_http().wrap_connector() (force_https=false) but lets us + // keep the ALPN we configured above. + let https: hyper_rustls::HttpsConnector<_> = + (http_connector, tls_config).into(); + + let client = hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .pool_idle_timeout(config.idle_timeout) + .pool_max_idle_per_host(config.max_idle_per_host) + .build(https); + + Self { + client, + global_semaphore: Arc::new(Semaphore::new(config.max_total_connections)), + host_semaphores: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + max_connections_per_host: config.max_connections_per_host, + max_host_entries: config.max_host_entries, + } + } + + /// Get or create a semaphore for the given host key. + /// + /// Uses `Weak` references so that semaphores are naturally cleaned up when + /// all permits are released and no one holds an `Arc` to the semaphore. 
+ async fn host_semaphore(&self, key: &str) -> Arc { + let mut map = self.host_semaphores.lock().await; + + if let Some(weak) = map.get(key) { + if let Some(strong) = weak.upgrade() { + return strong; + } + } + + let sem = Arc::new(Semaphore::new(self.max_connections_per_host)); + map.insert(key.to_string(), Arc::downgrade(&sem)); + + // Opportunistic cleanup when the map grows too large + if map.len() > self.max_host_entries { + map.retain(|_, w| w.strong_count() > 0); + } + + sem + } +} + /// A trait which provides internal WASI HTTP state. /// /// # Example @@ -155,7 +310,7 @@ pub trait WasiHttpView { request: hyper::Request, config: OutgoingRequestConfig, ) -> crate::HttpResult { - Ok(default_send_request(request, config)) + Ok(default_send_request_with_pool(request, config, self.connection_pool().cloned())) } /// Send an outgoing request. @@ -183,6 +338,24 @@ pub trait WasiHttpView { fn outgoing_body_chunk_size(&mut self) -> usize { DEFAULT_OUTGOING_BODY_CHUNK_SIZE } + + /// Returns the connection pool for outgoing requests, if configured. + /// + /// When a pool is returned, `default_send_request` will reuse connections + /// instead of opening a new TCP+TLS connection per request. + /// + /// The default returns `None` (no pooling). Implementors who set a pool on + /// [`WasiHttpCtx::connection_pool`] should override this to return it, e.g.: + /// + /// ```ignore + /// fn connection_pool(&self) -> Option<&HttpConnectionPool> { + /// self.http_ctx.connection_pool.as_ref() + /// } + /// ``` + #[cfg(feature = "default-send-request")] + fn connection_pool(&self) -> Option<&HttpConnectionPool> { + None + } } /// The default value configured for [`WasiHttpView::outgoing_body_buffer_chunks`] in [`WasiHttpView`]. 
@@ -227,6 +400,11 @@ impl WasiHttpView for &mut T { fn outgoing_body_chunk_size(&mut self) -> usize { T::outgoing_body_chunk_size(self) } + + #[cfg(feature = "default-send-request")] + fn connection_pool(&self) -> Option<&HttpConnectionPool> { + T::connection_pool(self) + } } impl WasiHttpView for Box { @@ -266,6 +444,11 @@ impl WasiHttpView for Box { fn outgoing_body_chunk_size(&mut self) -> usize { T::outgoing_body_chunk_size(self) } + + #[cfg(feature = "default-send-request")] + fn connection_pool(&self) -> Option<&HttpConnectionPool> { + T::connection_pool(self) + } } /// A concrete structure that all generated `Host` traits are implemented for. @@ -365,18 +548,82 @@ pub struct OutgoingRequestConfig { /// The default implementation of how an outgoing request is sent. /// /// This implementation is used by the `wasi:http/outgoing-handler` interface -/// default implementation. +/// default implementation. Creates a new TCP+TLS connection per request +/// without connection pooling. #[cfg(feature = "default-send-request")] pub fn default_send_request( request: hyper::Request, config: OutgoingRequestConfig, +) -> HostFutureIncomingResponse { + default_send_request_with_pool(request, config, None) +} + +/// Like [`default_send_request`], but optionally uses a connection pool. +/// +/// When `connection_pool` is `Some`, connections are reused across requests +/// to the same host. When `None`, falls back to creating a new connection +/// per request. 
+#[cfg(feature = "default-send-request")] +pub fn default_send_request_with_pool( + request: hyper::Request, + config: OutgoingRequestConfig, + connection_pool: Option, ) -> HostFutureIncomingResponse { let handle = wasmtime_wasi::runtime::spawn(async move { - Ok(default_send_request_handler(request, config).await) + if let Some(pool) = connection_pool { + Ok(pooled_send_request_handler(request, config, pool).await) + } else { + Ok(default_send_request_handler(request, config).await) + } }); HostFutureIncomingResponse::pending(handle) } +/// Maximum depth to walk error source chains to avoid pathological cycles. +#[cfg(feature = "default-send-request")] +const MAX_ERROR_CHAIN_DEPTH: usize = 32; + +/// Walk the error source chain to find a `rustls::Error`, if one exists. +#[cfg(feature = "default-send-request")] +fn find_rustls_error<'a>(err: &'a (dyn std::error::Error + 'static)) -> Option<&'a rustls::Error> { + let mut cur: &(dyn std::error::Error + 'static) = err; + for _ in 0..MAX_ERROR_CHAIN_DEPTH { + if let Some(r) = cur.downcast_ref::() { + return Some(r); + } + cur = cur.source()?; + } + None +} + +/// Walk the error source chain to find a `std::io::Error`, if one exists. +#[cfg(feature = "default-send-request")] +fn find_io_error<'a>(err: &'a (dyn std::error::Error + 'static)) -> Option<&'a std::io::Error> { + let mut cur: &(dyn std::error::Error + 'static) = err; + for _ in 0..MAX_ERROR_CHAIN_DEPTH { + if let Some(io_err) = cur.downcast_ref::() { + return Some(io_err); + } + cur = cur.source()?; + } + None +} + +/// Walk the error source chain to find a `tokio::time::error::Elapsed`, if one exists. 
+#[cfg(feature = "default-send-request")] +fn find_elapsed_error<'a>( + err: &'a (dyn std::error::Error + 'static), +) -> Option<&'a tokio::time::error::Elapsed> { + let mut cur: &(dyn std::error::Error + 'static) = err; + for _ in 0..MAX_ERROR_CHAIN_DEPTH { + if let Some(elapsed) = cur.downcast_ref::() { + return Some(elapsed); + } + cur = cur.source()?; + } + None +} + /// The underlying implementation of how an outgoing request is sent. This should likely be spawned /// in a task. /// @@ -440,6 +687,44 @@ pub async fn default_send_request_handler( })? .to_owned(); let stream = connector.connect(domain, tcp_stream).await.map_err(|e| { + // Check the io::Error kind directly first + match e.kind() { + std::io::ErrorKind::ConnectionRefused => { + tracing::warn!("tls connection refused: {e:?}"); + return types::ErrorCode::ConnectionRefused; + } + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof => { + tracing::warn!("tls connection terminated: {e:?}"); + return types::ErrorCode::ConnectionTerminated; + } + std::io::ErrorKind::TimedOut => { + tracing::warn!("tls connection timed out: {e:?}"); + return types::ErrorCode::ConnectionTimeout; + } + _ => {} + } + // Walk the error chain to find a rustls-specific error + if let Some(rustls_err) = find_rustls_error(&e) { + match rustls_err { + rustls::Error::InvalidCertificate(_) => { + tracing::warn!("tls certificate error: {e:?}"); + return types::ErrorCode::TlsCertificateError; + } + rustls::Error::AlertReceived(alert) => { + tracing::warn!("tls alert received: {e:?}"); + return types::ErrorCode::TlsAlertReceived( + crate::bindings::http::types::TlsAlertReceivedPayload { + alert_id: Some(alert.get_u8()), + alert_message: Some(format!("{alert:?}")), + }, + ); + } + _ => {} + } + } tracing::warn!("tls protocol error: {e:?}"); types::ErrorCode::TlsProtocolError })?; @@ -509,6 +794,204 @@ pub async fn 
default_send_request_handler(
         worker: Some(worker),
         between_bytes_timeout,
         worker_error_receiver: Some(worker_err_rx),
+        #[cfg(feature = "default-send-request")]
+        connection_permits: None,
+    })
+}
+
+/// Construct a normalized host key for per-host semaphore lookup.
+///
+/// The key is `"{scheme}://{host}:{port}"` with:
+/// - Scheme canonicalized to lowercase `"http"` or `"https"`
+/// - Host lowercased (DNS names are case-insensitive)
+/// - Default ports (80 for HTTP, 443 for HTTPS) filled in
+///
+/// This ensures that `Example.com`, `example.com:443`, and `HTTPS://example.com`
+/// all map to the same semaphore.
+///
+/// Any scheme other than `https` (compared case-insensitively) is keyed as
+/// `http`; the caller is expected to have validated the scheme beforehand.
+#[cfg(feature = "default-send-request")]
+fn make_host_key(scheme: &str, authority: &http::uri::Authority) -> String {
+    let scheme = if scheme.eq_ignore_ascii_case("https") { "https" } else { "http" };
+    let host = authority.host().to_ascii_lowercase();
+    let port = authority.port_u16().unwrap_or(if scheme == "https" { 443 } else { 80 });
+    format!("{scheme}://{host}:{port}")
+}
+
+/// Send a request using a pooled connection via `hyper-util`'s legacy client.
+///
+/// The client handles TCP connection, TLS handshake, connection pooling,
+/// keep-alive, and connection health checks automatically.
+///
+/// Concurrency is limited by acquiring per-host and global semaphore permits
+/// before sending the request. Permits are held until the response (including
+/// body) is dropped.
+///
+/// # Errors
+///
+/// - `HttpRequestUriInvalid` if the request URI has no scheme or no authority.
+/// - `HttpProtocolError` if the scheme is neither `http` nor `https`, if
+///   `config.use_tls` disagrees with the scheme, or if the client reports a
+///   failure that is not a connect-phase error.
+/// - `ConnectionTimeout` if the per-host and global permits cannot both be
+///   acquired within `config.connect_timeout`.
+/// - `InternalError` if one of the pool semaphores has been closed.
+/// - `ConnectionReadTimeout` if no response arrives within
+///   `config.first_byte_timeout`.
+/// - For connect-phase failures: `ConnectionRefused`, `ConnectionTerminated`,
+///   `ConnectionTimeout`, a DNS error, one of the TLS error codes, or
+///   `DestinationUnavailable` when nothing more specific can be extracted.
+#[cfg(feature = "default-send-request")]
+async fn pooled_send_request_handler(
+    request: hyper::Request<HyperOutgoingBody>,
+    config: OutgoingRequestConfig,
+    pool: HttpConnectionPool,
+) -> Result<IncomingResponse, types::ErrorCode> {
+    // Validate that the request URI is absolute (has scheme + authority).
+    // The pooled connector requires an absolute URI to determine host and TLS.
+    let scheme = request
+        .uri()
+        .scheme_str()
+        .ok_or(types::ErrorCode::HttpRequestUriInvalid)?;
+    let authority = request
+        .uri()
+        .authority()
+        .ok_or(types::ErrorCode::HttpRequestUriInvalid)?
+        .clone();
+    let scheme_is_http = scheme.eq_ignore_ascii_case("http");
+    let scheme_is_https = scheme.eq_ignore_ascii_case("https");
+    if !scheme_is_http && !scheme_is_https {
+        return Err(types::ErrorCode::HttpProtocolError);
+    }
+    // Validate that use_tls matches the URI scheme. The pooled connector uses the
+    // URI scheme to decide whether to use TLS, so a mismatch would lead to silent
+    // behavioral divergence from the non-pooled path.
+    let scheme_is_tls = scheme_is_https;
+    if config.use_tls != scheme_is_tls {
+        tracing::warn!(
+            "pooled request use_tls={} but URI scheme is {scheme:?}",
+            config.use_tls
+        );
+        return Err(types::ErrorCode::HttpProtocolError);
+    }
+
+    let between_bytes_timeout = config.between_bytes_timeout;
+    let first_byte_timeout = config.first_byte_timeout;
+
+    // Use a single deadline for both semaphore acquisitions so the total wait
+    // never exceeds connect_timeout (rather than 2× connect_timeout).
+    let acquire_deadline = tokio::time::Instant::now() + config.connect_timeout;
+
+    // Acquire concurrency permits: per-host first, then global.
+    // Per-host first avoids global permit hoarding where a burst to one host
+    // grabs all global permits while waiting on per-host, starving other hosts.
+    let host_key = make_host_key(scheme, &authority);
+    let host_sem = pool.host_semaphore(&host_key).await;
+
+    tracing::debug!(
+        host_key = %host_key,
+        "pooled: acquiring per-host permit"
+    );
+    let host_permit = tokio::time::timeout_at(acquire_deadline, host_sem.acquire_owned())
+        .await
+        .map_err(|_| {
+            tracing::warn!(host_key = %host_key, "pooled: timed out waiting for per-host permit");
+            types::ErrorCode::ConnectionTimeout
+        })?
+        .map_err(|_| {
+            // `acquire_owned` only fails when the semaphore has been closed.
+            // That is a host-side condition, not a network timeout, so it is
+            // reported as an internal error.
+            tracing::warn!(host_key = %host_key, "pooled: per-host semaphore closed");
+            types::ErrorCode::InternalError(Some(
+                "connection pool per-host semaphore closed".to_string(),
+            ))
+        })?;
+
+    tracing::debug!("pooled: acquiring global permit");
+    let global_permit = tokio::time::timeout_at(acquire_deadline, pool.global_semaphore.clone().acquire_owned())
+        .await
+        .map_err(|_| {
+            tracing::warn!("pooled: timed out waiting for global permit");
+            types::ErrorCode::ConnectionTimeout
+        })?
+        .map_err(|_| {
+            // Same reasoning as for the per-host semaphore above.
+            tracing::warn!("pooled: global semaphore closed");
+            types::ErrorCode::InternalError(Some(
+                "connection pool global semaphore closed".to_string(),
+            ))
+        })?;
+
+    // Cloned up front because `request` is moved into the client below and
+    // the URI is still needed for logging in the error path.
+    let uri = request.uri().clone();
+    tracing::debug!(
+        %uri,
+        use_tls = config.use_tls,
+        "pooled: sending request"
+    );
+
+    let resp = timeout(first_byte_timeout, pool.client.request(request))
+        .await
+        .map_err(|_| types::ErrorCode::ConnectionReadTimeout)?
+        .map_err(|e| {
+            // hyper_util::client::legacy::Error wraps hyper errors and
+            // connector errors. Try to extract a more specific ErrorCode.
+            if e.is_connect() {
+                // Connection-phase error: could be DNS, TCP, or TLS.
+                // Walk the full error chain since hyper-util wraps errors
+                // in multiple layers.
+                if let Some(io_err) = find_io_error(&e) {
+                    match io_err.kind() {
+                        std::io::ErrorKind::ConnectionRefused => {
+                            tracing::warn!("pooled connection refused: {e:?}");
+                            return types::ErrorCode::ConnectionRefused;
+                        }
+                        std::io::ErrorKind::ConnectionReset
+                        | std::io::ErrorKind::ConnectionAborted
+                        | std::io::ErrorKind::BrokenPipe
+                        | std::io::ErrorKind::UnexpectedEof => {
+                            tracing::warn!("pooled connection terminated: {e:?}");
+                            return types::ErrorCode::ConnectionTerminated;
+                        }
+                        std::io::ErrorKind::TimedOut => {
+                            tracing::warn!("pooled connection timed out: {e:?}");
+                            return types::ErrorCode::ConnectionTimeout;
+                        }
+                        _ => {}
+                    }
+                    // Check for DNS-related errors (matches non-pooled path logic)
+                    if io_err.kind() == std::io::ErrorKind::AddrNotAvailable
+                        || io_err
+                            .to_string()
+                            .starts_with("failed to lookup address information")
+                    {
+                        tracing::warn!("pooled dns error: {e:?}");
+                        return crate::error::dns_error(
+                            "address not available".to_string(),
+                            0,
+                        );
+                    }
+                }
+                // Check for tokio timeout errors that may be wrapped
+                // in the hyper-util error chain (e.g. pool connect timeout).
+                if find_elapsed_error(&e).is_some() {
+                    tracing::warn!("pooled connection timed out (elapsed): {e:?}");
+                    return types::ErrorCode::ConnectionTimeout;
+                }
+                if let Some(rustls_err) = find_rustls_error(&e) {
+                    match rustls_err {
+                        rustls::Error::InvalidCertificate(_) => {
+                            tracing::warn!("pooled tls certificate error: {e:?}");
+                            return types::ErrorCode::TlsCertificateError;
+                        }
+                        rustls::Error::AlertReceived(alert) => {
+                            tracing::warn!("pooled tls alert received: {e:?}");
+                            return types::ErrorCode::TlsAlertReceived(
+                                crate::bindings::http::types::TlsAlertReceivedPayload {
+                                    alert_id: Some(alert.get_u8()),
+                                    alert_message: Some(format!("{alert:?}")),
+                                },
+                            );
+                        }
+                        _ => {
+                            tracing::warn!("pooled tls protocol error: {e:?}");
+                            return types::ErrorCode::TlsProtocolError;
+                        }
+                    }
+                }
+                // Nothing more specific could be extracted from the chain.
+                tracing::warn!(%uri, "pooled connection error: {e:?}");
+                types::ErrorCode::DestinationUnavailable
+            } else {
+                tracing::warn!("pooled request error: {e:?}");
+                types::ErrorCode::HttpProtocolError
+            }
+        })?
+        .map(|body| body.map_err(hyper_request_error).boxed_unsync());
+
+    Ok(IncomingResponse {
+        resp,
+        worker: None,
+        between_bytes_timeout,
+        worker_error_receiver: None,
+        // Moving the permits into the response keeps them held until the
+        // response is dropped.
+        connection_permits: Some(ConnectionPermits {
+            _host: host_permit,
+            _global: global_permit,
+        }),
     })
 }
 
@@ -837,6 +1320,24 @@ pub type FutureIncomingResponseHandle =
 /// surface connection-level failures to the guest.
 pub type ConnWorkerErrorReceiver = watch::Receiver>>;
 
+/// Holds semaphore permits for connection concurrency limiting.
+///
+/// When this struct is dropped, the permits are released, allowing
+/// queued requests to proceed. The permits are acquired per-host first,
+/// then globally, to avoid global permit hoarding.
+#[cfg(feature = "default-send-request")] +pub struct ConnectionPermits { + _host: OwnedSemaphorePermit, + _global: OwnedSemaphorePermit, +} + +#[cfg(feature = "default-send-request")] +impl fmt::Debug for ConnectionPermits { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ConnectionPermits").finish_non_exhaustive() + } +} + /// A response that is in the process of being received. #[derive(Debug)] pub struct IncomingResponse { @@ -848,6 +1349,10 @@ pub struct IncomingResponse { pub between_bytes_timeout: std::time::Duration, /// Receives connection worker errors, if any. pub worker_error_receiver: Option, + /// Connection concurrency permits. Released when the response is dropped, + /// allowing queued requests to proceed. + #[cfg(feature = "default-send-request")] + pub connection_permits: Option, } /// The concrete type behind a `wasi:http/types.future-incoming-response` resource. diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 747cd43472ee..1a085f9dca0b 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -882,6 +882,8 @@ where if let Some(worker) = resp.worker { body.retain_worker(worker, resp.worker_error_receiver); } + #[cfg(feature = "default-send-request")] + body.retain_connection_permits(resp.connection_permits); body }), })?; diff --git a/crates/wasi-http/tests/all/main.rs b/crates/wasi-http/tests/all/main.rs index 6db5cd0742a2..4aee0c0ed96a 100644 --- a/crates/wasi-http/tests/all/main.rs +++ b/crates/wasi-http/tests/all/main.rs @@ -9,6 +9,8 @@ macro_rules! 
assert_test_exists { mod http_server; mod p2; +#[cfg(feature = "default-send-request")] +mod pool; #[cfg(feature = "p3")] mod p3; diff --git a/crates/wasi-http/tests/all/p2.rs b/crates/wasi-http/tests/all/p2.rs index 9d070789d027..d9655543d053 100644 --- a/crates/wasi-http/tests/all/p2.rs +++ b/crates/wasi-http/tests/all/p2.rs @@ -13,7 +13,7 @@ use wasmtime::{ error::Context as _, format_err, }; -use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView, p2::pipe::MemoryOutputPipe}; +use wasmtime_wasi::{IoCtx, WasiCtx, WasiCtxView, WasiView, p2::pipe::MemoryOutputPipe}; use wasmtime_wasi_http::{ HttpResult, WasiHttpCtx, WasiHttpView, bindings::http::types::{ErrorCode, Scheme}, @@ -31,6 +31,7 @@ type RequestSender = Arc< struct Ctx { table: ResourceTable, wasi: WasiCtx, + io_ctx: IoCtx, http: WasiHttpCtx, stdout: MemoryOutputPipe, stderr: MemoryOutputPipe, @@ -43,6 +44,7 @@ impl WasiView for Ctx { WasiCtxView { ctx: &mut self.wasi, table: &mut self.table, + io_ctx: &mut self.io_ctx, } } } @@ -89,9 +91,11 @@ fn store(engine: &Engine, server: &Server) -> Store { builder.stdout(stdout.clone()); builder.stderr(stderr.clone()); builder.env("HTTP_SERVER", &server.addr()); + let (wasi, io_ctx) = builder.build(); let ctx = Ctx { table: ResourceTable::new(), - wasi: builder.build(), + wasi, + io_ctx, http: WasiHttpCtx::new(), stderr, stdout, @@ -116,7 +120,7 @@ impl Drop for Ctx { } mod async_; -mod sync; +// Note: sync tests are removed because this fork does not have p2 sync APIs. 
async fn run_wasi_http( component_filename: &str, @@ -141,7 +145,7 @@ async fn run_wasi_http( let mut builder = WasiCtx::builder(); builder.stdout(stdout.clone()); builder.stderr(stderr.clone()); - let wasi = builder.build(); + let (wasi, io_ctx) = builder.build(); let mut http = WasiHttpCtx::new(); if let Some(limit) = field_size_limit { http.set_field_size_limit(limit); @@ -149,6 +153,7 @@ async fn run_wasi_http( let ctx = Ctx { table, wasi, + io_ctx, http, stderr, stdout, @@ -303,6 +308,7 @@ async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> { }), worker: None, between_bytes_timeout, + worker_error_receiver: None, }) }); HostFutureIncomingResponse::ready(response) diff --git a/crates/wasi-http/tests/all/pool.rs b/crates/wasi-http/tests/all/pool.rs new file mode 100644 index 000000000000..a75526fb5e07 --- /dev/null +++ b/crates/wasi-http/tests/all/pool.rs @@ -0,0 +1,580 @@ +//! Tests for HTTP connection pooling and error classification. +//! +//! These are host-level tests that exercise the Rust APIs directly (no Wasm +//! components). They verify: +//! - Connection pool construction and cloneability +//! - Pooled vs non-pooled request dispatch +//! 
- Error classification for various failure modes + +use http_body_util::{BodyExt, Empty}; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::TcpListener; +use wasmtime::Result; +use wasmtime_wasi_http::bindings::http::types::ErrorCode; +use wasmtime_wasi_http::body::HyperOutgoingBody; +use wasmtime_wasi_http::io::TokioIo; +use wasmtime_wasi_http::types::{ + default_send_request, default_send_request_with_pool, HostFutureIncomingResponse, + OutgoingRequestConfig, +}; +use wasmtime_wasi_http::{HttpConnectionPool, HttpConnectionPoolConfig}; + +/// Build a minimal outgoing request body (empty). +fn empty_body() -> HyperOutgoingBody { + Empty::::new() + .map_err(|_| unreachable!("Infallible error")) + .boxed_unsync() +} + +/// Standard short timeouts for tests. +fn fast_config(use_tls: bool) -> OutgoingRequestConfig { + OutgoingRequestConfig { + use_tls, + connect_timeout: Duration::from_millis(500), + first_byte_timeout: Duration::from_secs(5), + between_bytes_timeout: Duration::from_secs(5), + } +} + +/// Helper: resolve a `HostFutureIncomingResponse` to its inner result. +async fn resolve( + mut future: HostFutureIncomingResponse, +) -> wasmtime::Result> { + use wasmtime_wasi::p2::Pollable; + future.ready().await; + future.unwrap_ready() +} + +/// Start a local HTTP/1.1 server that echoes the request method and URI back. +/// Returns the server address and a `tokio::task::JoinHandle` for cleanup. 
+async fn start_echo_server() -> Result<(SocketAddr, tokio::task::JoinHandle<()>)> { + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + + let handle = tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + break; + }; + let io = TokioIo::new(stream); + tokio::spawn(async move { + let _ = http1::Builder::new() + .keep_alive(true) + .serve_connection( + io, + service_fn(|req: Request| async move { + let method = req.method().to_string(); + let uri = req.uri().to_string(); + Response::builder() + .status(200) + .header("x-method", method) + .header("x-uri", uri) + .body( + Empty::::new() + .map_err(|_| unreachable!()) + .boxed(), + ) + }), + ) + .await; + }); + } + }); + + Ok((addr, handle)) +} + +/// Start a local HTTP/1.1 server that counts TCP connections. +/// Returns the server address, connection counter, and a `tokio::task::JoinHandle`. +async fn start_counting_echo_server() -> Result<(SocketAddr, Arc, tokio::task::JoinHandle<()>)> { + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + let conn_count = Arc::new(AtomicUsize::new(0)); + let conn_count_clone = conn_count.clone(); + + let handle = tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + break; + }; + conn_count_clone.fetch_add(1, Ordering::SeqCst); + let io = TokioIo::new(stream); + tokio::spawn(async move { + let _ = http1::Builder::new() + .keep_alive(true) + .serve_connection( + io, + service_fn(|req: Request| async move { + let method = req.method().to_string(); + let uri = req.uri().to_string(); + Response::builder() + .status(200) + .header("x-method", method) + .header("x-uri", uri) + .body( + Empty::::new() + .map_err(|_| unreachable!()) + .boxed(), + ) + }), + ) + .await; + }); + } + }); + + Ok((addr, conn_count, handle)) +} + +// --------------------------------------------------------------------------- +// 
Pool construction +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test)] +async fn pool_default_config_constructs() { + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + // Pool should be cloneable (Arc-based internally). + let _pool2 = pool.clone(); +} + +#[test_log::test(tokio::test)] +async fn pool_custom_config_constructs() { + let config = HttpConnectionPoolConfig { + max_idle_per_host: 2, + idle_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + }; + let pool = HttpConnectionPool::new(config); + let _pool2 = pool.clone(); +} + +// --------------------------------------------------------------------------- +// Successful requests — non-pooled +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn non_pooled_request_succeeds() -> Result<()> { + let (addr, server) = start_echo_server().await?; + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}/hello", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request, fast_config(false)); + let resp = resolve(future).await?.expect("request should succeed"); + assert_eq!(resp.resp.status(), StatusCode::OK); + assert_eq!( + resp.resp.headers().get("x-uri").unwrap().to_str().unwrap(), + "/hello" + ); + + server.abort(); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Successful requests — pooled +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_request_succeeds() -> Result<()> { + let (addr, server) = start_echo_server().await?; + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + let request = hyper::Request::builder() + .method(http::Method::GET) + 
.uri(format!("http://localhost:{}/pooled", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(false), Some(pool.clone())); + let resp = resolve(future).await?.expect("request should succeed"); + assert_eq!(resp.resp.status(), StatusCode::OK); + assert_eq!( + resp.resp.headers().get("x-uri").unwrap().to_str().unwrap(), + "/pooled" + ); + + server.abort(); + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_connection_reuse() -> Result<()> { + // Verify that two sequential requests through the same pool reuse the + // same TCP connection (keep-alive). We use a server-side connection + // counter to assert this. + let (addr, conn_count, server) = start_counting_echo_server().await?; + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + for path in &["/first", "/second"] { + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}{}", addr.port(), path)) + .body(empty_body()) + .unwrap(); + + let future = + default_send_request_with_pool(request, fast_config(false), Some(pool.clone())); + let resp = resolve(future).await?.expect("request should succeed"); + assert_eq!(resp.resp.status(), StatusCode::OK); + assert_eq!( + resp.resp.headers().get("x-uri").unwrap().to_str().unwrap(), + *path + ); + // Drain the response body so the connection is returned to the pool + // for reuse (HTTP/1.1 keep-alive requires the body to be consumed). + let _ = resp.resp.into_body().collect().await; + } + + // With connection pooling and keep-alive, both requests should have + // used the same TCP connection. 
+ let connections = conn_count.load(Ordering::SeqCst); + assert_eq!( + connections, 1, + "expected 1 TCP connection (reuse), got {connections}" + ); + + server.abort(); + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pool_none_falls_back_to_non_pooled() -> Result<()> { + // Passing None for the pool should still work (falls back to direct connect). + let (addr, server) = start_echo_server().await?; + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}/fallback", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(false), None); + let resp = resolve(future).await?.expect("request should succeed"); + assert_eq!(resp.resp.status(), StatusCode::OK); + + server.abort(); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — connection refused (non-pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn non_pooled_connection_refused() -> Result<()> { + // Bind a port, get the addr, then drop the listener so nothing is listening. 
+ let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + drop(listener); + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}/refused", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request, fast_config(false)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::ConnectionRefused) => {} // expected + other => panic!("expected ConnectionRefused, got: {other:?}"), + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — connection refused (pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_connection_refused() -> Result<()> { + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + drop(listener); + + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}/refused", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(false), Some(pool)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::ConnectionRefused) => {} // expected + Err(ErrorCode::DestinationUnavailable) => {} // may occur on some platforms + other => panic!("expected ConnectionRefused or DestinationUnavailable, got: {other:?}"), + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — connection timeout (non-pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn non_pooled_connection_timeout() -> Result<()> { + // Bind but never 
accept — the TCP handshake will eventually time out. + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + // Keep listener alive but never accept. + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("http://localhost:{}/timeout", addr.port())) + .body(empty_body()) + .unwrap(); + + let config = OutgoingRequestConfig { + use_tls: false, + // Very short connect timeout to trigger quickly. + // Note: on localhost, TCP connect to a bound-but-not-accepted port + // may actually succeed at the TCP level (the kernel accepts the SYN + // into the backlog). So this test might see a *first_byte_timeout* + // instead. We use a non-routable address to ensure a real timeout. + connect_timeout: Duration::from_millis(200), + first_byte_timeout: Duration::from_millis(200), + between_bytes_timeout: Duration::from_secs(5), + }; + + // Use a non-routable IP to guarantee a connect timeout + let request_timeout = hyper::Request::builder() + .method(http::Method::GET) + .uri("http://192.0.2.1:12345/timeout") // TEST-NET-1, non-routable + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request_timeout, config); + let result = resolve(future).await?; + match result { + Err(ErrorCode::ConnectionTimeout) => {} + // On some systems, non-routable addresses may be immediately rejected + Err(ErrorCode::ConnectionRefused) => {} + Err(ErrorCode::DestinationUnavailable) => {} + other => panic!("expected ConnectionTimeout, ConnectionRefused, or DestinationUnavailable, got: {other:?}"), + } + + drop(listener); + drop(request); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — connection timeout (pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_connection_timeout() -> Result<()> { + let pool = 
HttpConnectionPool::new(HttpConnectionPoolConfig { + connect_timeout: Duration::from_millis(200), + ..Default::default() + }); + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri("http://192.0.2.1:12345/timeout") // TEST-NET-1, non-routable + .body(empty_body()) + .unwrap(); + + let config = OutgoingRequestConfig { + use_tls: false, + connect_timeout: Duration::from_millis(200), + first_byte_timeout: Duration::from_millis(500), + between_bytes_timeout: Duration::from_secs(5), + }; + + let future = default_send_request_with_pool(request, config, Some(pool)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::ConnectionTimeout) => {} + // On some systems or depending on hyper-util wrapping + Err(ErrorCode::DestinationUnavailable) => {} + Err(ErrorCode::ConnectionRefused) => {} + other => panic!("expected ConnectionTimeout, DestinationUnavailable, or ConnectionRefused, got: {other:?}"), + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — DNS / invalid host +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn non_pooled_dns_error() -> Result<()> { + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri("http://this-host-does-not-exist.invalid:8080/dns") + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request, fast_config(false)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::DnsError(_)) => {} // expected + Err(ErrorCode::ConnectionRefused) => {} // also acceptable on some platforms + other => panic!("expected DnsError or ConnectionRefused, got: {other:?}"), + } + + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_dns_error() -> Result<()> { + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + let request = 
hyper::Request::builder() + .method(http::Method::GET) + .uri("http://this-host-does-not-exist.invalid:8080/dns") + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(false), Some(pool)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::DnsError(_)) => {} // expected + Err(ErrorCode::DestinationUnavailable) => {} // pooled path may report this + Err(ErrorCode::ConnectionRefused) => {} // also acceptable on some platforms + other => panic!( + "expected DnsError, DestinationUnavailable, or ConnectionRefused, got: {other:?}" + ), + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — TLS to a plain HTTP server (non-pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn non_pooled_tls_to_plain_http() -> Result<()> { + let (addr, server) = start_echo_server().await?; + + // Try to connect with TLS to a plain HTTP server — should fail with a TLS error. 
+ let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("https://localhost:{}/tls-fail", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request, fast_config(true)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::TlsProtocolError) => {} + Err(ErrorCode::TlsCertificateError) => {} + Err(ErrorCode::TlsAlertReceived(_)) => {} + Err(ErrorCode::ConnectionTerminated) => {} + other => panic!("expected a TLS error variant, got: {other:?}"), + } + + server.abort(); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — TLS to a plain HTTP server (pooled) +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_tls_to_plain_http() -> Result<()> { + let (addr, server) = start_echo_server().await?; + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!("https://localhost:{}/tls-fail", addr.port())) + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(true), Some(pool)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::TlsProtocolError) => {} + Err(ErrorCode::TlsCertificateError) => {} + Err(ErrorCode::TlsAlertReceived(_)) => {} + Err(ErrorCode::ConnectionTerminated) => {} + Err(ErrorCode::DestinationUnavailable) => {} + other => panic!("expected a TLS error variant or DestinationUnavailable, got: {other:?}"), + } + + server.abort(); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Error classification — missing URI authority +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn 
non_pooled_missing_authority() -> Result<()> { + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri("/no-authority") + .body(empty_body()) + .unwrap(); + + let future = default_send_request(request, fast_config(false)); + let result = resolve(future).await?; + match result { + Err(ErrorCode::HttpRequestUriInvalid) => {} // expected + other => panic!("expected HttpRequestUriInvalid, got: {other:?}"), + } + + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn pooled_missing_authority() -> Result<()> { + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + + let request = hyper::Request::builder() + .method(http::Method::GET) + .uri("/no-authority") + .body(empty_body()) + .unwrap(); + + let future = default_send_request_with_pool(request, fast_config(false), Some(pool)); + let result = resolve(future).await?; + match result { + // The pooled path requires an absolute URI (scheme + authority). + // A relative URI like "/no-authority" is rejected as invalid. 
+ Err(ErrorCode::HttpRequestUriInvalid) => {} + other => panic!("expected HttpRequestUriInvalid, got: {other:?}"), + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// WasiHttpCtx pool wiring +// --------------------------------------------------------------------------- + +#[test_log::test(tokio::test)] +async fn wasi_http_ctx_pool_default_is_none() { + let ctx = wasmtime_wasi_http::WasiHttpCtx::new(); + assert!(ctx.connection_pool.is_none()); +} + +#[test_log::test(tokio::test)] +async fn wasi_http_ctx_pool_can_be_set() { + let mut ctx = wasmtime_wasi_http::WasiHttpCtx::new(); + let pool = HttpConnectionPool::new(HttpConnectionPoolConfig::default()); + ctx.connection_pool = Some(pool); + assert!(ctx.connection_pool.is_some()); +} From ad84a4a520e6d732b0b0043527da2792053d1547 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Tue, 10 Mar 2026 11:46:33 +0100 Subject: [PATCH 4/5] check_write made async --- crates/wasi-io/src/bindings.rs | 1 + crates/wasi-io/src/impls.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/wasi-io/src/bindings.rs b/crates/wasi-io/src/bindings.rs index c472ff968eac..f1fd387cc68e 100644 --- a/crates/wasi-io/src/bindings.rs +++ b/crates/wasi-io/src/bindings.rs @@ -15,6 +15,7 @@ wasmtime::component::bindgen!({ "wasi:io/streams.[method]input-stream.blocking-read": async | trappable | tracing, "wasi:io/streams.[method]input-stream.blocking-skip": async | trappable | tracing, "wasi:io/streams.[drop]input-stream": async | trappable | tracing, + "wasi:io/streams.[method]output-stream.check-write": async | trappable | tracing, "wasi:io/streams.[method]output-stream.flush": async | trappable | tracing, "wasi:io/streams.[method]output-stream.write": async | trappable | tracing, "wasi:io/streams.[method]output-stream.write-zeroes": async | trappable | tracing, diff --git a/crates/wasi-io/src/impls.rs b/crates/wasi-io/src/impls.rs index 2d2758ccca83..8f1dc7305163 100644 
--- a/crates/wasi-io/src/impls.rs +++ b/crates/wasi-io/src/impls.rs @@ -174,7 +174,7 @@ impl streams::HostOutputStream for ResourceTable { Ok(()) } - fn check_write(&mut self, stream: Resource) -> StreamResult { + async fn check_write(&mut self, stream: Resource) -> StreamResult { let bytes = self.get_mut(&stream)?.check_write()?; Ok(bytes as u64) } From 538477246f61d5c32bd826991b5eed4a8fe716b7 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Tue, 10 Mar 2026 11:50:35 +0100 Subject: [PATCH 5/5] Updated fork description --- golem-fork.md | 185 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 172 insertions(+), 13 deletions(-) diff --git a/golem-fork.md b/golem-fork.md index 374810835106..a283d27c8e7f 100644 --- a/golem-fork.md +++ b/golem-fork.md @@ -35,7 +35,7 @@ The key themes are: | **Suspend support** | The `wasi:io/poll` implementation must be able to signal the host that a worker should be **suspended** instead of blocking, when all polled resources support it and the wait would exceed a configurable threshold. | | **Async host functions** | Many previously-synchronous WASI host functions are converted to `async` so the durable executor can **intercept and replay** them. If upstream makes additional functions async in a newer version, that is fine — Golem will adapt. But any function that our fork makes async **must remain async** in the new fork. | | **Stream downcasting** | `InputStream` and `OutputStream` gain `Any` supertrait and `as_any()` so Golem can inspect the **concrete type** of a stream at runtime to make durability decisions. | -| **Durable HTTP** | HTTP outgoing requests can be **deferred** and failing response bodies can be constructed, enabling Golem's durable HTTP connection support. | +| **Durable HTTP** | HTTP outgoing requests can be **deferred** and failing response bodies can be constructed, enabling Golem's durable HTTP connection support. Connection pooling with per-host and global concurrency limits is available. 
Connection worker errors are propagated to the body stream. Error classification maps TLS and connection failures to specific WASI `ErrorCode` variants. | | **Filesystem path tracking** | `File` / `Dir` descriptors store the host filesystem path so the durable executor can persist and restore file system state. | --- @@ -107,6 +107,11 @@ must be reflected in Golem): | `HostIncomingBody::failing(error)` | `wasmtime-wasi-http` | Reconstructing failed HTTP bodies during replay | | `HostIncomingBody::take_stream() -> Option>` | `wasmtime-wasi-http` | HTTP body stream handling | | `get_fields()` (pub) | `wasmtime-wasi-http` | Trailer serialization for oplog | +| `HttpConnectionPool`, `HttpConnectionPoolConfig` | `wasmtime-wasi-http` | Connection pooling with concurrency limits | +| `default_send_request_with_pool()` | `wasmtime-wasi-http` | Pooled HTTP request dispatch | +| `WasiHttpView::connection_pool()` | `wasmtime-wasi-http` | Pool access from host view trait | +| `WasiHttpCtx::connection_pool` (pub field) | `wasmtime-wasi-http` | Pool storage on HTTP context | +| `IncomingResponse::worker_error_receiver` | `wasmtime-wasi-http` | Connection worker error propagation | | `File { pub path: PathBuf }`, `Dir { pub path: PathBuf }` | `wasmtime-wasi` | Durable `stat`, read-only enforcement | | `ReaddirIterator::new()` (pub) | `wasmtime-wasi` | Deterministic directory listing | | `ResourceTable::get_any()` (immutable) | `wasmtime` | Override-following logic (internal) | @@ -225,7 +230,8 @@ a dynamic override mechanism is added: - **Override-following logic** (`crates/wasi-io/src/impls.rs`): A new function `get_pollable_following_overrides()` follows the chain of overrides until it finds - a pollable without one. Called in `poll()`, `block()`, and `ready()`. + a pollable without one (with a `MAX_POLLABLE_OVERRIDE_CHAIN` depth limit of 64 to + guard against infinite loops). Called in `poll()`, `block()`, and `ready()`. 
- **`ResourceTable::get_any()`** (`crates/wasmtime/src/runtime/component/resource_table.rs`): New immutable accessor (counterpart to existing `get_any_mut()`) needed by the @@ -350,6 +356,7 @@ The `bindgen!` macro invocations are modified to mark more imports as async: ``` [method]input-stream.read [method]input-stream.skip +[method]output-stream.check-write [method]output-stream.flush [method]output-stream.write [method]output-stream.write-zeroes @@ -403,7 +410,7 @@ The corresponding host implementations change from `fn` to `async fn`: - `resolve_addresses()` #### IO streams (`crates/wasi-io/src/impls.rs`): -- `HostOutputStream`: `write()`, `write_zeroes()`, `flush()`, `splice()` +- `HostOutputStream`: `check_write()`, `write()`, `write_zeroes()`, `flush()`, `splice()` - `HostInputStream`: `read()`, `skip()` #### Sync IO wrappers (`crates/wasi/src/p2/host/io.rs`): @@ -505,13 +512,143 @@ executor to defer request execution until the response is actually needed. - `HostFutureTrailers::ready()` handles the `Failing` state by producing `ErrorCode::ConnectionTerminated`. -### 7.4 Exposed HTTP internals +### 7.4 HTTP connection pooling + +**Files:** `crates/wasi-http/src/types.rs`, `crates/wasi-http/src/lib.rs`, `crates/wasi-http/Cargo.toml` + +A new connection pooling layer for outgoing HTTP requests, backed by `hyper-util`'s +`Client` with keep-alive and `hyper-rustls` for TLS: + +- **`HttpConnectionPoolConfig`** — configuration struct: + ```rust + pub struct HttpConnectionPoolConfig { + pub max_idle_per_host: usize, // default: 32 + pub idle_timeout: Duration, // default: 90s + pub connect_timeout: Duration, // default: 30s + } + ``` + +- **`HttpConnectionPool`** — `Clone`-able pool (Arc-based internally): + ```rust + pub struct HttpConnectionPool { + client: Client<..., HyperOutgoingBody>, + global_semaphore: Arc, + max_connections_per_host: usize, + // per-host semaphores, host entry limits... 
+ } + ``` + + Concurrency is limited per host (`max_connections_per_host`, default 16) and globally + (`max_total_connections`, default 128) via Tokio semaphores. + Per-host semaphores are acquired first, then global, to avoid global permit + hoarding. + +- **`WasiHttpView::connection_pool()`** — new trait method (gated on + `default-send-request` feature): + ```rust + fn connection_pool(&self) -> Option<&HttpConnectionPool> { None } + ``` + +- **`WasiHttpCtx::connection_pool`** — optional field on the HTTP context: + ```rust + pub struct WasiHttpCtx { + // ... existing fields ... + pub connection_pool: Option<HttpConnectionPool>, + } + ``` + +- **`default_send_request_with_pool()`** — new function that dispatches to either + `pooled_send_request_handler()` or `default_send_request_handler()` depending on + whether a pool is provided. The `WasiHttpView::send_request()` default + implementation now calls `default_send_request_with_pool()`. + +- **`ConnectionPermits`** struct — holds `OwnedSemaphorePermit` for host and global + semaphores. Carried through `IncomingResponse` → `HostIncomingBody` and released + when the response body is fully consumed/dropped. + +- **New dependencies** (optional, gated on `default-send-request`): + `hyper-util` (client-legacy, http1, tokio) and `hyper-rustls` (http1, webpki-roots).
+ +### 7.5 Improved HTTP error classification + +**File:** `crates/wasi-http/src/types.rs` + +Both the non-pooled (`default_send_request_handler`) and pooled +(`pooled_send_request_handler`) paths now map connection and TLS errors to specific +WASI `ErrorCode` variants instead of the previous catch-all `TlsProtocolError`: + +| Error condition | `ErrorCode` variant | +|---|---| +| `ConnectionRefused` | `ConnectionRefused` | +| `ConnectionReset` / `ConnectionAborted` / `BrokenPipe` / `UnexpectedEof` | `ConnectionTerminated` | +| `TimedOut` | `ConnectionTimeout` | +| `rustls::Error::InvalidCertificate` | `TlsCertificateError` | +| `rustls::Error::AlertReceived(alert)` | `TlsAlertReceived { alert_id, alert_message }` | +| DNS failure (pooled path) | `DnsError` or `DestinationUnavailable` | + +Helper functions `find_rustls_error()`, `find_io_error()`, and `find_elapsed_error()` +walk the error source chain (with a `MAX_ERROR_CHAIN_DEPTH` guard of 32) to extract +typed errors from hyper/hyper-util's nested error wrappers. + +### 7.6 Connection worker error propagation + +**Files:** `crates/wasi-http/src/types.rs`, `crates/wasi-http/src/body.rs` + +Connection worker errors are no longer silently discarded (fixing the upstream TODO). +Instead: + +- A `watch::channel>>` is created alongside the + connection worker task. If the hyper connection driver fails, the error is sent + through the channel. +- `IncomingResponse` gains a `worker_error_receiver: Option` + field. +- `HostIncomingBody::retain_worker()` accepts an optional + `ConnWorkerErrorReceiver` and propagates it to the `BodyWithTimeout`. +- `BodyWithTimeout::poll_frame()` checks the error receiver at end-of-stream (EOF + or trailers). If the connection worker reported an error, it surfaces it as a + body frame error instead of silently succeeding. 
+ +### 7.7 Exposed HTTP internals - `get_fields()` in `crates/wasi-http/src/types_impl.rs` made `pub` and re-exported from `crates/wasi-http/src/lib.rs` as `pub use crate::types_impl::get_fields;`. - `OutgoingRequestConfig` (`crates/wasi-http/src/types.rs`) gains `#[derive(Debug)]`. +- `WasiHttpCtx::field_size_limit` made `pub` (was accessed only through setter). + +### 7.8 HTTP request validation fixes + +**File:** `crates/wasi-http/src/http_impl.rs` + +- **`request-options` error propagation**: Invalid `RequestOptions` resource handles + are no longer silently ignored. Changed from + `options.and_then(|opts| self.table().get(&opts).ok())` to + `options.map(|opts| self.table().get(&opts)).transpose()?`, which correctly traps + on invalid/dead resources per component-model semantics. + +- **Empty authority rejection**: Requests with a missing or empty authority are now + rejected with `ErrorCode::HttpRequestUriInvalid` instead of producing an invalid + URI with an empty authority string. + +### 7.9 Body stream EOF signaling fix + +**File:** `crates/wasi-http/src/body.rs` + +When a body stream reaches EOF without trailers, the `tx` oneshot sender now +explicitly sends `StreamEnd::Trailers(None)` instead of being silently dropped. +This makes the intent clear and avoids relying on the implicit `Err` from a dropped +`oneshot::Sender`. + +### 7.10 `OutgoingBody::finish` table-delete ordering fix + +**File:** `crates/wasi-http/src/types_impl.rs` + +The ordering of operations in `OutgoingBody::finish()` was fixed: trailers are now +moved/computed before the body is deleted from the resource table. Previously, the +body was deleted first, meaning if `move_fields()` failed, the body would be dropped +without calling `finish()` or `abort()`, potentially hanging the underlying body +future. -### 7.5 How Golem uses HTTP durability +### 7.11 How Golem uses HTTP durability Golem's durable HTTP layer lives in `golem-worker-executor/src/durable_host/http/`. 
The full lifecycle: @@ -739,7 +876,12 @@ major feature group to catch issues early. regenerated bindings are consistent. 12. **wasi-http durability** — Inline bindings (async-only), add `Deferred` variant, `failing()` body, `FailingStream`, boxed `take_stream()`, `get_fields` pub, `Debug` - on `OutgoingRequestConfig`. + on `OutgoingRequestConfig`. Add connection pooling (`HttpConnectionPool`, + `HttpConnectionPoolConfig`, `default_send_request_with_pool()`, `connection_pool()` + trait method, `hyper-util`/`hyper-rustls` dependencies). Add error classification + (TLS/connection error mapping), worker error propagation (`watch` channel), + request validation fixes (authority, options), body EOF signaling fix, and + `OutgoingBody::finish` ordering fix. 13. **Filesystem path tracking** — Add `path: PathBuf` to `File`/`Dir`, propagate through constructors, `preopened_dir()`, and `open_at`. 14. **Misc** — `VERSION` constant, re-exports in `wasmtime-wasi`, `ReaddirIterator` pub. @@ -802,11 +944,12 @@ files in newer versions. 
- `crates/wasi/src/p2/ip_name_lookup.rs` — async conversion, `subscribe()` call site update **Modified files — `crates/wasi-http/` (wasmtime-wasi-http crate):** -- `crates/wasi-http/src/lib.rs` — inlined async-only bindings, removed sync linker, `get_fields` re-export -- `crates/wasi-http/src/types.rs` — `Deferred` variant, `deferred()` constructor, `Debug` on config, `io_ctx()` on `WasiHttpImpl` -- `crates/wasi-http/src/types_impl.rs` — async conversions, deferred request execution, `get_fields` pub, `subscribe()` call site update -- `crates/wasi-http/src/http_impl.rs` — `handle()` async conversion -- `crates/wasi-http/src/body.rs` — `failing()` constructor, `FailingStream`, boxed `take_stream()`, `as_any()` impls +- `crates/wasi-http/Cargo.toml` — added `hyper-util`, `hyper-rustls` optional dependencies for connection pooling +- `crates/wasi-http/src/lib.rs` — inlined async-only bindings, removed sync linker, `get_fields` re-export, `HttpConnectionPool`/`HttpConnectionPoolConfig` re-exports +- `crates/wasi-http/src/types.rs` — `Deferred` variant, `deferred()` constructor, `Debug` on config, `io_ctx()` on `WasiHttpImpl`, `HttpConnectionPool`, `HttpConnectionPoolConfig`, `default_send_request_with_pool()`, `pooled_send_request_handler()`, `ConnectionPermits`, `ConnWorkerErrorReceiver`, error classification helpers, `WasiHttpView::connection_pool()`, `WasiHttpCtx::connection_pool` field, worker error propagation +- `crates/wasi-http/src/types_impl.rs` — async conversions, deferred request execution, `get_fields` pub, `subscribe()` call site update, `OutgoingBody::finish` ordering fix, `retain_worker()` error receiver, `retain_connection_permits()` +- `crates/wasi-http/src/http_impl.rs` — `handle()` async conversion, request-options error propagation fix, empty authority rejection +- `crates/wasi-http/src/body.rs` — `failing()` constructor, `FailingStream`, boxed `take_stream()`, `as_any()` impls, `ConnWorkerErrorReceiver` on `HostIncomingBody`/`BodyWithTimeout`, EOF 
signaling fix, `retain_connection_permits()`, `retain_worker()` signature change **Modified files — `crates/wasmtime/` (wasmtime crate):** - `crates/wasmtime/src/lib.rs` — `VERSION` constant @@ -997,9 +1140,16 @@ The following crates pass `cargo check` successfully: | `InputStream: Any`, `as_any()` | `wasmtime-wasi-io` | ✅ Same | | `OutputStream: Any`, `as_any()` | `wasmtime-wasi-io` | ✅ Same | | `HostFutureIncomingResponse::deferred()` | `wasmtime-wasi-http` | ✅ Same | -| `HostIncomingBody::failing(error)` | `wasmtime-wasi-http` | ✅ Same | +| `HostIncomingBody::failing(error)` | `wasmtime-wasi-http` | ⚠️ `Failing` state uses `Arc` (was `String`) | | `HostIncomingBody::take_stream() -> Option>` | `wasmtime-wasi-http` | ✅ Same | | `get_fields()` (pub) | `wasmtime-wasi-http` | ✅ Same | +| `HttpConnectionPool`, `HttpConnectionPoolConfig` | `wasmtime-wasi-http` | 🆕 New connection pooling API | +| `default_send_request_with_pool()` | `wasmtime-wasi-http` | 🆕 New pooled request dispatch | +| `WasiHttpView::connection_pool()` | `wasmtime-wasi-http` | 🆕 New trait method (default returns `None`) | +| `WasiHttpCtx::connection_pool` (pub field) | `wasmtime-wasi-http` | 🆕 New optional field | +| `IncomingResponse::worker_error_receiver` | `wasmtime-wasi-http` | 🆕 New field for error propagation | +| `IncomingResponse::connection_permits` | `wasmtime-wasi-http` | 🆕 New field for concurrency permits | +| `HostOutputStream::check_write` (async) | `wasmtime-wasi-io` | 🆕 Newly made async | | `File { pub path: PathBuf }`, `Dir { pub path: PathBuf }` | `wasmtime-wasi` | ✅ Same (moved to `crates/wasi/src/filesystem.rs`) | | `ReaddirIterator::new()` (pub) | `wasmtime-wasi` | ✅ Same | | `ResourceTable::get_any()` (immutable) | `wasmtime` | ✅ Same | @@ -1018,4 +1168,13 @@ When upgrading Golem to use the v42 fork: 5. Update `set_suspend()` closure to return `wasmtime::Error` instead of `anyhow::Error`. 6. 
Verify HTTP interception code works with the new `send_request()` dispatch in `HostFutureIncomingResponse::Deferred` handling. -7. Build and run the Golem test suite. +7. Optionally configure `HttpConnectionPool` on `WasiHttpCtx` and implement + `WasiHttpView::connection_pool()` to enable connection reuse across requests. +8. Update `HostIncomingBody::retain_worker()` call sites to pass the new + `worker_error_receiver` parameter. +9. When constructing `IncomingResponse` manually (e.g. in `do_wasi_http_hash_all` + test or replay code), populate the new `worker_error_receiver` field (typically + `None` for synthetic responses) and the new `connection_permits` field. +10. Make the `check_write` override async in `DurableWorkerCtx` if it is intercepted + for durability. +11. Build and run the Golem test suite.