diff --git a/.gitignore b/.gitignore index 2ea6b27..a0015fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target /.idea +/.vscode /CLAUDE.md /AGENTS.md diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 0bd9065..c91541b 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -207,7 +207,7 @@ pub async fn main() { proxy_flusher, }); - let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); tokio::spawn(async move { let res = mini_agent .start_mini_agent(shutdown_rx, stats_concentrator.map(|c| c.service_handle)) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 65d8113..04c72bf 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -125,32 +125,22 @@ impl Config { })?; // Windows named pipe name for APM receiver. - // Normalize by adding \\.\pipe\ prefix if not present - let dd_apm_windows_pipe_name: Option = { - #[cfg(any(all(windows, feature = "windows-pipes"), test))] - { - env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| { - if pipe_name.starts_with("\\\\.\\pipe\\") || pipe_name.starts_with(r"\\.\pipe\") - { - pipe_name - } else { - format!(r"\\.\pipe\{}", pipe_name) - } - }) - } - #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] - { - None - } - }; - let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() { - 0 // Override to 0 when using Windows named pipe - } else { - env::var("DD_APM_RECEIVER_PORT") - .ok() - .and_then(|port| port.parse::().ok()) - .unwrap_or(DEFAULT_APM_RECEIVER_PORT) - }; + // Normalize by adding \\.\pipe\ prefix if not present. + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + let dd_apm_windows_pipe_name: Option = + env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| { + if pipe_name.starts_with("\\\\.\\pipe\\") || pipe_name.starts_with(r"\\.\pipe\") { + pipe_name + } else { + format!(r"\\.\pipe\{}", pipe_name) + } + }); + // TCP listener always runs so legacy tracers without named-pipe support + // can still reach the agent when DD_APM_WINDOWS_PIPE_NAME is also set. + let dd_apm_receiver_port: u16 = env::var("DD_APM_RECEIVER_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_APM_RECEIVER_PORT); let dd_dogstatsd_windows_pipe_name: Option = { #[cfg(any(all(windows, feature = "windows-pipes"), test))] @@ -425,8 +415,12 @@ mod tests { Some(r"\\.\pipe\test_pipe".to_string()) ); - // Port should be overridden to 0 when pipe is set - assert_eq!(config.dd_apm_receiver_port, 0); + // TCP listener still runs at the default port for legacy + // tracers; pipe configuration adds a transport, doesn't replace. + assert_eq!( + config.dd_apm_receiver_port, + super::DEFAULT_APM_RECEIVER_PORT + ); }, ); } diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index bbc27f7..2109532 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -33,6 +33,10 @@ const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; +/// JoinHandle type for a transport accept loop (TCP or named pipe). +type TransportHandle = + tokio::task::JoinHandle>>; + pub struct MiniAgent { pub config: Arc, pub trace_processor: Arc, @@ -46,7 +50,7 @@ pub struct MiniAgent { impl MiniAgent { pub async fn start_mini_agent( &self, - shutdown_rx: tokio::sync::oneshot::Receiver<()>, + shutdown_rx: tokio::sync::watch::Receiver, stats_concentrator_service_handle: Option>, ) -> Result<(), Box> { let now = Instant::now(); @@ -86,9 +90,9 @@ impl MiniAgent { Receiver, ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE); - // Create a separate shutdown channel for the stats flusher so that serve_tcp - // can drain all in-flight HTTP handlers before triggering the final flush, - // preventing AddChunk/ClientStatsPayload messages from being missed. + // Separate shutdown channel for the stats flusher: serve() drains all + // in-flight HTTP handlers before triggering the final flush, preventing + // AddChunk/ClientStatsPayload messages from being missed. let (flusher_shutdown_tx, flusher_shutdown_rx) = oneshot::channel::<()>(); // start our stats flusher. @@ -138,78 +142,202 @@ impl MiniAgent { ) }); - // Determine which transport to use based on configuration - #[cfg(any(all(windows, feature = "windows-pipes"), test))] - let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref(); - #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] - let pipe_name_opt: Option<&String> = None; - - if let Some(pipe_name) = pipe_name_opt { - debug!("Mini Agent started: listening on named pipe {}", pipe_name); - } else { - debug!( - "Mini Agent started: listening on port {}", - self.config.dd_apm_receiver_port - ); - } debug!( "Time taken to start the Mini Agent: {} ms", now.elapsed().as_millis() ); - if let Some(pipe_name) = pipe_name_opt { - // Windows named pipe transport + // TCP listener is always started; legacy tracers without named-pipe + // support reach the agent here even when a pipe is also configured. + let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); + let tcp_listener = tokio::net::TcpListener::bind(&addr).await?; + debug!( + "Mini Agent listening on TCP port {}", + self.config.dd_apm_receiver_port + ); + + // Named pipe is only spawned on Windows with the feature enabled and a + // pipe name configured. On all other builds, the pipe code is absent + // entirely (no symbols, no select arm, no cost). + #[cfg(all(windows, feature = "windows-pipes"))] + let pipe_handle = self + .config + .dd_apm_windows_pipe_name + .as_ref() + .map(|pipe_name| { + debug!("Mini Agent also listening on named pipe {}", pipe_name); + let pipe_service = service.clone(); + let pipe_shutdown_rx = shutdown_rx.clone(); + tokio::spawn(Self::serve_accept_loop_named_pipe( + pipe_name.clone(), + pipe_service, + pipe_shutdown_rx, + )) + }); + + let tcp_shutdown_rx = shutdown_rx.clone(); + let tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp( + tcp_listener, + service, + tcp_shutdown_rx, + )); + + Self::serve( + tcp_handle, #[cfg(all(windows, feature = "windows-pipes"))] - { - Self::serve_named_pipe( - pipe_name, - service, - trace_flusher_handle, - stats_flusher_handle, - stats_concentrator_service_handle, - shutdown_rx, - flusher_shutdown_tx, - ) - .await?; + pipe_handle, + trace_flusher_handle, + stats_flusher_handle, + stats_concentrator_service_handle, + shutdown_rx, + flusher_shutdown_tx, + ) + .await + } + + /// Supervises the long-lived tasks. Any task dying unexpectedly is fatal: + /// a tracer picks one transport at startup and stays on it, so silently + /// dropping a transport can strand the tracer. Handle shutdowns. + #[allow(clippy::too_many_arguments)] + async fn serve( + mut tcp_handle: TransportHandle, + #[cfg(all(windows, feature = "windows-pipes"))] mut pipe_handle: Option, + mut trace_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_concentrator_service_handle: Option>, + mut shutdown_rx: tokio::sync::watch::Receiver, + flusher_shutdown_tx: oneshot::Sender<()>, + ) -> Result<(), Box> { + // task supervision cases: we react if... + enum Event { + TcpDied(String), + #[cfg(all(windows, feature = "windows-pipes"))] + PipeDied(String), + TraceFlusherDied(String), + StatsFlusherDied(String), + ConcentratorDied(String), + Shutdown, + } + + // Gated cases of tokio:select, handling named pipe behavior. + // tokio::pin! used to let us take &mut from exit futures + // so they can be polled across select! iterations and then + // awaited again in the shutdown branch. + #[cfg(all(windows, feature = "windows-pipes"))] + let event = { + let pipe_exit = async { + match pipe_handle.as_mut() { + Some(h) => h.await, + None => std::future::pending().await, + } + }; + tokio::pin!(pipe_exit); + let concentrator_exit = async { + match stats_concentrator_service_handle.as_mut() { + Some(h) => h.await, + None => std::future::pending().await, + } + }; + tokio::pin!(concentrator_exit); + + tokio::select! { + r = &mut tcp_handle => Event::TcpDied(format!("{r:?}")), + r = &mut pipe_exit => Event::PipeDied(format!("{r:?}")), + r = &mut trace_flusher_handle => Event::TraceFlusherDied(format!("{r:?}")), + r = &mut stats_flusher_handle => Event::StatsFlusherDied(format!("{r:?}")), + r = &mut concentrator_exit => Event::ConcentratorDied(format!("{r:?}")), + _ = shutdown_rx.changed() => Event::Shutdown, } - #[cfg(not(all(windows, feature = "windows-pipes")))] - { - let _ = pipe_name; // Suppress unused variable warning - unreachable!( - "Named pipes are only supported on Windows with the windows-pipes feature \ - enabled, cannot use pipe: {}.", - pipe_name - ); + }; + #[cfg(not(all(windows, feature = "windows-pipes")))] + let event = { + let concentrator_exit = async { + match stats_concentrator_service_handle.as_mut() { + Some(h) => h.await, + None => std::future::pending().await, + } + }; + tokio::pin!(concentrator_exit); + + tokio::select! { + r = &mut tcp_handle => Event::TcpDied(format!("{r:?}")), + r = &mut trace_flusher_handle => Event::TraceFlusherDied(format!("{r:?}")), + r = &mut stats_flusher_handle => Event::StatsFlusherDied(format!("{r:?}")), + r = &mut concentrator_exit => Event::ConcentratorDied(format!("{r:?}")), + _ = shutdown_rx.changed() => Event::Shutdown, } - } else { - // TCP transport - let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); - let listener = tokio::net::TcpListener::bind(&addr).await?; - - Self::serve_tcp( - listener, - service, - trace_flusher_handle, - stats_flusher_handle, - stats_concentrator_service_handle, - shutdown_rx, - flusher_shutdown_tx, - ) - .await?; + }; + + let result: Result<(), Box> = match event { + Event::Shutdown => { + // The same shutdown_rx fan-out has already fired in each + // accept loop concurrently with our select arm here. + // Awaiting the transport handles waits for them to drain. + if let Err(e) = (&mut tcp_handle).await { + error!("TCP accept loop failed during shutdown: {e:?}"); + } + #[cfg(all(windows, feature = "windows-pipes"))] + if let Some(h) = pipe_handle.as_mut() { + if let Err(e) = h.await { + error!("Named pipe accept loop failed during shutdown: {e:?}"); + } + } + // Now all handlers have written to the channels. Force-flush + // the stats flusher. + let _ = flusher_shutdown_tx.send(()); + match (&mut stats_flusher_handle).await { + Ok(()) => Ok(()), + Err(e) => { + Err(format!("Stats flusher task failed during shutdown: {e:?}").into()) + } + } + } + Event::TcpDied(s) => { + error!("TCP accept loop died: {s}"); + Err("TCP accept loop terminated unexpectedly".into()) + } + #[cfg(all(windows, feature = "windows-pipes"))] + Event::PipeDied(s) => { + error!("Named pipe accept loop died: {s}"); + Err("Named pipe accept loop terminated unexpectedly".into()) + } + Event::TraceFlusherDied(s) => { + error!("Trace flusher task died: {s}"); + Err("Trace flusher task terminated unexpectedly".into()) + } + Event::StatsFlusherDied(s) => { + error!("Stats flusher task died: {s}"); + Err("Stats flusher task terminated unexpectedly".into()) + } + Event::ConcentratorDied(s) => { + error!("Stats concentrator service task died: {s}"); + Err("Stats concentrator service task terminated unexpectedly".into()) + } + }; + + // Abort surviving tasks so they don't detach and hold sockets, + // named pipes, or buffered channel state. abort() is &self and a + // no-op on already-finished tasks, so calling it on whichever + // handle resolved in the select! above is harmless. + tcp_handle.abort(); + #[cfg(all(windows, feature = "windows-pipes"))] + if let Some(h) = pipe_handle.as_ref() { + h.abort(); + } + trace_flusher_handle.abort(); + stats_flusher_handle.abort(); + if let Some(h) = stats_concentrator_service_handle.as_ref() { + h.abort(); } - Ok(()) + result } - async fn serve_tcp( + async fn serve_accept_loop_tcp( listener: tokio::net::TcpListener, service: S, - mut trace_flusher_handle: tokio::task::JoinHandle<()>, - stats_flusher_handle: tokio::task::JoinHandle<()>, - mut stats_concentrator_service_handle: Option>, - mut shutdown_rx: oneshot::Receiver<()>, - flusher_shutdown_tx: oneshot::Sender<()>, - ) -> Result<(), Box> + mut shutdown_rx: tokio::sync::watch::Receiver, + ) -> Result<(), Box> where S: hyper::service::Service< hyper::Request, @@ -222,7 +350,6 @@ impl MiniAgent { { let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); - let mut stats_flusher_handle = stats_flusher_handle; loop { let conn = tokio::select! { @@ -238,7 +365,7 @@ impl MiniAgent { continue; } Err(e) => { - error!("Server error: {e}"); + error!("TCP server error: {e}"); return Err(e.into()); } Ok((conn, _)) => conn, @@ -254,36 +381,15 @@ impl MiniAgent { }, Ok(()) | Err(_) => continue, }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - return Err(format!("Trace flusher task terminated unexpectedly: {result:?}").into()); - }, - result = &mut stats_flusher_handle => { - return Err(format!("Stats flusher task terminated unexpectedly: {result:?}").into()); - }, - result = async { - match stats_concentrator_service_handle { - Some(ref mut h) => h.await, - None => std::future::pending().await, - } - } => { - return Err(format!("Stats concentrator service task terminated unexpectedly: {result:?}").into()); - }, - _ = &mut shutdown_rx => { - // Drain all in-flight connections so every handler has finished - // writing to the stats/trace channels before we trigger the flush. + _ = shutdown_rx.changed() => { + // drains every in-flight handler to completion here + // before the supervisor triggers the final flush. while let Some(result) = joinset.join_next().await { if let Err(e) = result && e.is_panic() { std::panic::resume_unwind(e.into_panic()); } } - // Signal the stats flusher to force-flush now that all handlers - // have finished writing to the channel. - let _ = flusher_shutdown_tx.send(()); - if let Err(e) = stats_flusher_handle.await { - return Err(format!("Stats flusher task failed during shutdown: {e:?}").into()); - } return Ok(()); }, }; @@ -292,22 +398,18 @@ impl MiniAgent { let service = service.clone(); joinset.spawn(async move { if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); + error!("TCP connection error: {e}"); } }); } } #[cfg(all(windows, feature = "windows-pipes"))] - async fn serve_named_pipe( - pipe_name: &str, + async fn serve_accept_loop_named_pipe( + pipe_name: String, service: S, - mut trace_flusher_handle: tokio::task::JoinHandle<()>, - stats_flusher_handle: tokio::task::JoinHandle<()>, - mut stats_concentrator_service_handle: Option>, - mut shutdown_rx: oneshot::Receiver<()>, - flusher_shutdown_tx: oneshot::Sender<()>, - ) -> Result<(), Box> + mut shutdown_rx: tokio::sync::watch::Receiver, + ) -> Result<(), Box> where S: hyper::service::Service< hyper::Request, @@ -320,12 +422,9 @@ impl MiniAgent { { let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); - let mut stats_flusher_handle = stats_flusher_handle; loop { - // Create a new pipe instance - // pipe_name already includes \\.\pipe\ prefix from config - let pipe = match ServerOptions::new().create(pipe_name) { + let pipe = match ServerOptions::new().create(&pipe_name) { Ok(pipe) => { debug!("Created pipe server instance '{}' in byte mode", pipe_name); pipe @@ -336,7 +435,6 @@ impl MiniAgent { } }; - // Wait for client connection let conn = tokio::select! { connect_res = pipe.connect() => match connect_res { Err(e) @@ -369,44 +467,25 @@ impl MiniAgent { }, Ok(()) | Err(_) => continue, }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - return Err(format!("Trace flusher task terminated unexpectedly: {result:?}").into()); - }, - result = &mut stats_flusher_handle => { - return Err(format!("Stats flusher task terminated unexpectedly: {result:?}").into()); - }, - result = async { - match stats_concentrator_service_handle { - Some(ref mut h) => h.await, - None => std::future::pending().await, - } - } => { - return Err(format!("Stats concentrator service task terminated unexpectedly: {result:?}").into()); - }, - _ = &mut shutdown_rx => { + _ = shutdown_rx.changed() => { + // drains every in-flight handler to completion here + // before the supervisor triggers the final flush. while let Some(result) = joinset.join_next().await { - if let Err(e) = result { - if e.is_panic() { + if let Err(e) = result + && e.is_panic() { std::panic::resume_unwind(e.into_panic()); } - } - } - let _ = flusher_shutdown_tx.send(()); - if let Err(e) = stats_flusher_handle.await { - return Err(format!("Stats flusher task failed during shutdown: {e:?}").into()); } return Ok(()); }, }; - // Hyper http parser handles buffering pipe data let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); let service = service.clone(); joinset.spawn(async move { if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); + error!("Named pipe connection error: {e}"); } }); } diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs index 9808f47..c981f0c 100644 --- a/crates/datadog-trace-agent/tests/common/helpers.rs +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -10,11 +10,18 @@ use libdd_trace_utils::test_utils::create_test_json_span; use std::time::{Duration, UNIX_EPOCH}; use tokio::time::timeout; -/// Create a simple test trace payload as msgpack bytes -pub fn create_test_trace_payload() -> Vec { +/// Create a simple test trace payload as msgpack bytes. Pass `Some(name)` to +/// override the default service ("test-service"); dual-transport tests use +/// distinct service names to distinguish payloads that flow through +/// different listeners but share a single flusher pipeline. +pub fn create_test_trace_payload(service: Option<&str>) -> Vec { let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64; - let json_span = create_test_json_span(11, 222, 0, start, false); - rmp_serde::to_vec(&vec![vec![json_span]]).expect("Failed to serialize test trace") + let mut span = create_test_json_span(11, 222, 0, start, false); + if let Some(name) = service { + span["service"] = serde_json::Value::String(name.into()); + span["meta"]["service"] = serde_json::Value::String(name.into()); + } + rmp_serde::to_vec(&vec![vec![span]]).expect("Failed to serialize test trace") } /// Send an HTTP request over TCP and return the response diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 1941200..85dc420 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -23,8 +23,27 @@ use std::time::Duration; #[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; +// Used by negative assertions ("no stats request arrived"): we have to +// wait some bounded time before declaring absence. Positive assertions +// poll the mock server directly via verify_*_request and don't need this. const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); +// Hard ceiling on verify_trace_request / verify_stats_request polling. The +// poll returns as soon as the request lands so steady-state cost is small; +// the timeout exists to fail loudly rather than hang indefinitely. +const VERIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); + +async fn wait_for_request_at_path(mock_server: &common::mock_server::MockServer, path: &str) { + let deadline = tokio::time::Instant::now() + VERIFY_REQUEST_TIMEOUT; + while tokio::time::Instant::now() < deadline { + if !mock_server.get_requests_for_path(path).is_empty() { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("Timed out after {VERIFY_REQUEST_TIMEOUT:?} waiting for request at {path}"); +} + /// Helper to configure a config with mock server endpoints pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { let trace_url = format!("{}/api/v0.2/traces", mock_server_url); @@ -78,7 +97,8 @@ pub fn create_mini_agent_with_real_flushers( } /// Helper to verify trace request sent to mock server -pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { +pub async fn verify_trace_request(mock_server: &common::mock_server::MockServer) { + wait_for_request_at_path(mock_server, "/api/v0.2/traces").await; let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); assert!( @@ -114,7 +134,8 @@ pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { } /// Helper to verify stats request sent to mock server -pub fn verify_stats_request(mock_server: &common::mock_server::MockServer) { +pub async fn verify_stats_request(mock_server: &common::mock_server::MockServer) { + wait_for_request_at_path(mock_server, "/api/v0.2/stats").await; let stats_reqs = mock_server.get_requests_for_path("/api/v0.2/stats"); assert!( @@ -179,7 +200,7 @@ async fn test_mini_agent_tcp_handles_requests() { // Start the mini agent let agent_handle = tokio::spawn(async move { - let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let _ = mini_agent.start_mini_agent(shutdown_rx, None).await; }); @@ -240,7 +261,7 @@ async fn test_mini_agent_tcp_handles_requests() { ); // Test /v0.4/traces endpoint with real trace data - let trace_payload = create_test_trace_payload(); + let trace_payload = create_test_trace_payload(None); let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload), &[]) .await @@ -279,7 +300,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Start the mini agent let agent_handle = tokio::spawn(async move { - let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let _ = mini_agent.start_mini_agent(shutdown_rx, None).await; }); @@ -322,7 +343,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { ); // Test /v0.4/traces endpoint with real trace data - let trace_payload = create_test_trace_payload(); + let trace_payload = create_test_trace_payload(None); let trace_response = send_named_pipe_request(&pipe_path, "/v0.4/traces", "POST", Some(trace_payload)) .await @@ -353,7 +374,7 @@ async fn test_mini_agent_tcp_with_real_flushers() { let (mini_agent, stats_concentrator_service_handle) = create_mini_agent_with_real_flushers(config); - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let agent_handle = tokio::spawn(async move { let _ = mini_agent .start_mini_agent(shutdown_rx, Some(stats_concentrator_service_handle)) @@ -377,21 +398,19 @@ async fn test_mini_agent_tcp_with_real_flushers() { ); // Send trace data - let trace_payload = create_test_trace_payload(); + let trace_payload = create_test_trace_payload(None); let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload), &[]) .await .expect("Failed to send /v0.4/traces request"); assert_eq!(trace_response.status(), StatusCode::OK); - // Wait for trace flush - tokio::time::sleep(FLUSH_WAIT_DURATION).await; - verify_trace_request(&mock_server); + verify_trace_request(&mock_server).await; // Trigger shutdown to force flush in progress concentrator buckets - let _ = shutdown_tx.send(()); + let _ = shutdown_tx.send(true); let _ = agent_handle.await; - verify_stats_request(&mock_server); // Stats generator should generate stats from trace payload + verify_stats_request(&mock_server).await; // Stats generator should generate stats from trace payload } #[cfg(test)] @@ -410,7 +429,7 @@ async fn test_concentrator_task_death_shuts_down_mini_agent() { create_mini_agent_with_real_flushers(config); let abort_handle = stats_concentrator_service_handle.abort_handle(); - let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let agent_handle = tokio::spawn(async move { mini_agent .start_mini_agent(shutdown_rx, Some(stats_concentrator_service_handle)) @@ -463,7 +482,7 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() { create_mini_agent_with_real_flushers(config); let agent_handle = tokio::spawn(async move { - let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let _ = mini_agent.start_mini_agent(shutdown_rx, None).await; }); @@ -484,7 +503,7 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() { ); // Send trace data - let trace_payload = create_test_trace_payload(); + let trace_payload = create_test_trace_payload(None); let trace_response = send_tcp_request( test_port, "/v0.4/traces", @@ -496,11 +515,11 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() { .expect("Failed to send /v0.4/traces request"); assert_eq!(trace_response.status(), StatusCode::OK); - // Wait for flush + verify_trace_request(&mock_server).await; + // Bounded wait to confirm absence of stats request — stats wouldn't + // be generated when Datadog-Client-Computed-Stats is set on the trace. tokio::time::sleep(FLUSH_WAIT_DURATION).await; - - verify_trace_request(&mock_server); - verify_no_stats_request(&mock_server); // Stats generator should not generate stats from trace payload when Datadog-Client-Computed-Stats header is present in trace payload + verify_no_stats_request(&mock_server); // Clean up agent_handle.abort(); @@ -524,7 +543,7 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { let (mini_agent, _stats_concentrator_service_handle) = create_mini_agent_with_real_flushers(config); - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent(shutdown_rx, None).await; }); @@ -546,19 +565,124 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { ); // Send trace data via named pipe - let trace_payload = create_test_trace_payload(); + let trace_payload = create_test_trace_payload(None); let trace_response = send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request over named pipe"); assert_eq!(trace_response.status(), StatusCode::OK); - // Wait for trace flush - tokio::time::sleep(FLUSH_WAIT_DURATION).await; - verify_trace_request(&mock_server); + verify_trace_request(&mock_server).await; // Trigger shutdown to force flush in progress concentrator buckets - let _ = shutdown_tx.send(()); + let _ = shutdown_tx.send(true); + let _ = agent_handle.await; + verify_stats_request(&mock_server).await; +} + +#[cfg(all(test, windows, feature = "windows-pipes"))] +#[tokio::test] +#[serial] +async fn test_mini_agent_dual_transport_with_real_flushers() { + let mock_server = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let pipe_name = r"\\.\pipe\dd_trace_dual_transport_test"; + let tcp_port: u16 = 8130; + + let mut config = create_tcp_test_config(tcp_port); + configure_mock_endpoints(&mut config, &mock_server.url()); + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + // Both transports are deliberately set on the same agent: a non-zero TCP + // port AND a pipe name. They must come up concurrently. + config.agent_stats_computation_enabled = true; + let config = Arc::new(config); + + let (mini_agent, stats_concentrator_service_handle) = + create_mini_agent_with_real_flushers(config); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + let agent_handle = tokio::spawn(async move { + let _ = mini_agent + .start_mini_agent(shutdown_rx, Some(stats_concentrator_service_handle)) + .await; + }); + + // Readiness on each transport, sequentially (TCP then pipe). + let mut tcp_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(tcp_port, "/info", "GET", None, &[]).await + && response.status().is_success() + { + tcp_ready = true; + break; + } + } + assert!( + tcp_ready, + "TCP listener did not bind on port {tcp_port} when pipe is also configured \ + — config may be overriding receiver_port to 0 when a pipe name is set" + ); + + let mut pipe_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await + && response.status().is_success() + { + pipe_ready = true; + break; + } + } + assert!( + pipe_ready, + "Named pipe listener did not come up at {pipe_name} when TCP is also configured" + ); + + // One trace per transport, distinguishable by service name. + let tcp_payload = create_test_trace_payload(Some("dual-tcp-svc")); + let pipe_payload = create_test_trace_payload(Some("dual-pipe-svc")); + + let tcp_response = send_tcp_request(tcp_port, "/v0.4/traces", "POST", Some(tcp_payload), &[]) + .await + .expect("Failed to send /v0.4/traces request over TCP"); + assert_eq!(tcp_response.status(), StatusCode::OK); + + let pipe_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(pipe_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!(pipe_response.status(), StatusCode::OK); + + wait_for_request_at_path(&mock_server, "/api/v0.2/traces").await; + + // Both payloads must reach the same backend through the shared flusher + // pipeline. The flusher may batch them into one POST or two; either is + // fine, what matters is that both service-name needles show up. + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + assert!( + !trace_reqs.is_empty(), + "no trace POST reached backend; expected traces from both transports" + ); + let mut all_bytes = Vec::new(); + for req in &trace_reqs { + assert_eq!(req.method, "POST"); + all_bytes.extend_from_slice(&req.body); + } + assert!( + all_bytes.windows(12).any(|w| w == b"dual-tcp-svc"), + "TCP-side trace did not reach backend" + ); + assert!( + all_bytes.windows(13).any(|w| w == b"dual-pipe-svc"), + "pipe-side trace did not reach backend" + ); + + // Trigger graceful shutdown. The watch fan-out must reach BOTH accept + // loops; each drains its in-flight handlers; the supervisor then + // signals the stats flusher to do its final emit. If fan-out skipped + // a transport, agent_handle would hang or stats wouldn't arrive. + let _ = shutdown_tx.send(true); let _ = agent_handle.await; - verify_stats_request(&mock_server); + verify_stats_request(&mock_server).await; }