|
// RingCount is the interface exposed by a ring implementation which allows
// counting its members.
type RingCount interface {
	// InstancesCount returns the total number of instances registered in the ring.
	InstancesCount() int
	// InstancesInZoneCount returns the number of instances registered in the ring
	// within a single zone (presumably the zone of the calling instance — it takes
	// no zone argument; confirm against the ring implementation).
	InstancesInZoneCount() int
	// ZonesCount returns the number of zones the ring instances are spread across.
	ZonesCount() int
}
34 | 35 |
|
@@ -184,34 +185,43 @@ func (l *Limiter) convertGlobalToLocalLimit(userID string, globalLimit int) int |
184 | 185 | return 0 |
185 | 186 | } |
186 | 187 |
|
187 | | - // Given we don't need a super accurate count (ie. when the ingesters |
188 | | - // topology changes) and we prefer to always be in favor of the tenant, |
189 | | - // we can use a per-ingester limit equal to: |
190 | | - // (global limit / number of ingesters) * replication factor |
191 | | - numIngesters := l.ring.HealthyInstancesCount() |
192 | | - |
193 | | - // May happen because the number of ingesters is asynchronously updated. |
194 | | - // If happens, we just temporarily ignore the global limit. |
195 | | - if numIngesters == 0 { |
196 | | - return 0 |
| 188 | + zonesCount := l.getZonesCount() |
| 189 | + var ingestersInZoneCount int |
| 190 | + if zonesCount > 1 { |
| 191 | + // In this case zone-aware replication is enabled, and ingestersInZoneCount is initially set to |
| 192 | + // the total number of ingesters in the corresponding zone |
| 193 | + ingestersInZoneCount = l.ring.InstancesInZoneCount() |
| 194 | + } else { |
| 195 | + // In this case zone-aware replication is disabled, and ingestersInZoneCount is initially set to |
| 196 | + // the total number of ingesters |
| 197 | + ingestersInZoneCount = l.ring.InstancesCount() |
| 198 | + } |
| 199 | + shardSize := l.getShardSize(userID) |
| 200 | + // If shuffle sharding is enabled and the total number of ingesters in the zone is greater than the |
| 201 | + // expected number of ingesters per sharded zone, then we should honor the latter because series/metadata |
| 202 | + // cannot be written to more ingesters than that. |
| 203 | + if shardSize > 0 { |
| 204 | + ingestersInZoneCount = util_math.Min(ingestersInZoneCount, util.ShuffleShardExpectedInstancesPerZone(shardSize, zonesCount)) |
197 | 205 | } |
198 | 206 |
|
199 | | - // If the number of available ingesters is greater than the tenant's shard |
200 | | - // size, then we should honor the shard size because series/metadata won't |
201 | | - // be written to more ingesters than it. |
202 | | - if shardSize := l.getShardSize(userID); shardSize > 0 { |
203 | | - // We use Min() to protect from the case the expected shard size is > available ingesters. |
204 | | - numIngesters = util_math.Min(numIngesters, util.ShuffleShardExpectedInstances(shardSize, l.getNumZones())) |
| 207 | + // This may happen, for example when the total number of ingesters is asynchronously updated, or |
| 208 | + // when zone-aware replication is enabled but ingesters in a zone have been scaled down. |
| 209 | + // In those cases we ignore the global limit. |
| 210 | + if ingestersInZoneCount == 0 { |
| 211 | + return 0 |
205 | 212 | } |
206 | 213 |
|
207 | | - return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) |
| 214 | + // Global limit is equally distributed among all the active zones. |
| 215 | + // The portion of global limit related to each zone is then equally distributed |
| 216 | + // among all the ingesters belonging to that zone. |
| 217 | + return int((float64(globalLimit*l.replicationFactor) / float64(zonesCount)) / float64(ingestersInZoneCount)) |
208 | 218 | } |
209 | 219 |
|
210 | 220 | func (l *Limiter) getShardSize(userID string) int { |
211 | 221 | return l.limits.IngestionTenantShardSize(userID) |
212 | 222 | } |
213 | 223 |
|
214 | | -func (l *Limiter) getNumZones() int { |
| 224 | +func (l *Limiter) getZonesCount() int { |
215 | 225 | if l.zoneAwarenessEnabled { |
216 | 226 | return util_math.Max(l.ring.ZonesCount(), 1) |
217 | 227 | } |
|
0 commit comments