From 86871a8480e7fcfcc18dea90e842e5e325896ae5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 3 Dec 2023 13:03:57 -0800 Subject: [PATCH 01/10] fix: RANGE frame can be regularized to ROWS frame only if empty ORDER BY clause --- datafusion/expr/src/window_frame.rs | 16 ++++--- datafusion/sqllogictest/test_files/window.slt | 44 ++++++++++++++++--- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index 5f161b85dd9ac..978f7b508d14c 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -148,18 +148,20 @@ impl WindowFrame { pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result { if frame.units == WindowFrameUnits::Range && order_bys != 1 { // Normally, RANGE frames require an ORDER BY clause with exactly one - // column. However, an ORDER BY clause may be absent in two edge cases. + // column. However, an ORDER BY clause may be absent in two edge cases: + // 1. start bound is UNBOUNDED or CURRENT ROW + // 2. end bound is CURRENT ROW or UNBOUNDED. + // In these cases, we regularize the RANGE frame to be equivalent to a ROWS + // frame with the UNBOUNDED bounds. if (frame.start_bound.is_unbounded() || frame.start_bound == WindowFrameBound::CurrentRow) && (frame.end_bound == WindowFrameBound::CurrentRow || frame.end_bound.is_unbounded()) + && order_bys == 0 { - if order_bys == 0 { - frame.units = WindowFrameUnits::Rows; - frame.start_bound = - WindowFrameBound::Preceding(ScalarValue::UInt64(None)); - frame.end_bound = WindowFrameBound::Following(ScalarValue::UInt64(None)); - } + frame.units = WindowFrameUnits::Rows; + frame.start_bound = WindowFrameBound::Preceding(ScalarValue::UInt64(None)); + frame.end_bound = WindowFrameBound::Following(ScalarValue::UInt64(None)); } else { plan_err!("RANGE requires exactly one ORDER BY column")? } diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index bb6ca119480d5..ee54476185066 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1080,7 +1080,7 @@ SELECT 794 95 95 #fn test_window_range_equivalent_frames -query IIIIIII +query error DataFusion error: Error during planning: RANGE requires exactly one ORDER BY column SELECT c9, COUNT(*) OVER(ORDER BY c9, c1 RANGE BETWEEN CURRENT ROW AND CURRENT ROW) AS cnt1, @@ -1092,12 +1092,22 @@ SELECT FROM aggregate_test_100 ORDER BY c9 LIMIT 5 + +query IIII +SELECT + c9, + COUNT(*) OVER(RANGE BETWEEN CURRENT ROW AND CURRENT ROW) AS cnt4, + COUNT(*) OVER(RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cnt5, + COUNT(*) OVER(RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS cnt6 + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5 ---- -28774375 1 1 1 100 100 100 -63044568 1 2 1 100 100 100 -141047417 1 3 1 100 100 100 -141680161 1 4 1 100 100 100 -145294611 1 5 1 100 100 100 +28774375 100 100 100 +63044568 100 100 100 +141047417 100 100 100 +141680161 100 100 100 +145294611 100 100 100 #fn test_window_cume_dist query IRR @@ -3727,3 +3737,25 @@ FROM score_board s statement ok DROP TABLE score_board; + +# RANGE frame with/without (multiple) ORDER BY +query error DataFusion error: Error during planning: RANGE requires exactly one ORDER BY column +select a, + rank() over (partition by a order by a, a + 1 RANGE UNBOUNDED PRECEDING) rnk + from (select 1 a union all select 2 a) q + +query II +select a, + rank() over (partition by a order by a RANGE UNBOUNDED PRECEDING) rnk + from (select 1 a union all select 2 a) q +---- +1 1 +2 1 + +query II +select a, + rank() over (partition by a RANGE UNBOUNDED PRECEDING) rnk + from (select 1 a union all select 2 a) q +---- +1 1 +2 1 From ab0b4866989af479101f834f00b2ee526fb644a5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 3 Dec 2023 14:52:33 -0800 Subject: [PATCH 02/10] Fix flaky test --- datafusion/sqllogictest/test_files/window.slt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index ee54476185066..5db2836d84ad5 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3742,20 +3742,18 @@ DROP TABLE score_board; query error DataFusion error: Error during planning: RANGE requires exactly one ORDER BY column select a, rank() over (partition by a order by a, a + 1 RANGE UNBOUNDED PRECEDING) rnk - from (select 1 a union all select 2 a) q + from (select 1 a) q query II select a, rank() over (partition by a order by a RANGE UNBOUNDED PRECEDING) rnk - from (select 1 a union all select 2 a) q + from (select 1 a) q ---- 1 1 -2 1 query II select a, rank() over (partition by a RANGE UNBOUNDED PRECEDING) rnk - from (select 1 a union all select 2 a) q + from (select 1 a) q ---- 1 1 -2 1 From f12b9e615a7ded5f440420b6a6fe7eb90dc55b30 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 3 Dec 2023 16:12:11 -0800 Subject: [PATCH 03/10] Update test comment --- datafusion/sqllogictest/test_files/window.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 5db2836d84ad5..7b1f4aecb9ab1 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3738,7 +3738,7 @@ FROM score_board s statement ok DROP TABLE score_board; -# RANGE frame with/without (multiple) ORDER BY +# RANGE frame can be regularized to ROWS frame only if empty ORDER BY clause query error DataFusion error: Error during planning: RANGE requires exactly one ORDER BY column select a, rank() over (partition by a order by a, a + 1 RANGE UNBOUNDED PRECEDING) rnk From 6926b2fb0efbd85d06d194e210c87471fa9c907d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 3 Dec 2023 16:12:56 -0800 Subject: [PATCH 04/10] Add code comment --- datafusion/expr/src/window_frame.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index 978f7b508d14c..575d4d766b0ac 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -153,6 +153,7 @@ pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result Date: Sun, 3 Dec 2023 21:43:18 -0800 Subject: [PATCH 05/10] Update --- datafusion/expr/src/window_frame.rs | 17 +++-- datafusion/sqllogictest/test_files/window.slt | 74 +++++++++++++------ 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index 575d4d766b0ac..2a64f21b856b2 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -148,7 +148,8 @@ impl WindowFrame { pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result { if frame.units == WindowFrameUnits::Range && order_bys != 1 { // Normally, RANGE frames require an ORDER BY clause with exactly one - // column. However, an ORDER BY clause may be absent in two edge cases: + // column. However, an ORDER BY clause may be absent or present but with + // more than one column in two edge cases: // 1. start bound is UNBOUNDED or CURRENT ROW // 2. end bound is CURRENT ROW or UNBOUNDED. // In these cases, we regularize the RANGE frame to be equivalent to a ROWS @@ -158,11 +159,17 @@ pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result Date: Mon, 4 Dec 2023 15:34:19 -0800 Subject: [PATCH 06/10] fix: Literal in window definition should not refer to relation column --- datafusion/physical-expr/src/sort_expr.rs | 6 +----- .../src/windows/bounded_window_agg_exec.rs | 2 +- datafusion/sql/src/expr/function.rs | 10 +++++++--- datafusion/sql/src/expr/mod.rs | 7 ++++++- datafusion/sql/src/expr/order_by.rs | 3 ++- datafusion/sql/src/query.rs | 2 +- datafusion/sql/src/statement.rs | 3 ++- datafusion/sqllogictest/test_files/window.slt | 12 ++++++------ 8 files changed, 26 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 664a6b65b7f7b..8ca798ffa0e5a 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -65,11 +65,7 @@ impl PhysicalSortExpr { let value_to_sort = self.expr.evaluate(batch)?; let array_to_sort = match value_to_sort { ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => { - return exec_err!( - "Sort operation is not applicable to scalar value {scalar}" - ); - } + ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?, }; Ok(SortColumn { values: array_to_sort, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 8156ab1fa31b7..36c8550be8b08 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -586,7 +586,7 @@ impl LinearSearch { .map(|item| match item.evaluate(record_batch)? { ColumnarValue::Array(array) => Ok(array), ColumnarValue::Scalar(scalar) => { - plan_err!("Sort operation is not applicable to scalar value {scalar}") + scalar.to_array_of_size(record_batch.num_rows()) } }) .collect() diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 958e038798426..322af9098ae91 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -92,8 +92,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .into_iter() .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) .collect::>>()?; - let order_by = - self.order_by_to_sort_expr(&window.order_by, schema, planner_context)?; + let order_by = self.order_by_to_sort_expr( + &window.order_by, + schema, + planner_context, + false, + )?; let window_frame = window .window_frame .as_ref() @@ -143,7 +147,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // next, aggregate built-ins if let Ok(fun) = AggregateFunction::from_str(&name) { let order_by = - self.order_by_to_sort_expr(&order_by, schema, planner_context)?; + self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?; let order_by = (!order_by.is_empty()).then_some(order_by); let args = self.function_args_to_expr(args, schema, planner_context)?; let filter: Option> = filter diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index b8c130055a5ac..435fe881bfe68 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -555,7 +555,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } = array_agg; let order_by = if let Some(order_by) = order_by { - Some(self.order_by_to_sort_expr(&order_by, input_schema, planner_context)?) + Some(self.order_by_to_sort_expr( + &order_by, + input_schema, + planner_context, + true, + )?) } else { None }; diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index 1dccc2376f0b1..ff4b4d427bd38 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -30,6 +30,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { exprs: &[OrderByExpr], schema: &DFSchema, planner_context: &mut PlannerContext, + literal_to_column: bool, ) -> Result> { let mut expr_vec = vec![]; for e in exprs { @@ -40,7 +41,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } = e; let expr = match expr { - SQLExpr::Value(Value::Number(v, _)) => { + SQLExpr::Value(Value::Number(v, _)) if literal_to_column => { let field_index = v .parse::() .map_err(|err| plan_datafusion_err!("{}", err))?; diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 643f41d844858..dd4cab126261e 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -161,7 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } let order_by_rex = - self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?; + self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?; if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan { // In case of `DISTINCT ON` we must capture the sort expressions since during the plan diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index aa2f0583cb99f..32350102feade 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -704,7 +704,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut all_results = vec![]; for expr in order_exprs { // Convert each OrderByExpr to a SortExpr: - let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?; + let expr_vec = + self.order_by_to_sort_expr(&expr, schema, planner_context, true)?; // Verify that columns of all SortExprs exist in the schema: for expr in expr_vec.iter() { for column in expr.to_columns()?.iter() { diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index c0dcd4ae1ea57..0179431ac8adf 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3778,10 +3778,10 @@ query error DataFusion error: Arrow error: Invalid argument error: must either s select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk from (select 1 a union select 2 a) q; -# TODO: this is different to Postgres which returns [1, 1] for `rnk`. -query I -select rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk - from (select 1 a union select 2 a) q ORDER BY rnk +query II +select a, + rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk + from (select 1 a union select 2 a) q ORDER BY a ---- -1 -2 +1 1 +2 1 From da3e46724e8676ed9bf7dbbbcf0733cd29ea7d12 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 4 Dec 2023 15:38:55 -0800 Subject: [PATCH 07/10] Remove unused import --- datafusion/physical-expr/src/sort_expr.rs | 2 +- datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 8ca798ffa0e5a..914d76f9261a1 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -26,7 +26,7 @@ use crate::PhysicalExpr; use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; -use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_common::Result; use datafusion_expr::ColumnarValue; /// Represents Sort operation for a column in a RecordBatch diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 36c8550be8b08..b4ed75f6d3974 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -50,7 +50,7 @@ use datafusion_common::utils::{ evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices, get_record_batch_at_indices, get_row_at_idx, }; -use datafusion_common::{exec_err, plan_err, DataFusionError, Result}; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::window_state::{PartitionBatchState, WindowAggState}; use datafusion_expr::ColumnarValue; From a2c8d673bd6a4691cf3bd3379faf4df9ee3dcd0d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 Dec 2023 14:24:03 -0800 Subject: [PATCH 08/10] Update datafusion/sql/src/expr/function.rs Co-authored-by: Andrew Lamb --- datafusion/sql/src/expr/function.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 322af9098ae91..1e35bd190c763 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -96,7 +96,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &window.order_by, schema, planner_context, - false, + // Numeric literals in window function ORDER BY are treated as constants + false, )?; let window_frame = window .window_frame From 1b9c50f79c070735e2e08965289cdbb80404777c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 Dec 2023 14:26:00 -0800 Subject: [PATCH 09/10] Add code comment --- datafusion/sql/src/expr/order_by.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index ff4b4d427bd38..772255bd9773a 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -24,7 +24,11 @@ use datafusion_expr::Expr; use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value}; impl<'a, S: ContextProvider> SqlToRel<'a, S> { - /// convert sql [OrderByExpr] to `Vec` + /// Convert sql [OrderByExpr] to `Vec`. + /// + /// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index + /// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`). + /// If false, interpret numeric literals as constant values. pub(crate) fn order_by_to_sort_expr( &self, exprs: &[OrderByExpr], From 48c20655f3c07126798269e10b5dc41b2a7c4a49 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 Dec 2023 16:53:11 -0800 Subject: [PATCH 10/10] Fix format --- datafusion/sql/src/expr/function.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 1e35bd190c763..14ea20c3fa5fc 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -97,7 +97,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema, planner_context, // Numeric literals in window function ORDER BY are treated as constants - false, + false, )?; let window_frame = window .window_frame