
Commit ee7d2b5

Refactor state management in HashJoinExec and use CASE expressions for more precise filters (apache#18451)
## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393. There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:

- apache#18448
- apache#18449 (depends on apache#18448)
- (This PR): apache#18451

## Changes in this PR

This PR refactors state management in HashJoinExec to make filter pushdown more efficient and to prepare for pushing down membership tests.

- Refactor internal data structures to clean up state management and make usage more idiomatic (use `Option` instead of comparing integers, etc.)
- Use CASE expressions to evaluate pushed-down filters selectively by partition

Example: `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END`

---------

Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
(cherry picked from commit 5b0aa37)
1 parent 34fbf14 commit ee7d2b5
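For context, a minimal illustrative sketch of the per-partition CASE filter shape described above, written with DataFusion's logical `Expr` builder. This is not the PR's actual implementation (which builds physical expressions inside HashJoinExec); the `hash_repartition` column name, the partition count of 12, the target partition 0, and the `aa`/`ab` bounds are assumptions borrowed from the test snapshots below:

```rust
// Illustrative sketch only: the shape
//   CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END
// expressed with DataFusion's logical Expr builder. Names and constants are
// assumptions taken from the test snapshots, not the PR's internal code.
use datafusion_expr::{case, col, lit, Expr};

fn per_partition_filter() -> Expr {
    // CASE hash_repartition % 12 WHEN 0 THEN a >= 'aa' AND a <= 'ab' ELSE false END
    case(col("hash_repartition") % lit(12))
        .when(
            lit(0),
            col("a").gt_eq(lit("aa")).and(col("a").lt_eq(lit("ab"))),
        )
        .otherwise(lit(false))
        .expect("valid CASE expression")
}

fn main() {
    // Print the expression so the CASE shape can be inspected.
    println!("{}", per_partition_filter());
}
```

Reading the snapshots below, each probe-side row is checked only against the bounds reported by the build-side partition it hashes to, and partitions that reported no bounds fall through to `false`.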

File tree

8 files changed, +796 -239 lines changed


datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 285 additions & 6 deletions
@@ -18,7 +18,7 @@
 use std::sync::{Arc, LazyLock};
 
 use arrow::{
-    array::record_batch,
+    array::{record_batch, Float64Array, Int32Array, RecordBatch, StringArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
     util::pretty::pretty_format_batches,
 };
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
     "
     );
 }
@@ -1308,7 +1308,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
     "
     );
 
@@ -1325,7 +1325,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
     "
     );
 
@@ -1670,8 +1670,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
     "
     );
 }
@@ -2333,3 +2333,282 @@ fn test_pushdown_with_computed_grouping_key() {
     "
     );
 }
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where all build-side partitions are empty
+    // This validates the code path that sets the filter to `false` when no rows can match
+
+    // Create empty build side
+    let build_batches = vec![];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with some data
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac"]),
+        ("b", Utf8, ["ba", "bb", "bc"])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides
+    let partition_count = 4;
+
+    let build_hash_exprs = vec![
+        col("a", &build_side_schema).unwrap(),
+        col("b", &build_side_schema).unwrap(),
+    ];
+    let build_repartition = Arc::new(
+        RepartitionExec::try_new(
+            build_scan,
+            Partitioning::Hash(build_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));
+
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            Arc::clone(&probe_scan),
+            Partitioning::Hash(probe_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    // Create HashJoinExec
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_coalesce,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to reflect the TopK state
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute all partitions (required for partitioned hash join coordination)
+    let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_with_nulls() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where build side has NULL values in join keys
+    // This validates NULL handling in bounds computation and filter generation
+
+    // Create build side with NULL values
+    let build_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true), // nullable
+            Field::new("b", DataType::Int32, true), // nullable
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![Some("aa"), None, Some("ab")])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(2), None])),
+        ],
+    )
+    .unwrap();
+    let build_batches = vec![build_batch];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with nullable fields
+    let probe_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Float64, false),
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![
+                Some("aa"),
+                Some("ab"),
+                Some("ac"),
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(4), Some(5)])),
+            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+        ],
+    )
+    .unwrap();
+    let probe_batches = vec![probe_batch];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec in CollectLeft mode (simpler for this test)
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to reflect the TopK state
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute all partitions (required for partitioned hash join coordination)
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 ]
+    "
+    );
+
+    #[rustfmt::skip]
+    let expected = [
+        "+----+---+----+---+-----+",
+        "| a  | b | a  | b | c   |",
+        "+----+---+----+---+-----+",
+        "| aa | 1 | aa | 1 | 1.0 |",
+        "+----+---+----+---+-----+",
+    ];
+    assert_batches_eq!(&expected, &batches);
+}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 9 additions & 5 deletions
@@ -61,6 +61,9 @@ pub struct TestOpener
 impl FileOpener for TestOpener {
     fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let mut batches = self.batches.clone();
+        if self.batches.is_empty() {
+            return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
+        }
         if let Some(batch_size) = self.batch_size {
             let batch = concat_batches(&batches[0].schema(), &batches)?;
             let mut new_batches = Vec::new();
@@ -335,11 +338,12 @@ impl TestStream {
     /// least one entry in data (for the schema)
     pub fn new(data: Vec<RecordBatch>) -> Self {
         // check that there is at least one entry in data and that all batches have the same schema
-        assert!(!data.is_empty(), "data must not be empty");
-        assert!(
-            data.iter().all(|batch| batch.schema() == data[0].schema()),
-            "all batches must have the same schema"
-        );
+        if let Some(first) = data.first() {
+            assert!(
+                data.iter().all(|batch| batch.schema() == first.schema()),
+                "all batches must have the same schema"
+            );
+        }
         Self {
             data,
             ..Default::default()