Skip to content

Commit 5994296

Browse files
committed
feat: allow configuring flush interval using time.Duration
1 parent c16d579 commit 5994296

File tree

4 files changed

+36
-25
lines changed

4 files changed

+36
-25
lines changed

senders/client.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ package senders
22

33
import (
44
"fmt"
5+
"github.com/wavefronthq/wavefront-sdk-go/event"
6+
"github.com/wavefronthq/wavefront-sdk-go/histogram"
7+
"github.com/wavefronthq/wavefront-sdk-go/internal"
58
eventInternal "github.com/wavefronthq/wavefront-sdk-go/internal/event"
69
histogramInternal "github.com/wavefronthq/wavefront-sdk-go/internal/histogram"
710
"github.com/wavefronthq/wavefront-sdk-go/internal/metric"
811
"github.com/wavefronthq/wavefront-sdk-go/internal/span"
9-
"time"
10-
11-
"github.com/wavefronthq/wavefront-sdk-go/event"
12-
"github.com/wavefronthq/wavefront-sdk-go/histogram"
13-
"github.com/wavefronthq/wavefront-sdk-go/internal"
1412
)
1513

1614
// Sender Interface for sending metrics, distributions and spans to Wavefront
@@ -58,16 +56,14 @@ type wavefrontSender struct {
5856
}
5957

6058
func newLineHandler(reporter internal.Reporter, cfg *configuration, format, prefix string, registry *internal.MetricRegistry) *internal.LineHandler {
61-
flushInterval := time.Second * time.Duration(cfg.FlushIntervalSeconds)
62-
6359
opts := []internal.LineHandlerOption{internal.SetHandlerPrefix(prefix), internal.SetRegistry(registry)}
6460
batchSize := cfg.BatchSize
6561
if format == internal.EventFormat {
6662
batchSize = 1
6763
opts = append(opts, internal.SetLockOnThrottledError(true))
6864
}
6965

70-
return internal.NewLineHandler(reporter, format, flushInterval, batchSize, cfg.MaxBufferSize, opts...)
66+
return internal.NewLineHandler(reporter, format, cfg.FlushInterval, batchSize, cfg.MaxBufferSize, opts...)
7167
}
7268

7369
func (sender *wavefrontSender) Start() {

senders/client_factory.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import (
1616
const (
1717
defaultTracesPort = 30001
1818
defaultMetricsPort = 2878
19-
defaultBatchSize = 10000
20-
defaultBufferSize = 50000
21-
defaultFlushInterval = 1
19+
defaultBatchSize = 10_000
20+
defaultBufferSize = 50_000
21+
defaultFlushInterval = 1 * time.Second
2222
defaultTimeout = 10 * time.Second
2323
)
2424

@@ -48,9 +48,9 @@ type configuration struct {
4848

4949
// interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second.
5050
// together with batch size controls the max theoretical throughput of the sender.
51-
FlushIntervalSeconds int
52-
SDKMetricsTags map[string]string
53-
Path string
51+
FlushInterval time.Duration
52+
SDKMetricsTags map[string]string
53+
Path string
5454

5555
Timeout time.Duration
5656

@@ -86,13 +86,13 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) {
8686
// CreateConfig is for internal use only.
8787
func CreateConfig(wfURL string, setters ...Option) (*configuration, error) {
8888
cfg := &configuration{
89-
MetricsPort: defaultMetricsPort,
90-
TracesPort: defaultTracesPort,
91-
BatchSize: defaultBatchSize,
92-
MaxBufferSize: defaultBufferSize,
93-
FlushIntervalSeconds: defaultFlushInterval,
94-
SDKMetricsTags: map[string]string{},
95-
Timeout: defaultTimeout,
89+
MetricsPort: defaultMetricsPort,
90+
TracesPort: defaultTracesPort,
91+
BatchSize: defaultBatchSize,
92+
MaxBufferSize: defaultBufferSize,
93+
FlushInterval: defaultFlushInterval,
94+
SDKMetricsTags: map[string]string{},
95+
Timeout: defaultTimeout,
9696
}
9797

9898
u, err := url.Parse(wfURL)
@@ -221,7 +221,14 @@ func MaxBufferSize(n int) Option {
221221
// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. Defaults to 1 Second.
222222
func FlushIntervalSeconds(n int) Option {
223223
return func(cfg *configuration) {
224-
cfg.FlushIntervalSeconds = n
224+
cfg.FlushInterval = time.Second * time.Duration(n)
225+
}
226+
}
227+
228+
// FlushInterval set the interval at which to flush data to Wavefront. Defaults to 1 Second.
229+
func FlushInterval(interval time.Duration) Option {
230+
return func(cfg *configuration) {
231+
cfg.FlushInterval = interval
225232
}
226233
}
227234

senders/client_factory_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestDefaults(t *testing.T) {
9797
require.NoError(t, err)
9898

9999
assert.Equal(t, 10000, cfg.BatchSize)
100-
assert.Equal(t, 1, cfg.FlushIntervalSeconds)
100+
assert.Equal(t, 1*time.Second, cfg.FlushInterval)
101101
assert.Equal(t, 50000, cfg.MaxBufferSize)
102102
assert.Equal(t, 2878, cfg.MetricsPort)
103103
assert.Equal(t, 30001, cfg.TracesPort)
@@ -116,7 +116,14 @@ func TestFlushIntervalSeconds(t *testing.T) {
116116
cfg, err := senders.CreateConfig("https://localhost", senders.FlushIntervalSeconds(123))
117117
require.NoError(t, err)
118118

119-
assert.Equal(t, 123, cfg.FlushIntervalSeconds)
119+
assert.Equal(t, 123*time.Second, cfg.FlushInterval)
120+
}
121+
122+
func TestFlushInterval(t *testing.T) {
123+
cfg, err := senders.CreateConfig("https://localhost", senders.FlushInterval(1*time.Hour))
124+
require.NoError(t, err)
125+
126+
assert.Equal(t, 1*time.Hour, cfg.FlushInterval)
120127
}
121128

122129
func TestMaxBufferSize(t *testing.T) {

senders/example_newsender_options_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package senders_test
33
import (
44
"crypto/tls"
55
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
6+
"time"
67
)
78

89
func ExampleNewSender_options() {
910
// NewSender accepts optional arguments. Use these if you need to set non-default ports for your Wavefront Proxy, tune batching parameters, or set tags for internal SDK metrics.
1011
sender, err := wavefront.NewSender(
1112
"http://localhost",
1213
wavefront.BatchSize(20000), // Send batches of 20,000.
13-
wavefront.FlushIntervalSeconds(5), // Flush every 5 seconds.
14+
wavefront.FlushInterval(5*time.Second), // Flush every 5 seconds.
1415
wavefront.MetricsPort(4321), // Use port 4321 for metrics.
1516
wavefront.TracesPort(40001), // Use port 40001 for traces.
1617
wavefront.Timeout(15), // Set an HTTP timeout in seconds (default is 10s)

0 commit comments

Comments
 (0)