Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 8164a1e

Browse files
committed
api: update to adopt new Key types
1 parent 0150249 commit 8164a1e

File tree

5 files changed

+31
-36
lines changed

5 files changed

+31
-36
lines changed

api/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (s *Server) indexFindByTag(ctx *middleware.Context, req models.IndexFindByT
163163

164164
// IndexGet returns a msgp encoded schema.MetricDefinition
165165
func (s *Server) indexGet(ctx *middleware.Context, req models.IndexGet) {
166-
def, ok := s.MetricIndex.Get(req.Id)
166+
def, ok := s.MetricIndex.Get(req.MKey)
167167
if !ok {
168168
response.Write(ctx, response.NewError(http.StatusNotFound, "Not Found"))
169169
return

api/dataprocessor.go

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -380,16 +380,12 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
380380
}
381381
}
382382

383-
func logLoad(typ, key string, from, to uint32) {
383+
func logLoad(typ string, key schema.AMKey, from, to uint32) {
384384
if LogLevel < 2 {
385-
log.Debug("DP load from %-6s %-20s %d - %d (%s - %s) span:%ds", typ, key, from, to, util.TS(from), util.TS(to), to-from-1)
385+
log.Debug("DP load from %-6s %20s %d - %d (%s - %s) span:%ds", typ, key, from, to, util.TS(from), util.TS(to), to-from-1)
386386
}
387387
}
388388

389-
func AggMetricKey(key, archive string, aggSpan uint32) string {
390-
return fmt.Sprintf("%s_%s_%d", key, archive, aggSpan)
391-
}
392-
393389
func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
394390
select {
395391
case <-ctx.Done():
@@ -474,18 +470,17 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema
474470
func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) {
475471
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesAggMetrics")
476472
defer span.Finish()
477-
metric, ok := s.MemoryStore.Get(ctx.Key)
473+
metric, ok := s.MemoryStore.Get(ctx.AMKey.MKey)
478474
if !ok {
479475
return mdata.Result{
480476
Oldest: ctx.Req.To,
481477
}, nil
482478
}
483479

480+
logLoad("memory", ctx.AMKey, ctx.From, ctx.To)
484481
if ctx.Cons != consolidation.None {
485-
logLoad("memory", ctx.AggKey, ctx.From, ctx.To)
486482
return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
487483
} else {
488-
logLoad("memory", ctx.Req.Key, ctx.From, ctx.To)
489484
return metric.Get(ctx.From, ctx.To), nil
490485
}
491486
}
@@ -495,22 +490,17 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
495490
var iters []chunk.Iter
496491
var prevts uint32
497492

498-
key := ctx.Key
499-
if ctx.Cons != consolidation.None {
500-
key = ctx.AggKey
501-
}
502-
503493
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesCachedStore")
504494
defer span.Finish()
505-
span.SetTag("key", key)
495+
span.SetTag("key", ctx.AMKey)
506496
span.SetTag("from", ctx.From)
507497
span.SetTag("until", until)
508498

509499
reqSpanBoth.ValueUint32(ctx.To - ctx.From)
510-
logLoad("cassan", ctx.Key, ctx.From, ctx.To)
500+
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)
511501

512-
log.Debug("cache: searching query key %s, from %d, until %d", key, ctx.From, until)
513-
cacheRes := s.Cache.Search(ctx.ctx, key, ctx.From, until)
502+
log.Debug("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
503+
cacheRes := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
514504
log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))
515505

516506
// check to see if the request has been canceled, if so abort now.
@@ -544,7 +534,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
544534
// the request cannot completely be served from cache, it will require cassandra involvement
545535
if !cacheRes.Complete {
546536
if cacheRes.From != cacheRes.Until {
547-
storeIterGens, err := s.BackendStore.Search(ctx.ctx, key, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
537+
storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
548538
if err != nil {
549539
return iters, err
550540
}
@@ -566,7 +556,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
566556
}
567557
// it's important that the itgens get added in chronological order,
568558
// currently we rely on cassandra returning results in order
569-
s.Cache.Add(key, ctx.Key, prevts, itgen)
559+
s.Cache.Add(ctx.AMKey, prevts, itgen)
570560
prevts = itgen.Ts
571561
iters = append(iters, *it)
572562
}
@@ -632,18 +622,18 @@ func mergeSeries(in []models.Series) []models.Series {
632622
return merged
633623
}
634624

625+
// requestContext is a more concrete specification to load data based on a models.Req
635626
type requestContext struct {
636627
ctx context.Context
637628

638629
// request by external user.
639630
Req *models.Req
640631

641632
// internal request needed to satisfy user request.
642-
Cons consolidation.Consolidator // to satisfy avg request from user, this would be sum or cnt
643-
From uint32 // may be different than user request, see below
644-
To uint32 // may be different than user request, see below
645-
Key string // key to query
646-
AggKey string // aggkey to query (if needed)
633+
Cons consolidation.Consolidator // to satisfy avg request from user, this would be sum or cnt
634+
From uint32 // may be different than user request, see below
635+
To uint32 // may be different than user request, see below
636+
AMKey schema.AMKey // set by combining Req's key, consolidator and archive info
647637
}
648638

649639
func prevBoundary(ts uint32, span uint32) uint32 {
@@ -656,7 +646,6 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
656646
ctx: ctx,
657647
Req: req,
658648
Cons: consolidator,
659-
Key: req.Key,
660649
}
661650

662651
// while aggregated archives are quantized, raw intervals are not. quantizing happens after fetching the data,
@@ -673,10 +662,11 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
673662
if consolidator == consolidation.None {
674663
rc.From = prevBoundary(req.From, req.ArchInterval) + 1
675664
rc.To = prevBoundary(req.To, req.ArchInterval) + 1
665+
rc.AMKey = schema.AMKey{MKey: req.MKey}
676666
} else {
677667
rc.From = req.From
678668
rc.To = req.To
679-
rc.AggKey = AggMetricKey(req.Key, consolidator.Archive(), req.ArchInterval)
669+
rc.AMKey = schema.GetAMKey(req.MKey, consolidator.Archive(), req.ArchInterval)
680670
}
681671

682672
return &rc

api/graphite.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
macaron "gopkg.in/macaron.v1"
14+
schema "gopkg.in/raintank/schema.v1"
1415

1516
"github.com/grafana/metrictank/api/middleware"
1617
"github.com/grafana/metrictank/api/models"
@@ -445,7 +446,7 @@ func (s *Server) metricsIndex(ctx *middleware.Context) {
445446
}()
446447

447448
series := make([]idx.Archive, 0)
448-
seenDefs := make(map[string]struct{})
449+
seenDefs := make(map[schema.MKey]struct{})
449450
for resp := range responses {
450451
if resp.err != nil {
451452
response.Write(ctx, response.WrapError(err))

api/models/node.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package models
33
import (
44
"github.com/grafana/metrictank/cluster"
55
opentracing "github.com/opentracing/opentracing-go"
6+
schema "gopkg.in/raintank/schema.v1"
67
)
78

89
type NodeStatus struct {
@@ -136,7 +137,7 @@ func (i IndexTagDelSeries) TraceDebug(span opentracing.Span) {
136137
}
137138

138139
type IndexGet struct {
139-
Id string `json:"id" form:"id" binding:"Required"`
140+
MKey schema.MKey `json:"id" form:"id" binding:"Required"`
140141
}
141142

142143
type IndexFind struct {

api/models/request.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,19 @@ package models
33
import (
44
"fmt"
55

6+
schema "gopkg.in/raintank/schema.v1"
7+
68
"github.com/grafana/metrictank/cluster"
79
"github.com/grafana/metrictank/consolidation"
810
"github.com/grafana/metrictank/util"
911
opentracing "github.com/opentracing/opentracing-go"
1012
"github.com/opentracing/opentracing-go/log"
1113
)
1214

15+
// Req is a request for data by MKey and parameters such as consolidator, max points, etc
1316
type Req struct {
1417
// these fields can be set straight away:
15-
Key string `json:"key"` // metric key aka metric definition id (orgid.<hash>), often same as target for graphite-metrictank requests
18+
MKey schema.MKey `json:"key"` // metric key aka metric definition id (orgid.<hash>), often same as target for graphite-metrictank requests
1619
Target string `json:"target"` // the target we should return either to graphite or as if we're graphite. simply the graphite metric key from the index
1720
Pattern string `json:"pattern"` // the original query pattern specified by user (not wrapped by any functions). e.g. `foo.b*`. To be able to tie the result data back to the data need as requested
1821
From uint32 `json:"from"`
@@ -36,7 +39,7 @@ type Req struct {
3639
AggNum uint32 `json:"aggNum"` // how many points to consolidate together at runtime, after fetching from the archive
3740
}
3841

39-
func NewReq(key, target, patt string, from, to, maxPoints, rawInterval uint32, cons, consReq consolidation.Consolidator, node cluster.Node, schemaId, aggId uint16) Req {
42+
func NewReq(key schema.MKey, target, patt string, from, to, maxPoints, rawInterval uint32, cons, consReq consolidation.Consolidator, node cluster.Node, schemaId, aggId uint16) Req {
4043
return Req{
4144
key,
4245
target,
@@ -59,18 +62,18 @@ func NewReq(key, target, patt string, from, to, maxPoints, rawInterval uint32, c
5962
}
6063

6164
func (r Req) String() string {
62-
return fmt.Sprintf("%s %d - %d (%s - %s) span:%ds. points <= %d. %s.", r.Key, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.Consolidator)
65+
return fmt.Sprintf("%s %d - %d (%s - %s) span:%ds. points <= %d. %s.", r.MKey, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.Consolidator)
6366
}
6467

6568
func (r Req) DebugString() string {
6669
return fmt.Sprintf("Req key=%q target=%q pattern=%q %d - %d (%s - %s) (span %d) maxPoints=%d rawInt=%d cons=%s consReq=%d schemaId=%d aggId=%d archive=%d archInt=%d ttl=%d outInt=%d aggNum=%d",
67-
r.Key, r.Target, r.Pattern, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.RawInterval, r.Consolidator, r.ConsReq, r.SchemaId, r.AggId, r.Archive, r.ArchInterval, r.TTL, r.OutInterval, r.AggNum)
70+
r.MKey, r.Target, r.Pattern, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.RawInterval, r.Consolidator, r.ConsReq, r.SchemaId, r.AggId, r.Archive, r.ArchInterval, r.TTL, r.OutInterval, r.AggNum)
6871
}
6972

7073
// Trace puts all request properties as tags in a span
7174
// good for when a span deals with 1 request
7275
func (r Req) Trace(span opentracing.Span) {
73-
span.SetTag("key", r.Key)
76+
span.SetTag("key", r.MKey)
7477
span.SetTag("target", r.Target)
7578
span.SetTag("pattern", r.Pattern)
7679
span.SetTag("from", r.From)
@@ -95,7 +98,7 @@ func (r Req) Trace(span opentracing.Span) {
9598
// 1000~1500 bytes
9699
func (r Req) TraceLog(span opentracing.Span) {
97100
span.LogFields(
98-
log.String("key", r.Key),
101+
log.Object("key", r.MKey),
99102
log.String("target", r.Target),
100103
log.String("pattern", r.Pattern),
101104
log.Int("from", int(r.From)),

0 commit comments

Comments
 (0)