Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 1c511b7

Browse files
committed
WIP: update idx
* update MetricDefinition serialization * use MKeys internally instead of string id's * support msg.Point * while we're at it, also document all Index function parameters
1 parent d621404 commit 1c511b7

File tree

4 files changed

+58
-50
lines changed

4 files changed

+58
-50
lines changed

idx/cassandra/cassandra.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (c *CasIdx) Stop() {
264264
c.session.Close()
265265
}
266266

267-
func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
267+
func (c *CasIdx) AddOrUpdate(point msg.Point, partition int32) idx.Archive {
268268
pre := time.Now()
269269
existing, inMemory := c.MemoryIdx.Get(data.Id)
270270
archive := c.MemoryIdx.AddOrUpdate(data, partition)
@@ -358,8 +358,14 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, cutoff u
358358
var tags []string
359359
cutoff64 := int64(cutoff)
360360
for iter.Scan(&id, &orgId, &partition, &name, &metric, &interval, &unit, &mtype, &tags, &lastupdate) {
361+
mkey, err := schema.MKeyFromString(id)
362+
if err != nil {
363+
log.Error(3, "cassandra-idx: load() could not parse ID %q: %s -> skipping", id, err)
364+
continue
365+
}
366+
361367
mdef := &schema.MetricDefinition{
362-
Id: id,
368+
Id: mkey,
363369
OrgId: orgId,
364370
Partition: partition,
365371
Name: name,
@@ -413,7 +419,7 @@ func (c *CasIdx) processWriteQueue() {
413419
for !success {
414420
if err := c.session.Query(
415421
qry,
416-
req.def.Id,
422+
req.def.Id.String(),
417423
req.def.OrgId,
418424
req.def.Partition,
419425
req.def.Name,

idx/idx.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"errors"
77
"time"
88

9+
"github.com/grafana/metrictank/msg"
10+
911
schema "gopkg.in/raintank/schema.v1"
1012
)
1113

@@ -58,37 +60,35 @@ type MetricIndex interface {
5860

5961
// AddOrUpdate makes sure a metric is known in the index,
6062
// and should be called for every received metric.
61-
AddOrUpdate(*schema.MetricData, int32) Archive
63+
AddOrUpdate(point msg.Point, partition int32) Archive
6264

6365
// Get returns the archive for the requested id.
64-
Get(string) (Archive, bool)
66+
Get(key schema.MKey) (Archive, bool)
6567

6668
// GetPath returns the archives under the given path.
67-
GetPath(int, string) []Archive
69+
GetPath(orgId int, path string) []Archive
6870

69-
// Delete deletes items from the index for the given org and query.
71+
// Delete deletes items from the index
7072
// If the pattern matches a branch node, then
7173
// all leaf nodes on that branch are deleted. So if the pattern is
7274
// "*", all items in the index are deleted.
7375
// It returns a copy of all of the Archives deleted.
74-
Delete(int, string) ([]Archive, error)
76+
Delete(orgId int, pattern string) ([]Archive, error)
7577

76-
// Find searches the index. The method is passed an OrgId, a query
77-
// pattern and a unix timestamp. Searches should return all nodes that match for
78-
// the given OrgId and OrgId -1. The pattern should be handled in the same way
79-
// Graphite would. see https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards
80-
// And the unix stimestamp is used to ignore series that have been stale since
81-
// the timestamp.
82-
Find(int, string, int64) ([]Node, error)
78+
// Find searches the index for matching nodes.
79+
// * orgId can be -1.
80+
// * pattern is handled like graphite does. see https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards
81+
// * from is a unix timestamp. series not updated since then are excluded.
82+
Find(orgId int, pattern string, from int64) ([]Node, error)
8383

8484
// List returns all Archives for the passed OrgId, or for all organisations if -1 is provided.
85-
List(int) []Archive
85+
List(orgId int) []Archive
8686

8787
// Prune deletes all metrics from the index for the passed org where
8888
// the last time the metric was seen is older than the passed timestamp. If the org
8989
// passed is -1, then all orgs should be examined for stale metrics to be deleted.
9090
// It returns all Archives deleted and any error encountered.
91-
Prune(int, time.Time) ([]Archive, error)
91+
Prune(orgId int, oldest time.Time) ([]Archive, error)
9292

9393
// FindByTag takes a list of expressions in the format key<operator>value.
9494
// The allowed operators are: =, !=, =~, !=~.
@@ -98,24 +98,24 @@ type MetricIndex interface {
9898
// where the LastUpdate time is >= from will be returned as results.
9999
// The returned results are not deduplicated and in certain cases it is possible
100100
// that duplicate entries will be returned.
101-
FindByTag(int, []string, int64) ([]Node, error)
101+
FindByTag(orgId int, expressions []string, from int64) ([]Node, error)
102102

103103
// Tags returns a list of all tag keys associated with the metrics of a given
104104
// organization. The return values are filtered by the regex in the second parameter.
105105
// If the third parameter is >0 then only metrics will be accounted of which the
106106
// LastUpdate time is >= the given value.
107-
Tags(int, string, int64) ([]string, error)
107+
Tags(orgId int, filter string, from int64) ([]string, error)
108108

109109
// FindTags generates a list of possible tags that could complete a
110110
// given prefix. It also accepts additional tag conditions to further narrow
111111
// down the result set in the format of graphite's tag queries
112-
FindTags(int, string, []string, int64, uint) ([]string, error)
112+
FindTags(orgId int, prefix string, expressions []string, from int64, limit uint) ([]string, error)
113113

114114
// FindTagValues generates a list of possible values that could
115115
// complete a given value prefix. It requires a tag to be specified and only values
116116
// of the given tag will be returned. It also accepts additional conditions to
117117
// further narrow down the result set in the format of graphite's tag queries
118-
FindTagValues(int, string, string, []string, int64, uint) ([]string, error)
118+
FindTagValues(orgId int, tag string, prefix string, expressions []string, from int64, limit uint) ([]string, error)
119119

120120
// TagDetails returns a list of all values associated with a given tag key in the
121121
// given org. The occurrences of each value are counted and the count is referred to by
@@ -124,9 +124,9 @@ type MetricIndex interface {
124124
// the values before accounting for them.
125125
// If the fourth parameter is > 0 then only those metrics of which the LastUpdate
126126
// time is >= the from timestamp will be included.
127-
TagDetails(int, string, string, int64) (map[string]uint64, error)
127+
TagDetails(orgId int, key string, filter string, from int64) (map[string]uint64, error)
128128

129129
// DeleteTagged deletes the specified series from the tag index and also the
130130
// DefById index.
131-
DeleteTagged(int, []string) ([]Archive, error)
131+
DeleteTagged(orgId int, paths []string) ([]Archive, error)
132132
}

idx/memory/memory.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ type Tree struct {
6363
Items map[string]*Node // key is the full path of the node.
6464
}
6565

66-
type IdSet map[string]struct{} // set of ids
66+
type IdSet map[schema.MKey]struct{} // set of ids
6767

6868
func (ids IdSet) String() string {
6969
var res string
7070
for id := range ids {
7171
if len(res) > 0 {
7272
res += " "
7373
}
74-
res += id
74+
res += id.String()
7575
}
7676
return res
7777

@@ -80,7 +80,7 @@ func (ids IdSet) String() string {
8080
type TagValue map[string]IdSet // value -> set of ids
8181
type TagIndex map[string]TagValue // key -> list of values
8282

83-
func (t *TagIndex) addTagId(name, value string, id string) {
83+
func (t *TagIndex) addTagId(name, value string, id schema.MKey) {
8484
ti := *t
8585
if _, ok := ti[name]; !ok {
8686
ti[name] = make(TagValue)
@@ -91,7 +91,7 @@ func (t *TagIndex) addTagId(name, value string, id string) {
9191
ti[name][value][id] = struct{}{}
9292
}
9393

94-
func (t *TagIndex) delTagId(name, value string, id string) {
94+
func (t *TagIndex) delTagId(name, value string, id schema.MKey) {
9595
ti := *t
9696

9797
delete(ti[name][value], id)
@@ -155,7 +155,7 @@ func (defs defByTagSet) defs(id int, fullName string) map[*schema.MetricDefiniti
155155
type Node struct {
156156
Path string
157157
Children []string
158-
Defs []string
158+
Defs []schema.MKey
159159
}
160160

161161
func (n *Node) HasChildren() bool {
@@ -179,7 +179,7 @@ type MemoryIdx struct {
179179

180180
// used for both hierarchy and tag index, so includes all MDs, with
181181
// and without tags. It also mixes all orgs into one flat map.
182-
defById map[string]*idx.Archive // by ID string
182+
defById map[schema.MKey]*idx.Archive
183183

184184
// used by hierarchy index only
185185
tree map[int]*Tree // by orgId
@@ -191,7 +191,7 @@ type MemoryIdx struct {
191191

192192
func New() *MemoryIdx {
193193
return &MemoryIdx{
194-
defById: make(map[string]*idx.Archive),
194+
defById: make(map[schema.MKey]*idx.Archive),
195195
defByTagSet: make(defByTagSet),
196196
tree: make(map[int]*Tree),
197197
tags: make(map[int]TagIndex),
@@ -359,7 +359,7 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
359359
root := &Node{
360360
Path: "",
361361
Children: make([]string, 0),
362-
Defs: make([]string, 0),
362+
Defs: make([]schema.MKey, 0),
363363
}
364364
m.tree[def.OrgId] = &Tree{
365365
Items: map[string]*Node{"": root},
@@ -396,7 +396,7 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
396396
tree.Items[branch] = &Node{
397397
Path: branch,
398398
Children: []string{prevNode},
399-
Defs: make([]string, 0),
399+
Defs: make([]schema.MKey, 0),
400400
}
401401

402402
prevPos = pos
@@ -416,15 +416,15 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
416416
tree.Items[path] = &Node{
417417
Path: path,
418418
Children: []string{},
419-
Defs: []string{def.Id},
419+
Defs: []schema.MKey{def.Id},
420420
}
421421
m.defById[def.Id] = archive
422422
statAdd.Inc()
423423

424424
return *archive
425425
}
426426

427-
func (m *MemoryIdx) Get(id string) (idx.Archive, bool) {
427+
func (m *MemoryIdx) Get(id schema.MKey) (idx.Archive, bool) {
428428
pre := time.Now()
429429
m.RLock()
430430
defer m.RUnlock()

idx/memory/tag_query.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sync"
1010
"sync/atomic"
1111

12+
schema "gopkg.in/raintank/schema.v1"
13+
1214
"github.com/grafana/metrictank/idx"
1315
"github.com/raintank/worldping-api/pkg/log"
1416
)
@@ -97,8 +99,8 @@ type TagQuery struct {
9799

98100
startWith match // chooses the first clause to generate the initial result set (one of EQUAL PREFIX MATCH MATCH_TAG PREFIX_TAG)
99101

100-
index TagIndex // the tag index, hierarchy of tags & values, set by Run()/RunGetTags()
101-
byId map[string]*idx.Archive // the metric index by ID, set by Run()/RunGetTags()
102+
index TagIndex // the tag index, hierarchy of tags & values, set by Run()/RunGetTags()
103+
byId map[schema.MKey]*idx.Archive // the metric index by ID, set by Run()/RunGetTags()
102104

103105
wg *sync.WaitGroup
104106
}
@@ -333,7 +335,7 @@ func NewTagQuery(expressions []string, from int64) (TagQuery, error) {
333335
}
334336

335337
// getInitialByEqual generates the initial resultset by executing the given equal expression
336-
func (q *TagQuery) getInitialByEqual(expr kv, idCh chan string, stopCh chan struct{}) {
338+
func (q *TagQuery) getInitialByEqual(expr kv, idCh chan schema.MKey, stopCh chan struct{}) {
337339
defer q.wg.Done()
338340

339341
KEYS:
@@ -349,7 +351,7 @@ KEYS:
349351
}
350352

351353
// getInitialByPrefix generates the initial resultset by executing the given prefix match expression
352-
func (q *TagQuery) getInitialByPrefix(expr kv, idCh chan string, stopCh chan struct{}) {
354+
func (q *TagQuery) getInitialByPrefix(expr kv, idCh chan schema.MKey, stopCh chan struct{}) {
353355
defer q.wg.Done()
354356

355357
VALUES:
@@ -371,7 +373,7 @@ VALUES:
371373
}
372374

373375
// getInitialByMatch generates the initial resultset by executing the given match expression
374-
func (q *TagQuery) getInitialByMatch(expr kvRe, idCh chan string, stopCh chan struct{}) {
376+
func (q *TagQuery) getInitialByMatch(expr kvRe, idCh chan schema.MKey, stopCh chan struct{}) {
375377
defer q.wg.Done()
376378

377379
// shortcut if value == nil.
@@ -412,7 +414,7 @@ VALUES2:
412414

413415
// getInitialByTagPrefix generates the initial resultset by creating a list of
414416
// metric IDs of which at least one tag starts with the defined prefix
415-
func (q *TagQuery) getInitialByTagPrefix(idCh chan string, stopCh chan struct{}) {
417+
func (q *TagQuery) getInitialByTagPrefix(idCh chan schema.MKey, stopCh chan struct{}) {
416418
defer q.wg.Done()
417419

418420
TAGS:
@@ -437,7 +439,7 @@ TAGS:
437439

438440
// getInitialByTagMatch generates the initial resultset by creating a list of
439441
// metric IDs of which at least one tag matches the defined regex
440-
func (q *TagQuery) getInitialByTagMatch(idCh chan string, stopCh chan struct{}) {
442+
func (q *TagQuery) getInitialByTagMatch(idCh chan schema.MKey, stopCh chan struct{}) {
441443
defer q.wg.Done()
442444

443445
TAGS:
@@ -461,8 +463,8 @@ TAGS:
461463
// getInitialIds asynchronously collects all ID's of the initial result set. It returns:
462464
// a channel through which the IDs of the initial result set will be sent
463465
// a stop channel, which when closed, will cause it to abort the background worker.
464-
func (q *TagQuery) getInitialIds() (chan string, chan struct{}) {
465-
idCh := make(chan string, 1000)
466+
func (q *TagQuery) getInitialIds() (chan schema.MKey, chan struct{}) {
467+
idCh := make(chan schema.MKey, 1000)
466468
stopCh := make(chan struct{})
467469
q.wg.Add(1)
468470

@@ -492,7 +494,7 @@ func (q *TagQuery) getInitialIds() (chan string, chan struct{}) {
492494
// all required tests in order to decide whether this metric should be part
493495
// of the final result set or not
494496
// in map/reduce terms this is the reduce function
495-
func (q *TagQuery) testByAllExpressions(id string, def *idx.Archive, omitTagFilters bool) bool {
497+
func (q *TagQuery) testByAllExpressions(id schema.MKey, def *idx.Archive, omitTagFilters bool) bool {
496498
if !q.testByFrom(def) {
497499
return false
498500
}
@@ -695,7 +697,7 @@ func (q *TagQuery) testByTagPrefix(def *idx.Archive) bool {
695697
}
696698

697699
// testByEqual filters a given metric by the defined "=" expressions
698-
func (q *TagQuery) testByEqual(id string, exprs []kv, not bool) bool {
700+
func (q *TagQuery) testByEqual(id schema.MKey, exprs []kv, not bool) bool {
699701
for _, e := range exprs {
700702
indexIds := q.index[e.key][e.value]
701703

@@ -722,7 +724,7 @@ func (q *TagQuery) testByEqual(id string, exprs []kv, not bool) bool {
722724
// required tests to decide whether a metric should be part of the final
723725
// result set or not
724726
// it returns the final result set via the given resCh parameter
725-
func (q *TagQuery) filterIdsFromChan(idCh, resCh chan string) {
727+
func (q *TagQuery) filterIdsFromChan(idCh, resCh chan schema.MKey) {
726728
for id := range idCh {
727729
var def *idx.Archive
728730
var ok bool
@@ -772,14 +774,14 @@ func (q *TagQuery) sortByCost() {
772774
}
773775

774776
// Run executes the tag query on the given index and returns a list of ids
775-
func (q *TagQuery) Run(index TagIndex, byId map[string]*idx.Archive) IdSet {
777+
func (q *TagQuery) Run(index TagIndex, byId map[schema.MKey]*idx.Archive) IdSet {
776778
q.index = index
777779
q.byId = byId
778780

779781
q.sortByCost()
780782

781783
idCh, _ := q.getInitialIds()
782-
resCh := make(chan string)
784+
resCh := make(chan schema.MKey)
783785

784786
// start the tag query workers. they'll consume the ids on the idCh and
785787
// evaluate for each of them whether it satisfies all the conditions
@@ -836,7 +838,7 @@ func (q *TagQuery) getMaxTagCount() int {
836838
// according to the criteria associated with this query
837839
// those that pass all the tests will have their relevant tags extracted, which
838840
// are then pushed into the given tag channel
839-
func (q *TagQuery) filterTagsFromChan(idCh chan string, tagCh chan string, stopCh chan struct{}, omitTagFilters bool) {
841+
func (q *TagQuery) filterTagsFromChan(idCh chan schema.MKey, tagCh chan string, stopCh chan struct{}, omitTagFilters bool) {
840842
// used to prevent that this worker thread will push the same result into
841843
// the chan twice
842844
resultsCache := make(map[string]struct{})
@@ -953,7 +955,7 @@ func (q *TagQuery) tagFilterMatchesName() bool {
953955

954956
// RunGetTags executes the tag query and returns all the tags of the
955957
// resulting metrics
956-
func (q *TagQuery) RunGetTags(index TagIndex, byId map[string]*idx.Archive) map[string]struct{} {
958+
func (q *TagQuery) RunGetTags(index TagIndex, byId map[schema.MKey]*idx.Archive) map[string]struct{} {
957959
q.index = index
958960
q.byId = byId
959961

0 commit comments

Comments
 (0)