From a9dfd29cce71c648012886d16bd98f40ddbd6697 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 12:09:10 -0400 Subject: [PATCH 1/6] Use TableReference for TableScan --- datafusion/common/src/dfschema.rs | 16 +-- datafusion/common/src/table_reference.rs | 89 +++++++++++++---- datafusion/core/src/execution/context.rs | 7 +- datafusion/core/src/physical_plan/planner.rs | 5 +- datafusion/core/src/test_util/mod.rs | 16 +-- datafusion/expr/src/logical_plan/builder.rs | 99 ++++++++++++++----- datafusion/expr/src/logical_plan/plan.rs | 16 +-- datafusion/expr/src/utils.rs | 12 +-- datafusion/optimizer/src/inline_table_scan.rs | 2 +- datafusion/optimizer/src/push_down_filter.rs | 6 +- .../optimizer/src/push_down_projection.rs | 3 +- datafusion/proto/proto/datafusion.proto | 9 +- datafusion/proto/src/generated/pbjson.rs | 30 +++--- datafusion/proto/src/generated/prost.rs | 12 +-- datafusion/proto/src/logical_plan/mod.rs | 23 +++-- datafusion/sql/src/relation/mod.rs | 7 +- .../substrait/src/logical_plan/producer.rs | 2 +- 17 files changed, 234 insertions(+), 120 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 52a0b3d40a92a..67a367c5ef3f8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -108,13 +108,17 @@ impl DFSchema { Ok(Self { fields, metadata }) } - /// Create a `DFSchema` from an Arrow schema - pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result { + /// Create a `DFSchema` from an Arrow schema and a given qualifier + pub fn try_from_qualified_schema<'a>( + qualifier: impl Into>, + schema: &Schema, + ) -> Result { + let qualifier = qualifier.into(); Self::new_with_metadata( schema .fields() .iter() - .map(|f| DFField::from_qualified(qualifier.to_string(), f.clone())) + .map(|f| DFField::from_qualified(qualifier.clone(), f.clone())) .collect(), schema.metadata().clone(), ) @@ -662,12 +666,12 @@ impl DFField { } /// Create a qualified field from an existing Arrow field - pub fn from_qualified( - qualifier: impl Into, + pub fn from_qualified<'a>( + qualifier: impl Into>, field: Field, ) -> Self { Self { - qualifier: Some(qualifier.into()), + qualifier: Some(qualifier.into().to_owned_reference()), field, } } diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 257073681934f..6d46f6b691c2e 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -35,7 +35,38 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> { } } -/// Represents a path to a table that may require further resolution +/// [`TableReference`]s represent a multi part identifier (path) to a +/// table that may require further resolution. +/// +/// # Creating [`TableReference`] +/// +/// When converting strings to [`TableReference`]s, the string is +/// parsed as though it were a SQL identifier, normalizing (convert to +/// lowercase) any unquoted identifiers. +/// +/// See [`TableReference::bare`] to create references without applying +/// normalization semantics +/// +/// # Examples +/// ``` +/// # use datafusion_common::TableReference; +/// // Get a table reference to 'mytable' +/// let table_reference = TableReference::from("mytable"); +/// assert_eq!(table_reference, TableReference::bare("mytable")); +/// +/// // Get a table reference to 'mytable' (note the capitalization) +/// let table_reference = TableReference::from("MyTable"); +/// assert_eq!(table_reference, TableReference::bare("mytable")); +/// +/// // Get a table reference to 'MyTable' (note the capitalization) using double quotes +/// // (programatically it is better to use `TableReference::bare` for this) +/// let table_reference = TableReference::from(r#""MyTable""#); +/// assert_eq!(table_reference, TableReference::bare("MyTable")); +/// +/// // Get a table reference to 'myschema.mytable' (note the capitalization) +/// let table_reference = TableReference::from("MySchema.MyTable"); +/// assert_eq!(table_reference, TableReference::partial("myschema", "mytable")); +///``` #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TableReference<'a> { /// An unqualified table reference, e.g. "table" @@ -61,6 +92,16 @@ pub enum TableReference<'a> { }, } +/// This is a [`TableReference`] that has 'static lifetime (aka it +/// owns the underlying string) +/// +/// To convert a [`TableReference`] to an [`OwnedTableReference`], use +/// +/// ``` +/// # use datafusion_common::{OwnedTableReference, TableReference}; +/// let table_reference = TableReference::from("mytable"); +/// let owned_reference = table_reference.to_owned_reference(); +/// ``` pub type OwnedTableReference = TableReference<'static>; impl std::fmt::Display for TableReference<'_> { @@ -80,14 +121,23 @@ impl std::fmt::Display for TableReference<'_> { } impl<'a> TableReference<'a> { - /// Convenience method for creating a `Bare` variant of `TableReference` + /// Convenience method for creating a typed none `None` + pub fn none() -> Option> { + None + } + + /// Convenience method for creating a [`TableReference::Bare`] + /// + /// As described on [`TableReference`] this does *NO* parsing at + /// all -- so "Foo.Bar" stays as a reference to the table named + /// "Foo.Bar" (rather than "foo"."bar") pub fn bare(table: impl Into>) -> TableReference<'a> { TableReference::Bare { table: table.into(), } } - /// Convenience method for creating a `Partial` variant of `TableReference` + /// Convenience method for creating a [`TableReference::Partial`] pub fn partial( schema: impl Into>, table: impl Into>, @@ -98,7 +148,7 @@ impl<'a> TableReference<'a> { } } - /// Convenience method for creating a `Full` variant of `TableReference` + /// Convenience method for creating a [`TableReference::Full`] pub fn full( catalog: impl Into>, schema: impl Into>, @@ -136,12 +186,12 @@ impl<'a> TableReference<'a> { } } - /// Compare with another `TableReference` as if both are resolved. + /// Compare with another [`TableReference`] as if both are resolved. /// This allows comparing across variants, where if a field is not present /// in both variants being compared then it is ignored in the comparison. /// - /// e.g. this allows a `TableReference::Bare` to be considered equal to a - /// fully qualified `TableReference::Full` if the table names match. + /// e.g. this allows a [`TableReference::Bare`] to be considered equal to a + /// fully qualified [`TableReference::Full`] if the table names match. pub fn resolved_eq(&self, other: &Self) -> bool { match self { TableReference::Bare { table } => table == other.table(), @@ -189,7 +239,8 @@ impl<'a> TableReference<'a> { } } - /// Converts directly into an [`OwnedTableReference`] + /// Converts directly into an [`OwnedTableReference`] by copying + /// the underlying data. pub fn to_owned_reference(&self) -> OwnedTableReference { match self { Self::Full { @@ -212,6 +263,16 @@ impl<'a> TableReference<'a> { } /// Forms a string where the identifiers are quoted + /// + /// # Example + /// ``` + /// # use datafusion_common::TableReference; + /// let table_reference = TableReference::partial("myschema", "mytable"); + /// assert_eq!(table_reference.to_quoted_string(), r#""myschema"."mytable""#); + /// + /// let table_reference = TableReference::partial("MySchema", "MyTable"); + /// assert_eq!(table_reference.to_quoted_string(), r#""MySchema"."MyTable""#); + /// ``` pub fn to_quoted_string(&self) -> String { match self { TableReference::Bare { table } => quote_identifier(table), @@ -231,14 +292,8 @@ impl<'a> TableReference<'a> { } } - /// Forms a [`TableReference`] by attempting to parse `s` as a multipart identifier, - /// failing that then taking the entire unnormalized input as the identifier itself. - /// - /// Will normalize (convert to lowercase) any unquoted identifiers. - /// - /// e.g. `Foo` will be parsed as `foo`, and `"Foo"".bar"` will be parsed as - /// `Foo".bar` (note the preserved case and requiring two double quotes to represent - /// a single double quote in the identifier) + /// Forms a [`TableReference`] by parsing `s` as a multipart + /// identifier. See docs on [`TableReference`] for more details. pub fn parse_str(s: &'a str) -> Self { let mut parts = parse_identifiers_normalized(s); @@ -260,7 +315,7 @@ impl<'a> TableReference<'a> { } } -/// Parse a `String` into a OwnedTableReference +/// Parse a `String` into a OwnedTableReference as a SQL identifier. impl From for OwnedTableReference { fn from(s: String) -> Self { TableReference::parse_str(&s).to_owned_reference() diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 1466bcba0f86f..1cb199c2bc398 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1020,10 +1020,9 @@ impl SessionContext { table_ref: impl Into>, ) -> Result { let table_ref = table_ref.into(); - let table = table_ref.table().to_owned(); - let provider = self.table_provider(table_ref).await?; + let provider = self.table_provider(table_ref.to_owned_reference()).await?; let plan = LogicalPlanBuilder::scan( - &table, + table_ref.to_owned_reference(), provider_as_source(Arc::clone(&provider)), None, )? @@ -1037,7 +1036,7 @@ impl SessionContext { table_ref: impl Into>, ) -> Result> { let table_ref = table_ref.into(); - let table = table_ref.table().to_owned(); + let table = table_ref.table().to_string(); let schema = self.state.read().schema_for_ref(table_ref)?; match schema.table(&table).await { Some(ref provider) => Ok(Arc::clone(provider)), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 0dba182d8a355..839d9ef9cb7cd 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1876,7 +1876,7 @@ mod tests { use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; - use datafusion_common::assert_contains; + use datafusion_common::{assert_contains, TableReference}; use datafusion_common::{DFField, DFSchema, DFSchemaRef}; use datafusion_expr::{ col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder, @@ -2518,7 +2518,8 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E match ctx.read_csv(path, options).await?.into_optimized_plan()? { LogicalPlan::TableScan(ref scan) => { let mut scan = scan.clone(); - scan.table_name = name.to_string(); + let table_reference: TableReference<'_> = name.into(); + scan.table_name = table_reference.to_owned_reference(); let new_schema = scan .projected_schema .as_ref() diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 61cdc188f8231..8c570cff8fd00 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -39,7 +39,7 @@ use crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion_common::{DataFusionError, Statistics}; +use datafusion_common::{DataFusionError, Statistics, TableReference}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::PhysicalSortExpr; use futures::Stream; @@ -219,11 +219,8 @@ pub fn scan_empty( ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); - LogicalPlanBuilder::scan( - name.unwrap_or(UNNAMED_TABLE), - provider_as_source(provider), - projection, - ) + let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string()); + LogicalPlanBuilder::scan(name, provider_as_source(provider), projection) } /// Scan an empty data source with configured partition, mainly used in tests. @@ -235,11 +232,8 @@ pub fn scan_empty_with_partitions( ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema).with_partitions(partitions)); - LogicalPlanBuilder::scan( - name.unwrap_or(UNNAMED_TABLE), - provider_as_source(provider), - projection, - ) + let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string()); + LogicalPlanBuilder::scan(name, provider_as_source(provider), projection) } /// Get the schema for the aggregate_test_* csv files diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 5616b7c52b640..476203e1ef58a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -41,7 +41,7 @@ use crate::{ use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::{ Column, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, - ScalarValue, ToDFSchema, + ScalarValue, TableReference, ToDFSchema, }; use std::any::Any; use std::cmp::Ordering; @@ -192,8 +192,39 @@ impl LogicalPlanBuilder { } /// Convert a table provider into a builder with a TableScan + /// + /// Note that if you pass a string as `table_name`, it is treated + /// as a SQL identifier, as described on [`TableReference`] and + /// thus is normalized + /// + /// # Example: + /// ``` + /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, + /// # logical_plan::builder::LogicalTableSource, logical_plan::table_scan + /// # }; + /// # use std::sync::Arc; + /// # use arrow::datatypes::{Schema, DataType, Field}; + /// # use datafusion_common::TableReference; + /// # + /// # let employee_schema = Arc::new(Schema::new(vec![ + /// # Field::new("id", DataType::Int32, false), + /// # ])) as _; + /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema)); + /// // Scan table_source with the name "mytable" (after normalization) + /// # let table = table_source.clone(); + /// let scan = LogicalPlanBuilder::scan("MyTable", table, None); + /// + /// // Scan table_source with the name "MyTable" by enclosing in quotes + /// # let table = table_source.clone(); + /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None); + /// + /// // Scan table_source with the name "MyTable" by forming the table reference + /// # let table = table_source.clone(); + /// let table_reference = TableReference::bare("MyTable"); + /// let scan = LogicalPlanBuilder::scan(table_reference, table, None); + /// ``` pub fn scan( - table_name: impl Into, + table_name: impl Into, table_source: Arc, projection: Option>, ) -> Result { @@ -217,19 +248,13 @@ impl LogicalPlanBuilder { /// Convert a table provider into a builder with a TableScan pub fn scan_with_filters( - table_name: impl Into, + table_name: impl Into, table_source: Arc, projection: Option>, filters: Vec, ) -> Result { let table_name = table_name.into(); - if table_name.is_empty() { - return Err(DataFusionError::Plan( - "table_name cannot be empty".to_string(), - )); - } - let schema = table_source.schema(); let projected_schema = projection @@ -239,7 +264,7 @@ impl LogicalPlanBuilder { p.iter() .map(|i| { DFField::from_qualified( - table_name.to_string(), + table_name.clone(), schema.field(*i).clone(), ) }) @@ -248,7 +273,7 @@ impl LogicalPlanBuilder { ) }) .unwrap_or_else(|| { - DFSchema::try_from_qualified_schema(&table_name, &schema) + DFSchema::try_from_qualified_schema(table_name.clone(), &schema) })?; let table_scan = LogicalPlan::TableScan(TableScan { @@ -1196,14 +1221,22 @@ pub fn subquery_alias( /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. /// This is mostly used for testing and documentation. -pub fn table_scan( - name: Option<&str>, +pub fn table_scan<'a>( + name: Option>>, table_schema: &Schema, projection: Option>, ) -> Result { + let table_source = table_source(table_schema); + let name = name + .map(|n| n.into()) + .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE)) + .to_owned_reference(); + LogicalPlanBuilder::scan(name, table_source, projection) +} + +fn table_source(table_schema: &Schema) -> Arc { let table_schema = Arc::new(table_schema.clone()); - let table_source = Arc::new(LogicalTableSource { table_schema }); - LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection) + Arc::new(LogicalTableSource { table_schema }) } /// Wrap projection for a plan, if the join keys contains normal expression. @@ -1361,12 +1394,24 @@ mod tests { #[test] fn plan_builder_schema() { let schema = employee_schema(); - let plan = table_scan(Some("employee_csv"), &schema, None).unwrap(); - - let expected = - DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap(); + let projection = None; + let plan = + LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection) + .unwrap(); + let expected = DFSchema::try_from_qualified_schema( + TableReference::bare("employee_csv"), + &schema, + ) + .unwrap(); + assert_eq!(&expected, plan.schema().as_ref()); - assert_eq!(&expected, plan.schema().as_ref()) + // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifer + // (and thus normalized to "employee"csv") as well + let projection = None; + let plan = + LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection) + .unwrap(); + assert_eq!(&expected, plan.schema().as_ref()); } #[test] @@ -1481,9 +1526,10 @@ mod tests { #[test] fn plan_builder_union_different_num_columns_error() -> Result<()> { - let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?; - - let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?; + let plan1 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; + let plan2 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; let expected = "Error during planning: Union queries must have the same number of columns, (left is 1, right is 2)"; let err_msg1 = plan1.clone().union(plan2.clone().build()?).unwrap_err(); @@ -1707,9 +1753,10 @@ mod tests { #[test] fn plan_builder_intersect_different_num_columns_error() -> Result<()> { - let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?; - - let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?; + let plan1 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; + let plan2 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \ Left is 1 and right is 2."; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 777a5871c95e3..6aedf2b2a469a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -34,7 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, - ScalarValue, + ScalarValue, TableReference, }; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; @@ -1437,9 +1437,12 @@ impl SubqueryAlias { alias: impl Into, ) -> datafusion_common::Result { let alias = alias.into(); + let table_ref = TableReference::bare(&alias); let schema: Schema = plan.schema().as_ref().clone().into(); - let schema = - DFSchemaRef::new(DFSchema::try_from_qualified_schema(&alias, &schema)?); + let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema( + table_ref.to_owned_reference(), + &schema, + )?); Ok(SubqueryAlias { input: Arc::new(plan), alias, @@ -1521,9 +1524,7 @@ pub struct Window { #[derive(Clone)] pub struct TableScan { /// The name of the table - // TODO: change to OwnedTableReference - // see: https://github.com/apache/arrow-datafusion/issues/5522 - pub table_name: String, + pub table_name: OwnedTableReference, /// The source of the table pub source: Arc, /// Optional column indices to use as a projection @@ -2388,12 +2389,13 @@ mod tests { } fn test_plan() -> LogicalPlan { + let table_ref: Option> = None; let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("state", DataType::Utf8, false), ]); - table_scan(None, &schema, Some(vec![0, 1])) + table_scan(table_ref, &schema, Some(vec![0, 1])) .unwrap() .filter(col("state").eq(lit("CO"))) .unwrap() diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3d8b447926f9f..ea8607feeb0c3 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -644,14 +644,10 @@ pub fn from_plan( })) } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { - let schema = inputs[0].schema().as_ref().clone().into(); - let schema = - DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); - Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { - alias: alias.clone(), - input: Arc::new(inputs[0].clone()), - schema, - })) + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( + inputs[0].clone(), + alias.clone(), + )?)) } LogicalPlan::Limit(Limit { skip, fetch, .. }) => Ok(LogicalPlan::Limit(Limit { skip: *skip, diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 6b58399192eed..7336582f1a421 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -57,7 +57,7 @@ impl OptimizerRule for InlineTableScan { generate_projection_expr(projection, sub_plan)?; let plan = LogicalPlanBuilder::from(sub_plan.clone()) .project(projection_exprs)? - .alias(table_name)?; + .alias(table_name.to_string())?; Ok(Some(plan.build()?)) } else { Ok(None) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index f910268dfe860..55c77e51e2d3d 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -2082,7 +2082,7 @@ mod tests { let test_provider = PushDownProvider { filter_support }; let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".to_string(), + table_name: "test".into(), filters: vec![], projected_schema: Arc::new(DFSchema::try_from( (*test_provider.schema()).clone(), @@ -2154,7 +2154,7 @@ mod tests { }; let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".to_string(), + table_name: "test".into(), filters: vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))], projected_schema: Arc::new(DFSchema::try_from( (*test_provider.schema()).clone(), @@ -2183,7 +2183,7 @@ mod tests { }; let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".to_string(), + table_name: "test".into(), filters: vec![], projected_schema: Arc::new(DFSchema::try_from( (*test_provider.schema()).clone(), diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 4e9d5f0395545..767077aa0c027 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -496,8 +496,7 @@ fn push_down_scan( let mut projection: BTreeSet = used_columns .iter() .filter(|c| { - c.relation.is_none() - || c.relation.as_ref().unwrap().to_string() == scan.table_name + c.relation.is_none() || c.relation.as_ref().unwrap() == &scan.table_name }) .map(|c| schema.index_of(&c.name)) .filter_map(ArrowResult::ok) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index d0deb567fad4a..93ae09194dade 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -97,7 +97,8 @@ message ParquetFormat { message AvroFormat {} message ListingTableScanNode { - string table_name = 1; + reserved 1; // was string table_name + OwnedTableReference table_name = 14; repeated string paths = 2; string file_extension = 3; ProjectionColumns projection = 4; @@ -115,7 +116,8 @@ message ListingTableScanNode { } message ViewTableScanNode { - string table_name = 1; + reserved 1; // was string table_name + OwnedTableReference table_name = 6; LogicalPlanNode input = 2; Schema schema = 3; ProjectionColumns projection = 4; @@ -124,7 +126,8 @@ message ViewTableScanNode { // Logical Plan to Scan a CustomTableProvider registered at runtime message CustomTableScanNode { - string table_name = 1; + reserved 1; // was string table_name + OwnedTableReference table_name = 6; ProjectionColumns projection = 2; Schema schema = 3; repeated LogicalExprNode filters = 4; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 7246b5f2b2cf8..0ccb62fc940e2 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -4286,7 +4286,7 @@ impl serde::Serialize for CustomTableScanNode { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.table_name.is_empty() { + if self.table_name.is_some() { len += 1; } if self.projection.is_some() { @@ -4302,8 +4302,8 @@ impl serde::Serialize for CustomTableScanNode { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.CustomTableScanNode", len)?; - if !self.table_name.is_empty() { - struct_ser.serialize_field("tableName", &self.table_name)?; + if let Some(v) = self.table_name.as_ref() { + struct_ser.serialize_field("tableName", v)?; } if let Some(v) = self.projection.as_ref() { struct_ser.serialize_field("projection", v)?; @@ -4399,7 +4399,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode { if table_name__.is_some() { return Err(serde::de::Error::duplicate_field("tableName")); } - table_name__ = Some(map.next_value()?); + table_name__ = map.next_value()?; } GeneratedField::Projection => { if projection__.is_some() { @@ -4430,7 +4430,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode { } } Ok(CustomTableScanNode { - table_name: table_name__.unwrap_or_default(), + table_name: table_name__, projection: projection__, schema: schema__, filters: filters__.unwrap_or_default(), @@ -9586,7 +9586,7 @@ impl serde::Serialize for ListingTableScanNode { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.table_name.is_empty() { + if self.table_name.is_some() { len += 1; } if !self.paths.is_empty() { @@ -9620,8 +9620,8 @@ impl serde::Serialize for ListingTableScanNode { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.ListingTableScanNode", len)?; - if !self.table_name.is_empty() { - struct_ser.serialize_field("tableName", &self.table_name)?; + if let Some(v) = self.table_name.as_ref() { + struct_ser.serialize_field("tableName", v)?; } if !self.paths.is_empty() { struct_ser.serialize_field("paths", &self.paths)?; @@ -9779,7 +9779,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { if table_name__.is_some() { return Err(serde::de::Error::duplicate_field("tableName")); } - table_name__ = Some(map.next_value()?); + table_name__ = map.next_value()?; } GeneratedField::Paths => { if paths__.is_some() { @@ -9861,7 +9861,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { } } Ok(ListingTableScanNode { - table_name: table_name__.unwrap_or_default(), + table_name: table_name__, paths: paths__.unwrap_or_default(), file_extension: file_extension__.unwrap_or_default(), projection: projection__, @@ -20863,7 +20863,7 @@ impl serde::Serialize for ViewTableScanNode { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.table_name.is_empty() { + if self.table_name.is_some() { len += 1; } if self.input.is_some() { @@ -20879,8 +20879,8 @@ impl serde::Serialize for ViewTableScanNode { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.ViewTableScanNode", len)?; - if !self.table_name.is_empty() { - struct_ser.serialize_field("tableName", &self.table_name)?; + if let Some(v) = self.table_name.as_ref() { + struct_ser.serialize_field("tableName", v)?; } if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -20975,7 +20975,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode { if table_name__.is_some() { return Err(serde::de::Error::duplicate_field("tableName")); } - table_name__ = Some(map.next_value()?); + table_name__ = map.next_value()?; } GeneratedField::Input => { if input__.is_some() { @@ -21004,7 +21004,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode { } } Ok(ViewTableScanNode { - table_name: table_name__.unwrap_or_default(), + table_name: table_name__, input: input__, schema: schema__, projection: projection__, diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index da95fd558fef6..6209989deff0e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -130,8 +130,8 @@ pub struct AvroFormat {} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListingTableScanNode { - #[prost(string, tag = "1")] - pub table_name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "14")] + pub table_name: ::core::option::Option, #[prost(string, repeated, tag = "2")] pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(string, tag = "3")] @@ -171,8 +171,8 @@ pub mod listing_table_scan_node { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ViewTableScanNode { - #[prost(string, tag = "1")] - pub table_name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "6")] + pub table_name: ::core::option::Option, #[prost(message, optional, boxed, tag = "2")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, tag = "3")] @@ -186,8 +186,8 @@ pub struct ViewTableScanNode { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CustomTableScanNode { - #[prost(string, tag = "1")] - pub table_name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "6")] + pub table_name: ::core::option::Option, #[prost(message, optional, tag = "2")] pub projection: ::core::option::Option, #[prost(message, optional, tag = "3")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 9c9be27328e47..cce62653b6b75 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -398,8 +398,13 @@ impl AsLogicalPlan for LogicalPlanNode { let provider = ListingTable::try_new(config)?; + let table_name = from_owned_table_reference( + scan.table_name.as_ref(), + "ListingTableScan", + )?; + LogicalPlanBuilder::scan_with_filters( - &scan.table_name, + table_name, provider_as_source(Arc::new(provider)), projection, filters, @@ -430,8 +435,11 @@ impl AsLogicalPlan for LogicalPlanNode { ctx, )?; + let table_name = + from_owned_table_reference(scan.table_name.as_ref(), "CustomScan")?; + LogicalPlanBuilder::scan_with_filters( - &scan.table_name, + table_name, provider_as_source(provider), projection, filters, @@ -730,8 +738,11 @@ impl AsLogicalPlan for LogicalPlanNode { let provider = ViewTable::try_new(input, definition)?; + let table_name = + from_owned_table_reference(scan.table_name.as_ref(), "ViewScan")?; + LogicalPlanBuilder::scan( - &scan.table_name, + table_name, provider_as_source(Arc::new(provider)), projection, )? @@ -843,7 +854,7 @@ impl AsLogicalPlan for LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ListingScan( protobuf::ListingTableScanNode { file_format_type: Some(file_format_type), - table_name: table_name.to_owned(), + table_name: Some(table_name.clone().into()), collect_stat: options.collect_stat, file_extension: options.file_extension.clone(), table_partition_cols: options @@ -868,7 +879,7 @@ impl AsLogicalPlan for LogicalPlanNode { Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new( protobuf::ViewTableScanNode { - table_name: table_name.to_owned(), + table_name: Some(table_name.clone().into()), input: Some(Box::new( protobuf::LogicalPlanNode::try_from_logical_plan( view_table.logical_plan(), @@ -890,7 +901,7 @@ impl AsLogicalPlan for LogicalPlanNode { .try_encode_table_provider(provider, &mut bytes) .map_err(|e| context!("Error serializing custom table", e))?; let scan = CustomScan(CustomTableScanNode { - table_name: table_name.clone(), + table_name: Some(table_name.clone().into()), projection, schema: Some(schema), filters, diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 19a278666efb6..1b369fedf3290 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -35,10 +35,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_name = table_ref.to_string(); let cte = planner_context.ctes.get(&table_name); ( - match (cte, self.schema_provider.get_table_provider(table_ref)) { + match ( + cte, + self.schema_provider.get_table_provider(table_ref.clone()), + ) { (Some(cte_plan), _) => Ok(cte_plan.clone()), (_, Ok(provider)) => { - LogicalPlanBuilder::scan(&table_name, provider, None)?.build() + LogicalPlanBuilder::scan(table_ref, provider, None)?.build() } (None, Err(e)) => Err(e), }?, diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index cf4003c1c80e0..f891cf6cc9f1c 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -124,7 +124,7 @@ pub fn to_substrait_rel( }), advanced_extension: None, read_type: Some(ReadType::NamedTable(NamedTable { - names: vec![scan.table_name.clone()], + names: vec![scan.table_name.to_string()], advanced_extension: None, })), }))), From 54b0d8acfb705eaa5ed02c72dc70d9baeadf1b81 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 14:16:29 -0400 Subject: [PATCH 2/6] revert doc changes --- datafusion/common/src/table_reference.rs | 84 +++++------------------- 1 file changed, 17 insertions(+), 67 deletions(-) diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 6d46f6b691c2e..aad66473aec3f 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -35,38 +35,7 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> { } } -/// [`TableReference`]s represent a multi part identifier (path) to a -/// table that may require further resolution. -/// -/// # Creating [`TableReference`] -/// -/// When converting strings to [`TableReference`]s, the string is -/// parsed as though it were a SQL identifier, normalizing (convert to -/// lowercase) any unquoted identifiers. -/// -/// See [`TableReference::bare`] to create references without applying -/// normalization semantics -/// -/// # Examples -/// ``` -/// # use datafusion_common::TableReference; -/// // Get a table reference to 'mytable' -/// let table_reference = TableReference::from("mytable"); -/// assert_eq!(table_reference, TableReference::bare("mytable")); -/// -/// // Get a table reference to 'mytable' (note the capitalization) -/// let table_reference = TableReference::from("MyTable"); -/// assert_eq!(table_reference, TableReference::bare("mytable")); -/// -/// // Get a table reference to 'MyTable' (note the capitalization) using double quotes -/// // (programatically it is better to use `TableReference::bare` for this) -/// let table_reference = TableReference::from(r#""MyTable""#); -/// assert_eq!(table_reference, TableReference::bare("MyTable")); -/// -/// // Get a table reference to 'myschema.mytable' (note the capitalization) -/// let table_reference = TableReference::from("MySchema.MyTable"); -/// assert_eq!(table_reference, TableReference::partial("myschema", "mytable")); -///``` +/// Represents a path to a table that may require further resolution #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TableReference<'a> { /// An unqualified table reference, e.g. "table" @@ -92,16 +61,6 @@ pub enum TableReference<'a> { }, } -/// This is a [`TableReference`] that has 'static lifetime (aka it -/// owns the underlying string) -/// -/// To convert a [`TableReference`] to an [`OwnedTableReference`], use -/// -/// ``` -/// # use datafusion_common::{OwnedTableReference, TableReference}; -/// let table_reference = TableReference::from("mytable"); -/// let owned_reference = table_reference.to_owned_reference(); -/// ``` pub type OwnedTableReference = TableReference<'static>; impl std::fmt::Display for TableReference<'_> { @@ -126,18 +85,14 @@ impl<'a> TableReference<'a> { None } - /// Convenience method for creating a [`TableReference::Bare`] - /// - /// As described on [`TableReference`] this does *NO* parsing at - /// all -- so "Foo.Bar" stays as a reference to the table named - /// "Foo.Bar" (rather than "foo"."bar") + /// Convenience method for creating a `Bare` variant of `TableReference` pub fn bare(table: impl Into>) -> TableReference<'a> { TableReference::Bare { table: table.into(), } } - /// Convenience method for creating a [`TableReference::Partial`] + /// Convenience method for creating a `Partial` variant of `TableReference` pub fn partial( schema: impl Into>, table: impl Into>, @@ -148,7 +103,7 @@ impl<'a> TableReference<'a> { } } - /// Convenience method for creating a [`TableReference::Full`] + /// Convenience method for creating a `Full` variant of `TableReference` pub fn full( catalog: impl Into>, schema: impl Into>, @@ -186,12 +141,12 @@ impl<'a> TableReference<'a> { } } - /// Compare with another [`TableReference`] as if both are resolved. + /// Compare with another `TableReference` as if both are resolved. /// This allows comparing across variants, where if a field is not present /// in both variants being compared then it is ignored in the comparison. /// - /// e.g. this allows a [`TableReference::Bare`] to be considered equal to a - /// fully qualified [`TableReference::Full`] if the table names match. + /// e.g. this allows a `TableReference::Bare` to be considered equal to a + /// fully qualified `TableReference::Full` if the table names match. pub fn resolved_eq(&self, other: &Self) -> bool { match self { TableReference::Bare { table } => table == other.table(), @@ -239,8 +194,7 @@ impl<'a> TableReference<'a> { } } - /// Converts directly into an [`OwnedTableReference`] by copying - /// the underlying data. + /// Converts directly into an [`OwnedTableReference`] pub fn to_owned_reference(&self) -> OwnedTableReference { match self { Self::Full { @@ -263,16 +217,6 @@ impl<'a> TableReference<'a> { } /// Forms a string where the identifiers are quoted - /// - /// # Example - /// ``` - /// # use datafusion_common::TableReference; - /// let table_reference = TableReference::partial("myschema", "mytable"); - /// assert_eq!(table_reference.to_quoted_string(), r#""myschema"."mytable""#); - /// - /// let table_reference = TableReference::partial("MySchema", "MyTable"); - /// assert_eq!(table_reference.to_quoted_string(), r#""MySchema"."MyTable""#); - /// ``` pub fn to_quoted_string(&self) -> String { match self { TableReference::Bare { table } => quote_identifier(table), @@ -292,8 +236,14 @@ impl<'a> TableReference<'a> { } } - /// Forms a [`TableReference`] by parsing `s` as a multipart - /// identifier. See docs on [`TableReference`] for more details. + /// Forms a [`TableReference`] by attempting to parse `s` as a multipart identifier, + /// failing that then taking the entire unnormalized input as the identifier itself. + /// + /// Will normalize (convert to lowercase) any unquoted identifiers. + /// + /// e.g. `Foo` will be parsed as `foo`, and `"Foo"".bar"` will be parsed as + /// `Foo".bar` (note the preserved case and requiring two double quotes to represent + /// a single double quote in the identifier) pub fn parse_str(s: &'a str) -> Self { let mut parts = parse_identifiers_normalized(s); @@ -315,7 +265,7 @@ impl<'a> TableReference<'a> { } } -/// Parse a `String` into a OwnedTableReference as a SQL identifier. +/// Parse a `String` into a OwnedTableReference impl From for OwnedTableReference { fn from(s: String) -> Self { TableReference::parse_str(&s).to_owned_reference() From 2c3b6d7e6c9052f8ed353c33f46d4a3ecd73a2f4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 17:33:09 -0400 Subject: [PATCH 3/6] Use TableReference::none --- datafusion/expr/src/logical_plan/plan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6aedf2b2a469a..35efbfd24fce4 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2389,13 +2389,12 @@ mod tests { } fn test_plan() -> LogicalPlan { - let table_ref: Option> = None; let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("state", DataType::Utf8, false), ]); - table_scan(table_ref, &schema, Some(vec![0, 1])) + table_scan(TableReference::none(), table_ref, &schema, Some(vec![0, 1])) .unwrap() .filter(col("state").eq(lit("CO"))) .unwrap() From feac21fb440ffa12426aee7dd95ac89c7fbd0aab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 17:33:18 -0400 Subject: [PATCH 4/6] Apply suggestions from code review Co-authored-by: Jeffrey <22608443+Jefffrey@users.noreply.github.com> --- datafusion/core/src/physical_plan/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 839d9ef9cb7cd..9cc907b28b61d 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -2518,8 +2518,8 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E match ctx.read_csv(path, options).await?.into_optimized_plan()? { LogicalPlan::TableScan(ref scan) => { let mut scan = scan.clone(); - let table_reference: TableReference<'_> = name.into(); - scan.table_name = table_reference.to_owned_reference(); + let table_reference = TableReference::from(name).to_owned_reference(); + scan.table_name = table_reference; let new_schema = scan .projected_schema .as_ref() From ef1bc06e97454eed9820f67e77ea89bb07ea95c7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 17:39:09 -0400 Subject: [PATCH 5/6] Add comment --- datafusion/optimizer/src/inline_table_scan.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 7336582f1a421..722a70cb380db 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -57,6 +57,15 @@ impl OptimizerRule for InlineTableScan { generate_projection_expr(projection, sub_plan)?; let plan = LogicalPlanBuilder::from(sub_plan.clone()) .project(projection_exprs)? + // Since this This is creating a subquery like: + //```sql + // ... + // FROM as "table_name" + // ``` + // + // it doesn't make sense to have a qualified + // reference (e.g. "foo"."bar") -- this convert to + // string .alias(table_name.to_string())?; Ok(Some(plan.build()?)) } else { From c2344898ffa8cf40129b9e0f63c14a5b1ca2237e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 17:46:49 -0400 Subject: [PATCH 6/6] Restore table check --- datafusion/expr/src/logical_plan/builder.rs | 18 ++++++++++++++++++ datafusion/expr/src/logical_plan/plan.rs | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 476203e1ef58a..c5e6237854501 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -255,6 +255,12 @@ impl LogicalPlanBuilder { ) -> Result { let table_name = table_name.into(); + if table_name.table().is_empty() { + return Err(DataFusionError::Plan( + "table_name cannot be empty".to_string(), + )); + } + let schema = table_source.schema(); let projected_schema = projection @@ -1414,6 +1420,18 @@ mod tests { assert_eq!(&expected, plan.schema().as_ref()); } + #[test] + fn plan_builder_empty_name() { + let schema = employee_schema(); + let projection = None; + let err = + LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err(); + assert_eq!( + err.to_string(), + "Error during planning: table_name cannot be empty" + ); + } + #[test] fn plan_builder_aggregate() -> Result<()> { let plan = diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 35efbfd24fce4..a6ba1c96154f6 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2394,7 +2394,7 @@ mod tests { Field::new("state", DataType::Utf8, false), ]); - table_scan(TableReference::none(), table_ref, &schema, Some(vec![0, 1])) + table_scan(TableReference::none(), &schema, Some(vec![0, 1])) .unwrap() .filter(col("state").eq(lit("CO"))) .unwrap()