diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ed9371c19e..7278afd544c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 +* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. 
#7363 diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index c2efeeed7b9..bb739e4ed9e 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -2,10 +2,19 @@ package cortexpb import ( "sync" + + "go.uber.org/atomic" ) +var dynamicSymbolsCapacity atomic.Int64 + +func init() { + dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity)) +} + var ( - expectedSymbols = 20 + initialSymbolsCapacity = 128 + maxSymbolsCapacity = int64(8192) slicePoolV2 = sync.Pool{ New: func() any { @@ -29,7 +38,7 @@ var ( New: func() any { return &PreallocWriteRequestV2{ WriteRequestV2: WriteRequestV2{ - Symbols: make([]string, 0, expectedSymbols), + Symbols: make([]string, 0, dynamicSymbolsCapacity.Load()), }, } }, @@ -78,6 +87,30 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { } req.Source = 0 + // If the underlying array has grown beyond our acceptable maximum capacity, + // we discard this object instead of putting it back into the pool to let GC + // reclaim it. + symbolsCap := int64(cap(req.Symbols)) + if symbolsCap > maxSymbolsCapacity { + if req.Timeseries != nil { + ReuseSliceV2(req.Timeseries) + req.Timeseries = nil + } + return + } + + // Update the dynamic symbol capacity. 
+ for { + current := dynamicSymbolsCapacity.Load() + if symbolsCap <= current { + // Nothing to raise: the recorded capacity is already at least symbolsCap (possibly because another goroutine raised it first) + break + } + if dynamicSymbolsCapacity.CompareAndSwap(current, symbolsCap) { + break + } + } + for i := range req.Symbols { req.Symbols[i] = "" } diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index 50de6b25d1e..66ac45001b5 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -59,35 +59,63 @@ func TestTimeseriesV2FromPool(t *testing.T) { } func TestReuseWriteRequestV2(t *testing.T) { - req := PreallocWriteRequestV2FromPool() + t.Run("resets fields to default and cleans backing array", func(t *testing.T) { + req := PreallocWriteRequestV2FromPool() + + // Populate req with some data. + req.Source = RULE + req.Symbols = append(req.Symbols, "", "__name__", "test") + + tsSlice := PreallocTimeseriesV2SliceFromPool() + tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()}) + req.Timeseries = tsSlice + + // Capture backing array before reuse + symbolsBackingArray := req.Symbols[:cap(req.Symbols)] + require.Equal(t, "__name__", symbolsBackingArray[1]) + require.Equal(t, "test", symbolsBackingArray[2]) + + // Put the request back into the pool + ReuseWriteRequestV2(req) + + // Verify clearing directly on the backing array + for i, s := range symbolsBackingArray[:3] { + assert.Equalf(t, "", s, "symbol at index %d not cleared", i) + } + + // Source is reset to default + assert.Equal(t, API, req.Source) + // The symbol length is properly reset to 0. + assert.Len(t, req.Symbols, 0) + // Timeseries slice is nil + assert.Nil(t, req.Timeseries) + }) + t.Run("updates dynamic capacity", func(t *testing.T) { + currentCap := dynamicSymbolsCapacity.Load() + newCap := int(currentCap) + 100 // Increase capacity - // Populate req with some data. 
- req.Source = RULE - req.Symbols = append(req.Symbols, "", "__name__", "test") + req := PreallocWriteRequestV2FromPool() + req.Symbols = make([]string, newCap) + req.Timeseries = PreallocTimeseriesV2SliceFromPool() - tsSlice := PreallocTimeseriesV2SliceFromPool() - tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()}) - req.Timeseries = tsSlice + ReuseWriteRequestV2(req) - // Capture backing array before reuse - symbolsBackingArray := req.Symbols[:cap(req.Symbols)] - require.Equal(t, "__name__", symbolsBackingArray[1]) - require.Equal(t, "test", symbolsBackingArray[2]) + // Verify that the dynamic capacity has scaled up + assert.Equal(t, int64(newCap), dynamicSymbolsCapacity.Load()) + }) + t.Run("outlier capacity does not update dynamic capacity and is discarded", func(t *testing.T) { + currentCap := dynamicSymbolsCapacity.Load() + outlierCap := int(maxSymbolsCapacity) + 100 // Exceeds the max limit - // Put the request back into the pool - ReuseWriteRequestV2(req) + req := PreallocWriteRequestV2FromPool() + req.Symbols = make([]string, outlierCap) + req.Timeseries = PreallocTimeseriesV2SliceFromPool() - // Verify clearing directly on the backing array - for i, s := range symbolsBackingArray[:3] { - assert.Equalf(t, "", s, "symbol at index %d not cleared", i) - } + ReuseWriteRequestV2(req) - // Source is reset to default - assert.Equal(t, API, req.Source) - // The symbol length is properly reset to 0. - assert.Len(t, req.Symbols, 0) - // Timeseries slice is nil - assert.Nil(t, req.Timeseries) + // Verify dynamic capacity didn't increase due to out-of-bound outlier + assert.Equal(t, currentCap, dynamicSymbolsCapacity.Load()) + }) } func BenchmarkMarshallWriteRequestV2(b *testing.B) {