Merged
35 commits
21004b4
Add byte buffer tracking for underlying hash tables
GWphua Nov 7, 2025
c935ea6
Byte buffer tracking for underlying offset handlers
GWphua Nov 7, 2025
c781910
Fix tests
GWphua Nov 7, 2025
7063d09
Fix quidem tests
GWphua Nov 10, 2025
19f6bc3
Documentation
GWphua Nov 10, 2025
0fcb6a0
bytesUsed naming
GWphua Nov 11, 2025
25f10d2
Add max metrics
GWphua Nov 24, 2025
b6ad3c2
Add missing calculation in BufferHashGrouper
GWphua Nov 24, 2025
28719eb
Checkstyle
GWphua Nov 24, 2025
59fe03c
Checkstyle
GWphua Nov 24, 2025
e6020a6
Merge remote-tracking branch 'origin/master' into group-by-query
GWphua Dec 31, 2025
507eecd
GroupByStatsProvider javadocs
GWphua Dec 31, 2025
9623e3a
Fix GroupByStatsProviderTest comments
GWphua Jan 12, 2026
ae40900
Fix doc order for GroupByStatsProvider metrics
GWphua Jan 12, 2026
400d0f4
Fix test for GroupByStatsMonitorTest
GWphua Jan 12, 2026
8f7b218
Update server/src/test/java/org/apache/druid/server/metrics/GroupBySt…
GWphua Jan 14, 2026
df3bf70
Revert stylistic changes in BufferHashGrouper
GWphua Jan 14, 2026
ac71a63
Rename mergeBufferUsage to mergeBufferUsedBytes
GWphua Jan 14, 2026
003da9c
Order of maxAcquisitionTimeNs
GWphua Jan 14, 2026
e416867
Track the open addressing hash table
GWphua Jan 14, 2026
cd38a05
Merge branch 'master' into group-by-query
GWphua Jan 16, 2026
a26c40a
Remove max metrics, push them in another PR...
GWphua Jan 21, 2026
9725532
Remove max metrics in GroupByStatsProviderTest
GWphua Jan 21, 2026
1455712
LimitedBufferHashGrouper to use parent method to report maxTableBuffe…
GWphua Jan 21, 2026
5db69c5
Standardised merge buffer names
GWphua Jan 21, 2026
9ce074a
Tests for buffer hash grouper
GWphua Jan 21, 2026
4f4a10a
Address multiplication cast
GWphua Jan 22, 2026
e92357d
Javadocs for getMergeBufferUsedBytes
GWphua Jan 22, 2026
d55e402
Remix comments in test for peak calculations
GWphua Jan 23, 2026
34eb62d
Merge branch 'master' into group-by-query
GWphua Jan 26, 2026
988de09
Clean up after merging conflicts
GWphua Jan 26, 2026
ce05900
Standardize maxMergeBufferUsedBytes
GWphua Feb 26, 2026
32c1ed1
Test duplicate buffer adds
GWphua Feb 26, 2026
08c235a
Test
GWphua Feb 26, 2026
dd0267b
Add javadocs for update
GWphua Feb 26, 2026
6 changes: 6 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
Expand Down Expand Up @@ -117,6 +119,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
Expand Down Expand Up @@ -147,6 +151,8 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
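Judging by `AggregateStats#addQueryStats` in this PR, `mergeBuffer/bytesUsed` is the sum of per-query peak usage and `mergeBuffer/maxBytesUsed` is the largest single per-query peak, with both cleared at each emission. A minimal sketch of that bookkeeping follows. The class and method names (`MergeBufferUsageSketch`, `addQueryPeak`, `snapshotAndReset`) are hypothetical and not part of Druid, and the sketch does not model the other counters or the conditions under which a query is counted.

```java
// Hypothetical stand-in for the two byte counters in AggregateStats; not Druid code.
final class MergeBufferUsageSketch
{
  private long totalUsedBytes = 0;
  private long maxUsedBytes = 0;

  // Called once per finished query with that query's peak merge buffer usage.
  synchronized void addQueryPeak(long queryPeakBytes)
  {
    totalUsedBytes += queryPeakBytes;
    maxUsedBytes = Math.max(maxUsedBytes, queryPeakBytes);
  }

  // Returns {total, max} for the period that just ended, then starts a new period.
  synchronized long[] snapshotAndReset()
  {
    long[] snapshot = {totalUsedBytes, maxUsedBytes};
    totalUsedBytes = 0;
    maxUsedBytes = 0;
    return snapshot;
  }
}
```

With per-query peaks of 100, 300 and 50 bytes in one period, the snapshot holds a total of 450 and a maximum of 300; an immediate second snapshot holds zeros.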
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
* Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then
* aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
*/
@LazySingleton
public class GroupByStatsProvider
Expand Down Expand Up @@ -60,14 +61,18 @@ public synchronized void closeQuery(QueryResourceId resourceId)

public synchronized AggregateStats getStatsSince()
{
return aggregateStatsContainer.reset();
AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
aggregateStatsContainer.reset();
return aggregateStats;
}

public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
private long totalMergeBufferUsedBytes = 0;
private long maxMergeBufferUsedBytes = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
Expand All @@ -78,10 +83,28 @@ public AggregateStats()
{
}

public AggregateStats(AggregateStats aggregateStats)
{
this(
aggregateStats.mergeBufferQueries,
aggregateStats.mergeBufferAcquisitionTimeNs,
aggregateStats.maxMergeBufferAcquisitionTimeNs,
aggregateStats.totalMergeBufferUsedBytes,
aggregateStats.maxMergeBufferUsedBytes,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.maxSpilledBytes,
aggregateStats.mergeDictionarySize,
aggregateStats.maxMergeDictionarySize
);
}

public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long maxMergeBufferAcquisitionTimeNs,
long totalMergeBufferUsedBytes,
long maxMergeBufferUsedBytes,
long spilledQueries,
long spilledBytes,
long maxSpilledBytes,
Expand All @@ -92,6 +115,8 @@ public AggregateStats(
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
Expand All @@ -114,6 +139,16 @@ public long getMaxMergeBufferAcquisitionTimeNs()
return maxMergeBufferAcquisitionTimeNs;
}

public long getTotalMergeBufferUsedBytes()
{
return totalMergeBufferUsedBytes;
}

public long getMaxMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes;
}

public long getSpilledQueries()
{
return spilledQueries;
Expand Down Expand Up @@ -148,6 +183,8 @@ public void addQueryStats(PerQueryStats perQueryStats)
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes();
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes());
}

if (perQueryStats.getSpilledBytes() > 0) {
Expand All @@ -160,36 +197,25 @@ public void addQueryStats(PerQueryStats perQueryStats)
maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
}

public AggregateStats reset()
public void reset()
{
AggregateStats aggregateStats =
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
maxMergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
maxSpilledBytes,
mergeDictionarySize,
maxMergeDictionarySize
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
this.totalMergeBufferUsedBytes = 0;
this.maxMergeBufferUsedBytes = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;
this.maxMergeDictionarySize = 0;

return aggregateStats;
}
}

public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

Expand All @@ -198,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

public void maxMergeBufferUsedBytes(long bytes)
{
maxMergeBufferUsedBytes.addAndGet(bytes);
}

public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
Expand All @@ -213,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

public long getMaxMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ public void close()
aggregators.reset();
}

/**
* Retrieves the size of the merge buffers used for this groupBy query. This value is retrieved when
* {@link SpillingGrouper#close()} is called.
* <p>
* Returns the highest number of bytes used at any point in the query lifecycle, not the current usage.
*/
public long getMaxMergeBufferUsedBytes()
{
return hashTable.getMaxMergeBufferUsedBytes();
}

/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
Expand All @@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;

@Nullable
private ByteBufferIntList offsetList;

public BufferHashGrouper(
Expand Down Expand Up @@ -154,6 +152,18 @@ public void reset()
aggregators.reset();
}

@Override
public long getMaxMergeBufferUsedBytes()
{
if (!initialized) {
return 0L;
}

long hashTableUsage = hashTable.getMaxMergeBufferUsedBytes();
long offSetListUsage = offsetList.getMaxMergeBufferUsedBytes();
return hashTableUsage + offSetListUsage;
}

@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@

import java.nio.ByteBuffer;

/**
* A fixed-width, open-addressing hash table that lives inside a caller-provided byte buffer.
* <p>
* The table uses a contiguous slice of the input {@link ByteBuffer} as its backing store. Each bucket holds
* at most one entry, and occupies {@code bucketSizeWithHash} number of bytes. Collisions are resolved by continuously
* probing the next bucket to find an empty bucket to slot the new entry. The current table view {@code tableBuffer}
* is maintained as a {@link ByteBuffer} slice that moves and grows within the arena as the table expands.
*/
public class ByteBufferHashTable
{
public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
Expand Down Expand Up @@ -79,6 +87,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;

// Tracks maximum bytes used for the entire lifecycle of this hash table.
protected long maxMergeBufferUsedBytes;

public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
Expand All @@ -97,6 +108,7 @@ public ByteBufferHashTable(
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
this.maxMergeBufferUsedBytes = 0;
}

public void reset()
Expand Down Expand Up @@ -139,6 +151,7 @@ public void reset()
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
updateMaxMergeBufferUsedBytes();

// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
Expand Down Expand Up @@ -245,6 +258,7 @@ protected void initializeNewBucketKey(
tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
tableBuffer.put(keyBuffer);
size++;
updateMaxMergeBufferUsedBytes();
Contributor

Should there be a similar call to updateMaxMergeBufferUsedBytes() from adjustTableWhenFull() as well?

The LimitedBufferHashGrouper does that too

Contributor Author

Since updateMaxMergeBufferUsedBytes computes size * bucketSizeWithHash, I call it wherever size or bucketSizeWithHash changes.

ByteBufferHashTable does not change either value in adjustTableWhenFull, so I did not add the call there. Granted, from a code reader's perspective, the places where the max merge buffer usage is updated seem quite random...

I have added a Javadoc to the related function.
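
The rule described in this reply (refresh the peak whenever `size` or `bucketSizeWithHash` changes) can be sketched in isolation. This is a simplified illustration, not the actual `ByteBufferHashTable`: the class `PeakTrackingTableSketch` and its methods are hypothetical, and no real buffer or probing logic is modeled. The cast to `long` before multiplying mirrors the expression in the diff.

```java
// Hypothetical, buffer-less model of the peak tracking in ByteBufferHashTable.
final class PeakTrackingTableSketch
{
  private final int bucketSizeWithHash;
  private int size = 0;
  private long maxUsedBytes = 0;

  PeakTrackingTableSketch(int bucketSizeWithHash)
  {
    this.bucketSizeWithHash = bucketSizeWithHash;
  }

  // Cast before multiplying so the product is computed in 64-bit arithmetic.
  static long usedBytes(int size, int bucketSizeWithHash)
  {
    return (long) size * bucketSizeWithHash;
  }

  // Models inserting a new key: size changes, so the peak is refreshed.
  void addEntry()
  {
    size++;
    updateMaxUsedBytes();
  }

  // Models reset(): size drops to zero, but the recorded peak is kept.
  void reset()
  {
    size = 0;
    updateMaxUsedBytes();
  }

  private void updateMaxUsedBytes()
  {
    maxUsedBytes = Math.max(maxUsedBytes, usedBytes(size, bucketSizeWithHash));
  }

  long getMaxUsedBytes()
  {
    return maxUsedBytes;
  }
}
```

With a 16-byte bucket, three inserts record a peak of 48 bytes, and a reset followed by one insert leaves the peak at 48. Without the cast, a product such as 200,000,000 * 16 would overflow `int`.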


if (bucketUpdateHandler != null) {
bucketUpdateHandler.handleNewBucket(offset);
Expand Down Expand Up @@ -381,6 +395,20 @@ public int getGrowthCount()
return growthCount;
}

/**
* To maintain an accurate tracking of the maximum bytes used per query, this function is to be called immediately
* whenever either of {@link #size} or {@link #bucketSizeWithHash} is changed.
*/
protected void updateMaxMergeBufferUsedBytes()
{
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, (long) size * bucketSizeWithHash);
}

public long getMaxMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes;
}

public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class ByteBufferIntList
private final int maxElements;
private int numElements;

private int maxMergeBufferUsedBytes;

public ByteBufferIntList(
ByteBuffer buffer,
int maxElements
Expand All @@ -38,6 +40,7 @@ public ByteBufferIntList(
this.buffer = buffer;
this.maxElements = maxElements;
this.numElements = 0;
this.maxMergeBufferUsedBytes = 0;

if (buffer.capacity() < (maxElements * Integer.BYTES)) {
throw new IAE(
Expand All @@ -55,6 +58,7 @@ public void add(int val)
}
buffer.putInt(numElements * Integer.BYTES, val);
numElements++;
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, numElements * Integer.BYTES);
}

public void set(int index, int val)
Expand All @@ -71,4 +75,9 @@ public void reset()
{
numElements = 0;
Contributor

Shouldn't reset() also set maxMergeBufferUsedBytes to 0?

Contributor Author

The ByteBufferIntList in BufferHashGrouper should by all means be final, but we cannot make it so because it is only initialized when BufferHashGrouper#init is called.

Here I am intending to record the maximum usage for the entire BufferHashGrouper's lifecycle. So, reset will not set the max usage to 0.
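
To illustrate the intent stated above, here is a minimal sketch of an int list that records its peak byte usage and keeps that peak across `reset()`. It is loosely modeled on the `ByteBufferIntList` changes in this diff, but the class name `PeakTrackingIntListSketch` and the exception types are assumptions made for the example, not Druid code.

```java
import java.nio.ByteBuffer;

// Hypothetical simplification of ByteBufferIntList with lifetime peak tracking.
final class PeakTrackingIntListSketch
{
  private final ByteBuffer buffer;
  private final int maxElements;
  private int numElements = 0;
  private int maxUsedBytes = 0;

  PeakTrackingIntListSketch(ByteBuffer buffer, int maxElements)
  {
    if (buffer.capacity() < (long) maxElements * Integer.BYTES) {
      throw new IllegalArgumentException("buffer too small for maxElements");
    }
    this.buffer = buffer;
    this.maxElements = maxElements;
  }

  void add(int val)
  {
    if (numElements == maxElements) {
      throw new IndexOutOfBoundsException("list is full");
    }
    buffer.putInt(numElements * Integer.BYTES, val);
    numElements++;
    // Record the high-water mark in bytes.
    maxUsedBytes = Math.max(maxUsedBytes, numElements * Integer.BYTES);
  }

  // Clears the contents only; the peak covers the whole lifetime of the list.
  void reset()
  {
    numElements = 0;
  }

  int getMaxUsedBytes()
  {
    return maxUsedBytes;
  }
}
```

Adding three values, resetting, then adding one more still reports 12 bytes, because the peak is only ever raised and never cleared.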

}

public int getMaxMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap

private int heapSize;
private int maxHeapSize;
private int maxMergeBufferUsedBytes;

public ByteBufferMinMaxOffsetHeap(
ByteBuffer buf,
Expand All @@ -55,6 +56,7 @@ public ByteBufferMinMaxOffsetHeap(
this.buf = buf;
this.limit = limit;
this.heapSize = 0;
this.maxMergeBufferUsedBytes = 0;
this.minComparator = minComparator;
this.maxComparator = Ordering.from(minComparator).reverse();
this.heapIndexUpdater = heapIndexUpdater;
Expand All @@ -71,9 +73,9 @@ public int addOffset(int offset)
int pos = heapSize;
buf.putInt(pos * Integer.BYTES, offset);
heapSize++;
if (heapSize > maxHeapSize) {
maxHeapSize = heapSize;
}

maxHeapSize = Math.max(maxHeapSize, heapSize);
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, maxHeapSize * Integer.BYTES);

if (heapIndexUpdater != null) {
heapIndexUpdater.updateHeapIndexForOffset(offset, pos);
Expand Down Expand Up @@ -226,6 +228,11 @@ public int getHeapSize()
return heapSize;
}

public int getMaxMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes;
}

private void bubbleUp(int pos)
{
if (isEvenLevel(pos)) {
Expand Down