diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 6f24b8eb80..fbff2016b1 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -14,8 +14,8 @@ use crate::trace_exporter::TelemetryConfig; use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, - TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TracerMetadata, - INFO_ENDPOINT, + TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TraceSerializer, + TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientTrait, MaybeSend}; @@ -400,6 +400,7 @@ impl TraceExporterBuilder { }, input_format: self.input_format, output_format: self.output_format, + serializer: TraceSerializer::new(self.output_format), client_computed_top_level: self.client_computed_top_level, shared_runtime, dogstatsd, @@ -484,6 +485,7 @@ impl TraceExporterBuilder { }, input_format: self.input_format, output_format: self.output_format, + serializer: TraceSerializer::new(self.output_format), client_computed_top_level: self.client_computed_top_level, shared_runtime, dogstatsd, diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index f87bfe16f3..71edd11f0a 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -215,6 +215,7 @@ pub struct TraceExporter { metadata: TracerMetadata, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, + serializer: TraceSerializer, shared_runtime: Arc, /// None if dogstatsd is disabled dogstatsd: Option, @@ -616,11 +617,11 @@ impl TraceExporter { return self.send_otlp_traces_inner(traces, config).await; } - let serializer = TraceSerializer::new( - self.output_format, + let prepared = match self.serializer.prepare_traces_payload( + traces, + header_tags, self.agent_payload_response_version.as_ref(), - ); - let prepared = match serializer.prepare_traces_payload(traces, header_tags) { + ) { Ok(p) => p, Err(e) => { error!("Error serializing traces: {e}"); diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 2a257c7e82..005050024f 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -1,6 +1,8 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::sync::atomic::{AtomicUsize, Ordering}; + use crate::trace_exporter::agent_response::{ AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION, }; @@ -16,6 +18,9 @@ use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_payload; +/// Minimal capacity of fresh buffers allocated to encode traces, in bytes. +const MIN_BUFFER_CAPACITY: usize = 1024; + /// Prepared traces payload ready for sending to the agent pub(super) struct PreparedTracesPayload { /// Serialized msgpack payload @@ -27,20 +32,18 @@ pub(super) struct PreparedTracesPayload { } /// Trace serialization client for handling payload preparation -pub(super) struct TraceSerializer<'a> { +#[derive(Debug)] +pub(super) struct TraceSerializer { + previous_serialised_len: AtomicUsize, output_format: TraceExporterOutputFormat, - agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, } -impl<'a> TraceSerializer<'a> { +impl TraceSerializer { /// Create a new trace serializer - pub(super) fn new( - output_format: TraceExporterOutputFormat, - agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>, - ) -> Self { + pub(super) fn new(output_format: TraceExporterOutputFormat) -> Self { Self { + previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY), output_format, - agent_payload_response_version, } } @@ -49,10 +52,12 @@ impl<'a> TraceSerializer<'a> { &self, traces: Vec>>, header_tags: TracerHeaderTags, + agent_payload_response_version: Option<&AgentResponsePayloadVersion>, ) -> Result { let payload = self.collect_and_process_traces(traces)?; let chunks = payload.size(); - let headers = self.build_traces_headers(header_tags, chunks); + let headers = + self.build_traces_headers(header_tags, chunks, agent_payload_response_version); let mp_payload = self.serialize_payload(&payload)?; Ok(PreparedTracesPayload { @@ -77,13 +82,18 @@ impl<'a> TraceSerializer<'a> { } /// Build HTTP headers for traces request - fn build_traces_headers(&self, header_tags: TracerHeaderTags, chunk_count: usize) -> HeaderMap { + fn build_traces_headers( + &self, + header_tags: TracerHeaderTags, + chunk_count: usize, + agent_payload_response_version: Option<&AgentResponsePayloadVersion>, + ) -> HeaderMap { let mut headers: HeaderMap = header_tags.into(); headers.reserve(4); headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1")); headers.insert(DATADOG_TRACE_COUNT, chunk_count.into()); headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); - if let Some(agent_payload_response_version) = &self.agent_payload_response_version { + if let Some(agent_payload_response_version) = agent_payload_response_version { // should never fail, as the version should only contain visible ascii chars let _ = HeaderValue::try_from(agent_payload_response_version.header_value()) .map(|v| headers.insert(DATADOG_RATES_PAYLOAD_VERSION, v)); @@ -96,12 +106,24 @@ impl<'a> TraceSerializer<'a> { &self, payload: &tracer_payload::TraceChunks, ) -> Result, TraceExporterError> { - match payload { - tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)), + let capacity = self + .previous_serialised_len + .load(Ordering::Relaxed) + .max(MIN_BUFFER_CAPACITY); + let buff = match payload { + tracer_payload::TraceChunks::V04(p) => { + msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) + } tracer_payload::TraceChunks::V05(p) => { - rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization) + let mut buff = Vec::with_capacity(capacity); + rmp_serde::encode::write(&mut buff, p) + .map_err(TraceExporterError::Serialization)?; + buff } - } + }; + self.previous_serialised_len + .store(buff.len(), Ordering::Relaxed); + Ok(buff) } } @@ -146,30 +168,27 @@ mod tests { #[test] fn test_trace_serializer_new() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); assert!(matches!( serializer.output_format, TraceExporterOutputFormat::V04 )); - assert!(serializer.agent_payload_response_version.is_none()); } #[test] fn test_trace_serializer_new_with_agent_version() { - let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); assert!(matches!( serializer.output_format, TraceExporterOutputFormat::V05 )); - assert!(serializer.agent_payload_response_version.is_some()); } #[test] fn test_build_traces_headers() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 3); + let headers = serializer.build_traces_headers(header_tags, 3, None); // Check basic headers are present assert_eq!(headers.get(DATADOG_SEND_REAL_HTTP_STATUS).unwrap(), "1"); @@ -197,9 +216,9 @@ mod tests { #[test] fn test_build_traces_headers_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 2); + let headers = serializer.build_traces_headers(header_tags, 2, Some(&agent_version)); // Check that agent payload version header is included assert!(headers.contains_key(DATADOG_RATES_PAYLOAD_VERSION)); @@ -208,7 +227,7 @@ mod tests { #[test] fn test_collect_and_process_traces_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![vec![create_test_span()]]; let result = serializer.collect_and_process_traces(traces); @@ -221,7 +240,7 @@ mod tests { #[test] fn test_collect_and_process_traces_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let traces = vec![vec![create_test_span()]]; let result = serializer.collect_and_process_traces(traces); @@ -234,7 +253,7 @@ mod tests { #[test] fn test_collect_and_process_traces_multiple_chunks() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], @@ -250,7 +269,7 @@ mod tests { #[test] fn test_serialize_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let original_traces = vec![vec![create_test_span()]]; let payload = serializer .collect_and_process_traces(original_traces.clone()) @@ -285,7 +304,7 @@ mod tests { #[test] fn test_serialize_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let original_traces = vec![vec![create_test_span()]]; let payload = serializer .collect_and_process_traces(original_traces.clone()) @@ -320,14 +339,14 @@ mod tests { #[test] fn test_prepare_traces_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], ]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -342,11 +361,11 @@ mod tests { #[test] fn test_prepare_traces_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -358,11 +377,11 @@ mod tests { #[test] fn test_prepare_traces_payload_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version)); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, Some(&agent_version)); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -372,11 +391,11 @@ mod tests { #[test] fn test_prepare_traces_payload_empty_traces() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let traces: Vec> = vec![]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = serializer.prepare_traces_payload(traces, header_tags, None); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -398,8 +417,8 @@ mod tests { ..Default::default() }; - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None); - let headers = serializer.build_traces_headers(header_tags, 1); + let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let headers = serializer.build_traces_headers(header_tags, 1, None); assert_eq!(headers.get("datadog-meta-lang").unwrap(), "python"); assert_eq!(headers.get("datadog-meta-lang-version").unwrap(), "3.9.0");