diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 1a15a5314f88..8983b79c5413 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -90,6 +90,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -117,6 +119,8 @@ Most metric values reset each emission period, as specified in 
`druid.monitoring |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -147,6 +151,8 @@ to represent the task ID are deprecated and will be removed in a future release. |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. 
This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.
This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index 51f564005555..f6b92a7b62c1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicLong; /** - * Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size. + * Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then + * aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}. */ @LazySingleton public class GroupByStatsProvider @@ -60,7 +61,9 @@ public synchronized void closeQuery(QueryResourceId resourceId) public synchronized AggregateStats getStatsSince() { - return aggregateStatsContainer.reset(); + AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer); + aggregateStatsContainer.reset(); + return aggregateStats; } public static class AggregateStats @@ -68,6 +71,8 @@ public static class AggregateStats private long mergeBufferQueries = 0; private long mergeBufferAcquisitionTimeNs = 0; private long maxMergeBufferAcquisitionTimeNs = 0; + private long totalMergeBufferUsedBytes = 0; + private long maxMergeBufferUsedBytes = 0; private long spilledQueries = 0; private long spilledBytes = 0; private long maxSpilledBytes = 0; @@ -78,10 +83,28 @@ public AggregateStats() { } + public AggregateStats(AggregateStats aggregateStats) + { + this( + aggregateStats.mergeBufferQueries, + aggregateStats.mergeBufferAcquisitionTimeNs, + aggregateStats.maxMergeBufferAcquisitionTimeNs, + aggregateStats.totalMergeBufferUsedBytes, + 
aggregateStats.maxMergeBufferUsedBytes, + aggregateStats.spilledQueries, + aggregateStats.spilledBytes, + aggregateStats.maxSpilledBytes, + aggregateStats.mergeDictionarySize, + aggregateStats.maxMergeDictionarySize + ); + } + public AggregateStats( long mergeBufferQueries, long mergeBufferAcquisitionTimeNs, long maxMergeBufferAcquisitionTimeNs, + long totalMergeBufferUsedBytes, + long maxMergeBufferUsedBytes, long spilledQueries, long spilledBytes, long maxSpilledBytes, @@ -92,6 +115,8 @@ public AggregateStats( this.mergeBufferQueries = mergeBufferQueries; this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs; this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs; + this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes; + this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes; this.spilledQueries = spilledQueries; this.spilledBytes = spilledBytes; this.maxSpilledBytes = maxSpilledBytes; @@ -114,6 +139,16 @@ public long getMaxMergeBufferAcquisitionTimeNs() return maxMergeBufferAcquisitionTimeNs; } + public long getTotalMergeBufferUsedBytes() + { + return totalMergeBufferUsedBytes; + } + + public long getMaxMergeBufferUsedBytes() + { + return maxMergeBufferUsedBytes; + } + public long getSpilledQueries() { return spilledQueries; @@ -148,6 +183,8 @@ public void addQueryStats(PerQueryStats perQueryStats) maxMergeBufferAcquisitionTimeNs, perQueryStats.getMergeBufferAcquisitionTimeNs() ); + totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes(); + maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes()); } if (perQueryStats.getSpilledBytes() > 0) { @@ -160,36 +197,25 @@ public void addQueryStats(PerQueryStats perQueryStats) maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize()); } - public AggregateStats reset() + public void reset() { - AggregateStats aggregateStats = - new AggregateStats( - mergeBufferQueries, - 
mergeBufferAcquisitionTimeNs, - maxMergeBufferAcquisitionTimeNs, - spilledQueries, - spilledBytes, - maxSpilledBytes, - mergeDictionarySize, - maxMergeDictionarySize - ); - this.mergeBufferQueries = 0; this.mergeBufferAcquisitionTimeNs = 0; this.maxMergeBufferAcquisitionTimeNs = 0; + this.totalMergeBufferUsedBytes = 0; + this.maxMergeBufferUsedBytes = 0; this.spilledQueries = 0; this.spilledBytes = 0; this.maxSpilledBytes = 0; this.mergeDictionarySize = 0; this.maxMergeDictionarySize = 0; - - return aggregateStats; } } public static class PerQueryStats { private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0); + private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0); private final AtomicLong spilledBytes = new AtomicLong(0); private final AtomicLong mergeDictionarySize = new AtomicLong(0); @@ -198,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay) mergeBufferAcquisitionTimeNs.addAndGet(delay); } + public void maxMergeBufferUsedBytes(long bytes) + { + maxMergeBufferUsedBytes.addAndGet(bytes); + } + public void spilledBytes(long bytes) { spilledBytes.addAndGet(bytes); @@ -213,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs() return mergeBufferAcquisitionTimeNs.get(); } + public long getMaxMergeBufferUsedBytes() + { + return maxMergeBufferUsedBytes.get(); + } + public long getSpilledBytes() { return spilledBytes.get(); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index 70cf5832cf33..a5edb38cfa4b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -173,6 +173,18 @@ public void close() aggregators.reset(); } + /** + * Retrieves the size of the merge buffers used for this groupby 
query. This value is retrieved when + * {@link SpillingGrouper#close()} is called. + * <p>
+ * This method is implemented to return the highest memory value used, this is helpful especially in + * reporting the highest number of bytes used throughout the entire query lifecycle. + */ + public long getMaxMergeBufferUsedBytes() + { + return hashTable.getMaxMergeBufferUsedBytes(); + } + /** * Populate a {@link ReusableEntry} with values from a particular bucket. */ diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 4970ebe9e83e..670a03cb2dee 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -26,7 +26,6 @@ import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.AbstractList; import java.util.Collections; @@ -50,7 +49,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper> iterator(boolean sorted) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java index 62c65f7cecb7..f348c6ba7fb0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java @@ -26,6 +26,14 @@ import java.nio.ByteBuffer; +/** + * A fixed-width, open-addressing hash table that lives inside a caller-provided byte buffer. + * <p>
+ * The table uses a contiguous slice of the input {@link ByteBuffer} as its backing store. Each bucket holds + * at most one entry, and occupies {@code bucketSizeWithHash} number of bytes. Collisions are resolved by continuously + * probing the next bucket to find an empty bucket to slot the new entry. The current table view {@code tableBuffer} + * is maintained as a {@link ByteBuffer} slice that moves and grows within the arena as the table expands. + */ public class ByteBufferHashTable { public static int calculateTableArenaSizeWithPerBucketAdditionalSize( @@ -79,6 +87,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize( @Nullable protected BucketUpdateHandler bucketUpdateHandler; + // Tracks maximum bytes used for the entire lifecycle of this hash table. + protected long maxMergeBufferUsedBytes; + public ByteBufferHashTable( float maxLoadFactor, int initialBuckets, @@ -97,6 +108,7 @@ public ByteBufferHashTable( this.maxSizeForTesting = maxSizeForTesting; this.tableArenaSize = buffer.capacity(); this.bucketUpdateHandler = bucketUpdateHandler; + this.maxMergeBufferUsedBytes = 0; } public void reset() @@ -139,6 +151,7 @@ public void reset() bufferDup.position(tableStart); bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash); tableBuffer = bufferDup.slice(); + updateMaxMergeBufferUsedBytes(); // Clear used bits of new table for (int i = 0; i < maxBuckets; i++) { @@ -245,6 +258,7 @@ protected void initializeNewBucketKey( tableBuffer.putInt(Groupers.getUsedFlag(keyHash)); tableBuffer.put(keyBuffer); size++; + updateMaxMergeBufferUsedBytes(); if (bucketUpdateHandler != null) { bucketUpdateHandler.handleNewBucket(offset); @@ -381,6 +395,20 @@ public int getGrowthCount() return growthCount; } + /** + * To maintain an accurate tracking of the maximum bytes used per query, this function is to be called immediately + * whenever either of {@link #size} or {@link #bucketSizeWithHash} is changed. 
+ */ + protected void updateMaxMergeBufferUsedBytes() + { + maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, (long) size * bucketSizeWithHash); + } + + public long getMaxMergeBufferUsedBytes() + { + return maxMergeBufferUsedBytes; + } + public interface BucketUpdateHandler { void handleNewBucket(int bucketOffset); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java index 28de255c13a0..33a79451993e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java @@ -30,6 +30,8 @@ public class ByteBufferIntList private final int maxElements; private int numElements; + private int maxMergeBufferUsedBytes; + public ByteBufferIntList( ByteBuffer buffer, int maxElements @@ -38,6 +40,7 @@ public ByteBufferIntList( this.buffer = buffer; this.maxElements = maxElements; this.numElements = 0; + this.maxMergeBufferUsedBytes = 0; if (buffer.capacity() < (maxElements * Integer.BYTES)) { throw new IAE( @@ -55,6 +58,7 @@ public void add(int val) } buffer.putInt(numElements * Integer.BYTES, val); numElements++; + maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, numElements * Integer.BYTES); } public void set(int index, int val) @@ -71,4 +75,9 @@ public void reset() { numElements = 0; } + + public int getMaxMergeBufferUsedBytes() + { + return maxMergeBufferUsedBytes; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java index cfa7295e6b43..ff2746bca29c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java @@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap private int heapSize; private int maxHeapSize; + private int maxMergeBufferUsedBytes; public ByteBufferMinMaxOffsetHeap( ByteBuffer buf, @@ -55,6 +56,7 @@ public ByteBufferMinMaxOffsetHeap( this.buf = buf; this.limit = limit; this.heapSize = 0; + this.maxMergeBufferUsedBytes = 0; this.minComparator = minComparator; this.maxComparator = Ordering.from(minComparator).reverse(); this.heapIndexUpdater = heapIndexUpdater; @@ -71,9 +73,9 @@ public int addOffset(int offset) int pos = heapSize; buf.putInt(pos * Integer.BYTES, offset); heapSize++; - if (heapSize > maxHeapSize) { - maxHeapSize = heapSize; - } + + maxHeapSize = Math.max(maxHeapSize, heapSize); + maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, maxHeapSize * Integer.BYTES); if (heapIndexUpdater != null) { heapIndexUpdater.updateHeapIndexForOffset(offset, pos); @@ -226,6 +228,11 @@ public int getHeapSize() return heapSize; } + public int getMaxMergeBufferUsedBytes() + { + return maxMergeBufferUsedBytes; + } + private void bubbleUp(int pos) { if (isEvenLevel(pos)) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 8242c9d8cf5c..b4b4cb347019 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -332,7 +332,7 @@ public void reset() throw new ISE("Grouper is closed"); } - groupers.forEach(Grouper::reset); + groupers.forEach(SpillingGrouper::reset); } @Override @@ -496,7 +496,7 @@ public void close() { if (!closed) { closed = true; - groupers.forEach(Grouper::close); + groupers.forEach(SpillingGrouper::close); } } diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index 0627fba0333d..873dbc776bda 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -458,6 +458,18 @@ public boolean validateBufferCapacity(int bufferCapacity) } } + @Override + public long getMaxMergeBufferUsedBytes() + { + if (!initialized) { + return 0L; + } + + long hashTableUsage = super.getMaxMergeBufferUsedBytes(); + long offSetHeapUsage = offsetHeap.getMaxMergeBufferUsedBytes(); + return hashTableUsage + offSetHeapUsage; + } + private class AlternatingByteBufferHashTable extends ByteBufferHashTable { // The base buffer is split into two alternating halves, with one sub-buffer in use at a given time. @@ -509,6 +521,7 @@ public AlternatingByteBufferHashTable( public void reset() { size = 0; + updateMaxMergeBufferUsedBytes(); growthCount = 0; // clear the used bits of the first buffer for (int i = 0; i < maxBuckets; i++) { @@ -570,6 +583,7 @@ public void adjustTableWhenFull() } size = numCopied; + updateMaxMergeBufferUsedBytes(); tableBuffer = newTableBuffer; growthCount++; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index fadcfa02c95d..688c9f065661 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -68,7 +68,7 @@ public class SpillingGrouper implements Grouper "Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage." 
); - private final Grouper grouper; + private final AbstractBufferHashGrouper grouper; private final KeySerde keySerde; private final LimitedTemporaryStorage temporaryStorage; private final ObjectMapper spillMapper; @@ -218,12 +218,23 @@ public void reset() @Override public void close() { - perQueryStats.dictionarySize(keySerde.getDictionarySize()); + perQueryStats.dictionarySize(getDictionarySizeEstimate()); + perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes()); grouper.close(); keySerde.reset(); deleteFiles(); } + private long getMaxMergeBufferUsedBytes() + { + return grouper.isInitialized() ? grouper.getMaxMergeBufferUsedBytes() : 0L; + } + + private long getDictionarySizeEstimate() + { + return keySerde.getDictionarySize(); + } + /** * Returns a dictionary of string keys added to this grouper. Note that the dictionary of keySerde is spilled on * local storage whenever the inner grouper is spilled. If there are spilled dictionaries, this method loads them diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java index 4a6cfb83d102..207b37d65f4b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -35,6 +35,7 @@ public void testMetricCollection() stats1.mergeBufferAcquisitionTime(300); stats1.mergeBufferAcquisitionTime(400); + stats1.maxMergeBufferUsedBytes(50); stats1.spilledBytes(200); stats1.spilledBytes(400); stats1.dictionarySize(100); @@ -45,6 +46,7 @@ public void testMetricCollection() stats2.mergeBufferAcquisitionTime(500); stats2.mergeBufferAcquisitionTime(600); + stats2.maxMergeBufferUsedBytes(100); stats2.spilledBytes(400); stats2.spilledBytes(600); stats2.dictionarySize(300); @@ -54,6 +56,8 @@ public void testMetricCollection() Assert.assertEquals(0L, 
aggregateStats.getMergeBufferQueries()); Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs()); Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(0L, aggregateStats.getTotalMergeBufferUsedBytes()); + Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferUsedBytes()); Assert.assertEquals(0L, aggregateStats.getSpilledQueries()); Assert.assertEquals(0L, aggregateStats.getSpilledBytes()); Assert.assertEquals(0L, aggregateStats.getMaxSpilledBytes()); @@ -67,6 +71,8 @@ public void testMetricCollection() Assert.assertEquals(2, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs()); Assert.assertEquals(1100L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(150L, aggregateStats.getTotalMergeBufferUsedBytes()); + Assert.assertEquals(100L, aggregateStats.getMaxMergeBufferUsedBytes()); Assert.assertEquals(2L, aggregateStats.getSpilledQueries()); Assert.assertEquals(1600L, aggregateStats.getSpilledBytes()); Assert.assertEquals(1000L, aggregateStats.getMaxSpilledBytes()); @@ -74,7 +80,6 @@ public void testMetricCollection() Assert.assertEquals(700L, aggregateStats.getMaxMergeDictionarySize()); } - @Test public void testMetricsWithMultipleQueries() { @@ -83,24 +88,28 @@ public void testMetricsWithMultipleQueries() QueryResourceId r1 = new QueryResourceId("r1"); GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); stats1.mergeBufferAcquisitionTime(2000); + stats1.maxMergeBufferUsedBytes(50); stats1.spilledBytes(100); stats1.dictionarySize(200); QueryResourceId r2 = new QueryResourceId("r2"); GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); stats2.mergeBufferAcquisitionTime(100); + stats2.maxMergeBufferUsedBytes(500); stats2.spilledBytes(150); stats2.dictionarySize(250); QueryResourceId r3 = new QueryResourceId("r3"); 
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); stats3.mergeBufferAcquisitionTime(200); + stats3.maxMergeBufferUsedBytes(100); stats3.spilledBytes(3000); stats3.dictionarySize(300); QueryResourceId r4 = new QueryResourceId("r4"); GroupByStatsProvider.PerQueryStats stats4 = statsProvider.getPerQueryStatsContainer(r4); stats4.mergeBufferAcquisitionTime(300); + stats4.maxMergeBufferUsedBytes(75); stats4.spilledBytes(200); stats4.dictionarySize(1500); @@ -112,11 +121,13 @@ public void testMetricsWithMultipleQueries() GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince(); Assert.assertEquals(2000L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(500L, aggregateStats.getMaxMergeBufferUsedBytes()); Assert.assertEquals(3000L, aggregateStats.getMaxSpilledBytes()); Assert.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize()); Assert.assertEquals(4L, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(2600L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(725L, aggregateStats.getTotalMergeBufferUsedBytes()); Assert.assertEquals(4L, aggregateStats.getSpilledQueries()); Assert.assertEquals(3450L, aggregateStats.getSpilledBytes()); Assert.assertEquals(2250L, aggregateStats.getMergeDictionarySize()); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java index c96bc50dd78d..ec5d64794cc7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java @@ -156,6 +156,61 @@ public void testNoGrowing() } } + @Test + public void testMaxMergeBufferUsedBytes() + { + final GroupByTestColumnSelectorFactory columnSelectorFactory = 
GrouperTestUtil.newColumnSelectorFactory(); + final BufferHashGrouper grouper = new BufferHashGrouper<>( + Suppliers.ofInstance(ByteBuffer.allocate(1000)), + GrouperTestUtil.intKeySerde(), + AggregatorAdapters.factorizeBuffered( + columnSelectorFactory, + ImmutableList.of( + new LongSumAggregatorFactory("valueSum", "value"), + new CountAggregatorFactory("count") + ) + ), + Integer.MAX_VALUE, + 0, + 0, + true + ); + grouper.init(); + + long initialUsage = grouper.getMaxMergeBufferUsedBytes(); + Assert.assertEquals(0L, initialUsage); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + + grouper.aggregate(new IntKey(1)); + final long expectedBucketSize = grouper.getMaxMergeBufferUsedBytes(); + + grouper.aggregate(new IntKey(2)); + grouper.aggregate(new IntKey(3)); + + Assert.assertEquals(3L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes()); + + grouper.aggregate(new IntKey(4)); + grouper.aggregate(new IntKey(5)); + + Assert.assertEquals(5L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes()); + + grouper.reset(); + Assert.assertEquals(0, grouper.getSize()); + Assert.assertEquals(5L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes()); + + grouper.aggregate(new IntKey(1)); + grouper.aggregate(new IntKey(6)); + grouper.aggregate(new IntKey(7)); + grouper.aggregate(new IntKey(8)); + grouper.aggregate(new IntKey(9)); + grouper.aggregate(new IntKey(10)); + + Assert.assertEquals(6L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes()); + + grouper.close(); + } + private ResourceHolder> makeGrouper( GroupByTestColumnSelectorFactory columnSelectorFactory, int bufferSize, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java index 07631840231c..df9851ba829b 100644 --- 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java @@ -307,6 +307,62 @@ public void testIteratorOrderByAggsDesc() Assert.assertEquals(LIMIT, i); } + @Test + public void testMaxMergeBufferUsedBytesTracksMaxUsageAfterReset() + { + final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + final LimitedBufferHashGrouper grouper = makeGrouper(columnSelectorFactory, 20000); + + Assert.assertEquals(0L, grouper.getMaxMergeBufferUsedBytes()); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + + Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new IntKey(KEY_BASE)).isOk()); + final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes(); + + grouper.reset(); + Assert.assertEquals(0, grouper.getSize()); + Assert.assertEquals(usagePerEntry, grouper.getMaxMergeBufferUsedBytes()); + + // Add 10 entries after reset + for (int i = 0; i < 10; i++) { + Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk()); + } + + Assert.assertEquals(10 * usagePerEntry, grouper.getMaxMergeBufferUsedBytes()); + } + + @Test + public void testMaxMergeBufferUsedBytesAfterBufferSwap() + { + // This test closely follows the flow of testLimitAndBufferSwapping(). 
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + final LimitedBufferHashGrouper grouper = makeGrouper(columnSelectorFactory, 20000); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + + // Calculate usage per entry from first entry + Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new IntKey(KEY_BASE)).isOk()); + final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes(); + + // This results in 13 swaps and final size of 116 (100 keys + 16 new keys after last swap) + for (int i = 1; i < NUM_ROWS; i++) { + Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk()); + } + + Assert.assertEquals(13, grouper.getGrowthCount()); + Assert.assertEquals(116, grouper.getSize()); + Assert.assertEquals(168, grouper.getMaxSize()); + + final long bucketSizeWithHash = usagePerEntry - Integer.BYTES; + final long hashTablePeak = grouper.getMaxSize() * bucketSizeWithHash; + // Heap can temporarily have LIMIT + 1 before removing one + final long heapPeak = ((long) LIMIT + 1) * Integer.BYTES; + // Peak usage is the sum of hash table peak and heap peak, which peak at different sizes... 
+ final long expectedPeakUsage = hashTablePeak + heapPeak; + + Assert.assertEquals(expectedPeakUsage, grouper.getMaxMergeBufferUsedBytes()); + } + private static LimitedBufferHashGrouper makeGrouper( GroupByTestColumnSelectorFactory columnSelectorFactory, int bufferSize diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index ecb702cc70da..e5f46020fe09 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -58,21 +58,16 @@ public boolean doMonitor(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests())); - emitter.emit(builder.setMetric("mergeBuffer/used", mergeBufferPool.getUsedResourcesCount())); GroupByStatsProvider.AggregateStats statsContainer = groupByStatsProvider.getStatsSince(); if (statsContainer.getMergeBufferQueries() > 0) { emitter.emit(builder.setMetric("mergeBuffer/queries", statsContainer.getMergeBufferQueries())); - emitter.emit(builder.setMetric( - "mergeBuffer/acquisitionTimeNs", - statsContainer.getMergeBufferAcquisitionTimeNs() - )); - emitter.emit(builder.setMetric( - "mergeBuffer/maxAcquisitionTimeNs", - statsContainer.getMaxMergeBufferAcquisitionTimeNs() - )); + emitter.emit(builder.setMetric("mergeBuffer/acquisitionTimeNs", statsContainer.getMergeBufferAcquisitionTimeNs())); + emitter.emit(builder.setMetric("mergeBuffer/maxAcquisitionTimeNs", statsContainer.getMaxMergeBufferAcquisitionTimeNs())); + emitter.emit(builder.setMetric("mergeBuffer/bytesUsed", statsContainer.getTotalMergeBufferUsedBytes())); + emitter.emit(builder.setMetric("mergeBuffer/maxBytesUsed", statsContainer.getMaxMergeBufferUsedBytes())); } if (statsContainer.getSpilledQueries() > 0) { diff 
--git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index 8c9d4dc4c814..eaca043e02e7 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -61,6 +61,8 @@ public synchronized AggregateStats getStatsSince() 1L, 100L, 100L, + 200L, + 200L, 2L, 200L, 200L, @@ -70,7 +72,7 @@ public synchronized AggregateStats getStatsSince() } }; - mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 5); + mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 5); executorService = Executors.newSingleThreadExecutor(); } @@ -83,8 +85,7 @@ public void tearDown() @Test public void testMonitor() { - final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); emitter.start(); monitor.doMonitor(emitter); @@ -92,12 +93,14 @@ public void testMonitor() // Trigger metric emission monitor.doMonitor(emitter); - Assert.assertEquals(10, emitter.getNumEmittedEvents()); + Assert.assertEquals(12, emitter.getNumEmittedEvents()); emitter.verifyValue("mergeBuffer/pendingRequests", 0L); emitter.verifyValue("mergeBuffer/used", 0L); emitter.verifyValue("mergeBuffer/queries", 1L); emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L); emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L); + emitter.verifyValue("mergeBuffer/bytesUsed", 200L); + emitter.verifyValue("mergeBuffer/maxBytesUsed", 200L); emitter.verifyValue("groupBy/spilledQueries", 2L); emitter.verifyValue("groupBy/spilledBytes", 200L); emitter.verifyValue("groupBy/maxSpilledBytes", 200L); @@ -112,15 +115,11 @@ public void 
testMonitorWithServiceDimensions() final String taskId = "taskId1"; final String groupId = "test_groupid"; final String taskType = "test_tasktype"; - final GroupByStatsMonitor monitor = new GroupByStatsMonitor( - groupByStatsProvider, - mergeBufferPool - ); + final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host", new TestTaskHolder(dataSource, taskId, taskType, groupId)); emitter.start(); monitor.doMonitor(emitter); emitter.flush(); - // Trigger metric emission monitor.doMonitor(emitter); final Map dimFilters = Map.of( @@ -136,6 +135,8 @@ public void testMonitorWithServiceDimensions() verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L); verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L); verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 100L); + verifyMetricValue(emitter, "mergeBuffer/bytesUsed", dimFilters, 200L); + verifyMetricValue(emitter, "mergeBuffer/maxBytesUsed", dimFilters, 200L); verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L); verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L); verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L); @@ -151,8 +152,7 @@ public void testMonitoringMergeBuffer_acquiredCount() mergeBufferPool.takeBatch(4); }).get(20, TimeUnit.SECONDS); - final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -180,8 +180,7 @@ public void testMonitoringMergeBuffer_pendingRequests() } } - final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + final 
GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -203,18 +202,21 @@ public void testMonitoringWithMultipleResources() QueryResourceId r1 = new QueryResourceId("r1"); GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); stats1.mergeBufferAcquisitionTime(100); + stats1.maxMergeBufferUsedBytes(50); stats1.spilledBytes(200); stats1.dictionarySize(100); QueryResourceId r2 = new QueryResourceId("r2"); GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); stats2.mergeBufferAcquisitionTime(500); + stats2.maxMergeBufferUsedBytes(30); stats2.spilledBytes(100); stats2.dictionarySize(300); QueryResourceId r3 = new QueryResourceId("r3"); GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); stats3.mergeBufferAcquisitionTime(200); + stats3.maxMergeBufferUsedBytes(150); stats3.spilledBytes(800); stats3.dictionarySize(200); @@ -230,11 +232,13 @@ public void testMonitoringWithMultipleResources() emitter.verifyValue("mergeBuffer/queries", 3L); emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 800L); + emitter.verifyValue("mergeBuffer/bytesUsed", 230L); emitter.verifyValue("groupBy/spilledQueries", 3L); emitter.verifyValue("groupBy/spilledBytes", 1100L); emitter.verifyValue("groupBy/mergeDictionarySize", 600L); emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L); + emitter.verifyValue("mergeBuffer/maxBytesUsed", 150L); emitter.verifyValue("groupBy/maxSpilledBytes", 800L); emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L); }