Add spill file count limit for GroupBy query#19141
Conversation
There was a problem hiding this comment.
LGTM - as we have been running internally in production for a while now, pretty confident this is a good change. Left a few nits RE custom exception (maybe this should extend InternalServerError so we can classify it? I see the other group by exception doesn't do this, but maybe that predates the exception classes) and providing context in docs as to why you would need this vs just the raw byte config.
| public TemporaryStorageFileLimitException(final int fileCount) | ||
| { | ||
| return s3Object; | ||
| super(StringUtils.format("Cannot write to disk, hit file count limit of %,d.", fileCount)); |
| |`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000| | ||
| |`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000| | ||
| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| | ||
| |`druid.query.groupBy.maxSpillFileCount`|Maximum number of spill files allowed per GroupBy query. Queries that exceed this limit will fail.|Integer.MAX_VALUE (unlimited)| |
There was a problem hiding this comment.
maybe also mention why you might want to use this, either a) to prevent OOM and/or to prevent excessive spill files, while the total spill byte size is under druid.query.groupBy.maxOnDiskStorage.
There was a problem hiding this comment.
Im going to say "See groupBy memory tuning and resource limits for details." here and add more details to the memory-tuning-and-resource-limits section (so as to not repeat myself)
That's not needed. SpillingGrouper.aggregate() doesn't throw the Exception. It returns AggregateResult with a failure reason. That will be handled upstream. |
Add spill file count limit for GroupBy query
Description
GroupBy queries that group on high-cardinality dimensions can create a large number of spill files. This problem is more likely when queries contain many aggregators and/or aggregators with large memory footprints (e.g., DataSketch). This is because GroupBy can only hold a limited number of unique groupings in memory before flushing to disk — the exact limit depends on the size of each row, which is determined by the size of the aggregators. The issue arises when GroupBy attempts to merge all the spill files. Currently, GroupBy merges spill files by opening all of them simultaneously. Opening these files requires memory for objects such as MappingIterator, SmileParser, etc., which can cause historical nodes to OOM.
This PR fixes the issue by introducing a new property:
druid.query.groupBy.maxSpillFileCountThe maximum number of spill files allowed per GroupBy query. When the limit is reached, the query fails with a ResourceLimitExceededException. This property can be used to prevent historical nodes from OOMing due to an excessive number of spill files being opened simultaneously during the merge phase. Defaults to Integer.MAX_VALUE (unlimited). Can also be set per query via the query context key
maxSpillFileCount.Note that this new config, maxSpillFileCount, is complementary to the existing maxOnDiskStorage. maxOnDiskStorage limits total bytes across all spill files, but cannot prevent a large number of tiny files — a query can create hundreds of thousands of spill files while staying well under the byte limit. maxSpillFileCount fills this gap by limiting file count directly, which bounds the number of simultaneously open file handles during the merge phase. This situation arises when aggregators like ThetaSketch pre-allocate a large fixed buffer per row in memory (e.g. ~131KB), causing the buffer to flush frequently with only a small number of rows; since each row corresponds to a unique grouping key in a high-cardinality dimension, each sketch has seen very few values at flush time and serializes to only a few bytes on disk using the sketch's compact format.
Release Notes
maxSpillFileCount.Followup
Key changed/added classes in this PR
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.javaprocessing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.javaThis PR has: