Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 122 additions & 5 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,67 @@
// the sampled range to the requested range.
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
adjustToRange := float64(durationMilliseconds(ms.Range))
resultValue = resultValue * (adjustToRange / sampledRange)

Check failure on line 328 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue * (adjustToRange / sampledRange)` with `resultValue *= (adjustToRange / sampledRange)` (gocritic)

Check failure on line 328 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue * (adjustToRange / sampledRange)` with `resultValue *= (adjustToRange / sampledRange)` (gocritic)

Check failure on line 328 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue * (adjustToRange / sampledRange)` with `resultValue *= (adjustToRange / sampledRange)` (gocritic)

Check failure on line 328 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue * (adjustToRange / sampledRange)` with `resultValue *= (adjustToRange / sampledRange)` (gocritic)
}

if isRate {
resultValue = resultValue / ms.Range.Seconds()

Check failure on line 332 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue / ms.Range.Seconds()` with `resultValue /= ms.Range.Seconds()` (gocritic)

Check failure on line 332 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue / ms.Range.Seconds()` with `resultValue /= ms.Range.Seconds()` (gocritic)

Check failure on line 332 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue / ms.Range.Seconds()` with `resultValue /= ms.Range.Seconds()` (gocritic)

Check failure on line 332 in promql/functions.go

View workflow job for this annotation

GitHub Actions / golangci-lint

assignOp: replace `resultValue = resultValue / ms.Range.Seconds()` with `resultValue /= ms.Range.Seconds()` (gocritic)
}

return append(enh.Out, Sample{F: resultValue}), nil
}

// yIncrease is a utility function for yincrease/yrate/ydelta.
// It calculates the increase of the range (allowing for counter resets if isCounter is true),
// taking into account the sample at the end of the previous range (just before rangeStartMsec).
// It returns the result across the range [rangeStartMsec, rangeEndMsec).
// It always extends the preceding sample's value until the next sample, including the
// unwritten origin sample value at the start of every time series.
//
// It is a linear function, meaning that for adjacent periods p0 and p1
// ("adjacent" means p0's rangeEndMsec == p1's rangeStartMsec):
//
// yIncrease(p0) + yIncrease(p1) == yIncrease(p0 + p1)
func yIncrease(points []FPoint, rangeStartMsec, rangeEndMsec int64, isCounter bool) float64 {
var lastBeforeRange, lastInRange, inRangeRestartSkew float64

if !isCounter && len(points) > 0 {
lastBeforeRange = points[0].F // Gauges don't start at 0.
}

// The points are in time order, so we can just walk the list once and remember the last values
// seen "before" and "in" range. If there are no values in range, we use the last value before range
// so that the increase is 0.
for i := 0; i < len(points) && points[i].T < rangeEndMsec; i++ { // Only consider points in [rangeStartMsec, rangeEndMsec).
if points[i].T >= rangeStartMsec {
if isCounter && points[i].F < lastInRange { // Counter reset (process restart).
inRangeRestartSkew += lastInRange
}
} else {
lastBeforeRange = points[i].F
}
lastInRange = points[i].F
}

return lastInRange - lastBeforeRange + inRangeRestartSkew
}

// rangeFromSelectors extracts points, rangeStartMsec, rangeEndMsec, and rangeSeconds
// from the common (Matrix, MatrixSelector) arguments supplied to yincrease/yrate/ydelta.
// The range is [rangeStartMsec, rangeEndMsec). That is, every sample in range has the property:
// rangeStartMsec <= sample.T < rangeEndMsec.
func rangeFromSelectors(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) ([]FPoint, int64, int64, float64) {
ms := args[0].(*parser.MatrixSelector)
vs := ms.VectorSelector.(*parser.VectorSelector)

rangeStartMsec := enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
rangeEndMsec := enh.Ts - durationMilliseconds(vs.Offset)

points := vals[0].(Matrix)[0].Floats

return points, rangeStartMsec, rangeEndMsec, ms.Range.Seconds()
}

// === delta(Matrix parser.ValueTypeMatrix) (Vector, Annotations) ===
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return extrapolatedRate(vals, args, enh, false, false)
Expand Down Expand Up @@ -365,6 +416,27 @@
return extendedRate(vals, args, enh, true, false)
}

// === ydelta(node parser.ValueTypeMatrix) (Vector, Annotations) ===
func funcYdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
points, rangeStartMsec, rangeEndMsec, _ := rangeFromSelectors(vals, args, enh)
value := yIncrease(points, rangeStartMsec, rangeEndMsec, false)
return append(enh.Out, Sample{F: value}), nil
}

// === yincrease(node parser.ValueTypeMatrix) (Vector, Annotations) ===
func funcYincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
points, rangeStartMsec, rangeEndMsec, _ := rangeFromSelectors(vals, args, enh)
value := yIncrease(points, rangeStartMsec, rangeEndMsec, true)
return append(enh.Out, Sample{F: value}), nil
}

// === yrate(node parser.ValueTypeMatrix) (Vector, Annotations) ===
func funcYrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
points, rangeStartMsec, rangeEndMsec, rangeSeconds := rangeFromSelectors(vals, args, enh)
value := yIncrease(points, rangeStartMsec, rangeEndMsec, true) / rangeSeconds
return append(enh.Out, Sample{F: value}), nil
}

// === irate(node parser.ValueTypeMatrix) (Vector, Annotations) ===
func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return instantValue(vals, enh.Out, true)
Expand Down Expand Up @@ -1802,6 +1874,9 @@
"xdelta": funcXdelta,
"xincrease": funcXincrease,
"xrate": funcXrate,
"ydelta": funcYdelta,
"yincrease": funcYincrease,
"yrate": funcYrate,
"year": funcYear,
}

Expand All @@ -1821,10 +1896,20 @@
}

func init() {
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
// with xrate functions. This allows for a drop-in replacement and Grafana
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
// REPLACE_RATE_FUNCS lets operators swap the built-in rate extrapolation
// functions with Invoca's xrate or yrate family at process start, so
// Grafana auto-completion, Prometheus tooling, Thanos, etc. continue to
// work against queries that call the standard rate/increase/delta names.
//
// Values:
// "1" - replace rate/increase/delta with xrate/xincrease/xdelta
// AND remove the x* names (legacy behaviour).
// "x", "X" - point rate/increase/delta at xrate/xincrease/xdelta but
// keep the x* names available.
// "2", - point rate/increase/delta at yrate/yincrease/ydelta but
// "y", "Y" keep the y* names available.
switch os.Getenv("REPLACE_RATE_FUNCS") {
case "1":
FunctionCalls["delta"] = FunctionCalls["xdelta"]
FunctionCalls["increase"] = FunctionCalls["xincrease"]
FunctionCalls["rate"] = FunctionCalls["xrate"]
Expand All @@ -1842,7 +1927,39 @@
delete(parser.Functions, "xincrease")
delete(parser.Functions, "xrate")
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
}

case "x", "X":
repointParserFunctions("delta", "xdelta")
repointParserFunctions("increase", "xincrease")
repointParserFunctions("rate", "xrate")
repointFunction("delta", "xdelta")
repointFunction("increase", "xincrease")
repointFunction("rate", "xrate")
fmt.Println("Successfully replaced rate/increase/delta with xrate/xincrease/xdelta (and left the x* names available as well).")

case "2", "y", "Y":
repointParserFunctions("delta", "ydelta")
repointParserFunctions("increase", "yincrease")
repointParserFunctions("rate", "yrate")
repointFunction("delta", "ydelta")
repointFunction("increase", "yincrease")
repointFunction("rate", "yrate")
fmt.Println("Successfully replaced rate/increase/delta with yrate/yincrease/ydelta (and left the y* names available as well).")
}
}

// repointParserFunctions makes the parser entry for name resolve to the
// entry currently registered under newName (e.g. "rate" -> "xrate").
// It leaves the newName entry in place.
func repointParserFunctions(name, newName string) {
parser.Functions[name] = parser.Functions[newName]
}

// repointFunction makes the FunctionCalls entry for name dispatch to the
// implementation currently registered under newName. The newName entry
// is left in place.
func repointFunction(name, newName string) {
FunctionCalls[name] = FunctionCalls[newName]
}

type vectorByValueHeap Vector
Expand Down
18 changes: 18 additions & 0 deletions promql/parser/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,24 @@ var Functions = map[string]*Function{
ReturnType: ValueTypeVector,
ExtRange: true,
},
"ydelta": {
Name: "ydelta",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"yincrease": {
Name: "yincrease",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"yrate": {
Name: "yrate",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"year": {
Name: "year",
ArgTypes: []ValueType{ValueTypeVector},
Expand Down
Loading
Loading