diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 9eba86929062f..9ae48ee1d64f1 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1068,8 +1068,8 @@ mod tests { lit, ApproxDistinct, Column, Count, Median, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties, + PhysicalExpr, PhysicalSortExpr, }; use futures::{FutureExt, Stream}; use std::any::Any; @@ -1726,8 +1726,14 @@ mod tests { eq_properties.add_equal_conditions((&col_a, &col_b)); let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), options1)], - &vec![OrderedColumn::new(col_c.clone(), options2)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()) as _, + options: options1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_c.clone()) as _, + options: options2, + }], )); let order_by_exprs = vec![ diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 155d79e7e86c1..deff619b4ffe4 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -588,26 +588,10 @@ pub fn ordering_equivalence_properties_helper( // Return an empty OrderingEquivalenceProperties: return oep; }; - let first_column = first_ordering - .iter() - .map(|e| TryFrom::try_from(e.clone())) - .collect::>>(); - let checked_column_first = if let Ok(first) = first_column { - first - } else { - // Return an empty OrderingEquivalenceProperties: - return oep; - }; // First entry among eq_orderings is the head, skip it: for ordering in eq_orderings.iter().skip(1) { - let column = ordering - .iter() - .map(|e| TryFrom::try_from(e.clone())) - .collect::>>(); - if let Ok(column) = column { - if !column.is_empty() { - oep.add_equal_conditions((&checked_column_first, &column)) - } + if !ordering.is_empty() { + oep.add_equal_conditions((first_ordering, ordering)) } } oep diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index d7eedf7f18ad8..f773f3b54953c 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -53,9 +53,7 @@ use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_e pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; -use datafusion_physical_expr::{ - OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement, -}; +use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement}; pub use window_agg_exec::WindowAggExec; /// Create a physical expression for window function @@ -270,14 +268,17 @@ pub(crate) fn window_ordering_equivalence( .is::() { if let Some((idx, field)) = - schema.column_with_name(expr.field().unwrap().name()) + schema.column_with_name(builtin_window_expr.name()) { let column = Column::new(field.name(), idx); let options = SortOptions { descending: false, nulls_first: false, }; // ASC, NULLS LAST - let rhs = OrderedColumn::new(column, options); + let rhs = PhysicalSortExpr { + expr: Arc::new(column) as _, + options, + }; builder.add_equal_conditions(vec![rhs]); } } diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index f1280fa688c5b..c0b861fd8a10b 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5 ------SortExec: expr=[c9@0 DESC] --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true +# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions) +# during ordering satisfy analysis. In the final plan we should only see single SortExec. +query TT +EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9, + ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1 + FROM aggregate_test_100 + ORDER BY c9 + c5 DESC) + ORDER BY rn1, c9 + c5 DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5 +----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST +------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 +--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------TableScan: aggregate_test_100 projection=[c5, c9] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1] +----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true + # The following query has type error. We should test the error could be detected # from either the logical plan (when `skip_failed_rules` is set to `false`) or # the physical plan (when `skip_failed_rules` is set to `true`). diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 83f26a1d06d98..78279851bba5e 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,17 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::Column; +use crate::expressions::{BinaryExpr, Column}; use crate::{ - normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement, + normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr, + PhysicalSortExpr, }; use arrow::datatypes::SchemaRef; -use arrow_schema::SortOptions; -use datafusion_common::DataFusionError; -use std::collections::{HashMap, HashSet}; -use std::hash::Hash; +use std::collections::HashMap; use std::sync::Arc; /// Represents a collection of [`EquivalentClass`] (equivalences @@ -34,14 +32,14 @@ use std::sync::Arc; /// This is used to represent both: /// /// 1. Equality conditions (like `A=B`), when `T` = [`Column`] -/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`OrderedColumn`] +/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`] #[derive(Debug, Clone)] pub struct EquivalenceProperties { classes: Vec>, schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -115,6 +113,53 @@ impl EquivalenceProperties { } } +/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries +fn deduplicate_vector(in_data: Vec) -> Vec { + let mut result = vec![]; + for elem in in_data { + if !result.contains(&elem) { + result.push(elem); + } + } + result +} + +/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`. +fn get_entry_position(in_data: &[T], entry: &T) -> Option { + in_data.iter().position(|item| item.eq(entry)) +} + +/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`) +/// Otherwise return `false` +fn remove_from_vec(in_data: &mut Vec, entry: &T) -> bool { + if let Some(idx) = get_entry_position(in_data, entry) { + in_data.remove(idx); + true + } else { + false + } +} + +// Helper function to calculate column info recursively +fn get_column_indices_helper( + indices: &mut Vec<(usize, String)>, + expr: &Arc, +) { + if let Some(col) = expr.as_any().downcast_ref::() { + indices.push((col.index(), col.name().to_string())) + } else if let Some(binary_expr) = expr.as_any().downcast_ref::() { + get_column_indices_helper(indices, binary_expr.left()); + get_column_indices_helper(indices, binary_expr.right()); + }; +} + +/// Get index and name of each column that is in the expression (Can return multiple entries for `BinaryExpr`s) +fn get_column_indices(expr: &Arc) -> Vec<(usize, String)> { + let mut result = vec![]; + get_column_indices_helper(&mut result, expr); + result +} + /// `OrderingEquivalenceProperties` keeps track of columns that describe the /// global ordering of the schema. These columns are not necessarily same; e.g. /// ```text @@ -130,34 +175,32 @@ impl EquivalenceProperties { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties>; +pub type OrderingEquivalenceProperties = EquivalenceProperties; -/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known +/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` /// is generated by equality predicates, typically equijoin conditions and equality -/// conditions in filters. `EquivalentClass` is generated by the +/// conditions in filters. `EquivalentClass` is generated by the /// `ROW_NUMBER` window function. #[derive(Debug, Clone)] pub struct EquivalentClass { /// First element in the EquivalentClass head: T, /// Other equal columns - others: HashSet, + others: Vec, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { - EquivalentClass { - head, - others: HashSet::from_iter(others), - } + let others = deduplicate_vector(others); + EquivalentClass { head, others } } pub fn head(&self) -> &T { &self.head } - pub fn others(&self) -> &HashSet { + pub fn others(&self) -> &[T] { &self.others } @@ -166,15 +209,21 @@ impl EquivalentClass { } pub fn insert(&mut self, col: T) -> bool { - self.head != col && self.others.insert(col) + if self.head != col && !self.others.contains(&col) { + self.others.push(col); + true + } else { + false + } } pub fn remove(&mut self, col: &T) -> bool { - let removed = self.others.remove(col); + let removed = remove_from_vec(&mut self.others, col); + // If we are removing the head, shift others so that its first entry becomes the new head. if !removed && *col == self.head { - let one_col = self.others.iter().next().cloned(); + let one_col = self.others.first().cloned(); if let Some(col) = one_col { - let removed = self.others.remove(&col); + let removed = remove_from_vec(&mut self.others, &col); self.head = col; removed } else { @@ -198,58 +247,7 @@ impl EquivalentClass { } } -/// This object represents a [`Column`] with a definite ordering, for -/// example `A ASC` and is used to represent equivalent orderings in -/// the optimizer. -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct OrderedColumn { - pub col: Column, - pub options: SortOptions, -} - -impl OrderedColumn { - pub fn new(col: Column, options: SortOptions) -> Self { - Self { col, options } - } -} - -impl From for PhysicalSortExpr { - fn from(value: OrderedColumn) -> Self { - PhysicalSortExpr { - expr: Arc::new(value.col) as _, - options: value.options, - } - } -} - -impl TryFrom for OrderedColumn { - type Error = DataFusionError; - - fn try_from(value: PhysicalSortExpr) -> Result { - if let Some(col) = value.expr.as_any().downcast_ref::() { - Ok(OrderedColumn { - col: col.clone(), - options: value.options, - }) - } else { - Err(DataFusionError::NotImplemented( - "Only Column PhysicalSortExpr's can be downcasted to OrderedColumn yet" - .to_string(), - )) - } - } -} - -impl From for PhysicalSortRequirement { - fn from(value: OrderedColumn) -> Self { - PhysicalSortRequirement { - expr: Arc::new(value.col) as _, - options: Some(value.options), - } - } -} - -/// `Vec` stores the lexicographical ordering for a schema. +/// `LexOrdering` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. /// For instance, for the table below @@ -260,7 +258,7 @@ impl From for PhysicalSortRequirement { /// |3|2|1|3| /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table. /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent. -pub type OrderingEquivalentClass = EquivalentClass>; +pub type OrderingEquivalentClass = EquivalentClass; impl OrderingEquivalentClass { /// This function extends ordering equivalences with alias information. @@ -269,15 +267,17 @@ impl OrderingEquivalentClass { /// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent. fn update_with_aliases(&mut self, columns_map: &HashMap>) { for (column, columns) in columns_map { + let col_expr = Arc::new(column.clone()) as Arc; let mut to_insert = vec![]; for ordering in std::iter::once(&self.head).chain(self.others.iter()) { for (idx, item) in ordering.iter().enumerate() { - if item.col.eq(column) { + if item.expr.eq(&col_expr) { for col in columns { + let col_expr = Arc::new(col.clone()) as Arc; let mut normalized = self.head.clone(); // Change the corresponding entry in the head with the alias column: let entry = &mut normalized[idx]; - (entry.col, entry.options) = (col.clone(), item.options); + (entry.expr, entry.options) = (col_expr, item.options); to_insert.push(normalized); } } @@ -333,7 +333,10 @@ impl OrderingEquivalenceBuilder { self } - pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec) { + pub fn add_equal_conditions( + &mut self, + new_equivalent_ordering: Vec, + ) { let mut normalized_out_ordering = vec![]; for item in &self.existing_ordering { // To account for ordering equivalences, first normalize the expression: @@ -341,14 +344,10 @@ impl OrderingEquivalenceBuilder { item.expr.clone(), self.eq_properties.classes(), ); - // Currently we only support ordering equivalences for `Column` expressions. - // TODO: Add support for ordering equivalence for all `PhysicalExpr`s. - if let Some(column) = normalized.as_any().downcast_ref::() { - normalized_out_ordering - .push(OrderedColumn::new(column.clone(), item.options)); - } else { - break; - } + normalized_out_ordering.push(PhysicalSortExpr { + expr: normalized, + options: item.options, + }); } // If there is an existing ordering, add new ordering as an equivalence: if !normalized_out_ordering.is_empty() { @@ -433,18 +432,22 @@ pub fn project_ordering_equivalence_properties( let schema = output_eq.schema(); let fields = schema.fields(); for class in eq_classes.iter_mut() { - let columns_to_remove = class + let sort_exprs_to_remove = class .iter() - .filter(|columns| { - columns.iter().any(|column| { - let idx = column.col.index(); - idx >= fields.len() || fields[idx].name() != column.col.name() + .filter(|sort_exprs| { + sort_exprs.iter().any(|sort_expr| { + let col_infos = get_column_indices(&sort_expr.expr); + // If any one of the columns, used in Expression is invalid, remove expression + // from ordering equivalences + col_infos.into_iter().any(|(idx, name)| { + idx >= fields.len() || fields[idx].name() != &name + }) }) }) .cloned() .collect::>(); - for column in columns_to_remove { - class.remove(&column); + for sort_exprs in sort_exprs_to_remove { + class.remove(&sort_exprs); } } eq_classes.retain(|props| props.len() > 1); @@ -459,6 +462,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; + use datafusion_expr::Operator; use std::sync::Arc; #[test] @@ -551,4 +555,52 @@ mod tests { Ok(()) } + + #[test] + fn test_deduplicate_vector() -> Result<()> { + assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]); + assert_eq!( + deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]), + vec![1, 2, 3, 4, 0] + ); + Ok(()) + } + + #[test] + fn test_get_entry_position() -> Result<()> { + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None); + Ok(()) + } + + #[test] + fn test_remove_from_vec() -> Result<()> { + let mut in_data = vec![1, 1, 2, 3, 3]; + remove_from_vec(&mut in_data, &5); + assert_eq!(in_data, vec![1, 1, 2, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1]); + Ok(()) + } + + #[test] + fn test_get_column_infos() -> Result<()> { + let expr1 = Arc::new(Column::new("col1", 2)) as _; + assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]); + let expr2 = Arc::new(Column::new("col2", 5)) as _; + assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]); + let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; + assert_eq!( + get_column_indices(&expr3), + vec![(2, "col1".to_string()), (5, "col2".to_string())] + ); + Ok(()) + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b54bcda601c74..64ffa134c20e3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -50,7 +50,7 @@ pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; pub use equivalence::{ project_equivalence_properties, project_ordering_equivalence_properties, - EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties, + EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, OrderingEquivalentClass, }; pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index d9009ca31ec26..b7404c367948b 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -683,7 +683,7 @@ pub fn reassign_predicate_columns( mod tests { use super::*; use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal}; - use crate::{OrderedColumn, PhysicalSortExpr}; + use crate::PhysicalSortExpr; use arrow::compute::SortOptions; use datafusion_common::{Result, ScalarValue}; use std::fmt::{Display, Formatter}; @@ -771,17 +771,35 @@ mod tests { let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema.clone()); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_d.clone(), option1), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_d.clone()), + options: option1, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], )); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_e.clone(), option2), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option2, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], )); Ok((test_schema, eq_properties, ordering_eq_properties)) @@ -1288,8 +1306,14 @@ mod tests { // Column a and e are ordering equivalent (e.g global ordering of the table can be described both as a ASC and e ASC.) let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], - &vec![OrderedColumn::new(col_e.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option1, + }], )); let sort_req_a = PhysicalSortExpr { expr: Arc::new((col_a).clone()) as _,