Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,5 @@ rpath = false
large_futures = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to force_hash_collisions change, or something separate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it allows tarpaulin to be used as a feature even if the crate doesn't define it

unused_imports = "deny"
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ path = "src/lib.rs"
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
Expand Down
20 changes: 18 additions & 2 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::row::Rows;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::IntervalDayTime;
use arrow_buffer::IntervalMonthDayNano;

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_primitive_array, as_string_array,
as_struct_array,
};
use crate::error::{Result, _internal_err};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::error::_internal_err;

// Combines two hashes into one hash
#[inline]
Expand All @@ -41,6 +46,7 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
hash.wrapping_mul(37).wrapping_add(r)
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
Expand Down Expand Up @@ -90,6 +96,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T>(
array: &PrimitiveArray<T>,
random_state: &RandomState,
Expand Down Expand Up @@ -135,6 +142,7 @@ fn hash_array_primitive<T>(
/// Hashes one array into the `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array<T>(
array: T,
random_state: &RandomState,
Expand Down Expand Up @@ -180,6 +188,7 @@ fn hash_array<T>(
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
Expand Down Expand Up @@ -210,6 +219,7 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
random_state: &RandomState,
Expand All @@ -236,6 +246,7 @@ fn hash_struct_array(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
Expand Down Expand Up @@ -269,6 +280,7 @@ where
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
random_state: &RandomState,
Expand Down Expand Up @@ -450,7 +462,11 @@ pub fn create_row_hashes_v2<'a>(

#[cfg(test)]
mod tests {
use arrow::{array::*, datatypes::*};
use std::sync::Arc;

use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ default = [
]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
math_expressions = ["datafusion-functions/math_expressions"]
parquet = ["datafusion-common/parquet", "dep:parquet"]
pyarrow = ["datafusion-common/pyarrow", "parquet"]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> SimplifyContext<'a> {
impl<'a> SimplifyInfo for SimplifyContext<'a> {
/// returns true if this Expr has boolean type
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
for schema in &self.schema {
if let Some(schema) = &self.schema {
if let Ok(DataType::Boolean) = expr.get_type(schema) {
return Ok(true);
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ rust-version = { workspace = true }
[lints]
workspace = true

[features]
force_hash_collisions = []

[lib]
name = "datafusion_physical_plan"
path = "src/lib.rs"
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,7 @@ mod tests {
use rstest::*;
use rstest_reuse::*;

/// Ceiling integer division: the smallest integer `>= a / b`.
///
/// Implemented as truncating division plus a correction when there is a
/// non-zero remainder. Unlike the common `(a + b - 1) / b` formulation,
/// this cannot overflow for large `a` (e.g. `a = usize::MAX, b = 1`).
///
/// Panics if `b == 0`, matching normal integer-division semantics.
#[cfg(not(feature = "force_hash_collisions"))]
fn div_ceil(a: usize, b: usize) -> usize {
    // Add 1 iff the division truncated (i.e. a is not a multiple of b).
    a / b + usize::from(a % b != 0)
}
Expand Down Expand Up @@ -1930,6 +1931,8 @@ mod tests {
Ok(())
}

// FIXME(#TODO) test fails with feature `force_hash_collisions`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @nrc -- I agree that cfg'ing these out is the right step for this PR

I'll file a ticket to look into the failures.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was writing up a ticket and testing locally. When I removed the cfgs

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index fdf062b1f..359a86856 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -1583,7 +1583,7 @@ mod tests {
     use rstest::*;
     use rstest_reuse::*;

-    #[cfg(not(feature = "force_hash_collisions"))]
+
     fn div_ceil(a: usize, b: usize) -> usize {
         (a + b - 1) / b
     }
@@ -1932,7 +1932,6 @@ mod tests {
     }

     // FIXME(#TODO) test fails with feature `force_hash_collisions`
-    #[cfg(not(feature = "force_hash_collisions"))]
     #[apply(batch_sizes)]
     #[tokio::test]
     async fn join_inner_two(batch_size: usize) -> Result<()> {
@@ -1989,7 +1988,6 @@ mod tests {

     /// Test where the left has 2 parts, the right with 1 part => 1 part
     // FIXME(#TODO) test fails with feature `force_hash_collisions`
-    #[cfg(not(feature = "force_hash_collisions"))]
     #[apply(batch_sizes)]
     #[tokio::test]
     async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
@@ -2103,7 +2101,6 @@ mod tests {

     /// Test where the left has 1 part, the right has 2 parts => 2 parts
     // FIXME(#TODO) test fails with feature `force_hash_collisions`
-    #[cfg(not(feature = "force_hash_collisions"))]
     #[apply(batch_sizes)]
     #[tokio::test]
     async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {

And then ran the tests they seemed to pass for me locally:

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion$ cargo test --lib --features=force_hash_collisions   -p datafusion-physical-plan -- join_inner
warning: /Users/andrewlamb/Software/datafusion/Cargo.toml: unused manifest key: workspace.lints.rust.unexpected_cfgs.check-cfg
   Compiling datafusion-physical-plan v40.0.0 (/Users/andrewlamb/Software/datafusion/datafusion/physical-plan)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 3.59s
     Running unittests src/lib.rs (target/debug/deps/datafusion_physical_plan-8b4d4245d6c07ebc)

running 40 tests
test joins::hash_join::tests::join_inner_one_no_shared_column_names ... ok
test joins::hash_join::tests::join_inner_one::batch_size_3_5 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left::batch_size_4_2 ... ok
test joins::hash_join::tests::join_inner_one::batch_size_2_10 ... ok
test joins::hash_join::tests::join_inner_one::batch_size_5_1 ... ok
test joins::hash_join::tests::join_inner_one::batch_size_1_8192 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left::batch_size_5_1 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left::batch_size_2_10 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left::batch_size_3_5 ... ok
test joins::hash_join::tests::join_inner_one::batch_size_4_2 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left_randomly_ordered ... ok
test joins::hash_join::tests::join_inner_one_two_parts_left::batch_size_1_8192 ... ok
test joins::hash_join::tests::join_inner_one_randomly_ordered ... ok
test joins::hash_join::tests::join_inner_one_two_parts_right::batch_size_2_10 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_right::batch_size_1_8192 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_right::batch_size_3_5 ... ok
test joins::hash_join::tests::join_inner_two::batch_size_1_8192 ... ok
test joins::hash_join::tests::join_inner_two::batch_size_4_2 ... ok
test joins::hash_join::tests::join_inner_two::batch_size_2_10 ... ok
test joins::hash_join::tests::join_inner_two::batch_size_3_5 ... ok
test joins::hash_join::tests::join_inner_two::batch_size_5_1 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_right::batch_size_4_2 ... ok
test joins::hash_join::tests::join_inner_one_two_parts_right::batch_size_5_1 ... ok
test joins::hash_join::tests::join_inner_with_filter::batch_size_3_5 ... ok
test joins::hash_join::tests::join_inner_with_filter::batch_size_1_8192 ... ok
test joins::hash_join::tests::join_inner_with_filter::batch_size_2_10 ... ok
test joins::hash_join::tests::join_inner_with_filter::batch_size_5_1 ... ok
test joins::hash_join::tests::join_inner_with_filter::batch_size_4_2 ... ok
test joins::sort_merge_join::tests::join_inner_two ... ok
test joins::sort_merge_join::tests::join_inner_one ... ok
test joins::sort_merge_join::tests::join_inner_two_two ... ok
test joins::sort_merge_join::tests::join_inner_output_two_batches ... ok
test joins::hash_join::tests::partitioned_join_inner_one::batch_size_2_10 ... ok
test joins::sort_merge_join::tests::join_inner_with_nulls_with_options ... ok
test joins::sort_merge_join::tests::join_inner_with_nulls ... ok
test joins::hash_join::tests::partitioned_join_inner_one::batch_size_5_1 ... ok
test joins::hash_join::tests::partitioned_join_inner_one::batch_size_4_2 ... ok
test joins::hash_join::tests::partitioned_join_inner_one::batch_size_1_8192 ... ok
test joins::hash_join::tests::partitioned_join_inner_one::batch_size_3_5 ... ok
test joins::nested_loop_join::tests::join_inner_with_filter ... ok

test result: ok. 40 passed; 0 failed; 0 ignored; 0 measured; 688 filtered out; finished in 0.03s

#[cfg(not(feature = "force_hash_collisions"))]
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_two(batch_size: usize) -> Result<()> {
Expand Down Expand Up @@ -1985,6 +1988,8 @@ mod tests {
}

/// Test where the left has 2 parts, the right with 1 part => 1 part
// FIXME(#TODO) test fails with feature `force_hash_collisions`
#[cfg(not(feature = "force_hash_collisions"))]
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
Expand Down Expand Up @@ -2097,6 +2102,8 @@ mod tests {
}

/// Test where the left has 1 part, the right has 2 parts => 2 parts
// FIXME(#TODO) test fails with feature `force_hash_collisions`
#[cfg(not(feature = "force_hash_collisions"))]
#[apply(batch_sizes)]
#[tokio::test]
async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
Expand Down
37 changes: 19 additions & 18 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -251,25 +251,26 @@ SELECT COUNT(*) FROM timestamp_with_tz;
----
131072

# FIXME(#TODO) fails with feature `force_hash_collisions`
# Perform the query:
query IPT
SELECT
count,
LAG(timestamp, 1) OVER (ORDER BY timestamp),
arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp))
FROM timestamp_with_tz
LIMIT 10;
----
0 NULL Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# query IPT
# SELECT
# count,
# LAG(timestamp, 1) OVER (ORDER BY timestamp),
# arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp))
# FROM timestamp_with_tz
# LIMIT 10;
# ----
# 0 NULL Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
# 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))

# Test config listing_table_ignore_subdirectory:

Expand Down
19 changes: 10 additions & 9 deletions datafusion/sqllogictest/test_files/sort_merge_join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,17 @@ SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id
44 d 4 44 x 3
NULL NULL NULL 55 w 3

# FIXME(#TODO) fails with feature `force_hash_collisions`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the issue with SMJ with hash collisions?

# equijoin_full_and_condition_from_both
query ITIITI rowsort
SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int
----
11 a 1 NULL NULL NULL
22 b 2 22 y 1
33 c 3 NULL NULL NULL
44 d 4 44 x 3
NULL NULL NULL 11 z 3
NULL NULL NULL 55 w 3
# query ITIITI rowsort
# SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int
# ----
# 11 a 1 NULL NULL NULL
# 22 b 2 22 y 1
# 33 c 3 NULL NULL NULL
# 44 d 4 44 x 3
# NULL NULL NULL 11 z 3
# NULL NULL NULL 55 w 3

statement ok
DROP TABLE t1;
Expand Down