Skip to content

Commit 2efeae4

Browse files
authored
[exporter] moved mergeBatchFunc and mergeBatchSplitFunc to request (open-telemetry#11459)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR changes `mergeBatchFunc` and `mergeBatchSplit` function as a member function of `batchRequest`. <!-- Issue number if applicable --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 5cd035b commit 2efeae4

22 files changed

+348
-423
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Made mergeFunc and mergeSplitFunc required method of exporter.Request
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10368]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
mergeFunc and mergeSplitFunc used to be part of the configuration pass to the exporter. Now it is changed
20+
| to be a method function of request.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

exporter/exporterbatcher/batch_func.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

exporter/exporterhelper/common.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
6969
func WithBatcher(cfg exporterbatcher.Config) Option {
7070
return internal.WithBatcher(cfg)
7171
}
72-
73-
// WithBatchFuncs enables setting custom batch merge functions.
74-
// This API is at the early stage of development and may change without backward compatibility
75-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
76-
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request],
77-
msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
78-
return internal.WithBatchFuncs(mf, msf)
79-
}

exporter/exporterhelper/exporterhelperprofiles/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ require (
1414
go.opentelemetry.io/collector/exporter v0.111.0
1515
go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0
1616
go.opentelemetry.io/collector/exporter/exportertest v0.111.0
17-
go.opentelemetry.io/collector/pdata v1.17.0
1817
go.opentelemetry.io/collector/pdata/pprofile v0.111.0
1918
go.opentelemetry.io/collector/pdata/testdata v0.111.0
2019
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-20241021162523-3193106bf4b1
@@ -38,6 +37,7 @@ require (
3837
go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect
3938
go.opentelemetry.io/collector/extension v0.111.0 // indirect
4039
go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect
40+
go.opentelemetry.io/collector/pdata v1.17.0 // indirect
4141
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect
4242
go.opentelemetry.io/collector/receiver v0.111.0 // indirect
4343
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect

exporter/exporterhelper/exporterhelperprofiles/profiles.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func NewProfilesExporter(
8888
}
8989
profilesOpts := []exporterhelper.Option{
9090
internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)),
91-
internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles),
9291
}
9392
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...)
9493
}

exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,28 @@ import (
1212
"go.opentelemetry.io/collector/pdata/pprofile"
1313
)
1414

15-
// mergeProfiles merges two profiles requests into one.
16-
func mergeProfiles(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) {
17-
tr1, ok1 := r1.(*profilesRequest)
15+
// Merge merges two profiles requests into one.
16+
func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) {
1817
tr2, ok2 := r2.(*profilesRequest)
19-
if !ok1 || !ok2 {
18+
if !ok2 {
2019
return nil, errors.New("invalid input type")
2120
}
22-
tr2.pd.ResourceProfiles().MoveAndAppendTo(tr1.pd.ResourceProfiles())
23-
return tr1, nil
21+
tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
22+
return req, nil
2423
}
2524

26-
// mergeSplitProfiles splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
27-
func mergeSplitProfiles(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 exporterhelper.Request, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
25+
// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
26+
func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
2827
var (
2928
res []exporterhelper.Request
3029
destReq *profilesRequest
3130
capacityLeft = cfg.MaxSizeItems
3231
)
33-
for _, req := range []exporterhelper.Request{r1, r2} {
34-
if req == nil {
32+
for _, r := range []exporterhelper.Request{req, r2} {
33+
if r == nil {
3534
continue
3635
}
37-
srcReq, ok := req.(*profilesRequest)
36+
srcReq, ok := r.(*profilesRequest)
3837
if !ok {
3938
return nil, errors.New("invalid input type")
4039
}

exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,25 @@ import (
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
1414

15-
"go.opentelemetry.io/collector/consumer"
1615
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1716
"go.opentelemetry.io/collector/exporter/exporterhelper"
1817
"go.opentelemetry.io/collector/pdata/pprofile"
19-
"go.opentelemetry.io/collector/pdata/ptrace"
2018
"go.opentelemetry.io/collector/pdata/testdata"
2119
)
2220

2321
func TestMergeProfiles(t *testing.T) {
2422
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
2523
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
26-
res, err := mergeProfiles(context.Background(), pr1, pr2)
24+
res, err := pr1.Merge(context.Background(), pr2)
2725
require.NoError(t, err)
2826
fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd)
2927
assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount())
3028
}
3129

3230
func TestMergeProfilesInvalidInput(t *testing.T) {
33-
pr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
31+
pr1 := &dummyRequest{}
3432
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
35-
_, err := mergeProfiles(context.Background(), pr1, pr2)
33+
_, err := pr2.Merge(context.Background(), pr1)
3634
assert.Error(t, err)
3735
}
3836

@@ -51,13 +49,6 @@ func TestMergeSplitProfiles(t *testing.T) {
5149
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
5250
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
5351
},
54-
{
55-
name: "both_requests_nil",
56-
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
57-
pr1: nil,
58-
pr2: nil,
59-
expected: []*profilesRequest{},
60-
},
6152
{
6253
name: "first_request_empty",
6354
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
@@ -66,17 +57,10 @@ func TestMergeSplitProfiles(t *testing.T) {
6657
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
6758
},
6859
{
69-
name: "first_requests_nil",
70-
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
71-
pr1: nil,
72-
pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)},
73-
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
74-
},
75-
{
76-
name: "first_nil_second_empty",
60+
name: "first_empty_second_nil",
7761
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
78-
pr1: nil,
79-
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
62+
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
63+
pr2: nil,
8064
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
8165
},
8266
{
@@ -93,8 +77,8 @@ func TestMergeSplitProfiles(t *testing.T) {
9377
{
9478
name: "split_only",
9579
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
96-
pr1: nil,
97-
pr2: &profilesRequest{pd: testdata.GenerateProfiles(10)},
80+
pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)},
81+
pr2: nil,
9882
expected: []*profilesRequest{
9983
{pd: testdata.GenerateProfiles(4)},
10084
{pd: testdata.GenerateProfiles(4)},
@@ -133,7 +117,7 @@ func TestMergeSplitProfiles(t *testing.T) {
133117
}
134118
for _, tt := range tests {
135119
t.Run(tt.name, func(t *testing.T) {
136-
res, err := mergeSplitProfiles(context.Background(), tt.cfg, tt.pr1, tt.pr2)
120+
res, err := tt.pr1.MergeSplit(context.Background(), tt.cfg, tt.pr2)
137121
require.NoError(t, err)
138122
assert.Equal(t, len(tt.expected), len(res))
139123
for i, r := range res {
@@ -145,9 +129,9 @@ func TestMergeSplitProfiles(t *testing.T) {
145129
}
146130

147131
func TestMergeSplitProfilesInvalidInput(t *testing.T) {
148-
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
132+
r1 := &dummyRequest{}
149133
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
150-
_, err := mergeSplitProfiles(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2)
134+
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
151135
assert.Error(t, err)
152136
}
153137

@@ -160,15 +144,23 @@ func TestExtractProfiles(t *testing.T) {
160144
}
161145
}
162146

163-
type tracesRequest struct {
164-
td ptrace.Traces
165-
pusher consumer.ConsumeTracesFunc
147+
// dummyRequest implements Request. It is for checking that merging two request types would fail
148+
type dummyRequest struct {
149+
}
150+
151+
func (req *dummyRequest) Export(_ context.Context) error {
152+
return nil
153+
}
154+
155+
func (req *dummyRequest) ItemsCount() int {
156+
return 1
166157
}
167158

168-
func (req *tracesRequest) Export(ctx context.Context) error {
169-
return req.pusher(ctx, req.td)
159+
func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
160+
return nil, nil
170161
}
171162

172-
func (req *tracesRequest) ItemsCount() int {
173-
return req.td.SpanCount()
163+
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
164+
[]exporterhelper.Request, error) {
165+
return nil, nil
174166
}

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ type BaseExporter struct {
3535

3636
Signal pipeline.Signal
3737

38-
BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
39-
BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
40-
4138
Marshaler exporterqueue.Marshaler[internal.Request]
4239
Unmarshaler exporterqueue.Unmarshaler[internal.Request]
4340

@@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
104101
}
105102

106103
if be.BatcherCfg.Enabled {
107-
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
108-
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
109-
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
110-
}
104+
bs := NewBatchSender(be.BatcherCfg, be.Set)
111105
be.BatchSender = bs
112106
}
113107

@@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op
298292
}
299293
}
300294

301-
// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters.
302-
// It must be provided as the first option when creating a new exporter helper.
303-
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option {
304-
return func(o *BaseExporter) error {
305-
o.BatchMergeFunc = mf
306-
o.BatchMergeSplitfunc = msf
307-
return nil
308-
}
309-
}
310-
311295
func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
312296
if err != nil {
313297
require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd)

exporter/exporterhelper/internal/batch_sender.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ import (
2424
// - concurrencyLimit is reached.
2525
type BatchSender struct {
2626
BaseRequestSender
27-
cfg exporterbatcher.Config
28-
mergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
29-
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
27+
cfg exporterbatcher.Config
3028

3129
// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
3230
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
@@ -46,14 +44,11 @@ type BatchSender struct {
4644
}
4745

4846
// newBatchSender returns a new batch consumer component.
49-
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
50-
mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender {
47+
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
5148
bs := &BatchSender{
5249
activeBatch: newEmptyBatch(),
5350
cfg: cfg,
5451
logger: set.Logger,
55-
mergeFunc: mf,
56-
mergeSplitFunc: msf,
5752
shutdownCh: nil,
5853
shutdownCompleteCh: make(chan struct{}),
5954
stopped: &atomic.Bool{},
@@ -156,10 +151,17 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
156151
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error {
157152
bs.mu.Lock()
158153

159-
reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
160-
if err != nil || len(reqs) == 0 {
154+
var reqs []internal.Request
155+
var mergeSplitErr error
156+
if bs.activeBatch.request == nil {
157+
reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil)
158+
} else {
159+
reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
160+
}
161+
162+
if mergeSplitErr != nil || len(reqs) == 0 {
161163
bs.mu.Unlock()
162-
return err
164+
return mergeSplitErr
163165
}
164166

165167
bs.activeRequests.Add(1)
@@ -201,7 +203,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)
201203

202204
if bs.activeBatch.request != nil {
203205
var err error
204-
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
206+
req, err = bs.activeBatch.request.Merge(ctx, req)
205207
if err != nil {
206208
bs.mu.Unlock()
207209
return err

0 commit comments

Comments
 (0)