Skip to content

Commit 08415cb

Browse files
authored
Support configurable bucket ranges for bulk_process_time Histogram (digitalocean#48)
* node/elasticsearch: support configurable bucket range for indexing time * helpers.go, node/elasticsearch: allow user configuration of configurable bucket range represented as float64 * docs/node-elasticsearch.md: document configurable indexing time bucket, and format tables so that my IDE stops shouting * node/elasticsearch/metrics_test.go: test that n buckets exist, ignoring values. also, stop testing behaviors specific to prometheus, as that is not firebolts concern
1 parent 1c4b91e commit 08415cb

File tree

7 files changed

+131
-24
lines changed

7 files changed

+131
-24
lines changed

docs/node-elasticsearch.md

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,29 @@ The `elasticsearch` node indexes documents to an Elasticsearch cluster. Its pa
1111

1212
The IndexRequest type contains four fields:
1313

14-
Field | Required | Default | Description
15-
------------------------|:--------:|---------|--------------
16-
Index | * | | Destination index name in Elasticsearch.
17-
MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type.
18-
DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID.
19-
Doc | * | | The document to index. This should be a struct that is marshallable to JSON.
14+
| Field | Required | Default | Description |
15+
|-------------|:--------:|---------|-----------------------------------------------------------------------------------------------------|
16+
| Index | * | | Destination index name in Elasticsearch. |
17+
| MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type. |
18+
| DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID. |
19+
| Doc | * | | The document to index. This should be a struct that is marshallable to JSON. |
2020

2121
Internally, `elasticsearch` uses the `BulkService` in the `olivere/elastic` client.
2222

2323
## Configuration
2424

25-
Param | Required | Default | Description
26-
--------------------------|:--------:|---------|--------------
27-
elastic-addr | * | | Comma-separated list of Elasticsearch client nodes.
28-
elastic-username | | | Username used in http/basic authentication.
29-
elastic-password | | | Password used in http/basic authentication.
30-
batch-size | | 100 | Wait until this many documents are collected and send them as a single batch.
31-
batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch.
32-
bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request.
33-
reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail.
34-
bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable.
35-
bulk-index-timeout-seconds| | 20 | Forcibly cancel individual bulk indexing operations after this time.
36-
index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch.
25+
| Param | Required | Default | Description |
26+
|----------------------------|:--------:|----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
27+
| elastic-addr | * | | Comma-separated list of Elasticsearch client nodes. |
28+
| elastic-username | | | Username used in http/basic authentication. |
29+
| elastic-password | | | Password used in http/basic authentication. |
30+
| batch-size | | 100 | Wait until this many documents are collected and send them as a single batch. |
31+
| batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch. |
32+
| bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request. |
33+
| reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail. |
34+
| bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable. |
35+
| bulk-index-timeout-seconds | | 20 | Forcibly cancel individual bulk indexing operations after this time. |
36+
| index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch. |
37+
| histogram-min-bucket-sec | | 0.01 | Smallest bucket describing observed Elasticsearch client indexing time. |
38+
| histogram-max-bucket-sec | | 2 * (bulk-index-timeout-ms/1000) | Maximum bucket describing observed Elasticsearch client indexing time. |
39+
| histogram-bucket-count | | 8 | Number of buckets to generate between min and max histogram bucket seconds. |

helpers.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,35 @@ func (c Nodeconfig) StringConfigRequired(name string) (string, error) {
6060
}
6161
return userValue, nil
6262
}
63+
64+
// Float64Config validates and fetches the flaot-typed optional config value specified by 'name', using the 'defaultValue' if
65+
// no value was provided in the configuration. The default float64 (if used) is formatted following platform-and-golang
66+
// default precision and width (%f formatting).
67+
func (c Nodeconfig) Float64Config(name string, defaultValue float64, minValue float64, maxValue float64) (float64, error) {
68+
// set the default value, if not provided
69+
_, ok := c[name]
70+
if !ok {
71+
c[name] = fmt.Sprintf("%f", defaultValue)
72+
}
73+
74+
return c.Float64ConfigRequired(name, minValue, maxValue)
75+
}
76+
77+
// Float64ConfigRequired validates and fetches the float64-typed required config value specified by 'name', returning an error
78+
// if no value was provided in the configuration.
79+
func (c Nodeconfig) Float64ConfigRequired(name string, minValue, maxValue float64) (float64, error) {
80+
userValue, ok := c[name]
81+
if !ok {
82+
return 0, fmt.Errorf("missing config value [%s]", name)
83+
}
84+
85+
f64Value, err := strconv.ParseFloat(userValue, 64)
86+
if err != nil {
87+
return 0, fmt.Errorf("expected float64 value for config [%s]", name)
88+
}
89+
90+
if f64Value > maxValue || f64Value < minValue {
91+
return 0, fmt.Errorf("config value [%s] requires value between [%f] and [%f]", name, minValue, maxValue)
92+
}
93+
return f64Value, nil
94+
}

node/elasticsearch/connectionfactory_int_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestReconnect(t *testing.T) {
1717
metrics.Init("elasticsearch")
1818

1919
metrics := &Metrics{}
20-
metrics.RegisterElasticIndexMetrics()
20+
metrics.RegisterElasticIndexMetrics(0.1, 20.0, 8)
2121

2222
cf := newEsBulkServiceFactory(context.TODO(), "http://localhost:9200", "", "", 3, 10000, metrics)
2323

node/elasticsearch/elasticsearch.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,24 @@ func (i *Elasticsearch) Setup(cfgMap map[string]string) error {
8282
return err
8383
}
8484

85+
bulkProcessHistogramMin, err := config.Float64Config("histogram-min-bucket-sec", 0.01, 0.01, math.MaxFloat64)
86+
if err != nil {
87+
return err
88+
}
89+
90+
bulkProcessHistogramMax, err := config.Float64Config("histogram-max-bucket-sec", float64(2*(bulkIndexTimeoutMs/1000)), 0.01, math.MaxFloat64)
91+
if err != nil {
92+
return err
93+
}
94+
95+
bulkProcessingHistogramBuckets, err := config.IntConfig("histogram-bucket-count", 8, 1, math.MaxInt)
96+
if err != nil {
97+
return err
98+
}
99+
85100
// initialize metrics
86101
metrics := &Metrics{}
87-
metrics.RegisterElasticIndexMetrics()
102+
metrics.RegisterElasticIndexMetrics(bulkProcessHistogramMin, bulkProcessHistogramMax, bulkProcessingHistogramBuckets)
88103

89104
// service factory; in tests it must be prepopulated with a mock
90105
if i.serviceFactory == nil {

node/elasticsearch/elasticsearch_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package elasticsearch
22

33
import (
4+
"fmt"
45
"math"
56
"strconv"
67
"testing"
@@ -33,8 +34,21 @@ func TestSetup(t *testing.T) {
3334
assert.Error(t, err)
3435
assert.Equal(t, "expected integer value for config [batch-size]", err.Error())
3536

36-
config["batch-size"] = "10" // clean up the prev err
37-
config["batch-max-wait-ms"] = "-99999999" // less than minvalue
37+
config["batch-size"] = "10" // clean up the prev err
38+
config["histogram-min-bucket-sec"] = "aaaa" // not a float64
39+
err = e.Setup(config)
40+
assert.Error(t, err)
41+
assert.Equal(t, "expected float64 value for config [histogram-min-bucket-sec]", err.Error())
42+
43+
config["histogram-min-bucket-sec"] = "0.01" // clean up the prev err
44+
config["bulk-index-timeout-seconds"] = "20" // needed for calculating max bucket
45+
config["histogram-max-bucket-sec"] = "-0.01" // value out of bounds
46+
err = e.Setup(config)
47+
assert.Error(t, err)
48+
assert.Equal(t, fmt.Sprintf("config value [%s] requires value between [%f] and [%f]", "histogram-max-bucket-sec", 0.01, math.MaxFloat64), err.Error())
49+
50+
config["histogram-max-bucket-sec"] = "40.0" // clean up the prev err
51+
config["batch-max-wait-ms"] = "-99999999" // less than minvalue
3852
err = e.Setup(config)
3953
assert.Error(t, err)
4054
assert.Equal(t, "config value [batch-max-wait-ms] requires value between [1] and [2147483647]", err.Error())

node/elasticsearch/metrics.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,16 @@ type Metrics struct {
1818
AvailableBatchRoutines prometheus.Gauge
1919
}
2020

21+
// generateElasticsearchProcessTimeBuckets generates a bucket slice usable for bucketing the BulkProcessTime (or future
22+
// prometheus.Histogram metrics)
23+
func generateElasticsearchProcessTimeBuckets(min, max float64, count int) []float64 {
24+
return prometheus.ExponentialBucketsRange(min, max, count)
25+
}
26+
2127
// RegisterElasticIndexMetrics initializes metrics and registers them with the prometheus client.
22-
func (m *Metrics) RegisterElasticIndexMetrics() {
28+
// To support user-configurable bucketing of Histogram metrics, a min, max, and count value must be supplied for generating
29+
// exponential buckets
30+
func (m *Metrics) RegisterElasticIndexMetrics(min, max float64, count int) {
2331
m.BulkErrors = *prometheus.NewCounterVec(
2432
prometheus.CounterOpts{
2533
Namespace: metrics.Get().AppMetricsPrefix,
@@ -43,7 +51,7 @@ func (m *Metrics) RegisterElasticIndexMetrics() {
4351
Namespace: metrics.Get().AppMetricsPrefix,
4452
Name: "bulk_process_time",
4553
Help: "Time to write bulk logs to elasticsearch",
46-
Buckets: prometheus.ExponentialBuckets(0.01, 3, 8),
54+
Buckets: generateElasticsearchProcessTimeBuckets(min, max, count),
4755
},
4856
)
4957

node/elasticsearch/metrics_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package elasticsearch
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func Test_generateElasticsearchProcessTimeBuckets(t *testing.T) {
10+
type args struct {
11+
min float64
12+
max float64
13+
count int
14+
}
15+
tests := []struct {
16+
name string
17+
args args
18+
wantCount int
19+
}{
20+
{
21+
name: "creates expected number of default buckets",
22+
args: args{
23+
min: 0.01,
24+
max: 10,
25+
count: 8,
26+
},
27+
wantCount: 8,
28+
},
29+
}
30+
for _, tt := range tests {
31+
t.Run(tt.name, func(t *testing.T) {
32+
assert.Equalf(t, tt.wantCount, len(generateElasticsearchProcessTimeBuckets(tt.args.min, tt.args.max, tt.args.count)), "generateElasticsearchProcessTimeBuckets(%v, %v, %v)", tt.args.min, tt.args.max, tt.args.count)
33+
})
34+
}
35+
}

0 commit comments

Comments
 (0)