Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::trace_exporter::TelemetryConfig;
use crate::trace_exporter::TraceExporterWorkers;
use crate::trace_exporter::{
add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter,
TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TracerMetadata,
INFO_ENDPOINT,
TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TraceSerializer,
TracerMetadata, INFO_ENDPOINT,
};
use arc_swap::ArcSwap;
use libdd_capabilities::{HttpClientTrait, MaybeSend};
Expand Down Expand Up @@ -400,6 +400,7 @@ impl TraceExporterBuilder {
},
input_format: self.input_format,
output_format: self.output_format,
serializer: TraceSerializer::new(self.output_format),
client_computed_top_level: self.client_computed_top_level,
shared_runtime,
dogstatsd,
Expand Down Expand Up @@ -484,6 +485,7 @@ impl TraceExporterBuilder {
},
input_format: self.input_format,
output_format: self.output_format,
serializer: TraceSerializer::new(self.output_format),
client_computed_top_level: self.client_computed_top_level,
shared_runtime,
dogstatsd,
Expand Down
9 changes: 5 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub struct TraceExporter<H: HttpClientTrait + MaybeSend + Sync + 'static> {
metadata: TracerMetadata,
input_format: TraceExporterInputFormat,
output_format: TraceExporterOutputFormat,
serializer: TraceSerializer,
shared_runtime: Arc<SharedRuntime>,
/// None if dogstatsd is disabled
dogstatsd: Option<Client>,
Expand Down Expand Up @@ -616,11 +617,11 @@ impl<H: HttpClientTrait + MaybeSend + Sync + 'static> TraceExporter<H> {
return self.send_otlp_traces_inner(traces, config).await;
}

let serializer = TraceSerializer::new(
self.output_format,
let prepared = match self.serializer.prepare_traces_payload(
traces,
header_tags,
self.agent_payload_response_version.as_ref(),
);
let prepared = match serializer.prepare_traces_payload(traces, header_tags) {
) {
Ok(p) => p,
Err(e) => {
error!("Error serializing traces: {e}");
Expand Down
97 changes: 58 additions & 39 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::sync::atomic::{AtomicUsize, Ordering};

use crate::trace_exporter::agent_response::{
AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION,
};
Expand All @@ -16,6 +18,9 @@ use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
use libdd_trace_utils::tracer_payload;

/// Minimal capacity of fresh buffers allocated to encode traces, in bytes.
const MIN_BUFFER_CAPACITY: usize = 1024;
Comment thread
paullegranddc marked this conversation as resolved.

/// Prepared traces payload ready for sending to the agent
pub(super) struct PreparedTracesPayload {
/// Serialized msgpack payload
Expand All @@ -27,20 +32,18 @@ pub(super) struct PreparedTracesPayload {
}

/// Trace serialization client for handling payload preparation
pub(super) struct TraceSerializer<'a> {
#[derive(Debug)]
pub(super) struct TraceSerializer {
previous_serialised_len: AtomicUsize,
output_format: TraceExporterOutputFormat,
agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>,
}

impl<'a> TraceSerializer<'a> {
impl TraceSerializer {
/// Create a new trace serializer
pub(super) fn new(
output_format: TraceExporterOutputFormat,
agent_payload_response_version: Option<&'a AgentResponsePayloadVersion>,
) -> Self {
pub(super) fn new(output_format: TraceExporterOutputFormat) -> Self {
Self {
previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY),
output_format,
agent_payload_response_version,
}
}

Expand All @@ -49,10 +52,12 @@ impl<'a> TraceSerializer<'a> {
&self,
traces: Vec<Vec<Span<T>>>,
header_tags: TracerHeaderTags,
agent_payload_response_version: Option<&AgentResponsePayloadVersion>,
) -> Result<PreparedTracesPayload, TraceExporterError> {
let payload = self.collect_and_process_traces(traces)?;
let chunks = payload.size();
let headers = self.build_traces_headers(header_tags, chunks);
let headers =
self.build_traces_headers(header_tags, chunks, agent_payload_response_version);
let mp_payload = self.serialize_payload(&payload)?;

Ok(PreparedTracesPayload {
Expand All @@ -77,13 +82,18 @@ impl<'a> TraceSerializer<'a> {
}

/// Build HTTP headers for traces request
fn build_traces_headers(&self, header_tags: TracerHeaderTags, chunk_count: usize) -> HeaderMap {
fn build_traces_headers(
&self,
header_tags: TracerHeaderTags,
chunk_count: usize,
agent_payload_response_version: Option<&AgentResponsePayloadVersion>,
) -> HeaderMap {
let mut headers: HeaderMap = header_tags.into();
headers.reserve(4);
headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
headers.insert(DATADOG_TRACE_COUNT, chunk_count.into());
headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
if let Some(agent_payload_response_version) = &self.agent_payload_response_version {
if let Some(agent_payload_response_version) = agent_payload_response_version {
// should never fail, as the version should only contain visible ascii chars
let _ = HeaderValue::try_from(agent_payload_response_version.header_value())
.map(|v| headers.insert(DATADOG_RATES_PAYLOAD_VERSION, v));
Expand All @@ -96,12 +106,24 @@ impl<'a> TraceSerializer<'a> {
&self,
payload: &tracer_payload::TraceChunks<T>,
) -> Result<Vec<u8>, TraceExporterError> {
match payload {
tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)),
let capacity = self
.previous_serialised_len
.load(Ordering::Relaxed)
.max(MIN_BUFFER_CAPACITY);
Comment on lines +109 to +112
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Persist the serializer across sends

In the production agent path I checked, send_trace_chunks_inner constructs a fresh TraceSerializer immediately before the single prepare_traces_payload call (libdd-data-pipeline/src/trace_exporter/mod.rs:619-623), so this load always sees the constructor's MIN_BUFFER_CAPACITY value rather than the previous payload size. That means batches larger than 1KB still grow from a 1KB allocation on every send, and the stored length is discarded when the local serializer is dropped; to get the intended pre-allocation, the serializer state needs to live across send calls.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is true, maybe previous_serialized_len should be a static ? Or would the scope be too large?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is true, I just refactored the TraceSerializer and stored it in the TraceExporter, this way it persists across sends

let buff = match payload {
tracer_payload::TraceChunks::V04(p) => {
msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32)
}
tracer_payload::TraceChunks::V05(p) => {
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)
let mut buff = Vec::with_capacity(capacity);
rmp_serde::encode::write(&mut buff, p)
.map_err(TraceExporterError::Serialization)?;
buff
}
}
};
self.previous_serialised_len
.store(buff.len(), Ordering::Relaxed);
Ok(buff)
}
}

Expand Down Expand Up @@ -146,30 +168,27 @@ mod tests {

#[test]
fn test_trace_serializer_new() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
assert!(matches!(
serializer.output_format,
TraceExporterOutputFormat::V04
));
assert!(serializer.agent_payload_response_version.is_none());
}

#[test]
fn test_trace_serializer_new_with_agent_version() {
let agent_version = AgentResponsePayloadVersion::new();
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, Some(&agent_version));
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05);
assert!(matches!(
serializer.output_format,
TraceExporterOutputFormat::V05
));
assert!(serializer.agent_payload_response_version.is_some());
}

#[test]
fn test_build_traces_headers() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let header_tags = create_test_header_tags();
let headers = serializer.build_traces_headers(header_tags, 3);
let headers = serializer.build_traces_headers(header_tags, 3, None);

// Check basic headers are present
assert_eq!(headers.get(DATADOG_SEND_REAL_HTTP_STATUS).unwrap(), "1");
Expand Down Expand Up @@ -197,9 +216,9 @@ mod tests {
#[test]
fn test_build_traces_headers_with_agent_version() {
let agent_version = AgentResponsePayloadVersion::new();
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version));
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let header_tags = create_test_header_tags();
let headers = serializer.build_traces_headers(header_tags, 2);
let headers = serializer.build_traces_headers(header_tags, 2, Some(&agent_version));

// Check that agent payload version header is included
assert!(headers.contains_key(DATADOG_RATES_PAYLOAD_VERSION));
Expand All @@ -208,7 +227,7 @@ mod tests {

#[test]
fn test_collect_and_process_traces_v04() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let traces = vec![vec![create_test_span()]];

let result = serializer.collect_and_process_traces(traces);
Expand All @@ -221,7 +240,7 @@ mod tests {

#[test]
fn test_collect_and_process_traces_v05() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05);
let traces = vec![vec![create_test_span()]];

let result = serializer.collect_and_process_traces(traces);
Expand All @@ -234,7 +253,7 @@ mod tests {

#[test]
fn test_collect_and_process_traces_multiple_chunks() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let traces = vec![
vec![create_test_span()],
vec![create_test_span(), create_test_span()],
Expand All @@ -250,7 +269,7 @@ mod tests {

#[test]
fn test_serialize_payload_v04() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let original_traces = vec![vec![create_test_span()]];
let payload = serializer
.collect_and_process_traces(original_traces.clone())
Expand Down Expand Up @@ -285,7 +304,7 @@ mod tests {

#[test]
fn test_serialize_payload_v05() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05);
let original_traces = vec![vec![create_test_span()]];
let payload = serializer
.collect_and_process_traces(original_traces.clone())
Expand Down Expand Up @@ -320,14 +339,14 @@ mod tests {

#[test]
fn test_prepare_traces_payload_v04() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let traces = vec![
vec![create_test_span()],
vec![create_test_span(), create_test_span()],
];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags);
let result = serializer.prepare_traces_payload(traces, header_tags, None);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -342,11 +361,11 @@ mod tests {

#[test]
fn test_prepare_traces_payload_v05() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05);
let traces = vec![vec![create_test_span()]];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags);
let result = serializer.prepare_traces_payload(traces, header_tags, None);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -358,11 +377,11 @@ mod tests {
#[test]
fn test_prepare_traces_payload_with_agent_version() {
let agent_version = AgentResponsePayloadVersion::new();
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, Some(&agent_version));
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let traces = vec![vec![create_test_span()]];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags);
let result = serializer.prepare_traces_payload(traces, header_tags, Some(&agent_version));
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -372,11 +391,11 @@ mod tests {

#[test]
fn test_prepare_traces_payload_empty_traces() {
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let traces: Vec<Vec<SpanBytes>> = vec![];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags);
let result = serializer.prepare_traces_payload(traces, header_tags, None);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -398,8 +417,8 @@ mod tests {
..Default::default()
};

let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04, None);
let headers = serializer.build_traces_headers(header_tags, 1);
let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04);
let headers = serializer.build_traces_headers(header_tags, 1, None);

assert_eq!(headers.get("datadog-meta-lang").unwrap(), "python");
assert_eq!(headers.get("datadog-meta-lang-version").unwrap(), "3.9.0");
Expand Down
Loading