From 74ff631db5a5bfcca7179479d3732c94888a914a Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 14:24:19 +0300 Subject: [PATCH 01/12] remove hash dependency from EquivalenceProperties --- datafusion/physical-expr/src/equivalence.rs | 71 +++++++++++++++++---- 1 file changed, 58 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index a1f2df9208f07..a910de4c05581 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -21,7 +21,7 @@ use crate::{PhysicalSortExpr, PhysicalSortRequirement}; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; @@ -32,7 +32,7 @@ pub struct EquivalenceProperties { schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -104,6 +104,29 @@ impl EquivalenceProperties { } } +fn deduplicate_vector(in_data: Vec) -> Vec { + let mut result = vec![]; + for elem in in_data { + if !result.contains(&elem) { + result.push(elem); + } + } + result +} + +fn get_elem_position(in_data: &[T], elem: &T) -> Option { + in_data.iter().position(|item| item.eq(elem)) +} + +fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { + if let Some(idx) = get_elem_position(in_data, elem) { + in_data.remove(idx); + true + } else { + false + } +} + /// `OrderingEquivalenceProperties` keeps track of columns that describe the /// global ordering of the schema. These columns are not necessarily same; e.g. /// ```text @@ -131,22 +154,20 @@ pub struct EquivalentClass { /// First element in the EquivalentClass head: T, /// Other equal columns - others: HashSet, + others: Vec, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { - EquivalentClass { - head, - others: HashSet::from_iter(others), - } + let others = deduplicate_vector(others); + EquivalentClass { head, others } } pub fn head(&self) -> &T { &self.head } - pub fn others(&self) -> &HashSet { + pub fn others(&self) -> &[T] { &self.others } @@ -155,15 +176,21 @@ impl EquivalentClass { } pub fn insert(&mut self, col: T) -> bool { - self.head != col && self.others.insert(col) + if self.head != col && !self.others.contains(&col) { + self.others.push(col); + true + } else { + false + } } pub fn remove(&mut self, col: &T) -> bool { - let removed = self.others.remove(col); + let removed = remove_from_vec(&mut self.others, col); + // If the the removed entry is head, shit other such that first entry becomes head in others. if !removed && *col == self.head { - let one_col = self.others.iter().next().cloned(); + let one_col = self.others.first().cloned(); if let Some(col) = one_col { - let removed = self.others.remove(&col); + let removed = remove_from_vec(&mut self.others, &col); self.head = col; removed } else { @@ -446,4 +473,22 @@ mod tests { Ok(()) } + + #[test] + fn test_deduplicate_vector() -> Result<()> { + assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]); + assert_eq!( + deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]), + vec![1, 2, 3, 4, 0] + ); + Ok(()) + } + + #[test] + fn test_get_elem_position() -> Result<()> { + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &2), Some(2)); + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &1), Some(0)); + assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); + Ok(()) + } } From cc46ff83eac48e3e6009a3327c6d82ecd7439b82 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:05:32 +0300 Subject: [PATCH 02/12] Convert OrderedColumn to PhysicalSortExpr --- datafusion/physical-expr/src/equivalence.rs | 142 ++++++++++++++------ datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/utils.rs | 54 ++++++-- 3 files changed, 148 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index a910de4c05581..0b56d02237eb3 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::Column; -use crate::{PhysicalSortExpr, PhysicalSortRequirement}; +use crate::expressions::{BinaryExpr, Column}; +use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; @@ -32,7 +32,7 @@ pub struct EquivalenceProperties { schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -127,6 +127,25 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { } } +fn get_column_indices_helper( + indices: &mut Vec<(usize, String)>, + expr: &Arc, +) { + if let Some(col) = expr.as_any().downcast_ref::() { + indices.push((col.index(), col.name().to_string())) + } else if let Some(binary_expr) = expr.as_any().downcast_ref::() { + get_column_indices_helper(indices, binary_expr.left()); + get_column_indices_helper(indices, binary_expr.right()); + }; +} + +/// Get the indices of the columns occur in the expression +fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String)> { + let mut result = vec![]; + get_column_indices_helper(&mut result, expr); + result +} + /// `OrderingEquivalenceProperties` keeps track of columns that describe the /// global ordering of the schema. These columns are not necessarily same; e.g. /// ```text @@ -142,7 +161,7 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties>; +pub type OrderingEquivalenceProperties = EquivalenceProperties>; /// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` @@ -157,7 +176,7 @@ pub struct EquivalentClass { others: Vec, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { let others = deduplicate_vector(others); EquivalentClass { head, others } @@ -214,36 +233,36 @@ impl EquivalentClass { } } -/// This object represents a [`Column`] with a definite ordering. -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct OrderedColumn { - pub col: Column, - pub options: SortOptions, -} - -impl OrderedColumn { - pub fn new(col: Column, options: SortOptions) -> Self { - Self { col, options } - } -} - -impl From for PhysicalSortExpr { - fn from(value: OrderedColumn) -> Self { - PhysicalSortExpr { - expr: Arc::new(value.col) as _, - options: value.options, - } - } -} - -impl From for PhysicalSortRequirement { - fn from(value: OrderedColumn) -> Self { - PhysicalSortRequirement { - expr: Arc::new(value.col) as _, - options: Some(value.options), - } - } -} +// /// This object represents a [`Column`] with a definite ordering. +// #[derive(Debug, Hash, PartialEq, Eq, Clone)] +// pub struct OrderedColumn { +// pub col: Column, +// pub options: SortOptions, +// } +// +// impl OrderedColumn { +// pub fn new(col: Column, options: SortOptions) -> Self { +// Self { col, options } +// } +// } +// +// impl From for PhysicalSortExpr { +// fn from(value: OrderedColumn) -> Self { +// PhysicalSortExpr { +// expr: Arc::new(value.col) as _, +// options: value.options, +// } +// } +// } +// +// impl From for PhysicalSortRequirement { +// fn from(value: OrderedColumn) -> Self { +// PhysicalSortRequirement { +// expr: Arc::new(value.col) as _, +// options: Some(value.options), +// } +// } +// } /// `Vec` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can @@ -256,7 +275,7 @@ impl From for PhysicalSortRequirement { /// |3|2|1|3| /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table. /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent. -pub type OrderingEquivalentClass = EquivalentClass>; +pub type OrderingEquivalentClass = EquivalentClass>; impl OrderingEquivalentClass { /// This function extends ordering equivalences with alias information. @@ -265,15 +284,17 @@ impl OrderingEquivalentClass { /// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent. fn update_with_aliases(&mut self, columns_map: &HashMap>) { for (column, columns) in columns_map { + let col_expr = Arc::new(column.clone()) as Arc; let mut to_insert = vec![]; for ordering in std::iter::once(&self.head).chain(self.others.iter()) { for (idx, item) in ordering.iter().enumerate() { - if item.col.eq(column) { + if item.expr.eq(&col_expr) { for col in columns { + let col_expr = Arc::new(col.clone()) as Arc; let mut normalized = self.head.clone(); // Change the corresponding entry in the head with the alias column: let entry = &mut normalized[idx]; - (entry.col, entry.options) = (col.clone(), item.options); + (entry.expr, entry.options) = (col_expr, item.options); to_insert.push(normalized); } } @@ -359,8 +380,12 @@ pub fn project_ordering_equivalence_properties( .iter() .filter(|columns| { columns.iter().any(|column| { - let idx = column.col.index(); - idx >= fields.len() || fields[idx].name() != column.col.name() + let indices_names = get_column_indices_names(&column.expr); + indices_names.into_iter().any(|(idx, name)| { + idx >= fields.len() || fields[idx].name() != &name + }) + // let idx = column.col.index(); + // idx >= fields.len() || fields[idx].name() != column.col.name() }) }) .cloned() @@ -381,6 +406,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; + use datafusion_expr::Operator; use std::sync::Arc; #[test] @@ -491,4 +517,40 @@ mod tests { assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); Ok(()) } + + #[test] + fn test_remove_from_vec() -> Result<()> { + let mut in_data = vec![1, 1, 2, 3, 3]; + remove_from_vec(&mut in_data, &5); + assert_eq!(in_data, vec![1, 1, 2, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &2); + assert_eq!(in_data, vec![1, 1, 3, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1, 3]); + remove_from_vec(&mut in_data, &3); + assert_eq!(in_data, vec![1, 1]); + Ok(()) + } + + #[test] + fn test_column_indices_names() -> Result<()> { + let expr1 = Arc::new(Column::new("col1", 2)) as _; + assert_eq!( + get_column_indices_names(&expr1), + vec![(2, "col1".to_string())] + ); + let expr2 = Arc::new(Column::new("col2", 5)) as _; + assert_eq!( + get_column_indices_names(&expr2), + vec![(5, "col2".to_string())] + ); + let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; + assert_eq!( + get_column_indices_names(&expr3), + vec![(2, "col1".to_string()), (5, "col2".to_string())] + ); + Ok(()) + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b65a4e892f9ec..4523e98af8c52 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -50,7 +50,7 @@ pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; pub use equivalence::{ project_equivalence_properties, project_ordering_equivalence_properties, - EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties, + EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, OrderingEquivalentClass, }; pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 54a18a157304d..465c39e5e731b 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -696,7 +696,7 @@ pub fn reassign_predicate_columns( mod tests { use super::*; use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal}; - use crate::{OrderedColumn, PhysicalSortExpr}; + use crate::PhysicalSortExpr; use arrow::compute::SortOptions; use datafusion_common::{Result, ScalarValue}; use std::fmt::{Display, Formatter}; @@ -784,18 +784,46 @@ mod tests { let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema.clone()); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_d.clone(), option1), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_d.clone()), + options: option1, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![ + // OrderedColumn::new(col_d.clone(), option1), + // OrderedColumn::new(col_b.clone(), option1), + // ], )); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], &vec![ - OrderedColumn::new(col_e.clone(), option2), - OrderedColumn::new(col_b.clone(), option1), + PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option2, + }, + PhysicalSortExpr { + expr: Arc::new(col_b.clone()), + options: option1, + }, ], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![ + // OrderedColumn::new(col_e.clone(), option2), + // OrderedColumn::new(col_b.clone(), option1), + // ], )); Ok((test_schema, eq_properties, ordering_eq_properties)) } @@ -1391,8 +1419,16 @@ mod tests { // Column a and e are ordering equivalent (e.g global ordering of the table can be described both as a ASC and e ASC.) let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), option1)], - &vec![OrderedColumn::new(col_e.clone(), option1)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()), + options: option1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_e.clone()), + options: option1, + }], + // &vec![OrderedColumn::new(col_a.clone(), option1)], + // &vec![OrderedColumn::new(col_e.clone(), option1)], )); let sort_req_a = PhysicalSortExpr { expr: Arc::new((col_a).clone()) as _, From 6efde06993d5e3e08360ef6804831ac8285c1517 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:19:29 +0300 Subject: [PATCH 03/12] Convert unit tests to new API. --- .../core/src/physical_plan/aggregates/mod.rs | 16 ++++++-- .../core/src/physical_plan/windows/mod.rs | 21 +++++----- datafusion/physical-expr/src/equivalence.rs | 41 ++----------------- 3 files changed, 26 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index fc722e9ac3528..1a4202c87504d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1077,8 +1077,8 @@ mod tests { lit, ApproxDistinct, Column, Count, Median, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties, + PhysicalExpr, PhysicalSortExpr, }; use futures::{FutureExt, Stream}; use std::any::Any; @@ -1735,8 +1735,16 @@ mod tests { eq_properties.add_equal_conditions((&col_a, &col_b)); let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema); ordering_eq_properties.add_equal_conditions(( - &vec![OrderedColumn::new(col_a.clone(), options1)], - &vec![OrderedColumn::new(col_c.clone(), options2)], + &vec![PhysicalSortExpr { + expr: Arc::new(col_a.clone()) as _, + options: options1, + }], + &vec![PhysicalSortExpr { + expr: Arc::new(col_c.clone()) as _, + options: options2, + }], + // &vec![OrderedColumn::new(col_a.clone(), options1)], + // &vec![OrderedColumn::new(col_c.clone(), options2)], )); let order_by_exprs = vec![ diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index d4732cc1f604a..ce809376da0a5 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -53,8 +53,8 @@ pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; use datafusion_physical_expr::{ - normalize_expr_with_equivalence_properties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalSortRequirement, + normalize_expr_with_equivalence_properties, OrderingEquivalenceProperties, + PhysicalSortRequirement, }; pub use window_agg_exec::WindowAggExec; @@ -269,14 +269,10 @@ pub(crate) fn window_ordering_equivalence( item.expr.clone(), input.equivalence_properties().classes(), ); - // Currently we only support, ordering equivalences for `Column` expressions. - // TODO: Add support for ordering equivalence for all `PhysicalExpr`s - if let Some(column) = normalized.as_any().downcast_ref::() { - normalized_out_ordering - .push(OrderedColumn::new(column.clone(), item.options)); - } else { - break; - } + normalized_out_ordering.push(PhysicalSortExpr { + expr: normalized, + options: item.options, + }); } for expr in window_expr { if let Some(builtin_window_expr) = @@ -299,7 +295,10 @@ pub(crate) fn window_ordering_equivalence( descending: false, nulls_first: false, }; // ASC, NULLS LAST - let rhs = OrderedColumn::new(column, options); + let rhs = PhysicalSortExpr { + expr: Arc::new(column) as _, + options, + }; result .add_equal_conditions((&normalized_out_ordering, &vec![rhs])); } diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 0b56d02237eb3..627be4c40a263 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -16,13 +16,11 @@ // under the License. use crate::expressions::{BinaryExpr, Column}; -use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; +use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::SchemaRef; -use arrow_schema::SortOptions; use std::collections::HashMap; -use std::hash::Hash; use std::sync::Arc; /// Equivalence Properties is a vec of EquivalentClass. @@ -163,10 +161,10 @@ fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String) /// and treat `a ASC` and `b DESC` as the same ordering requirement. pub type OrderingEquivalenceProperties = EquivalenceProperties>; -/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known +/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` /// is generated by equality predicates, typically equijoin conditions and equality -/// conditions in filters. `EquivalentClass` is generated by the +/// conditions in filters. `EquivalentClass` is generated by the /// `ROW_NUMBER` window function. #[derive(Debug, Clone)] pub struct EquivalentClass { @@ -233,38 +231,7 @@ impl EquivalentClass { } } -// /// This object represents a [`Column`] with a definite ordering. -// #[derive(Debug, Hash, PartialEq, Eq, Clone)] -// pub struct OrderedColumn { -// pub col: Column, -// pub options: SortOptions, -// } -// -// impl OrderedColumn { -// pub fn new(col: Column, options: SortOptions) -> Self { -// Self { col, options } -// } -// } -// -// impl From for PhysicalSortExpr { -// fn from(value: OrderedColumn) -> Self { -// PhysicalSortExpr { -// expr: Arc::new(value.col) as _, -// options: value.options, -// } -// } -// } -// -// impl From for PhysicalSortRequirement { -// fn from(value: OrderedColumn) -> Self { -// PhysicalSortRequirement { -// expr: Arc::new(value.col) as _, -// options: Some(value.options), -// } -// } -// } - -/// `Vec` stores the lexicographical ordering for a schema. +/// `Vec` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. /// For instance, for the table below From 457a22df5365a9e8f6ff495603cc7fd51bfb821e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 15:45:30 +0300 Subject: [PATCH 04/12] Simplifications --- .../core/src/physical_plan/aggregates/mod.rs | 2 - .../core/src/physical_plan/windows/mod.rs | 2 +- datafusion/physical-expr/src/equivalence.rs | 62 +++++++++---------- datafusion/physical-expr/src/utils.rs | 12 ---- 4 files changed, 31 insertions(+), 47 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 1a4202c87504d..855ec868b75a0 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1743,8 +1743,6 @@ mod tests { expr: Arc::new(col_c.clone()) as _, options: options2, }], - // &vec![OrderedColumn::new(col_a.clone(), options1)], - // &vec![OrderedColumn::new(col_c.clone(), options2)], )); let order_by_exprs = vec![ diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index ce809376da0a5..a4b8793787ad3 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -288,7 +288,7 @@ pub(crate) fn window_ordering_equivalence( // If there is an existing ordering, add new ordering as an equivalence: if !normalized_out_ordering.is_empty() { if let Some((idx, field)) = - schema.column_with_name(expr.field().unwrap().name()) + schema.column_with_name(builtin_window_expr.name()) { let column = Column::new(field.name(), idx); let options = SortOptions { diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 627be4c40a263..72cc1b6077993 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -112,12 +112,15 @@ fn deduplicate_vector(in_data: Vec) -> Vec { result } -fn get_elem_position(in_data: &[T], elem: &T) -> Option { - in_data.iter().position(|item| item.eq(elem)) +/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`. +fn get_entry_position(in_data: &[T], entry: &T) -> Option { + in_data.iter().position(|item| item.eq(entry)) } -fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { - if let Some(idx) = get_elem_position(in_data, elem) { +/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`) +/// Otherwise return `false` +fn remove_from_vec(in_data: &mut Vec, entry: &T) -> bool { + if let Some(idx) = get_entry_position(in_data, entry) { in_data.remove(idx); true } else { @@ -125,22 +128,23 @@ fn remove_from_vec(in_data: &mut Vec, elem: &T) -> bool { } } -fn get_column_indices_helper( +// Helper function to calculate column info recursively +fn get_column_infos_helper( indices: &mut Vec<(usize, String)>, expr: &Arc, ) { if let Some(col) = expr.as_any().downcast_ref::() { indices.push((col.index(), col.name().to_string())) } else if let Some(binary_expr) = expr.as_any().downcast_ref::() { - get_column_indices_helper(indices, binary_expr.left()); - get_column_indices_helper(indices, binary_expr.right()); + get_column_infos_helper(indices, binary_expr.left()); + get_column_infos_helper(indices, binary_expr.right()); }; } -/// Get the indices of the columns occur in the expression -fn get_column_indices_names(expr: &Arc) -> Vec<(usize, String)> { +/// Get index and name of each column that is in the expression (Can return multiple entries for `BinaryExpr`s) +fn get_column_infos(expr: &Arc) -> Vec<(usize, String)> { let mut result = vec![]; - get_column_indices_helper(&mut result, expr); + get_column_infos_helper(&mut result, expr); result } @@ -343,21 +347,21 @@ pub fn project_ordering_equivalence_properties( let schema = output_eq.schema(); let fields = schema.fields(); for class in eq_classes.iter_mut() { - let columns_to_remove = class + let sort_exprs_to_remove = class .iter() - .filter(|columns| { - columns.iter().any(|column| { - let indices_names = get_column_indices_names(&column.expr); - indices_names.into_iter().any(|(idx, name)| { + .filter(|sort_exprs| { + sort_exprs.iter().any(|sort_expr| { + let col_infos = get_column_infos(&sort_expr.expr); + // If any one of the columns, used in Expression is invalid, remove expression + // from ordering equivalences + col_infos.into_iter().any(|(idx, name)| { idx >= fields.len() || fields[idx].name() != &name }) - // let idx = column.col.index(); - // idx >= fields.len() || fields[idx].name() != column.col.name() }) }) .cloned() .collect::>(); - for column in columns_to_remove { + for column in sort_exprs_to_remove { class.remove(&column); } } @@ -478,10 +482,10 @@ mod tests { } #[test] - fn test_get_elem_position() -> Result<()> { - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &2), Some(2)); - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &1), Some(0)); - assert_eq!(get_elem_position(&[1, 1, 2, 3, 3], &5), None); + fn test_get_entry_position() -> Result<()> { + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0)); + assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None); Ok(()) } @@ -502,20 +506,14 @@ mod tests { } #[test] - fn test_column_indices_names() -> Result<()> { + fn test_get_column_infos() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; - assert_eq!( - get_column_indices_names(&expr1), - vec![(2, "col1".to_string())] - ); + assert_eq!(get_column_infos(&expr1), vec![(2, "col1".to_string())]); let expr2 = Arc::new(Column::new("col2", 5)) as _; - assert_eq!( - get_column_indices_names(&expr2), - vec![(5, "col2".to_string())] - ); + assert_eq!(get_column_infos(&expr2), vec![(5, "col2".to_string())]); let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; assert_eq!( - get_column_indices_names(&expr3), + get_column_infos(&expr3), vec![(2, "col1".to_string()), (5, "col2".to_string())] ); Ok(()) diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 465c39e5e731b..aaa4781cc2df8 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -798,11 +798,6 @@ mod tests { options: option1, }, ], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![ - // OrderedColumn::new(col_d.clone(), option1), - // OrderedColumn::new(col_b.clone(), option1), - // ], )); ordering_eq_properties.add_equal_conditions(( &vec![PhysicalSortExpr { @@ -819,11 +814,6 @@ mod tests { options: option1, }, ], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![ - // OrderedColumn::new(col_e.clone(), option2), - // OrderedColumn::new(col_b.clone(), option1), - // ], )); Ok((test_schema, eq_properties, ordering_eq_properties)) } @@ -1427,8 +1417,6 @@ mod tests { expr: Arc::new(col_e.clone()), options: option1, }], - // &vec![OrderedColumn::new(col_a.clone(), option1)], - // &vec![OrderedColumn::new(col_e.clone(), option1)], )); let sort_req_a = PhysicalSortExpr { expr: Arc::new((col_a).clone()) as _, From d57e98b271712476767835333b50e64c50426c35 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 16:04:01 +0300 Subject: [PATCH 05/12] Simplifications --- datafusion/physical-expr/src/equivalence.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 72cc1b6077993..3b52315dea977 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -102,6 +102,7 @@ impl EquivalenceProperties { } } +/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries fn deduplicate_vector(in_data: Vec) -> Vec { let mut result = vec![]; for elem in in_data { @@ -163,7 +164,7 @@ fn get_column_infos(expr: &Arc) -> Vec<(usize, String)> { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties>; +pub type OrderingEquivalenceProperties = EquivalenceProperties; /// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known /// to have the same value in all tuples in a relation. `EquivalentClass` @@ -235,7 +236,10 @@ impl EquivalentClass { } } -/// `Vec` stores the lexicographical ordering for a schema. +// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +type LexOrdering = Vec; + +/// `LexOrdering` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. /// For instance, for the table below @@ -246,7 +250,7 @@ impl EquivalentClass { /// |3|2|1|3| /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table. /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent. -pub type OrderingEquivalentClass = EquivalentClass>; +pub type OrderingEquivalentClass = EquivalentClass; impl OrderingEquivalentClass { /// This function extends ordering equivalences with alias information. @@ -361,8 +365,8 @@ pub fn project_ordering_equivalence_properties( }) .cloned() .collect::>(); - for column in sort_exprs_to_remove { - class.remove(&column); + for sort_exprs in sort_exprs_to_remove { + class.remove(&sort_exprs); } } eq_classes.retain(|props| props.len() > 1); From f8db9d205f12829b2515207d8e964a51973a04f9 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 26 May 2023 16:13:38 +0300 Subject: [PATCH 06/12] Add new test --- .../tests/sqllogictests/test_files/window.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 32f45dbb57b4b..e141799b41879 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5 ------SortExec: expr=[c9@0 DESC] --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true +# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions) +# during ordering satisfy analysis. In the final plan we should only see single SortExec. +query TT +EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9, + ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1 + FROM aggregate_test_100 + ORDER BY c9 + c5 DESC) + ORDER BY rn1, c9 + c5 DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5 +----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST +------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 +--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------TableScan: aggregate_test_100 projection=[c5, c9] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1] +----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true + # The following query has type error. We should test the error could be detected # from either the logical plan (when `skip_failed_rules` is set to `false`) or # the physical plan (when `skip_failed_rules` is set to `true`). From 825ace1e6662c057d9d3864d942aac2f5845e66d Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 29 May 2023 10:23:51 +0300 Subject: [PATCH 07/12] Update comments, move type definition to common place. --- datafusion/physical-expr/src/equivalence.rs | 10 +++++----- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/sort_expr.rs | 3 +++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 1eaf31844e745..148b147d18c32 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -16,7 +16,10 @@ // under the License. use crate::expressions::{BinaryExpr, Column}; -use crate::{normalize_expr_with_equivalence_properties, PhysicalExpr, PhysicalSortExpr}; +use crate::{ + normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr, + PhysicalSortExpr, +}; use arrow::datatypes::SchemaRef; @@ -216,7 +219,7 @@ impl EquivalentClass { pub fn remove(&mut self, col: &T) -> bool { let removed = remove_from_vec(&mut self.others, col); - // If the the removed entry is head, shit other such that first entry becomes head in others. + // If we are removing the head, shift others so that its first entry becomes the new head. if !removed && *col == self.head { let one_col = self.others.first().cloned(); if let Some(col) = one_col { @@ -244,9 +247,6 @@ impl EquivalentClass { } } -// `LexOrdering` is type alias for lexicographical ordering definition `Vec` -type LexOrdering = Vec; - /// `LexOrdering` stores the lexicographical ordering for a schema. /// OrderingEquivalentClass keeps track of different alternative orderings than can /// describe the schema. diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index f7bdfbaf3c896..64ffa134c20e3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -56,7 +56,7 @@ pub use equivalence::{ pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement}; +pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement}; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map, diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 78cdd87db2758..7eec9e5be3d82 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -213,3 +213,6 @@ fn to_str(options: &SortOptions) -> &str { (false, false) => "ASC NULLS LAST", } } + +// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +pub type LexOrdering = Vec; From 6e8e02755b4016a318f76750dd61cc7efd0cd074 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 29 May 2023 10:45:50 +0300 Subject: [PATCH 08/12] Update comment --- datafusion/physical-expr/src/sort_expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 7eec9e5be3d82..665a47e586e29 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -214,5 +214,5 @@ fn to_str(options: &SortOptions) -> &str { } } -// `LexOrdering` is type alias for lexicographical ordering definition `Vec` +/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec` pub type LexOrdering = Vec; From 48a602e2c19d019841ab50e999c3d5085ac5c7e1 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sat, 3 Jun 2023 02:02:24 +0300 Subject: [PATCH 09/12] change function name --- datafusion/physical-expr/src/equivalence.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 148b147d18c32..d60498bc43345 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -436,7 +436,7 @@ pub fn project_ordering_equivalence_properties( .iter() .filter(|sort_exprs| { sort_exprs.iter().any(|sort_expr| { - let col_infos = get_column_infos(&sort_expr.expr); + let col_infos = get_column_indices(&sort_expr.expr); // If any one of the columns, used in Expression is invalid, remove expression // from ordering equivalences col_infos.into_iter().any(|(idx, name)| { @@ -593,12 +593,12 @@ mod tests { #[test] fn test_get_column_infos() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; - assert_eq!(get_column_infos(&expr1), vec![(2, "col1".to_string())]); + assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]); let expr2 = Arc::new(Column::new("col2", 5)) as _; - assert_eq!(get_column_infos(&expr2), vec![(5, "col2".to_string())]); + assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]); let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _; assert_eq!( - get_column_infos(&expr3), + get_column_indices(&expr3), vec![(2, "col1".to_string()), (5, "col2".to_string())] ); Ok(()) From bebf5a56fa28d766b7e6182a873ecfda9dba8499 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 6 Jun 2023 14:48:35 +0300 Subject: [PATCH 10/12] Add hash support for PhysicalExpr and PhysicalSortExpr --- datafusion/physical-expr/src/equivalence.rs | 91 +++---------------- .../physical-expr/src/expressions/binary.rs | 8 +- .../physical-expr/src/expressions/case.rs | 8 +- .../physical-expr/src/expressions/cast.rs | 7 ++ .../physical-expr/src/expressions/column.rs | 11 +++ .../physical-expr/src/expressions/datetime.rs | 8 +- .../src/expressions/get_indexed_field.rs | 8 +- .../physical-expr/src/expressions/in_list.rs | 7 ++ .../src/expressions/is_not_null.rs | 8 +- .../physical-expr/src/expressions/is_null.rs | 8 +- .../physical-expr/src/expressions/like.rs | 8 +- .../physical-expr/src/expressions/literal.rs | 8 +- .../physical-expr/src/expressions/negative.rs | 8 +- .../physical-expr/src/expressions/no_op.rs | 8 +- .../physical-expr/src/expressions/not.rs | 8 +- .../physical-expr/src/expressions/try_cast.rs | 8 +- datafusion/physical-expr/src/physical_expr.rs | 9 ++ .../physical-expr/src/scalar_function.rs | 7 ++ datafusion/physical-expr/src/sort_expr.rs | 10 ++ 19 files changed, 149 insertions(+), 89 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 78279851bba5e..c4a39fd678c39 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -23,7 +23,8 @@ use crate::{ use arrow::datatypes::SchemaRef; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// Represents a collection of [`EquivalentClass`] (equivalences @@ -39,7 +40,7 @@ pub struct EquivalenceProperties { schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -113,33 +114,6 @@ impl EquivalenceProperties { } } -/// Remove duplicates inside the `in_data` vector, returned vector would consist of unique entries -fn deduplicate_vector(in_data: Vec) -> Vec { - let mut result = vec![]; - for elem in in_data { - if !result.contains(&elem) { - result.push(elem); - } - } - result -} - -/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`. -fn get_entry_position(in_data: &[T], entry: &T) -> Option { - in_data.iter().position(|item| item.eq(entry)) -} - -/// Remove `entry` for the `in_data`, returns `true` if removal is successful (e.g `entry` is indeed in the `in_data`) -/// Otherwise return `false` -fn remove_from_vec(in_data: &mut Vec, entry: &T) -> bool { - if let Some(idx) = get_entry_position(in_data, entry) { - in_data.remove(idx); - true - } else { - false - } -} - // Helper function to calculate column info recursively fn get_column_indices_helper( indices: &mut Vec<(usize, String)>, @@ -187,20 +161,22 @@ pub struct EquivalentClass { /// First element in the EquivalentClass head: T, /// Other equal columns - others: Vec, + others: HashSet, } -impl EquivalentClass { +impl EquivalentClass { pub fn new(head: T, others: Vec) -> EquivalentClass { - let others = deduplicate_vector(others); - EquivalentClass { head, others } + EquivalentClass { + head, + others: HashSet::from_iter(others), + } } pub fn head(&self) -> &T { &self.head } - pub fn others(&self) -> &[T] { + pub fn others(&self) -> &HashSet { &self.others } @@ -209,21 +185,16 @@ impl EquivalentClass { } pub fn insert(&mut self, col: T) -> bool { - if self.head != col && !self.others.contains(&col) { - self.others.push(col); - true - } else { - false - } + self.head != col && self.others.insert(col) } pub fn remove(&mut self, col: &T) -> bool { - let removed = remove_from_vec(&mut self.others, col); + let removed = self.others.remove(col); // If we are removing the head, shift others so that its first entry becomes the new head. if !removed && *col == self.head { - let one_col = self.others.first().cloned(); + let one_col = self.others.iter().next().cloned(); if let Some(col) = one_col { - let removed = remove_from_vec(&mut self.others, &col); + let removed = self.others.remove(&col); self.head = col; removed } else { @@ -556,40 +527,6 @@ mod tests { Ok(()) } - #[test] - fn test_deduplicate_vector() -> Result<()> { - assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]); - assert_eq!( - deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]), - vec![1, 2, 3, 4, 0] - ); - Ok(()) - } - - #[test] - fn test_get_entry_position() -> Result<()> { - assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2)); - assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0)); - assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None); - Ok(()) - } - - #[test] - fn test_remove_from_vec() -> Result<()> { - let mut in_data = vec![1, 1, 2, 3, 3]; - remove_from_vec(&mut in_data, &5); - assert_eq!(in_data, vec![1, 1, 2, 3, 3]); - remove_from_vec(&mut in_data, &2); - assert_eq!(in_data, vec![1, 1, 3, 3]); - remove_from_vec(&mut in_data, &2); - assert_eq!(in_data, vec![1, 1, 3, 3]); - remove_from_vec(&mut in_data, &3); - assert_eq!(in_data, vec![1, 1, 3]); - remove_from_vec(&mut in_data, &3); - assert_eq!(in_data, vec![1, 1]); - Ok(()) - } - #[test] fn test_get_column_infos() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index e5b66d4a39871..1ec7e00c89d45 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -19,6 +19,7 @@ mod adapter; mod kernels; mod kernels_arrow; +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use arrow::array::*; @@ -96,7 +97,7 @@ use datafusion_expr::type_coercion::binary::{ use datafusion_expr::{ColumnarValue, Operator}; /// Binary expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct BinaryExpr { left: Arc, op: Operator, @@ -837,6 +838,11 @@ impl PhysicalExpr for BinaryExpr { }; Ok(vec![left, right]) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for BinaryExpr { diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 903ccda62f084..91fa9bbb93092 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -16,6 +16,7 @@ // under the License. use std::borrow::Cow; +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use crate::expressions::try_cast; @@ -51,7 +52,7 @@ type WhenThen = (Arc, Arc); /// [WHEN ...] /// [ELSE result] /// END -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct CaseExpr { /// Optional base expression that can be compared to literal values in the "when" expressions expr: Option>, @@ -348,6 +349,11 @@ impl PhysicalExpr for CaseExpr { )?)) } } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for CaseExpr { diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 8e4e1b57e8c24..850049b969a0a 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::fmt; +use std::hash::Hasher; use std::sync::Arc; use crate::intervals::Interval; @@ -132,6 +133,12 @@ impl PhysicalExpr for CastExpr { interval.cast_to(&cast_type, &self.cast_options)?, )]) } + + fn dyn_hash(&self, _state: &mut dyn Hasher) { + // `self.cast_options` doesn't support hashing + // Hence we cannot calculate `dyn_hash` for this type. + unimplemented!(); + } } impl PartialEq for CastExpr { diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index eb2be5ef217c1..9eca9bf71326a 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -18,6 +18,7 @@ //! Column expression use std::any::Any; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow::{ @@ -109,6 +110,11 @@ impl PhysicalExpr for Column { let col_bounds = context.column_boundaries[self.index].clone(); context.with_boundaries(col_bounds) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for Column { @@ -191,6 +197,11 @@ impl PhysicalExpr for UnKnownColumn { ) -> Result> { Ok(self) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for UnKnownColumn { diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index f1933c1d180a6..4d0ee5cc7dbca 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -27,12 +27,13 @@ use datafusion_expr::type_coercion::binary::get_result_type; use datafusion_expr::{ColumnarValue, Operator}; use std::any::Any; use std::fmt::{Display, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar}; /// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct DateTimeIntervalExpr { lhs: Arc, op: Operator, @@ -185,6 +186,11 @@ impl PhysicalExpr for DateTimeIntervalExpr { children[1].clone(), ))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for DateTimeIntervalExpr { diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs index c07641796aa4d..090cfe5a6e64c 100644 --- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs +++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs @@ -35,10 +35,11 @@ use datafusion_expr::{ }; use std::convert::TryInto; use std::fmt::Debug; +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; /// expression to get a field of a struct array. -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct GetIndexedFieldExpr { arg: Arc, key: ScalarValue, @@ -153,6 +154,11 @@ impl PhysicalExpr for GetIndexedFieldExpr { self.key.clone(), ))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for GetIndexedFieldExpr { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 3feb728900adc..451cec53ce889 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -20,6 +20,7 @@ use ahash::RandomState; use std::any::Any; use std::fmt::Debug; +use std::hash::Hasher; use std::sync::Arc; use crate::hash_utils::HashValue; @@ -330,6 +331,12 @@ impl PhysicalExpr for InListExpr { self.static_filter.clone(), ))) } + + fn dyn_hash(&self, _state: &mut dyn Hasher) { + // `self.static_filter` doesn't support hashing. Hence + // we cannot calculate hash for this type. + unimplemented!(); + } } impl PartialEq for InListExpr { diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 32e53e0c1edea..da717a517fb37 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -17,6 +17,7 @@ //! IS NOT NULL expression +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use crate::physical_expr::down_cast_any_ref; @@ -31,7 +32,7 @@ use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; /// IS NOT NULL expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct IsNotNullExpr { /// The input expression arg: Arc, @@ -91,6 +92,11 @@ impl PhysicalExpr for IsNotNullExpr { ) -> Result> { Ok(Arc::new(IsNotNullExpr::new(children[0].clone()))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for IsNotNullExpr { diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 85e111440aaf5..ee7897edd4de6 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -17,6 +17,7 @@ //! IS NULL expression +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use arrow::compute; @@ -32,7 +33,7 @@ use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; /// IS NULL expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct IsNullExpr { /// Input expression arg: Arc, @@ -92,6 +93,11 @@ impl PhysicalExpr for IsNullExpr { ) -> Result> { Ok(Arc::new(IsNullExpr::new(children[0].clone()))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for IsNullExpr { diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index 456e477a1e535..f549613acbf83 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use arrow::{ @@ -35,7 +36,7 @@ use arrow::compute::kernels::comparison::{ }; // Like expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct LikeExpr { negated: bool, case_insensitive: bool, @@ -186,6 +187,11 @@ impl PhysicalExpr for LikeExpr { fn analyze(&self, context: AnalysisContext) -> AnalysisContext { context.with_boundaries(None) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for LikeExpr { diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 013169ccf7852..8cb2bd5b950d6 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -18,6 +18,7 @@ //! Literal expressions for physical operations use std::any::Any; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow::{ @@ -32,7 +33,7 @@ use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, Expr}; /// Represents a literal value -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct Literal { value: ScalarValue, } @@ -93,6 +94,11 @@ impl PhysicalExpr for Literal { Some(1), ))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for Literal { diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 0d6aec879e567..7f1bd43fec702 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -18,6 +18,7 @@ //! Negation (-) expression use std::any::Any; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow::array::ArrayRef; @@ -52,7 +53,7 @@ macro_rules! compute_op { } /// Negative expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct NegativeExpr { /// Input expression arg: Arc, @@ -128,6 +129,11 @@ impl PhysicalExpr for NegativeExpr { ) -> Result> { Ok(Arc::new(NegativeExpr::new(children[0].clone()))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for NegativeExpr { diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs index 7c7d8cc8977dc..584d1d66955da 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr/src/expressions/no_op.rs @@ -18,6 +18,7 @@ //! NoOp placeholder for physical operations use std::any::Any; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow::{ @@ -33,7 +34,7 @@ use datafusion_expr::ColumnarValue; /// A place holder expression, can not be evaluated. /// /// Used in some cases where an `Arc` is needed, such as `children()` -#[derive(Debug, PartialEq, Eq, Default)] +#[derive(Debug, PartialEq, Eq, Default, Hash)] pub struct NoOp {} impl NoOp { @@ -79,6 +80,11 @@ impl PhysicalExpr for NoOp { ) -> Result> { Ok(self) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for NoOp { diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 32040547b4f73..f64fbe1a2ea87 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -19,6 +19,7 @@ use std::any::Any; use std::fmt; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; @@ -33,7 +34,7 @@ use datafusion_common::{cast::as_boolean_array, DataFusionError, Result, ScalarV use datafusion_expr::ColumnarValue; /// Not expression -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct NotExpr { /// Input expression arg: Arc, @@ -145,6 +146,11 @@ impl PhysicalExpr for NotExpr { ) -> Result> { Ok(Arc::new(NotExpr::new(children[0].clone()))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for NotExpr { diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index bbb29d6fb5b0a..92ffaa1a88421 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::fmt; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; @@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; /// TRY_CAST expression casts an expression to a specific data type and retuns NULL on invalid cast -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct TryCastExpr { /// The expression to cast expr: Arc, @@ -105,6 +106,11 @@ impl PhysicalExpr for TryCastExpr { self.cast_type.clone(), ))) } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.hash(&mut s); + } } impl PartialEq for TryCastExpr { diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index d6dd14e8a116d..b8c7a62d69e31 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -33,6 +33,7 @@ use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterato use crate::intervals::Interval; use std::any::Any; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// Expression that can be evaluated against a RecordBatch @@ -104,6 +105,14 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { "Not implemented for {self}" ))) } + + fn dyn_hash(&self, _state: &mut dyn Hasher); +} + +impl Hash for dyn PhysicalExpr { + fn hash(&self, state: &mut H) { + self.dyn_hash(state); + } } /// Shared [`PhysicalExpr`]. diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index da47a55aa9e39..87cbf6999d210 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -41,6 +41,7 @@ use datafusion_expr::ScalarFunctionImplementation; use std::any::Any; use std::fmt::Debug; use std::fmt::{self, Formatter}; +use std::hash::Hasher; use std::sync::Arc; /// Physical expression of a scalar function @@ -162,6 +163,12 @@ impl PhysicalExpr for ScalarFunctionExpr { self.return_type(), ))) } + + fn dyn_hash(&self, _state: &mut dyn Hasher) { + // hashing for `self.fun` is not supported + // Hence we cannot calculate hash for this type + unimplemented!(); + } } impl PartialEq for ScalarFunctionExpr { diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 665a47e586e29..7622c119c86b3 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -22,6 +22,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// Represents Sort operation for a column in a RecordBatch @@ -39,6 +40,15 @@ impl PartialEq for PhysicalSortExpr { } } +impl Eq for PhysicalSortExpr {} + +impl Hash for PhysicalSortExpr { + fn hash(&self, state: &mut H) { + self.expr.hash(state); + self.options.hash(state); + } +} + impl std::fmt::Display for PhysicalSortExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{} {}", self.expr, to_str(&self.options)) From c5fc8f98688bc03ad536228d58c097973fae1f63 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Fri, 9 Jun 2023 14:38:15 -0500 Subject: [PATCH 11/12] Better commenting --- datafusion/physical-expr/src/equivalence.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 529e95e06e025..52bde7475ba5f 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -190,14 +190,15 @@ impl EquivalentClass { pub fn remove(&mut self, col: &T) -> bool { let removed = self.others.remove(col); - // If we are removing the head, shift others so that its first entry becomes the new head. + // If we are removing the head, adjust others so that its first entry becomes the new head. if !removed && *col == self.head { - let one_col = self.others.iter().next().cloned(); - if let Some(col) = one_col { + if let Some(col) = self.others.iter().next().cloned() { let removed = self.others.remove(&col); self.head = col; removed } else { + // We don't allow empty equivalence classes, reject removal if one tries removing + // the only element in an equivalence class. false } } else { From 274be0aed6de147c5ff4f088c0656338a1933add Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 12 Jun 2023 11:14:57 +0300 Subject: [PATCH 12/12] Address reviews --- .../physical-expr/src/expressions/cast.rs | 12 ++++---- .../physical-expr/src/expressions/in_list.rs | 12 ++++---- datafusion/physical-expr/src/physical_expr.rs | 30 +++++++++++++++++++ .../physical-expr/src/scalar_function.rs | 12 ++++---- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 850049b969a0a..b5916def86119 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -17,7 +17,7 @@ use std::any::Any; use std::fmt; -use std::hash::Hasher; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::intervals::Interval; @@ -134,10 +134,12 @@ impl PhysicalExpr for CastExpr { )]) } - fn dyn_hash(&self, _state: &mut dyn Hasher) { - // `self.cast_options` doesn't support hashing - // Hence we cannot calculate `dyn_hash` for this type. - unimplemented!(); + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.expr.hash(&mut s); + self.cast_type.hash(&mut s); + // Add `self.cast_options` when hash is available + // https://github.com/apache/arrow-rs/pull/4395 } } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 451cec53ce889..0bcddb4ec8b3a 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -20,7 +20,7 @@ use ahash::RandomState; use std::any::Any; use std::fmt::Debug; -use std::hash::Hasher; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::hash_utils::HashValue; @@ -332,10 +332,12 @@ impl PhysicalExpr for InListExpr { ))) } - fn dyn_hash(&self, _state: &mut dyn Hasher) { - // `self.static_filter` doesn't support hashing. Hence - // we cannot calculate hash for this type. - unimplemented!(); + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.expr.hash(&mut s); + self.negated.hash(&mut s); + self.list.hash(&mut s); + // Add `self.static_filter` when hash is available } } diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index b8c7a62d69e31..68525920a0d1a 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -106,6 +106,36 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { ))) } + /// Update the hash `state` with this expression requirements from + /// [`Hash`]. + /// + /// This method is required to support hashing [`PhysicalExpr`]s. To + /// implement it, typically the type implementing + /// [`PhysicalExpr`] implements [`Hash`] and + /// then the following boiler plate is used: + /// + /// # Example: + /// ``` + /// // User defined expression that derives Hash + /// #[derive(Hash, Debug, PartialEq, Eq)] + /// struct MyExpr { + /// val: u64 + /// } + /// + /// // impl PhysicalExpr { + /// // ... + /// # impl MyExpr { + /// // Boiler plate to call the derived Hash impl + /// fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + /// use std::hash::Hash; + /// let mut s = state; + /// self.hash(&mut s); + /// } + /// // } + /// # } + /// ``` + /// Note: [`PhysicalExpr`] is not constrained by [`Hash`] + /// directly because it must remain object safe. fn dyn_hash(&self, _state: &mut dyn Hasher); } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 87cbf6999d210..ef771a6784537 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -41,7 +41,7 @@ use datafusion_expr::ScalarFunctionImplementation; use std::any::Any; use std::fmt::Debug; use std::fmt::{self, Formatter}; -use std::hash::Hasher; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// Physical expression of a scalar function @@ -164,10 +164,12 @@ impl PhysicalExpr for ScalarFunctionExpr { ))) } - fn dyn_hash(&self, _state: &mut dyn Hasher) { - // hashing for `self.fun` is not supported - // Hence we cannot calculate hash for this type - unimplemented!(); + fn dyn_hash(&self, state: &mut dyn Hasher) { + let mut s = state; + self.name.hash(&mut s); + self.args.hash(&mut s); + self.return_type.hash(&mut s); + // Add `self.fun` when hash is available } }