From 17f0d599a910830cdcdad50aa02e57d23f1f5b36 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 6 Apr 2026 19:46:24 +0900 Subject: [PATCH 1/4] Change expectedSymbols to 2048 Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/cortexpb/timeseriesv2.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ed9371c19e..5f77243dd33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [CHANGE] Distributor: Increase the default capacity of `expectedSymbols` from 20 to 2048 in the PRW2 path to reduce slice reallocations. #7397 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index c2efeeed7b9..0d727404502 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -5,7 +5,7 @@ import ( ) var ( - expectedSymbols = 20 + expectedSymbols = 2048 slicePoolV2 = sync.Pool{ New: func() any { From db25f24a405528eb523e1c0aed4bac43f7ad625e Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 7 Apr 2026 10:49:36 +0900 Subject: [PATCH 2/4] fix changelog Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f77243dd33..6656b050660 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased -* [CHANGE] Distributor: Increase the default capacity of `expectedSymbols` from 20 to 2048 in the PRW2 path to reduce slice reallocations. #7397 +* [CHANGE] Distributor: Increase the default capacity of `expectedSymbols` from 20 to 2048 in the PRW2 path to reduce slice reallocations. #7398 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 From f14ccf105dcfa64a58ff7005f90195d2f583c704 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 7 Apr 2026 11:46:09 +0900 Subject: [PATCH 3/4] Add adaptive symbol capacity Signed-off-by: SungJin1212 --- pkg/cortexpb/timeseriesv2.go | 37 +++++++++++++++- pkg/cortexpb/timeseriesv2_test.go | 74 +++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 25 deletions(-) diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index 0d727404502..bb739e4ed9e 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -2,10 +2,19 @@ package cortexpb import ( "sync" + + "go.uber.org/atomic" ) +var dynamicSymbolsCapacity atomic.Int64 + +func init() { + dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity)) +} + var ( - expectedSymbols = 2048 + initialSymbolsCapacity = 128 + maxSymbolsCapacity = int64(8192) slicePoolV2 = sync.Pool{ New: func() any { @@ -29,7 +38,7 @@ var ( New: func() any { return &PreallocWriteRequestV2{ WriteRequestV2: WriteRequestV2{ - Symbols: make([]string, 0, expectedSymbols), + Symbols: make([]string, 0, dynamicSymbolsCapacity.Load()), }, } }, @@ -78,6 +87,30 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { } req.Source = 0 + // If the underlying array has grown beyond our acceptable maximum capacity, + // we discard this object instead of putting it back into the pool to let GC + // reclaim it. + symbolsCap := int64(cap(req.Symbols)) + if symbolsCap > maxSymbolsCapacity { + if req.Timeseries != nil { + ReuseSliceV2(req.Timeseries) + req.Timeseries = nil + } + return + } + + // Update the dynamic symbol capacity. + for { + current := dynamicSymbolsCapacity.Load() + if symbolsCap <= current { + // break when other goroutines have already updated the capacity to a larger value + break + } + if dynamicSymbolsCapacity.CompareAndSwap(current, symbolsCap) { + break + } + } + for i := range req.Symbols { req.Symbols[i] = "" } diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index 50de6b25d1e..66ac45001b5 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -59,35 +59,63 @@ func TestTimeseriesV2FromPool(t *testing.T) { } func TestReuseWriteRequestV2(t *testing.T) { - req := PreallocWriteRequestV2FromPool() + t.Run("resets fields to default and cleans backing array", func(t *testing.T) { + req := PreallocWriteRequestV2FromPool() + + // Populate req with some data. + req.Source = RULE + req.Symbols = append(req.Symbols, "", "__name__", "test") + + tsSlice := PreallocTimeseriesV2SliceFromPool() + tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()}) + req.Timeseries = tsSlice + + // Capture backing array before reuse + symbolsBackingArray := req.Symbols[:cap(req.Symbols)] + require.Equal(t, "__name__", symbolsBackingArray[1]) + require.Equal(t, "test", symbolsBackingArray[2]) + + // Put the request back into the pool + ReuseWriteRequestV2(req) + + // Verify clearing directly on the backing array + for i, s := range symbolsBackingArray[:3] { + assert.Equalf(t, "", s, "symbol at index %d not cleared", i) + } + + // Source is reset to default + assert.Equal(t, API, req.Source) + // The symbol length is properly reset to 0. + assert.Len(t, req.Symbols, 0) + // Timeseries slice is nil + assert.Nil(t, req.Timeseries) + }) + t.Run("updates dynamic capacity", func(t *testing.T) { + currentCap := dynamicSymbolsCapacity.Load() + newCap := int(currentCap) + 100 // Increase capacity - // Populate req with some data. - req.Source = RULE - req.Symbols = append(req.Symbols, "", "__name__", "test") + req := PreallocWriteRequestV2FromPool() + req.Symbols = make([]string, newCap) + req.Timeseries = PreallocTimeseriesV2SliceFromPool() - tsSlice := PreallocTimeseriesV2SliceFromPool() - tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()}) - req.Timeseries = tsSlice + ReuseWriteRequestV2(req) - // Capture backing array before reuse - symbolsBackingArray := req.Symbols[:cap(req.Symbols)] - require.Equal(t, "__name__", symbolsBackingArray[1]) - require.Equal(t, "test", symbolsBackingArray[2]) + // Verify that the dynamic capacity has scaled up + assert.Equal(t, int64(newCap), dynamicSymbolsCapacity.Load()) + }) + t.Run("outlier capacity does not update dynamic capacity and is discarded", func(t *testing.T) { + currentCap := dynamicSymbolsCapacity.Load() + outlierCap := int(maxSymbolsCapacity) + 100 // Exceeds the max limit - // Put the request back into the pool - ReuseWriteRequestV2(req) + req := PreallocWriteRequestV2FromPool() + req.Symbols = make([]string, outlierCap) + req.Timeseries = PreallocTimeseriesV2SliceFromPool() - // Verify clearing directly on the backing array - for i, s := range symbolsBackingArray[:3] { - assert.Equalf(t, "", s, "symbol at index %d not cleared", i) - } + ReuseWriteRequestV2(req) - // Source is reset to default - assert.Equal(t, API, req.Source) - // The symbol length is properly reset to 0. - assert.Len(t, req.Symbols, 0) - // Timeseries slice is nil - assert.Nil(t, req.Timeseries) + // Verify dynamic capacity didn't increase due to out-of-bound outlier + assert.Equal(t, currentCap, dynamicSymbolsCapacity.Load()) + }) } func BenchmarkMarshallWriteRequestV2(b *testing.B) { From b80c872f37028dcf0eb10cb0fa8742134336a770 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 7 Apr 2026 11:56:23 +0900 Subject: [PATCH 4/4] update changelog Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6656b050660..7278afd544c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,9 @@ # Changelog ## master / unreleased -* [CHANGE] Distributor: Increase the default capacity of `expectedSymbols` from 20 to 2048 in the PRW2 path to reduce slice reallocations. #7398 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 +* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363