From b545b27c6e8816bf779812c7e5b8dd7c5bf9af63 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sat, 20 Sep 2025 14:33:54 +0300 Subject: [PATCH 01/12] fix --- datafusion/physical-plan/src/windows/mod.rs | 102 ++++++++++++++++---- 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index dccd9200fc777..1c1348701b984 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -371,18 +371,46 @@ pub(crate) fn window_equivalence_properties( for (i, expr) in window_exprs.iter().enumerate() { let partitioning_exprs = expr.partition_by(); let no_partitioning = partitioning_exprs.is_empty(); - // Collect columns defining partitioning, and construct all `SortOptions` - // variations for them. Then, we will check each one whether it satisfies - // the existing ordering provided by the input plan. + + // Use incremental approach to avoid O(4^n) exponential complexity: + // Instead of generating all combinations upfront via multi_cartesian_product, + // we build orderings incrementally and prune invalid paths early. let mut all_satisfied_lexs = vec![]; - for lex in partitioning_exprs - .iter() - .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order))) - .multi_cartesian_product() - .filter_map(LexOrdering::new) - { - if window_eq_properties.ordering_satisfy(lex.clone())? { - all_satisfied_lexs.push(lex); + if !no_partitioning { + // Start with empty orderings that we'll extend incrementally + let mut current_orderings = vec![vec![]]; + for partition_expr in partitioning_exprs.iter() { + let mut next_orderings = vec![]; + + let sort_options = + sort_options_resolving_constant(Arc::clone(partition_expr), true); + + // For each current partial ordering, try extending with each sort option + for current in current_orderings.iter() { + for sort_expr in sort_options.iter() { + let mut extended = current.clone(); + extended.push(sort_expr.clone()); + + // Check if this partial ordering can potentially satisfy requirements + if let Some(lex) = LexOrdering::new(extended.clone()) { + if window_eq_properties.ordering_satisfy(lex.clone())? { + next_orderings.push(extended); + } + } + } + } + // If no valid orderings remain, stop early + if next_orderings.is_empty() { + break; + } + current_orderings = next_orderings; + } + + // Convert final orderings to LexOrdering and add to all_satisfied_lexs + for ordering in current_orderings { + if let Some(lex) = LexOrdering::new(ordering) { + all_satisfied_lexs.push(lex); + } } } // If there is a partitioning, and no possible ordering cannot satisfy @@ -410,8 +438,10 @@ pub(crate) fn window_equivalence_properties( // Window function results in a partial constant value in // some ordering. Adjust the ordering equivalences accordingly: let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| { - let new_partial_consts = - sort_options_resolving_constant(Arc::clone(&window_col)); + let new_partial_consts = sort_options_resolving_constant( + Arc::clone(&window_col), + false, + ); new_partial_consts.into_iter().map(move |partial| { let mut existing = lex.clone(); @@ -471,7 +501,7 @@ pub(crate) fn window_equivalence_properties( .get_aggregate_expr() .expressions() .into_iter() - .map(sort_options_resolving_constant) + .map(|expr| sort_options_resolving_constant(expr, false)) .multi_cartesian_product(); let (mut asc, mut satisfied) = (false, false); @@ -634,11 +664,45 @@ pub fn get_window_mode( Ok(None) } -fn sort_options_resolving_constant(expr: Arc) -> Vec { - vec![ - PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), - PhysicalSortExpr::new(expr, SortOptions::new(true, true)), - ] +/// Generates sort option variations for a given expression. +/// +/// This function is used to handle constant columns in window operations. Since constant +/// columns can be considered as having any ordering, we generate multiple sort options +/// to explore different ordering possibilities. +/// +/// # Parameters +/// - `expr`: The physical expression to generate sort options for +/// - `all_options`: If true, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). +/// If false, generates only 2 options that preserve set monotonicity. +/// +/// # When to use `all_options = true`: +/// Use for PARTITION BY columns where we want to explore all possible orderings to find +/// one that matches the existing data ordering. +/// +/// # When to use `all_options = false`: +/// Use for aggregate/window function arguments where set monotonicity needs to be preserved. +/// Only generates ASC NULLS LAST and DESC NULLS FIRST because: +/// - Set monotonicity is broken if data has increasing order but nulls come first +/// - Set monotonicity is broken if data has decreasing order but nulls come last +fn sort_options_resolving_constant( + expr: Arc, + all_options: bool, +) -> Vec { + if all_options { + // Generate all 4 possible sort options for partition columns + vec![ + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST + PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST + ] + } else { + // Generate only the 2 options that preserve set monotonicity + vec![ + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST + PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST + ] + } } #[cfg(test)] From f30af51b9580ce35c183ca66d5a90944d5c4fa0d Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sat, 20 Sep 2025 15:11:18 +0300 Subject: [PATCH 02/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 1c1348701b984..92e46af61dd8a 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -673,7 +673,7 @@ pub fn get_window_mode( /// # Parameters /// - `expr`: The physical expression to generate sort options for /// - `all_options`: If true, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). -/// If false, generates only 2 options that preserve set monotonicity. +/// If false, generates only 2 options that preserve set monotonicity. /// /// # When to use `all_options = true`: /// Use for PARTITION BY columns where we want to explore all possible orderings to find From 43f75920d849e57f4bf7530dd7d6fff8706032dc Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sat, 20 Sep 2025 15:14:20 +0300 Subject: [PATCH 03/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 92e46af61dd8a..38de176206cce 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -673,7 +673,7 @@ pub fn get_window_mode( /// # Parameters /// - `expr`: The physical expression to generate sort options for /// - `all_options`: If true, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). -/// If false, generates only 2 options that preserve set monotonicity. +/// If false, generates only 2 options that preserve set monotonicity. /// /// # When to use `all_options = true`: /// Use for PARTITION BY columns where we want to explore all possible orderings to find From 43a323fa3124fbddc2412925bfc32aa250eedf6a Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sun, 21 Sep 2025 21:29:47 +0300 Subject: [PATCH 04/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 43 +++++++++------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 38de176206cce..0056103decf22 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -372,42 +372,37 @@ pub(crate) fn window_equivalence_properties( let partitioning_exprs = expr.partition_by(); let no_partitioning = partitioning_exprs.is_empty(); - // Use incremental approach to avoid O(4^n) exponential complexity: - // Instead of generating all combinations upfront via multi_cartesian_product, - // we build orderings incrementally and prune invalid paths early. + // Find "one" valid ordering for partition columns to avoid exponential complexity. let mut all_satisfied_lexs = vec![]; if !no_partitioning { - // Start with empty orderings that we'll extend incrementally - let mut current_orderings = vec![vec![]]; + // Find a single valid ordering using a greedy approach + let mut ordering = vec![]; for partition_expr in partitioning_exprs.iter() { - let mut next_orderings = vec![]; - let sort_options = sort_options_resolving_constant(Arc::clone(partition_expr), true); - // For each current partial ordering, try extending with each sort option - for current in current_orderings.iter() { - for sort_expr in sort_options.iter() { - let mut extended = current.clone(); - extended.push(sort_expr.clone()); - - // Check if this partial ordering can potentially satisfy requirements - if let Some(lex) = LexOrdering::new(extended.clone()) { - if window_eq_properties.ordering_satisfy(lex.clone())? { - next_orderings.push(extended); - } + // Try each sort option and pick the first one that works + let mut found = false; + for sort_expr in sort_options.iter() { + let mut candidate_ordering = ordering.clone(); + candidate_ordering.push(sort_expr.clone()); + + if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { + if window_eq_properties.ordering_satisfy(lex)? { + ordering.push(sort_expr.clone()); + found = true; + break; } } } - // If no valid orderings remain, stop early - if next_orderings.is_empty() { + // If no sort option works for this column, we can't build a valid ordering + if !found { + ordering.clear(); break; } - current_orderings = next_orderings; } - - // Convert final orderings to LexOrdering and add to all_satisfied_lexs - for ordering in current_orderings { + // If we successfully built an ordering for all columns, use it + if ordering.len() == partitioning_exprs.len() { if let Some(lex) = LexOrdering::new(ordering) { all_satisfied_lexs.push(lex); } From c4bbecbbe423116dfdd2934cee0e8507ae74fccb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 22 Sep 2025 14:43:18 +0200 Subject: [PATCH 05/12] tests copied from v1 pr --- datafusion/core/benches/sql_planner.rs | 3 +- datafusion/sqllogictest/test_files/window.slt | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index c71191507fbc6..9ae6e1f57078e 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); }); - for partitioning_columns in [4, 7, 8] { + // It was observed in production that queries with window functions sometimes partition over more than 30 columns + for partitioning_columns in [4, 7, 8, 12, 30] { c.bench_function( &format!( "physical_window_function_partition_by_{partitioning_columns}_on_values" diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index c30258234490a..6162fce775ed6 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6034,3 +6034,39 @@ LIMIT 5 0 2 NULL NULL 0 NULL NULL 0 3 NULL NULL 0 NULL NULL 0 4 NULL NULL 0 NULL NULL + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12 + ) +FROM source; +---- +1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM source; +---- +1 From e3908360ea61b3e8817fcc583d651624fb52b2b8 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 22 Sep 2025 16:32:16 +0200 Subject: [PATCH 06/12] test case from review comment https://github.com/apache/datafusion/pull/17684#discussion_r2366146307 --- datafusion/sqllogictest/test_files/window.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 6162fce775ed6..b99bc7f1dbefb 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6070,3 +6070,27 @@ SELECT FROM source; ---- 1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM ( + SELECT * FROM source + ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 +); +---- +1 From bc56376c74df8f4b9c19ba39aba95e0e5bc08da9 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 22 Sep 2025 16:39:23 +0200 Subject: [PATCH 07/12] one more test case --- datafusion/sqllogictest/test_files/window.slt | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index b99bc7f1dbefb..e81662a753190 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6094,3 +6094,32 @@ FROM ( ); ---- 1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM ( + SELECT * FROM source + WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = '' + AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = '' + AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = '' + AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = '' + AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = '' + ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 +); +---- +1 \ No newline at end of file From 0e9e236d9263faece9b28f1071a5de364bba0c0d Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 23 Sep 2025 17:54:41 +0300 Subject: [PATCH 08/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 106 ++++++++++++-------- 1 file changed, 64 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 0056103decf22..b5f63315fab53 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -374,38 +374,39 @@ pub(crate) fn window_equivalence_properties( // Find "one" valid ordering for partition columns to avoid exponential complexity. let mut all_satisfied_lexs = vec![]; - if !no_partitioning { - // Find a single valid ordering using a greedy approach - let mut ordering = vec![]; - for partition_expr in partitioning_exprs.iter() { - let sort_options = - sort_options_resolving_constant(Arc::clone(partition_expr), true); - - // Try each sort option and pick the first one that works - let mut found = false; - for sort_expr in sort_options.iter() { - let mut candidate_ordering = ordering.clone(); - candidate_ordering.push(sort_expr.clone()); - - if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { - if window_eq_properties.ordering_satisfy(lex)? { - ordering.push(sort_expr.clone()); - found = true; - break; - } + let mut candidate_ordering = vec![]; + + for partition_expr in partitioning_exprs.iter() { + let sort_options = + sort_options_resolving_constant(Arc::clone(partition_expr), true); + + // Try each sort option and pick the first one that works + let mut found = false; + for sort_expr in sort_options.iter() { + candidate_ordering.push(sort_expr.clone()); + if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { + if window_eq_properties.ordering_satisfy(lex)? { + found = true; + break; } } - // If no sort option works for this column, we can't build a valid ordering - if !found { - ordering.clear(); - break; - } + // This option didn't work, remove it and try the next one + candidate_ordering.pop(); } - // If we successfully built an ordering for all columns, use it - if ordering.len() == partitioning_exprs.len() { - if let Some(lex) = LexOrdering::new(ordering) { - all_satisfied_lexs.push(lex); - } + // If no sort option works for this column, we can't build a valid ordering + if !found { + candidate_ordering.clear(); + break; + } + } + + // If we successfully built an ordering for all columns, use it + // When there are no partition expressions, candidate_ordering will be empty and won't be added + if candidate_ordering.len() == partitioning_exprs.len() + && !candidate_ordering.is_empty() + { + if let Some(lex) = LexOrdering::new(candidate_ordering) { + all_satisfied_lexs.push(lex); } } // If there is a partitioning, and no possible ordering cannot satisfy @@ -492,23 +493,44 @@ pub(crate) fn window_equivalence_properties( // utilize set-monotonicity since the set shrinks as the frame // boundary starts "touching" the end of the table. else if frame.is_causal() { - let args_all_lexs = sliding_expr - .get_aggregate_expr() - .expressions() - .into_iter() - .map(|expr| sort_options_resolving_constant(expr, false)) - .multi_cartesian_product(); - - let (mut asc, mut satisfied) = (false, false); - for order in args_all_lexs { - if let Some(f) = order.first() { - asc = !f.options.descending; + // Find one valid ordering for aggregate arguments instead of + // checking all combinations + let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions(); + let mut candidate_order = vec![]; + let mut asc = false; + + for (idx, expr) in aggregate_exprs.iter().enumerate() { + let mut found = false; + let sort_options = + sort_options_resolving_constant(Arc::clone(expr), false); + + // Try each option and pick the first that works + for sort_expr in sort_options.iter() { + candidate_order.push(sort_expr.clone()); + + if let Some(lex) = LexOrdering::new(candidate_order.clone()) { + if window_eq_properties.ordering_satisfy(lex)? { + if idx == 0 { + asc = !sort_expr.options.descending; + } + found = true; + break; + } + } + // This option didn't work, remove it and try the next one + candidate_order.pop(); } - if window_eq_properties.ordering_satisfy(order)? { - satisfied = true; + + // If we couldn't extend the ordering, stop trying + if !found { break; } } + + // Check if we successfully built a complete ordering + let satisfied = candidate_order.len() == aggregate_exprs.len() + && !aggregate_exprs.is_empty(); + if satisfied { let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing); From 2b164d5b1164c616a3271a8252e419a6dfb4d847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Wed, 24 Sep 2025 16:02:25 +0300 Subject: [PATCH 09/12] Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/windows/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index b5f63315fab53..81a8bea8a76e2 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -373,6 +373,7 @@ pub(crate) fn window_equivalence_properties( let no_partitioning = partitioning_exprs.is_empty(); // Find "one" valid ordering for partition columns to avoid exponential complexity. + // see https://github.com/apache/datafusion/issues/17401 let mut all_satisfied_lexs = vec![]; let mut candidate_ordering = vec![]; From b3543c00589ea7c4c088bd22fab40a624c4124b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Wed, 24 Sep 2025 16:02:43 +0300 Subject: [PATCH 10/12] Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/windows/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 81a8bea8a76e2..46a39fb73d557 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -383,8 +383,8 @@ pub(crate) fn window_equivalence_properties( // Try each sort option and pick the first one that works let mut found = false; - for sort_expr in sort_options.iter() { - candidate_ordering.push(sort_expr.clone()); + for sort_expr in sort_options.into_iter() { + candidate_ordering.push(sort_expr); if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { if window_eq_properties.ordering_satisfy(lex)? { found = true; From 45793433c3cb92406c4c5386ff4c3acd1e41ef52 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 24 Sep 2025 16:24:22 +0300 Subject: [PATCH 11/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index b5f63315fab53..27fd0c7322709 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -402,9 +402,7 @@ pub(crate) fn window_equivalence_properties( // If we successfully built an ordering for all columns, use it // When there are no partition expressions, candidate_ordering will be empty and won't be added - if candidate_ordering.len() == partitioning_exprs.len() - && !candidate_ordering.is_empty() - { + if candidate_ordering.len() == partitioning_exprs.len() { if let Some(lex) = LexOrdering::new(candidate_ordering) { all_satisfied_lexs.push(lex); } @@ -505,13 +503,21 @@ pub(crate) fn window_equivalence_properties( sort_options_resolving_constant(Arc::clone(expr), false); // Try each option and pick the first that works - for sort_expr in sort_options.iter() { - candidate_order.push(sort_expr.clone()); + for sort_expr in sort_options.into_iter() { + let is_asc = !sort_expr.options.descending; + candidate_order.push(sort_expr); if let Some(lex) = LexOrdering::new(candidate_order.clone()) { if window_eq_properties.ordering_satisfy(lex)? { if idx == 0 { - asc = !sort_expr.options.descending; + // The first column's ordering direction determines the overall + // monotonicity behavior of the window result. + // - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT) + // and the first arg is ascending, the window result is increasing + // - If the aggregate has decreasing set monotonicity (e.g., MIN) + // and the first arg is ascending, the window result is also increasing + // This flag is used to determine the final window column ordering. + asc = is_asc; } found = true; break; From e94a4b74759240e9162522ca88ad4b82dffa7073 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 25 Sep 2025 10:55:48 +0300 Subject: [PATCH 12/12] Update mod.rs --- datafusion/physical-plan/src/windows/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 723ec55a94d83..ddc2bfa10ea7e 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -696,34 +696,34 @@ pub fn get_window_mode( /// /// # Parameters /// - `expr`: The physical expression to generate sort options for -/// - `all_options`: If true, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). -/// If false, generates only 2 options that preserve set monotonicity. +/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). +/// If true, generates only 2 options that preserve set monotonicity. /// -/// # When to use `all_options = true`: +/// # When to use `only_monotonic = false`: /// Use for PARTITION BY columns where we want to explore all possible orderings to find /// one that matches the existing data ordering. /// -/// # When to use `all_options = false`: +/// # When to use `only_monotonic = true`: /// Use for aggregate/window function arguments where set monotonicity needs to be preserved. /// Only generates ASC NULLS LAST and DESC NULLS FIRST because: /// - Set monotonicity is broken if data has increasing order but nulls come first /// - Set monotonicity is broken if data has decreasing order but nulls come last fn sort_options_resolving_constant( expr: Arc, - all_options: bool, + only_monotonic: bool, ) -> Vec { - if all_options { - // Generate all 4 possible sort options for partition columns + if only_monotonic { + // Generate only the 2 options that preserve set monotonicity vec![ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST - PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST - PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST ] } else { - // Generate only the 2 options that preserve set monotonicity + // Generate all 4 possible sort options for partition columns vec![ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST ] }