Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions processor/attributesprocessor/attributes_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func newLogAttributesProcessor(attrProc *processorhelper.AttrProc, include, excl
}
}

// ProcessLogs implements the LogsProcessor
func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (a *logAttributesProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rs := rls.At(i)
Expand Down
3 changes: 1 addition & 2 deletions processor/attributesprocessor/attributes_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func newSpanAttributesProcessor(attrProc *processorhelper.AttrProc, include, exc
}
}

// ProcessTraces implements the TProcessor
func (a *spanAttributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (a *spanAttributesProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
Expand Down
4 changes: 2 additions & 2 deletions processor/attributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func createTracesProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
newSpanAttributesProcessor(attrProc, include, exclude),
newSpanAttributesProcessor(attrProc, include, exclude).processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}

Expand Down Expand Up @@ -105,6 +105,6 @@ func createLogProcessor(
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
newLogAttributesProcessor(attrProc, include, exclude),
newLogAttributesProcessor(attrProc, include, exclude).processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}
2 changes: 1 addition & 1 deletion processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ func createMetricsProcessor(
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
fp,
fp.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}
4 changes: 2 additions & 2 deletions processor/filterprocessor/filter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func createMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, filt
return nameMatcher, attributeMatcher, err
}

// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
// processMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) processMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
pdm.ResourceMetrics().RemoveIf(func(rm pdata.ResourceMetrics) bool {
keepMetricsForResource := fmp.shouldKeepMetricsForResource(rm.Resource())
if !keepMetricsForResource {
Expand Down
6 changes: 3 additions & 3 deletions processor/memorylimiter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func createTracesProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
ml,
ml.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
Expand All @@ -79,7 +79,7 @@ func createMetricsProcessor(
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
ml,
ml.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
Expand All @@ -97,7 +97,7 @@ func createLogsProcessor(
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
ml,
ml.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
9 changes: 3 additions & 6 deletions processor/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ func (ml *memoryLimiter) shutdown(context.Context) error {
return nil
}

// ProcessTraces implements the TProcessor interface
func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
func (ml *memoryLimiter) processTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
numSpans := td.SpanCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
Expand All @@ -167,8 +166,7 @@ func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pd
return td, nil
}

// ProcessMetrics implements the MProcessor interface
func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
func (ml *memoryLimiter) processMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
numDataPoints := md.DataPointCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
Expand All @@ -186,8 +184,7 @@ func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (
return md, nil
}

// ProcessLogs implements the LProcessor interface
func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (ml *memoryLimiter) processLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
numRecords := ld.LogRecordCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
Expand Down
6 changes: 3 additions & 3 deletions processor/memorylimiter/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func newTracesProcessor(nextConsumer consumer.Traces, cfg *Config) (component.Tr
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
tsp,
tsp.processTraces,
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

func (tsp *tracesamplerprocessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (tsp *tracesamplerprocessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
td.ResourceSpans().RemoveIf(func(rs pdata.ResourceSpans) bool {
rs.InstrumentationLibrarySpans().RemoveIf(func(ils pdata.InstrumentationLibrarySpans) bool {
ils.Spans().RemoveIf(func(s pdata.Span) bool {
Expand Down
17 changes: 7 additions & 10 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}
// ProcessLogsFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessLogsFunc func(context.Context, pdata.Logs) (pdata.Logs, error)

type logProcessor struct {
component.Component
Expand All @@ -46,11 +43,11 @@ type logProcessor struct {
func NewLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
processor LProcessor,
logsFunc ProcessLogsFunc,
options ...Option,
) (component.LogsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if logsFunc == nil {
return nil, errors.New("nil logsFunc")
}

if nextConsumer == nil {
Expand All @@ -63,7 +60,7 @@ func NewLogsProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
ld, err = processor.ProcessLogs(ctx, ld)
ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
14 changes: 4 additions & 10 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,8 @@ func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, lp.ConsumeLogs(context.Background(), pdata.NewLogs()))
}

type testLProcessor struct {
retError error
}

func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}

func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
func newTestLProcessor(retError error) ProcessLogsFunc {
return func(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, retError
}
}
17 changes: 7 additions & 10 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
}
// ProcessMetricsFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessMetricsFunc func(context.Context, pdata.Metrics) (pdata.Metrics, error)

type metricsProcessor struct {
component.Component
Expand All @@ -46,11 +43,11 @@ type metricsProcessor struct {
func NewMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
processor MProcessor,
metricsFunc ProcessMetricsFunc,
options ...Option,
) (component.MetricsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if metricsFunc == nil {
return nil, errors.New("nil metricsFunc")
}

if nextConsumer == nil {
Expand All @@ -63,7 +60,7 @@ func NewMetricsProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
md, err = processor.ProcessMetrics(ctx, md)
md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
14 changes: 4 additions & 10 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,8 @@ func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, mp.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
}

type testMProcessor struct {
retError error
}

func newTestMProcessor(retError error) MProcessor {
return &testMProcessor{retError: retError}
}

func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, tmp.retError
func newTestMProcessor(retError error) ProcessMetricsFunc {
return func(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, retError
}
}
17 changes: 7 additions & 10 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTracesProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error)
}
// ProcessTracesFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessTracesFunc func(context.Context, pdata.Traces) (pdata.Traces, error)

type tracesProcessor struct {
component.Component
Expand All @@ -46,11 +43,11 @@ type tracesProcessor struct {
func NewTracesProcessor(
cfg config.Processor,
nextConsumer consumer.Traces,
processor TProcessor,
tracesFunc ProcessTracesFunc,
options ...Option,
) (component.TracesProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if tracesFunc == nil {
return nil, errors.New("nil tracesFunc")
}

if nextConsumer == nil {
Expand All @@ -63,7 +60,7 @@ func NewTracesProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
td, err = processor.ProcessTraces(ctx, td)
td, err = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
14 changes: 4 additions & 10 deletions processor/processorhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,8 @@ func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, tp.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

type testTProcessor struct {
retError error
}

func newTestTProcessor(retError error) TProcessor {
return &testTProcessor{retError: retError}
}

func (ttp *testTProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, ttp.retError
func newTestTProcessor(retError error) ProcessTracesFunc {
return func(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, retError
}
}
9 changes: 6 additions & 3 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ func createTracesProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}

Expand All @@ -73,10 +74,11 @@ func createMetricsProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}

Expand All @@ -89,10 +91,11 @@ func createLogsProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

Expand Down
9 changes: 3 additions & 6 deletions processor/resourceprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,23 @@ type resourceProcessor struct {
attrProc *processorhelper.AttrProc
}

// ProcessTraces implements the TProcessor interface
func (rp *resourceProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (rp *resourceProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rp.attrProc.Process(rss.At(i).Resource().Attributes())
}
return td, nil
}

// ProcessMetrics implements the MProcessor interface
func (rp *resourceProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
func (rp *resourceProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rp.attrProc.Process(rms.At(i).Resource().Attributes())
}
return md, nil
}

// ProcessLogs implements the LProcessor interface
func (rp *resourceProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (rp *resourceProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rp.attrProc.Process(rls.At(i).Resource().Attributes())
Expand Down
Loading