Add PercentileTDigest support for MergeAndRollup aggregation#18088
justahuman1 wants to merge 6 commits into apache:master
Conversation
Add `PercentileTDigestAggregator` for minion merge/rollup tasks, enabling TDigest sketch merging with a configurable compression factor.

- New `PercentileTDigestAggregator` implementing `ValueAggregator`
- Register `PERCENTILETDIGEST` and `PERCENTILERAWTDIGEST` in `ValueAggregatorFactory`
- Add both types to `AVAILABLE_CORE_VALUE_AGGREGATORS` in `MinionConstants`
- Allow `compressionFactor` in `MergeRollupTaskGenerator` validation
- Unit tests for the aggregator with default and custom compression
- Integration test exercising the full `MergeRollupTaskExecutor` pipeline with TDigest BYTES columns, multiple dimension groups, cross-segment merging, skewed distributions, and edge cases
Codecov Report ✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##             master   #18088      +/-   ##
============================================
- Coverage     63.75%   63.32%    -0.44%
- Complexity     1573     1628       +55
============================================
  Files          3167     3230       +63
  Lines        191658   196729     +5071
  Branches      29469    30413      +944
============================================
+ Hits         122198   124581     +2383
- Misses        59851    62175     +2324
- Partials       9609     9973      +364
============================================
```
Pull request overview
Adds TDigest sketch merging support to Pinot’s minion merge/rollup pipeline by introducing a new PercentileTDigestAggregator, wiring it into the aggregator factory/minion constants, and extending task-config validation (plus new unit/integration-style tests) to support a configurable compressionFactor.
Changes:
- Add `PercentileTDigestAggregator` and register `PERCENTILETDIGEST`/`PERCENTILERAWTDIGEST` in `ValueAggregatorFactory`.
- Extend `MergeRollupTaskGenerator` validation to allow and validate `compressionFactor`.
- Add unit and executor tests covering TDigest merging and `compressionFactor` validation.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java | New minion rollup aggregator for merging serialized TDigests with optional compressionFactor. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java | Registers TDigest aggregator for PERCENTILETDIGEST and PERCENTILERAWTDIGEST. |
| pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java | Adds TDigest aggregation types to available core value aggregators set. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/.../MergeRollupTaskGenerator.java | Allows/validates compressionFactor as an aggregation function parameter. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java | Unit tests for default/custom compression merge behavior. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTDigestTaskExecutorTest.java | Integration-style tests for merge/rollup executor using TDigest bytes metric. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTaskGeneratorTest.java | Adds valid/invalid compressionFactor validation tests. |
```java
TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1);
TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2);
TDigest merged = TDigest.createMergingDigest(compression);
merged.add(first);
merged.add(second);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged);
```
PercentileTDigestAggregator directly deserializes (byte[]) value1/value2. For BYTES columns, Pinot's default null is an empty byte[] (see FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES), and RollupReducer can aggregate such values when includeNullFields=false. Deserializing byte[0] will fail, so this aggregator should explicitly handle empty inputs (e.g., return the other value when one side is empty, and produce a serialized empty TDigest when both are empty).
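The guard the reviewer is asking for can be sketched as follows. This is not Pinot's actual code: `mergeSketches` and `emptySketch` are stand-ins for the real `ObjectSerDeUtils.TDIGEST_SER_DE` round trip so that only the empty-input branching is shown.

```java
import java.util.Arrays;

// Sketch of the suggested empty-input guard. The serde calls are stubbed out;
// in the real aggregator, mergeSketches would be
// "serialize(merge(deserialize(a), deserialize(b)))" and emptySketch would be
// "serialize(TDigest.createMergingDigest(compression))".
public class EmptyGuardSketch {

  // Placeholder for deserializing, merging, and re-serializing two digests.
  static byte[] mergeSketches(byte[] a, byte[] b) {
    byte[] merged = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, merged, a.length, b.length);
    return merged;
  }

  // Placeholder for a serialized empty digest.
  static byte[] emptySketch() {
    return new byte[] {0};
  }

  public static byte[] aggregate(byte[] value1, byte[] value2) {
    boolean empty1 = value1 == null || value1.length == 0;
    boolean empty2 = value2 == null || value2.length == 0;
    if (empty1 && empty2) {
      // Both sides are the BYTES default null value: emit a serialized empty
      // digest so downstream deserialization still succeeds.
      return emptySketch();
    }
    if (empty1) {
      return value2; // keep the only real sketch untouched
    }
    if (empty2) {
      return value1;
    }
    return mergeSketches(value1, value2);
  }
}
```

Returning the non-empty side unchanged avoids a needless deserialize/serialize round trip when one input is the default null value.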
```diff
   // check that function parameter value is valid for nominal entries
   if (functionParameterName.equalsIgnoreCase(Constants.CPCSKETCH_LGK_KEY)
-      || functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES)) {
+      || functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES)
+      || functionParameterName.equalsIgnoreCase(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY)) {
     String value = functionParameters.get(functionParameterName);
```
The inline comment says "check that function parameter value is valid for nominal entries", but this block now also validates lgK and compressionFactor. Please update the comment (or split the validation) so it accurately reflects the parameters being validated.
I removed the comment because it's just duplicating the if-statement check.
```java
// Segment 0: group1=[1..100], group2=[500..600]
List<GenericRow> seg0 = new ArrayList<>();
seg0.add(makeRow(GROUP_1, createTDigest(1, 101)));
seg0.add(makeRow(GROUP_2, createTDigest(500, 600)));
segments.add(seg0);
```
This comment is off by one: createTDigest(500, 600) adds values 500..599 (end is exclusive). Either adjust the comment to [500..599] or change the test data range to match.
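The half-open range semantics behind this off-by-one can be shown with a minimal stand-in for the test helper. `values` here is hypothetical; it assumes `createTDigest(start, end)` iterates `[start, end)` when adding values to the digest, which is what the reviewer's comment implies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the value-generation loop inside the test helper
// createTDigest(start, end), assumed to use a half-open range [start, end).
public class HalfOpenRange {
  public static List<Integer> values(int start, int end) {
    List<Integer> values = new ArrayList<>();
    for (int i = start; i < end; i++) { // end is exclusive
      values.add(i);
    }
    return values;
  }
}
```

Under that assumption, `values(500, 600)` produces 100 values with maximum 599, so the comment should read `[500..599]`.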
…stAggregator
- Fix MergeRollupTDigestTaskExecutorTest: wrong imports for SchemaUtils and TableConfigUtils (should be SchemaSerDeUtils/TableConfigSerDeUtils)
- Fix MergeRollupTaskGeneratorTest: missing ImmutableMap import
- Add empty byte[] handling to PercentileTDigestAggregator to prevent BufferUnderflowException when BYTES columns have the default null value (empty byte[]). Follows the same pattern as DistinctCountCPCSketchAggregator.
- Add unit tests for empty byte[] aggregation cases
- Clean up test assertion style: remove unnecessary .0 suffixes
```java
int compression = getCompression(functionParameters);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(TDigest.createMergingDigest(compression));
```
Does it work if you simply return empty bytes?
Thanks for the quick review!
I don't think returning empty bytes is safe. The query-time aggregation function and RollupReducer both deserialize without checking for empty bytes. At LinkedIn, we recently hit this exact issue with HLL in production, where empty bytes caused a BufferUnderflowException during the query execution path (I'd also like to send a separate PR adding a guard for all aggregators, if that makes sense). The reason for serializing an empty TDigest here is to ensure everything downstream can parse it properly.
What do you think/recommend?
I tried to create a static constant for this (for perf reasons), but the serialized bytes differ for empty digests with different compression values. See the test script below:
```java
import com.tdunning.math.stats.TDigest;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class TDigestTest {
  public static void main(String[] args) {
    TDigest d100 = TDigest.createMergingDigest(100);
    TDigest d200 = TDigest.createMergingDigest(200);

    ByteBuffer buf100 = ByteBuffer.allocate(d100.smallByteSize());
    d100.asSmallBytes(buf100);
    byte[] bytes100 = buf100.array();

    ByteBuffer buf200 = ByteBuffer.allocate(d200.smallByteSize());
    d200.asSmallBytes(buf200);
    byte[] bytes200 = buf200.array();

    System.out.println("compression=100 size: " + bytes100.length + " bytes: " + Arrays.toString(bytes100));
    System.out.println("compression=200 size: " + bytes200.length + " bytes: " + Arrays.toString(bytes200));
    System.out.println("equal: " + Arrays.equals(bytes100, bytes200));
  }
}
```

Output:

```
compression=100 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 66, -56, 0, 0, 0, -46, 1, -12, 0, 0]
compression=200 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 67, 72, 0, 0, 1, -102, 3, -24, 0, 0]
```
- Add JavaDoc to PercentileTDigestAggregator documenting raw/non-raw support
- Add factory tests verifying both PERCENTILETDIGEST and PERCENTILERAWTDIGEST resolve to the same aggregator
- Remove stale "nominal entries" comment in MergeRollupTaskGenerator
- Fix off-by-one in test comment: [500..600] -> [500..599]
Summary

Adds `PercentileTDigestAggregator` for minion merge/rollup tasks, enabling TDigest sketch merging with a configurable compression factor.
- `PercentileTDigestAggregator` implementing `ValueAggregator`: deserializes two TDigest byte arrays, merges via `TDigest.createMergingDigest()`, serializes back
- Registers `PERCENTILETDIGEST` and `PERCENTILERAWTDIGEST` in `ValueAggregatorFactory` and `MinionConstants.AVAILABLE_CORE_VALUE_AGGREGATORS`
- Allows `compressionFactor` in `MergeRollupTaskGenerator` validation (alongside existing `lgK`, `nominalEntries`, `samplingProbability`)

Test plan
- `PercentileTDigestAggregatorTest`: default and custom compression
- `MergeRollupTaskGeneratorTest`: valid and invalid `compressionFactor`
- `MergeRollupTDigestTaskExecutorTest`
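To make the validation change concrete, here is a hedged sketch of a merge/rollup task config carrying `compressionFactor` as an aggregation function parameter. The column name `latencySketch` and the exact key strings are illustrative, inferred from the PR description; the precise key format Pinot's `MergeRollupTaskGenerator` expects may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not verbatim Pinot config): a MergeRollup task config
// that rolls up a serialized-TDigest BYTES metric and passes compressionFactor
// to the aggregator. Column name and key format are assumptions.
public class TaskConfigSketch {
  public static Map<String, String> buildTaskConfig() {
    Map<String, String> taskConfig = new HashMap<>();
    // Use the TDigest merge aggregator for the hypothetical metric column
    taskConfig.put("latencySketch.aggregationType", "PERCENTILETDIGEST");
    // Compression factor used when creating the merging digest
    taskConfig.put("latencySketch.aggregationFunctionParameters.compressionFactor", "200");
    return taskConfig;
  }
}
```

With the validation added in this PR, a non-numeric or non-positive `compressionFactor` value here would be rejected by `MergeRollupTaskGenerator` instead of failing later inside the minion task.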