diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ee1aa6a9b8..12f0b9c2810 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## master / unreleased * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 -* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 +* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index bb739e4ed9e..ed8d9ba5696 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -102,11 +102,15 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { // Update the dynamic symbol capacity. for { current := dynamicSymbolsCapacity.Load() - if symbolsCap <= current { - // break when other goroutines have already updated the capacity to a larger value + // We use an EMA to update the capacity. + newAvg := max((current*9+symbolsCap*1)/10, int64(initialSymbolsCapacity)) + + if current == newAvg { + // nothing to change break } - if dynamicSymbolsCapacity.CompareAndSwap(current, symbolsCap) { + + if dynamicSymbolsCapacity.CompareAndSwap(current, newAvg) { break } } diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index 66ac45001b5..3f06bd4c092 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -100,8 +100,9 @@ func TestReuseWriteRequestV2(t *testing.T) { ReuseWriteRequestV2(req) - // Verify that the dynamic capacity has scaled up - assert.Equal(t, int64(newCap), dynamicSymbolsCapacity.Load()) + // Verify that the dynamic capacity has been updated + expectedCap := max((currentCap*9+int64(newCap))/10, int64(initialSymbolsCapacity)) + assert.Equal(t, expectedCap, dynamicSymbolsCapacity.Load()) }) t.Run("outlier capacity does not update dynamic capacity and is discarded", func(t *testing.T) { currentCap := dynamicSymbolsCapacity.Load()