Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"net/http"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

Expand Down Expand Up @@ -74,11 +73,10 @@ func (ze *zipkinExporter) start(_ context.Context, host component.Host) (err err
}

func (ze *zipkinExporter) pushTraces(ctx context.Context, td pdata.Traces) error {
tbatch, err := translator.FromTraces(td)
spans, err := translator.FromTraces(td)
if err != nil {
return consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
spans := tbatch.([]*zipkinmodel.SpanModel)

body, err := ze.serializer.Serialize(spans)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions receiver/kafkareceiver/zipkin_unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"
"github.com/stretchr/testify/assert"
Expand All @@ -45,9 +44,8 @@ func TestUnmarshalZipkin(t *testing.T) {
span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.SetParentSpanID(pdata.NewSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0}))
ret, err := v2FromTranslator.FromTraces(td)
spans, err := v2FromTranslator.FromTraces(td)
require.NoError(t, err)
spans := ret.([]*zipkinmodel.SpanModel)

serializer := zipkinreporter.JSONSerializer{}
jsonBytes, err := serializer.Serialize(spans)
Expand Down
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TestReceiverInvalidContentType(t *testing.T) {
zr.ServeHTTP(req, r)

require.Equal(t, 400, req.Code)
require.Equal(t, "unmarshal failed: invalid character 'i' looking for beginning of object key string\n", req.Body.String())
require.Equal(t, "invalid character 'i' looking for beginning of object key string\n", req.Body.String())
}

func TestReceiverConsumerError(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions translator/trace/zipkinv2/from_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,15 @@ const (
)

var (
sampled = true
_ pdata.FromTracesTranslator = (*FromTranslator)(nil)
sampled = true
)

// FromTranslator converts from pdata to Zipkin data model.
type FromTranslator struct{}

// FromTraces translates internal trace data into Zipkin v2 spans.
// Returns a slice of Zipkin SpanModel's.
func (t FromTranslator) FromTraces(td pdata.Traces) (interface{}, error) {
func (t FromTranslator) FromTraces(td pdata.Traces) ([]*zipkinmodel.SpanModel, error) {
resourceSpans := td.ResourceSpans()
if resourceSpans.Len() == 0 {
return nil, nil
Expand Down
8 changes: 3 additions & 5 deletions translator/trace/zipkinv2/from_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ func TestInternalTracesToZipkinSpans(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
zss, err := FromTranslator{}.FromTraces(test.td)
spans, err := FromTranslator{}.FromTraces(test.td)
assert.EqualValues(t, test.err, err)
if test.name == "empty" {
assert.Nil(t, zss)
assert.Nil(t, spans)
} else {
spans := zss.([]*zipkinmodel.SpanModel)
assert.Equal(t, len(test.zs), len(spans))
assert.EqualValues(t, test.zs, spans)
}
Expand All @@ -90,8 +89,7 @@ func TestInternalTracesToZipkinSpansAndBack(t *testing.T) {
"../../../internal/goldendataset/testdata/generated_pict_pairs_spans.txt")
assert.NoError(t, err)
for _, td := range tds {
ret, err := FromTranslator{}.FromTraces(td)
zipkinSpans := ret.([]*zipkinmodel.SpanModel)
zipkinSpans, err := FromTranslator{}.FromTraces(td)
assert.NoError(t, err)
assert.Equal(t, td.SpanCount(), len(zipkinSpans))
tdFromZS, zErr := ToTranslator{}.ToTraces(zipkinSpans)
Expand Down
35 changes: 17 additions & 18 deletions translator/trace/zipkinv2/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,39 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

var _ pdata.TracesDecoder = (*jsonDecoder)(nil)

type jsonDecoder struct{}
type jsonUnmarshaler struct {
toTranslator ToTranslator
}

// DecodeTraces from JSON bytes to zipkin model.
func (j jsonDecoder) DecodeTraces(buf []byte) (interface{}, error) {
// UnmarshalTraces from JSON bytes.
func (j jsonUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
var spans []*zipkinmodel.SpanModel
if err := json.Unmarshal(buf, &spans); err != nil {
return nil, err
return pdata.Traces{}, err
}
return spans, nil
return j.toTranslator.ToTraces(spans)
}

var _ pdata.TracesEncoder = (*jsonEncoder)(nil)

type jsonEncoder struct {
serializer zipkinreporter.JSONSerializer
type jsonMarshaler struct {
serializer zipkinreporter.JSONSerializer
fromTranslator FromTranslator
}

// EncodeTraces from zipkin model to bytes.
func (j jsonEncoder) EncodeTraces(mod interface{}) ([]byte, error) {
spans, ok := mod.([]*zipkinmodel.SpanModel)
if !ok {
return nil, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod)
// MarshalTraces to JSON bytes.
func (j jsonMarshaler) MarshalTraces(td pdata.Traces) ([]byte, error) {
spans, err := j.fromTranslator.FromTraces(td)
if err != nil {
return nil, err
}
return j.serializer.Serialize(spans)
}

// NewJSONTracesUnmarshaler returns an unmarshaler for JSON bytes.
func NewJSONTracesUnmarshaler(parseStringTags bool) pdata.TracesUnmarshaler {
return pdata.NewTracesUnmarshaler(jsonDecoder{}, ToTranslator{ParseStringTags: parseStringTags})
return jsonUnmarshaler{toTranslator: ToTranslator{ParseStringTags: parseStringTags}}
}

// NewJSONTracesMarshaler returns a marshaler to JSON bytes.
func NewJSONTracesMarshaler() pdata.TracesMarshaler {
return pdata.NewTracesMarshaler(jsonEncoder{}, FromTranslator{})
return jsonMarshaler{}
}
40 changes: 14 additions & 26 deletions translator/trace/zipkinv2/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,40 @@ import (
"io/ioutil"
"testing"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
)

func TestJSONDecoder_DecodeTraces(t *testing.T) {
func TestJSONUnmarshaler_UnmarshalTraces(t *testing.T) {
data, err := ioutil.ReadFile("testdata/zipkin_v2_single.json")
require.NoError(t, err)
decoder := jsonDecoder{}
spans, err := decoder.DecodeTraces(data)
decoder := NewJSONTracesUnmarshaler(false)
td, err := decoder.UnmarshalTraces(data)
assert.NoError(t, err)
assert.NotNil(t, spans)
assert.IsType(t, []*zipkinmodel.SpanModel{}, spans)
assert.Equal(t, 1, td.SpanCount())
}

func TestJSONDecoder_DecodeTracesError(t *testing.T) {
decoder := jsonDecoder{}
spans, err := decoder.DecodeTraces([]byte("{"))
func TestJSONUnmarshaler_DecodeTracesError(t *testing.T) {
decoder := NewJSONTracesUnmarshaler(false)
_, err := decoder.UnmarshalTraces([]byte("{"))
assert.Error(t, err)
assert.Nil(t, spans)
}

func TestJSONEncoder_EncodeTraces(t *testing.T) {
encoder := jsonEncoder{}
buf, err := encoder.EncodeTraces(generateSpanErrorTags())
marshaler := NewJSONTracesMarshaler()
buf, err := marshaler.MarshalTraces(generateTraceSingleSpanErrorStatus())
assert.NoError(t, err)
assert.Greater(t, len(buf), 1)
}

func TestJSONEncoder_EncodeTracesError(t *testing.T) {
encoder := jsonEncoder{}
buf, err := encoder.EncodeTraces(nil)
invalidTD := pdata.NewTraces()
// Add one span with empty trace ID.
invalidTD.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
marshaler := NewJSONTracesMarshaler()
buf, err := marshaler.MarshalTraces(invalidTD)
assert.Error(t, err)
assert.Nil(t, buf)
}

func TestNewJSONTracesUnmarshaler(t *testing.T) {
m := NewJSONTracesUnmarshaler(false)
assert.NotNil(t, m)
assert.Implements(t, (*pdata.TracesUnmarshaler)(nil), m)
}

func TestNewJSONTracesMarshaler(t *testing.T) {
m := NewJSONTracesMarshaler()
assert.NotNil(t, m)
assert.Implements(t, (*pdata.TracesMarshaler)(nil), m)
}
52 changes: 25 additions & 27 deletions translator/trace/zipkinv2/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,51 @@
package zipkinv2

import (
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"

"go.opentelemetry.io/collector/model/pdata"
)

var _ pdata.TracesDecoder = (*protobufDecoder)(nil)

type protobufDecoder struct {
// DebugWasSet toggles the Debug field of each Span. It is usually set to true if
type protobufUnmarshaler struct {
// debugWasSet toggles the Debug field of each Span. It is usually set to true if
// the "X-B3-Flags" header is set to 1 on the request.
DebugWasSet bool
debugWasSet bool

toTranslator ToTranslator
}

// DecodeTraces from protobuf bytes to zipkin model.
func (p protobufDecoder) DecodeTraces(buf []byte) (interface{}, error) {
spans, err := zipkin_proto3.ParseSpans(buf, p.DebugWasSet)
// UnmarshalTraces from protobuf bytes.
func (p protobufUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
spans, err := zipkin_proto3.ParseSpans(buf, p.debugWasSet)
if err != nil {
return nil, err
return pdata.Traces{}, err
}
return spans, nil
return p.toTranslator.ToTraces(spans)
}

var _ pdata.TracesEncoder = (*protobufEncoder)(nil)

type protobufEncoder struct {
serializer zipkin_proto3.SpanSerializer
type protobufMarshaler struct {
serializer zipkin_proto3.SpanSerializer
fromTranslator FromTranslator
}

// EncodeTraces to protobuf bytes.
func (p protobufEncoder) EncodeTraces(mod interface{}) ([]byte, error) {
spans, ok := mod.([]*zipkinmodel.SpanModel)
if !ok {
return nil, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod)
// MarshalTraces to protobuf bytes.
func (p protobufMarshaler) MarshalTraces(td pdata.Traces) ([]byte, error) {
spans, err := p.fromTranslator.FromTraces(td)
if err != nil {
return nil, err
}
return p.serializer.Serialize(spans)
}

// NewProtobufTracesUnmarshaler returns an unmarshaler of protobuf bytes.
// NewProtobufTracesUnmarshaler returns an pdata.TracesUnmarshaler of protobuf bytes.
func NewProtobufTracesUnmarshaler(debugWasSet, parseStringTags bool) pdata.TracesUnmarshaler {
return pdata.NewTracesUnmarshaler(
protobufDecoder{DebugWasSet: debugWasSet},
ToTranslator{ParseStringTags: parseStringTags},
)
return protobufUnmarshaler{
debugWasSet: debugWasSet,
toTranslator: ToTranslator{ParseStringTags: parseStringTags},
}
}

// NewProtobufTracesMarshaler returns a new marshaler to protobuf bytes.
// NewProtobufTracesMarshaler returns a new pdata.TracesMarshaler to protobuf bytes.
func NewProtobufTracesMarshaler() pdata.TracesMarshaler {
return pdata.NewTracesMarshaler(protobufEncoder{}, FromTranslator{})
return protobufMarshaler{}
}
55 changes: 14 additions & 41 deletions translator/trace/zipkinv2/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,36 @@
package zipkinv2

import (
"io/ioutil"
"testing"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
)

func TestProtobufDecoder_DecodeTraces(t *testing.T) {
data, err := ioutil.ReadFile("testdata/zipkin_v2_single.json")
require.NoError(t, err)
decoder := jsonDecoder{}
spans, err := decoder.DecodeTraces(data)
require.NoError(t, err)

pb, err := zipkin_proto3.SpanSerializer{}.Serialize(spans.([]*zipkinmodel.SpanModel))
func TestProtobufMarshalUnmarshal(t *testing.T) {
pb, err := NewProtobufTracesMarshaler().MarshalTraces(generateTraceSingleSpanErrorStatus())
require.NoError(t, err)

pbDecoder := protobufDecoder{DebugWasSet: false}
pbSpans, err := pbDecoder.DecodeTraces(pb)
pbUnmarshaler := protobufUnmarshaler{debugWasSet: false}
td, err := pbUnmarshaler.UnmarshalTraces(pb)
assert.NoError(t, err)
assert.IsType(t, []*zipkinmodel.SpanModel{}, pbSpans)
assert.Len(t, pbSpans, 1)
assert.Equal(t, generateTraceSingleSpanErrorStatus(), td)
}

func TestProtobufDecoder_DecodeTracesError(t *testing.T) {
decoder := protobufDecoder{DebugWasSet: false}
spans, err := decoder.DecodeTraces([]byte("{"))
func TestProtobuf_UnmarshalTracesError(t *testing.T) {
decoder := protobufUnmarshaler{debugWasSet: false}
_, err := decoder.UnmarshalTraces([]byte("{"))
assert.Error(t, err)
assert.Nil(t, spans)
}

func TestProtobufEncoder_EncodeTraces(t *testing.T) {
encoder := protobufEncoder{}
buf, err := encoder.EncodeTraces(generateSpanErrorTags())
assert.NoError(t, err)
assert.Greater(t, len(buf), 1)
}

func TestProtobufEncoder_EncodeTracesError(t *testing.T) {
encoder := protobufEncoder{}
buf, err := encoder.EncodeTraces(nil)
func TestProtobuf_MarshalTracesError(t *testing.T) {
invalidTD := pdata.NewTraces()
// Add one span with empty trace ID.
invalidTD.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
marshaler := NewProtobufTracesMarshaler()
buf, err := marshaler.MarshalTraces(invalidTD)
assert.Nil(t, buf)
assert.Error(t, err)
}

func TestNewProtobufTracesUnmarshaler(t *testing.T) {
m := NewProtobufTracesUnmarshaler(false, false)
assert.NotNil(t, m)
assert.Implements(t, (*pdata.TracesUnmarshaler)(nil), m)
}

func TestNewProtobufTracesMarshaler(t *testing.T) {
m := NewProtobufTracesMarshaler()
assert.NotNil(t, m)
assert.Implements(t, (*pdata.TracesMarshaler)(nil), m)
}
11 changes: 2 additions & 9 deletions translator/trace/zipkinv2/to_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,14 @@ import (
"go.opentelemetry.io/collector/translator/trace/internal/zipkin"
)

var _ pdata.ToTracesTranslator = (*ToTranslator)(nil)

// ToTranslator converts from Zipkin data model to pdata.
type ToTranslator struct {
// ParseStringTags should be set to true if tags should be converted to numbers when possible.
ParseStringTags bool
}

// ToTraces translates Zipkin v2 spans into internal trace data.
func (t ToTranslator) ToTraces(src interface{}) (pdata.Traces, error) {
zipkinSpans, ok := src.([]*zipkinmodel.SpanModel)
if !ok {
return pdata.Traces{}, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, src)
}

// ToTraces translates Zipkin v2 spans into pdata.Traces.
func (t ToTranslator) ToTraces(zipkinSpans []*zipkinmodel.SpanModel) (pdata.Traces, error) {
traceData := pdata.NewTraces()
if len(zipkinSpans) == 0 {
return traceData, nil
Expand Down