From fa9511f22e86cf61bd9b1df797843c5f0a05b52a Mon Sep 17 00:00:00 2001 From: Wang Date: Mon, 31 Oct 2022 22:11:17 +0800 Subject: [PATCH 1/8] [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement --- datafusion/core/src/dataframe.rs | 80 +++++ .../core/src/physical_optimizer/merge_exec.rs | 35 +- .../core/src/physical_plan/aggregates/mod.rs | 72 +++- datafusion/core/src/physical_plan/analyze.rs | 8 +- .../src/physical_plan/coalesce_batches.rs | 11 +- .../src/physical_plan/coalesce_partitions.rs | 8 +- datafusion/core/src/physical_plan/empty.rs | 6 +- datafusion/core/src/physical_plan/explain.rs | 4 - .../src/physical_plan/file_format/avro.rs | 4 - .../core/src/physical_plan/file_format/csv.rs | 4 - .../src/physical_plan/file_format/json.rs | 4 - .../src/physical_plan/file_format/parquet.rs | 4 - datafusion/core/src/physical_plan/filter.rs | 96 +++++- .../src/physical_plan/joins/cross_join.rs | 28 +- .../core/src/physical_plan/joins/hash_join.rs | 73 +++- .../physical_plan/joins/sort_merge_join.rs | 145 +++++++- .../core/src/physical_plan/joins/utils.rs | 123 ++++++- datafusion/core/src/physical_plan/limit.rs | 24 +- datafusion/core/src/physical_plan/memory.rs | 4 - datafusion/core/src/physical_plan/mod.rs | 61 +++- datafusion/core/src/physical_plan/planner.rs | 40 ++- .../core/src/physical_plan/projection.rs | 88 ++++- .../core/src/physical_plan/repartition.rs | 12 +- .../core/src/physical_plan/sorts/sort.rs | 18 +- .../sorts/sort_preserving_merge.rs | 13 +- datafusion/core/src/physical_plan/union.rs | 179 ++++++++-- datafusion/core/src/physical_plan/values.rs | 13 +- .../core/src/physical_plan/windows/mod.rs | 4 + .../physical_plan/windows/window_agg_exec.rs | 39 ++- .../core/src/scheduler/pipeline/execution.rs | 4 +- datafusion/core/tests/user_defined_plan.rs | 8 +- .../physical-expr/src/expressions/column.rs | 70 ++++ .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/physical-expr/src/lib.rs | 12 +- datafusion/physical-expr/src/physical_expr.rs | 63 ++++ datafusion/physical-expr/src/utils.rs | 311 +++++++++++++++++- 36 files changed, 1438 insertions(+), 232 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index a699a234cd6d1..188541049f6f9 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1519,4 +1519,84 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn partition_aware_union() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c3"])? + .with_column_renamed("c2.c1", "c2_c1")?; + + let left_rows = left.collect().await?; + let right_rows = right.collect().await?; + let join1 = + left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?; + let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?; + + let union = join1.union(join2)?; + + let union_rows = union.collect().await?; + + assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::()); + assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::()); + assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::()); + + let physical_plan = union.create_physical_plan().await?; + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + assert_eq!( + physical_plan.output_partitioning().partition_count(), + default_partition_count + ); + Ok(()) + } + + #[tokio::test] + async fn non_partition_aware_union() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c2"])? + .with_column_renamed("c2.c1", "c2_c1")? + .with_column_renamed("c2.c2", "c2_c2")?; + + let left_rows = left.collect().await?; + let right_rows = right.collect().await?; + let join1 = left.join( + right.clone(), + JoinType::Inner, + &["c1", "c2"], + &["c2_c1", "c2_c2"], + None, + )?; + + // join key ordering is different + let join2 = left.join( + right, + JoinType::Inner, + &["c2", "c1"], + &["c2_c2", "c2_c1"], + None, + )?; + + let union = join1.union(join2)?; + + let union_rows = union.collect().await?; + + assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::()); + assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::()); + assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::()); + + let physical_plan = union.create_physical_plan().await?; + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + + // the union's output partitioning count should be the combination of all output partitions count + assert_eq!( + physical_plan.output_partitioning().partition_count(), + default_partition_count * 2 + ); + Ok(()) + } } diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs index f614673f500e6..a62bb1a026246 100644 --- a/datafusion/core/src/physical_optimizer/merge_exec.rs +++ b/datafusion/core/src/physical_optimizer/merge_exec.rs @@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec { .iter() .map(|child| self.optimize(child.clone(), config)) .collect::>>()?; - match plan.required_child_distribution() { - Distribution::UnspecifiedDistribution => { - with_new_children_if_necessary(plan, children) - } - Distribution::HashPartitioned(_) => { - with_new_children_if_necessary(plan, children) - } - Distribution::SinglePartition => with_new_children_if_necessary( - plan, - children - .iter() - .map(|child| { - if child.output_partitioning().partition_count() == 1 { - child.clone() - } else { - Arc::new(CoalescePartitionsExec::new(child.clone())) - } - }) - .collect(), - ), - } + assert_eq!(children.len(), plan.required_input_distribution().len()); + let new_children = children + .into_iter() + .zip(plan.required_input_distribution()) + .map(|(child, dist)| match dist { + Distribution::SinglePartition + if child.output_partitioning().partition_count() > 1 => + { + Arc::new(CoalescePartitionsExec::new(child.clone())) + } + _ => child, + }) + .collect::>(); + with_new_children_if_necessary(plan, new_children) } } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 2c4a9b26c3994..d69c2d85c3f92 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -37,6 +37,7 @@ use datafusion_physical_expr::{ expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, }; use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; @@ -45,9 +46,14 @@ mod no_grouping; mod row_hash; use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2; +use crate::physical_plan::EquivalenceProperties; pub use datafusion_expr::AggregateFunction; use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; pub use datafusion_physical_expr::expressions::create_aggregate_expr; +use datafusion_physical_expr::{ + merge_equivalence_properties_with_alias, normalize_out_expr_with_alias_schema, + truncate_equivalence_properties_not_in_schema, +}; use datafusion_row::{row_supported, RowType}; /// Hash aggregate modes @@ -163,6 +169,8 @@ pub struct AggregateExec { /// same as input.schema() but for the final aggregate it will be the same as the input /// to the partial aggregate input_schema: SchemaRef, + /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + alias_map: HashMap>, /// Execution Metrics metrics: ExecutionPlanMetricsSet, } @@ -186,6 +194,18 @@ impl AggregateExec { let schema = Arc::new(schema); + let mut alias_map: HashMap> = HashMap::new(); + for (expression, name) in group_by.expr.iter() { + if let Some(column) = expression.as_any().downcast_ref::() { + let new_col_idx = schema.index_of(name)?; + // When the column name is the same, but index does not equal, treat it as Alias + if (column.name() != name) || (column.index() != new_col_idx) { + let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new); + entry.push(Column::new(name, new_col_idx)); + } + }; + } + Ok(AggregateExec { mode, group_by, @@ -193,6 +213,7 @@ impl AggregateExec { input, schema, input_schema, + alias_map, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() + match &self.mode { + AggregateMode::Partial => { + // Partial Aggregation will not change the output partitioning but need to respect the Alias + let input_partition = self.input.output_partitioning(); + match input_partition { + Partitioning::Hash(exprs, part) => { + let normalized_exprs = exprs + .into_iter() + .map(|expr| { + normalize_out_expr_with_alias_schema( + expr, + &self.alias_map, + &self.schema, + ) + }) + .collect::>(); + Partitioning::Hash(normalized_exprs, part) + } + _ => input_partition, + } + } + // Final Aggregation's output partitioning is the same as its real input + _ => self.input.output_partitioning(), + } } + // TODO check the output ordering of AggregateExec fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn required_child_distribution(&self) -> Distribution { + fn required_input_distribution(&self) -> Vec { match &self.mode { - AggregateMode::Partial => Distribution::UnspecifiedDistribution, - AggregateMode::FinalPartitioned => Distribution::HashPartitioned( - self.group_by.expr.iter().map(|x| x.0.clone()).collect(), - ), - AggregateMode::Final => Distribution::SinglePartition, + AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], + AggregateMode::FinalPartitioned => { + vec![Distribution::HashPartitioned(self.output_group_expr())] + } + AggregateMode::Final => vec![Distribution::SinglePartition], } } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> Vec { + let mut input_equivalence_properties = self.input.equivalence_properties(); + merge_equivalence_properties_with_alias( + &mut input_equivalence_properties, + &self.alias_map, + ); + truncate_equivalence_properties_not_in_schema( + &mut input_equivalence_properties, + &self.schema, + ); + input_equivalence_properties } fn children(&self) -> Vec> { diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 8134ee7d2f2da..b0578bd486dea 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec { } /// Specifies we want the input as a single stream - fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] } /// Get the output partitioning of this plan @@ -85,10 +85,6 @@ impl ExecutionPlan for AnalyzeExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 317500ddc904f..8f0de7dcb15dd 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -25,8 +25,8 @@ use std::task::{Context, Poll}; use crate::error::Result; use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, }; use crate::execution::context::TaskContext; @@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec { self.input.output_partitioning() } + // Depends on how the CoalesceBatches was implemented, it is possible to keep + // the input ordering when combines small batches into larger batches + // TODO revisit the logic later fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index d1c797eacd5c9..cdd07390ac114 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -33,7 +33,9 @@ use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::physical_plan::{ + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, +}; use super::SendableRecordBatchStream; use crate::execution::context::TaskContext; @@ -87,8 +89,8 @@ impl ExecutionPlan for CoalescePartitionsExec { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index c693764c87aa0..4751dade1ddab 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning, }; use arrow::array::NullArray; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -98,10 +98,6 @@ impl ExecutionPlan for EmptyExec { vec![] } - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution - } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(self.partitions) diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index 15f459fb045b1..ac350b1837e8b 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -97,10 +97,6 @@ impl ExecutionPlan for ExplainExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 2aab84fadbcfc..38178a976804f 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -76,10 +76,6 @@ impl ExecutionPlan for AvroExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index d086a77982357..51180c0f00a8c 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -109,10 +109,6 @@ impl ExecutionPlan for CsvExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn relies_on_input_order(&self) -> bool { - false - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index c8c5d71bd73f2..ceb9e79589343 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -91,10 +91,6 @@ impl ExecutionPlan for NdJsonExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 0dda94322619a..19607ecad50ec 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -290,10 +290,6 @@ impl ExecutionPlan for ParquetExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index b4e3edaee05fd..58ac1adec5d26 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -28,13 +28,19 @@ use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, + Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, + PhysicalExpr, }; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::{ + combine_equivalence_properties, remove_equivalence_properties, split_predicate, +}; use log::debug; @@ -113,8 +119,17 @@ impl ExecutionPlan for FilterExec { true } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> Vec { + // Combine the equal predicates with the input equivalence properties + let mut input_properties = self.input.equivalence_properties(); + let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&self.predicate); + for new_condition in equal_pairs { + combine_equivalence_properties(&mut input_properties, new_condition) + } + for remove_condition in ne_pairs { + remove_equivalence_properties(&mut input_properties, remove_condition) + } + input_properties } fn with_new_children( @@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream { } } +/// Return the equals Column-Pairs and Non-equals Column-Pairs +fn collect_columns_from_predicate(predicate: &Arc) -> EqualAndNonEqual { + let mut eq_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); + let mut ne_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); + + let predicates = split_predicate(predicate); + predicates.into_iter().for_each(|p| { + if let Some(binary) = p.as_any().downcast_ref::() { + let left = binary.left(); + let right = binary.right(); + if left.as_any().is::() && right.as_any().is::() { + let left_column = left.as_any().downcast_ref::().unwrap(); + let right_column = right.as_any().downcast_ref::().unwrap(); + match binary.op() { + Operator::Eq => { + eq_predicate_columns.push((left_column, right_column)) + } + Operator::NotEq => { + ne_predicate_columns.push((left_column, right_column)) + } + _ => {} + } + } + } + }); + + (eq_predicate_columns, ne_predicate_columns) +} +/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates +pub type EqualAndNonEqual<'a> = + (Vec<(&'a Column, &'a Column)>, Vec<(&'a Column, &'a Column)>); + #[cfg(test)] mod tests { @@ -295,4 +342,47 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn collect_columns_predicates() -> Result<()> { + let schema = test_util::aggr_test_schema(); + let predicate: Arc = binary( + binary( + binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?, + Operator::And, + binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?, + &schema, + )?, + Operator::And, + binary( + binary( + col("c2", &schema)?, + Operator::Eq, + col("c9", &schema)?, + &schema, + )?, + Operator::And, + binary( + col("c1", &schema)?, + Operator::NotEq, + col("c13", &schema)?, + &schema, + )?, + &schema, + )?, + &schema, + )?; + + let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate); + + assert_eq!(1, equal_pairs.len()); + assert_eq!(equal_pairs[0].0.name(), "c2"); + assert_eq!(equal_pairs[0].1.name(), "c9"); + + assert_eq!(1, ne_pairs.len()); + assert_eq!(ne_pairs[0].0.name(), "c1"); + assert_eq!(ne_pairs[0].1.name(), "c13"); + + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 7a35116a46585..fc86b07cee259 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -29,16 +29,19 @@ use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use crate::{error::Result, scalar::ScalarValue}; use async_trait::async_trait; -use datafusion_physical_expr::PhysicalSortExpr; use log::debug; use std::time::Instant; -use super::utils::{check_join_is_valid, OnceAsync, OnceFut}; +use super::utils::{ + adjust_right_output_partitioning, check_join_is_valid, + cross_join_equivalence_properties, OnceAsync, OnceFut, +}; /// Data of the left side type JoinLeftData = RecordBatch; @@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec { )?)) } + // TODO optimize CrossJoin implementation to generate M * N partitions fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() + let left_columns_len = self.left.schema().fields.len(); + adjust_right_output_partitioning( + self.right.output_partitioning(), + left_columns_len, + ) } + // TODO check the output ordering of CrossJoin fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> Vec { + let left_columns_len = self.left.schema().fields.len(); + cross_join_equivalence_properties( + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + ) } fn execute( diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index b41c0df1eb0d8..f7a839f62ebea 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -62,12 +62,13 @@ use crate::physical_plan::{ expressions::PhysicalSortExpr, hash_utils::create_hashes, joins::utils::{ - build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex, - JoinFilter, JoinOn, JoinSide, + build_join_schema, check_join_is_valid, estimate_join_statistics, + join_equivalence_properties, join_output_partitioning, ColumnIndex, JoinFilter, + JoinOn, JoinSide, }, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, + PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::error::{DataFusionError, Result}; @@ -270,6 +271,58 @@ impl ExecutionPlan for HashJoinExec { self.schema.clone() } + fn required_input_distribution(&self) -> Vec { + match self.mode { + PartitionMode::CollectLeft => vec![ + Distribution::SinglePartition, + Distribution::UnspecifiedDistribution, + ], + PartitionMode::Partitioned => { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc, + Arc::new(r.clone()) as Arc, + ) + }) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + } + } + + fn output_partitioning(&self) -> Partitioning { + let left_columns_len = self.left.schema().fields.len(); + join_output_partitioning( + self.join_type, + self.left.output_partitioning(), + self.right.output_partitioning(), + left_columns_len, + ) + } + + // TODO Output ordering might be kept for some cases. + // For example if it is inner join then the stream side order can be kept + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn equivalence_properties(&self) -> Vec { + let left_columns_len = self.left.schema().fields.len(); + join_equivalence_properties( + self.join_type, + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + self.on(), + ) + } + fn children(&self) -> Vec> { vec![self.left.clone(), self.right.clone()] } @@ -289,18 +342,6 @@ impl ExecutionPlan for HashJoinExec { )?)) } - fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn relies_on_input_order(&self) -> bool { - false - } - fn execute( &self, partition: usize, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index dfcab88c277eb..181f661da403c 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -43,14 +43,17 @@ use crate::physical_plan::common::combine_batches; use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::joins::utils::{ - build_join_schema, check_join_is_valid, JoinOn, + build_join_schema, check_join_is_valid, join_equivalence_properties, + join_output_partitioning, JoinOn, }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::physical_plan::{ - metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_physical_expr::rewrite::TreeNodeRewritable; + /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. #[derive(Debug)] @@ -67,6 +70,12 @@ pub struct SortMergeJoinExec { schema: SchemaRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// The left SortExpr + left_sort_exprs: Vec, + /// The right SortExpr + right_sort_exprs: Vec, + /// The output ordering + output_ordering: Option>, /// Sort options of join columns used in sorting left and right execution plans sort_options: Vec, /// If null_equals_null is true, null == null else null != null @@ -104,6 +113,75 @@ impl SortMergeJoinExec { ))); } + let (left_expr, right_expr): (Vec<_>, Vec<_>) = on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc, + Arc::new(r.clone()) as Arc, + ) + }) + .unzip(); + + let left_sort_exprs = left_expr + .into_iter() + .zip(sort_options.iter()) + .map(|(k, sort_op)| PhysicalSortExpr { + expr: k, + options: *sort_op, + }) + .collect::>(); + + let right_sort_exprs = right_expr + .into_iter() + .zip(sort_options.iter()) + .map(|(k, sort_op)| PhysicalSortExpr { + expr: k, + options: *sort_op, + }) + .collect::>(); + + let output_ordering = match join_type { + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => { + left.output_ordering().map(|sort_exprs| sort_exprs.to_vec()) + } + JoinType::RightSemi => right + .output_ordering() + .map(|sort_exprs| sort_exprs.to_vec()), + JoinType::Right => { + let left_columns_len = left.schema().fields.len(); + right.output_ordering().map(|sort_exprs| { + sort_exprs + .iter() + .map(|e| { + let new_expr = e + .expr + .clone() + .transform_down(&|e| match e + .as_any() + .downcast_ref::() + { + Some(col) => Some(Arc::new(Column::new( + col.name(), + left_columns_len + col.index(), + ))), + None => None, + }) + .unwrap(); + PhysicalSortExpr { + expr: new_expr, + options: e.options, + } + }) + .collect::>() + }) + } + JoinType::Full => None, + }; + let schema = Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); @@ -114,10 +192,18 @@ impl SortMergeJoinExec { join_type, schema, metrics: ExecutionPlanMetricsSet::new(), + left_sort_exprs, + right_sort_exprs, + output_ordering, sort_options, null_equals_null, }) } + + /// Set of common columns used to join on + pub fn on(&self) -> &[(Column, Column)] { + &self.on + } } impl ExecutionPlan for SortMergeJoinExec { @@ -129,23 +215,50 @@ impl ExecutionPlan for SortMergeJoinExec { self.schema.clone() } + fn required_input_distribution(&self) -> Vec { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc, + Arc::new(r.clone()) as Arc, + ) + }) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + + fn required_input_ordering(&self) -> Vec> { + vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)] + } + fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() + let left_columns_len = self.left.schema().fields.len(); + join_output_partitioning( + self.join_type, + self.left.output_partitioning(), + self.right.output_partitioning(), + left_columns_len, + ) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - match self.join_type { - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti => self.left.output_ordering(), - JoinType::Right | JoinType::RightSemi => self.right.output_ordering(), - JoinType::Full => None, - } + self.output_ordering.as_deref() } - fn relies_on_input_order(&self) -> bool { - true + fn equivalence_properties(&self) -> Vec { + let left_columns_len = self.left.schema().fields.len(); + join_equivalence_properties( + self.join_type, + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + self.on(), + ) } fn children(&self) -> Vec> { @@ -226,8 +339,8 @@ impl ExecutionPlan for SortMergeJoinExec { DisplayFormatType::Default => { write!( f, - "SortMergeJoin: join_type={:?}, on={:?}, schema={:?}", - self.join_type, self.on, &self.schema + "SortMergeJoin: join_type={:?}, on={:?}", + self.join_type, self.on ) } } diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 1d1560478ae4c..7b384a80b5765 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -33,7 +33,11 @@ use std::future::Future; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; +use crate::physical_plan::{ + ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, +}; +use datafusion_physical_expr::combine_equivalence_properties; +use datafusion_physical_expr::rewrite::TreeNodeRewritable; /// The on clause of the join, as vector of (left, right) columns. pub type JoinOn = Vec<(Column, Column)>; @@ -83,6 +87,123 @@ fn check_join_set_is_valid( Ok(()) } +/// Calculate the OutputPartitioning for Join Node +pub fn join_output_partitioning( + join_type: JoinType, + left_partitioning: Partitioning, + right_partitioning: Partitioning, + left_columns_len: usize, +) -> Partitioning { + match join_type { + JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { + left_partitioning + } + JoinType::RightSemi => right_partitioning, + JoinType::Right => { + adjust_right_output_partitioning(right_partitioning, left_columns_len) + } + _ => Partitioning::UnknownPartitioning(right_partitioning.partition_count()), + } +} + +/// Adjust the right out partitioning to new Column Index +pub fn adjust_right_output_partitioning( + right_partitioning: Partitioning, + left_columns_len: usize, +) -> Partitioning { + match right_partitioning { + Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size), + Partitioning::UnknownPartitioning(size) => { + Partitioning::UnknownPartitioning(size) + } + Partitioning::Hash(exprs, size) => { + let new_exprs = exprs + .into_iter() + .map(|expr| { + expr.transform_down(&|e| match e.as_any().downcast_ref::() { + Some(col) => Some(Arc::new(Column::new( + col.name(), + left_columns_len + col.index(), + ))), + None => None, + }) + .unwrap() + }) + .collect::>(); + Partitioning::Hash(new_exprs, size) + } + } +} + +/// Calculate the Equivalence Properties for Join Node +pub fn join_equivalence_properties( + join_type: JoinType, + left_properties: Vec, + right_properties: Vec, + left_columns_len: usize, + on: &[(Column, Column)], +) -> Vec { + let mut eq_properties = match join_type { + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { + let mut left_properties = left_properties; + let new_right_properties = right_properties + .into_iter() + .map(|prop| { + let new_head = Column::new( + prop.head().name(), + left_columns_len + prop.head().index(), + ); + let new_others = prop + .others() + .iter() + .map(|col| { + Column::new(col.name(), left_columns_len + col.index()) + }) + .collect::>(); + EquivalenceProperties::new(new_head, new_others) + }) + .collect::>(); + left_properties.extend(new_right_properties); + left_properties + } + JoinType::LeftSemi | JoinType::LeftAnti => left_properties, + JoinType::RightSemi => right_properties, + }; + + if join_type == JoinType::Inner { + on.iter().for_each(|(column1, column2)| { + let new_column2 = + Column::new(column2.name(), left_columns_len + column2.index()); + combine_equivalence_properties(&mut eq_properties, (column1, &new_column2)) + }) + } + eq_properties +} + +/// Calculate the Equivalence Properties for CrossJoin Node +pub fn cross_join_equivalence_properties( + left_properties: Vec, + right_properties: Vec, + left_columns_len: usize, +) -> Vec { + let mut left_properties = left_properties; + let new_right_properties = right_properties + .into_iter() + .map(|prop| { + let new_head = + Column::new(prop.head().name(), left_columns_len + prop.head().index()); + let new_others = prop + .others() + .iter() + .map(|col| Column::new(col.name(), left_columns_len + col.index())) + .collect::>(); + EquivalenceProperties::new(new_head, new_others) + }) + .collect::>(); + left_properties.extend(new_right_properties); + left_properties +} + /// Used in ColumnIndex to distinguish which side the index is for #[derive(Debug, Clone)] pub enum JoinSide { diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 322c21ff419cc..857d8ebf88433 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -27,7 +27,7 @@ use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, }; use arrow::array::ArrayRef; use arrow::compute::limit; @@ -98,10 +98,9 @@ impl ExecutionPlan for GlobalLimitExec { vec![self.input.clone()] } - fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) @@ -123,6 +122,10 @@ impl ExecutionPlan for GlobalLimitExec { self.input.output_ordering() } + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() + } + fn with_new_children( self: Arc, children: Vec>, @@ -281,14 +284,13 @@ impl ExecutionPlan for LocalLimitExec { false } - // Local limit does not make any attempt to maintain the input - // sortedness (if there is more than one partition) + // Local limit will not change the input plan's ordering fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - if self.output_partitioning().partition_count() == 1 { - self.input.output_ordering() - } else { - None - } + self.input.output_ordering() + } + + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index d2dbe0d738c39..95dc2258d4cff 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -81,10 +81,6 @@ impl ExecutionPlan for MemoryExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 9e36c3ec80edc..329a62418fcf4 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// have any particular output order here fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; - /// Specifies the data distribution requirements of all the - /// children for this operator - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + /// Specifies the data distribution requirements for all the + /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child, + fn required_input_distribution(&self) -> Vec { + if !self.children().is_empty() { + vec![Distribution::UnspecifiedDistribution; self.children().len()] + } else { + vec![Distribution::UnspecifiedDistribution] + } + } + + /// Specifies the ordering requirements for all the + /// children for this operator. + fn required_input_ordering(&self) -> Vec> { + vec![None; self.children().len()] } /// Returns `true` if this operator relies on its inputs being @@ -136,13 +146,17 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// optimizations which might reorder the inputs (such as /// repartitioning to increase concurrency). /// - /// The default implementation returns `true` + /// The default implementation checks the input ordering requirements + /// and if there is non empty ordering requirements to the input, the method will + /// return `true`. /// /// WARNING: if you override this default and return `false`, your /// operator can not rely on DataFusion preserving the input order /// as it will likely not. fn relies_on_input_order(&self) -> bool { - true + self.required_input_ordering() + .iter() + .any(|ordering| matches!(ordering, Some(_))) } /// Returns `false` if this operator's implementation may reorder @@ -175,10 +189,17 @@ pub trait ExecutionPlan: Debug + Send + Sync { fn benefits_from_input_partitioning(&self) -> bool { // By default try to maximize parallelism with more CPUs if // possible - !matches!( - self.required_child_distribution(), - Distribution::SinglePartition - ) + !self + .required_input_distribution() + .into_iter() + .any(|dist| matches!(dist, Distribution::SinglePartition)) + } + + /// Get a list of equivalence properties within the plan + /// Equivalence Propertie is a set of Columns that are known to have the same value in all tuples of this relation. + /// Equivalence Propertie are generated by equality predicates, typically equijoin conditions and equality comparisons in filters. + fn equivalence_properties(&self) -> Vec { + vec![] } /// Get a list of child execution plans that provide the input for this plan. The returned list @@ -460,6 +481,23 @@ impl Partitioning { } } +impl PartialEq for Partitioning { + fn eq(&self, other: &Partitioning) -> bool { + match (self, other) { + ( + Partitioning::RoundRobinBatch(count1), + Partitioning::RoundRobinBatch(count2), + ) if count1 == count2 => true, + (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2)) + if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) => + { + true + } + _ => false, + } + } +} + /// Distribution schemes #[derive(Debug, Clone)] pub enum Distribution { @@ -472,7 +510,10 @@ pub enum Distribution { HashPartitioned(Vec>), } +use datafusion_physical_expr::expr_list_eq_strict_order; +use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::WindowExpr; +use datafusion_physical_expr::EquivalenceProperties; pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; /// Applies an optional projection to a [`SchemaRef`], returning the diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index a389bb65bdc7e..348f19ea1b961 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -522,8 +522,9 @@ impl DefaultPhysicalPlanner { && session_state.config.target_partitions > 1 && session_state.config.repartition_windows; - let input_exec = if can_repartition { - let partition_keys = partition_keys + let physical_partition_keys = if can_repartition + { + partition_keys .iter() .map(|e| { self.create_physical_expr( @@ -533,11 +534,16 @@ impl DefaultPhysicalPlanner { session_state, ) }) - .collect::>>>()?; + .collect::>>>()? + } else { + vec![] + }; + + let input_exec = if can_repartition { Arc::new(RepartitionExec::try_new( input_exec, Partitioning::Hash( - partition_keys, + physical_partition_keys.clone(), session_state.config.target_partitions, ), )?) @@ -576,8 +582,8 @@ impl DefaultPhysicalPlanner { let logical_input_schema = input.schema(); - let input_exec = if sort_keys.is_empty() { - input_exec + let physical_sort_keys = if sort_keys.is_empty() { + None } else { let physical_input_schema = input_exec.schema(); let sort_keys = sort_keys @@ -600,13 +606,19 @@ impl DefaultPhysicalPlanner { _ => unreachable!(), }) .collect::>>()?; - Arc::new(if can_repartition { - SortExec::new_with_partitioning(sort_keys, input_exec, true, None) - } else { - SortExec::try_new(sort_keys, input_exec, None)? - }) + Some(sort_keys) }; + let input_exec = match physical_sort_keys.clone() { + None => input_exec, + Some(sort_exprs) => { + if can_repartition { + Arc::new(SortExec::new_with_partitioning(sort_exprs, input_exec, true, None)) + } else { + Arc::new(SortExec::try_new(sort_exprs, input_exec, None)?) + } + }, + }; let physical_input_schema = input_exec.schema(); let window_expr = window_expr .iter() @@ -624,6 +636,8 @@ impl DefaultPhysicalPlanner { window_expr, input_exec, physical_input_schema, + physical_partition_keys, + physical_sort_keys, )?)) } LogicalPlan::Aggregate(Aggregate { @@ -2344,10 +2358,6 @@ mod tests { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5fa3c93cdd421..a2f0540757a1b 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -21,14 +21,15 @@ //! projection expressions. `SELECT` without `FROM` will only evaluate expressions. use std::any::Any; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use crate::error::Result; use crate::physical_plan::{ - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, + ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalExpr, }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -39,6 +40,10 @@ use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; +use datafusion_physical_expr::{ + merge_equivalence_properties_with_alias, normalize_out_expr_with_alias_schema, + truncate_equivalence_properties_not_in_schema, +}; use futures::stream::Stream; use futures::stream::StreamExt; @@ -51,6 +56,10 @@ pub struct ProjectionExec { schema: SchemaRef, /// The input plan input: Arc, + /// The output ordering + output_ordering: Option>, + /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + alias_map: HashMap>, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -82,10 +91,47 @@ impl ProjectionExec { input_schema.metadata().clone(), )); + let mut alias_map: HashMap> = HashMap::new(); + for (expression, name) in expr.iter() { + if let Some(column) = expression.as_any().downcast_ref::() { + let new_col_idx = schema.index_of(name)?; + // When the column name is the same, but index does not equal, treat it as Alias + if (column.name() != name) || (column.index() != new_col_idx) { + let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new); + entry.push(Column::new(name, new_col_idx)); + } + }; + } + + // Output Ordering need to respect the alias + let child_output_ordering = input.output_ordering(); + let output_ordering = match child_output_ordering { + Some(sort_exprs) => { + let normalized_exprs = sort_exprs + .iter() + .map(|sort_expr| { + let expr = normalize_out_expr_with_alias_schema( + sort_expr.expr.clone(), + &alias_map, + &schema, + ); + PhysicalSortExpr { + expr, + options: sort_expr.options, + } + }) + .collect::>(); + Some(normalized_exprs) + } + None => None, + }; + Ok(Self { expr, schema, input: input.clone(), + output_ordering, + alias_map, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -118,11 +164,28 @@ impl ExecutionPlan for ProjectionExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() + // Output partition need to respect the alias + let input_partition = self.input.output_partitioning(); + match input_partition { + Partitioning::Hash(exprs, part) => { + let normalized_exprs = exprs + .into_iter() + .map(|expr| { + normalize_out_expr_with_alias_schema( + expr, + &self.alias_map, + &self.schema, + ) + }) + .collect::>(); + Partitioning::Hash(normalized_exprs, part) + } + _ => input_partition, + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() + self.output_ordering.as_deref() } fn maintains_input_order(&self) -> bool { @@ -130,8 +193,21 @@ impl ExecutionPlan for ProjectionExec { true } - fn relies_on_input_order(&self) -> bool { - false + // Equivalence properties need to be adjusted after the Projection. + // 1) Add Alias, Alias can introduce additional equivalence properties, + // For example: Projection(a, a as a1, a as a2) + // 2) Truncate the properties that are not in the schema of the Projection + fn equivalence_properties(&self) -> Vec { + let mut input_equivalence_properties = self.input.equivalence_properties(); + merge_equivalence_properties_with_alias( + &mut input_equivalence_properties, + &self.alias_map, + ); + truncate_equivalence_properties_not_in_schema( + &mut input_equivalence_properties, + &self.schema, + ); + input_equivalence_properties } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 5611989f0ea9c..d45848b57bc85 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -25,7 +25,9 @@ use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; -use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics}; +use crate::physical_plan::{ + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, +}; use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; @@ -272,10 +274,6 @@ impl ExecutionPlan for RepartitionExec { vec![self.input.clone()] } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, children: Vec>, @@ -294,6 +292,10 @@ impl ExecutionPlan for RepartitionExec { None } + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() + } + fn execute( &self, partition: usize, diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 763c7c553f41f..d60bb98a74955 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -46,6 +46,7 @@ use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion_physical_expr::EquivalenceProperties; use futures::lock::Mutex; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use log::{debug, error}; @@ -743,11 +744,13 @@ impl ExecutionPlan for SortExec { } } - fn required_child_distribution(&self) -> Distribution { + fn required_input_distribution(&self) -> Vec { if self.preserve_partitioning { - Distribution::UnspecifiedDistribution + vec![Distribution::UnspecifiedDistribution] } else { - Distribution::SinglePartition + // global sort + // TODO support RangePartition and OrderedDistribution + vec![Distribution::SinglePartition] } } @@ -755,11 +758,6 @@ impl ExecutionPlan for SortExec { vec![self.input.clone()] } - fn relies_on_input_order(&self) -> bool { - // this operator resorts everything - false - } - fn benefits_from_input_partitioning(&self) -> bool { false } @@ -768,6 +766,10 @@ impl ExecutionPlan for SortExec { Some(&self.expr) } + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 22bd83ec9ae8b..e45ec886ed17b 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -49,6 +49,7 @@ use crate::physical_plan::{ Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_physical_expr::EquivalenceProperties; /// Sort preserving merge execution plan /// @@ -123,18 +124,22 @@ impl ExecutionPlan for SortPreservingMergeExec { Partitioning::UnknownPartitioning(1) } - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + fn required_input_distribution(&self) -> Vec { + vec![Distribution::UnspecifiedDistribution] } - fn relies_on_input_order(&self) -> bool { - true + fn required_input_ordering(&self) -> Vec> { + vec![Some(&self.expr)] } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { Some(&self.expr) } + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() + } + fn children(&self) -> Vec> { vec![self.input.clone()] } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index bf9dfbd1b694c..af57c9ef9cc28 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -21,15 +21,19 @@ //! The Union operator combines multiple inputs with the same schema +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; +use arrow::error::Result as ArrowResult; use arrow::{ datatypes::{Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; +use log::warn; use super::{ expressions::PhysicalSortExpr, @@ -42,6 +46,8 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; +use datafusion_physical_expr::sort_expr_list_eq_strict_order; +use tokio::macros::support::thread_rng_n; /// UNION ALL execution plan #[derive(Debug)] @@ -52,6 +58,8 @@ pub struct UnionExec { metrics: ExecutionPlanMetricsSet, /// Schema of Union schema: SchemaRef, + /// Partition aware Union + partition_aware: bool, } impl UnionExec { @@ -78,10 +86,24 @@ impl UnionExec { inputs[0].schema().metadata().clone(), )); + // If all the input partitions have the same Hash partition spec with the first_input_partition + // The UnionExec is partition aware. + // + // It might be too strict here in the case that the input partition specs are compatible but not exactly the same. + // For example one input partition has the partition spec Hash('a','b','c') and + // other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c'). + let first_input_partition = inputs[0].output_partitioning(); + let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _)) + && inputs + .iter() + .map(|plan| plan.output_partitioning()) + .all(|partition| partition == first_input_partition); + UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), schema, + partition_aware, } } @@ -107,23 +129,46 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - // Sums all the output partitions - let num_partitions = self - .inputs - .iter() - .map(|plan| plan.output_partitioning().partition_count()) - .sum(); - // TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`) - // https://issues.apache.org/jira/browse/ARROW-11991 - Partitioning::UnknownPartitioning(num_partitions) + if self.partition_aware { + self.inputs[0].output_partitioning() + } else { + // Output the combination of all output partitions of the inputs if the Union is not partition aware + let num_partitions = self + .inputs + .iter() + .map(|plan| plan.output_partitioning().partition_count()) + .sum(); + + Partitioning::UnknownPartitioning(num_partitions) + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn relies_on_input_order(&self) -> bool { - false + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input ordering spec strictly equal with the first_input_ordering + // Return the first_input_ordering as the output_ordering + // + // It might be too strict here in the case that the input ordering are compatible but not exactly the same. + // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering + // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). + if !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) + { + first_input_ordering + } else { + None + } } fn with_new_children( @@ -145,19 +190,38 @@ impl ExecutionPlan for UnionExec { let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // record on drop - // find partition to execute - for input in self.inputs.iter() { - // Calculate whether partition belongs to the current partition - if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; - debug!("Found a Union partition to execute"); + if self.partition_aware { + let mut input_stream_vec = vec![]; + for input in self.inputs.iter() { + if partition < input.output_partitioning().partition_count() { + input_stream_vec.push(input.execute(partition, context.clone())?); + } else { + // Do not find a partition to execute + break; + } + } + if input_stream_vec.len() == self.inputs.len() { + let stream = Box::pin(CombinedRecordBatchStream::new( + self.schema(), + input_stream_vec, + )); return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } else { - partition -= input.output_partitioning().partition_count(); + } + } else { + // find partition to execute + for input in self.inputs.iter() { + // Calculate whether partition belongs to the current partition + if partition < input.output_partitioning().partition_count() { + let stream = input.execute(partition, context)?; + debug!("Found a Union partition to execute"); + return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); + } else { + partition -= input.output_partitioning().partition_count(); + } } } - debug!("Error in Union: Partition {} not found", partition); + warn!("Error in Union: Partition {} not found", partition); Err(crate::error::DataFusionError::Execution(format!( "Partition {} not found in Union", @@ -194,6 +258,73 @@ impl ExecutionPlan for UnionExec { } } +/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one +pub struct CombinedRecordBatchStream { + /// Schema wrapped by Arc + schema: SchemaRef, + /// Stream entries + entries: Vec, +} + +impl CombinedRecordBatchStream { + /// Create an CombinedRecordBatchStream + pub fn new(schema: SchemaRef, entries: Vec) -> Self { + Self { schema, entries } + } +} + +impl RecordBatchStream for CombinedRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for CombinedRecordBatchStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + use Poll::*; + + let start = thread_rng_n(self.entries.len() as u32) as usize; + let mut idx = start; + + for _ in 0..self.entries.len() { + let stream = self.entries.get_mut(idx).unwrap(); + + match Pin::new(stream).poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => { + // Remove the entry + self.entries.swap_remove(idx); + + // Check if this was the last entry, if so the cursor needs + // to wrap + if idx == self.entries.len() { + idx = 0; + } else if idx < start && start <= self.entries.len() { + // The stream being swapped into the current index has + // already been polled, so skip it. + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + Pending => { + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + } + + // If the map is empty, then the stream is complete. + if self.entries.is_empty() { + Ready(None) + } else { + Pending + } + } +} + /// Stream wrapper that records `BaselineMetrics` for a particular /// partition struct ObservedStream { diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index 897936814ceea..6ab4f7b82490e 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -22,8 +22,8 @@ use super::{common, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; use crate::physical_plan::{ - memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, PhysicalExpr, + memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, + PhysicalExpr, }; use crate::scalar::ScalarValue; use arrow::array::new_null_array; @@ -108,11 +108,6 @@ impl ExecutionPlan for ValuesExec { fn children(&self) -> Vec> { vec![] } - - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution - } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) @@ -122,10 +117,6 @@ impl ExecutionPlan for ValuesExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index 95582b2119de6..0183ee6cc3905 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -216,6 +216,8 @@ mod tests { ], input, schema.clone(), + vec![], + None, )?); let result: Vec = collect(window_exec, task_ctx).await?; @@ -261,6 +263,8 @@ mod tests { )?], blocking_exec, schema, + vec![], + None, )?); let fut = collect(window_agg_exec, task_ctx); diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index e9eac35a3d883..3f67b0392bb8c 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -24,8 +24,9 @@ use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; use crate::physical_plan::{ - common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, + ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, + SendableRecordBatchStream, Statistics, WindowExpr, }; use arrow::{ array::ArrayRef, @@ -35,6 +36,7 @@ use arrow::{ }; use futures::stream::Stream; use futures::{ready, StreamExt}; +use log::warn; use std::any::Any; use std::pin::Pin; use std::sync::Arc; @@ -51,6 +53,10 @@ pub struct WindowAggExec { schema: SchemaRef, /// Schema before the window input_schema: SchemaRef, + /// Partition Keys + pub partition_keys: Vec>, + /// Sort Keys + pub sort_keys: Option>, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -61,6 +67,8 @@ impl WindowAggExec { window_expr: Vec>, input: Arc, input_schema: SchemaRef, + partition_keys: Vec>, + sort_keys: Option>, ) -> Result { let schema = create_schema(&input_schema, &window_expr)?; let schema = Arc::new(schema); @@ -69,6 +77,8 @@ impl WindowAggExec { window_expr, schema, input_schema, + partition_keys, + sort_keys, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec { true } - fn relies_on_input_order(&self) -> bool { - true + fn required_input_ordering(&self) -> Vec> { + let sort_keys = self.sort_keys.as_deref(); + vec![sort_keys] } - fn required_child_distribution(&self) -> Distribution { - if self - .window_expr() - .iter() - .all(|expr| expr.partition_by().is_empty()) - { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec { + if self.partition_keys.is_empty() { + warn!("No partition defined for WindowAggExec!!!"); + vec![Distribution::SinglePartition] } else { - Distribution::UnspecifiedDistribution + //TODO support PartitionCollections if there is no common partition columns in the window_expr + vec![Distribution::HashPartitioned(self.partition_keys.clone())] } } + fn equivalence_properties(&self) -> Vec { + self.input.equivalence_properties() + } + fn with_new_children( self: Arc, children: Vec>, @@ -143,6 +156,8 @@ impl ExecutionPlan for WindowAggExec { self.window_expr.clone(), children[0].clone(), self.input_schema.clone(), + self.partition_keys.clone(), + self.sort_keys.clone(), )?)) } diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index 20e7c6e79a48c..8ecece85e9380 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -235,8 +235,8 @@ impl ExecutionPlan for ProxyExecutionPlan { self.inner.output_ordering() } - fn required_child_distribution(&self) -> Distribution { - self.inner.required_child_distribution() + fn required_input_distribution(&self) -> Vec { + self.inner.required_input_distribution() } fn relies_on_input_order(&self) -> bool { diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index c93b7c3eb4065..2f9f31f32534b 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -441,12 +441,8 @@ impl ExecutionPlan for TopKExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - - fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] } fn children(&self) -> Vec> { diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 450919aa3f2cc..776f4dc0a5066 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -118,6 +118,76 @@ impl PartialEq for Column { } } +/// Represents the unknown column without index +#[derive(Debug, Hash, PartialEq, Eq, Clone)] +pub struct UnKnownColumn { + name: String, +} + +impl UnKnownColumn { + /// Create a new unknown column expression + pub fn new(name: &str) -> Self { + Self { + name: name.to_owned(), + } + } + + /// Get the column name + pub fn name(&self) -> &str { + &self.name + } +} + +impl std::fmt::Display for UnKnownColumn { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.name) + } +} + +impl PhysicalExpr for UnKnownColumn { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn std::any::Any { + self + } + + /// Get the data type of this expression, given the schema of the input + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Null) + } + + /// Decide whehter this expression is nullable, given the schema of the input + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(true) + } + + /// Evaluate the expression + fn evaluate(&self, _batch: &RecordBatch) -> Result { + Err(DataFusionError::Plan( + "UnKnownColumn::evaluate() should not be called".to_owned(), + )) + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } +} + +impl PartialEq for UnKnownColumn { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other) + .downcast_ref::() + .map(|x| self == x) + .unwrap_or(false) + } +} + #[derive(Debug, Clone)] struct ColumnExprStats { index: usize, diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index d27737dd98815..26bb3ca1e64c3 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -72,7 +72,7 @@ pub use case::{case, CaseExpr}; pub use cast::{ cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS, }; -pub use column::{col, Column}; +pub use column::{col, Column, UnKnownColumn}; pub use datetime::DateTimeIntervalExpr; pub use get_indexed_field::GetIndexedFieldExpr; pub use in_list::{in_list, InListExpr}; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 9087c2c2d9732..252bf2986274e 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,7 +45,17 @@ pub mod window; // reexport this to maintain compatibility with anything that used from_slice previously pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; -pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats}; +pub use physical_expr::{ + EquivalenceProperties, ExprBoundaries, PhysicalExpr, PhysicalExprStats, +}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::PhysicalSortExpr; +pub use utils::{ + combine_equivalence_properties, expr_list_eq_any_order, expr_list_eq_strict_order, + merge_equivalence_properties_with_alias, normalize_expr_with_equivalence_properties, + normalize_out_expr_with_alias_schema, + normalize_sort_expr_with_equivalence_properties, remove_equivalence_properties, + sort_expr_list_eq_strict_order, split_predicate, + truncate_equivalence_properties_not_in_schema, +}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 772a02097bfd5..e56554d2e2661 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -19,6 +19,7 @@ use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use crate::expressions::Column; use datafusion_common::{ColumnStatistics, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; @@ -26,6 +27,7 @@ use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterator}; use std::any::Any; +use std::collections::HashSet; use std::fmt::{Debug, Display}; use std::sync::Arc; @@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats { } } +#[derive(Debug, Clone)] +pub struct EquivalenceProperties { + /// First element in the EquivalenceProperties + head: Column, + /// Other equal columns + others: HashSet, +} + +impl EquivalenceProperties { + pub fn new(head: Column, others: Vec) -> Self { + EquivalenceProperties { + head, + others: HashSet::from_iter(others), + } + } + + pub fn head(&self) -> &Column { + &self.head + } + + pub fn others(&self) -> &HashSet { + &self.others + } + + pub fn contains(&self, col: &Column) -> bool { + self.head == *col || self.others.contains(col) + } + + pub fn insert(&mut self, col: Column) -> bool { + self.others.insert(col) + } + + pub fn remove(&mut self, col: &Column) -> bool { + let removed = self.others.remove(col); + if !removed && *col == self.head { + let one_col = self.others.iter().next().cloned(); + if let Some(col) = one_col { + let removed = self.others.remove(&col); + self.head = col; + removed + } else { + false + } + } else { + true + } + } + + pub fn iter(&self) -> impl Iterator { + std::iter::once(&self.head).chain(self.others.iter()) + } + + pub fn len(&self) -> usize { + self.others.len() + 1 + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + /// Returns a copy of this expr if we change any child according to the pointer comparison. /// The size of `children` must be equal to the size of `PhysicalExpr::children()`. /// Allow the vtable address comparisons for PhysicalExpr Trait Objects,it is harmless even diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 7ee947c0a7f99..7105e2da50582 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -15,9 +15,18 @@ // specific language governing permissions and limitations // under the License. +use crate::expressions::BinaryExpr; +use crate::expressions::Column; +use crate::expressions::UnKnownColumn; +use crate::physical_expr::EquivalenceProperties; +use crate::rewrite::TreeNodeRewritable; use crate::PhysicalExpr; use crate::PhysicalSortExpr; +use datafusion_expr::Operator; +use arrow::datatypes::SchemaRef; + +use std::collections::HashMap; use std::sync::Arc; /// Compare the two expr lists are equal no matter the order. @@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order( list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2)) } +/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs. +/// +/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] +/// +pub fn split_predicate(predicate: &Arc) -> Vec<&Arc> { + match predicate.as_any().downcast_ref::() { + Some(binary) => match binary.op() { + Operator::And => { + let mut vec1 = split_predicate(binary.left()); + let vec2 = split_predicate(binary.right()); + vec1.extend(vec2); + vec1 + } + _ => vec![predicate], + }, + None => vec![], + } +} + +/// Combine the new equal condition with the existing equivalence properties. +pub fn combine_equivalence_properties( + eq_properties: &mut Vec, + new_condition: (&Column, &Column), +) { + let mut idx1 = -1i32; + let mut idx2 = -1i32; + for (idx, prop) in eq_properties.iter_mut().enumerate() { + let contains_first = prop.contains(new_condition.0); + let contains_second = prop.contains(new_condition.1); + if contains_first && !contains_second { + prop.insert(new_condition.1.clone()); + idx1 = idx as i32; + } else if !contains_first && contains_second { + prop.insert(new_condition.0.clone()); + idx2 = idx as i32; + } else if contains_first && contains_second { + idx1 = idx as i32; + idx2 = idx as i32; + break; + } + } + + if idx1 != -1 && idx2 != -1 && idx1 != idx2 { + // need to merge the two existing properties + let second_properties = eq_properties.get(idx2 as usize).unwrap().clone(); + let first_properties = eq_properties.get_mut(idx1 as usize).unwrap(); + for prop in second_properties.iter() { + if !first_properties.contains(prop) { + first_properties.insert(prop.clone()); + } + } + eq_properties.remove(idx2 as usize); + } else if idx1 == -1 && idx2 == -1 { + // adding new pairs + eq_properties.push(EquivalenceProperties::new( + new_condition.0.clone(), + vec![new_condition.1.clone()], + )) + } +} + +pub fn remove_equivalence_properties( + eq_properties: &mut Vec, + remove_condition: (&Column, &Column), +) { + let mut match_idx = -1i32; + for (idx, prop) in eq_properties.iter_mut().enumerate() { + let contains_first = prop.contains(remove_condition.0); + let contains_second = prop.contains(remove_condition.1); + if contains_first && contains_second { + match_idx = idx as i32; + break; + } + } + if match_idx >= 0 { + let matches = eq_properties.get_mut(match_idx as usize).unwrap(); + matches.remove(remove_condition.0); + matches.remove(remove_condition.1); + if matches.len() <= 1 { + eq_properties.remove(match_idx as usize); + } + } +} + +pub fn merge_equivalence_properties_with_alias( + eq_properties: &mut Vec, + alias_map: &HashMap>, +) { + for (column, columns) in alias_map { + let mut find_match = false; + for (_idx, prop) in eq_properties.iter_mut().enumerate() { + if prop.contains(column) { + for col in columns { + prop.insert(col.clone()); + } + find_match = true; + break; + } + } + if !find_match { + eq_properties + .push(EquivalenceProperties::new(column.clone(), columns.clone())); + } + } +} + +pub fn truncate_equivalence_properties_not_in_schema( + eq_properties: &mut Vec, + schema: &SchemaRef, +) { + for props in eq_properties.iter_mut() { + let mut columns_to_remove = vec![]; + for column in props.iter() { + if let Ok(idx) = schema.index_of(column.name()) { + if idx != column.index() { + columns_to_remove.push(column.clone()); + } + } else { + columns_to_remove.push(column.clone()); + } + } + for column in columns_to_remove { + props.remove(&column); + } + } + eq_properties.retain(|props| props.len() > 1); +} + +/// Normalize the output expressions based on Alias Map and SchemaRef. +/// +/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map +/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn +/// +pub fn normalize_out_expr_with_alias_schema( + expr: Arc, + alias_map: &HashMap>, + schema: &SchemaRef, +) -> Arc { + let expr_clone = expr.clone(); + expr_clone + .transform(&|expr| { + let normalized_form: Option> = + match expr.as_any().downcast_ref::() { + Some(column) => { + let out = alias_map + .get(column) + .map(|c| { + let out_col: Arc = + Arc::new(c[0].clone()); + out_col + }) + .or_else(|| match schema.index_of(column.name()) { + // Exactly matching, return None, no need to do the transform + Ok(idx) if column.index() == idx => None, + _ => { + let out_col: Arc = + Arc::new(UnKnownColumn::new(column.name())); + Some(out_col) + } + }); + out + } + None => None, + }; + normalized_form + }) + .unwrap_or(expr) +} + +pub fn normalize_expr_with_equivalence_properties( + expr: Arc, + eq_properties: &[EquivalenceProperties], +) -> Arc { + let mut normalized = expr.clone(); + if let Some(column) = expr.as_any().downcast_ref::() { + for prop in eq_properties { + if prop.contains(column) { + normalized = Arc::new(prop.head().clone()); + break; + } + } + } + normalized +} + +pub fn normalize_sort_expr_with_equivalence_properties( + sort_expr: PhysicalSortExpr, + eq_properties: &[EquivalenceProperties], +) -> PhysicalSortExpr { + let mut normalized = sort_expr.clone(); + if let Some(column) = sort_expr.expr.as_any().downcast_ref::() { + for prop in eq_properties { + if prop.contains(column) { + normalized = PhysicalSortExpr { + expr: Arc::new(prop.head().clone()), + options: sort_expr.options, + }; + break; + } + } + } + normalized +} + #[cfg(test)] mod tests { @@ -77,7 +290,7 @@ mod tests { use std::sync::Arc; #[test] - fn expr_list_eq_any_order_test() -> Result<()> { + fn expr_list_eq_test() -> Result<()> { let list1: Vec> = vec![ Arc::new(Column::new("a", 0)), Arc::new(Column::new("a", 0)), @@ -91,6 +304,15 @@ mod tests { assert!(!expr_list_eq_any_order(list1.as_slice(), list2.as_slice())); assert!(!expr_list_eq_any_order(list2.as_slice(), list1.as_slice())); + assert!(!expr_list_eq_strict_order( + list1.as_slice(), + list2.as_slice() + )); + assert!(!expr_list_eq_strict_order( + list2.as_slice(), + list1.as_slice() + )); + let list3: Vec> = vec![ Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1)), @@ -110,6 +332,17 @@ mod tests { assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice())); assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice())); + assert!(!expr_list_eq_strict_order( + list3.as_slice(), + list4.as_slice() + )); + assert!(!expr_list_eq_strict_order( + list4.as_slice(), + list3.as_slice() + )); + assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice())); + assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice())); + Ok(()) } @@ -202,4 +435,80 @@ mod tests { Ok(()) } + + #[test] + fn combine_equivalence_properties_test() -> Result<()> { + let mut eq_properties: Vec = vec![]; + let new_condition = (&Column::new("a", 0), &Column::new("b", 1)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 1); + + let new_condition = (&Column::new("b", 1), &Column::new("a", 0)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 2); + + let new_condition = (&Column::new("b", 1), &Column::new("c", 2)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 3); + + let new_condition = (&Column::new("x", 99), &Column::new("y", 100)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 2); + + let new_condition = (&Column::new("x", 99), &Column::new("a", 0)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 5); + + Ok(()) + } + + #[test] + fn remove_equivalence_properties_test() -> Result<()> { + let mut eq_properties: Vec = vec![]; + let remove_condition = (&Column::new("a", 0), &Column::new("b", 1)); + remove_equivalence_properties(&mut eq_properties, remove_condition); + assert_eq!(eq_properties.len(), 0); + + let new_condition = (&Column::new("a", 0), &Column::new("b", 1)); + combine_equivalence_properties(&mut eq_properties, new_condition); + let new_condition = (&Column::new("a", 0), &Column::new("c", 2)); + combine_equivalence_properties(&mut eq_properties, new_condition); + let new_condition = (&Column::new("c", 2), &Column::new("d", 3)); + combine_equivalence_properties(&mut eq_properties, new_condition); + assert_eq!(eq_properties.len(), 1); + + let remove_condition = (&Column::new("a", 0), &Column::new("b", 1)); + remove_equivalence_properties(&mut eq_properties, remove_condition); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 2); + + Ok(()) + } + + #[test] + fn merge_equivalence_properties_with_alias_test() -> Result<()> { + let mut eq_properties: Vec = vec![]; + let mut alias_map = HashMap::new(); + alias_map.insert( + Column::new("a", 0), + vec![Column::new("a1", 1), Column::new("a2", 2)], + ); + + merge_equivalence_properties_with_alias(&mut eq_properties, &alias_map); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 3); + + let mut alias_map = HashMap::new(); + alias_map.insert( + Column::new("a", 0), + vec![Column::new("a3", 1), Column::new("a4", 2)], + ); + merge_equivalence_properties_with_alias(&mut eq_properties, &alias_map); + assert_eq!(eq_properties.len(), 1); + assert_eq!(eq_properties[0].len(), 5); + Ok(()) + } } From f21a2c1927b2a26f48ed397c4360e7a498b21c1c Mon Sep 17 00:00:00 2001 From: Wang Date: Tue, 1 Nov 2022 11:01:39 +0800 Subject: [PATCH 2/8] Fix hash join output_partitioning --- .../core/src/physical_plan/joins/hash_join.rs | 35 ++++++++++++++----- .../physical_plan/joins/sort_merge_join.rs | 4 +-- .../core/src/physical_plan/joins/utils.rs | 8 ++--- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 30a3444cf8c13..8c10529df4cdc 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -62,9 +62,9 @@ use crate::physical_plan::{ expressions::PhysicalSortExpr, hash_utils::create_hashes, joins::utils::{ - build_join_schema, check_join_is_valid, estimate_join_statistics, - join_equivalence_properties, join_output_partitioning, ColumnIndex, JoinFilter, - JoinOn, JoinSide, + adjust_right_output_partitioning, build_join_schema, check_join_is_valid, + estimate_join_statistics, join_equivalence_properties, + partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, JoinSide, }, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, @@ -298,12 +298,29 @@ impl ExecutionPlan for HashJoinExec { fn output_partitioning(&self) -> Partitioning { let left_columns_len = self.left.schema().fields.len(); - join_output_partitioning( - self.join_type, - self.left.output_partitioning(), - self.right.output_partitioning(), - left_columns_len, - ) + match self.mode { + PartitionMode::CollectLeft => match join_type { + JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( + self.right.output_partitioning(), + left_columns_len, + ), + JoinType::RightSemi | JoinType::RightAnti => { + self.right.output_partitioning() + } + JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::Full => Partitioning::UnknownPartitioning( + self.right.output_partitioning().partition_count(), + ), + }, + PartitionMode::Partitioned => partitioned_join_output_partitioning( + self.join_type, + self.left.output_partitioning(), + self.right.output_partitioning(), + left_columns_len, + ), + } } // TODO Output ordering might be kept for some cases. diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 1f1fa8ec9e35b..0741f662f75d2 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -44,7 +44,7 @@ use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::joins::utils::{ build_join_schema, check_join_is_valid, join_equivalence_properties, - join_output_partitioning, JoinOn, + partitioned_join_output_partitioning, JoinOn, }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::physical_plan::{ @@ -238,7 +238,7 @@ impl ExecutionPlan for SortMergeJoinExec { fn output_partitioning(&self) -> Partitioning { let left_columns_len = self.left.schema().fields.len(); - join_output_partitioning( + partitioned_join_output_partitioning( self.join_type, self.left.output_partitioning(), self.right.output_partitioning(), diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 32b9820c7a37f..0464579dcac49 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -87,8 +87,8 @@ fn check_join_set_is_valid( Ok(()) } -/// Calculate the OutputPartitioning for Join Node -pub fn join_output_partitioning( +/// Calculate the OutputPartitioning for Partitioned Join +pub fn partitioned_join_output_partitioning( join_type: JoinType, left_partitioning: Partitioning, right_partitioning: Partitioning, @@ -98,11 +98,11 @@ pub fn join_output_partitioning( JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { left_partitioning } - JoinType::RightSemi => right_partitioning, + JoinType::RightSemi | JoinType::RightAnti => right_partitioning, JoinType::Right => { adjust_right_output_partitioning(right_partitioning, left_columns_len) } - _ => Partitioning::UnknownPartitioning(right_partitioning.partition_count()), + JoinType::Full => Partitioning::UnknownPartitioning(right_partitioning.partition_count()), } } From c32d772e0cf5f9199b19f1058e4f1860a44fc885 Mon Sep 17 00:00:00 2001 From: Wang Date: Tue, 1 Nov 2022 11:09:24 +0800 Subject: [PATCH 3/8] fix --- datafusion/core/src/physical_plan/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 8c10529df4cdc..97ae1cf049d98 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -299,7 +299,7 @@ impl ExecutionPlan for HashJoinExec { fn output_partitioning(&self) -> Partitioning { let left_columns_len = self.left.schema().fields.len(); match self.mode { - PartitionMode::CollectLeft => match join_type { + PartitionMode::CollectLeft => match self.join_type { JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( self.right.output_partitioning(), left_columns_len, From e945c37d25cb173d03929084bcd8aac31f71580e Mon Sep 17 00:00:00 2001 From: Wang Date: Tue, 1 Nov 2022 11:37:57 +0800 Subject: [PATCH 4/8] fix format --- datafusion/core/src/physical_plan/joins/utils.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 0464579dcac49..34bf667746540 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -102,7 +102,9 @@ pub fn partitioned_join_output_partitioning( JoinType::Right => { adjust_right_output_partitioning(right_partitioning, left_columns_len) } - JoinType::Full => Partitioning::UnknownPartitioning(right_partitioning.partition_count()), + JoinType::Full => { + Partitioning::UnknownPartitioning(right_partitioning.partition_count()) + } } } From 33e9c1851e5e0cb6e51c228dc7379363c570790c Mon Sep 17 00:00:00 2001 From: Wang Date: Thu, 3 Nov 2022 16:46:31 +0800 Subject: [PATCH 5/8] Resolve review comments --- datafusion/core/src/dataframe.rs | 20 +- .../core/src/physical_plan/aggregates/mod.rs | 19 +- .../src/physical_plan/coalesce_batches.rs | 5 +- .../src/physical_plan/coalesce_partitions.rs | 2 +- datafusion/core/src/physical_plan/filter.rs | 15 +- .../src/physical_plan/joins/cross_join.rs | 2 +- .../core/src/physical_plan/joins/hash_join.rs | 6 +- .../physical_plan/joins/sort_merge_join.rs | 6 +- .../core/src/physical_plan/joins/utils.rs | 36 +-- datafusion/core/src/physical_plan/limit.rs | 4 +- datafusion/core/src/physical_plan/mod.rs | 8 +- .../core/src/physical_plan/projection.rs | 18 +- .../core/src/physical_plan/repartition.rs | 2 +- .../core/src/physical_plan/sorts/sort.rs | 2 +- .../sorts/sort_preserving_merge.rs | 2 +- .../physical_plan/windows/window_agg_exec.rs | 2 +- datafusion/physical-expr/src/equivalence.rs | 256 +++++++++++++++++ datafusion/physical-expr/src/lib.rs | 17 +- datafusion/physical-expr/src/physical_expr.rs | 63 ----- datafusion/physical-expr/src/utils.rs | 261 +++--------------- 20 files changed, 377 insertions(+), 369 deletions(-) create mode 100644 datafusion/physical-expr/src/equivalence.rs diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index ceeaf81889c51..704704d97f3fd 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -842,6 +842,7 @@ mod tests { use super::*; use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; + use crate::physical_plan::Partitioning; use crate::test_util; use crate::test_util::parquet_test_data; use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; @@ -1541,10 +1542,20 @@ mod tests { let physical_plan = union.create_physical_plan().await?; let default_partition_count = SessionContext::new().copied_config().target_partitions; + + // For partition aware union, the output partition count should not be changed. assert_eq!( physical_plan.output_partitioning().partition_count(), default_partition_count ); + // For partition aware union, the output partition is the same with the union's inputs + for child in physical_plan.children() { + assert_eq!( + physical_plan.output_partitioning(), + child.output_partitioning() + ); + } + Ok(()) } @@ -1588,11 +1599,10 @@ mod tests { let default_partition_count = SessionContext::new().copied_config().target_partitions; - // the union's output partitioning count should be the combination of all output partitions count - assert_eq!( - physical_plan.output_partitioning().partition_count(), - default_partition_count * 2 - ); + // For non-partition aware union, the output partitioning count should be the combination of all output partitions count + assert!(matches!( + physical_plan.output_partitioning(), + Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2)); Ok(()) } } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index d69c2d85c3f92..29a80c6f792a4 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -50,10 +50,7 @@ use crate::physical_plan::EquivalenceProperties; pub use datafusion_expr::AggregateFunction; use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; pub use datafusion_physical_expr::expressions::create_aggregate_expr; -use datafusion_physical_expr::{ - merge_equivalence_properties_with_alias, normalize_out_expr_with_alias_schema, - truncate_equivalence_properties_not_in_schema, -}; +use datafusion_physical_expr::normalize_out_expr_with_alias_schema; use datafusion_row::{row_supported, RowType}; /// Hash aggregate modes @@ -170,6 +167,7 @@ pub struct AggregateExec { /// to the partial aggregate input_schema: SchemaRef, /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + /// The key is the column from the input schema and the values are the columns from the output schema alias_map: HashMap>, /// Execution Metrics metrics: ExecutionPlanMetricsSet, @@ -302,7 +300,6 @@ impl ExecutionPlan for AggregateExec { } } - // TODO check the output ordering of AggregateExec fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } @@ -317,16 +314,10 @@ impl ExecutionPlan for AggregateExec { } } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { let mut input_equivalence_properties = self.input.equivalence_properties(); - merge_equivalence_properties_with_alias( - &mut input_equivalence_properties, - &self.alias_map, - ); - truncate_equivalence_properties_not_in_schema( - &mut input_equivalence_properties, - &self.schema, - ); + input_equivalence_properties.merge_properties_with_alias(&self.alias_map); + input_equivalence_properties.truncate_properties_not_in_schema(&self.schema); input_equivalence_properties } diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 8f0de7dcb15dd..e7c492732d190 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -96,14 +96,11 @@ impl ExecutionPlan for CoalesceBatchesExec { self.input.output_partitioning() } - // Depends on how the CoalesceBatches was implemented, it is possible to keep - // the input ordering when combines small batches into larger batches - // TODO revisit the logic later fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index cdd07390ac114..816a9c9403c68 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -89,7 +89,7 @@ impl ExecutionPlan for CoalescePartitionsExec { None } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 58ac1adec5d26..17a2355d0578a 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -38,9 +38,7 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::BinaryExpr; -use datafusion_physical_expr::{ - combine_equivalence_properties, remove_equivalence_properties, split_predicate, -}; +use datafusion_physical_expr::split_conjunction; use log::debug; @@ -119,15 +117,12 @@ impl ExecutionPlan for FilterExec { true } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { // Combine the equal predicates with the input equivalence properties let mut input_properties = self.input.equivalence_properties(); - let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&self.predicate); + let (equal_pairs, _ne_pairs) = collect_columns_from_predicate(&self.predicate); for new_condition in equal_pairs { - combine_equivalence_properties(&mut input_properties, new_condition) - } - for remove_condition in ne_pairs { - remove_equivalence_properties(&mut input_properties, remove_condition) + input_properties.add_equal_conditions(new_condition) } input_properties } @@ -251,7 +246,7 @@ fn collect_columns_from_predicate(predicate: &Arc) -> EqualAnd let mut eq_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); let mut ne_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); - let predicates = split_predicate(predicate); + let predicates = split_conjunction(predicate); predicates.into_iter().for_each(|p| { if let Some(binary) = p.as_any().downcast_ref::() { let left = binary.left(); diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index fc86b07cee259..a71e06ccec7a7 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -170,7 +170,7 @@ impl ExecutionPlan for CrossJoinExec { None } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { let left_columns_len = self.left.schema().fields.len(); cross_join_equivalence_properties( self.left.equivalence_properties(), diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 97ae1cf049d98..07b4d6f4a1099 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -63,7 +63,7 @@ use crate::physical_plan::{ hash_utils::create_hashes, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, - estimate_join_statistics, join_equivalence_properties, + combine_join_equivalence_properties, estimate_join_statistics, partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, JoinSide, }, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, @@ -329,9 +329,9 @@ impl ExecutionPlan for HashJoinExec { None } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { let left_columns_len = self.left.schema().fields.len(); - join_equivalence_properties( + combine_join_equivalence_properties( self.join_type, self.left.equivalence_properties(), self.right.equivalence_properties(), diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 0741f662f75d2..44771ba4c27bc 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -43,7 +43,7 @@ use crate::physical_plan::common::combine_batches; use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::joins::utils::{ - build_join_schema, check_join_is_valid, join_equivalence_properties, + build_join_schema, check_join_is_valid, combine_join_equivalence_properties, partitioned_join_output_partitioning, JoinOn, }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; @@ -250,9 +250,9 @@ impl ExecutionPlan for SortMergeJoinExec { self.output_ordering.as_deref() } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { let left_columns_len = self.left.schema().fields.len(); - join_equivalence_properties( + combine_join_equivalence_properties( self.join_type, self.left.equivalence_properties(), self.right.equivalence_properties(), diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 34bf667746540..905e59de966fe 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -23,7 +23,7 @@ use crate::physical_plan::expressions::Column; use arrow::datatypes::{Field, Schema}; use arrow::error::ArrowError; use datafusion_common::ScalarValue; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::{EquivalentClass, PhysicalExpr}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use parking_lot::Mutex; @@ -36,7 +36,6 @@ use std::task::{Context, Poll}; use crate::physical_plan::{ ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, }; -use datafusion_physical_expr::combine_equivalence_properties; use datafusion_physical_expr::rewrite::TreeNodeRewritable; /// The on clause of the join, as vector of (left, right) columns. @@ -137,19 +136,20 @@ pub fn adjust_right_output_partitioning( } } -/// Calculate the Equivalence Properties for Join Node -pub fn join_equivalence_properties( +/// Combine the Equivalence Properties for Join Node +pub fn combine_join_equivalence_properties( join_type: JoinType, - left_properties: Vec, - right_properties: Vec, + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, left_columns_len: usize, on: &[(Column, Column)], -) -> Vec { - let mut eq_properties = match join_type { +) -> EquivalenceProperties { + let mut new_properties = match join_type { JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { let mut left_properties = left_properties; let new_right_properties = right_properties - .into_iter() + .classes() + .iter() .map(|prop| { let new_head = Column::new( prop.head().name(), @@ -162,9 +162,10 @@ pub fn join_equivalence_properties( Column::new(col.name(), left_columns_len + col.index()) }) .collect::>(); - EquivalenceProperties::new(new_head, new_others) + EquivalentClass::new(new_head, new_others) }) .collect::>(); + left_properties.extend(new_right_properties); left_properties } @@ -176,21 +177,22 @@ pub fn join_equivalence_properties( on.iter().for_each(|(column1, column2)| { let new_column2 = Column::new(column2.name(), left_columns_len + column2.index()); - combine_equivalence_properties(&mut eq_properties, (column1, &new_column2)) + new_properties.add_equal_conditions((column1, &new_column2)) }) } - eq_properties + new_properties } /// Calculate the Equivalence Properties for CrossJoin Node pub fn cross_join_equivalence_properties( - left_properties: Vec, - right_properties: Vec, + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, left_columns_len: usize, -) -> Vec { +) -> EquivalenceProperties { let mut left_properties = left_properties; let new_right_properties = right_properties - .into_iter() + .classes() + .iter() .map(|prop| { let new_head = Column::new(prop.head().name(), left_columns_len + prop.head().index()); @@ -199,7 +201,7 @@ pub fn cross_join_equivalence_properties( .iter() .map(|col| Column::new(col.name(), left_columns_len + col.index())) .collect::>(); - EquivalenceProperties::new(new_head, new_others) + EquivalentClass::new(new_head, new_others) }) .collect::>(); left_properties.extend(new_right_properties); diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 857d8ebf88433..171e2b2f0f64f 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -122,7 +122,7 @@ impl ExecutionPlan for GlobalLimitExec { self.input.output_ordering() } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } @@ -289,7 +289,7 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_ordering() } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 329a62418fcf4..e41d59ec82033 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -195,11 +195,9 @@ pub trait ExecutionPlan: Debug + Send + Sync { .any(|dist| matches!(dist, Distribution::SinglePartition)) } - /// Get a list of equivalence properties within the plan - /// Equivalence Propertie is a set of Columns that are known to have the same value in all tuples of this relation. - /// Equivalence Propertie are generated by equality predicates, typically equijoin conditions and equality comparisons in filters. - fn equivalence_properties(&self) -> Vec { - vec![] + /// Get the EquivalenceProperties within the plan + fn equivalence_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new() } /// Get a list of child execution plans that provide the input for this plan. The returned list diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index a2f0540757a1b..2b6297f8c9f59 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -40,10 +40,7 @@ use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; -use datafusion_physical_expr::{ - merge_equivalence_properties_with_alias, normalize_out_expr_with_alias_schema, - truncate_equivalence_properties_not_in_schema, -}; +use datafusion_physical_expr::normalize_out_expr_with_alias_schema; use futures::stream::Stream; use futures::stream::StreamExt; @@ -59,6 +56,7 @@ pub struct ProjectionExec { /// The output ordering output_ordering: Option>, /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + /// The key is the column from the input schema and the values are the columns from the output schema alias_map: HashMap>, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -197,16 +195,10 @@ impl ExecutionPlan for ProjectionExec { // 1) Add Alias, Alias can introduce additional equivalence properties, // For example: Projection(a, a as a1, a as a2) // 2) Truncate the properties that are not in the schema of the Projection - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { let mut input_equivalence_properties = self.input.equivalence_properties(); - merge_equivalence_properties_with_alias( - &mut input_equivalence_properties, - &self.alias_map, - ); - truncate_equivalence_properties_not_in_schema( - &mut input_equivalence_properties, - &self.schema, - ); + input_equivalence_properties.merge_properties_with_alias(&self.alias_map); + input_equivalence_properties.truncate_properties_not_in_schema(&self.schema); input_equivalence_properties } diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index d45848b57bc85..045f760c876aa 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -292,7 +292,7 @@ impl ExecutionPlan for RepartitionExec { None } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index d60bb98a74955..de25fb9252ee8 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -766,7 +766,7 @@ impl ExecutionPlan for SortExec { Some(&self.expr) } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index e45ec886ed17b..fc823705c99c0 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -136,7 +136,7 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(&self.expr) } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 3f67b0392bb8c..76ad0afb10a19 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -144,7 +144,7 @@ impl ExecutionPlan for WindowAggExec { } } - fn equivalence_properties(&self) -> Vec { + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs new file mode 100644 index 0000000000000..21ceb186092cc --- /dev/null +++ b/datafusion/physical-expr/src/equivalence.rs @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expressions::Column; + +use arrow::datatypes::SchemaRef; + +use std::collections::HashMap; +use std::collections::HashSet; + +/// Equivalence Properties is a vec of EquivalentClass. +#[derive(Debug, Default, Clone)] +pub struct EquivalenceProperties { + classes: Vec, +} + +impl EquivalenceProperties { + pub fn new() -> Self { + EquivalenceProperties { classes: vec![] } + } + + pub fn classes(&self) -> &[EquivalentClass] { + &self.classes + } + + pub fn extend>(&mut self, iter: I) { + self.classes.extend(iter) + } + + /// Add new equal conditions into the EquivalenceProperties, the new equal conditions are usually comming from the + /// equality predicates in Join or Filter + pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) { + let mut idx1: Option = None; + let mut idx2: Option = None; + for (idx, class) in self.classes.iter_mut().enumerate() { + let contains_first = class.contains(new_conditions.0); + let contains_second = class.contains(new_conditions.1); + match (contains_first, contains_second) { + (true, false) => { + class.insert(new_conditions.1.clone()); + idx1 = Some(idx); + } + (false, true) => { + class.insert(new_conditions.0.clone()); + idx2 = Some(idx); + } + (true, true) => { + idx1 = Some(idx); + idx2 = Some(idx); + break; + } + (false, false) => {} + } + } + + match (idx1, idx2) { + (Some(idx_1), Some(idx_2)) if idx_1 != idx_2 => { + // need to merge the two existing EquivalentClasses + let second_eq_class = self.classes.get(idx_2).unwrap().clone(); + let first_eq_class = self.classes.get_mut(idx_1).unwrap(); + for prop in second_eq_class.iter() { + if !first_eq_class.contains(prop) { + first_eq_class.insert(prop.clone()); + } + } + self.classes.remove(idx_2); + } + (None, None) => { + // adding new pairs + self.classes.push(EquivalentClass::new( + new_conditions.0.clone(), + vec![new_conditions.1.clone()], + )); + } + _ => {} + } + } + + pub fn merge_properties_with_alias( + &mut self, + alias_map: &HashMap>, + ) { + for (column, columns) in alias_map { + let mut find_match = false; + for class in self.classes.iter_mut() { + if class.contains(column) { + for col in columns { + class.insert(col.clone()); + } + find_match = true; + break; + } + } + if !find_match { + self.classes + .push(EquivalentClass::new(column.clone(), columns.clone())); + } + } + } + + pub fn truncate_properties_not_in_schema(&mut self, schema: &SchemaRef) { + for class in self.classes.iter_mut() { + let mut columns_to_remove = vec![]; + for column in class.iter() { + if let Ok(idx) = schema.index_of(column.name()) { + if idx != column.index() { + columns_to_remove.push(column.clone()); + } + } else { + columns_to_remove.push(column.clone()); + } + } + for column in columns_to_remove { + class.remove(&column); + } + } + self.classes.retain(|props| props.len() > 1); + } +} + +/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation +/// Equivalence Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters. +#[derive(Debug, Clone)] +pub struct EquivalentClass { + /// First element in the EquivalenceProperties + head: Column, + /// Other equal columns + others: HashSet, +} + +impl EquivalentClass { + pub fn new(head: Column, others: Vec) -> Self { + EquivalentClass { + head, + others: HashSet::from_iter(others), + } + } + + pub fn head(&self) -> &Column { + &self.head + } + + pub fn others(&self) -> &HashSet { + &self.others + } + + pub fn contains(&self, col: &Column) -> bool { + self.head == *col || self.others.contains(col) + } + + pub fn insert(&mut self, col: Column) -> bool { + self.others.insert(col) + } + + pub fn remove(&mut self, col: &Column) -> bool { + let removed = self.others.remove(col); + if !removed && *col == self.head { + let one_col = self.others.iter().next().cloned(); + if let Some(col) = one_col { + let removed = self.others.remove(&col); + self.head = col; + removed + } else { + false + } + } else { + true + } + } + + pub fn iter(&self) -> impl Iterator { + std::iter::once(&self.head).chain(self.others.iter()) + } + + pub fn len(&self) -> usize { + self.others.len() + 1 + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::Column; + use datafusion_common::Result; + + #[test] + fn add_equal_conditions_test() -> Result<()> { + let mut eq_properties = EquivalenceProperties::new(); + let new_condition = (&Column::new("a", 0), &Column::new("b", 1)); + eq_properties.add_equal_conditions(new_condition); + assert_eq!(eq_properties.classes().len(), 1); + + let new_condition = (&Column::new("b", 1), &Column::new("a", 0)); + eq_properties.add_equal_conditions(new_condition); + assert_eq!(eq_properties.classes().len(), 1); + assert_eq!(eq_properties.classes()[0].len(), 2); + + let new_condition = (&Column::new("b", 1), &Column::new("c", 2)); + eq_properties.add_equal_conditions(new_condition); + assert_eq!(eq_properties.classes().len(), 1); + assert_eq!(eq_properties.classes()[0].len(), 3); + + let new_condition = (&Column::new("x", 99), &Column::new("y", 100)); + eq_properties.add_equal_conditions(new_condition); + assert_eq!(eq_properties.classes().len(), 2); + + let new_condition = (&Column::new("x", 99), &Column::new("a", 0)); + eq_properties.add_equal_conditions(new_condition); + assert_eq!(eq_properties.classes().len(), 1); + assert_eq!(eq_properties.classes()[0].len(), 5); + + Ok(()) + } + + #[test] + fn merge_equivalence_properties_with_alias_test() -> Result<()> { + let mut eq_properties = EquivalenceProperties::new(); + let mut alias_map = HashMap::new(); + alias_map.insert( + Column::new("a", 0), + vec![Column::new("a1", 1), Column::new("a2", 2)], + ); + + eq_properties.merge_properties_with_alias(&alias_map); + assert_eq!(eq_properties.classes().len(), 1); + assert_eq!(eq_properties.classes()[0].len(), 3); + + let mut alias_map = HashMap::new(); + alias_map.insert( + Column::new("a", 0), + vec![Column::new("a3", 1), Column::new("a4", 2)], + ); + eq_properties.merge_properties_with_alias(&alias_map); + assert_eq!(eq_properties.classes().len(), 1); + assert_eq!(eq_properties.classes()[0].len(), 5); + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 252bf2986274e..3ddcb34bb1488 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -21,6 +21,7 @@ pub mod conditional_expressions; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod datetime_expressions; +pub mod equivalence; pub mod execution_props; pub mod expressions; pub mod functions; @@ -45,17 +46,15 @@ pub mod window; // reexport this to maintain compatibility with anything that used from_slice previously pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; -pub use physical_expr::{ - EquivalenceProperties, ExprBoundaries, PhysicalExpr, PhysicalExprStats, -}; +pub use equivalence::EquivalenceProperties; +pub use equivalence::EquivalentClass; +pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::PhysicalSortExpr; pub use utils::{ - combine_equivalence_properties, expr_list_eq_any_order, expr_list_eq_strict_order, - merge_equivalence_properties_with_alias, normalize_expr_with_equivalence_properties, - normalize_out_expr_with_alias_schema, - normalize_sort_expr_with_equivalence_properties, remove_equivalence_properties, - sort_expr_list_eq_strict_order, split_predicate, - truncate_equivalence_properties_not_in_schema, + expr_list_eq_any_order, expr_list_eq_strict_order, + normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema, + normalize_sort_expr_with_equivalence_properties, sort_expr_list_eq_strict_order, + split_conjunction, }; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index e56554d2e2661..772a02097bfd5 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -19,7 +19,6 @@ use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use crate::expressions::Column; use datafusion_common::{ColumnStatistics, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; @@ -27,7 +26,6 @@ use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterator}; use std::any::Any; -use std::collections::HashSet; use std::fmt::{Debug, Display}; use std::sync::Arc; @@ -138,67 +136,6 @@ impl PhysicalExprStats for BasicExpressionStats { } } -#[derive(Debug, Clone)] -pub struct EquivalenceProperties { - /// First element in the EquivalenceProperties - head: Column, - /// Other equal columns - others: HashSet, -} - -impl EquivalenceProperties { - pub fn new(head: Column, others: Vec) -> Self { - EquivalenceProperties { - head, - others: HashSet::from_iter(others), - } - } - - pub fn head(&self) -> &Column { - &self.head - } - - pub fn others(&self) -> &HashSet { - &self.others - } - - pub fn contains(&self, col: &Column) -> bool { - self.head == *col || self.others.contains(col) - } - - pub fn insert(&mut self, col: Column) -> bool { - self.others.insert(col) - } - - pub fn remove(&mut self, col: &Column) -> bool { - let removed = self.others.remove(col); - if !removed && *col == self.head { - let one_col = self.others.iter().next().cloned(); - if let Some(col) = one_col { - let removed = self.others.remove(&col); - self.head = col; - removed - } else { - false - } - } else { - true - } - } - - pub fn iter(&self) -> impl Iterator { - std::iter::once(&self.head).chain(self.others.iter()) - } - - pub fn len(&self) -> usize { - self.others.len() + 1 - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - /// Returns a copy of this expr if we change any child according to the pointer comparison. /// The size of `children` must be equal to the size of `PhysicalExpr::children()`. /// Allow the vtable address comparisons for PhysicalExpr Trait Objects,it is harmless even diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 7105e2da50582..78ce9c931bf08 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::equivalence::EquivalentClass; use crate::expressions::BinaryExpr; use crate::expressions::Column; use crate::expressions::UnKnownColumn; -use crate::physical_expr::EquivalenceProperties; use crate::rewrite::TreeNodeRewritable; use crate::PhysicalExpr; use crate::PhysicalSortExpr; @@ -77,129 +77,32 @@ pub fn sort_expr_list_eq_strict_order( /// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs. /// /// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] -/// -pub fn split_predicate(predicate: &Arc) -> Vec<&Arc> { +pub fn split_conjunction( + predicate: &Arc, +) -> Vec<&Arc> { + split_conjunction_impl(predicate, vec![]) +} + +fn split_conjunction_impl<'a>( + predicate: &'a Arc, + mut exprs: Vec<&'a Arc>, +) -> Vec<&'a Arc> { match predicate.as_any().downcast_ref::() { Some(binary) => match binary.op() { Operator::And => { - let mut vec1 = split_predicate(binary.left()); - let vec2 = split_predicate(binary.right()); - vec1.extend(vec2); - vec1 - } - _ => vec![predicate], - }, - None => vec![], - } -} - -/// Combine the new equal condition with the existing equivalence properties. -pub fn combine_equivalence_properties( - eq_properties: &mut Vec, - new_condition: (&Column, &Column), -) { - let mut idx1 = -1i32; - let mut idx2 = -1i32; - for (idx, prop) in eq_properties.iter_mut().enumerate() { - let contains_first = prop.contains(new_condition.0); - let contains_second = prop.contains(new_condition.1); - if contains_first && !contains_second { - prop.insert(new_condition.1.clone()); - idx1 = idx as i32; - } else if !contains_first && contains_second { - prop.insert(new_condition.0.clone()); - idx2 = idx as i32; - } else if contains_first && contains_second { - idx1 = idx as i32; - idx2 = idx as i32; - break; - } - } - - if idx1 != -1 && idx2 != -1 && idx1 != idx2 { - // need to merge the two existing properties - let second_properties = eq_properties.get(idx2 as usize).unwrap().clone(); - let first_properties = eq_properties.get_mut(idx1 as usize).unwrap(); - for prop in second_properties.iter() { - if !first_properties.contains(prop) { - first_properties.insert(prop.clone()); - } - } - eq_properties.remove(idx2 as usize); - } else if idx1 == -1 && idx2 == -1 { - // adding new pairs - eq_properties.push(EquivalenceProperties::new( - new_condition.0.clone(), - vec![new_condition.1.clone()], - )) - } -} - -pub fn remove_equivalence_properties( - eq_properties: &mut Vec, - remove_condition: (&Column, &Column), -) { - let mut match_idx = -1i32; - for (idx, prop) in eq_properties.iter_mut().enumerate() { - let contains_first = prop.contains(remove_condition.0); - let contains_second = prop.contains(remove_condition.1); - if contains_first && contains_second { - match_idx = idx as i32; - break; - } - } - if match_idx >= 0 { - let matches = eq_properties.get_mut(match_idx as usize).unwrap(); - matches.remove(remove_condition.0); - matches.remove(remove_condition.1); - if matches.len() <= 1 { - eq_properties.remove(match_idx as usize); - } - } -} - -pub fn merge_equivalence_properties_with_alias( - eq_properties: &mut Vec, - alias_map: &HashMap>, -) { - for (column, columns) in alias_map { - let mut find_match = false; - for (_idx, prop) in eq_properties.iter_mut().enumerate() { - if prop.contains(column) { - for col in columns { - prop.insert(col.clone()); - } - find_match = true; - break; + let exprs = split_conjunction_impl(binary.left(), exprs); + split_conjunction_impl(binary.right(), exprs) } - } - if !find_match { - eq_properties - .push(EquivalenceProperties::new(column.clone(), columns.clone())); - } - } -} - -pub fn truncate_equivalence_properties_not_in_schema( - eq_properties: &mut Vec, - schema: &SchemaRef, -) { - for props in eq_properties.iter_mut() { - let mut columns_to_remove = vec![]; - for column in props.iter() { - if let Ok(idx) = schema.index_of(column.name()) { - if idx != column.index() { - columns_to_remove.push(column.clone()); - } - } else { - columns_to_remove.push(column.clone()); + _ => { + exprs.push(predicate); + exprs } - } - for column in columns_to_remove { - props.remove(&column); + }, + None => { + exprs.push(predicate); + exprs } } - eq_properties.retain(|props| props.len() > 1); } /// Normalize the output expressions based on Alias Map and SchemaRef. @@ -245,37 +148,41 @@ pub fn normalize_out_expr_with_alias_schema( pub fn normalize_expr_with_equivalence_properties( expr: Arc, - eq_properties: &[EquivalenceProperties], + eq_properties: &[EquivalentClass], ) -> Arc { - let mut normalized = expr.clone(); - if let Some(column) = expr.as_any().downcast_ref::() { - for prop in eq_properties { - if prop.contains(column) { - normalized = Arc::new(prop.head().clone()); - break; + let expr_clone = expr.clone(); + expr_clone + .transform(&|expr| match expr.as_any().downcast_ref::() { + Some(column) => { + let mut normalized: Option> = None; + for class in eq_properties { + if class.contains(column) { + normalized = Some(Arc::new(class.head().clone())); + break; + } + } + normalized } - } - } - normalized + None => None, + }) + .unwrap_or(expr) } pub fn normalize_sort_expr_with_equivalence_properties( sort_expr: PhysicalSortExpr, - eq_properties: &[EquivalenceProperties], + eq_properties: &[EquivalentClass], ) -> PhysicalSortExpr { - let mut normalized = sort_expr.clone(); - if let Some(column) = sort_expr.expr.as_any().downcast_ref::() { - for prop in eq_properties { - if prop.contains(column) { - normalized = PhysicalSortExpr { - expr: Arc::new(prop.head().clone()), - options: sort_expr.options, - }; - break; - } + let normalized_expr = + normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties); + + if sort_expr.expr.ne(&normalized_expr) { + PhysicalSortExpr { + expr: normalized_expr, + options: sort_expr.options, } + } else { + sort_expr } - normalized } #[cfg(test)] @@ -435,80 +342,4 @@ mod tests { Ok(()) } - - #[test] - fn combine_equivalence_properties_test() -> Result<()> { - let mut eq_properties: Vec = vec![]; - let new_condition = (&Column::new("a", 0), &Column::new("b", 1)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 1); - - let new_condition = (&Column::new("b", 1), &Column::new("a", 0)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 2); - - let new_condition = (&Column::new("b", 1), &Column::new("c", 2)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 3); - - let new_condition = (&Column::new("x", 99), &Column::new("y", 100)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 2); - - let new_condition = (&Column::new("x", 99), &Column::new("a", 0)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 5); - - Ok(()) - } - - #[test] - fn remove_equivalence_properties_test() -> Result<()> { - let mut eq_properties: Vec = vec![]; - let remove_condition = (&Column::new("a", 0), &Column::new("b", 1)); - remove_equivalence_properties(&mut eq_properties, remove_condition); - assert_eq!(eq_properties.len(), 0); - - let new_condition = (&Column::new("a", 0), &Column::new("b", 1)); - combine_equivalence_properties(&mut eq_properties, new_condition); - let new_condition = (&Column::new("a", 0), &Column::new("c", 2)); - combine_equivalence_properties(&mut eq_properties, new_condition); - let new_condition = (&Column::new("c", 2), &Column::new("d", 3)); - combine_equivalence_properties(&mut eq_properties, new_condition); - assert_eq!(eq_properties.len(), 1); - - let remove_condition = (&Column::new("a", 0), &Column::new("b", 1)); - remove_equivalence_properties(&mut eq_properties, remove_condition); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 2); - - Ok(()) - } - - #[test] - fn merge_equivalence_properties_with_alias_test() -> Result<()> { - let mut eq_properties: Vec = vec![]; - let mut alias_map = HashMap::new(); - alias_map.insert( - Column::new("a", 0), - vec![Column::new("a1", 1), Column::new("a2", 2)], - ); - - merge_equivalence_properties_with_alias(&mut eq_properties, &alias_map); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 3); - - let mut alias_map = HashMap::new(); - alias_map.insert( - Column::new("a", 0), - vec![Column::new("a3", 1), Column::new("a4", 2)], - ); - merge_equivalence_properties_with_alias(&mut eq_properties, &alias_map); - assert_eq!(eq_properties.len(), 1); - assert_eq!(eq_properties[0].len(), 5); - Ok(()) - } } From 8dc5d5fcd94d4b4a17fe119c055e64c9ac47a145 Mon Sep 17 00:00:00 2001 From: Wang Date: Thu, 3 Nov 2022 16:55:15 +0800 Subject: [PATCH 6/8] tiny fix --- datafusion/physical-expr/src/equivalence.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 21ceb186092cc..69377f1cbd81c 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -136,7 +136,7 @@ impl EquivalenceProperties { /// Equivalence Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters. #[derive(Debug, Clone)] pub struct EquivalentClass { - /// First element in the EquivalenceProperties + /// First element in the EquivalentClass head: Column, /// Other equal columns others: HashSet, From 443723584037f01c177c148663bfe7b1b778c63d Mon Sep 17 00:00:00 2001 From: Wang Date: Fri, 4 Nov 2022 00:20:28 +0800 Subject: [PATCH 7/8] UT to verify hash join output_partitioning --- datafusion/core/src/dataframe.rs | 72 ++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 704704d97f3fd..3d1c8d009418c 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -843,6 +843,7 @@ mod tests { use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; use crate::physical_plan::Partitioning; + use crate::physical_plan::PhysicalExpr; use crate::test_util; use crate::test_util::parquet_test_data; use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; @@ -852,6 +853,7 @@ mod tests { avg, cast, count, count_distinct, create_udf, lit, max, min, sum, BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction, }; + use datafusion_physical_expr::expressions::Column; #[tokio::test] async fn select_columns() -> Result<()> { @@ -1605,4 +1607,74 @@ mod tests { Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2)); Ok(()) } + + #[tokio::test] + async fn verify_join_output_partitioning() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c2"])? + .with_column_renamed("c2.c1", "c2_c1")? + .with_column_renamed("c2.c2", "c2_c2")?; + + let all_join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::LeftSemi, + JoinType::RightSemi, + JoinType::LeftAnti, + JoinType::RightAnti, + ]; + + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + + for join_type in all_join_types { + let join = left.join( + right.clone(), + join_type, + &["c1", "c2"], + &["c2_c1", "c2_c2"], + None, + )?; + let physical_plan = join.create_physical_plan().await?; + let out_partitioning = physical_plan.output_partitioning(); + let join_schema = physical_plan.schema(); + + match join_type { + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => { + let left_exprs: Vec> = vec![ + Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()), + Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()), + ]; + assert_eq!( + out_partitioning, + Partitioning::Hash(left_exprs, default_partition_count) + ); + } + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + let right_exprs: Vec> = vec![ + Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()), + Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()), + ]; + assert_eq!( + out_partitioning, + Partitioning::Hash(right_exprs, default_partition_count) + ); + } + JoinType::Full => { + assert!(matches!( + out_partitioning, + Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count)); + } + } + } + + Ok(()) + } } From fcfbf6638a4af8f5ed0866ab15e12ba3a4476f35 Mon Sep 17 00:00:00 2001 From: Wang Date: Fri, 4 Nov 2022 10:48:07 +0800 Subject: [PATCH 8/8] fix comments --- datafusion/physical-expr/src/equivalence.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 69377f1cbd81c..411a492a522fb 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -133,7 +133,7 @@ impl EquivalenceProperties { } /// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation -/// Equivalence Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters. +/// Equivalent Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters. #[derive(Debug, Clone)] pub struct EquivalentClass { /// First element in the EquivalentClass