[VL] Rework co-fallback mechanism of bloom-filter might_contain/agg#5435
Run Gluten Clickhouse CI |
Example Velox might_contain code with vanilla code-gen:

public Object generate(Object[] references) {
  return new GeneratedIteratorForCodegenStage1(references);
}

/*wsc_codegenStageId*/
final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator[] inputs;
  private scala.collection.Iterator localtablescan_input_0;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
  private org.apache.spark.util.sketch.VeloxBloomFilter[] filter_mutableStateArray_0 = new org.apache.spark.util.sketch.VeloxBloomFilter[1];

  public GeneratedIteratorForCodegenStage1(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    localtablescan_input_0 = inputs[0];
    filter_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    filter_mutableStateArray_0[0] = org.apache.spark.util.sketch.VeloxBloomFilter.readFrom(((byte[]) references[2] /* bloomFilterData */));
  }

  protected void processNext() throws java.io.IOException {
    while ( localtablescan_input_0.hasNext()) {
      InternalRow localtablescan_row_0 = (InternalRow) localtablescan_input_0.next();
      ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
      do {
        long localtablescan_value_0 = localtablescan_row_0.getLong(0);
        boolean filter_isNull_0 = false;
        boolean filter_value_0 = false;
        if (!filter_isNull_0) {
          filter_value_0 = filter_mutableStateArray_0[0].mightContainLong((Long)localtablescan_value_0);
        }
        if (filter_isNull_0 || !filter_value_0) continue;
        ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
        filter_mutableStateArray_1[0].reset();
        filter_mutableStateArray_1[0].write(0, localtablescan_value_0);
        append((filter_mutableStateArray_1[0].getRow()));
      } while(false);
      if (shouldStop()) return;
    }
  }
}
7078467 to cab3f8b
override def onExecutorStart(conf: SparkConf): Unit = {
  UDFResolver.resolveUdfConf(conf, isDriver = false)
  initialize(conf)
}
Moved UDF resolution to this file
/Benchmark Velox
  collectWithSubqueries(df.queryExecution.executedPlan) {
    case h if h.isInstanceOf[HashAggregateExecBaseTransformer] => h
- }.size == 0,
+ }.size == 2,
Could you document this change?
No need to add documentation. I'll rename the test to "Test bloom_filter_agg offloaded with filter fallen back" in the next patch to clarify.
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
Velox's bloom-filter agg/filter functions are logically different from Spark's versions. This makes the Gluten/Spark Filter/Aggregate operators that host them logically different from Spark's versions as well. For such logical differences, we should use distinct functions to distinguish the implementations rather than reusing Spark's function types in the Velox backend.

Patch incorporates:

- VeloxBloomFilterMightContain / VeloxBloomFilterAggregate.
- VeloxBloomFilter inheriting Spark's BloomFilter, to make sure a fallen-back velox_might_contain expression works correctly. Should support fallen-back velox_bloom_filter_agg in future.
- FallbackBloomFilterAggIfNeeded ([VL] Make bloom_filter_agg fall back when might_contain is not transformable #3994).
- BloomFilterMightContainJointRewriteRule in the Velox backend module only, to convert BloomFilterMightContain / BloomFilterAggregate to VeloxBloomFilterMightContain / VeloxBloomFilterAggregate when Gluten is enabled.

The patch makes the relevant code safer than before, since we'll have the explicit function pairs BloomFilterMightContain / BloomFilterAggregate and VeloxBloomFilterMightContain / VeloxBloomFilterAggregate. Thus we can more easily detect a mismatch between the agg and filter functions by checking their function types, rather than failing the query at runtime.
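
To illustrate the type-based mismatch check described above, here is a minimal, self-contained sketch. It is not Gluten's actual code: the class BloomFilterPairCheck, the enum Fn, and the methods rewrite and isMatchingPair are hypothetical stand-ins, and the real rule operates on Catalyst expressions rather than on an enum. The sketch only assumes that each side of the pair can be classified as Spark's or Velox's by its function type.

```java
public class BloomFilterPairCheck {
    /** Hypothetical stand-ins for the four function types named in the description. */
    public enum Fn {
        SPARK_MIGHT_CONTAIN, SPARK_BLOOM_FILTER_AGG,
        VELOX_MIGHT_CONTAIN, VELOX_BLOOM_FILTER_AGG
    }

    /** Maps a Spark function to its Velox counterpart; Velox functions pass through unchanged. */
    public static Fn rewrite(Fn fn) {
        switch (fn) {
            case SPARK_MIGHT_CONTAIN: return Fn.VELOX_MIGHT_CONTAIN;
            case SPARK_BLOOM_FILTER_AGG: return Fn.VELOX_BLOOM_FILTER_AGG;
            default: return fn;
        }
    }

    private static boolean isVelox(Fn fn) {
        return fn == Fn.VELOX_MIGHT_CONTAIN || fn == Fn.VELOX_BLOOM_FILTER_AGG;
    }

    /** True when the filter side and the agg side come from the same implementation. */
    public static boolean isMatchingPair(Fn mightContain, Fn agg) {
        return isVelox(mightContain) == isVelox(agg);
    }

    public static void main(String[] args) {
        // Rewriting both sides jointly keeps the pair consistent.
        Fn filter = rewrite(Fn.SPARK_MIGHT_CONTAIN);
        Fn agg = rewrite(Fn.SPARK_BLOOM_FILTER_AGG);
        System.out.println(isMatchingPair(filter, agg));
        // A Velox filter paired with a Spark agg is flagged before execution.
        System.out.println(isMatchingPair(filter, Fn.SPARK_BLOOM_FILTER_AGG));
    }
}
```

With distinct types on each side, a mixed pair can be rejected at planning time by a check like isMatchingPair, instead of surfacing as a failure when the incompatible bloom-filter bytes are read.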