Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
Expand Down
37 changes: 35 additions & 2 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@ package cortexpb

import (
"sync"

"go.uber.org/atomic"
)

var dynamicSymbolsCapacity atomic.Int64

func init() {
dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity))
}

var (
expectedSymbols = 20
initialSymbolsCapacity = 128
maxSymbolsCapacity = int64(8192)

slicePoolV2 = sync.Pool{
New: func() any {
Expand All @@ -29,7 +38,7 @@ var (
New: func() any {
return &PreallocWriteRequestV2{
WriteRequestV2: WriteRequestV2{
Symbols: make([]string, 0, expectedSymbols),
Symbols: make([]string, 0, dynamicSymbolsCapacity.Load()),
},
}
},
Expand Down Expand Up @@ -78,6 +87,30 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
}
req.Source = 0

// If the underlying array has grown beyond our acceptable maximum capacity,
// we discard this object instead of putting it back into the pool to let GC
// reclaim it.
symbolsCap := int64(cap(req.Symbols))
if symbolsCap > maxSymbolsCapacity {
if req.Timeseries != nil {
ReuseSliceV2(req.Timeseries)
req.Timeseries = nil
}
return
}

// Update the dynamic symbol capacity.
for {
current := dynamicSymbolsCapacity.Load()
if symbolsCap <= current {
// break when other goroutines have already updated the capacity to a larger value
break
}
if dynamicSymbolsCapacity.CompareAndSwap(current, symbolsCap) {
break
}
}

for i := range req.Symbols {
req.Symbols[i] = ""
}
Expand Down
74 changes: 51 additions & 23 deletions pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,63 @@ func TestTimeseriesV2FromPool(t *testing.T) {
}

func TestReuseWriteRequestV2(t *testing.T) {
req := PreallocWriteRequestV2FromPool()
t.Run("resets fields to default and cleans backing array", func(t *testing.T) {
req := PreallocWriteRequestV2FromPool()

// Populate req with some data.
req.Source = RULE
req.Symbols = append(req.Symbols, "", "__name__", "test")

tsSlice := PreallocTimeseriesV2SliceFromPool()
tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()})
req.Timeseries = tsSlice

// Capture backing array before reuse
symbolsBackingArray := req.Symbols[:cap(req.Symbols)]
require.Equal(t, "__name__", symbolsBackingArray[1])
require.Equal(t, "test", symbolsBackingArray[2])

// Put the request back into the pool
ReuseWriteRequestV2(req)

// Verify clearing directly on the backing array
for i, s := range symbolsBackingArray[:3] {
assert.Equalf(t, "", s, "symbol at index %d not cleared", i)
}

// Source is reset to default
assert.Equal(t, API, req.Source)
// The symbol length is properly reset to 0.
assert.Len(t, req.Symbols, 0)
// Timeseries slice is nil
assert.Nil(t, req.Timeseries)
})
t.Run("updates dynamic capacity", func(t *testing.T) {
currentCap := dynamicSymbolsCapacity.Load()
newCap := int(currentCap) + 100 // Increase capacity

// Populate req with some data.
req.Source = RULE
req.Symbols = append(req.Symbols, "", "__name__", "test")
req := PreallocWriteRequestV2FromPool()
req.Symbols = make([]string, newCap)
req.Timeseries = PreallocTimeseriesV2SliceFromPool()

tsSlice := PreallocTimeseriesV2SliceFromPool()
tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()})
req.Timeseries = tsSlice
ReuseWriteRequestV2(req)

// Capture backing array before reuse
symbolsBackingArray := req.Symbols[:cap(req.Symbols)]
require.Equal(t, "__name__", symbolsBackingArray[1])
require.Equal(t, "test", symbolsBackingArray[2])
// Verify that the dynamic capacity has scaled up
assert.Equal(t, int64(newCap), dynamicSymbolsCapacity.Load())
})
t.Run("outlier capacity does not update dynamic capacity and is discarded", func(t *testing.T) {
currentCap := dynamicSymbolsCapacity.Load()
outlierCap := int(maxSymbolsCapacity) + 100 // Exceeds the max limit

// Put the request back into the pool
ReuseWriteRequestV2(req)
req := PreallocWriteRequestV2FromPool()
req.Symbols = make([]string, outlierCap)
req.Timeseries = PreallocTimeseriesV2SliceFromPool()

// Verify clearing directly on the backing array
for i, s := range symbolsBackingArray[:3] {
assert.Equalf(t, "", s, "symbol at index %d not cleared", i)
}
ReuseWriteRequestV2(req)

// Source is reset to default
assert.Equal(t, API, req.Source)
// The symbol length is properly reset to 0.
assert.Len(t, req.Symbols, 0)
// Timeseries slice is nil
assert.Nil(t, req.Timeseries)
// Verify dynamic capacity didn't increase due to out-of-bound outlier
assert.Equal(t, currentCap, dynamicSymbolsCapacity.Load())
})
}

func BenchmarkMarshallWriteRequestV2(b *testing.B) {
Expand Down
Loading