Skip to content

Commit d94eec1

Browse files
authored
Merge branch 'main' into rmconsumerdata
2 parents 84d8e5b + 6ad7d67 commit d94eec1

File tree

23 files changed

+177
-123
lines changed

23 files changed

+177
-123
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
- Remove `consumerdata.TraceData` (#2551)
88
- Move `consumerdata.MetricsData` to `internaldata.MetricsData` (#2512)
9+
- Remove custom OpenCensus sematic conventions that have equivalent in otel (#2552)
910

1011
## v0.21.0 Beta
1112

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package componenttest
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.uber.org/zap"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/config/configerror"
27+
"go.opentelemetry.io/collector/config/configmodels"
28+
"go.opentelemetry.io/collector/consumer/consumertest"
29+
"go.opentelemetry.io/collector/internal/testdata"
30+
)
31+
32+
func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
33+
// Create a processor and output its produce to a sink.
34+
nextSink := new(consumertest.TracesSink)
35+
processor, err := factory.CreateTracesProcessor(
36+
context.Background(),
37+
component.ProcessorCreateParams{Logger: zap.NewNop()},
38+
cfg,
39+
nextSink,
40+
)
41+
if err != nil {
42+
if err == configerror.ErrDataTypeIsNotSupported {
43+
return
44+
}
45+
require.NoError(t, err)
46+
}
47+
err = processor.Start(context.Background(), NewNopHost())
48+
assert.NoError(t, err)
49+
50+
// Send some traces to the processor.
51+
const generatedCount = 10
52+
for i := 0; i < generatedCount; i++ {
53+
processor.ConsumeTraces(context.Background(), testdata.GenerateTraceDataOneSpan())
54+
}
55+
56+
// Now shutdown the processor.
57+
err = processor.Shutdown(context.Background())
58+
assert.NoError(t, err)
59+
60+
// The Shutdown() is done. It means the processor must have sent everything we
61+
// gave it to the next sink.
62+
assert.EqualValues(t, generatedCount, nextSink.SpansCount())
63+
}
64+
65+
func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
66+
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
67+
// TODO: add metrics and logs verification.
68+
// TODO: add other shutdown verifications.
69+
}

internal/goldendataset/span_generator.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -476,33 +476,36 @@ func calculateListSize(listCnt PICTInputSpanChild) int {
476476

477477
func generateSpanEvent(index int) *otlptrace.Span_Event {
478478
t := time.Now().Add(-75 * time.Microsecond)
479+
name, attributes := generateEventNameAndAttributes(index)
479480
return &otlptrace.Span_Event{
480481
TimeUnixNano: uint64(t.UnixNano()),
481-
Name: "message",
482-
Attributes: generateEventAttributes(index),
482+
Name: name,
483+
Attributes: attributes,
483484
DroppedAttributesCount: 0,
484485
}
485486
}
486487

487-
func generateEventAttributes(index int) []otlpcommon.KeyValue {
488-
if index%4 == 2 {
489-
return nil
490-
}
491-
attrMap := make(map[string]interface{})
492-
if index%2 == 0 {
493-
attrMap[conventions.AttributeMessageType] = "SENT"
494-
} else {
495-
attrMap[conventions.AttributeMessageType] = "RECEIVED"
496-
}
497-
attrMap[conventions.AttributeMessageID] = int64(index)
498-
attrMap[conventions.AttributeMessageCompressedSize] = int64(17 * index)
499-
attrMap[conventions.AttributeMessageUncompressedSize] = int64(24 * index)
500-
if index%4 == 1 {
501-
attrMap["app.inretry"] = true
502-
attrMap["app.progress"] = 0.6
503-
attrMap["app.statemap"] = "14|5|202"
488+
func generateEventNameAndAttributes(index int) (string, []otlpcommon.KeyValue) {
489+
switch index % 4 {
490+
case 0, 3:
491+
attrMap := make(map[string]interface{})
492+
if index%2 == 0 {
493+
attrMap[conventions.AttributeMessageType] = "SENT"
494+
} else {
495+
attrMap[conventions.AttributeMessageType] = "RECEIVED"
496+
}
497+
attrMap[conventions.AttributeMessageID] = int64(index / 4)
498+
attrMap[conventions.AttributeMessageCompressedSize] = int64(17 * index)
499+
attrMap[conventions.AttributeMessageUncompressedSize] = int64(24 * index)
500+
return "message", convertMapToAttributeKeyValues(attrMap)
501+
case 1:
502+
return "custom", convertMapToAttributeKeyValues(map[string]interface{}{
503+
"app.inretry": true,
504+
"app.progress": 0.6,
505+
"app.statemap": "14|5|202"})
506+
default:
507+
return "annotation", nil
504508
}
505-
return convertMapToAttributeKeyValues(attrMap)
506509
}
507510

508511
func generateSpanLink(random io.Reader, index int) *otlptrace.Span_Link {

internal/goldendataset/traces_generator.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,25 @@ package goldendataset
1717
import (
1818
"fmt"
1919
"io"
20+
"math/rand"
2021

22+
"go.opentelemetry.io/collector/consumer/pdata"
2123
otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1"
2224
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
2325
)
2426

25-
// GenerateResourceSpans generates a slice of OTLP ResourceSpans objects based on the PICT-generated pairwise
27+
// GenerateTraces generates a slice of OTLP ResourceSpans objects based on the PICT-generated pairwise
2628
// parameters defined in the parameters file specified by the tracePairsFile parameter. The pairs to generate
27-
// spans for for defined in the file specified by the spanPairsFile parameter. The random parameter injects the
28-
// random number generator to use in generating IDs and other random values.
29+
// spans for for defined in the file specified by the spanPairsFile parameter.
2930
// The slice of ResourceSpans are returned. If an err is returned, the slice elements will be nil.
30-
func GenerateResourceSpans(tracePairsFile string, spanPairsFile string,
31-
random io.Reader) ([]*otlptrace.ResourceSpans, error) {
31+
func GenerateTraces(tracePairsFile string, spanPairsFile string) ([]pdata.Traces, error) {
32+
random := io.Reader(rand.New(rand.NewSource(42)))
3233
pairsData, err := loadPictOutputFile(tracePairsFile)
3334
if err != nil {
3435
return nil, err
3536
}
3637
pairsTotal := len(pairsData) - 1
37-
spans := make([]*otlptrace.ResourceSpans, pairsTotal)
38+
traces := make([]pdata.Traces, pairsTotal)
3839
for index, values := range pairsData {
3940
if index == 0 {
4041
continue
@@ -44,22 +45,22 @@ func GenerateResourceSpans(tracePairsFile string, spanPairsFile string,
4445
InstrumentationLibrary: PICTInputInstrumentationLibrary(values[TracesColumnInstrumentationLibrary]),
4546
Spans: PICTInputSpans(values[TracesColumnSpans]),
4647
}
47-
rscSpan, spanErr := GenerateResourceSpan(tracingInputs, spanPairsFile, random)
48+
rscSpan, spanErr := generateResourceSpan(tracingInputs, spanPairsFile, random)
4849
if spanErr != nil {
4950
err = spanErr
5051
}
51-
spans[index-1] = rscSpan
52+
traces[index-1] = pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{rscSpan})
5253
}
53-
return spans, err
54+
return traces, err
5455
}
5556

56-
// GenerateResourceSpan generates a single OTLP ResourceSpans populated based on the provided inputs. They are:
57+
// generateResourceSpan generates a single OTLP ResourceSpans populated based on the provided inputs. They are:
5758
// tracingInputs - the pairwise combination of field value variations for this ResourceSpans
5859
// spanPairsFile - the file with the PICT-generated parameter combinations to generate spans for
5960
// random - the random number generator to use in generating ID values
6061
//
6162
// The generated resource spans. If err is not nil, some or all of the resource spans fields will be nil.
62-
func GenerateResourceSpan(tracingInputs *PICTTracingInputs, spanPairsFile string,
63+
func generateResourceSpan(tracingInputs *PICTTracingInputs, spanPairsFile string,
6364
random io.Reader) (*otlptrace.ResourceSpans, error) {
6465
libSpans, err := generateLibrarySpansArray(tracingInputs, spanPairsFile, random)
6566
return &otlptrace.ResourceSpans{

internal/goldendataset/traces_generator_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@
1515
package goldendataset
1616

1717
import (
18-
"io"
19-
"math/rand"
2018
"testing"
2119

2220
"github.com/stretchr/testify/assert"
2321
)
2422

2523
func TestGenerateTraces(t *testing.T) {
26-
random := io.Reader(rand.New(rand.NewSource(42)))
27-
rscSpans, err := GenerateResourceSpans("testdata/generated_pict_pairs_traces.txt",
28-
"testdata/generated_pict_pairs_spans.txt", random)
24+
rscSpans, err := GenerateTraces("testdata/generated_pict_pairs_traces.txt",
25+
"testdata/generated_pict_pairs_spans.txt")
2926
assert.Nil(t, err)
3027
assert.Equal(t, 32, len(rscSpans))
3128
}

processor/batchprocessor/batch_processor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
108108
// Shutdown is invoked during service shutdown.
109109
func (bp *batchProcessor) Shutdown(context.Context) error {
110110
bp.cancel()
111+
112+
// Wait until current batch is drained.
111113
<-bp.done
112114
return nil
113115
}
@@ -132,6 +134,7 @@ func (bp *batchProcessor) startProcessingCycle() {
132134
// make it cancellable using the context that Shutdown gets as a parameter
133135
bp.sendItems(statTimeoutTriggerSend)
134136
}
137+
// Indicate that we finished draining.
135138
close(bp.done)
136139
return
137140
case item := <-bp.newItem:

processor/batchprocessor/batch_processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
689689
}
690690
return logsReceivedByName
691691
}
692+
693+
func TestShutdown(t *testing.T) {
694+
factory := NewFactory()
695+
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
696+
}

receiver/prometheusreceiver/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ services. It supports the full set of Prometheus configuration, including
2525
service discovery. Just like you would write in a YAML configuration file
2626
before starting Prometheus, such as with:
2727

28+
**Note**: Since the collector configuration supports env variable substitution
29+
`$` charaters in your prometheus configuration are interpreted as environment
30+
variables. If you want to use $ charaters in your prometheus configuration,
31+
you must escape them using `$$`.
32+
2833
```shell
2934
prometheus --config.file=prom.yaml
3035
```

testbed/correctness/traces/correctness_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ func testWithTracingGoldenDataset(
6161
dataProvider := testbed.NewGoldenDataProvider(
6262
"../../../internal/goldendataset/testdata/generated_pict_pairs_traces.txt",
6363
"../../../internal/goldendataset/testdata/generated_pict_pairs_spans.txt",
64-
"",
65-
161803)
64+
"")
6665
factories, err := defaultcomponents.Components()
6766
require.NoError(t, err, "default components resulted in: %v", err)
6867
runner := testbed.NewInProcessCollector(factories)

testbed/testbed/data_providers.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ package testbed
1717
import (
1818
"encoding/binary"
1919
"fmt"
20-
"io"
2120
"log"
22-
"math/rand"
2321
"os"
2422
"strconv"
2523
"time"
@@ -216,12 +214,12 @@ func (dp *PerfTestDataProvider) GenerateLogs() (pdata.Logs, bool) {
216214
type GoldenDataProvider struct {
217215
tracePairsFile string
218216
spanPairsFile string
219-
random io.Reader
220217
batchesGenerated *atomic.Uint64
221218
dataItemsGenerated *atomic.Uint64
222-
resourceSpans []*otlptrace.ResourceSpans
223-
spansIndex int
224-
spansMap map[string]*otlptrace.Span
219+
220+
tracesGenerated []pdata.Traces
221+
tracesIndex int
222+
spansMap map[string]*otlptrace.Span
225223

226224
metricPairsFile string
227225
metricsGenerated []pdata.Metrics
@@ -230,13 +228,11 @@ type GoldenDataProvider struct {
230228

231229
// NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
232230
// on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
233-
// The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
234-
func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, metricPairsFile string, randomSeed int64) *GoldenDataProvider {
231+
func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, metricPairsFile string) *GoldenDataProvider {
235232
return &GoldenDataProvider{
236233
tracePairsFile: tracePairsFile,
237234
spanPairsFile: spanPairsFile,
238235
metricPairsFile: metricPairsFile,
239-
random: io.Reader(rand.New(rand.NewSource(randomSeed))),
240236
}
241237
}
242238

@@ -246,27 +242,22 @@ func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *atomic.
246242
}
247243

248244
func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) {
249-
if dp.resourceSpans == nil {
245+
if dp.tracesGenerated == nil {
250246
var err error
251-
dp.resourceSpans, err = goldendataset.GenerateResourceSpans(dp.tracePairsFile, dp.spanPairsFile, dp.random)
247+
dp.tracesGenerated, err = goldendataset.GenerateTraces(dp.tracePairsFile, dp.spanPairsFile)
252248
if err != nil {
253249
log.Printf("cannot generate traces: %s", err)
254-
dp.resourceSpans = make([]*otlptrace.ResourceSpans, 0)
250+
dp.tracesGenerated = nil
255251
}
256252
}
257253
dp.batchesGenerated.Inc()
258-
if dp.spansIndex >= len(dp.resourceSpans) {
259-
return pdata.TracesFromOtlp(make([]*otlptrace.ResourceSpans, 0)), true
260-
}
261-
resourceSpans := make([]*otlptrace.ResourceSpans, 1)
262-
resourceSpans[0] = dp.resourceSpans[dp.spansIndex]
263-
dp.spansIndex++
264-
spanCount := uint64(0)
265-
for _, libSpans := range resourceSpans[0].InstrumentationLibrarySpans {
266-
spanCount += uint64(len(libSpans.Spans))
254+
if dp.tracesIndex >= len(dp.tracesGenerated) {
255+
return pdata.NewTraces(), true
267256
}
268-
dp.dataItemsGenerated.Add(spanCount)
269-
return pdata.TracesFromOtlp(resourceSpans), false
257+
td := dp.tracesGenerated[dp.tracesIndex]
258+
dp.tracesIndex++
259+
dp.dataItemsGenerated.Add(uint64(td.SpanCount()))
260+
return td, false
270261
}
271262

272263
func (dp *GoldenDataProvider) GenerateMetrics() (pdata.Metrics, bool) {
@@ -277,8 +268,8 @@ func (dp *GoldenDataProvider) GenerateMetrics() (pdata.Metrics, bool) {
277268
log.Printf("cannot generate metrics: %s", err)
278269
}
279270
}
280-
numMetricsGenerated := len(dp.metricsGenerated)
281-
if dp.metricsIndex == numMetricsGenerated {
271+
dp.batchesGenerated.Inc()
272+
if dp.metricsIndex == len(dp.metricsGenerated) {
282273
return pdata.Metrics{}, true
283274
}
284275
pdm := dp.metricsGenerated[dp.metricsIndex]
@@ -294,7 +285,11 @@ func (dp *GoldenDataProvider) GenerateLogs() (pdata.Logs, bool) {
294285

295286
func (dp *GoldenDataProvider) GetGeneratedSpan(traceID pdata.TraceID, spanID pdata.SpanID) *otlptrace.Span {
296287
if dp.spansMap == nil {
297-
dp.spansMap = populateSpansMap(dp.resourceSpans)
288+
var resourceSpansList []*otlptrace.ResourceSpans
289+
for _, td := range dp.tracesGenerated {
290+
resourceSpansList = append(resourceSpansList, pdata.TracesToOtlp(td)...)
291+
}
292+
dp.spansMap = populateSpansMap(resourceSpansList)
298293
}
299294
key := traceIDAndSpanIDToString(traceID, spanID)
300295
return dp.spansMap[key]

0 commit comments

Comments
 (0)