Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 0150249

Browse files
committed
input/idx: update for new MetricPoint type and messages
1 parent 2f4cd6e commit 0150249

File tree

5 files changed

+20
-36
lines changed

5 files changed

+20
-36
lines changed

idx/cassandra/cassandra.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,13 @@ func (c *CasIdx) Stop() {
264264
c.session.Close()
265265
}
266266

267-
func (c *CasIdx) UpdateMaybe(point schema.MetricPointId2, partition int32) (idx.Archive, bool) {
267+
func (c *CasIdx) UpdateMaybe(point schema.MetricPoint, partition int32) (idx.Archive, bool) {
268268
pre := time.Now()
269-
mkey := schema.MKey{
270-
Key: point.MetricPointId1.Id,
271-
Org: point.Org,
272-
}
273269

274270
// note that both functions return an 'ok' bool.
275271
// abeit very unlikely,
276272
// the idx entry could be pruned in between the two calls and so they could be different
277-
existing, inMemory := c.MemoryIdx.Get(mkey)
273+
existing, inMemory := c.MemoryIdx.Get(point.MKey)
278274
archive, inMemory2 := c.MemoryIdx.UpdateMaybe(point, partition)
279275

280276
if !updateCassIdx {

idx/idx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type MetricIndex interface {
5858

5959
// UpdateMaybe updates an existing archive, if found.
6060
// it returns the existing archive (if any), and whether it was found
61-
UpdateMaybe(point schema.MetricPointId2, partition int32) (Archive, bool)
61+
UpdateMaybe(point schema.MetricPoint, partition int32) (Archive, bool)
6262

6363
// AddOrUpdate makes sure a metric is known in the index,
6464
// and should be called for every received metric.

idx/memory/memory.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,21 +208,16 @@ func (m *MemoryIdx) Stop() {
208208

209209
// UpdateMaybe updates an existing archive, if found.
210210
// it returns the existing archive (if any), and whether it was found
211-
func (m *MemoryIdx) UpdateMaybe(point schema.MetricPointId2, partition int32) (idx.Archive, bool) {
211+
func (m *MemoryIdx) UpdateMaybe(point schema.MetricPoint, partition int32) (idx.Archive, bool) {
212212
pre := time.Now()
213213

214-
mkey := schema.MKey{
215-
Key: point.MetricPointId1.Id,
216-
Org: point.Org,
217-
}
218-
219214
m.Lock()
220215
defer m.Unlock()
221216

222-
existing, ok := m.defById[mkey]
217+
existing, ok := m.defById[point.MKey]
223218
if ok {
224-
log.Debug("metricDef with id %v already in index", mkey)
225-
existing.LastUpdate = int64(point.MetricPointId1.Time)
219+
log.Debug("metricDef with id %v already in index", point.MKey)
220+
existing.LastUpdate = int64(point.Time)
226221
existing.Partition = partition
227222
statUpdate.Inc()
228223
statUpdateDuration.Value(time.Since(pre))

input/input.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
type Handler interface {
1818
ProcessMetricData(md *schema.MetricData, partition int32)
19-
ProcessMetricPoint(point schema.MetricPointId2, partition int32)
19+
ProcessMetricPoint(point schema.MetricPoint, partition int32)
2020
}
2121

2222
// TODO: clever way to document all metrics for all different inputs
@@ -48,17 +48,13 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
4848

4949
// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
5050
// concurrency-safe.
51-
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPointId2, partition int32) {
51+
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
5252
in.metricsReceived.Inc()
5353
if !point.Valid() {
5454
in.MetricInvalid.Inc()
5555
log.Debug("in: Invalid metric %v", point)
5656
return
5757
}
58-
mkey := schema.MKey{
59-
Key: point.MetricPointId1.Id,
60-
Org: point.Org,
61-
}
6258

6359
pre := time.Now()
6460
archive, ok := in.metricIndex.UpdateMaybe(point, partition)
@@ -69,8 +65,8 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPointId2, partiti
6965
}
7066

7167
pre = time.Now()
72-
m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId)
73-
m.Add(point.MetricPointId1.Time, point.MetricPointId1.Value)
68+
m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId)
69+
m.Add(point.Time, point.Value)
7470
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))
7571

7672
}

input/kafkamdm/kafkamdm.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
schema "gopkg.in/raintank/schema.v1"
12+
"gopkg.in/raintank/schema.v1/msg"
1213

1314
"github.com/Shopify/sarama"
1415
"github.com/raintank/worldping-api/pkg/log"
@@ -71,7 +72,7 @@ var partitionLag map[int32]*stats.Gauge64
7172
func ConfigSetup() {
7273
inKafkaMdm := flag.NewFlagSet("kafka-mdm-in", flag.ExitOnError)
7374
inKafkaMdm.BoolVar(&Enabled, "enabled", false, "")
74-
inKafkaMdm.UintVar(&orgId, "org-id", 0, "For incoming MetricPointId1 messages, assume this org id")
75+
inKafkaMdm.UintVar(&orgId, "org-id", 0, "For incoming MetricPoint messages, assume this org id")
7576
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be be given multiple times as a comma-separated list)")
7677
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
7778
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
@@ -352,17 +353,13 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
352353
}
353354

354355
func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
355-
if len(data) == 29 && data[0] == 0 {
356-
var point schema.MetricPointId2
357-
point.MetricPointId1.UnmarshalDirect(data[1:])
358-
point.Org = uint32(orgId)
359-
k.Handler.ProcessMetricPoint(point, partition)
360-
return
361-
}
362-
363-
if len(data) == 33 && data[0] == 1 {
364-
var point schema.MetricPointId2
365-
point.UnmarshalDirect(data[1:])
356+
if msg.IsPointMsg(data) {
357+
_, point, err := msg.ReadPointMsg(data, uint32(orgId))
358+
if err != nil {
359+
metricsDecodeErr.Inc()
360+
log.Error(3, "kafka-mdm decode error, skipping message. %s", err)
361+
return
362+
}
366363
k.Handler.ProcessMetricPoint(point, partition)
367364
return
368365
}

0 commit comments

Comments
 (0)