-
Notifications
You must be signed in to change notification settings - Fork 3.4k
[processor/tailsampling] Added bytes_limiting policy type #42509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
5354ed8
e3a70a3
0eabb42
7395580
100dc78
60491a2
9035ebf
01aa675
136a7e6
47f648e
5227b30
8b14194
7c8585c
95349b6
0923183
2a5adb1
20259f9
7c75171
97fb2ed
8ac0184
910d490
99b06f7
d52e900
44f421d
6d9bb67
9dd8ae1
2a9d8a7
70445d1
cb2216d
6f584c5
1976685
5f048ae
744f623
d9d2ba3
b5712d7
fe5bcff
42389c7
1141ee9
8eef272
1db34ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| # Use this changelog template to create an entry for release notes. | ||
|
|
||
| # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
| change_type: enhancement | ||
|
|
||
| # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
| component: processor/tail_sampling | ||
|
|
||
| # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
| note: Add `bytes_limiting` policy type, which samples based on the rate of bytes per second using a token bucket algorithm. | ||
|
|
||
| # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
| issues: [42509] | ||
|
|
||
| # (Optional) One or more lines of additional information to render under the primary note. | ||
| # These lines will be padded with 2 spaces and then inserted directly into the document. | ||
| # Use pipe (|) for multiline entries. | ||
| subtext: | ||
|
|
||
| # If your change doesn't affect end users or the exported elements of any package, | ||
| # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
| # Optional: The change log or logs in which this entry should be included. | ||
| # e.g. '[user]' or '[user, api]' | ||
| # Include 'user' if the change is relevant to end users. | ||
| # Include 'api' if there is a change to a library API. | ||
| # Default: '[user]' | ||
| change_logs: [] |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "go.opentelemetry.io/collector/component" | ||
| "go.opentelemetry.io/collector/pdata/pcommon" | ||
| "go.opentelemetry.io/collector/pdata/ptrace" | ||
| "golang.org/x/time/rate" | ||
|
|
||
| "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" | ||
| ) | ||
|
|
||
// bytesLimiting is a sampling policy evaluator that admits traces based on a
// bytes-per-second budget, implemented as a token bucket in which one token
// corresponds to one byte of (proto-marshaled) trace data.
type bytesLimiting struct {
	// Rate limiter using golang.org/x/time/rate for efficient token bucket implementation
	limiter *rate.Limiter
}

// Compile-time check that bytesLimiting satisfies samplingpolicy.Evaluator.
var _ samplingpolicy.Evaluator = (*bytesLimiting)(nil)
|
|
||
| // NewBytesLimiting creates a policy evaluator that samples traces based on byte limit per second using a token bucket algorithm. | ||
| // The bucket capacity defaults to 2x the bytes per second to allow for reasonable burst traffic. | ||
| func NewBytesLimiting(settings component.TelemetrySettings, bytesPerSecond int64) samplingpolicy.Evaluator { | ||
| return NewBytesLimitingWithBurstCapacity(settings, bytesPerSecond, bytesPerSecond*2) | ||
| } | ||
|
|
||
| // NewBytesLimitingWithBurstCapacity creates a policy evaluator with custom burst capacity. | ||
| // Uses golang.org/x/time/rate.Limiter for efficient, thread-safe token bucket implementation. | ||
| func NewBytesLimitingWithBurstCapacity(_ component.TelemetrySettings, bytesPerSecond, burstCapacity int64) samplingpolicy.Evaluator { | ||
| // Create rate limiter with specified rate and burst capacity | ||
| // rate.Limit is tokens per second (bytes per second in our case) | ||
| // burst capacity is the maximum number of tokens (bytes) that can be consumed in a single request | ||
| limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), int(burstCapacity)) | ||
|
|
||
| return &bytesLimiting{ | ||
| limiter: limiter, | ||
| } | ||
| } | ||
|
|
||
| // Evaluate looks at the trace data and returns a corresponding SamplingDecision based on token bucket algorithm. | ||
| // Uses golang.org/x/time/rate.Limiter.AllowN() for efficient, thread-safe token consumption. | ||
| func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { | ||
| // Calculate the size of the trace in bytes | ||
| traceSize := calculateTraceSize(trace) | ||
|
|
||
| // Use AllowN to check if we can consume 'traceSize' tokens | ||
| // AllowN returns true if the limiter allows the event and false otherwise | ||
| // The limiter automatically handles token bucket refill and thread safety | ||
| if b.limiter.AllowN(time.Now(), int(traceSize)) { | ||
| return samplingpolicy.Sampled, nil | ||
| } | ||
|
|
||
| return samplingpolicy.NotSampled, nil | ||
| } | ||
|
|
||
| // calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes | ||
| // using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method | ||
| func calculateTraceSize(trace *samplingpolicy.TraceData) int64 { | ||
| trace.Lock() | ||
| defer trace.Unlock() | ||
|
|
||
| // Use the OpenTelemetry Collector's built-in method for accurate size calculation | ||
| // This gives us the exact protobuf marshaled size | ||
| marshaler := &ptrace.ProtoMarshaler{} | ||
| return int64(marshaler.TracesSize(trace.ReceivedBatches)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this makes the assumption that the traces will be serialized to proto format, is there a way to allow a different encoding, for example using the encoding extension?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curious, when would this processor encounter other encodings?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OTLP JSON is one. Splunk HEC another. You can see all the encodings we support under extensions/encoding.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Opening #44416 to follow up. |
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.