diff --git a/promql/functions.go b/promql/functions.go index 8a3ca23e67f..965e47e261d 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -335,6 +335,57 @@ func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel return append(enh.Out, Sample{F: resultValue}), nil } +// yIncrease is a utility function for yincrease/yrate/ydelta. +// It calculates the increase of the range (allowing for counter resets if isCounter is true), +// taking into account the sample at the end of the previous range (just before rangeStartMsec). +// It returns the result across the range [rangeStartMsec, rangeEndMsec). +// It always extends the preceding sample's value until the next sample, including the +// unwritten origin sample value at the start of every time series. +// +// It is a linear function, meaning that for adjacent periods p0 and p1 +// ("adjacent" means p0's rangeEndMsec == p1's rangeStartMsec): +// +// yIncrease(p0) + yIncrease(p1) == yIncrease(p0 + p1) +func yIncrease(points []FPoint, rangeStartMsec, rangeEndMsec int64, isCounter bool) float64 { + var lastBeforeRange, lastInRange, inRangeRestartSkew float64 + + if !isCounter && len(points) > 0 { + lastBeforeRange = points[0].F // Gauges don't start at 0. + } + + // The points are in time order, so we can just walk the list once and remember the last values + // seen "before" and "in" range. If there are no values in range, we use the last value before range + // so that the increase is 0. + for i := 0; i < len(points) && points[i].T < rangeEndMsec; i++ { // Only consider points in [rangeStartMsec, rangeEndMsec). + if points[i].T >= rangeStartMsec { + if isCounter && points[i].F < lastInRange { // Counter reset (process restart). + inRangeRestartSkew += lastInRange + } + } else { + lastBeforeRange = points[i].F + } + lastInRange = points[i].F + } + + return lastInRange - lastBeforeRange + inRangeRestartSkew +} + +// rangeFromSelectors extracts points, rangeStartMsec, rangeEndMsec, and rangeSeconds +// from the common (Matrix, MatrixSelector) arguments supplied to yincrease/yrate/ydelta. +// The range is [rangeStartMsec, rangeEndMsec). That is, every sample in range has the property: +// rangeStartMsec <= sample.T < rangeEndMsec. +func rangeFromSelectors(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) ([]FPoint, int64, int64, float64) { + ms := args[0].(*parser.MatrixSelector) + vs := ms.VectorSelector.(*parser.VectorSelector) + + rangeStartMsec := enh.Ts - durationMilliseconds(ms.Range+vs.Offset) + rangeEndMsec := enh.Ts - durationMilliseconds(vs.Offset) + + points := vals[0].(Matrix)[0].Floats + + return points, rangeStartMsec, rangeEndMsec, ms.Range.Seconds() +} + // === delta(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return extrapolatedRate(vals, args, enh, false, false) @@ -365,6 +416,27 @@ func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe return extendedRate(vals, args, enh, true, false) } +// === ydelta(node parser.ValueTypeMatrix) (Vector, Annotations) === +func funcYdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + points, rangeStartMsec, rangeEndMsec, _ := rangeFromSelectors(vals, args, enh) + value := yIncrease(points, rangeStartMsec, rangeEndMsec, false) + return append(enh.Out, Sample{F: value}), nil +} + +// === yincrease(node parser.ValueTypeMatrix) (Vector, Annotations) === +func funcYincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + points, rangeStartMsec, rangeEndMsec, _ := rangeFromSelectors(vals, args, enh) + value := yIncrease(points, rangeStartMsec, rangeEndMsec, true) + return append(enh.Out, Sample{F: value}), nil +} + +// === yrate(node parser.ValueTypeMatrix) (Vector, Annotations) === +func funcYrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + points, rangeStartMsec, rangeEndMsec, rangeSeconds := rangeFromSelectors(vals, args, enh) + value := yIncrease(points, rangeStartMsec, rangeEndMsec, true) / rangeSeconds + return append(enh.Out, Sample{F: value}), nil +} + // === irate(node parser.ValueTypeMatrix) (Vector, Annotations) === func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return instantValue(vals, enh.Out, true) @@ -1802,6 +1874,9 @@ var FunctionCalls = map[string]FunctionCall{ "xdelta": funcXdelta, "xincrease": funcXincrease, "xrate": funcXrate, + "ydelta": funcYdelta, + "yincrease": funcYincrease, + "yrate": funcYrate, "year": funcYear, } @@ -1821,10 +1896,20 @@ var AtModifierUnsafeFunctions = map[string]struct{}{ } func init() { - // REPLACE_RATE_FUNCS replaces the default rate extrapolation functions - // with xrate functions. This allows for a drop-in replacement and Grafana - // auto-completion, Prometheus tooling, Thanos, etc. should still work as expected. - if os.Getenv("REPLACE_RATE_FUNCS") == "1" { + // REPLACE_RATE_FUNCS lets operators swap the built-in rate extrapolation + // functions with Invoca's xrate or yrate family at process start, so + // Grafana auto-completion, Prometheus tooling, Thanos, etc. continue to + // work against queries that call the standard rate/increase/delta names. + // + // Values: + // "1" - replace rate/increase/delta with xrate/xincrease/xdelta + // AND remove the x* names (legacy behaviour). + // "x", "X" - point rate/increase/delta at xrate/xincrease/xdelta but + // keep the x* names available. + // "2", - point rate/increase/delta at yrate/yincrease/ydelta but + // "y", "Y" keep the y* names available. + switch os.Getenv("REPLACE_RATE_FUNCS") { + case "1": FunctionCalls["delta"] = FunctionCalls["xdelta"] FunctionCalls["increase"] = FunctionCalls["xincrease"] FunctionCalls["rate"] = FunctionCalls["xrate"] @@ -1842,7 +1927,39 @@ func init() { delete(parser.Functions, "xincrease") delete(parser.Functions, "xrate") fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).") - } + + case "x", "X": + repointParserFunctions("delta", "xdelta") + repointParserFunctions("increase", "xincrease") + repointParserFunctions("rate", "xrate") + repointFunction("delta", "xdelta") + repointFunction("increase", "xincrease") + repointFunction("rate", "xrate") + fmt.Println("Successfully replaced rate/increase/delta with xrate/xincrease/xdelta (and left the x* names available as well).") + + case "2", "y", "Y": + repointParserFunctions("delta", "ydelta") + repointParserFunctions("increase", "yincrease") + repointParserFunctions("rate", "yrate") + repointFunction("delta", "ydelta") + repointFunction("increase", "yincrease") + repointFunction("rate", "yrate") + fmt.Println("Successfully replaced rate/increase/delta with yrate/yincrease/ydelta (and left the y* names available as well).") + } +} + +// repointParserFunctions makes the parser entry for name resolve to the +// entry currently registered under newName (e.g. "rate" -> "xrate"). +// It leaves the newName entry in place. +func repointParserFunctions(name, newName string) { + parser.Functions[name] = parser.Functions[newName] +} + +// repointFunction makes the FunctionCalls entry for name dispatch to the +// implementation currently registered under newName. The newName entry +// is left in place. +func repointFunction(name, newName string) { + FunctionCalls[name] = FunctionCalls[newName] } type vectorByValueHeap Vector diff --git a/promql/parser/functions.go b/promql/parser/functions.go index bf570ed5207..b889c0d1f40 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -443,6 +443,24 @@ var Functions = map[string]*Function{ ReturnType: ValueTypeVector, ExtRange: true, }, + "ydelta": { + Name: "ydelta", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, + "yincrease": { + Name: "yincrease", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, + "yrate": { + Name: "yrate", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, "year": { Name: "year", ArgTypes: []ValueType{ValueTypeVector}, diff --git a/promql/promqltest/testdata/functions.test b/promql/promqltest/testdata/functions.test index b793fef3a0c..cfe32a14526 100644 --- a/promql/promqltest/testdata/functions.test +++ b/promql/promqltest/testdata/functions.test @@ -18,6 +18,10 @@ eval instant at 25s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .1 +eval instant at 25s yrate(http_requests[50s]) + {path="/foo"} 0.04 + {path="/bar"} 0.1 + # 2. Eval 1 second earlier compared to (1). # * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); # * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). @@ -30,6 +34,10 @@ eval instant at 24s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .08 +eval instant at 24s yrate(http_requests[50s]) + {path="/foo"} 0.04 + {path="/bar"} 0.1 + # 3. Eval 1 second later compared to (1). # * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). # * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). @@ -42,6 +50,10 @@ eval instant at 26s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .1 +eval instant at 26s yrate(http_requests[50s]) + {path="/foo"} 0.04 + {path="/bar"} 0.12 + # # Timeseries starts before range, ends within range. @@ -56,6 +68,10 @@ eval instant at 75s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .1 +eval instant at 75s yrate(http_requests[50s]) + {path="/foo"} 0.06 + {path="/bar"} 0.22 + # 5. Eval 1s earlier compared to (4). # * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). # * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). @@ -69,6 +85,10 @@ eval instant at 74s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .12 +eval instant at 74s yrate(http_requests[50s]) + {path="/foo"} 0.02 + {path="/bar"} 0.12 + # 6. Eval 1s later compared to (4). Rate/increase (should be) fractionally smaller. # * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); # * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). @@ -81,6 +101,10 @@ eval instant at 76s xrate(http_requests[50s]) {path="/foo"} .02 {path="/bar"} .1 +eval instant at 76s yrate(http_requests[50s]) + {path="/foo"} 0.02 + {path="/bar"} 0.1 + # # Evaluation of 10 second rate every 10 seconds, not aligned with collection. # @@ -236,58 +260,140 @@ eval instant at 50m increase(http_requests[100m]) clear -# Tests for xincrease()/xrate(). +# Tests for increase()/xincrease()/yincrease()/xrate()/yrate(). +# +# The counters start at 1000/2000 so yincrease/yrate (which treat every +# pre-origin value as 0) return wildly different results from the +# xrate / rate family (which only consider deltas inside the range). +# +# Eval times are 49s/48s rather than 50s/47s so that sample timestamps +# land strictly inside the range [start, end) rather than at its +# boundaries; this keeps the pre-range sample accessible to matrixIterSlice +# and makes yincrease's "counter-at-rangeStart" value observable. load 5s - http_requests{path="/foo"} 0+10x10 - http_requests{path="/bar"} 0+10x5 0+10x4 + http_requests{path="/foo"} 1000+10x10 + http_requests{path="/bar"} 2000+10x5 5+10x4 -# Tests for xincrease(). -eval instant at 50s xincrease(http_requests[50s]) +# Tests for increase() (standard Prometheus, for reference). +eval instant at 49s increase(http_requests[50s]) {path="/foo"} 100 - {path="/bar"} 90 + {path="/bar"} 94.44444444444444 + +eval instant at 49s increase(http_requests[100s]) + {path="/foo"} 103 + {path="/bar"} 97.27777777777777 + +# Tests for xincrease(). +eval instant at 49s xincrease(http_requests[50s]) + {path="/foo"} 90 + {path="/bar"} 85 + +eval instant at 49s xincrease(http_requests[100s]) + {path="/foo"} 90 + {path="/bar"} 85 -eval instant at 50s xincrease(http_requests[5s]) +eval instant at 49s xincrease(http_requests[5s]) {path="/foo"} 10 {path="/bar"} 10 -eval instant at 50s xincrease(http_requests[3s]) - {path="/foo"} 6 - {path="/bar"} 6 - eval instant at 49s xincrease(http_requests[3s]) +eval instant at 48s xincrease(http_requests[3s]) + +# Tests for yincrease(). yrate always compares to a pre-origin of 0, +# so yincrease sees the full 1000/2000 offset in the first range. +eval instant at 49s yincrease(http_requests[50s]) + {path="/foo"} 1090 + {path="/bar"} 2085 + +eval instant at 49s yincrease(http_requests[100s]) + {path="/foo"} 1090 + {path="/bar"} 2085 + +eval instant at 49s yincrease(http_requests[5s]) + {path="/foo"} 10 + {path="/bar"} 10 + +eval instant at 49s yincrease(http_requests[3s]) + {path="/foo"} 0 + {path="/bar"} 0 + # Tests for xrate(). -eval instant at 50s xrate(http_requests[50s]) - {path="/foo"} 2 - {path="/bar"} 1.8 +eval instant at 49s xrate(http_requests[50s]) + {path="/foo"} 1.8 + {path="/bar"} 1.7 -eval instant at 50s xrate(http_requests[100s]) - {path="/foo"} 1 - {path="/bar"} 0.9 +eval instant at 49s xrate(http_requests[100s]) + {path="/foo"} 0.9 + {path="/bar"} 0.85 -eval instant at 50s xrate(http_requests[5s]) +eval instant at 49s xrate(http_requests[5s]) {path="/foo"} 2 {path="/bar"} 2 -eval instant at 50s xrate(http_requests[3s]) +eval instant at 49s xrate(http_requests[3s]) + +eval instant at 48s xrate(http_requests[3s]) + +# Tests for yrate(). +eval instant at 49s yrate(http_requests[50s]) + {path="/foo"} 21.8 + {path="/bar"} 41.7 + +eval instant at 49s yrate(http_requests[100s]) + {path="/foo"} 10.9 + {path="/bar"} 20.85 + +eval instant at 49s yrate(http_requests[5s]) {path="/foo"} 2 {path="/bar"} 2 -eval instant at 49s xrate(http_requests[3s]) +eval instant at 49s yrate(http_requests[3s]) + {path="/foo"} 0 + {path="/bar"} 0 clear -# Test for increase()/xincrease with counter reset. +# Test for increase()/xincrease()/yincrease() with counter reset. # When the counter is reset, it always starts at 0. -# So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2. -# Prometheus assumes it missed the intermediate values 0 and 1. +# So the sequence 1006 4 (decreasing counter = reset) is interpreted the +# same as 1006 0 1 2 3 4. Prometheus assumes it missed the intermediate +# values 0, 1, 2, 3. load 5m - http_requests{path="/foo"} 0 1 2 3 2 3 4 + http_requests{path="/foo"} 1000 1001 1003 1006 4 9 16 + +eval instant at 29m increase(http_requests[30m]) + {path="/foo"} 18 + +eval instant at 29m xincrease(http_requests[30m]) + {path="/foo"} 15 + +eval instant at 29m yincrease(http_requests[30m]) + {path="/foo"} 1015 + +# Test counter reset inside the range, not spanning the range boundary. +eval instant at 19m xincrease(http_requests[5m]) + {path="/foo"} 3 + +eval instant at 19m yincrease(http_requests[5m]) + {path="/foo"} 3 + +eval instant at 19m xincrease(http_requests[10m]) + {path="/foo"} 5 + +eval instant at 19m yincrease(http_requests[10m]) + {path="/foo"} 5 + +eval instant at 24m xincrease(http_requests[5m]) + {path="/foo"} 4 -eval instant at 30m increase(http_requests[30m]) +eval instant at 24m yincrease(http_requests[5m]) + {path="/foo"} 4 + +eval instant at 24m xincrease(http_requests[10m]) {path="/foo"} 7 -eval instant at 30m xincrease(http_requests[30m]) +eval instant at 24m yincrease(http_requests[10m]) {path="/foo"} 7 clear @@ -391,6 +497,37 @@ eval instant at 20m xdelta(http_requests[1m]) clear +# Tests for ydelta(). +# ydelta extends the value of the sample preceding rangeStart across +# every gap, so the answer is simply last-in-range minus +# last-before-range (no counter-reset correction: ydelta does not treat +# the series as a counter). +load 5m + http_requests{path="/foo"} 1 2 3 4 5 6 7 + http_requests{path="/bar"} 11 9 7 5 3 1 0 + +eval instant at 29m delta(http_requests[30m]) + {path="/foo"} 6 + {path="/bar"} -12 + +eval instant at 29m xdelta(http_requests[30m]) + {path="/foo"} 5 + {path="/bar"} -10 + +eval instant at 29m ydelta(http_requests[30m]) + {path="/foo"} 5 + {path="/bar"} -10 + +eval instant at 29m ydelta(http_requests[25m]) + {path="/foo"} 5 + {path="/bar"} -10 + +eval instant at 29m ydelta(http_requests[5m]) + {path="/foo"} 1 + {path="/bar"} -2 + +clear + # Tests for idelta(). load 5m http_requests{path="/foo"} 0 50 100 150