diff --git a/CHANGELOG.md b/CHANGELOG.md index 849aea52f20..b9764faa9ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526 * [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590 * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663 +* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681 * [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607 * [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c411d592fae..1944ca7587c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3635,9 +3635,10 @@ query_rejection: # The default tenant's shard size when the shuffle-sharding strategy is used by # ruler. When this setting is specified in the per-tenant overrides, a value of -# 0 disables shuffle sharding for the tenant. +# 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size +# will be a percentage of the total rulers. # CLI flag: -ruler.tenant-shard-size -[ruler_tenant_shard_size: | default = 0] +[ruler_tenant_shard_size: | default = 0] # Maximum number of rules per rule group per-tenant. 0 to disable. # CLI flag: -ruler.max-rules-per-rule-group diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index eb34ee02e3a..80ce675878c 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -155,7 +155,7 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender { // RulesLimits defines limits used by Ruler. type RulesLimits interface { MaxQueryLength(userID string) time.Duration - RulerTenantShardSize(userID string) int + RulerTenantShardSize(userID string) float64 RulerMaxRuleGroupsPerTenant(userID string) int RulerMaxRulesPerRuleGroup(userID string) int RulerQueryOffset(userID string) time.Duration diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 067c7a4f591..ddabc670b5e 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -857,7 +857,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp userRings := map[string]ring.ReadRing{} for _, u := range users { if shardSize := r.limits.RulerTenantShardSize(u); shardSize > 0 { - subRing := r.ring.ShuffleShard(u, shardSize) + subRing := r.ring.ShuffleShard(u, r.getShardSizeForUser(u)) // Include the user only if it belongs to this ruler shard. if subRing.HasInstance(r.lifecycler.GetInstanceID()) { @@ -1325,11 +1325,18 @@ func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulesp return groupDescs, nil } +func (r *Ruler) getShardSizeForUser(userID string) int { + newShardSize := util.DynamicShardSize(r.limits.RulerTenantShardSize(userID), r.ring.InstancesCount()) + + // We want to guarantee that shard size will be at least replication factor + return max(newShardSize, r.cfg.Ring.ReplicationFactor) +} + func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest RulesRequest) (*RulesResponse, error) { ring := ring.ReadRing(r.ring) if shardSize := r.limits.RulerTenantShardSize(userID); shardSize > 0 && r.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - ring = r.ring.ShuffleShard(userID, shardSize) + ring = r.ring.ShuffleShard(userID, r.getShardSizeForUser(userID)) } rulers, failedZones, err := GetReplicationSetForListRule(ring, &r.cfg.Ring) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 7fd7236e8aa..ec7eb287c30 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -87,7 +87,7 @@ func defaultRulerConfig(t testing.TB) Config { type ruleLimits struct { mtx sync.RWMutex - tenantShard int + tenantShard float64 maxRulesPerRuleGroup int maxRuleGroups int disabledRuleGroups validation.DisabledRuleGroups @@ -102,7 +102,7 @@ func (r *ruleLimits) setRulerExternalLabels(lset labels.Labels) { r.mtx.Unlock() } -func (r *ruleLimits) RulerTenantShardSize(_ string) int { +func (r *ruleLimits) RulerTenantShardSize(_ string) float64 { r.mtx.RLock() defer r.mtx.RUnlock() return r.tenantShard @@ -630,7 +630,7 @@ func TestGetRules(t *testing.T) { type testCase struct { sharding bool shardingStrategy string - shuffleShardSize int + shuffleShardSize float64 rulesRequest RulesRequest expectedCount map[string]int expectedClientCallCount int @@ -1887,7 +1887,7 @@ func TestSharding(t *testing.T) { sharding bool shardingStrategy string replicationFactor int - shuffleShardSize int + shuffleShardSize float64 setupRing func(*ring.Desc) enabledUsers []string disabledUsers []string @@ -3104,3 +3104,150 @@ func TestRuler_QueryOffset(t *testing.T) { gotOffset = rg.GetGroup().QueryOffset require.Equal(t, time.Minute*2, *gotOffset) } + +func TestGetShardSizeForUser(t *testing.T) { + tests := []struct { + name string + userID string + replicationFactor int + rulerInstanceCount int + tenantShardSize float64 + expectedShardSize int + }{ + { + name: "User with fixed shard size with 10 ruler instances", + userID: "user1", + rulerInstanceCount: 10, + replicationFactor: 1, + tenantShardSize: 2, + expectedShardSize: 2, + }, + { + name: "User with fixed shard size with 50 ruler instances", + userID: "user1", + rulerInstanceCount: 50, + replicationFactor: 1, + tenantShardSize: 30, + expectedShardSize: 30, + }, + { + name: "User with percentage shard size with 10 ruler instances", + userID: "user1", + rulerInstanceCount: 10, + replicationFactor: 1, + tenantShardSize: 0.6, + expectedShardSize: 6, + }, + { + name: "User with percentage shard size with 80 ruler instances", + userID: "user1", + rulerInstanceCount: 80, + replicationFactor: 1, + tenantShardSize: 0.25, + expectedShardSize: 20, + }, + { + name: "Ensure shard size is at least replication factor", + userID: "user1", + rulerInstanceCount: 10, + replicationFactor: 3, + tenantShardSize: 0.1, + expectedShardSize: 3, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + rulerStateMap := make(map[string]ring.InstanceState) + rulerAZEvenSpread := make(map[string]string) + rulerIDs := make([]string, tc.rulerInstanceCount) + + for i := 0; i < tc.rulerInstanceCount; i++ { + rulerID := fmt.Sprintf("ruler%d", i+1) + rulerIDs[i] = rulerID + rulerStateMap[rulerID] = ring.ACTIVE + rulerAZEvenSpread[rulerID] = string(rune('a' + i%3)) + } + + kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) + allRulesByUser := map[string]rulespb.RuleGroupList{} + allTokensByRuler := map[string][]uint32{} + rulerAddrMap := map[string]*Ruler{} + + createRuler := func(id string) *Ruler { + store := newMockRuleStore(allRulesByUser, nil) + cfg := defaultRulerConfig(t) + + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.EnableSharding = true + cfg.EnableHAEvaluation = false + cfg.EvaluationInterval = 5 * time.Minute + + cfg.Ring = RingConfig{ + InstanceID: id, + InstanceAddr: id, + KVStore: kv.Config{ + Mock: kvStore, + }, + ReplicationFactor: tc.replicationFactor, + ZoneAwarenessEnabled: true, + InstanceZone: rulerAZEvenSpread[id], + } + + r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) + r.limits = &ruleLimits{tenantShard: tc.tenantShardSize} + rulerAddrMap[id] = r + if r.ring != nil { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring)) + t.Cleanup(r.ring.StopAsync) + } + return r + } + + var testRuler *Ruler + // Create rulers and ensure they join the ring + for _, rID := range rulerIDs { + r := createRuler(rID) + testRuler = r + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.lifecycler)) + } + + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + + // Check the ring state + ringDesc, err := kvStore.Get(context.Background(), ringKey) + require.NoError(t, err) + require.NotNil(t, ringDesc) + desc := ringDesc.(*ring.Desc) + require.Equal(t, tc.rulerInstanceCount, len(desc.Ingesters)) + + forEachRuler := func(f func(rID string, r *Ruler)) { + for rID, r := range rulerAddrMap { + f(rID, r) + } + } + + // Sync Rules + forEachRuler(func(_ string, r *Ruler) { + r.syncRules(context.Background(), rulerSyncReasonInitial) + }) + + result := testRuler.getShardSizeForUser(tc.userID) + assert.Equal(t, tc.expectedShardSize, result) + }) + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 152e51ff622..84596f17950 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -183,7 +183,7 @@ type Limits struct { // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` - RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"` + RulerTenantShardSize float64 `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"` RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"` RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"` RulerQueryOffset model.Duration `yaml:"ruler_query_offset" json:"ruler_query_offset"` @@ -283,7 +283,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.") f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Deprecated(use ruler.query-offset instead) and will be removed in v1.19.0: Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") - f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") + f.Float64Var(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size will be a percentage of the total rulers.") f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.") f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.") f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.") @@ -838,7 +838,7 @@ func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config { } // RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy. -func (o *Overrides) RulerTenantShardSize(userID string) int { +func (o *Overrides) RulerTenantShardSize(userID string) float64 { return o.GetOverridesForUser(userID).RulerTenantShardSize }