From 5a2f4c701e40a71c04d74440dc6491771f16cd53 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 6 May 2026 15:26:29 +0200 Subject: [PATCH 1/6] perf(trace-serializer): pre-allocate serialization buffer # Motivation When serializing spans, we start from a zero sized buffer. Pre-allocate the buffer to prevent small re-allocs. # Changes * Save the length of the payload we serialised * When creating a new buffer allocate capacity for the length of the previous payload * Also add a 1KB minimum buffer size. --- .../src/trace_exporter/trace_serializer.rs | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 2a257c7e82..b397fe3cee 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -1,6 +1,8 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::sync::atomic::{AtomicUsize, Ordering}; + use crate::trace_exporter::agent_response::{ AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION, }; @@ -16,6 +18,8 @@ use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_payload; +const MIN_BUFFER_CAPACITY: usize = 1024; + /// Prepared traces payload ready for sending to the agent pub(super) struct PreparedTracesPayload { /// Serialized msgpack payload @@ -28,6 +32,7 @@ pub(super) struct PreparedTracesPayload { /// Trace serialization client for handling payload preparation pub(super) struct TraceSerializer<'a> { + previous_serialised_len: AtomicUsize, output_format: TraceExporterOutputFormat, agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, } @@ -39,6 +44,7 @@ impl<'a> TraceSerializer<'a> { agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, ) -> Self { Self { + previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY), output_format, agent_payload_response_version, } @@ -96,12 +102,24 @@ impl<'a> TraceSerializer<'a> { &self, payload: &tracer_payload::TraceChunks, ) -> Result, TraceExporterError> { - match payload { - tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)), + let capacity = self + .previous_serialised_len + .load(Ordering::Relaxed) + .max(MIN_BUFFER_CAPACITY); + let buff = match payload { + tracer_payload::TraceChunks::V04(p) => { + msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) + } tracer_payload::TraceChunks::V05(p) => { - rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization) + let mut buff = Vec::with_capacity(capacity); + rmp_serde::encode::write(&mut buff, p) + .map_err(TraceExporterError::Serialization)?; + buff } - } + }; + self.previous_serialised_len + .store(buff.len(), Ordering::Relaxed); + Ok(buff) } } From c9f2cc4a2fb9dd8139e17e6077ab9ce8dd48851c Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 6 May 2026 16:56:22 +0200 Subject: [PATCH 2/6] fix: make TraceSerializer persistent --- Cargo.toml | 4 ++ .../src/trace_exporter/builder.rs | 3 + libdd-data-pipeline/src/trace_exporter/mod.rs | 9 +-- .../src/trace_exporter/trace_serializer.rs | 70 +++++++++---------- 4 files changed, 47 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 290d70288a..e76c6f347e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,3 +113,7 @@ codegen-units = 1 # so benchmarks are not measuring the same thing as the release build. This patch removes # the default dependency on libm. A PR will be opened to proptest to make this optional. proptest = { git = 'https://github.com/bantonsson/proptest.git', branch = "ban/avoid-libm-in-std" } +# rmp ={ path = "../../../../../projects/msgpack-rust/rmp" } +# rmp-serde ={ path = "../../../../../projects/msgpack-rust/rmp-serde" } +# rmp ={ path = "/projects/msgpack-rust/rmp" } +# rmp-serde ={ path = "/projects/msgpack-rust/rmp-serde" } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 6f24b8eb80..696c8473d6 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -310,6 +310,8 @@ impl TraceExporterBuilder { #[cfg(not(target_arch = "wasm32"))] { + use crate::trace_exporter::TraceSerializer; + let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint.clone(), Duration::from_secs(5 * 60)); @@ -400,6 +402,7 @@ impl TraceExporterBuilder { }, input_format: self.input_format, output_format: self.output_format, + serializer: TraceSerializer::new(self.output_format), client_computed_top_level: self.client_computed_top_level, shared_runtime, dogstatsd, diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index f87bfe16f3..71edd11f0a 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -215,6 +215,7 @@ pub struct TraceExporter { metadata: TracerMetadata, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, + serializer: TraceSerializer, shared_runtime: Arc, /// None if dogstatsd is disabled dogstatsd: Option, @@ -616,11 +617,11 @@ impl TraceExporter { return self.send_otlp_traces_inner(traces, config).await; } - let serializer = TraceSerializer::new( - self.output_format, + let prepared = match self.serializer.prepare_traces_payload( + traces, + header_tags, self.agent_payload_response_version.as_ref(), - ); - let prepared = match serializer.prepare_traces_payload(traces, header_tags) { + ) { Ok(p) => p, Err(e) => { error!("Error serializing traces: {e}"); diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index b397fe3cee..4cc0c8f9c6 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -31,22 +31,18 @@ pub(super) struct PreparedTracesPayload { } /// Trace serialization client for handling payload preparation -pub(super) struct TraceSerializer<'a> { +#[derive(Debug)] +pub(super) struct TraceSerializer { previous_serialised_len: AtomicUsize, output_format: TraceExporterOutputFormat, - agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, } -impl<'a> TraceSerializer<'a> { +impl TraceSerializer { /// Create a new trace serializer - pub(super) fn new( - output_format: TraceExporterOutputFormat, - agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, - ) -> Self { + pub(super) fn new(output_format: TraceExporterOutputFormat) -> Self { Self { previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY), output_format, - agent_payload_response_version, } } @@ -55,10 +51,12 @@ impl<'a> TraceSerializer<'a> { &self, traces: Vec>>, header_tags: TracerHeaderTags, + agent_payload_response_version: Option<&AgentResponsePayloadVersion>, ) -> Result { let payload = self.collect_and_process_traces(traces)?; let chunks = payload.size(); - let headers = self.build_traces_headers(header_tags, chunks); + let headers = + self.build_traces_headers(header_tags, chunks, agent_payload_response_version); let mp_payload = self.serialize_payload(&payload)?; Ok(PreparedTracesPayload { @@ -83,13 +81,18 @@ impl<'a> TraceSerializer<'a> { } /// Build HTTP headers for traces request - fn build_traces_headers(&self, header_tags: TracerHeaderTags, chunk_count: usize) -> HeaderMap { + fn build_traces_headers( + &self, + header_tags: TracerHeaderTags, + chunk_count: usize, + agent_payload_response_version: Option<&AgentResponsePayloadVersion>, + ) -> HeaderMap { let mut headers: HeaderMap = header_tags.into(); headers.reserve(4); headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1")); headers.insert(DATADOG_TRACE_COUNT, chunk_count.into()); headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); - if let Some(agent_payload_response_version) = &self.agent_payload_response_version { + if let Some(agent_payload_response_version) = agent_payload_response_version { // should never fail, as the version should only contain visible ascii chars let _ = HeaderValue::try_from(agent_payload_response_version.header_value()) .map(|v| headers.insert(DATADOG_RATES_PAYLOAD_VERSION, v)); @@ -164,30 +167,27 @@ mod tests { #[test] fn test_trace_serializer_new() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); assert!(matches!( serializer.output_format, TraceExporterOutputFormat::V04 )); - assert!(serializer.agent_payload_response_version.is_none()); } #[test] fn test_trace_serializer_new_with_agent_version() { - let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); assert!(matches!( serializer.output_format, TraceExporterOutputFormat::V05 )); - assert!(serializer.agent_payload_response_version.is_some()); } #[test] fn test_build_traces_headers() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 3); + let headers = serializer.build_traces_headers(header_tags, 3, None); // Check basic headers are present assert_eq!(headers.get(DATADOG_SEND_REAL_HTTP_STATUS).unwrap(), "1"); @@ -215,9 +215,9 @@ mod tests { #[test] fn test_build_traces_headers_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 2); + let headers = serializer.build_traces_headers(header_tags, 2, Some(&agent_version)); // Check that agent payload version header is included assert!(headers.contains_key(DATADOG_RATES_PAYLOAD_VERSION)); @@ -226,7 +226,7 @@ mod tests { #[test] fn test_collect_and_process_traces_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![vec![create_test_span()]]; let result = serializer.collect_and_process_traces(traces); @@ -239,7 +239,7 @@ mod tests { #[test] fn test_collect_and_process_traces_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let traces = vec![vec![create_test_span()]]; let result = serializer.collect_and_process_traces(traces); @@ -252,7 +252,7 @@ mod tests { #[test] fn test_collect_and_process_traces_multiple_chunks() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], @@ -268,7 +268,7 @@ mod tests { #[test] fn test_serialize_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let original_traces = vec![vec![create_test_span()]]; let payload = serializer .collect_and_process_traces(original_traces.clone()) @@ -303,7 +303,7 @@ mod tests { #[test] fn test_serialize_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let original_traces = vec![vec![create_test_span()]]; let payload = serializer .collect_and_process_traces(original_traces.clone()) @@ -338,14 +338,14 @@ mod tests { #[test] fn test_prepare_traces_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], ]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -360,11 +360,11 @@ mod tests { #[test] fn test_prepare_traces_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -376,11 +376,11 @@ mod tests { #[test] fn test_prepare_traces_payload_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, Some(&agent_version)); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -390,11 +390,11 @@ mod tests { #[test] fn test_prepare_traces_payload_empty_traces() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces: Vec> = vec![]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -416,8 +416,8 @@ mod tests { ..Default::default() }; - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); - let headers = serializer.build_traces_headers(header_tags, 1); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let headers = serializer.build_traces_headers(header_tags, 1, None); assert_eq!(headers.get("datadog-meta-lang").unwrap(), "python"); assert_eq!(headers.get("datadog-meta-lang-version").unwrap(), "3.9.0"); From fff9999fd341c105a35ef26a874f3be57364f666 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 6 May 2026 17:10:43 +0200 Subject: [PATCH 3/6] fix: wasm only codepath --- libdd-data-pipeline/src/trace_exporter/builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 696c8473d6..5797b4ed45 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -487,6 +487,7 @@ impl TraceExporterBuilder { }, input_format: self.input_format, output_format: self.output_format, + serializer: TraceSerializer::new(self.output_format), client_computed_top_level: self.client_computed_top_level, shared_runtime, dogstatsd, From d74ef622b77e67446cd1b7b888925a5a710cdfcb Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 7 May 2026 12:04:00 +0200 Subject: [PATCH 4/6] fix: adress comments --- Cargo.toml | 4 ---- libdd-data-pipeline/src/trace_exporter/trace_serializer.rs | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e76c6f347e..290d70288a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,7 +113,3 @@ codegen-units = 1 # so benchmarks are not measuring the same thing as the release build. This patch removes # the default dependency on libm. A PR will be opened to proptest to make this optional. proptest = { git = 'https://github.com/bantonsson/proptest.git', branch = "ban/avoid-libm-in-std" } -# rmp ={ path = "../../../../../projects/msgpack-rust/rmp" } -# rmp-serde ={ path = "../../../../../projects/msgpack-rust/rmp-serde" } -# rmp ={ path = "/projects/msgpack-rust/rmp" } -# rmp-serde ={ path = "/projects/msgpack-rust/rmp-serde" } diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 4cc0c8f9c6..005050024f 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -18,6 +18,7 @@ use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_payload; +/// Minimal capacity of fresh buffers allocated to encode traces, in bytes. const MIN_BUFFER_CAPACITY: usize = 1024; /// Prepared traces payload ready for sending to the agent From ccd7c0cd4d704abd85a5fe0c65837bdc54040f26 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 7 May 2026 12:06:37 +0200 Subject: [PATCH 5/6] fix: TraceSerializer import on wasm --- libdd-data-pipeline/src/trace_exporter/builder.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 5797b4ed45..c13d5fe304 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -15,7 +15,7 @@ use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TracerMetadata, - INFO_ENDPOINT, + INFO_ENDPOINT, TraceSerializer }; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientTrait, MaybeSend}; @@ -310,8 +310,6 @@ impl TraceExporterBuilder { #[cfg(not(target_arch = "wasm32"))] { - use crate::trace_exporter::TraceSerializer; - let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint.clone(), Duration::from_secs(5 * 60)); From da0086a0d86890688a7a2bffb265358f242151e7 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 7 May 2026 12:53:41 +0200 Subject: [PATCH 6/6] fix: fmt --- libdd-data-pipeline/src/trace_exporter/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index c13d5fe304..fbff2016b1 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -14,8 +14,8 @@ use crate::trace_exporter::TelemetryConfig; use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, - TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TracerMetadata, - INFO_ENDPOINT, TraceSerializer + TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TraceSerializer, + TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientTrait, MaybeSend};