Skip to content

Commit bf214d6

Browse files
committed
Add initial implementation of pdatagrcp
This is needed because if we split pdata, we will nolonger be able to depend on the InternalRep or generated proto classes (raw, grpc). The pdatagrcp will eventually be public once we stabilize the API. The API in the pdatagrcp package is inspired from the grpc generated classes and redirects all calls to the generated classes. Some simple renames: * TraceService[Client|Server] -> Traces[Client|Server] * MetricsService[Client|Server] -> Metrics[Client|Server] * LogsService[Client|Server] -> Logs[Client|Server] Other changes: * Replace usages of grpc generated classes in otlpexporter to excercise this new package. * Left some TODOs for the moment until this package is used more to determine the right API. Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent 065563a commit bf214d6

File tree

5 files changed

+276
-85
lines changed

5 files changed

+276
-85
lines changed

exporter/otlpexporter/otlp.go

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ import (
3030
"go.opentelemetry.io/collector/consumer/consumererror"
3131
"go.opentelemetry.io/collector/consumer/pdata"
3232
"go.opentelemetry.io/collector/exporter/exporterhelper"
33-
"go.opentelemetry.io/collector/internal"
34-
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
35-
otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
36-
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
33+
"go.opentelemetry.io/collector/internal/pdatagrpc"
3734
)
3835

3936
type exporter struct {
@@ -66,33 +63,31 @@ func (e *exporter) shutdown(context.Context) error {
6663
}
6764

6865
func (e *exporter) pushTraceData(ctx context.Context, td pdata.Traces) error {
69-
if err := e.w.exportTrace(ctx, internal.TracesToOtlp(td.InternalRep())); err != nil {
66+
if err := e.w.exportTrace(ctx, td); err != nil {
7067
return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
7168
}
7269
return nil
7370
}
7471

7572
func (e *exporter) pushMetricsData(ctx context.Context, md pdata.Metrics) error {
76-
req := internal.MetricsToOtlp(md.InternalRep())
77-
if err := e.w.exportMetrics(ctx, req); err != nil {
73+
if err := e.w.exportMetrics(ctx, md); err != nil {
7874
return fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err)
7975
}
8076
return nil
8177
}
8278

8379
func (e *exporter) pushLogData(ctx context.Context, ld pdata.Logs) error {
84-
request := internal.LogsToOtlp(ld.InternalRep())
85-
if err := e.w.exportLogs(ctx, request); err != nil {
80+
if err := e.w.exportLogs(ctx, ld); err != nil {
8681
return fmt.Errorf("failed to push log data via OTLP exporter: %w", err)
8782
}
8883
return nil
8984
}
9085

9186
type grpcSender struct {
9287
// gRPC clients and connection.
93-
traceExporter otlptrace.TraceServiceClient
94-
metricExporter otlpmetrics.MetricsServiceClient
95-
logExporter otlplogs.LogsServiceClient
88+
traceExporter pdatagrpc.TracesClient
89+
metricExporter pdatagrpc.MetricsClient
90+
logExporter pdatagrpc.LogsClient
9691
clientConn *grpc.ClientConn
9792
metadata metadata.MD
9893
callOptions []grpc.CallOption
@@ -110,9 +105,9 @@ func newGrpcSender(config *Config) (*grpcSender, error) {
110105
}
111106

112107
gs := &grpcSender{
113-
traceExporter: otlptrace.NewTraceServiceClient(clientConn),
114-
metricExporter: otlpmetrics.NewMetricsServiceClient(clientConn),
115-
logExporter: otlplogs.NewLogsServiceClient(clientConn),
108+
traceExporter: pdatagrpc.NewTracesClient(clientConn),
109+
metricExporter: pdatagrpc.NewMetricsClient(clientConn),
110+
logExporter: pdatagrpc.NewLogsClient(clientConn),
116111
clientConn: clientConn,
117112
metadata: metadata.New(config.GRPCClientSettings.Headers),
118113
callOptions: []grpc.CallOption{
@@ -126,18 +121,18 @@ func (gs *grpcSender) stop() error {
126121
return gs.clientConn.Close()
127122
}
128123

129-
func (gs *grpcSender) exportTrace(ctx context.Context, request *otlptrace.ExportTraceServiceRequest) error {
130-
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), request, gs.callOptions...)
124+
func (gs *grpcSender) exportTrace(ctx context.Context, td pdata.Traces) error {
125+
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), td, gs.callOptions...)
131126
return processError(err)
132127
}
133128

134-
func (gs *grpcSender) exportMetrics(ctx context.Context, request *otlpmetrics.ExportMetricsServiceRequest) error {
135-
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), request, gs.callOptions...)
129+
func (gs *grpcSender) exportMetrics(ctx context.Context, md pdata.Metrics) error {
130+
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), md, gs.callOptions...)
136131
return processError(err)
137132
}
138133

139-
func (gs *grpcSender) exportLogs(ctx context.Context, request *otlplogs.ExportLogsServiceRequest) error {
140-
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), request, gs.callOptions...)
134+
func (gs *grpcSender) exportLogs(ctx context.Context, ld pdata.Logs) error {
135+
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), ld, gs.callOptions...)
141136
return processError(err)
142137
}
143138

exporter/otlpexporter/otlp_test.go

Lines changed: 37 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,13 @@ import (
3333
"go.opentelemetry.io/collector/config/configgrpc"
3434
"go.opentelemetry.io/collector/config/configtls"
3535
"go.opentelemetry.io/collector/consumer/pdata"
36-
"go.opentelemetry.io/collector/internal"
37-
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
38-
otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
39-
otlptraces "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
36+
"go.opentelemetry.io/collector/internal/pdatagrpc"
4037
"go.opentelemetry.io/collector/internal/testdata"
4138
"go.opentelemetry.io/collector/obsreport"
4239
)
4340

4441
type mockReceiver struct {
42+
t *testing.T
4543
srv *grpc.Server
4644
requestCount int32
4745
totalItems int32
@@ -57,43 +55,35 @@ func (r *mockReceiver) GetMetadata() metadata.MD {
5755

5856
type mockTracesReceiver struct {
5957
mockReceiver
60-
lastRequest *otlptraces.ExportTraceServiceRequest
58+
lastRequest pdata.Traces
6159
}
6260

63-
func (r *mockTracesReceiver) Export(
64-
ctx context.Context,
65-
req *otlptraces.ExportTraceServiceRequest,
66-
) (*otlptraces.ExportTraceServiceResponse, error) {
61+
func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
6762
atomic.AddInt32(&r.requestCount, 1)
68-
spanCount := 0
69-
for _, rs := range req.ResourceSpans {
70-
for _, ils := range rs.InstrumentationLibrarySpans {
71-
spanCount += len(ils.Spans)
72-
}
73-
}
74-
atomic.AddInt32(&r.totalItems, int32(spanCount))
63+
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
7564
r.mux.Lock()
7665
defer r.mux.Unlock()
77-
r.lastRequest = req
66+
r.lastRequest = td
7867
r.metadata, _ = metadata.FromIncomingContext(ctx)
79-
return &otlptraces.ExportTraceServiceResponse{}, nil
68+
return nil, nil
8069
}
8170

82-
func (r *mockTracesReceiver) GetLastRequest() *otlptraces.ExportTraceServiceRequest {
71+
func (r *mockTracesReceiver) GetLastRequest() pdata.Traces {
8372
r.mux.Lock()
8473
defer r.mux.Unlock()
8574
return r.lastRequest
8675
}
8776

88-
func otlpTracesReceiverOnGRPCServer(ln net.Listener) *mockTracesReceiver {
77+
func otlpTracesReceiverOnGRPCServer(t *testing.T, ln net.Listener) *mockTracesReceiver {
8978
rcv := &mockTracesReceiver{
9079
mockReceiver: mockReceiver{
80+
t: t,
9181
srv: obsreport.GRPCServerWithObservabilityEnabled(),
9282
},
9383
}
9484

9585
// Now run it as a gRPC server
96-
otlptraces.RegisterTraceServiceServer(rcv.srv, rcv)
86+
pdatagrpc.RegisterTracesServer(rcv.srv, rcv)
9787
go func() {
9888
_ = rcv.srv.Serve(ln)
9989
}()
@@ -103,29 +93,20 @@ func otlpTracesReceiverOnGRPCServer(ln net.Listener) *mockTracesReceiver {
10393

10494
type mockLogsReceiver struct {
10595
mockReceiver
106-
lastRequest *otlplogs.ExportLogsServiceRequest
96+
lastRequest pdata.Logs
10797
}
10898

109-
func (r *mockLogsReceiver) Export(
110-
ctx context.Context,
111-
req *otlplogs.ExportLogsServiceRequest,
112-
) (*otlplogs.ExportLogsServiceResponse, error) {
99+
func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
113100
atomic.AddInt32(&r.requestCount, 1)
114-
recordCount := 0
115-
for _, rs := range req.ResourceLogs {
116-
for _, il := range rs.InstrumentationLibraryLogs {
117-
recordCount += len(il.Logs)
118-
}
119-
}
120-
atomic.AddInt32(&r.totalItems, int32(recordCount))
101+
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
121102
r.mux.Lock()
122103
defer r.mux.Unlock()
123-
r.lastRequest = req
104+
r.lastRequest = ld
124105
r.metadata, _ = metadata.FromIncomingContext(ctx)
125-
return &otlplogs.ExportLogsServiceResponse{}, nil
106+
return nil, nil
126107
}
127108

128-
func (r *mockLogsReceiver) GetLastRequest() *otlplogs.ExportLogsServiceRequest {
109+
func (r *mockLogsReceiver) GetLastRequest() pdata.Logs {
129110
r.mux.Lock()
130111
defer r.mux.Unlock()
131112
return r.lastRequest
@@ -139,7 +120,7 @@ func otlpLogsReceiverOnGRPCServer(ln net.Listener) *mockLogsReceiver {
139120
}
140121

141122
// Now run it as a gRPC server
142-
otlplogs.RegisterLogsServiceServer(rcv.srv, rcv)
123+
pdatagrpc.RegisterLogsServer(rcv.srv, rcv)
143124
go func() {
144125
_ = rcv.srv.Serve(ln)
145126
}()
@@ -149,24 +130,21 @@ func otlpLogsReceiverOnGRPCServer(ln net.Listener) *mockLogsReceiver {
149130

150131
type mockMetricsReceiver struct {
151132
mockReceiver
152-
lastRequest *otlpmetrics.ExportMetricsServiceRequest
133+
lastRequest pdata.Metrics
153134
}
154135

155-
func (r *mockMetricsReceiver) Export(
156-
ctx context.Context,
157-
req *otlpmetrics.ExportMetricsServiceRequest,
158-
) (*otlpmetrics.ExportMetricsServiceResponse, error) {
136+
func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
159137
atomic.AddInt32(&r.requestCount, 1)
160-
_, recordCount := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req)).MetricAndDataPointCount()
138+
_, recordCount := md.MetricAndDataPointCount()
161139
atomic.AddInt32(&r.totalItems, int32(recordCount))
162140
r.mux.Lock()
163141
defer r.mux.Unlock()
164-
r.lastRequest = req
142+
r.lastRequest = md
165143
r.metadata, _ = metadata.FromIncomingContext(ctx)
166-
return &otlpmetrics.ExportMetricsServiceResponse{}, nil
144+
return nil, nil
167145
}
168146

169-
func (r *mockMetricsReceiver) GetLastRequest() *otlpmetrics.ExportMetricsServiceRequest {
147+
func (r *mockMetricsReceiver) GetLastRequest() pdata.Metrics {
170148
r.mux.Lock()
171149
defer r.mux.Unlock()
172150
return r.lastRequest
@@ -180,7 +158,7 @@ func otlpMetricsReceiverOnGRPCServer(ln net.Listener) *mockMetricsReceiver {
180158
}
181159

182160
// Now run it as a gRPC server
183-
otlpmetrics.RegisterMetricsServiceServer(rcv.srv, rcv)
161+
pdatagrpc.RegisterMetricsServer(rcv.srv, rcv)
184162
go func() {
185163
_ = rcv.srv.Serve(ln)
186164
}()
@@ -192,7 +170,7 @@ func TestSendTraces(t *testing.T) {
192170
// Start an OTLP-compatible receiver.
193171
ln, err := net.Listen("tcp", "localhost:")
194172
require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)
195-
rcv := otlpTracesReceiverOnGRPCServer(ln)
173+
rcv := otlpTracesReceiverOnGRPCServer(t, ln)
196174
// Also closes the connection.
197175
defer rcv.srv.GracefulStop()
198176

@@ -238,8 +216,6 @@ func TestSendTraces(t *testing.T) {
238216
// A trace with 2 spans.
239217
td = testdata.GenerateTracesTwoSpansSameResource()
240218

241-
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())
242-
243219
err = exp.ConsumeTraces(context.Background(), td)
244220
assert.NoError(t, err)
245221

@@ -253,7 +229,7 @@ func TestSendTraces(t *testing.T) {
253229
// Verify received span.
254230
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems))
255231
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.requestCount))
256-
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
232+
assert.EqualValues(t, td, rcv.GetLastRequest())
257233

258234
require.EqualValues(t, rcv.GetMetadata().Get("header"), expectedHeader)
259235
}
@@ -308,8 +284,6 @@ func TestSendMetrics(t *testing.T) {
308284
// A trace with 2 spans.
309285
md = testdata.GenerateMetricsTwoMetrics()
310286

311-
expectedOTLPReq := internal.MetricsToOtlp(md.Clone().InternalRep())
312-
313287
err = exp.ConsumeMetrics(context.Background(), md)
314288
assert.NoError(t, err)
315289

@@ -323,7 +297,7 @@ func TestSendMetrics(t *testing.T) {
323297
// Verify received metrics.
324298
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.requestCount))
325299
assert.EqualValues(t, 4, atomic.LoadInt32(&rcv.totalItems))
326-
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
300+
assert.EqualValues(t, md, rcv.GetLastRequest())
327301

328302
require.EqualValues(t, rcv.GetMetadata().Get("header"), expectedHeader)
329303
}
@@ -428,7 +402,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) {
428402
}()
429403

430404
time.Sleep(2 * time.Second)
431-
rcv := otlpTracesReceiverOnGRPCServer(ln)
405+
rcv := otlpTracesReceiverOnGRPCServer(t, ln)
432406
defer rcv.srv.GracefulStop()
433407
// Wait until one of the conditions below triggers.
434408
select {
@@ -441,13 +415,13 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) {
441415
}
442416

443417
func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pdata.Traces, ln net.Listener) {
444-
rcv := otlpTracesReceiverOnGRPCServer(ln)
418+
rcv := otlpTracesReceiverOnGRPCServer(t, ln)
445419
defer rcv.srv.GracefulStop()
446420
// Ensure that initially there is no data in the receiver.
447421
assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount))
448422

449423
// Clone the request and store as expected.
450-
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())
424+
expectedData := td.Clone()
451425

452426
// Resend the request, this should succeed.
453427
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -461,7 +435,7 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd
461435

462436
// Verify received span.
463437
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems))
464-
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
438+
assert.EqualValues(t, expectedData, rcv.GetLastRequest())
465439
}
466440

467441
func TestSendLogData(t *testing.T) {
@@ -497,8 +471,8 @@ func TestSendLogData(t *testing.T) {
497471
assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount))
498472

499473
// Send empty request.
500-
td := pdata.NewLogs()
501-
assert.NoError(t, exp.ConsumeLogs(context.Background(), td))
474+
ld := pdata.NewLogs()
475+
assert.NoError(t, exp.ConsumeLogs(context.Background(), ld))
502476

503477
// Wait until it is received.
504478
assert.Eventually(t, func() bool {
@@ -509,10 +483,9 @@ func TestSendLogData(t *testing.T) {
509483
assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.totalItems))
510484

511485
// A request with 2 log entries.
512-
td = testdata.GenerateLogsTwoLogRecordsSameResource()
513-
expectedOTLPReq := internal.LogsToOtlp(td.Clone().InternalRep())
486+
ld = testdata.GenerateLogsTwoLogRecordsSameResource()
514487

515-
err = exp.ConsumeLogs(context.Background(), td)
488+
err = exp.ConsumeLogs(context.Background(), ld)
516489
assert.NoError(t, err)
517490

518491
// Wait until it is received.
@@ -523,5 +496,5 @@ func TestSendLogData(t *testing.T) {
523496
// Verify received logs.
524497
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.requestCount))
525498
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems))
526-
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
499+
assert.EqualValues(t, ld, rcv.GetLastRequest())
527500
}

0 commit comments

Comments
 (0)