Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit ec13cb9

Browse files
committed
WIP: fix tools
left to do: mt-kafka-mdm-sniff-out-of-order mt-replicator-via-tsdb mt-store-cat mt-whisper-importer-reader mt-whisper-importer-writer
1 parent 8164a1e commit ec13cb9

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

cmd/mt-index-migrate/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,13 @@ func getDefs(session *gocql.Session, defsChan chan *schema.MetricDefinition) {
151151
var lastupdate int64
152152
var tags []string
153153
for iter.Scan(&id, &orgId, &partition, &name, &metric, &interval, &unit, &mtype, &tags, &lastupdate) {
154+
mkey, err := schema.MKeyFromString(id)
155+
if err != nil {
156+
log.Error(3, "could not parse ID %q: %s -> skipping", id, err)
157+
continue
158+
}
154159
mdef := schema.MetricDefinition{
155-
Id: id,
160+
Id: mkey,
156161
OrgId: orgId,
157162
Partition: partition,
158163
Name: name,

cmd/mt-kafka-mdm-sniff/main.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,42 +21,63 @@ import (
2121

2222
var (
2323
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
24-
format = flag.String("format", "{{.Part}} {{.OrgId}} {{.Id}} {{.Name}} {{.Metric}} {{.Interval}} {{.Value}} {{.Time}} {{.Unit}} {{.Mtype}} {{.Tags}}", "template to render the data with")
24+
formatMd = flag.String("format-md", "{{.Part}} {{.OrgId}} {{.Id}} {{.Name}} {{.Metric}} {{.Interval}} {{.Value}} {{.Time}} {{.Unit}} {{.Mtype}} {{.Tags}}", "template to render MetricData with")
25+
formatP = flag.String("format-point", "{{.Part}} {{.MKey}} {{.Value}} {{.Time}}", "template to render MetricPoint data with")
2526
prefix = flag.String("prefix", "", "only show metrics that have this prefix")
2627
substr = flag.String("substr", "", "only show metrics that have this substring")
2728

2829
stdoutLock = sync.Mutex{}
2930
)
3031

31-
type Data struct {
32+
type DataMd struct {
3233
Part int32
3334
schema.MetricData
3435
}
3536

37+
type DataP struct {
38+
Part int32
39+
schema.MetricPoint
40+
}
41+
3642
type inputPrinter struct {
37-
template.Template
38-
data Data
43+
tplMd template.Template
44+
tplP template.Template
3945
}
4046

41-
func newInputPrinter(format string) inputPrinter {
42-
tpl := template.Must(template.New("format").Parse(format + "\n"))
47+
func newInputPrinter(formatMd, formatP string) inputPrinter {
48+
tplMd := template.Must(template.New("format").Parse(formatMd + "\n"))
49+
tplP := template.Must(template.New("format").Parse(formatP + "\n"))
4350
return inputPrinter{
44-
*tpl,
45-
Data{},
51+
*tplMd,
52+
*tplP,
4653
}
4754
}
4855

49-
func (ip inputPrinter) Process(metric *schema.MetricData, partition int32) {
56+
func (ip inputPrinter) ProcessMetricData(metric *schema.MetricData, partition int32) {
5057
if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) {
5158
return
5259
}
5360
if *substr != "" && !strings.Contains(metric.Metric, *substr) {
5461
return
5562
}
56-
ip.data.MetricData = *metric
57-
ip.data.Part = partition
5863
stdoutLock.Lock()
59-
err := ip.Execute(os.Stdout, ip.data)
64+
err := ip.tplMd.Execute(os.Stdout, DataMd{
65+
partition,
66+
*metric,
67+
})
68+
69+
stdoutLock.Unlock()
70+
if err != nil {
71+
log.Error(0, "executing template: %s", err)
72+
}
73+
}
74+
75+
func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
76+
stdoutLock.Lock()
77+
err := ip.tplP.Execute(os.Stdout, DataP{
78+
partition,
79+
point,
80+
})
6081
stdoutLock.Unlock()
6182
if err != nil {
6283
log.Error(0, "executing template: %s", err)
@@ -102,7 +123,7 @@ func main() {
102123

103124
mdm := inKafkaMdm.New()
104125
pluginFatal := make(chan struct{})
105-
mdm.Start(newInputPrinter(*format), pluginFatal)
126+
mdm.Start(newInputPrinter(*formatMd, *formatP), pluginFatal)
106127
sigChan := make(chan os.Signal, 1)
107128
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
108129
select {

0 commit comments

Comments
 (0)