Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/googlecloudstorageexporter-add-traces-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/googlecloudstorage

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for exporting traces to Google Cloud Storage

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44945]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 4 additions & 2 deletions exporter/googlecloudstorageexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
# Google Cloud Storage Exporter
| Status | |
| ------------- |-----------|
| Stability | [alpha]: logs |
| Stability | [development]: traces |
| | [alpha]: logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fgooglecloudstorage%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fgooglecloudstorage) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fgooglecloudstorage%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fgooglecloudstorage) |
| Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=exporter_googlecloudstorage)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=exporter_googlecloudstorage&displayType=list) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@constanca-m](https://www.github.com/constanca-m), [@braydonk](https://www.github.com/braydonk) \| Seeking more code owners! |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->
Expand All @@ -18,7 +20,7 @@ This exporter writes received OpenTelemetry data to a cloud storage bucket.

| Name | Description | Required | Default |
|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------|
| `encoding` | The encoding extension ID to use for marshaling logs. If left empty, `plog.JSONMarshaler` will be used. | No | |
| `encoding` | The encoding extension ID to use for marshaling logs and traces. If left empty, `plog.JSONMarshaler` will be used for logs and `ptrace.JSONMarshaler` will be used for traces. | No | |
| `bucket.project_id` | The project where the bucket will be created or where it exists. If left empty, it will query the metadata endpoint. It requires the collector to be running in a Google Cloud environment. | No | |
| `bucket.name` | Name for the bucket storage. | Yes | |
| `bucket.file_prefix` | Prefix for the created filename. This prefix is applied after the partition path (if any). | No | `logs` |
Expand Down
74 changes: 67 additions & 7 deletions exporter/googlecloudstorageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,54 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/api/googleapi"
)

type signalType string

const (
signalTypeLogs signalType = "logs"
signalTypeTraces signalType = "traces"
)

var (
errNotLogsMarshaler = errors.New("extension is not a logs marshaler")
errNotTracesMarshaler = errors.New("extension is not a traces marshaler")
)

var validSignals = []signalType{signalTypeLogs, signalTypeTraces}

type storageExporter struct {
cfg *Config
logsMarshaler plog.Marshaler
tracesMarshaler ptrace.Marshaler
storageClient *storage.Client
bucketHandle *storage.BucketHandle
logger *zap.Logger
partitionFormat *strftime.Strftime
signal signalType
}

var _ exporter.Logs = (*storageExporter)(nil)
var (
_ exporter.Logs = (*storageExporter)(nil)
_ exporter.Traces = (*storageExporter)(nil)
)

func newGCSExporter(
ctx context.Context,
cfg *Config,
logger *zap.Logger,
signal signalType,
) (*storageExporter, error) {
return newStorageExporter(
ctx,
cfg,
metadata.ZoneWithContext,
metadata.ProjectIDWithContext,
logger,
signal,
)
}

Expand All @@ -54,7 +76,15 @@ func newStorageExporter(
getZone func(context.Context) (string, error),
getProjectID func(context.Context) (string, error),
logger *zap.Logger,
signal signalType,
) (*storageExporter, error) {
// Validate signal type
switch signal {
case signalTypeLogs, signalTypeTraces: // valid
default:
return nil, fmt.Errorf("signal type %q not recognized, valid values are %v", signal, validSignals)
}

errMsg := "failed to determine %s: not set in exporter config '%s' and unable to retrieve from metadata: %w"

if cfg.Bucket.Region == "" {
Expand Down Expand Up @@ -85,6 +115,7 @@ func newStorageExporter(
cfg: cfg,
logger: logger,
partitionFormat: partitionFormat,
signal: signal,
}, nil
}

Expand All @@ -97,13 +128,30 @@ func isBucketConflictError(err error) bool {
}

func (s *storageExporter) Start(ctx context.Context, host component.Host) error {
// Initialize default marshalers
s.logsMarshaler = &plog.JSONMarshaler{}
s.tracesMarshaler = &ptrace.JSONMarshaler{}

// Load encoding extension if configured
if s.cfg.Encoding != nil {
encoding, err := loadExtension[plog.Marshaler](host, *s.cfg.Encoding, "logs marshaler")
if err != nil {
return fmt.Errorf("failed to load logs extension: %w", err)
switch s.signal {
case signalTypeLogs:
logsEncoding, err := loadExtension[plog.Marshaler](host, *s.cfg.Encoding, "logs marshaler", errNotLogsMarshaler)
if err != nil {
return fmt.Errorf("failed to load logs extension: %w", err)
}
s.logsMarshaler = logsEncoding
case signalTypeTraces:
tracesEncoding, err := loadExtension[ptrace.Marshaler](host, *s.cfg.Encoding, "traces marshaler", errNotTracesMarshaler)
if err != nil {
if !errors.Is(err, errNotTracesMarshaler) {
return fmt.Errorf("failed to load traces extension: %w", err)
}
s.logger.Warn("Configured encoding extension does not support traces, falling back to JSON marshaler", zap.String("encoding", s.cfg.Encoding.String()))
} else {
s.tracesMarshaler = tracesEncoding
}
}
s.logsMarshaler = encoding
}

// TODO Add option for authenticator
Expand Down Expand Up @@ -154,6 +202,18 @@ func (s *storageExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return nil
}

func (s *storageExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
buf, err := s.tracesMarshaler.MarshalTraces(td)
if err != nil {
return fmt.Errorf("failed to marshal traces: %w", err)
}

if err = s.uploadFile(ctx, buf); err != nil {
return fmt.Errorf("failed to upload traces: %w", err)
}
return nil
}

// generateFilename returns the name of the file to be uploaded.
// It starts from a unique ID, and prepends the partitionFormat and the prefix to it.
func generateFilename(
Expand Down Expand Up @@ -229,15 +289,15 @@ func (s *storageExporter) uploadFile(ctx context.Context, content []byte) (err e
}

// loadExtension tries to load an available extension for the given id.
func loadExtension[T any](host component.Host, id component.ID, extensionType string) (T, error) {
func loadExtension[T any](host component.Host, id component.ID, _ string, errNotMarshaler error) (T, error) {
var zero T
ext, ok := host.GetExtensions()[id]
if !ok {
return zero, fmt.Errorf("unknown extension %q", id)
}
extT, ok := ext.(T)
if !ok {
return zero, fmt.Errorf("extension %q is not a %s", id, extensionType)
return zero, fmt.Errorf("extension %q: %w", id, errNotMarshaler)
}
return extT, nil
}
Loading
Loading