From 4c271a2181174a2a50b08ab5f7099cdd1f61c1fd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Jul 2023 13:34:17 -0500 Subject: [PATCH 1/4] Minor: Add docs and a test for unnest --- datafusion/core/src/physical_plan/unnest.rs | 17 ++++++ datafusion/core/tests/dataframe/mod.rs | 65 +++++++++++++++++++++ datafusion/expr/src/logical_plan/plan.rs | 20 ++++++- 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 7a213dffeb516..48507e0276920 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -44,6 +44,23 @@ use crate::physical_plan::{ use super::DisplayAs; /// Unnest the given column by joining the row with each value in the nested type. +/// +/// For example, calling unnest(c1) results in the following: +/// +/// ```text +/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ +/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ null │ │ B │ │ 2 │ │ A │ +/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ +/// │ {} │ │ D │ │ null │ │ B │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ {3} │ │ E │ │ null │ │ D │ +/// └─────────┘ └─────┘ ├─────────┤ ├─────┤ +/// c1 c2 │ 3 │ │ E │ +/// └─────────┘ └─────┘ +/// c1 c2 +/// ``` #[derive(Debug)] pub struct UnnestExec { /// Input execution plan diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aecec35f2e16d..9b47880624a09 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1114,6 +1114,71 @@ async fn unnest_fixed_list() -> Result<()> { Ok(()) } +#[tokio::test] +async fn unnest_fixed_list_nonull() -> Result<()> { + let mut shape_id_builder = UInt32Builder::new(); + let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2); + + for idx in 0..6 { + // Append shape id. 
+ shape_id_builder.append_value(idx as u32 + 1); + + tags_builder + .values() + .append_value(format!("tag{}1", idx + 1)); + tags_builder + .values() + .append_value(format!("tag{}2", idx + 1)); + tags_builder.append(true); + } + + let batch = RecordBatch::try_from_iter(vec![ + ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef), + ("tags", Arc::new(tags_builder.finish()) as ArrayRef), + ])?; + + let ctx = SessionContext::new(); + ctx.register_batch("shapes", batch)?; + let df = ctx.table("shapes").await?; + + let results = df.clone().collect().await?; + let expected = vec![ + "+----------+----------------+", + "| shape_id | tags |", + "+----------+----------------+", + "| 1 | [tag11, tag12] |", + "| 2 | [tag21, tag22] |", + "| 3 | [tag31, tag32] |", + "| 4 | [tag41, tag42] |", + "| 5 | [tag51, tag52] |", + "| 6 | [tag61, tag62] |", + "+----------+----------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + let results = df.unnest_column("tags")?.collect().await?; + let expected = vec![ + "+----------+-------+", + "| shape_id | tags |", + "+----------+-------+", + "| 1 | tag11 |", + "| 1 | tag12 |", + "| 2 | tag21 |", + "| 2 | tag22 |", + "| 3 | tag31 |", + "| 3 | tag32 |", + "| 4 | tag41 |", + "| 4 | tag42 |", + "| 5 | tag51 |", + "| 5 | tag52 |", + "| 6 | tag61 |", + "| 6 | tag62 |", + "+----------+-------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} #[tokio::test] async fn unnest_aggregate_columns() -> Result<()> { const NUM_ROWS: usize = 5; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0485973364cf4..bd4ff79d37445 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -120,7 +120,8 @@ pub enum LogicalPlan { Ddl(DdlStatement), /// Describe the schema of table DescribeTable(DescribeTable), - /// Unnest a column that contains a nested list type. + /// Unnest a column that contains a nested list type. 
See + /// [`Unnest`] for more details. Unnest(Unnest), } @@ -1753,6 +1754,23 @@ pub enum Partitioning { } /// Unnest a column that contains a nested list type. +/// +/// For example, calling unnest(c1) results in the following: +/// +/// ```text +/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ +/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ null │ │ B │ │ 2 │ │ A │ +/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ +/// │ {} │ │ D │ │ null │ │ B │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ {3} │ │ E │ │ null │ │ D │ +/// └─────────┘ └─────┘ ├─────────┤ ├─────┤ +/// c1 c2 │ 3 │ │ E │ +/// └─────────┘ └─────┘ +/// c1 c2 +/// ``` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Unnest { /// The incoming logical plan From f29f7f69ab1cec31976b3af53fefc93d96db6883 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 25 Jul 2023 09:30:30 -0400 Subject: [PATCH 2/4] Update docs --- datafusion/core/src/dataframe.rs | 4 +- datafusion/core/src/physical_plan/unnest.rs | 24 ++-------- datafusion/expr/src/logical_plan/plan.rs | 51 +++++++++++++++------ 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index e1109b7fc403d..1d92c392981ce 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -149,7 +149,8 @@ impl DataFrame { Ok(DataFrame::new(self.session_state, project_plan)) } - /// Expand each list element of a column to multiple rows. + /// Expand each list element of a column to multiple rows. See + /// [Unnest] for more details. /// /// ``` /// # use datafusion::prelude::*; @@ -162,6 +163,7 @@ impl DataFrame { /// # Ok(()) /// # } /// ``` + /// [Unnest]: crate::logical_expr::Unnest pub fn unnest_column(self, column: &str) -> Result { let plan = LogicalPlanBuilder::from(self.plan) .unnest_column(column)? 
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 48507e0276920..523d394dcb2bb 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Defines the unnest column plan for unnesting values in a column that contains a list -//! type, conceptually is like joining each row with all the values in the list column. +//! [UnnestExec] for unnesting / flattening Lists + use arrow::array::{ new_null_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, Int32Array, LargeListArray, ListArray, PrimitiveArray, @@ -43,24 +43,10 @@ use crate::physical_plan::{ use super::DisplayAs; -/// Unnest the given column by joining the row with each value in the nested type. -/// -/// For example, calling unnest(c1) results in the following: +/// Unnest the given column by joining the row with each value in the +/// nested type. 
See [Unnest] for details and examples /// -/// ```text -/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ -/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ -/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ -/// │ null │ │ B │ │ 2 │ │ A │ -/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ -/// │ {} │ │ D │ │ null │ │ B │ -/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ -/// │ {3} │ │ E │ │ null │ │ D │ -/// └─────────┘ └─────┘ ├─────────┤ ├─────┤ -/// c1 c2 │ 3 │ │ E │ -/// └─────────┘ └─────┘ -/// c1 c2 -/// ``` +/// [Unnest]: crate::logical_expr::Unnest #[derive(Debug)] pub struct UnnestExec { /// Input execution plan diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index dce400bb95ee7..20a12e6e3c7b4 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1763,23 +1763,46 @@ pub enum Partitioning { DistributeBy(Vec), } -/// Unnest a column that contains a nested list type. +/// Unnest a column that contains a nested list type, replicating +/// values in the other, non nested rows. /// -/// For example, calling unnest(c1) results in the following: +/// Conceptually this operation is like joining each row with all the +/// values in the list column. /// +/// If `preserve_nulls` is false, the default, nulls and empty lists +/// from the input column are not carried through to the output. +/// +/// If `preserve_nulls` is true, nulls from the input column are +/// carried through to the output. 
+/// +/// # Examples +/// +/// ## `Unnest(c1)`, preserve_nulls: false +/// ```text +/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ +/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ null │ │ B │ │ 2 │ │ A │ +/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ +/// │ {} │ │ D │ │ 3 │ │ E │ +/// ├─────────┤ ├─────┤ └─────────┘ └─────┘ +/// │ {3} │ │ E │ c1 c2 +/// └─────────┘ └─────┘ +/// c1 c2 +/// ``` +/// +/// ## `Unnest(c1)`, preserve_nulls: true /// ```text -/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ -/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ -/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ -/// │ null │ │ B │ │ 2 │ │ A │ -/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ -/// │ {} │ │ D │ │ null │ │ B │ -/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ -/// │ {3} │ │ E │ │ null │ │ D │ -/// └─────────┘ └─────┘ ├─────────┤ ├─────┤ -/// c1 c2 │ 3 │ │ E │ -/// └─────────┘ └─────┘ -/// c1 c2 +/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ +/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ null │ │ B │ │ 2 │ │ A │ +/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ +/// │ {} │ │ D │ │ null │ │ B │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ {3} │ │ E │ │ 3 │ │ E │ +/// └─────────┘ └─────┘ └─────────┘ └─────┘ +/// c1 c2 c1 c2 /// ``` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Unnest { From 2b1d619bdc19ae1d1f31322e564ae629f0248dcb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 25 Jul 2023 09:39:04 -0400 Subject: [PATCH 3/4] Begin working on test --- datafusion/core/tests/dataframe/mod.rs | 52 ++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 9b47880624a09..cc036d6381591 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1044,6 +1044,30 @@ async fn unnest_columns() -> Result<()> { Ok(()) } + 
+#[tokio::test] +async fn unnest_column_nulls() -> Result<()> { + let df = table_with_lists_and_nulls().await?; + let results = df.clone().collect().await?; + let expected = vec![ + "+----------+------------------------------------------------+--------------------+", + "+----------+------------------------------------------------+--------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + // Unnest ignoring nulls + let results = df + .unnest_column("points")? + .collect().await?; + let expected = vec![ + "+----------+------------------------------------------------+--------------------+", + "+----------+------------------------------------------------+--------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn unnest_fixed_list() -> Result<()> { let mut shape_id_builder = UInt32Builder::new(); @@ -1359,6 +1383,34 @@ async fn table_with_nested_types(n: usize) -> Result { ctx.table("shapes").await } + +/// Create a data frame that contains nested types. +/// +/// Create a data frame with a list +/// - list if integers +/// - A list of tags. 
+async fn table_with_lists_and_nulls() -> Result { + + let mut list_builder = ListBuilder::new(UInt32Builder::new()); + let mut id_builder = StringBuilder::new(); + + + id_builder.append_value("A"); + list_builder.values().append_value(1); + list_builder.values().append_value(2); + list_builder.append(true); + + let batch = RecordBatch::try_from_iter(vec![ + ("list", Arc::new(list_builder.finish()) as ArrayRef), + ("id", Arc::new(id_builder.finish()) as ArrayRef), + ])?; + + let ctx = SessionContext::new(); + ctx.register_batch("shapes", batch)?; + ctx.table("shapes").await +} + + pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) -> Result<()> { let testdata = parquet_test_data(); ctx.register_parquet( From e81752e12a5af8b8925ae4790cd3fff47921ad67 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 25 Jul 2023 10:08:31 -0400 Subject: [PATCH 4/4] Make preserving nulls optional --- datafusion/core/src/dataframe.rs | 9 +- datafusion/core/src/physical_plan/unnest.rs | 11 +- datafusion/core/src/physical_planner.rs | 4 +- datafusion/core/tests/dataframe/mod.rs | 106 ++++++++++++++------ datafusion/expr/src/logical_plan/builder.rs | 29 +++++- datafusion/expr/src/logical_plan/plan.rs | 10 +- datafusion/expr/src/utils.rs | 8 +- 7 files changed, 131 insertions(+), 46 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 1d92c392981ce..fc6ad707f7dbc 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -149,8 +149,9 @@ impl DataFrame { Ok(DataFrame::new(self.session_state, project_plan)) } - /// Expand each list element of a column to multiple rows. See - /// [Unnest] for more details. + /// Expand each list element of a column to multiple rows, + /// optionally preserving nulls. See [Unnest] for examples and + /// more details. 
/// /// ``` /// # use datafusion::prelude::*; @@ -164,9 +165,9 @@ impl DataFrame { /// # } /// ``` /// [Unnest]: crate::logical_expr::Unnest - pub fn unnest_column(self, column: &str) -> Result { + pub fn unnest_column(self, column: &str, preserve_nulls: bool) -> Result { let plan = LogicalPlanBuilder::from(self.plan) - .unnest_column(column)? + .unnest_column(column, preserve_nulls)? .build()?; Ok(DataFrame::new(self.session_state, plan)) } diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 523d394dcb2bb..b867701fe740c 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -55,15 +55,23 @@ pub struct UnnestExec { schema: SchemaRef, /// The unnest column column: Column, + /// Should input nulls be preserved + preserve_nulls: bool, } impl UnnestExec { /// Create a new [UnnestExec]. - pub fn new(input: Arc, column: Column, schema: SchemaRef) -> Self { + pub fn new( + input: Arc, + column: Column, + schema: SchemaRef, + preserve_nulls: bool, + ) -> Self { UnnestExec { input, schema, column, + preserve_nulls, } } } @@ -110,6 +118,7 @@ impl ExecutionPlan for UnnestExec { children[0].clone(), self.column.clone(), self.schema.clone(), + self.preserve_nulls, ))) } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 913deebddfdbe..d677ca37cf131 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1125,12 +1125,12 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch))) } - LogicalPlan::Unnest(Unnest { input, column, schema }) => { + LogicalPlan::Unnest(Unnest { input, column, schema, preserve_nulls }) => { let input = self.create_initial_plan(input, session_state).await?; let column_exec = schema.index_of_column(column) .map(|idx| Column::new(&column.name, idx))?; let schema = SchemaRef::new(schema.as_ref().to_owned().into()); - 
Ok(Arc::new(UnnestExec::new(input, column_exec, schema))) + Ok(Arc::new(UnnestExec::new(input, column_exec, schema, *preserve_nulls))) } LogicalPlan::Ddl(ddl) => { // There is no default plan for DDl statements -- diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index cc036d6381591..48c2c3deac32f 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -942,6 +942,7 @@ async fn right_anti_filter_push_down() -> Result<()> { #[tokio::test] async fn unnest_columns() -> Result<()> { + let preserve_nulls = true; const NUM_ROWS: usize = 4; let df = table_with_nested_types(NUM_ROWS).await?; let results = df.collect().await?; @@ -959,7 +960,7 @@ async fn unnest_columns() -> Result<()> { // Unnest tags let df = table_with_nested_types(NUM_ROWS).await?; - let results = df.unnest_column("tags")?.collect().await?; + let results = df.unnest_column("tags", preserve_nulls)?.collect().await?; let expected = vec![ "+----------+------------------------------------------------+------+", "| shape_id | points | tags |", @@ -977,12 +978,15 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for tags. let df = table_with_nested_types(NUM_ROWS).await?; - let count = df.unnest_column("tags")?.count().await?; + let count = df.unnest_column("tags", preserve_nulls)?.count().await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); // Unnest points let df = table_with_nested_types(NUM_ROWS).await?; - let results = df.unnest_column("points")?.collect().await?; + let results = df + .unnest_column("points", preserve_nulls)? + .collect() + .await?; let expected = vec![ "+----------+-----------------+--------------------+", "| shape_id | points | tags |", @@ -1001,14 +1005,14 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for points. 
let df = table_with_nested_types(NUM_ROWS).await?; - let count = df.unnest_column("points")?.count().await?; + let count = df.unnest_column("points", preserve_nulls)?.count().await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); // Unnest both points and tags. let df = table_with_nested_types(NUM_ROWS).await?; let results = df - .unnest_column("points")? - .unnest_column("tags")? + .unnest_column("points", preserve_nulls)? + .unnest_column("tags", preserve_nulls)? .collect() .await?; let expected = vec![ @@ -1035,8 +1039,8 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for points and tags. let df = table_with_nested_types(NUM_ROWS).await?; let count = df - .unnest_column("points")? - .unnest_column("tags")? + .unnest_column("points", preserve_nulls)? + .unnest_column("tags", preserve_nulls)? .count() .await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); @@ -1044,26 +1048,57 @@ async fn unnest_columns() -> Result<()> { Ok(()) } - #[tokio::test] async fn unnest_column_nulls() -> Result<()> { let df = table_with_lists_and_nulls().await?; let results = df.clone().collect().await?; let expected = vec![ - "+----------+------------------------------------------------+--------------------+", - "+----------+------------------------------------------------+--------------------+", + "+--------+----+", + "| list | id |", + "+--------+----+", + "| [1, 2] | A |", + "| | B |", + "| [] | C |", + "| [3] | D |", + "+--------+----+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); - // Unnest ignoring nulls + // Unnest, preserving nulls (row with B is preserved) + let preserve_nulls = true; let results = df - .unnest_column("points")? - .collect().await?; + .clone() + .unnest_column("list", preserve_nulls)? 
+ .collect() + .await?; let expected = vec![ - "+----------+------------------------------------------------+--------------------+", - "+----------+------------------------------------------------+--------------------+", + "+------+----+", + "| list | id |", + "+------+----+", + "| 1 | A |", + "| 2 | A |", + "| | B |", + "| 3 | D |", + "+------+----+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); + + // Unnest without preserving + // NOTE this is incorrect, + // see https://github.com/apache/arrow-datafusion/pull/7002 + let preserve_nulls = false; + let results = df.unnest_column("list", preserve_nulls)?.collect().await?; + let expected = vec![ + "+------+----+", + "| list | id |", + "+------+----+", + "| 1 | A |", + "| 2 | A |", + "| | B |", // this row should not be here + "| 3 | D |", + "+------+----+", + ]; + assert_batches_eq!(expected, &results); Ok(()) } @@ -1116,7 +1151,8 @@ async fn unnest_fixed_list() -> Result<()> { ]; assert_batches_sorted_eq!(expected, &results); - let results = df.unnest_column("tags")?.collect().await?; + let preserve_nulls = true; + let results = df.unnest_column("tags", preserve_nulls)?.collect().await?; let expected = vec![ "+----------+-------+", "| shape_id | tags |", @@ -1180,7 +1216,8 @@ async fn unnest_fixed_list_nonull() -> Result<()> { ]; assert_batches_sorted_eq!(expected, &results); - let results = df.unnest_column("tags")?.collect().await?; + let preserve_nulls = true; + let results = df.unnest_column("tags", preserve_nulls)?.collect().await?; let expected = vec![ "+----------+-------+", "| shape_id | tags |", @@ -1223,8 +1260,9 @@ async fn unnest_aggregate_columns() -> Result<()> { assert_batches_sorted_eq!(expected, &results); let df = table_with_nested_types(NUM_ROWS).await?; + let preserve_nulls = true; let results = df - .unnest_column("tags")? + .unnest_column("tags", preserve_nulls)? .aggregate(vec![], vec![count(col("tags"))])? 
.collect() .await?; @@ -1383,22 +1421,29 @@ async fn table_with_nested_types(n: usize) -> Result { ctx.table("shapes").await } - -/// Create a data frame that contains nested types. -/// -/// Create a data frame with a list -/// - list if integers -/// - A list of tags. +/// A data frame that contains a list of integers and string IDs async fn table_with_lists_and_nulls() -> Result { - let mut list_builder = ListBuilder::new(UInt32Builder::new()); let mut id_builder = StringBuilder::new(); - - id_builder.append_value("A"); + // [1, 2], A list_builder.values().append_value(1); list_builder.values().append_value(2); list_builder.append(true); + id_builder.append_value("A"); + + // NULL, B + list_builder.append(false); + id_builder.append_value("B"); + + // [], C + list_builder.append(true); + id_builder.append_value("C"); + + // [3], D + list_builder.values().append_value(3); + list_builder.append(true); + id_builder.append_value("D"); let batch = RecordBatch::try_from_iter(vec![ ("list", Arc::new(list_builder.finish()) as ArrayRef), @@ -1410,7 +1455,6 @@ async fn table_with_lists_and_nulls() -> Result { ctx.table("shapes").await } - pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) -> Result<()> { let testdata = parquet_test_data(); ctx.register_parquet( diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index cd8940e134639..90b256e35d056 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1021,9 +1021,20 @@ impl LogicalPlanBuilder { }))) } - /// Unnest the given column. - pub fn unnest_column(self, column: impl Into) -> Result { - Ok(Self::from(unnest(self.plan, column.into())?)) + /// Unnest the given column, optionally preserving nulls. See + /// [Unnest] for examples and more details.
+ /// + /// [Unnest]: crate::logical_expr::Unnest + pub fn unnest_column( + self, + column: impl Into, + preserve_nulls: bool, + ) -> Result { + Ok(Self::from(unnest( + self.plan, + column.into(), + preserve_nulls, + )?)) } } @@ -1366,8 +1377,15 @@ impl TableSource for LogicalTableSource { } } -/// Create an unnest plan. -pub fn unnest(input: LogicalPlan, column: Column) -> Result { +/// Create an unnest plan for the given column, optionally preserving nulls. See +/// [Unnest] for examples and more details. +/// +/// [Unnest]: crate::logical_expr::Unnest +pub fn unnest( + input: LogicalPlan, + column: Column, + preserve_nulls: bool, +) -> Result { let unnest_field = input.schema().field_from_column(&column)?; // Extract the type of the nested field in the list. @@ -1409,6 +1427,7 @@ pub fn unnest(input: LogicalPlan, column: Column) -> Result { input: Arc::new(input), column: unnested_field.qualified_column(), schema, + preserve_nulls, })) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 20a12e6e3c7b4..7470784dc5d4c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1226,8 +1226,12 @@ impl LogicalPlan { LogicalPlan::DescribeTable(DescribeTable { .. }) => { write!(f, "DescribeTable") } - LogicalPlan::Unnest(Unnest { column, .. }) => { - write!(f, "Unnest: {column}") + LogicalPlan::Unnest(Unnest { + column, + preserve_nulls, + .. + }) => { + write!(f, "Unnest: {column} preserve_nulls: {preserve_nulls}") } } } @@ -1812,6 +1816,8 @@ pub struct Unnest { pub column: Column, /// The output schema, containing the unnested field column. pub schema: DFSchemaRef, + /// Should nulls in the input be preserved? 
Defaults to false + pub preserve_nulls: bool, } #[cfg(test)] diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3ddfea5105689..058236c7edc00 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -998,7 +998,12 @@ pub fn from_plan( Ok(plan.clone()) } LogicalPlan::DescribeTable(_) => Ok(plan.clone()), - LogicalPlan::Unnest(Unnest { column, schema, .. }) => { + LogicalPlan::Unnest(Unnest { + column, + schema, + preserve_nulls, + .. + }) => { // Update schema with unnested column type. let input = Arc::new(inputs[0].clone()); let nested_field = input.schema().field_from_column(column)?; @@ -1025,6 +1030,7 @@ pub fn from_plan( input, column: column.clone(), schema, + preserve_nulls: *preserve_nulls, })) } }