From 74ff631db5a5bfcca7179479d3732c94888a914a Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 14:24:19 +0300 Subject: [PATCH 1/9] remove hash dependency from EquivalenceProperties --- datafusion/physical-expr/src/equivalence.rs | 71 +++++++++++++++++---- 1 file changed, 58 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index a1f2df9208f07..a910de4c05581 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -21,7 +21,7 @@ use crate::{PhysicalSortExpr, PhysicalSortRequirement}; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; @@ -32,7 +32,7 @@ pub struct EquivalenceProperties { schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -104,6 +104,29 @@ impl EquivalenceProperties { } } +fn deduplicate_vector(in_data: Vec) -> Vec { + let mut result = vec![]; + for elem in in_data { + if !result.contains(&elem) { + result.push(elem); + } + } + result +} + +fn get_elem_position(in_data: &[T], elem: &T) -> Option { + in_data.iter().position(|item| item.eq(elem)) +} + +fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { + if let Some(idx) = get_elem_position(in_data, elem) { + in_data.remove(idx); + true + } else { + false + } +} + /// `OrderingEquivalenceProperties` keeps track of columns that describe the /// global ordering of the schema. These columns are not necessarily same; e.g. /// ```text @@ -131,22 +154,20 @@ pub struct EquivalentClass { /// First element in the EquivalentClass head: T, /// Other equal columns - others: HashSet, + others: Vec, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { - EquivalentClass { - head, - others: HashSet::from_iter(others), - } + let others = deduplicate_vector(others); + EquivalentClass { head, others } } pub fn head(&self) -> &T { &self.head } - pub fn others(&self) -> &HashSet { + pub fn others(&self) -> &[T] { &self.others } @@ -155,15 +176,21 @@ impl EquivalentClass { } pub fn insert(&mut self, col: T) -> bool { - self.head != col && self.others.insert(col) + if self.head != col && !self.others.contains(&col) { + self.others.push(col); + true + } else { + false + } } pub fn remove(&mut self, col: &T) -> bool { - let removed = self.others.remove(col); + let removed = remove_from_vec(&mut self.others, col); + // If the the removed entry is head, shit other such that first entry becomes head in others. if !removed && *col == self.head { - let one_col = self.others.iter().next().cloned(); + let one_col = self.others.first().cloned(); if let Some(col) = one_col { - let removed = self.others.remove(&col); + let removed = remove_from_vec(&mut self.others, &col); self.head = col; removed } else { @@ -446,4 +473,22 @@ mod tests { Ok(()) } + + #[test] + fn test_deduplicate_vector() -> Result<()> { + assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]); + assert_eq!( + deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]), + vec![1, 2, 3, 4, 0] + ); + Ok(()) + } + + #[test] + fn test_get_elem_position() -> Result<()> { + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &2), Some(2)); + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &1), Some(0)); + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); + Ok(()) + } } From cc46ff83eac48e3e6009a3327c6d82ecd7439b82 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:05:32 +0300 Subject: [PATCH 2/9] Convert OrderedColumn to PhysicalSortExpr --- datafusion/physical-expr/src/equivalence.rs | 142 ++++++++++++++------ datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/utils.rs | 54 ++++++-- 3 files changed, 148 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index a910de4c05581..0b56d02237eb3 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::Column; -use crate::{PhysicalSortExpr, PhysicalSortRequirement}; +use crate::expressions::{BinaryExpr, Column}; +use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; @@ -32,7 +32,7 @@ pub struct EquivalenceProperties { schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -127,6 +127,25 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { } } +fn get_column_indices_helper( + indices: &mut Vec<(usize, String)>, + expr: &Arc, +) { + if let Some(col) = expr.as_any().downcast_ref::() { + indices.push((col.index(), col.name().to_string())) + } else if let Some(binary_expr) = expr.as_any().downcast_ref::() { + get_column_indices_helper(indices, binary_expr.left()); + get_column_indices_helper(indices, binary_expr.right()); + }; +} + +/// Get the indices of the columns occur in the expression +fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String)> { + let mut result = vec![]; + get_column_indices_helper(&mut result, expr); + result +} + /// `OrderingEquivalenceProperties` keeps track of columns that describe the /// global ordering of the schema. These columns are not necessarily same; e.g. /// ```text @@ -142,7 +161,7 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties>; +pub type OrderingEquivalenceProperties = EquivalenceProperties>; /// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` @@ -157,7 +176,7 @@ pub struct EquivalentClass { others: Vec, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { let others = deduplicate_vector(others); EquivalentClass { head, others } @@ -214,36 +233,36 @@ impl EquivalentClass { } } -/// This object represents a [`Column`] with a definite ordering. -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct OrderedColumn { - pub col: Column, - pub options: SortOptions, -} - -impl OrderedColumn { - pub fn new(col: Column, options: SortOptions) -> Self { - Self { col, options } - } -} - -impl From for PhysicalSortExpr { - fn from(value: OrderedColumn) -> Self { - PhysicalSortExpr { - expr: Arc::new(value.col) as _, - options: value.options, - } - } -} - -impl From for PhysicalSortRequirement { - fn from(value: OrderedColumn) -> Self { - PhysicalSortRequirement { - expr: Arc::new(value.col) as _, - options: Some(value.options), - } - } -} +// /// This object represents a [`Column`] with a definite ordering. +// #[derive(Debug, Hash, PartialEq, Eq, Clone)] +// pub struct OrderedColumn { +// pub col: Column, +// pub options: SortOptions, +// } +// +// impl OrderedColumn { +// pub fn new(col: Column, options: SortOptions) -> Self { +// Self { col, options } +// } +// } +// +// impl From for PhysicalSortExpr { +// fn from(value: OrderedColumn) -> Self { +// PhysicalSortExpr { +// expr: Arc::new(value.col) as _, +// options: value.options, +// } +// } +// } +// +// impl From for PhysicalSortRequirement { +// fn from(value: OrderedColumn) -> Self { +// PhysicalSortRequirement { +// expr: Arc::new(value.col) as _, +// options: Some(value.options), +// } +// } +// } /// `Vec` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can @@ -256,7 +275,7 @@ impl From for PhysicalSortRequirement { /// |3|2|1|3| /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table. /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent. -pub type OrderingEquivalentClass = EquivalentClass>; +pub type OrderingEquivalentClass = EquivalentClass>; impl OrderingEquivalentClass { /// This function extends ordering equivalences with alias information. @@ -265,15 +284,17 @@ impl OrderingEquivalentClass { /// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent. fn update_with_aliases(&mut self, columns_map: &HashMap>) { for (column, columns) in columns_map { + let col_expr = Arc::new(column.clone()) as Arc; let mut to_insert = vec![]; for ordering in std::iter::once(&self.head).chain(self.others.iter()) { for (idx, item) in ordering.iter().enumerate() { - if item.col.eq(column) { + if item.expr.eq(&col_expr) { for col in columns { + let col_expr = Arc::new(col.clone()) as Arc; let mut normalized = self.head.clone(); // Change the corresponding entry in the head with the alias column: let entry = &mut normalized[idx]; - (entry.col, entry.options) = (col.clone(), item.options); + (entry.expr, entry.options) = (col_expr, item.options); to_insert.push(normalized); } } @@ -359,8 +380,12 @@ pub fn project_ordering_equivalence_properties( .iter() .filter(|columns| { columns.iter().any(|column| { - let idx = column.col.index(); - idx >= fields.len() || fields[idx].name() != column.col.name() + let indices_names = get_column_indices_names(&column.expr); + indices_names.into_iter().any(|(idx, name)| { + idx >= fields.len() || fields[idx].name() != &name + }) + // let idx = column.col.index(); + // idx >= fields.len() || fields[idx].name() != column.col.name() }) }) .cloned() @@ -381,6 +406,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; + use datafusion_expr::Operator; use std::sync::Arc; #[test] @@ -491,4 +517,40 @@ mod tests { assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); Ok(()) } + + #[test] + fn test_remove_from_vec() -> Result<()> { + let mut in_data = vec![1, 1, 2, 3, 3]; + remove_from_vec(&mut in_data, &5); + assert_eq!(in_data, vec![1, 1, 2, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1]); + Ok(()) + } + + #[test] + fn test_column_indices_names() -> Result<()> { + let expr1 = Arc::new(Column::new("col1", 2)) as _; + assert_eq!( + get_column_indices_names(&expr1), + vec![(2, "col1".to_string())] + ); + let expr2 = Arc::new(Column::new("col2", 5)) as _; + assert_eq!( + get_column_indices_names(&expr2), + vec![(5, "col2".to_string())] + ); + let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; + assert_eq!( + get_column_indices_names(&expr3), + vec![(2, "col1".to_string()), (5, "col2".to_string())] + ); + Ok(()) + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b65a4e892f9ec..4523e98af8c52 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -50,7 +50,7 @@ pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; pub use equivalence::{ project_equivalence_properties, project_ordering_equivalence_properties, - EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties, + EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, OrderingEquivalentClass, }; pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 54a18a157304d..465c39e5e731b 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -696,7 +696,7 @@ pub fn reassign_predicate_columns( mod tests { use super::*; use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal}; - use crate::{OrderedColumn, PhysicalSortExpr}; + use crate::PhysicalSortExpr; use arrow::compute::SortOptions; use datafusion_common::{Result, ScalarValue}; use std::fmt::{Display, Formatter}; @@ -784,18 +784,46 @@ mod tests { let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema.clone()); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_d.clone(), option1), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_d.clone()), + options: option1, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![ + // OrderedColumn::new(col_d.clone(), option1), + // OrderedColumn::new(col_b.clone(), option1), + // ], )); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_e.clone(), option2), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option2, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![ + // OrderedColumn::new(col_e.clone(), option2), + // OrderedColumn::new(col_b.clone(), option1), + // ], )); Ok((test_schema, eq_properties, ordering_eq_properties)) } @@ -1391,8 +1419,16 @@ mod tests { // Column a and e are ordering equivalent (e.g global ordering of the table can be described both as a ASC and e ASC.) let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], - &vec![OrderedColumn::new(col_e.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option1, + }], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![OrderedColumn::new(col_e.clone(), option1)], )); let sort_req_a = PhysicalSortExpr { expr: Arc::new((col_a).clone()) as _, From 6efde06993d5e3e08360ef6804831ac8285c1517 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:19:29 +0300 Subject: [PATCH 3/9] Convert unit tests to new API. --- .../core/src/physical_plan/aggregates/mod.rs | 16 ++++++-- .../core/src/physical_plan/windows/mod.rs | 21 +++++----- datafusion/physical-expr/src/equivalence.rs | 41 ++----------------- 3 files changed, 26 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index fc722e9ac3528..1a4202c87504d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1077,8 +1077,8 @@ mod tests { lit, ApproxDistinct, Column, Count, Median, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties, + PhysicalExpr, PhysicalSortExpr, }; use futures::{FutureExt, Stream}; use std::any::Any; @@ -1735,8 +1735,16 @@ mod tests { eq_properties.add_equal_conditions((&col_a, &col_b)); let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), options1)], - &vec![OrderedColumn::new(col_c.clone(), options2)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()) as _, + options: options1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_c.clone()) as _, + options: options2, + }], + // &vec![OrderedColumn::new(col_a.clone(), options1)], + // &vec![OrderedColumn::new(col_c.clone(), options2)], )); let order_by_exprs = vec![ diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index d4732cc1f604a..ce809376da0a5 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -53,8 +53,8 @@ pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; use datafusion_physical_expr::{ - normalize_expr_with_equivalence_properties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalSortRequirement, + normalize_expr_with_equivalence_properties, OrderingEquivalenceProperties, + PhysicalSortRequirement, }; pub use window_agg_exec::WindowAggExec; @@ -269,14 +269,10 @@ pub(crate) fn window_ordering_equivalence( item.expr.clone(), input.equivalence_properties().classes(), ); - // Currently we only support, ordering equivalences for `Column` expressions. - // TODO: Add support for ordering equivalence for all `PhysicalExpr`s - if let Some(column) = normalized.as_any().downcast_ref::() { - normalized_out_ordering - .push(OrderedColumn::new(column.clone(), item.options)); - } else { - break; - } + normalized_out_ordering.push(PhysicalSortExpr { + expr: normalized, + options: item.options, + }); } for expr in window_expr { if let Some(builtin_window_expr) = @@ -299,7 +295,10 @@ pub(crate) fn window_ordering_equivalence( descending: false, nulls_first: false, }; // ASC, NULLS LAST - let rhs = OrderedColumn::new(column, options); + let rhs = PhysicalSortExpr { + expr: Arc::new(column) as _, + options, + }; result .add_equal_conditions((&normalized_out_ordering, &vec![rhs])); } diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 0b56d02237eb3..627be4c40a263 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -16,13 +16,11 @@ // under the License. use crate::expressions::{BinaryExpr, Column}; -use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; +use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::SchemaRef; -use arrow_schema::SortOptions; use std::collections::HashMap; -use std::hash::Hash; use std::sync::Arc; /// Equivalence Properties is a vec of EquivalentClass. @@ -163,10 +161,10 @@ fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String) /// and treat `a ASC` and `b DESC` as the same ordering requirement. pub type OrderingEquivalenceProperties = EquivalenceProperties>; -/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known +/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` /// is generated by equality predicates, typically equijoin conditions and equality -/// conditions in filters. `EquivalentClass` is generated by the +/// conditions in filters. `EquivalentClass` is generated by the /// `ROW_NUMBER` window function. #[derive(Debug, Clone)] pub struct EquivalentClass { @@ -233,38 +231,7 @@ impl EquivalentClass { } } -// /// This object represents a [`Column`] with a definite ordering. -// #[derive(Debug, Hash, PartialEq, Eq, Clone)] -// pub struct OrderedColumn { -// pub col: Column, -// pub options: SortOptions, -// } -// -// impl OrderedColumn { -// pub fn new(col: Column, options: SortOptions) -> Self { -// Self { col, options } -// } -// } -// -// impl From for PhysicalSortExpr { -// fn from(value: OrderedColumn) -> Self { -// PhysicalSortExpr { -// expr: Arc::new(value.col) as _, -// options: value.options, -// } -// } -// } -// -// impl From for PhysicalSortRequirement { -// fn from(value: OrderedColumn) -> Self { -// PhysicalSortRequirement { -// expr: Arc::new(value.col) as _, -// options: Some(value.options), -// } -// } -// } - -/// `Vec` stores the lexicographical ordering for a schema. +/// `Vec` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. /// For instance, for the table below From 457a22df5365a9e8f6ff495603cc7fd51bfb821e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:45:30 +0300 Subject: [PATCH 4/9] Simplifications --- .../core/src/physical_plan/aggregates/mod.rs | 2 - .../core/src/physical_plan/windows/mod.rs | 2 +- datafusion/physical-expr/src/equivalence.rs | 62 +++++++++---------- datafusion/physical-expr/src/utils.rs | 12 ---- 4 files changed, 31 insertions(+), 47 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 1a4202c87504d..855ec868b75a0 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1743,8 +1743,6 @@ mod tests { expr: Arc::new(col_c.clone()) as _, options: options2, }], - // &vec![OrderedColumn::new(col_a.clone(), options1)], - // &vec![OrderedColumn::new(col_c.clone(), options2)], )); let order_by_exprs = vec![ diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index ce809376da0a5..a4b8793787ad3 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -288,7 +288,7 @@ pub(crate) fn window_ordering_equivalence( // If there is an existing ordering, add new ordering as an equivalence: if !normalized_out_ordering.is_empty() { if let Some((idx, field)) = - schema.column_with_name(expr.field().unwrap().name()) + schema.column_with_name(builtin_window_expr.name()) { let column = Column::new(field.name(), idx); let options = SortOptions { diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 627be4c40a263..72cc1b6077993 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -112,12 +112,15 @@ fn deduplicate_vector(in_data: Vec) -> Vec { result } -fn get_elem_position(in_data: &[T], elem: &T) -> Option { - in_data.iter().position(|item| item.eq(elem)) +/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`. +fn get_entry_position(in_data: &[T], entry: &T) -> Option { + in_data.iter().position(|item| item.eq(entry)) } -fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { - if let Some(idx) = get_elem_position(in_data, elem) { +/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`) +/// Otherwise return `false` +fn remove_from_vec(in_data: &mut Vec, entry: &T) -> bool { + if let Some(idx) = get_entry_position(in_data, entry) { in_data.remove(idx); true } else { @@ -125,22 +128,23 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { } } -fn get_column_indices_helper( +// Helper function to calculate column info recursively +fn get_column_infos_helper( indices: &mut Vec<(usize, String)>, expr: &Arc, ) { if let Some(col) = expr.as_any().downcast_ref::() { indices.push((col.index(), col.name().to_string())) } else if let Some(binary_expr) = expr.as_any().downcast_ref::() { - get_column_indices_helper(indices, binary_expr.left()); - get_column_indices_helper(indices, binary_expr.right()); + get_column_infos_helper(indices, binary_expr.left()); + get_column_infos_helper(indices, binary_expr.right()); }; } -/// Get the indices of the columns occur in the expression -fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String)> { +/// Get index and name of each column that is in the expression (Can return multiple entries for `BinaryExpr`s) +fn get_column_infos(expr: &Arc) -> Vec<(usize, String)> { let mut result = vec![]; - get_column_indices_helper(&mut result, expr); + get_column_infos_helper(&mut result, expr); result } @@ -343,21 +347,21 @@ pub fn project_ordering_equivalence_properties( let schema = output_eq.schema(); let fields = schema.fields(); for class in eq_classes.iter_mut() { - let columns_to_remove = class + let sort_exprs_to_remove = class .iter() - .filter(|columns| { - columns.iter().any(|column| { - let indices_names = get_column_indices_names(&column.expr); - indices_names.into_iter().any(|(idx, name)| { + .filter(|sort_exprs| { + sort_exprs.iter().any(|sort_expr| { + let col_infos = get_column_infos(&sort_expr.expr); + // If any one of the columns, used in Expression is invalid, remove expression + // from ordering equivalences + col_infos.into_iter().any(|(idx, name)| { idx >= fields.len() || fields[idx].name() != &name }) - // let idx = column.col.index(); - // idx >= fields.len() || fields[idx].name() != column.col.name() }) }) .cloned() .collect::>(); - for column in columns_to_remove { + for column in sort_exprs_to_remove { class.remove(&column); } } @@ -478,10 +482,10 @@ mod tests { } #[test] - fn test_get_elem_position() -> Result<()> { - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &2), Some(2)); - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &1), Some(0)); - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); + fn test_get_entry_position() -> Result<()> { + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None); Ok(()) } @@ -502,20 +506,14 @@ mod tests { } #[test] - fn test_column_indices_names() -> Result<()> { + fn test_get_column_infos() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; - assert_eq!( - get_column_indices_names(&expr1), - vec![(2, "col1".to_string())] - ); + assert_eq!(get_column_infos(&expr1), vec![(2, "col1".to_string())]); let expr2 = Arc::new(Column::new("col2", 5)) as _; - assert_eq!( - get_column_indices_names(&expr2), - vec![(5, "col2".to_string())] - ); + assert_eq!(get_column_infos(&expr2), vec![(5, "col2".to_string())]); let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; assert_eq!( - get_column_indices_names(&expr3), + get_column_infos(&expr3), vec![(2, "col1".to_string()), (5, "col2".to_string())] ); Ok(()) diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 465c39e5e731b..aaa4781cc2df8 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -798,11 +798,6 @@ mod tests { options: option1, }, ], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![ - // OrderedColumn::new(col_d.clone(), option1), - // OrderedColumn::new(col_b.clone(), option1), - // ], )); ordering_eq_properties.add_equal_conditions(( &vec![PhysicalSortExpr { @@ -819,11 +814,6 @@ mod tests { options: option1, }, ], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![ - // OrderedColumn::new(col_e.clone(), option2), - // OrderedColumn::new(col_b.clone(), option1), - // ], )); Ok((test_schema, eq_properties, ordering_eq_properties)) } @@ -1427,8 +1417,6 @@ mod tests { expr: Arc::new(col_e.clone()), options: option1, }], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![OrderedColumn::new(col_e.clone(), option1)], )); let sort_req_a = PhysicalSortExpr { expr: Arc::new((col_a).clone()) as _, From d57e98b271712476767835333b50e64c50426c35 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 16:04:01 +0300 Subject: [PATCH 5/9] Simplifications --- datafusion/physical-expr/src/equivalence.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 72cc1b6077993..3b52315dea977 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -102,6 +102,7 @@ impl EquivalenceProperties { } } +/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries fn deduplicate_vector(in_data: Vec) -> Vec { let mut result = vec![]; for elem in in_data { @@ -163,7 +164,7 @@ fn get_column_infos(expr: &Arc) -> Vec<(usize, String)> { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties>; +pub type OrderingEquivalenceProperties = EquivalenceProperties; /// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` @@ -235,7 +236,10 @@ impl EquivalentClass { } } -/// `Vec` stores the lexicographical ordering for a schema. +// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +type LexOrdering = Vec; + +/// `LexOrdering` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. /// For instance, for the table below @@ -246,7 +250,7 @@ impl EquivalentClass { /// |3|2|1|3| /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table. /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent. -pub type OrderingEquivalentClass = EquivalentClass>; +pub type OrderingEquivalentClass = EquivalentClass; impl OrderingEquivalentClass { /// This function extends ordering equivalences with alias information. @@ -361,8 +365,8 @@ pub fn project_ordering_equivalence_properties( }) .cloned() .collect::>(); - for column in sort_exprs_to_remove { - class.remove(&column); + for sort_exprs in sort_exprs_to_remove { + class.remove(&sort_exprs); } } eq_classes.retain(|props| props.len() > 1); From f8db9d205f12829b2515207d8e964a51973a04f9 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 16:13:38 +0300 Subject: [PATCH 6/9] Add new test --- .../tests/sqllogictests/test_files/window.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 32f45dbb57b4b..e141799b41879 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5 ------SortExec: expr=[c9@0 DESC] --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true +# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions) +# during ordering satisfy analysis. In the final plan we should only see single SortExec. +query TT +EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9, + ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1 + FROM aggregate_test_100 + ORDER BY c9 + c5 DESC) + ORDER BY rn1, c9 + c5 DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5 +----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST +------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 +--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------TableScan: aggregate_test_100 projection=[c5, c9] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1] +----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true + # The following query has type error. We should test the error could be detected # from either the logical plan (when `skip_failed_rules` is set to `false`) or # the physical plan (when `skip_failed_rules` is set to `true`). From 825ace1e6662c057d9d3864d942aac2f5845e66d Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 29 May 2023 10:23:51 +0300 Subject: [PATCH 7/9] Update comments, move type definition to common place. --- datafusion/physical-expr/src/equivalence.rs | 10 +++++----- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/sort_expr.rs | 3 +++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 1eaf31844e745..148b147d18c32 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -16,7 +16,10 @@ // under the License. use crate::expressions::{BinaryExpr, Column}; -use crate::{normalize_expr_with_equivalence_properties, PhysicalExpr, PhysicalSortExpr}; +use crate::{ + normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr, + PhysicalSortExpr, +}; use arrow::datatypes::SchemaRef; @@ -216,7 +219,7 @@ impl EquivalentClass { pub fn remove(&mut self, col: &T) -> bool { let removed = remove_from_vec(&mut self.others, col); - // If the the removed entry is head, shit other such that first entry becomes head in others. + // If we are removing the head, shift others so that its first entry becomes the new head. if !removed && *col == self.head { let one_col = self.others.first().cloned(); if let Some(col) = one_col { @@ -244,9 +247,6 @@ impl EquivalentClass { } } -// `LexOrdering` is type alias for lexicographical ordering definition `Vec` -type LexOrdering = Vec; - /// `LexOrdering` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index f7bdfbaf3c896..64ffa134c20e3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -56,7 +56,7 @@ pub use equivalence::{ pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement}; +pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement}; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map, diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 78cdd87db2758..7eec9e5be3d82 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -213,3 +213,6 @@ fn to_str(options: &SortOptions) -> &str { (false, false) => "ASC NULLS LAST", } } + +// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +pub type LexOrdering = Vec; From 6e8e02755b4016a318f76750dd61cc7efd0cd074 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 29 May 2023 10:45:50 +0300 Subject: [PATCH 8/9] Update comment --- datafusion/physical-expr/src/sort_expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 7eec9e5be3d82..665a47e586e29 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -214,5 +214,5 @@ fn to_str(options: &SortOptions) -> &str { } } -// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec` pub type LexOrdering = Vec; From 48a602e2c19d019841ab50e999c3d5085ac5c7e1 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sat, 3 Jun 2023 02:02:24 +0300 Subject: [PATCH 9/9] change function name --- datafusion/physical-expr/src/equivalence.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 148b147d18c32..d60498bc43345 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -436,7 +436,7 @@ pub fn project_ordering_equivalence_properties( .iter() .filter(|sort_exprs| { sort_exprs.iter().any(|sort_expr| { - let col_infos = get_column_infos(&sort_expr.expr); + let col_infos = get_column_indices(&sort_expr.expr); // If any one of the columns, used in Expression is invalid, remove expression // from ordering equivalences col_infos.into_iter().any(|(idx, name)| { @@ -593,12 +593,12 @@ mod tests { #[test] fn test_get_column_infos() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; - assert_eq!(get_column_infos(&expr1), vec![(2, "col1".to_string())]); + assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]); let expr2 = Arc::new(Column::new("col2", 5)) as _; - assert_eq!(get_column_infos(&expr2), vec![(5, "col2".to_string())]); + assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]); let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; assert_eq!( - get_column_infos(&expr3), + get_column_indices(&expr3), vec![(2, "col1".to_string()), (5, "col2".to_string())] ); Ok(())