fix: Optimizes SpillingGrouper for high cardinality dimension(s) GroupBy with large memory footprint aggregators (#19357)
Conversation
try {
  for (final byte[] runBytes : pendingSpillRuns) {
    readers.add(spillMapper.readValues(
        spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
FrankChen021 left a comment:
The changes LGTM, no correctness issues found.
Before I review, can we please include benchmarks before/after for this change?
FrankChen021 left a comment:
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
final long fileSize = file.length();
if (fileSize < MIN_SPILL_FILE_BYTES) {
  pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
[P1] Deleted staging spills still consume the disk quota
This path writes every small spill through LimitedTemporaryStorage, reads it back into heap, then deletes the temp file. LimitedTemporaryStorage.delete only removes the file from the file set; it does not decrement bytesUsed, and LimitedOutputStream.grab has already charged those bytes against maxOnDiskStorage. As a result, high-cardinality small-spill queries can hit TemporaryStorageFullException even though those staging files were deleted and no persistent spill file exists yet, and later flushes double-charge the same data when writing the merged file. This undermines the batching optimization and can fail queries well below their configured on-disk limit. Either small runs should avoid charging LimitedTemporaryStorage, or the accounting needs to refund deleted staging bytes.
@jtuglu1 Added benchmark results.
 * and writing to disk only once this threshold is reached, we avoid that explosion in file count without any
 * extra disk I/O for small spills.
 */
private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB
Let's make this a config; sketch sizes can vary with the columns they cover and the k-values.
try {
  Files.delete(file.toPath());
}
catch (IOException e) {
  log.warn(e, "Cannot delete file: %s", file);
}
files.remove(file);
bytesUsed.addAndGet(-fileSize);
If this delete fails, I'd rather not have our accounting also be inaccurate – can we put this in the try, after the delete? IMO, our accounting should always be an overestimate to avoid actual disk space issues where worse exceptions can happen (e.g. I'd rather throw a query error cleanly and debug that than see historicals/peons start crashing due to underestimated statistics leading to disk space errors).
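A sketch of the suggested ordering — the same statements from the snippet above, just moved inside the try so a failed delete never refunds the quota:

```java
try {
  Files.delete(file.toPath());
  // Refund the quota only after the delete has actually succeeded, so the
  // accounting stays an overestimate when deletion fails.
  files.remove(file);
  bytesUsed.addAndGet(-fileSize);
}
catch (IOException e) {
  log.warn(e, "Cannot delete file: %s", file);
}
```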
@@ -293,6 +320,22 @@ public void setSpillingAllowed(final boolean spillingAllowed)
@Override
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
  // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase.
  try {
    flushPendingRunsToDisk();
we should see what the overhead of sorted=false is here. If we don't need a sorted run as the end result, we can just do a simple concat to avoid the decompress/re-sort overhead of merge sort. I think we might need to condition this on sortHasNonGroupingFields=false too.
Overhead breakdown of flushPendingRunsToDisk():
1. LZ4 decompress — fast (~GB/s)
2. JSON parse — moderate (dominant cost)
3. Merge-sort comparison — cheap (O(N log K), K = few pending runs)
4. JSON serialize — moderate (dominant cost)
5. LZ4 compress + write — fast

Replacing mergeSorted with concat (step 3) saves very little — the JSON serde in steps 2+4 dominates.
The other approach is to write each pending run's raw byte[] sequentially into one file (each is already a complete LZ4+JSON stream). At read time, create one iterator per sub-stream. The catch with this approach is that LZ4BlockInputStream stops at each stream boundary, so reading N streams from one file requires creating N LZ4BlockInputStream instances on the same underlying FileInputStream. LZ4BlockInputStream allocates a single decompression buffer (default 64KB, matching LZ4BlockOutputStream's default block size). With a lot of spills (the scenario we are trying to fix: large aggregators + high cardinality group bys), these LZ4BlockInputStream buffers add up, resulting in OOM like before.
The merge-sort serde cost in flushPendingRunsToDisk() is the price we pay for keeping both file count and read-time memory bounded. And as noted earlier, replacing mergeSorted with concat alone saves very little since JSON serde dominates the cost.
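For reference, a condensed, self-contained sketch of the merge path under discussion — not the PR's exact code; the generic entry type and names beyond those in the diff above are illustrative:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class PendingRunFlushSketch
{
  /**
   * Merges K pending in-heap runs (each a complete LZ4+JSON stream of sorted
   * entries) into a single consolidated stream. Steps 2 and 4 (JSON serde)
   * dominate the cost; the merge comparison itself is only O(N log K).
   */
  static <T> void flushPendingRuns(
      ObjectMapper spillMapper,
      List<byte[]> pendingSpillRuns,
      Class<T> entryClass,
      Comparator<T> comparator,
      OutputStream consolidatedFileOut
  ) throws IOException
  {
    final List<Iterator<T>> runIterators = new ArrayList<>(pendingSpillRuns.size());
    for (final byte[] runBytes : pendingSpillRuns) {
      // Steps 1-2: LZ4 decompress (fast) + JSON parse (dominant cost).
      runIterators.add(
          spillMapper.readValues(
              spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
              entryClass
          )
      );
    }
    // Step 3: merge-sort across the pending runs (cheap relative to serde).
    final Iterator<T> merged = Iterators.mergeSorted(runIterators, comparator);
    // Steps 4-5: JSON serialize (dominant cost) + LZ4 compress into one file.
    try (JsonGenerator gen = spillMapper.getFactory().createGenerator(new LZ4BlockOutputStream(consolidatedFileOut))) {
      while (merged.hasNext()) {
        gen.writeObject(merged.next());
      }
    }
    pendingSpillRuns.clear();
  }
}
```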
Btw, I wanted to benchmark the SpillingGrouper.iterator(false) code path, but it turns out SpillingGrouper.iterator(false) is never reachable through any production query path without a code change. The only places SpillingGrouper.iterator() is called are:
- RowBasedGrouperHelper:634 → iterator(true) (always)
- ConcurrentGrouper:430/470 → iterator(true) or iterator(sorted), where sorted comes from ConcurrentGrouper.iterator(sorted), which is always called with true from RowBasedGrouperHelper
Looks like the reason iterator(true) is hardcoded in RowBasedGrouperHelper:634 is that the merge layer above it relies on sorted input — CombiningIterator in ConcurrentGrouper and the broker merge both detect duplicate keys by comparing consecutive sorted entries. So sorted=true is for merge correctness, not output ordering.
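A toy illustration of that invariant (not Druid's CombiningIterator, just the pattern it relies on): consecutive-entry combining only detects duplicates when equal keys are adjacent, which is exactly what sorted input guarantees.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class ConsecutiveCombineSketch
{
  /** Combines runs of equal keys by summing values; correct only on sorted input. */
  static <K> List<Map.Entry<K, Long>> combine(List<Map.Entry<K, Long>> sorted, Comparator<K> cmp)
  {
    final List<Map.Entry<K, Long>> out = new ArrayList<>();
    for (final Map.Entry<K, Long> e : sorted) {
      final int lastIdx = out.size() - 1;
      if (!out.isEmpty() && cmp.compare(out.get(lastIdx).getKey(), e.getKey()) == 0) {
        // Duplicate key: visible only because sorting made equal keys adjacent.
        out.get(lastIdx).setValue(out.get(lastIdx).getValue() + e.getValue());
      } else {
        out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
      }
    }
    return out;
  }
}
```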
Could we add a comment for this on the method? Without looking at all the call-sites, it's tough to see why we ignore the sort param.
Now that we are properly tracking bytes during deletion, I wonder if we should also look at deleting dictionaryFiles as well here.
Right — deleteFiles() only deletes data files but not dictionaryFiles. Those are also tracked by LimitedTemporaryStorage and consume disk quota. Fixed.
Assert.assertTrue(
    "reason should mention disk space",
    lastResult.getReason().contains("Not enough disk space")
);
Could we maybe add some asserts/another test which verify the statistics on file delete errors? Something that would catch regressions for the bug that was patched in this PR.
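A rough shape for such a test — everything here is hypothetical (runGroupByThatSpillsAndBatches, currentBytesUsed(), actualDiskBytes() are stand-ins for whatever driver and accessors the real test uses); the point is only the invariant being asserted:

```java
// Hypothetical regression-test sketch: after spills are written, read back into
// heap, and deleted, the storage's tracked bytes must never drop below what is
// actually on disk — i.e., accounting stays an overestimate even on delete errors.
@Test
public void testTrackedBytesNeverUnderestimateDisk() throws IOException
{
  runGroupByThatSpillsAndBatches();                    // stand-in for the query driver
  final long tracked = tempStorage.currentBytesUsed(); // hypothetical accessor
  final long onDisk = actualDiskBytes(tempStorage.getDir());
  Assert.assertTrue(
      "tracked bytes must be >= actual disk bytes",
      tracked >= onDisk
  );
}
```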
pendingDictionaryEntries.addAll(keySerde.getDictionary());
grouper.reset();

final long fileSize = file.length();
I wonder if there might be an optimization here: reserve the entire needed capacity for the array list upfront to prevent it needing to repeatedly double up to sufficient capacity. I think it should no-op on subsequent calls too, once capacity is allocated. Since we only .clear() and don't do something like trimToSize(), this resizing might just be amortized across all the files, especially if they are of similar size and don't amount to much.
I don't think it's worth optimizing.
- ArrayList.clear() sets size to 0 but keeps the backing array, so after the first batch flushes, the capacity is already there for subsequent batches.
- Even on the first batch, the cost is trivial. Say we have ~170 small spills per 1MB batch — that's about 7-8 doublings from the default capacity of 10. Each doubling copies an array of object references (8 bytes each), so the total copy overhead is on the order of a few KB. Negligible compared to the MB of actual spill data being handled.
- Pre-allocating would require estimating how many spills fit in a batch (minSpillFileSize / avgSpillSize), but the spill size isn't known ahead of time — it depends on the data. So the estimate would be a guess anyway.
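A quick illustration of the clear()-keeps-capacity point (plain JDK behavior, nothing Druid-specific; the sizes mirror the example above):

```java
// ArrayList.clear() nulls the elements and sets size to 0, but keeps the grown
// backing array, so later batches reuse the capacity built up by the first batch.
List<byte[]> pendingSpillRuns = new ArrayList<>(); // default capacity 10
for (int i = 0; i < 170; i++) {
  pendingSpillRuns.add(new byte[6 * 1024]);        // ~170 small runs force a handful of grows
}
pendingSpillRuns.clear();                          // size 0, capacity >= 170 retained
// The next batch of ~170 adds triggers no further resizing.
```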
final Object[] deserializedValues = reusableEntry.getValues();
for (int i = 0; i < deserializedValues.length; i++) {
  deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
  if (deserializedValues[i] instanceof Integer) {
Curious: why are we coercing to long values here?
This is existing code: https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java#L319
I just refactored it into a common method to avoid repeating code between flushPendingRunsToDisk() and iterator().
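For context, the branch in the snippet above continues roughly as follows in the linked code (the likely reason being that Jackson parses small JSON integers back as Integer, while long-typed aggregators expect Long):

```java
// Existing behavior, unchanged by this PR: widen the boxed Integer that Jackson
// produced for small JSON numbers back to the Long the engine expects.
if (deserializedValues[i] instanceof Integer) {
  deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
}
```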
jtuglu1 left a comment:
LGTM once the remaining comments are addressed and/or resolved.
@gianm might be able to comment on why this is not put inside the try as well.
Thanks for the review. All follow-up comments addressed.
FrankChen021 left a comment:
I rechecked the follow-up and the latest code addresses the quota issue I raised: deleted staging spill files now refund the LimitedTemporaryStorage byte accounting, temp file naming remains safe after deletes, and the new regression coverage checks tracked bytes against actual disk bytes. No further inline reply from me is needed on that thread.
This is an automated review by Codex GPT-5
Optimizes SpillingGrouper for high cardinality dimension(s) GroupBy with large memory footprint aggregators
Description
tl;dr: Batch small spill files in SpillingGrouper to reduce file count.
Problem
When aggregators like HLL sketches (or ThetaSketch) are used in groupBy queries, the BufferHashGrouper pre-allocates a large fixed-size buffer per group slot (e.g. ~64KB per slot for HLL with lgK=16). This causes the in-memory grouper to fill up quickly and spill frequently, since the number of unique groups the BufferHashGrouper can store in memory is low. However, when each key has only been seen a few times (i.e. a high cardinality dimension(s) GroupBy), the sketch serializes to a compact form of just a handful of bytes (HLL in List mode). The result is thousands of tiny spill files on disk — each one only a few KB despite the grouper buffer being full.
A large number of spill files can cause OOM when GroupBy merges spill files by opening all of them simultaneously. We previously prevented this by adding a guardrail in #19141, which would fail the query. This PR fixes the issue and allows the query to succeed.
Fix
Instead of writing every grouper flush directly to its own spill file, small flushes (serialized size < 1MB) are batched in heap memory. When the accumulated pending bytes reach the 1MB threshold, all pending runs are merge-sorted and written as a single consolidated file. Large flushes (>= 1MB) bypass batching and go directly to disk as before.
The approach avoids holding the full serialized data in heap during the spill by streaming directly to a temp file first, then reading back only small files (< 1MB) into memory for batching. This prevents OOM risk when the serialized size happens to be close to the buffer size.
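A condensed sketch of that flow — field and helper names here (writeSpillFile, pendingSpillBytes) follow the description above rather than the exact code in SpillingGrouper.java:

```java
// Sketch of the spill path described above: small runs are staged on heap and
// periodically merge-flushed; large runs go straight to disk as before.
private void spill(Iterator<Entry<KeyType>> entries) throws IOException
{
  // Stream the run to a temp file first so heap usage stays bounded even when
  // the serialized size turns out to be close to the buffer size.
  final File file = writeSpillFile(entries);
  final long fileSize = file.length();
  if (fileSize < MIN_SPILL_FILE_BYTES) {
    // Small run: pull it back into heap and batch it instead of keeping a tiny file.
    pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
    pendingSpillBytes += fileSize;
    temporaryStorage.delete(file); // refunds the disk quota (see review discussion)
    if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) {
      flushPendingRunsToDisk();    // merge-sort pending runs into one consolidated file
    }
  } else {
    files.add(file);               // large run: keep as its own spill file
  }
}
```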
Key changed/added classes in this PR
All changes are in SpillingGrouper.java.
Memory overhead
Peak additional heap usage is bounded by ~2 × MIN_SPILL_FILE_BYTES (2MB): up to 1MB of accumulated pending byte arrays plus one file being read back. The grouper buffer itself is an off-heap direct ByteBuffer and is reset before the read-back.
Optimizations considered but not pursued
Combine duplicate keys during flush (combineByKey)
During flushPendingRunsToDisk(), we tried deserializing all pending runs, combining entries with duplicate keys by merging their aggregator values (using AggregatorFactory.combine()), then serializing the deduplicated result. The idea was to reduce the output file size and the final merge fan-in.
Flame graph analysis showed combineByKey itself consumed only 28 CPU samples — negligible overhead. However, wall-clock time regressed from 180s to 190s (+5.5%). The cause: even though the CPU cost of combine() itself was small, the synchronous deserialize-combine-reserialize pipeline added latency to the processing thread's critical path. The processing thread cannot ingest new rows while flushing, so any additional work during the flush directly extends the stall. In this workload, the keys had low duplication across the small runs, so combineByKey eliminated very few entries while paying the full deserialization cost for every entry.
Reducing merge fan-in with cascaded intermediate merges
Instead of writing all pending runs as a single merged file, we considered a cascaded merge approach — progressively merging spill files on disk (similar to an external merge sort with bounded fan-in) to keep the final iterator() merge across a small number of large files. This was not pursued because the batching approach already achieves the same goal more simply: by accumulating small runs in memory and writing them as one file, the number of files at iterator() time is naturally reduced by orders of magnitude (e.g., from ~2,000 to ~dozens). Cascaded disk merges would add I/O amplification (each entry written to disk multiple times) for marginal additional fan-in reduction.
Potential future improvements
Reducing sketch initialization cost
Flame graph profiling shows that HllSketchMergeBufferAggregatorHelper.createNewUnion() accounts for 50.7% of the processing thread's CPU time — the dominant bottleneck. Every time a new group slot is initialized in the BufferHashGrouper, init() calls createNewUnion(), which copies a pre-built empty Union image (~65KB for lgK=16) into the buffer via initializeEmptyUnion(), then wraps it with Union.writableWrap(). During hash table growth (adjustTableWhenFull()), every existing slot is relocated via relocate(), which calls createNewUnion() again. With frequent spills, the grouper repeatedly fills, spills, resets, and re-initializes — multiplying the init cost.
Possible approaches:
Variable-size buffer slots in BufferHashGrouper: Currently, ByteBufferHashTable uses fixed-width buckets (bucketSizeWithHash = HASH_SIZE + keySize + aggregators.spaceNeeded()), where spaceNeeded() is the maximum intermediate size. For HLL with lgK=16, this pre-allocates ~65KB per slot even when the sketch is in List mode (few distinct values, only a handful of bytes). If the hash table supported variable-width slots — starting small and growing on demand as sketches transition from List → Set → HLL mode — far more groups would fit in the buffer before spilling. This would dramatically reduce the number of spills and proportionally reduce createNewUnion() calls. However, this is a fundamental architectural change to ByteBufferHashTable, which relies on fixed-width contiguous buckets for O(1) offset calculation and linear probing.
Lazy sketch initialization: Instead of initializing a full Union object when a new bucket is created, defer the expensive createNewUnion() call until the first aggregation value is actually merged. For groups that are spilled before receiving many values, this could avoid the full initialization cost entirely. The BufferAggregator interface would need to support deferred init — e.g., storing a sentinel in the buffer and lazily initializing on first aggregate() call. A sketch of this idea follows.
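The sentinel value, helper call, and buffer layout below are hypothetical; only the init/aggregate signatures mirror the real BufferAggregator interface:

```java
// Hypothetical deferred-init BufferAggregator: init() writes a 1-byte sentinel
// instead of copying the ~65KB empty Union image; the expensive setup happens
// on the first aggregate() call for the slot, so slots spilled before seeing
// any values never pay it.
private static final byte NOT_INITIALIZED = 0x7F; // hypothetical sentinel

@Override
public void init(final ByteBuffer buf, final int position)
{
  buf.put(position, NOT_INITIALIZED); // cheap: one byte, no Union copy
}

@Override
public void aggregate(final ByteBuffer buf, final int position)
{
  if (buf.get(position) == NOT_INITIALIZED) {
    helper.initializeEmptyUnion(buf, position); // expensive init, now paid lazily
  }
  // ... wrap the union and merge the incoming value as before ...
}
```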
Benchmarks result
Before this PR:
After this PR:
The existing queryMultiQueryableIndexWithSpilling/queryMultiQueryableIndexWithSpillingTTFR benchmarks use bufferGrouperMaxSize=4000, which produces reasonable spills of ~200 KB. I have also added new benchmarks with the same idea but producing spill files at the extreme ends of size. queryMultiQueryableIndexWithSmallSpilling/queryMultiQueryableIndexWithSmallSpillingTTFR sets bufferGrouperMaxSize=100, producing spill sizes of ~6 KB; this results in more batching. queryMultiQueryableIndexWithLargeSpilling/queryMultiQueryableIndexWithLargeSpillingTTFR sets bufferGrouperMaxSize=70000, producing spill sizes of ~4 MB; this skips batching. These new benchmarks are not added to the PR since they are really the same as queryMultiQueryableIndexWithSpilling/queryMultiQueryableIndexWithSpillingTTFR, just with different config values.