Merged
40 commits
5354ed8
added bytes_limiting policy type
portertech Sep 4, 2025
e3a70a3
fixed test context
portertech Sep 5, 2025
0eabb42
optimize calc trace size
portertech Sep 5, 2025
7395580
changelog entry
portertech Sep 5, 2025
100dc78
removed debug logging
portertech Sep 5, 2025
60491a2
Merge branch 'main' into bytes_limiting
portertech Sep 5, 2025
9035ebf
kb -> bytes, precision
portertech Sep 5, 2025
01aa675
use golang.org/x/time/rate
portertech Sep 5, 2025
136a7e6
use ptrace proto marshaler to determine trace size
portertech Sep 5, 2025
47f648e
updated changelog entry
portertech Sep 5, 2025
5227b30
go mod tidy
portertech Sep 5, 2025
8b14194
lint fixes
portertech Sep 5, 2025
7c8585c
Merge branch 'main' into bytes_limiting
portertech Sep 8, 2025
95349b6
import time/rate
portertech Sep 8, 2025
0923183
make gotidy
portertech Sep 8, 2025
2a5adb1
Merge branch 'main' into bytes_limiting
portertech Sep 8, 2025
20259f9
Merge branch 'main' into bytes_limiting
portertech Sep 9, 2025
7c75171
make crosslink, fixed GH conflict resolution commit
portertech Sep 9, 2025
97fb2ed
Merge branch 'main' into bytes_limiting
portertech Sep 10, 2025
8ac0184
Update processor/tailsamplingprocessor/internal/sampling/bytes_limiti…
portertech Sep 26, 2025
910d490
Update processor/tailsamplingprocessor/config.go
portertech Sep 26, 2025
99b06f7
use samplingpolicy pkg types
portertech Oct 11, 2025
d52e900
Merge branch 'main' into bytes_limiting
portertech Oct 11, 2025
44f421d
fixed processor benchmark
portertech Oct 11, 2025
6d9bb67
fixed changelog component
portertech Oct 11, 2025
9dd8ae1
the component is now processor/tail_sampling
portertech Oct 11, 2025
2a9d8a7
comment indentation lint error
portertech Oct 11, 2025
70445d1
deleted test artifact
portertech Oct 14, 2025
cb2216d
revert ddog x/time changes
portertech Oct 14, 2025
6f584c5
missed one ddog change
portertech Oct 14, 2025
1976685
revert other ddog x/time changes
portertech Oct 14, 2025
5f048ae
Merge branch 'main' into bytes_limiting
portertech Oct 14, 2025
744f623
fixed unrelated changelog entry component
portertech Oct 14, 2025
d9d2ba3
silly github conflict editor, indentation
portertech Oct 14, 2025
b5712d7
Merge branch 'main' into bytes_limiting
portertech Oct 15, 2025
fe5bcff
Merge branch 'main' into bytes_limiting
portertech Oct 31, 2025
42389c7
Merge branch 'main' into bytes_limiting
portertech Nov 13, 2025
1141ee9
corrected gh conflict fix indentation
portertech Nov 13, 2025
8eef272
Merge branch 'main' into bytes_limiting
portertech Nov 14, 2025
1db34ab
Merge branch 'main' into bytes_limiting
portertech Nov 18, 2025
27 changes: 27 additions & 0 deletions .chloggen/tailsampling-processor-bytes-limiting-policy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the bytes_limiting policy type, which samples based on the rate of bytes per second using a token bucket algorithm.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion connector/datadogconnector/go.mod
@@ -347,7 +347,7 @@ require (
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/time v0.13.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 // indirect
4 changes: 2 additions & 2 deletions connector/datadogconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/datadogexporter/go.mod
@@ -475,7 +475,7 @@ require (
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/time v0.13.0 // indirect
golang.org/x/tools v0.36.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/api v0.238.0 // indirect
4 changes: 2 additions & 2 deletions exporter/datadogexporter/go.sum


2 changes: 1 addition & 1 deletion exporter/datadogexporter/integrationtest/go.mod
@@ -359,7 +359,7 @@ require (
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/time v0.13.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 // indirect
4 changes: 2 additions & 2 deletions exporter/datadogexporter/integrationtest/go.sum


55 changes: 52 additions & 3 deletions processor/tailsamplingprocessor/README.md
@@ -34,6 +34,7 @@ Multiple policies exist today and it is straightforward to add more. These incl
- `string_attribute`: Sample based on string attributes (resource and record) value matches, both exact and regex value matches are supported
- `trace_state`: Sample based on [TraceState](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#tracestate) value matches
- `rate_limiting`: Sample based on the rate of spans per second.
- `bytes_limiting`: Sample based on the rate of bytes per second using a token bucket algorithm implemented by golang.org/x/time/rate. The bucket refills continuously at the specified rate, allowing bursts up to a configurable capacity while maintaining the average rate over time.
- `span_count`: Sample based on the minimum and/or maximum number of spans, inclusive. If the sum of all spans in the trace is outside the range threshold, the trace will not be sampled.
- `boolean_attribute`: Sample based on boolean attribute (resource and record).
- `ottl_condition`: Sample based on given boolean OTTL condition (span and span event).
@@ -127,21 +128,26 @@ processors:
},
{
name: test-policy-9,
type: bytes_limiting,
bytes_limiting: {bytes_per_second: 1024000, burst_capacity: 2048000}
},
{
name: test-policy-10,
type: span_count,
span_count: {min_spans: 2, max_spans: 20}
},
{
name: test-policy-10,
name: test-policy-11,
type: trace_state,
trace_state: { key: key3, values: [value1, value2] }
},
{
name: test-policy-11,
name: test-policy-12,
type: boolean_attribute,
boolean_attribute: {key: key4, value: true}
},
{
name: test-policy-12,
name: test-policy-13,
type: ottl_condition,
ottl_condition: {
error_mode: ignore,
@@ -230,6 +236,49 @@ processors:

Refer to [tail_sampling_config.yaml](./testdata/tail_sampling_config.yaml) for detailed examples on using the processor.

## Bytes Limiting Policy

The `bytes_limiting` policy uses a token bucket algorithm implemented by [golang.org/x/time/rate](https://pkg.go.dev/golang.org/x/time/rate) to control data throughput. Trace sizes are measured as the exact protobuf-marshaled size, computed with the OpenTelemetry Collector's built-in `ProtoMarshaler.TracesSize()` method. This policy is particularly useful for:

- **Volume control**: Limiting the total amount of trace data processed per unit time
- **Burst handling**: Allowing short-term spikes in data volume while maintaining long-term rate limits
- **Memory protection**: Preventing downstream systems from being overwhelmed by large traces

### Configuration

The `bytes_limiting` policy supports the following configuration parameters:

- `bytes_per_second`: The sustained rate at which bytes are allowed through (required)
- `burst_capacity`: The maximum number of bytes that can be processed in a burst (optional, defaults to 2x `bytes_per_second`)

### Token Bucket Algorithm

The policy implements a token bucket algorithm where:

1. **Tokens represent bytes**: Each token in the bucket represents one byte of trace data
2. **Continuous refill**: Tokens are added to the bucket at the configured `bytes_per_second` rate
3. **Burst capacity**: The bucket can hold up to `burst_capacity` tokens for handling traffic bursts
4. **Consumption**: When a trace arrives, tokens equal to the trace size are consumed from the bucket
5. **Rejection**: If insufficient tokens are available, the trace is not sampled

### Example Configuration

```yaml
processors:
tail_sampling:
policies:
- name: volume-control
type: bytes_limiting
bytes_limiting:
bytes_per_second: 1048576 # 1 MB/second sustained rate
burst_capacity: 5242880 # 5 MB burst capacity
```

This configuration allows:
- A sustained throughput of 1 MB/second (1,048,576 bytes/s)
- Burst traffic up to 5 MB (5,242,880 bytes) before rate limiting kicks in
- Smooth handling of variable trace sizes and timing
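A quick back-of-the-envelope check on these numbers (a small standalone sketch, not part of the processor):

```go
package main

import "fmt"

func main() {
	const bytesPerSecond = 1048576 // 1 MB/s sustained rate from the config above
	const burstCapacity = 5242880  // 5 MB burst capacity

	// After a full 5 MB burst empties the bucket, refilling back to
	// capacity at the sustained rate takes capacity/rate seconds.
	refillSeconds := float64(burstCapacity) / float64(bytesPerSecond)
	fmt.Printf("a full %d-byte burst takes %.0fs to refill\n", burstCapacity, refillSeconds)
}
```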

## A Practical Example

Imagine that you wish to configure the processor to implement the following rules:
14 changes: 14 additions & 0 deletions processor/tailsamplingprocessor/config.go
@@ -45,6 +45,8 @@ const (
// OTTLCondition sample traces which match user provided OpenTelemetry Transformation Language
// conditions.
OTTLCondition PolicyType = "ottl_condition"
// BytesLimiting samples traces based on the rate of bytes per second, using a token bucket algorithm.
BytesLimiting PolicyType = "bytes_limiting"
)

// sharedPolicyCfg holds the common configuration to all policies that are used in derivative policy configurations
@@ -66,6 +68,8 @@ type sharedPolicyCfg struct {
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for bytes limiting filter sampling policy evaluator.
BytesLimitingCfg BytesLimitingCfg `mapstructure:"bytes_limiting"`
// Configs for span count filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for defining trace_state policy
@@ -203,6 +207,16 @@
SpansPerSecond int64 `mapstructure:"spans_per_second"`
}

// BytesLimitingCfg holds the configurable settings to create a bytes limiting
// sampling policy evaluator using a token bucket algorithm.
type BytesLimitingCfg struct {
// BytesPerSecond sets the limit on the maximum number of bytes that can be processed each second.
BytesPerSecond int64 `mapstructure:"bytes_per_second"`
// BurstCapacity sets the maximum burst capacity in bytes. If not specified, defaults to 2x BytesPerSecond.
// This allows for short bursts of traffic above the sustained rate.
BurstCapacity int64 `mapstructure:"burst_capacity"`
}

// SpanCountCfg holds the configurable settings to create a Span Count filter sampling
// policy evaluator
type SpanCountCfg struct {
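Per the `BurstCapacity` doc comment above, leaving the field unset defaults it to 2x `BytesPerSecond`. That fallback can be sketched as follows (a hypothetical standalone illustration; the actual wiring lives in the evaluator factory, which is not shown in this diff):

```go
package main

import "fmt"

// BytesLimitingCfg mirrors the struct in the diff above; this sketch
// is not the processor's actual factory code.
type BytesLimitingCfg struct {
	BytesPerSecond int64
	BurstCapacity  int64
}

// effectiveBurst applies the documented default: 2x BytesPerSecond
// when burst_capacity is left unset (zero).
func effectiveBurst(cfg BytesLimitingCfg) int64 {
	if cfg.BurstCapacity > 0 {
		return cfg.BurstCapacity
	}
	return cfg.BytesPerSecond * 2
}

func main() {
	fmt.Println(effectiveBurst(BytesLimitingCfg{BytesPerSecond: 1024000}))                        // default applies
	fmt.Println(effectiveBurst(BytesLimitingCfg{BytesPerSecond: 1024000, BurstCapacity: 512000})) // explicit value wins
}
```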
15 changes: 11 additions & 4 deletions processor/tailsamplingprocessor/config_test.go
@@ -87,28 +87,35 @@ func TestLoadConfig(t *testing.T) {
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-policy-8",
Name: "test-policy-8",
Type: BytesLimiting,
BytesLimitingCfg: BytesLimitingCfg{BytesPerSecond: 1024000, BurstCapacity: 2048000},
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-policy-9",
Type: SpanCount,
SpanCountCfg: SpanCountCfg{MinSpans: 2},
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-policy-9",
Name: "test-policy-10",
Type: TraceState,
TraceStateCfg: TraceStateCfg{Key: "key3", Values: []string{"value1", "value2"}},
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-policy-10",
Name: "test-policy-11",
Type: BooleanAttribute,
BooleanAttributeCfg: BooleanAttributeCfg{Key: "key4", Value: true},
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-policy-11",
Name: "test-policy-12",
Type: OTTLCondition,
OTTLConditionCfg: OTTLConditionCfg{
ErrorMode: ottl.IgnoreError,
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/go.mod
@@ -28,6 +28,7 @@ require (
go.opentelemetry.io/collector/component/componenttest v0.135.0
go.opentelemetry.io/collector/consumer/consumertest v0.135.0
go.opentelemetry.io/collector/processor/processortest v0.135.0
golang.org/x/time v0.13.0
)

require (
2 changes: 2 additions & 0 deletions processor/tailsamplingprocessor/go.sum


68 changes: 68 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"golang.org/x/time/rate"
)

type bytesLimiting struct {
// Rate limiter using golang.org/x/time/rate for efficient token bucket implementation
limiter *rate.Limiter
}

var _ PolicyEvaluator = (*bytesLimiting)(nil)

(GitHub Actions / govulncheck (processor-1) flagged `undefined: PolicyEvaluator` and similar identifiers at several lines in this file; they are defined elsewhere in the sampling package.)

// NewBytesLimiting creates a policy evaluator that samples traces based on a bytes-per-second limit using a token bucket algorithm.
// The bucket capacity defaults to 2x the bytes per second to allow for reasonable burst traffic.
func NewBytesLimiting(settings component.TelemetrySettings, bytesPerSecond int64) PolicyEvaluator {

return NewBytesLimitingWithBurstCapacity(settings, bytesPerSecond, bytesPerSecond*2)
}

// NewBytesLimitingWithBurstCapacity creates a policy evaluator with custom burst capacity.
// Uses golang.org/x/time/rate.Limiter for efficient, thread-safe token bucket implementation.
func NewBytesLimitingWithBurstCapacity(_ component.TelemetrySettings, bytesPerSecond, burstCapacity int64) PolicyEvaluator {

// Create rate limiter with specified rate and burst capacity
// rate.Limit is tokens per second (bytes per second in our case)
// burst capacity is the maximum number of tokens (bytes) that can be consumed in a burst
limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), int(burstCapacity))

return &bytesLimiting{
limiter: limiter,
}
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision based on token bucket algorithm.
// Uses golang.org/x/time/rate.Limiter.AllowN() for efficient, thread-safe token consumption.
func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *TraceData) (Decision, error) {

// Calculate the size of the trace in bytes
traceSize := calculateTraceSize(trace)

// Use AllowN to check if we can consume 'traceSize' tokens
// AllowN returns true if the limiter allows the event and false otherwise
// The limiter automatically handles token bucket refill and thread safety
if b.limiter.AllowN(time.Now(), int(traceSize)) {
return Sampled, nil

}

return NotSampled, nil

}

// calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes
// using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method
func calculateTraceSize(trace *TraceData) int64 {

trace.Lock()
defer trace.Unlock()

// Use the OpenTelemetry Collector's built-in method for accurate size calculation
// This gives us the exact protobuf marshaled size
marshaler := &ptrace.ProtoMarshaler{}
return int64(marshaler.TracesSize(trace.ReceivedBatches))
}

Review thread on this line:

Contributor: this makes the assumption that the traces will be serialized to proto format, is there a way to allow a different encoding, for example using the encoding extension?

Contributor (author): curious, when would this processor encounter other encodings?

Contributor: OTLP JSON is one. Splunk HEC another. You can see all the encodings we support under extensions/encoding.

Contributor: Opening #44416 to follow up.