Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1e29795
add logging; total rewrite of extendedRate (tests don't pass)
ColinDKelley Apr 4, 2023
166b442
undo total rewrite of extendedRate (tests pass)
ColinDKelley Apr 4, 2023
43f94a3
0 counter fix on startup (commented out)
ColinDKelley Apr 10, 2023
762f5f2
add xrate0 and xincrease0
ColinDKelley Apr 10, 2023
4b0cdd1
refactor extendedRate0
ColinDKelley Apr 10, 2023
98caa6b
restore extendedRate to original+comments
ColinDKelley Apr 10, 2023
8804261
copy xrate, xincrease tests to cover xrate0, xincrease0
ColinDKelley Apr 10, 2023
f0e3a33
gofmt and lint
ColinDKelley Apr 10, 2023
5e734fa
move extendedRate up to simplify diff
ColinDKelley Apr 10, 2023
d0c6d45
add Msec suffix to locals; add extra logging
ColinDKelley Apr 11, 2023
7f1efda
fix extendedRate0 starting resultValue
ColinDKelley Apr 11, 2023
37bd4ad
drop margin % and use scrapeIntervalMsec instead
ColinDKelley Apr 11, 2023
296d345
fix bug in 0 Fix; add Msec var suffixes
ColinDKelley Apr 17, 2023
e97cd45
+1000, +2000 for first set of test data
ColinDKelley Apr 17, 2023
2c874af
add explicit presample, edge, and no presample test cases
ColinDKelley Apr 17, 2023
c8260bc
add yincrease, yrate
ColinDKelley Apr 21, 2023
fd3b4be
extract debugSampleString; add support for inRangeRestartSkew
ColinDKelley Apr 22, 2023
76f6dc2
drop xrate0 and xincrease0 along with extendedRate0
ColinDKelley Apr 22, 2023
94ccfae
drop unused inferScrapeInterval
ColinDKelley Apr 22, 2023
ef151f1
update comments block for yIncrease
ColinDKelley Apr 22, 2023
163708f
alias ydelta = yincrease for compatibility
ColinDKelley Apr 22, 2023
900af44
extend REPLACE_RATE_FUNCS=2 to replace with yrate/yincrease/ydelta
ColinDKelley Apr 22, 2023
3bdabd3
extend yIncrease with isCounter and implement ydelta using false
ColinDKelley Apr 24, 2023
6cbe071
reorder loop in yIncrease for readability
ColinDKelley Apr 25, 2023
0cbfd91
new load values at +1000, +2000
ColinDKelley May 2, 2023
9f2d038
reset; add load values at +1000, +2000
ColinDKelley May 2, 2023
0bdc656
add equivalent y* tests for all rate, xrate ones
ColinDKelley May 2, 2023
d933616
update tests for .321 msec offset scrapes when samples start with [12…
ColinDKelley May 2, 2023
af16f0b
update engine_test
ColinDKelley May 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/prometheus/exporter-toolkit v0.7.1
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.9
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0
go.opentelemetry.io/otel v1.6.1
Expand Down
6 changes: 3 additions & 3 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,7 @@ load 1ms
start: 50, end: 80, interval: 10,
result: Matrix{
Series{
Points: []Point{{V: 995, T: 50000}, {V: 994, T: 60000}, {V: 993, T: 70000}, {V: 992, T: 80000}},
Points: []Point{{V: 996, T: 50000}, {V: 995, T: 60000}, {V: 994, T: 70000}, {V: 993, T: 80000}},
Metric: lblstopk3,
},
},
Expand All @@ -1578,7 +1578,7 @@ load 1ms
start: 70, end: 100, interval: 10,
result: Matrix{
Series{
Points: []Point{{V: 993, T: 70000}, {V: 992, T: 80000}, {V: 991, T: 90000}, {V: 990, T: 100000}},
Points: []Point{{V: 994, T: 70000}, {V: 993, T: 80000}, {V: 992, T: 90000}, {V: 991, T: 100000}},
Metric: lblstopk3,
},
},
Expand All @@ -1587,7 +1587,7 @@ load 1ms
start: 100, end: 130, interval: 10,
result: Matrix{
Series{
Points: []Point{{V: 990, T: 100000}, {V: 989, T: 110000}, {V: 988, T: 120000}, {V: 987, T: 130000}},
Points: []Point{{V: 991, T: 100000}, {V: 990, T: 110000}, {V: 989, T: 120000}, {V: 988, T: 130000}},
Metric: lblstopk3,
},
},
Expand Down
148 changes: 136 additions & 12 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package promql

import (
"bytes"
"fmt"
"log"
"math"
"os"
"sort"
Expand Down Expand Up @@ -158,23 +160,19 @@ func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
firstPoint := 0
// If the point before the range is too far from rangeStart, drop it.
if float64(rangeStart-points[0].T) > averageInterval { //** There's 0 slop here, so we could drop the point even if it was scraped 1 msec early
//** Repeating the above check for 0 or 1 data points, with +1. I'm pretty sure these checks could be done just once.
//** Repeating the above check for 0 or 1 data points, with +1.
if len(points) < 3 {
return enh.Out
}
firstPoint = 1
sampledRange = float64(points[len(points)-1].T - points[1].T) //** repeating above code "
averageInterval = sampledRange / float64(len(points)-2) //** repeating above code "
sampledRange = float64(points[len(points)-1].T - points[1].T) //** repeating above code
averageInterval = sampledRange / float64(len(points)-2) //** repeating above code
}

counterCorrection := float64(0.0)
lastValue := float64(0.0)

if isCounter { //** isCounter means we were called from rate or increase or delta... which means you can't use those for gauges?
//** Here, we can handle the initial start from "null" (no previous data point)
//** if first point is near rangeStart, that means there was no earlier data point, which means we probably just started.
//** counterCorrection = points[firstPoint].V
//** (unless maybe the counterCorrection would be huge compared to the remaining deltas...in which case it might be a missed scrape)
if isCounter { //** called from rate or increase (not delta)
for i := firstPoint; i < len(points); i++ {
sample := points[i]
if sample.V < lastValue { //** Handle when the counter steps backwards due to process restart
Expand All @@ -193,18 +191,85 @@ func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
// the sampled range to the requested range.
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
adjustToRange := float64(durationMilliseconds(ms.Range))
resultValue = resultValue * (adjustToRange / sampledRange)
resultValue *= (adjustToRange / sampledRange)
}

if isRate {
resultValue = resultValue / ms.Range.Seconds()
resultValue /= ms.Range.Seconds()
}

return append(enh.Out, Sample{
Point: Point{V: resultValue},
})
}

// elideSamplesAfter is the number of samples debugSampleString keeps at each
// end of a long series before it starts eliding the middle.
const elideSamplesAfter int = 10

// debugSampleString renders points as a comma-separated list of
// "[seconds,value]" pairs for log output. Timestamps are converted from
// milliseconds to seconds and printed with three decimals; values are printed
// with one decimal.
//
// Series with more than elideSamplesAfter+1 points are shortened: the first
// elideSamplesAfter points are printed, the point at index elideSamplesAfter is
// replaced by "...", and after that only the points within the final
// elideSamplesAfter positions are printed.
func debugSampleString(points []Point) string {
	var out bytes.Buffer
	total := len(points)

	for idx := range points {
		switch {
		case idx == elideSamplesAfter && total-1 > elideSamplesAfter:
			// Marks the start of the elided middle section.
			out.WriteString("...")
		case idx > elideSamplesAfter && total-idx > elideSamplesAfter:
			// Inside the elided middle section: print nothing.
		default:
			if idx > 0 {
				out.WriteString(", ")
			}
			fmt.Fprintf(&out, "[%.3f,%.1f]", float64(points[idx].T)/1000.0, points[idx].V)
		}
	}

	return out.String()
}

// yIncrease is a utility function for yincrease/yrate/ydelta.
// It calculates the increase of the range (allowing for counter resets),
// taking into account the sample at the end of the previous range (just before rangeStartMsec).
// It returns the result across the range [rangeStartMsec, rangeEndMsec)
// It always extends the preceding sample's value until the next sample, including the
// unwritten origin sample value at the start of every time series.
//
// It is a linear function, meaning that for adjacent periods p0 and p1
// ("adjacent" means p0's rangeEndMsec == p1's rangeStartMsec):
// yIncrease(p0) + yIncrease(p1) == yIncrease(p0 + p1)
func yIncrease(points []Point, rangeStartMsec, rangeEndMsec int64, isCounter bool) float64 {
Copy link
Copy Markdown
Author

@ColinDKelley ColinDKelley Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this function always returns a value. If asked for a range that contains 0 or 1 samples (either because the range was set really narrow, or because of missed scrapes), it simply returns 0, because that's what is implied by extending the previous sample across time. I like that because it preserves the linear property. But if we argued that empty is equivalent to 0, we could change this to return empty in those corner cases.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are 0 samples, is it the case that the time series doesn't even exist? That feels like we should probably return empty for that case.

For 1 sample, I think it makes sense to return 0.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that actually happens today because the calling code that creates the range vectors knows to never create one with no data. I noticed this at the front at least: if I backed up by 5 minutes in the requested query range, my yrate function doesn't get called for those first 5 minutes. The first time it gets called is when there's 1 sample, followed by 2, etc.

BTW I suspect the calling code will skip ranges if there happened to be consecutive missed scrapes there. So if the samples are:

2, 3, 3, 4, 5, nil, nil, 6, 7, 7

I believe the range vectors for [scrapeInterval] with a step of scrapeInterval will be

[2], [2, 3], [3, 3], [3, 4], [4, 5], [5], nil, [6], [6, 7], [7, 7]

(where nil means there's no data stored at that timestamp).

I'll confirm that this is generally true.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like it won't be needed, but I'd like to point out that an "empty" return value isn't possible from this function without changing the function signature to return a pointer. All Go types are initialized to a zero value (0 in the case of numeric types).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I agree and I'm always returning a value--no exceptions. If I needed an exception, I'd add a second return value. Or cheat and return NaN. :P

log.Printf("yIncrease: range: %.3f...%.3f\n", float64(rangeStartMsec)/1000.0, float64(rangeEndMsec)/1000.0)
log.Println("yIncrease: samples: ", debugSampleString(points))

lastBeforeRange := float64(0.0) // This provides the 0 counter fix for a fresh start of a pod.
if !isCounter && len(points) > 0 {
lastBeforeRange = points[0].V // Gauges don't start at 0.
}
lastInRange := float64(0.0)

lastValue := float64(0.0)
inRangeRestartSkew := float64(0.0)

// The points are in time order, so we can just walk the list once and remember the last values
// seen "before" and "in" range. If there are no values in range, we use the last value before range.
for _, point := range points {
if point.T >= rangeEndMsec { // Only consider points in [rangeStartMsec, rangeEndMsec).
break
}
lastInRange = point.V
if point.T >= rangeStartMsec {
if isCounter &&
point.V < lastValue { // If counter went backwards, it must have been a counter reset on process restart.
inRangeRestartSkew += point.V
}
} else {
lastBeforeRange = point.V
}
lastValue = point.V
}

result := lastInRange - lastBeforeRange + inRangeRestartSkew

log.Printf("yIncrease: returning result: %.1f\n", result)

return result
}

// === delta(Matrix parser.ValueTypeMatrix) Vector ===
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, false, false)
Expand Down Expand Up @@ -235,6 +300,48 @@ func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe
return extendedRate(vals, args, enh, true, false)
}

// Extracts points, rangeStartMsec, rangeEndMsec, rangeSeconds from common params.
// Note: the range is [rangeStartMsec, rangeEndMsec). That is, every sample in range has the property:
// rangeStartMsec <= sample.T < rangeEndMsec
func rangeFromSelectors(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) ([]Point, int64, int64, float64) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wonderful... It makes yIncrease function so much easier to understand, having this extraction of arguments moved outside the function and only passing in what you need.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's what it took to get the functions clean and simple! IMO it was the only reasonable way to get the boilerplate out of the functions and avoid passing in arguments like isRate.

But...I'm wondering about backing away from this refactor when I submit to the Prometheus team, in order to get a more surgical PR that isn't cluttered by refactoring.

The best answer is to submit a prefactor PR first, just like we do here at Invoca. That worked great with the listen gem, because Samuel Williams is so reasonable. But I'm not seeing that in the Prometheus team. And TBH, when there's a lot of change going on, refactors of any sort do create a headache because of merge conflicts.

ms := args[0].(*parser.MatrixSelector)
vs := ms.VectorSelector.(*parser.VectorSelector)

rangeStartMsec := enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
rangeEndMsec := enh.Ts - durationMilliseconds(vs.Offset)

points := vals[0].(Matrix)[0].Points

return points, rangeStartMsec, rangeEndMsec, ms.Range.Seconds()
}

// === ydelta(node parser.ValueTypeMatrix) Vector ===
//
// funcYdelta evaluates ydelta over the range selector in args[0]. It calls
// yIncrease with isCounter=false, so the value is not treated as a counter
// and no counter-reset compensation is applied. The result is appended to
// enh.Out as a single sample.
func funcYdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	samples, startMsec, endMsec, _ := rangeFromSelectors(vals, args, enh)

	var out Sample
	out.Point.V = yIncrease(samples, startMsec, endMsec, false)

	return append(enh.Out, out)
}

// === yincrease(node parser.ValueTypeMatrix) Vector ===
//
// funcYincrease evaluates yincrease over the range selector in args[0]. It
// calls yIncrease with isCounter=true, so the series is treated as a counter.
// The result is appended to enh.Out as a single sample.
func funcYincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	samples, startMsec, endMsec, _ := rangeFromSelectors(vals, args, enh)

	var out Sample
	out.Point.V = yIncrease(samples, startMsec, endMsec, true)

	return append(enh.Out, out)
}

// === yrate(node parser.ValueTypeMatrix) Vector ===
//
// funcYrate evaluates yrate over the range selector in args[0]: the counter
// increase reported by yIncrease (isCounter=true) divided by the length of
// the range in seconds. The result is appended to enh.Out as a single sample.
func funcYrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	samples, startMsec, endMsec, windowSeconds := rangeFromSelectors(vals, args, enh)

	increase := yIncrease(samples, startMsec, endMsec, true)

	var out Sample
	out.Point.V = increase / windowSeconds

	return append(enh.Out, out)
}

// === irate(node parser.ValueTypeMatrix) Vector ===
func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return instantValue(vals, enh.Out, true)
Expand Down Expand Up @@ -1227,6 +1334,9 @@ var FunctionCalls = map[string]FunctionCall{
"xincrease": funcXincrease,
"xrate": funcXrate,
"year": funcYear,
"ydelta": funcYdelta,
"yincrease": funcYincrease,
"yrate": funcYrate,
}

// AtModifierUnsafeFunctions are the functions whose result
Expand All @@ -1246,9 +1356,10 @@ var AtModifierUnsafeFunctions = map[string]struct{}{

func init() {
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
// with xrate functions. This allows for a drop-in replacement and Grafana
// with xrate or yrate functions. This allows for a drop-in replacement and Grafana
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
switch os.Getenv("REPLACE_RATE_FUNCS") {
case "1":
FunctionCalls["delta"] = FunctionCalls["xdelta"]
FunctionCalls["increase"] = FunctionCalls["xincrease"]
FunctionCalls["rate"] = FunctionCalls["xrate"]
Expand All @@ -1266,6 +1377,19 @@ func init() {
delete(parser.Functions, "xincrease")
delete(parser.Functions, "xrate")
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
case "2":
FunctionCalls["delta"] = FunctionCalls["ydelta"]
parser.Functions["delta"] = parser.Functions["ydelta"]
parser.Functions["delta"].Name = "delta"

FunctionCalls["increase"] = FunctionCalls["yincrease"]
parser.Functions["increase"] = parser.Functions["yincrease"]
parser.Functions["increase"].Name = "increase"

FunctionCalls["rate"] = FunctionCalls["yrate"]
parser.Functions["rate"] = parser.Functions["yrate"]
parser.Functions["rate"].Name = "rate"
fmt.Println("Successfully replaced rate/increase/delta with yrate/yincrease/ydelta (and left the latter names available as well).")
Copy link
Copy Markdown
Author

@ColinDKelley ColinDKelley Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see any reason to delete the y* names when this REPLACE setting of "2" is used. In fact I like keeping those y* names, because we can then make sure to have at least one dashboard/Prometheus Alert using the y* name as a canary that will detect if we're not running our fork. (I think that would be an easy mistake to make; it's somewhat likely to happen during a k8s upgrade or refactor of invocaops_docker jsonnet.)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! An easy way to verify we're running what we think we're running!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about making these values a bit more meaningful (but still support the old "1" value for backward compatibility):

case "1", "x":
  ...
case "y":
  ...

}
}

Expand Down
18 changes: 18 additions & 0 deletions promql/parser/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,24 @@ var Functions = map[string]*Function{
Variadic: 1,
ReturnType: ValueTypeVector,
},
"ydelta": {
Name: "ydelta",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"yincrease": {
Name: "yincrease",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
},
"yrate": {
Name: "yrate",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
ExtRange: true,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me what ExtRange: true is doing here for us?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's used over here to decide if we need to get lookbackDelta (5 minutes by default) of extra samples before the range.

BTW in the Thanos copy of all this code, they skipped that clean table-driven approach and instead just made a function that knows when they need an "extended range".

},
}

// getFunction returns a predefined Function object for the given name.
Expand Down
8 changes: 7 additions & 1 deletion promql/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,16 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
// append the defined time series to the storage.
func (cmd *loadCmd) append(a storage.Appender) error {
for h, smpls := range cmd.defs {
scrapeOffsetMsec := int64(0)
if len(smpls) > 0 &&
(smpls[0].V >= 1000.0 && smpls[0].V <= 1001.0 || smpls[0].V >= 2000.0 && smpls[0].V <= 2001.0) {
scrapeOffsetMsec = 321
}

m := cmd.metrics[h]

for _, s := range smpls {
if _, err := a.Append(0, m, s.T, s.V); err != nil {
if _, err := a.Append(0, m, s.T + scrapeOffsetMsec, s.V); err != nil {
return err
}
}
Expand Down
Loading