Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e662814
Add test that invokes bloom_filter_agg.
mbutrovich Sep 25, 2024
20f6e67
QueryPlanSerde support for BloomFilterAgg.
mbutrovich Sep 25, 2024
1ec31a2
Add bloom_filter_agg based on sample UDAF. planner instantiates it no…
mbutrovich Sep 27, 2024
3965dc4
Partial work on Accumulator. Need to finish merge_batch and state.
mbutrovich Sep 27, 2024
62e656c
BloomFilterAgg state, merge_state, and evaluate. Need more tests.
mbutrovich Sep 30, 2024
33ef47d
Matches Spark behavior. Need to clean up the code quite a bit, and do…
mbutrovich Sep 30, 2024
2040c76
Merge branch 'apache:main' into bloom_field_agg
mbutrovich Sep 30, 2024
599a8f9
Remove old comment.
mbutrovich Sep 30, 2024
a2a8cf3
Clippy. Increase bloom filter size back to Spark's default.
mbutrovich Sep 30, 2024
22aedd9
API cleanup.
mbutrovich Sep 30, 2024
bf22902
API cleanup.
mbutrovich Oct 1, 2024
4b7000c
Merge branch 'apache:main' into bloom_field_agg
mbutrovich Oct 2, 2024
88adc75
Add BloomFilterAgg benchmark to CometExecBenchmark
mbutrovich Oct 2, 2024
a21e0e3
Docs.
mbutrovich Oct 2, 2024
5c5d0f9
API cleanup, fix merge_bits to update cardinality.
mbutrovich Oct 2, 2024
cd107e3
Refactor merge_bits to update bit_count with the bit merging.
mbutrovich Oct 2, 2024
4f06098
Remove benchmark results file.
mbutrovich Oct 2, 2024
79f6468
Docs.
mbutrovich Oct 2, 2024
57fe742
Add native side benchmarks.
mbutrovich Oct 2, 2024
ec64e4c
Adjust benchmark parameters to match Spark defaults.
mbutrovich Oct 2, 2024
7a81f35
Address review feedback.
mbutrovich Oct 2, 2024
013513e
Merge branch 'apache:main' into bloom_field_agg
mbutrovich Oct 3, 2024
3347923
Add assertion to merge_batch.
mbutrovich Oct 4, 2024
5c82f24
Merge branch 'apache:main' into bloom_field_agg
mbutrovich Oct 9, 2024
c39ff1d
Merge branch 'apache:main' into bloom_field_agg
mbutrovich Oct 13, 2024
1ed99e3
Address some review feedback.
mbutrovich Oct 17, 2024
d41a9d2
Only generate native BloomFilterAgg if child has LongType.
mbutrovich Oct 18, 2024
6d13890
Add TODO with GitHub issue link.
mbutrovich Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Clippy. Increase bloom filter size back to Spark's default.
  • Loading branch information
mbutrovich committed Sep 30, 2024
commit a2a8cf358699b87e197f0c2e50f8dc315624335e
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ pub struct BloomFilterAgg {
num_bits: i32,
}

fn i32_from_literal_physical_expr(expr: Arc<dyn PhysicalExpr>) -> i32 {
match expr.as_any().downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32,
_ => {
unreachable!()
}
}
}

impl BloomFilterAgg {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
Expand All @@ -51,29 +60,12 @@ impl BloomFilterAgg {
data_type: DataType,
) -> Self {
assert!(matches!(data_type, DataType::Binary));
let num_items = match num_items
.as_any()
.downcast_ref::<Literal>()
.unwrap()
.value()
{
ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32,
_ => {
unreachable!()
}
};
let num_bits = match num_bits.as_any().downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32,
_ => {
unreachable!()
}
};
Self {
name: name.into(),
signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
expr,
num_items: num_items,
num_bits: num_bits,
num_items: i32_from_literal_physical_expr(num_items),
num_bits: i32_from_literal_physical_expr(num_bits),
}
}
}
Expand Down Expand Up @@ -136,8 +128,8 @@ impl Accumulator for SparkBloomFilter {
spark_bloom_filter.append(&mut self.num_hash_functions().to_be_bytes().to_vec());
spark_bloom_filter.append(&mut (self.state_size_words() as u32).to_be_bytes().to_vec());
let mut filter_state: Vec<u64> = self.bits_state();
for i in 0..filter_state.len() {
filter_state[i] = filter_state[i].to_be();
for i in filter_state.iter_mut() {
*i = i.to_be();
}
spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice()));
Ok(ScalarValue::Binary(Some(spark_bloom_filter)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SparkBitArray {
.chunks(8)
.map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap())),
) {
*i.0 = *i.0 | i.1;
*i.0 |= i.1;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class CometExecSuite extends CometTestBase {
sql(
"CREATE VIEW lv_noalias AS SELECT myTab.* FROM src " +
"LATERAL VIEW explode(map('key1', 100, 'key2', 200)) myTab LIMIT 2")
val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key");
val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key")
checkSparkAnswer(df)
}
}
Expand Down Expand Up @@ -927,7 +927,7 @@ class CometExecSuite extends CometTestBase {
(0 until 100)
.map(_ => (Random.nextInt(), Random.nextInt() % 5)),
"tbl") {
val df = sql("SELECT bloom_filter_agg(cast(_2 as long), cast(10 as long)) FROM tbl")
val df = sql("SELECT bloom_filter_agg(cast(_2 as long)) FROM tbl")
checkSparkAnswerAndOperator(df)
}

Expand Down