Skip to content

Commit 0b242f0

Browse files
Alex-Souslik and constanca-m
authored and committed
[exporter/googlecloudstorage] add compression support (open-telemetry#45384)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add compression support for the Google Cloud Storage exporter, allowing users to compress log data before uploading to GCS to reduce storage costs and transfer times. Supports gzip and zstd compression algorithms with automatic file extension handling (.gz, .zst). <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#45337 <!--Describe what testing was performed and which tests were added.--> #### Testing - Added comprehensive unit tests for compression functionality including gzip and zstd compression verification - Tests verify that compressed data can be properly decompressed back to original content - Tests verify correct file extensions (.gz, .zst) are applied to compressed files - Tests verify compression reduces data size compared to uncompressed data - All existing tests continue to pass, ensuring backward compatibility - Added configuration validation tests for compression settings <!--Describe the documentation added.--> #### Documentation - Updated README.md with compression configuration options and examples - Added usage examples showing how to enable gzip/zstd compression - Documented the `bucket.compression` configuration parameter <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Alex <alex.s@akeyless.io> Co-authored-by: Constança Manteigas <113898685+constanca-m@users.noreply.github.com>
1 parent 50967f1 commit 0b242f0

File tree

10 files changed

+404
-5
lines changed

10 files changed

+404
-5
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
5+
component: exporter/googlecloudstorage
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add compression support for Google Cloud Storage exporter
9+
10+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
11+
issues: [45337]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
The Google Cloud Storage exporter now supports compression of log data before uploading to GCS.
18+
Supported compression algorithms: `gzip` and `zstd`.
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/googlecloudstorageexporter/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ This exporter writes received OpenTelemetry data to a cloud storage bucket.
2727
| `bucket.partition` | Configuration for time-based partitioning. See below for details. | No | |
2828
| `bucket.reuse_if_exists` | If true, use the existing bucket if it already exists; if false, error if bucket exists. | No | `false` |
2929
| `bucket.region` | Region where the bucket will be created or where it exists. If left empty, it will query the metadata endpoint. It requires the collector to be running in a Google Cloud environment. | Yes | |
30+
| `bucket.compression` | Compression algorithm used to compress data before uploading. Valid values are `gzip`, `zstd`, or no value set for no compression. | No | |
3031

3132
### Partition Configuration
3233

@@ -59,3 +60,18 @@ extensions:
5960
# text encoding to ensure only the body is placed in the bucket
6061
text_encoding:
6162
```
63+
64+
### Compression Example
65+
66+
```yaml
67+
exporters:
68+
googlecloudstorage:
69+
bucket:
70+
name: compressed-logs-bucket
71+
project_id: my-project
72+
region: us-central1
73+
compression: gzip
74+
file_prefix: logs
75+
partition:
76+
format: "year=%Y/month=%m/day=%d"
77+
```

exporter/googlecloudstorageexporter/config.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99

1010
"github.com/lestrrat-go/strftime"
1111
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/config/configcompression"
1213
"go.opentelemetry.io/collector/confmap/xconfmap"
1314
)
1415

1516
var (
16-
errNameRequired = errors.New("name is required")
17-
errFormatInvalid = errors.New("invalid format")
17+
errNameRequired = errors.New("name is required")
18+
errFormatInvalid = errors.New("invalid format")
19+
errUnknownCompression = errors.New("unknown compression type")
1820
)
1921

2022
type Config struct {
@@ -52,6 +54,11 @@ type bucketConfig struct {
5254
// empty, it will query the metadata endpoint. It requires the collector
5355
// to be running in a Google Cloud environment.
5456
Region string `mapstructure:"region"`
57+
58+
// Compression sets the algorithm used to process the payload
59+
// before uploading to GCS.
60+
// Valid values are: `gzip`, `zstd`, or no value set.
61+
Compression configcompression.Type `mapstructure:"compression"`
5562
}
5663

5764
type partitionConfig struct {
@@ -84,6 +91,17 @@ func (c *bucketConfig) Validate() error {
8491
if c.Name == "" {
8592
return errNameRequired
8693
}
94+
95+
compression := c.Compression
96+
if compression.IsCompressed() {
97+
if compression != configcompression.TypeGzip && compression != configcompression.TypeZstd {
98+
return fmt.Errorf(
99+
"%w %q, valid values are %q and %q",
100+
errUnknownCompression, compression,
101+
configcompression.TypeGzip, configcompression.TypeZstd)
102+
}
103+
}
104+
87105
return nil
88106
}
89107

exporter/googlecloudstorageexporter/config.schema.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ properties:
33
bucket:
44
type: object
55
properties:
6+
compression:
7+
description: 'Compression sets the algorithm used to process the payload before uploading to GCS. Valid values are: `gzip`, `zstd`, or no value set.'
8+
$ref: go.opentelemetry.io/collector/config/configcompression.type
69
file_prefix:
710
description: 'FilePrefix holds the prefix for the created filename. This prefix is applied after the partition path (if any). Example: file_prefix: "logs" Result: ".../logs_UUID"'
811
type: string

exporter/googlecloudstorageexporter/config_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,38 @@ func TestValidate(t *testing.T) {
5959
},
6060
},
6161
},
62+
{
63+
id: component.NewIDWithName(metadata.Type, "with_gzip_compression"),
64+
expected: &Config{
65+
Encoding: func() *component.ID {
66+
id := component.MustNewID("test")
67+
return &id
68+
}(),
69+
Bucket: bucketConfig{
70+
Name: "test-bucket",
71+
Region: "test-region",
72+
ProjectID: "test-project-id",
73+
FilePrefix: "logs",
74+
Compression: "gzip",
75+
},
76+
},
77+
},
78+
{
79+
id: component.NewIDWithName(metadata.Type, "with_zstd_compression"),
80+
expected: &Config{
81+
Encoding: func() *component.ID {
82+
id := component.MustNewID("test")
83+
return &id
84+
}(),
85+
Bucket: bucketConfig{
86+
Name: "test-bucket",
87+
Region: "test-region",
88+
ProjectID: "test-project-id",
89+
FilePrefix: "logs",
90+
Compression: "zstd",
91+
},
92+
},
93+
},
6294
{
6395
id: component.NewIDWithName(metadata.Type, "empty_bucket_name"),
6496
expectedErr: errNameRequired,
@@ -67,6 +99,10 @@ func TestValidate(t *testing.T) {
6799
id: component.NewIDWithName(metadata.Type, "invalid_partition_format"),
68100
expectedErr: errFormatInvalid,
69101
},
102+
{
103+
id: component.NewIDWithName(metadata.Type, "unsupported_compression"),
104+
expectedErr: errUnknownCompression,
105+
},
70106
}
71107

72108
for _, tt := range tests {

exporter/googlecloudstorageexporter/exporter.go

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,24 @@
44
package googlecloudstorageexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudstorageexporter"
55

66
import (
7+
"bytes"
8+
"compress/gzip"
79
"context"
810
"errors"
911
"fmt"
12+
"io"
1013
"net/http"
1114
"strings"
15+
"sync"
1216
"time"
1317

1418
"cloud.google.com/go/compute/metadata"
1519
"cloud.google.com/go/storage"
1620
"github.com/google/uuid"
21+
"github.com/klauspost/compress/zstd"
1722
"github.com/lestrrat-go/strftime"
1823
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/config/configcompression"
1925
"go.opentelemetry.io/collector/consumer"
2026
"go.opentelemetry.io/collector/exporter"
2127
"go.opentelemetry.io/collector/pdata/plog"
@@ -24,6 +30,10 @@ import (
2430
"google.golang.org/api/googleapi"
2531
)
2632

33+
// poolItem is the common interface of the pooled compression writers
// (*gzip.Writer and *zstd.Encoder): each can be written to, closed to
// flush and finalize the stream, and Reset onto a new destination so a
// single writer can be reused across uploads via sync.Pool.
type poolItem interface {
	io.WriteCloser
	Reset(io.Writer)
}
2737
type signalType string
2838

2939
const (
@@ -46,6 +56,8 @@ type storageExporter struct {
4656
bucketHandle *storage.BucketHandle
4757
logger *zap.Logger
4858
partitionFormat *strftime.Strftime
59+
gzipWriterPool *sync.Pool
60+
zstdWriterPool *sync.Pool
4961
signal signalType
5062
}
5163

@@ -116,6 +128,28 @@ func newStorageExporter(
116128
logger: logger,
117129
partitionFormat: partitionFormat,
118130
signal: signal,
131+
gzipWriterPool: &sync.Pool{
132+
New: func() any {
133+
// Create a new gzip writer that writes to a dummy buffer initially
134+
// It will be reset to the actual destination when used
135+
writer := gzip.NewWriter(io.Discard)
136+
return writer
137+
},
138+
},
139+
zstdWriterPool: &sync.Pool{
140+
New: func() any {
141+
// Create a new zstd writer that writes to a dummy buffer initially
142+
// It will be reset to the actual destination when used
143+
writer, err := zstd.NewWriter(io.Discard)
144+
if err != nil {
145+
logger.Error("failed to create zstd writer for pool, falling back to on-demand creation",
146+
zap.Error(err))
147+
// Return nil on error - sync.Pool will handle this gracefully
148+
return nil
149+
}
150+
return writer
151+
},
152+
},
119153
}, nil
120154
}
121155

@@ -218,6 +252,7 @@ func (s *storageExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) e
218252
// It starts from a unique ID, and prepends the partitionFormat and the prefix to it.
219253
func generateFilename(
220254
uniqueID, filePrefix, partitionPrefix string,
255+
compression configcompression.Type,
221256
partitionFormat *strftime.Strftime,
222257
now time.Time,
223258
) string {
@@ -244,6 +279,15 @@ func generateFilename(
244279
}
245280
filename = partitionPrefix + filename
246281
}
282+
283+
// Add compression extension
284+
switch compression {
285+
case configcompression.TypeGzip:
286+
filename += ".gz"
287+
case configcompression.TypeZstd:
288+
filename += ".zst"
289+
}
290+
247291
return filename
248292
}
249293

@@ -253,13 +297,20 @@ func (s *storageExporter) uploadFile(ctx context.Context, content []byte) (err e
253297
return nil
254298
}
255299

300+
// Compress the content if compression is configured
301+
compressedContent, err := s.compressContent(content)
302+
if err != nil {
303+
return fmt.Errorf("failed to compress content: %w", err)
304+
}
305+
256306
// if we have multiple files coming, we need to make sure the name is unique so they do
257307
// not overwrite each other
258308
uniqueID := uuid.New().String()
259309
filename := generateFilename(
260310
uniqueID,
261311
s.cfg.Bucket.FilePrefix,
262312
s.cfg.Bucket.Partition.Prefix,
313+
s.cfg.Bucket.Compression,
263314
s.partitionFormat,
264315
time.Now().UTC(),
265316
)
@@ -276,18 +327,77 @@ func (s *storageExporter) uploadFile(ctx context.Context, content []byte) (err e
276327
)
277328
}
278329
}()
279-
if _, err = writer.Write(content); err != nil {
330+
if _, err = writer.Write(compressedContent); err != nil {
280331
return fmt.Errorf("failed to write to file: %w", err)
281332
}
282333
s.logger.Info(
283334
"New file uploaded",
284335
zap.String("filename", filename),
285336
zap.String("bucket", s.cfg.Bucket.Name),
286-
zap.Int("size", len(content)),
337+
zap.Int("size", len(compressedContent)),
287338
)
288339
return nil
289340
}
290341

342+
func (s *storageExporter) compressContent(raw []byte) ([]byte, error) {
343+
switch s.cfg.Bucket.Compression {
344+
case configcompression.TypeGzip:
345+
return compress[*gzip.Writer](s.gzipWriterPool, raw, func(w io.Writer) (*gzip.Writer, error) {
346+
return gzip.NewWriter(w), nil
347+
})
348+
case configcompression.TypeZstd:
349+
return compress[*zstd.Encoder](s.zstdWriterPool, raw, func(w io.Writer) (*zstd.Encoder, error) {
350+
return zstd.NewWriter(w)
351+
})
352+
default:
353+
return raw, nil
354+
}
355+
}
356+
357+
func compress[T poolItem](pool *sync.Pool, raw []byte, newItem func(io.Writer) (T, error)) ([]byte, error) {
358+
if pool == nil {
359+
return nil, errors.New("unexpected: compress pool is nil")
360+
}
361+
362+
content := bytes.NewBuffer(nil)
363+
364+
// Get writer from pool or create new one
365+
pooled := pool.Get()
366+
var zipper T
367+
var fromPool bool
368+
if pooled != nil {
369+
if w, ok := pooled.(T); ok {
370+
zipper = w
371+
zipper.Reset(content)
372+
fromPool = true
373+
}
374+
}
375+
if !fromPool {
376+
var err error
377+
zipper, err = newItem(content)
378+
if err != nil {
379+
return nil, err
380+
}
381+
}
382+
383+
// Write the data
384+
if _, err := zipper.Write(raw); err != nil {
385+
// Always close to release resources, but ignore close error on write failure
386+
_ = zipper.Close()
387+
return nil, err
388+
}
389+
390+
// Close the writer
391+
if err := zipper.Close(); err != nil {
392+
return nil, err
393+
}
394+
395+
// Only return the writer to the pool after successful write and close
396+
pool.Put(zipper)
397+
398+
return content.Bytes(), nil
399+
}
400+
291401
// loadExtension tries to load an available extension for the given id.
292402
func loadExtension[T any](host component.Host, id component.ID, _ string, errNotMarshaler error) (T, error) {
293403
var zero T

0 commit comments

Comments
 (0)