From bc04ba5c5881184c4977ca19a289140928a1a71c Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Sat, 7 Jan 2023 19:52:00 +1100 Subject: [PATCH] Support wildcard select on multiple column using joins --- datafusion/expr/src/logical_plan/builder.rs | 5 +- datafusion/expr/src/utils.rs | 24 +++++++-- datafusion/sql/src/planner.rs | 59 ++++++++++++++++++++- 3 files changed, 80 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index d49af378b16f1..63783a110667e 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1042,8 +1042,9 @@ pub fn project( Expr::Wildcard => { projected_expr.extend(expand_wildcard(input_schema, &plan)?) } - Expr::QualifiedWildcard { ref qualifier } => projected_expr - .extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?), + Expr::QualifiedWildcard { ref qualifier } => { + projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?) + } _ => projected_expr .push(columnize_expr(normalize_col(e, &plan)?, input_schema)), } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 8cf79c612e323..682d1332143c5 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -150,13 +150,23 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result>(); // sort join columns to make sure we consistently keep the same // qualified column cols.sort(); - cols.into_iter().skip(1) + let mut out_column_names: HashSet = HashSet::new(); + cols.into_iter() + .filter_map(|c| { + if out_column_names.contains(&c.name) { + Some(c) + } else { + out_column_names.insert(c.name); + None + } + }) + .collect::>() }) .collect::>(); @@ -186,7 +196,6 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result Result> { let qualified_fields: Vec = schema .fields_with_qualified(qualifier) @@ -198,9 +207,14 @@ pub fn expand_qualified_wildcard( "Invalid qualifier {qualifier}" ))); } - let qualifier_schema = + let qualified_schema = DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; - expand_wildcard(&qualifier_schema, plan) + // if qualified, allow all columns in output (i.e. ignore using column check) + Ok(qualified_schema + .fields() + .iter() + .map(|f| Expr::Column(f.qualified_column())) + .collect::>()) } /// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)") diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index e0c2847f87e96..34a078e3c5f2b 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -1469,7 +1469,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let qualifier = format!("{object_name}"); // do not expand from outer schema - expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan) + expand_qualified_wildcard(&qualifier, plan.schema().as_ref()) } } } @@ -2390,6 +2390,63 @@ mod tests { quick_test(sql, expected); } + #[test] + fn using_join_multiple_keys() { + let sql = "SELECT * FROM person a join person b using (id, age)"; + let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \ + b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\ + \n Inner Join: Using a.id = b.id, a.age = b.age\ + \n SubqueryAlias: a\ + \n TableScan: person\ + \n SubqueryAlias: b\ + \n TableScan: person"; + quick_test(sql, expected); + } + + #[test] + fn using_join_multiple_keys_subquery() { + let sql = "SELECT age FROM (SELECT * FROM person a join person b using (id, age, state))"; + let expected = "Projection: a.age\ + \n Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \ + b.first_name, b.last_name, b.salary, b.birth_date, b.😀\ + \n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\ + \n SubqueryAlias: a\ + \n TableScan: person\ + \n SubqueryAlias: b\ + \n TableScan: person"; + quick_test(sql, expected); + } + + #[test] + fn using_join_multiple_keys_select_all_columns() { + let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)"; + let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \ + b.id, b.first_name, b.last_name, b.age, b.state, b.salary, b.birth_date, b.😀\ + \n Inner Join: Using a.id = b.id, a.age = b.age\ + \n SubqueryAlias: a\ + \n TableScan: person\ + \n SubqueryAlias: b\ + \n TableScan: person"; + quick_test(sql, expected); + } + + #[test] + fn using_join_multiple_keys_multiple_joins() { + let sql = "SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state)"; + let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \ + b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \ + c.first_name, c.last_name, c.salary, c.birth_date, c.😀\ + \n Inner Join: Using a.id = c.id, a.age = c.age, a.state = c.state\ + \n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\ + \n SubqueryAlias: a\ + \n TableScan: person\ + \n SubqueryAlias: b\ + \n TableScan: person\ + \n SubqueryAlias: c\ + \n TableScan: person"; + quick_test(sql, expected); + } + #[test] fn select_with_having() { let sql = "SELECT id, age