Skip to content

Commit a73b405

Browse files
committed
Use Func pattern in processorhelper, consistent with others
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent e5f9876 commit a73b405

File tree

19 files changed

+62
-94
lines changed

19 files changed

+62
-94
lines changed

processor/attributesprocessor/attributes_log.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ func newLogAttributesProcessor(attrProc *processorhelper.AttrProc, include, excl
3939
}
4040
}
4141

42-
// ProcessLogs implements the LogsProcessor
43-
func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
42+
func (a *logAttributesProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
4443
rls := ld.ResourceLogs()
4544
for i := 0; i < rls.Len(); i++ {
4645
rs := rls.At(i)

processor/attributesprocessor/attributes_trace.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ func newSpanAttributesProcessor(attrProc *processorhelper.AttrProc, include, exc
3939
}
4040
}
4141

42-
// ProcessTraces implements the TProcessor
43-
func (a *spanAttributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
42+
func (a *spanAttributesProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
4443
rss := td.ResourceSpans()
4544
for i := 0; i < rss.Len(); i++ {
4645
rs := rss.At(i)

processor/attributesprocessor/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func createTracesProcessor(
7575
return processorhelper.NewTracesProcessor(
7676
cfg,
7777
nextConsumer,
78-
newSpanAttributesProcessor(attrProc, include, exclude),
78+
newSpanAttributesProcessor(attrProc, include, exclude).processTraces,
7979
processorhelper.WithCapabilities(processorCapabilities))
8080
}
8181

@@ -105,6 +105,6 @@ func createLogProcessor(
105105
return processorhelper.NewLogsProcessor(
106106
cfg,
107107
nextConsumer,
108-
newLogAttributesProcessor(attrProc, include, exclude),
108+
newLogAttributesProcessor(attrProc, include, exclude).processLogs,
109109
processorhelper.WithCapabilities(processorCapabilities))
110110
}

processor/filterprocessor/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@ func createMetricsProcessor(
5757
return processorhelper.NewMetricsProcessor(
5858
cfg,
5959
nextConsumer,
60-
fp,
60+
fp.processMetrics,
6161
processorhelper.WithCapabilities(processorCapabilities))
6262
}

processor/filterprocessor/filter_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ func createMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, filt
113113
return nameMatcher, attributeMatcher, err
114114
}
115115

116-
// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
117-
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
116+
// processMetrics filters the given metrics based off the filterMetricProcessor's filters.
117+
func (fmp *filterMetricProcessor) processMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
118118
pdm.ResourceMetrics().RemoveIf(func(rm pdata.ResourceMetrics) bool {
119119
keepMetricsForResource := fmp.shouldKeepMetricsForResource(rm.Resource())
120120
if !keepMetricsForResource {

processor/memorylimiter/factory.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func createTracesProcessor(
6161
return processorhelper.NewTracesProcessor(
6262
cfg,
6363
nextConsumer,
64-
ml,
64+
ml.processTraces,
6565
processorhelper.WithCapabilities(processorCapabilities),
6666
processorhelper.WithShutdown(ml.shutdown))
6767
}
@@ -79,7 +79,7 @@ func createMetricsProcessor(
7979
return processorhelper.NewMetricsProcessor(
8080
cfg,
8181
nextConsumer,
82-
ml,
82+
ml.processMetrics,
8383
processorhelper.WithCapabilities(processorCapabilities),
8484
processorhelper.WithShutdown(ml.shutdown))
8585
}
@@ -97,7 +97,7 @@ func createLogsProcessor(
9797
return processorhelper.NewLogsProcessor(
9898
cfg,
9999
nextConsumer,
100-
ml,
100+
ml.processLogs,
101101
processorhelper.WithCapabilities(processorCapabilities),
102102
processorhelper.WithShutdown(ml.shutdown))
103103
}

processor/memorylimiter/memorylimiter.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ func (ml *memoryLimiter) shutdown(context.Context) error {
147147
return nil
148148
}
149149

150-
// ProcessTraces implements the TProcessor interface
151-
func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
150+
func (ml *memoryLimiter) processTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
152151
numSpans := td.SpanCount()
153152
if ml.forcingDrop() {
154153
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
@@ -167,8 +166,7 @@ func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pd
167166
return td, nil
168167
}
169168

170-
// ProcessMetrics implements the MProcessor interface
171-
func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
169+
func (ml *memoryLimiter) processMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
172170
numDataPoints := md.DataPointCount()
173171
if ml.forcingDrop() {
174172
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
@@ -186,8 +184,7 @@ func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (
186184
return md, nil
187185
}
188186

189-
// ProcessLogs implements the LProcessor interface
190-
func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
187+
func (ml *memoryLimiter) processLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
191188
numRecords := ld.LogRecordCount()
192189
if ml.forcingDrop() {
193190
// TODO: actually to be 100% sure that this is "refused" and not "dropped"

processor/memorylimiter/memorylimiter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
122122
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
123123
},
124124
consumertest.NewNop(),
125-
ml,
125+
ml.processMetrics,
126126
processorhelper.WithCapabilities(processorCapabilities),
127127
processorhelper.WithShutdown(ml.shutdown))
128128
require.NoError(t, err)
@@ -193,7 +193,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
193193
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
194194
},
195195
consumertest.NewNop(),
196-
ml,
196+
ml.processTraces,
197197
processorhelper.WithCapabilities(processorCapabilities),
198198
processorhelper.WithShutdown(ml.shutdown))
199199
require.NoError(t, err)
@@ -264,7 +264,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
264264
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
265265
},
266266
consumertest.NewNop(),
267-
ml,
267+
ml.processLogs,
268268
processorhelper.WithCapabilities(processorCapabilities),
269269
processorhelper.WithShutdown(ml.shutdown))
270270
require.NoError(t, err)

processor/probabilisticsamplerprocessor/probabilisticsampler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ func newTracesProcessor(nextConsumer consumer.Traces, cfg *Config) (component.Tr
6565
return processorhelper.NewTracesProcessor(
6666
cfg,
6767
nextConsumer,
68-
tsp,
68+
tsp.processTraces,
6969
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
7070
}
7171

72-
func (tsp *tracesamplerprocessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
72+
func (tsp *tracesamplerprocessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
7373
td.ResourceSpans().RemoveIf(func(rs pdata.ResourceSpans) bool {
7474
rs.InstrumentationLibrarySpans().RemoveIf(func(ils pdata.InstrumentationLibrarySpans) bool {
7575
ils.Spans().RemoveIf(func(s pdata.Span) bool {

processor/processorhelper/logs.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@ import (
2929
"go.opentelemetry.io/collector/model/pdata"
3030
)
3131

32-
// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
33-
type LProcessor interface {
34-
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
35-
// If error is returned then returned data are ignored. It MUST not call the next component.
36-
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
37-
}
32+
// ProcessLogsFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
33+
// If error is returned then returned data are ignored. It MUST not call the next component.
34+
type ProcessLogsFunc func(context.Context, pdata.Logs) (pdata.Logs, error)
3835

3936
type logProcessor struct {
4037
component.Component
@@ -46,11 +43,11 @@ type logProcessor struct {
4643
func NewLogsProcessor(
4744
cfg config.Processor,
4845
nextConsumer consumer.Logs,
49-
processor LProcessor,
46+
logsFunc ProcessLogsFunc,
5047
options ...Option,
5148
) (component.LogsProcessor, error) {
52-
if processor == nil {
53-
return nil, errors.New("nil processor")
49+
if logsFunc == nil {
50+
return nil, errors.New("nil logsFunc")
5451
}
5552

5653
if nextConsumer == nil {
@@ -63,7 +60,7 @@ func NewLogsProcessor(
6360
span := trace.SpanFromContext(ctx)
6461
span.AddEvent("Start processing.", eventOptions)
6562
var err error
66-
ld, err = processor.ProcessLogs(ctx, ld)
63+
ld, err = logsFunc(ctx, ld)
6764
span.AddEvent("End processing.", eventOptions)
6865
if err != nil {
6966
if errors.Is(err, ErrSkipProcessingData) {

0 commit comments

Comments
 (0)