From 2a51cd5fa6a638ccaec265a7a414b7c1f41099a7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 14 Feb 2024 23:23:04 +0800 Subject: [PATCH 01/10] first draft Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 30 +++++++++++++++++++ .../src/aggregate/array_agg_ordered.rs | 16 +++++++--- .../sqllogictest/test_files/aggregate.slt | 17 +++++++++++ 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 29107ab10e7e8..946af3d6da6c2 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2192,6 +2192,36 @@ impl ScalarValue { Ok(scalars) } + /// convert_array_to_scalar_vec but only convert the first level instead of recursively converting all the levels + pub fn convert_first_level_array_to_scalar_vec(array: &dyn Array) -> Result> { + let mut scalars = Vec::with_capacity(array.len()); + + if array.len() == 0 { + return Ok(vec![]); + } + assert_eq!(array.len(), 1); + + for index in 0..array.len() { + let nested_array = match array.data_type() { + DataType::List(_) => { + array.as_list::().value(index) + } + DataType::LargeList(_) => { + array.as_list::().value(index) + } + _ => { + return _internal_err!("ScalarValue is not a list"); + } + }; + + for index in 0..nested_array.len() { + let sv = ScalarValue::try_from_array(nested_array.as_ref(), index)?; + scalars.push(sv); + } + } + Ok(scalars) + } + // TODO: Support more types after other ScalarValue is wrapped with ArrayRef /// Get raw data (inner array) inside ScalarValue pub fn raw_data(&self) -> Result { diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 587f40081c906..78d582784e3b3 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -208,6 +208,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { return Ok(()); } + // println!("values: {:?}", values); + let n_row = values[0].len(); for index in 0..n_row { let row = get_row_at_idx(values, index)?; @@ -222,6 +224,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if states.is_empty() { return Ok(()); } + println!("states: {:?}", states); // First entry in the state is the aggregation result. Second entry // stores values received for ordering requirement columns for each @@ -246,10 +249,15 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_ordering_values.push(self.ordering_values.clone().into()); // Convert array to Scalars to sort them easily. Convert back to array at evaluation. - let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; - for v in array_agg_res.into_iter() { - partition_values.push(v.into()); - } + // let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; + let array_agg_res = ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; + println!("array_agg_res: {:?}", array_agg_res); + partition_values.push(array_agg_res.into()); + // // println!("array_agg_res: {:?}", array_agg_res); + // println!("array_v2: {:?}", array_v2); + // for v in array_agg_res.into_iter() { + // partition_values.push(v.into()); + // } let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index f50134e63509d..9de7cfcd16ff0 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -139,6 +139,23 @@ AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true +# test array_agg_order with list data type +statement ok +CREATE TABLE table2 AS VALUES + ('w', 1, [1,2,3], 10), + ('b', 2, [4,5,6], 20), + ('b', 2, [4,5,6], 20) +; + +query T? +select column1, array_agg(column3 order by column2, column4) from table2 group by column1; +---- +w [[1, 2, 3]] +b [[4, 5, 6], [4, 5, 6]] + +statement ok +drop table table2; + statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100 From 2b645b6f68a971d37c4f6bbf227d72ad6bb09c04 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 09:54:31 +0800 Subject: [PATCH 02/10] fix convert_first_level_array_to_scalar_vec Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 32 ++++++------------- .../src/aggregate/array_agg_ordered.rs | 17 +++------- .../sqllogictest/test_files/aggregate.slt | 12 +++---- 3 files changed, 21 insertions(+), 40 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 946af3d6da6c2..008163605f93a 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2193,32 +2193,20 @@ impl ScalarValue { } /// convert_array_to_scalar_vec but only convert the first level instead of recursively converting all the levels - pub fn convert_first_level_array_to_scalar_vec(array: &dyn Array) -> Result> { + /// List(List(1, 2, 3)) + pub fn convert_first_level_array_to_scalar_vec( + array: &dyn Array, + ) -> Result>> { let mut scalars = Vec::with_capacity(array.len()); - if array.len() == 0 { - return Ok(vec![]); - } - assert_eq!(array.len(), 1); - for index in 0..array.len() { - let nested_array = match array.data_type() { - DataType::List(_) => { - array.as_list::().value(index) - } - DataType::LargeList(_) => { - array.as_list::().value(index) - } - _ => { - return _internal_err!("ScalarValue is not a list"); - } - }; - - for index in 0..nested_array.len() { - let sv = ScalarValue::try_from_array(nested_array.as_ref(), index)?; - scalars.push(sv); - } + let nested_array = array.as_list::().value(index); + let scalar_values = (0..nested_array.len()) + .map(|i| ScalarValue::try_from_array(&nested_array, i)) + .collect::>>()?; + scalars.push(scalar_values); } + Ok(scalars) } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 78d582784e3b3..777bc588d7fd1 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -208,8 +208,6 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { return Ok(()); } - // println!("values: {:?}", values); - let n_row = values[0].len(); for index in 0..n_row { let row = get_row_at_idx(values, index)?; @@ -224,7 +222,6 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if states.is_empty() { return Ok(()); } - println!("states: {:?}", states); // First entry in the state is the aggregation result. Second entry // stores values received for ordering requirement columns for each @@ -249,15 +246,11 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_ordering_values.push(self.ordering_values.clone().into()); // Convert array to Scalars to sort them easily. Convert back to array at evaluation. - // let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; - let array_agg_res = ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; - println!("array_agg_res: {:?}", array_agg_res); - partition_values.push(array_agg_res.into()); - // // println!("array_agg_res: {:?}", array_agg_res); - // println!("array_v2: {:?}", array_v2); - // for v in array_agg_res.into_iter() { - // partition_values.push(v.into()); - // } + let array_agg_res = + ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; + for v in array_agg_res.into_iter() { + partition_values.push(v.into()); + } let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 9de7cfcd16ff0..024fa66f79baf 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -141,20 +141,20 @@ AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] # test array_agg_order with list data type statement ok -CREATE TABLE table2 AS VALUES +CREATE TABLE agg_order_list_table AS VALUES ('w', 1, [1,2,3], 10), ('b', 2, [4,5,6], 20), - ('b', 2, [4,5,6], 20) + ('b', 1, [7,8,9], 30) ; query T? -select column1, array_agg(column3 order by column2, column4) from table2 group by column1; +select column1, array_agg(column3 order by column2, column4) from agg_order_list_table group by column1; ---- w [[1, 2, 3]] -b [[4, 5, 6], [4, 5, 6]] +b [[7, 8, 9], [4, 5, 6]] statement ok -drop table table2; +drop table agg_order_list_table; statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100 @@ -3215,4 +3215,4 @@ SELECT 0 AS "t.a" FROM t HAVING MAX(t.a) = 0; ---- statement ok -DROP TABLE t; \ No newline at end of file +DROP TABLE t; From 9ab33b2b469a73f66755d1f27f6b6b4b81f0cbd0 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 10:21:17 +0800 Subject: [PATCH 03/10] add doc Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 37 ++++++++++++++++++- .../sqllogictest/test_files/aggregate.slt | 10 ++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 008163605f93a..e8235758299eb 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2192,8 +2192,41 @@ impl ScalarValue { Ok(scalars) } - /// convert_array_to_scalar_vec but only convert the first level instead of recursively converting all the levels - /// List(List(1, 2, 3)) + /// convert_array_to_scalar_vec but only convert the first level instead of recursively converting + /// all the levels, so list remains as ScalarValue::List + /// + /// Example + /// ``` + /// use datafusion_common::ScalarValue; + /// use arrow::array::ListArray; + /// use arrow::datatypes::{DataType, Int32Type}; + /// use datafusion_common::utils::array_into_list_array; + /// use std::sync::Arc; + /// + /// let list_arr = ListArray::from_iter_primitive::(vec![ + /// Some(vec![Some(1), Some(2), Some(3)]), + /// Some(vec![Some(4), Some(5)]) + /// ]); + /// let list_arr = array_into_list_array(Arc::new(list_arr)); + /// + /// let scalar_vec = ScalarValue::convert_first_level_array_to_scalar_vec(&list_arr).unwrap(); + /// + /// let l1 = ListArray::from_iter_primitive::(vec![ + /// Some(vec![Some(1), Some(2), Some(3)]), + /// ]); + /// let l2 = ListArray::from_iter_primitive::(vec![ + /// Some(vec![Some(4), Some(5)]), + /// ]); + /// + /// let expected = vec![ + /// vec![ + /// ScalarValue::List(Arc::new(l1)), + /// ScalarValue::List(Arc::new(l2)), + /// ], + /// ]; + /// + /// assert_eq!(scalar_vec, expected); + /// ``` pub fn convert_first_level_array_to_scalar_vec( array: &dyn Array, ) -> Result>> { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 024fa66f79baf..b30d81814020e 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -141,20 +141,20 @@ AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] # test array_agg_order with list data type statement ok -CREATE TABLE agg_order_list_table AS VALUES +CREATE TABLE array_agg_order_list_table AS VALUES ('w', 1, [1,2,3], 10), ('b', 2, [4,5,6], 20), ('b', 1, [7,8,9], 30) ; -query T? -select column1, array_agg(column3 order by column2, column4) from agg_order_list_table group by column1; +query T? rowsort +select column1, array_agg(column3 order by column2, column4) from array_agg_order_list_table group by column1; ---- -w [[1, 2, 3]] b [[7, 8, 9], [4, 5, 6]] +w [[1, 2, 3]] statement ok -drop table agg_order_list_table; +drop table array_agg_order_list_table; statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100 From e66ee537a5c5934e439abb83ae9533b0d8e18495 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 11:01:05 +0800 Subject: [PATCH 04/10] fix nth Signed-off-by: jayzhan211 --- .../physical-expr/src/aggregate/nth_value.rs | 4 ++-- .../sqllogictest/test_files/aggregate.slt | 20 ++++++++++++++++--- .../test_files/aggregates_topk.slt | 3 +++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs b/datafusion/physical-expr/src/aggregate/nth_value.rs index 5d721e3a5e87c..50a1d309e414f 100644 --- a/datafusion/physical-expr/src/aggregate/nth_value.rs +++ b/datafusion/physical-expr/src/aggregate/nth_value.rs @@ -236,7 +236,7 @@ impl Accumulator for NthValueAccumulator { let n_required = self.n.unsigned_abs() as usize; if self.ordering_req.is_empty() { let array_agg_res = - ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { self.values.extend(v); if self.values.len() > n_required { @@ -260,7 +260,7 @@ impl Accumulator for NthValueAccumulator { partition_ordering_values.push(self.ordering_values.clone()); let array_agg_res = - ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { partition_values.push(v.into()); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index b30d81814020e..92eed13e776e4 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -142,16 +142,30 @@ AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] # test array_agg_order with list data type statement ok CREATE TABLE array_agg_order_list_table AS VALUES - ('w', 1, [1,2,3], 10), + ('w', 2, [1,2,3], 10), + ('w', 1, [9,5,2], 20), + ('w', 1, [3,2,5], 30), ('b', 2, [4,5,6], 20), ('b', 1, [7,8,9], 30) ; query T? rowsort -select column1, array_agg(column3 order by column2, column4) from array_agg_order_list_table group by column1; +select column1, array_agg(column3 order by column2, column4 desc) from array_agg_order_list_table group by column1; ---- b [[7, 8, 9], [4, 5, 6]] -w [[1, 2, 3]] +w [[3, 2, 5], [9, 5, 2], [1, 2, 3]] + +query T?? rowsort +select column1, first_value(column3 order by column2, column4 desc), last_value(column3 order by column2, column4 desc) from array_agg_order_list_table group by column1; +---- +b [7, 8, 9] [4, 5, 6] +w [3, 2, 5] [1, 2, 3] + +query T? rowsort +select column1, nth_value(column3, 2 order by column2, column4 desc) from array_agg_order_list_table group by column1; +---- +b [4, 5, 6] +w [9, 5, 2] statement ok drop table array_agg_order_list_table; diff --git a/datafusion/sqllogictest/test_files/aggregates_topk.slt b/datafusion/sqllogictest/test_files/aggregates_topk.slt index bd8f00e041589..b6a2d5976a2ad 100644 --- a/datafusion/sqllogictest/test_files/aggregates_topk.slt +++ b/datafusion/sqllogictest/test_files/aggregates_topk.slt @@ -212,3 +212,6 @@ b 0 -2 a -1 -1 NULL 0 0 c 1 2 + +statement ok +drop table traces; \ No newline at end of file From 192c452aa327133301e9215d3048d87d26fc07c2 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 15:33:35 +0800 Subject: [PATCH 05/10] support distinct Signed-off-by: jayzhan211 --- .../src/aggregate/array_agg_distinct.rs | 169 ++++++++---------- .../sqllogictest/test_files/aggregate.slt | 24 +++ .../test_files/aggregates_topk.slt | 2 +- 3 files changed, 104 insertions(+), 91 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 2e9df477d5164..408c2f376a9b3 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field}; +use arrow_array::cast::AsArray; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; @@ -138,9 +139,10 @@ impl Accumulator for DistinctArrayAggAccumulator { assert_eq!(values.len(), 1, "batch input should only include 1 column!"); let array = &values[0]; - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(array)?; - for scalars in scalar_vec { - self.values.extend(scalars); + + for i in 0..array.len() { + let scalar = ScalarValue::try_from_array(&array, i)?; + self.values.insert(scalar); } Ok(()) @@ -151,7 +153,11 @@ impl Accumulator for DistinctArrayAggAccumulator { return Ok(()); } - self.update_batch(states) + let array = &states[0]; + + // Unwrap outer ListArray then do update batch + let inner_array = array.as_list::().value(0); + self.update_batch(&[inner_array]) } fn evaluate(&mut self) -> Result { @@ -184,44 +190,53 @@ mod tests { use datafusion_common::utils::array_into_list_array; use datafusion_common::{internal_err, DataFusionError}; - // arrow::compute::sort cann't sort ListArray directly, so we need to sort the inner primitive array and wrap it back into ListArray. - fn sort_list_inner(arr: ScalarValue) -> ScalarValue { - let arr = match arr { - ScalarValue::List(arr) => arr.value(0), - _ => { - panic!("Expected ScalarValue::List, got {:?}", arr) - } - }; + // arrow::compute::sort can't sort nested ListArray directly, so we compare the scalar values pair-wise. + fn compare_list_contents( + expected: Vec, + actual: ScalarValue, + ) -> Result<()> { + let array = actual.to_array()?; + let list_array = array.as_list::(); + let inner_array = list_array.value(0); + let mut actual_scalars = vec![]; + for index in 0..inner_array.len() { + let sv = ScalarValue::try_from_array(&inner_array, index)?; + actual_scalars.push(sv); + } - let arr = arrow::compute::sort(&arr, None).unwrap(); - let list_arr = array_into_list_array(arr); - ScalarValue::List(Arc::new(list_arr)) - } + if actual_scalars.len() != expected.len() { + return internal_err!( + "Expected and actual list lengths differ: expected={}, actual={}", + expected.len(), + actual_scalars.len() + ); + } - fn compare_list_contents(expected: ScalarValue, actual: ScalarValue) -> Result<()> { - let actual = sort_list_inner(actual); - - match (&expected, &actual) { - (ScalarValue::List(arr1), ScalarValue::List(arr2)) => { - if arr1.eq(arr2) { - Ok(()) - } else { - internal_err!( - "Actual value {:?} not found in expected values {:?}", - actual, - expected - ) + let mut seen = vec![false; expected.len()]; + for v in expected { + let mut found = false; + for (i, sv) in actual_scalars.iter().enumerate() { + if sv == &v { + seen[i] = true; + found = true; + break; } } - _ => { - internal_err!("Expected scalar lists as inputs") + if !found { + return internal_err!( + "Expected value {:?} not found in actual values {:?}", + v, + actual_scalars + ); } } + + Ok(()) } fn check_distinct_array_agg( input: ArrayRef, - expected: ScalarValue, + expected: Vec, datatype: DataType, ) -> Result<()> { let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]); @@ -234,14 +249,13 @@ mod tests { true, )); let actual = aggregate(&batch, agg)?; - compare_list_contents(expected, actual) } fn check_merge_distinct_array_agg( input1: ArrayRef, input2: ArrayRef, - expected: ScalarValue, + expected: Vec, datatype: DataType, ) -> Result<()> { let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]); @@ -262,23 +276,20 @@ mod tests { accum1.merge_batch(&[array])?; let actual = accum1.evaluate()?; - compare_list_contents(expected, actual) } #[test] fn distinct_array_agg_i32() -> Result<()> { let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 4, 5, 2])); - let expected = - ScalarValue::List(Arc::new( - ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - Some(4), - Some(5), - Some(7), - ])]), - )); + + let expected = vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(2)), + ScalarValue::Int32(Some(4)), + ScalarValue::Int32(Some(5)), + ScalarValue::Int32(Some(7)), + ]; check_distinct_array_agg(col, expected, DataType::Int32) } @@ -288,18 +299,15 @@ mod tests { let col1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 4, 5, 2])); let col2: ArrayRef = Arc::new(Int32Array::from(vec![1, 3, 7, 8, 4])); - let expected = - ScalarValue::List(Arc::new( - ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(7), - Some(8), - ])]), - )); + let expected = vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(2)), + ScalarValue::Int32(Some(3)), + ScalarValue::Int32(Some(4)), + ScalarValue::Int32(Some(5)), + ScalarValue::Int32(Some(7)), + ScalarValue::Int32(Some(8)), + ]; check_merge_distinct_array_agg(col1, col2, expected, DataType::Int32) } @@ -351,23 +359,16 @@ mod tests { let l2 = ScalarValue::List(Arc::new(l2)); let l3 = ScalarValue::List(Arc::new(l3)); - // Duplicate l1 in the input array and check that it is deduped in the output. - let array = ScalarValue::iter_to_array(vec![l1.clone(), l2, l3, l1]).unwrap(); - - let expected = - ScalarValue::List(Arc::new( - ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - Some(7), - Some(8), - Some(9), - ])]), - )); + // Duplicate l1 and l3 in the input array and check that it is deduped in the output. + let array = ScalarValue::iter_to_array(vec![ + l1.clone(), + l2.clone(), + l3.clone(), + l3.clone(), + l1.clone(), + ]) + .unwrap(); + let expected = vec![l1, l2, l3]; check_distinct_array_agg( array, @@ -426,22 +427,10 @@ mod tests { let l3 = ScalarValue::List(Arc::new(l3)); // Duplicate l1 in the input array and check that it is deduped in the output. - let input1 = ScalarValue::iter_to_array(vec![l1.clone(), l2]).unwrap(); - let input2 = ScalarValue::iter_to_array(vec![l1, l3]).unwrap(); - - let expected = - ScalarValue::List(Arc::new( - ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - Some(7), - Some(8), - ])]), - )); + let input1 = ScalarValue::iter_to_array(vec![l1.clone(), l2.clone()]).unwrap(); + let input2 = ScalarValue::iter_to_array(vec![l1.clone(), l3.clone()]).unwrap(); + + let expected = vec![l1, l2, l3]; check_merge_distinct_array_agg(input1, input2, expected, DataType::Int32) } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 92eed13e776e4..36ddd2d9525a7 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -170,6 +170,30 @@ w [9, 5, 2] statement ok drop table array_agg_order_list_table; +# test array_agg_distinct with list data type +statement ok +CREATE TABLE array_agg_distinct_list_table AS VALUES + ('w', [0,1]), + ('w', [0,1]), + ('w', [1,0]), + ('b', [1,0]), + ('b', [1,0]), + ('b', [1,0]), + ('b', [0,1]) +; + +# Apply array_sort to have determinisitic result, higher dimension nested array also works but not for array sort, +# so they are covered in `datafusion/physical-expr/src/aggregate/array_agg_distinct.rs` +query ?? +select array_sort(c1), array_sort(c2) from ( + select array_agg(distinct column1) as c1, array_agg(distinct column2) as c2 from array_agg_distinct_list_table +); +---- +[b, w] [[0, 1], [1, 0]] + +statement ok +drop table array_agg_distinct_list_table; + statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100 diff --git a/datafusion/sqllogictest/test_files/aggregates_topk.slt b/datafusion/sqllogictest/test_files/aggregates_topk.slt index b6a2d5976a2ad..3f139ede8c775 100644 --- a/datafusion/sqllogictest/test_files/aggregates_topk.slt +++ b/datafusion/sqllogictest/test_files/aggregates_topk.slt @@ -214,4 +214,4 @@ NULL 0 0 c 1 2 statement ok -drop table traces; \ No newline at end of file +drop table traces; From e9053d56cb3621cd3ec162214bc894419c090b09 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 15:46:24 +0800 Subject: [PATCH 06/10] cleanup Signed-off-by: jayzhan211 --- .../physical-expr/src/aggregate/array_agg_distinct.rs | 1 - .../physical-expr/src/aggregate/count_distinct/mod.rs | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 408c2f376a9b3..e47f7d14a67ab 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -187,7 +187,6 @@ mod tests { use arrow_array::Array; use arrow_array::ListArray; use arrow_buffer::OffsetBuffer; - use datafusion_common::utils::array_into_list_array; use datafusion_common::{internal_err, DataFusionError}; // arrow::compute::sort can't sort nested ListArray directly, so we compare the scalar values pair-wise. diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs index 8baea511c7765..0000d48e6918e 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use ahash::RandomState; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field, TimeUnit}; +use arrow_array::cast::AsArray; use arrow_array::types::{ Date32Type, Date64Type, Decimal128Type, Decimal256Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Time32MillisecondType, @@ -241,11 +242,10 @@ impl Accumulator for DistinctCountAccumulator { return Ok(()); } assert_eq!(states.len(), 1, "array_agg states must be singleton!"); - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for scalars in scalar_vec.into_iter() { - self.values.extend(scalars); - } - Ok(()) + let array = &states[0]; + let list_array = array.as_list::(); + let inner_array = list_array.value(0); + self.update_batch(&[inner_array]) } fn evaluate(&mut self) -> Result { From bf815b7d54d7f854ff541dd80bd865f4666e959a Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 15 Feb 2024 16:19:59 +0800 Subject: [PATCH 07/10] rm convert_first_level_array_to_scalar_vec Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 59 ++++--------------- datafusion/core/tests/sql/aggregates.rs | 2 +- .../src/aggregate/array_agg_ordered.rs | 3 +- .../physical-expr/src/aggregate/nth_value.rs | 4 +- 4 files changed, 14 insertions(+), 54 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index e8235758299eb..cd9b0411f5b36 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -52,7 +52,6 @@ use arrow::{ UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION, }, }; -use arrow_array::cast::as_list_array; use arrow_array::{ArrowNativeTypeOp, Scalar}; pub use struct_builder::ScalarStructBuilder; @@ -2143,65 +2142,29 @@ impl ScalarValue { /// use datafusion_common::ScalarValue; /// use arrow::array::ListArray; /// use arrow::datatypes::{DataType, Int32Type}; + /// use datafusion_common::utils::array_into_list_array; + /// use std::sync::Arc; /// /// let list_arr = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), - /// None, /// Some(vec![Some(4), Some(5)]) /// ]); /// /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let expected = vec![ - /// vec![ + /// vec![ /// ScalarValue::Int32(Some(1)), /// ScalarValue::Int32(Some(2)), /// ScalarValue::Int32(Some(3)), - /// ], - /// vec![], - /// vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(5))] + /// ], + /// vec![ + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ], /// ]; /// /// assert_eq!(scalar_vec, expected); - /// ``` - pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result>> { - let mut scalars = Vec::with_capacity(array.len()); - - for index in 0..array.len() { - let scalar_values = match array.data_type() { - DataType::List(_) => { - let list_array = as_list_array(array); - match list_array.is_null(index) { - true => Vec::new(), - false => { - let nested_array = list_array.value(index); - ScalarValue::convert_array_to_scalar_vec(&nested_array)? - .into_iter() - .flatten() - .collect() - } - } - } - _ => { - let scalar = ScalarValue::try_from_array(array, index)?; - vec![scalar] - } - }; - scalars.push(scalar_values); - } - Ok(scalars) - } - - /// convert_array_to_scalar_vec but only convert the first level instead of recursively converting - /// all the levels, so list remains as ScalarValue::List - /// - /// Example - /// ``` - /// use datafusion_common::ScalarValue; - /// use arrow::array::ListArray; - /// use arrow::datatypes::{DataType, Int32Type}; - /// use datafusion_common::utils::array_into_list_array; - /// use std::sync::Arc; /// /// let list_arr = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), @@ -2209,7 +2172,7 @@ impl ScalarValue { /// ]); /// let list_arr = array_into_list_array(Arc::new(list_arr)); /// - /// let scalar_vec = ScalarValue::convert_first_level_array_to_scalar_vec(&list_arr).unwrap(); + /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let l1 = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), @@ -2227,9 +2190,7 @@ impl ScalarValue { /// /// assert_eq!(scalar_vec, expected); /// ``` - pub fn convert_first_level_array_to_scalar_vec( - array: &dyn Array, - ) -> Result>> { + pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result>> { let mut scalars = Vec::with_capacity(array.len()); for index in 0..array.len() { diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index af6d0d5f4e245..84b791a3de05b 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -44,9 +44,9 @@ async fn csv_query_array_agg_distinct() -> Result<()> { // We should have 1 row containing a list let column = actual[0].column(0); assert_eq!(column.len(), 1); - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?; let mut scalars = scalar_vec[0].clone(); + // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { a.partial_cmp(b).expect("Can compare ScalarValues") diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 777bc588d7fd1..587f40081c906 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -246,8 +246,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_ordering_values.push(self.ordering_values.clone().into()); // Convert array to Scalars to sort them easily. Convert back to array at evaluation. - let array_agg_res = - ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; + let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { partition_values.push(v.into()); } diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs b/datafusion/physical-expr/src/aggregate/nth_value.rs index 50a1d309e414f..5d721e3a5e87c 100644 --- a/datafusion/physical-expr/src/aggregate/nth_value.rs +++ b/datafusion/physical-expr/src/aggregate/nth_value.rs @@ -236,7 +236,7 @@ impl Accumulator for NthValueAccumulator { let n_required = self.n.unsigned_abs() as usize; if self.ordering_req.is_empty() { let array_agg_res = - ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { self.values.extend(v); if self.values.len() > n_required { @@ -260,7 +260,7 @@ impl Accumulator for NthValueAccumulator { partition_ordering_values.push(self.ordering_values.clone()); let array_agg_res = - ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { partition_values.push(v.into()); From e6087a02b638b6d65d036070048d82705f4c8c0d Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 16 Feb 2024 22:57:15 +0800 Subject: [PATCH 08/10] add doc and assertion Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 9 +++++++++ .../physical-expr/src/aggregate/array_agg_distinct.rs | 1 + 2 files changed, 10 insertions(+) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index cd9b0411f5b36..1bdf7f834412a 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2145,11 +2145,15 @@ impl ScalarValue { /// use datafusion_common::utils::array_into_list_array; /// use std::sync::Arc; /// + /// # Example 1: Array + /// + /// // Equivalent to [[1,2,3], [4,5]] /// let list_arr = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), /// Some(vec![Some(4), Some(5)]) /// ]); /// + /// // Convert the array into Scalar Values for each row /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let expected = vec![ @@ -2166,12 +2170,17 @@ impl ScalarValue { /// /// assert_eq!(scalar_vec, expected); /// + /// Example 2: Nested array + /// /// let list_arr = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), /// Some(vec![Some(4), Some(5)]) /// ]); + /// + /// // Wrap into another layer of list, we got nested array as [ [[1,2,3], [4,5]] ] /// let list_arr = array_into_list_array(Arc::new(list_arr)); /// + /// // Convert the array into Scalar Values for each row, we got 1D arrays in this example /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let l1 = ListArray::from_iter_primitive::(vec![ diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index e47f7d14a67ab..b073b00578a5e 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -155,6 +155,7 @@ impl Accumulator for DistinctArrayAggAccumulator { let array = &states[0]; + assert_eq!(array.len(), 1, "state array should only include 1 row!"); // Unwrap outer ListArray then do update batch let inner_array = array.as_list::().value(0); self.update_batch(&[inner_array]) From 59eecb069f3cc597cf5a539fc820d1cec7fde3d2 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 16 Feb 2024 23:26:37 +0800 Subject: [PATCH 09/10] fix doc Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 1bdf7f834412a..d56feb80b18bc 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2137,15 +2137,11 @@ impl ScalarValue { /// Retrieve ScalarValue for each row in `array` /// - /// Example + /// Example 1: Array /// ``` /// use datafusion_common::ScalarValue; /// use arrow::array::ListArray; /// use arrow::datatypes::{DataType, Int32Type}; - /// use datafusion_common::utils::array_into_list_array; - /// use std::sync::Arc; - /// - /// # Example 1: Array /// /// // Equivalent to [[1,2,3], [4,5]] /// let list_arr = ListArray::from_iter_primitive::(vec![ @@ -2169,8 +2165,15 @@ impl ScalarValue { /// ]; /// /// assert_eq!(scalar_vec, expected); + /// ``` /// /// Example 2: Nested array + /// ``` + /// use datafusion_common::ScalarValue; + /// use arrow::array::ListArray; + /// use arrow::datatypes::{DataType, Int32Type}; + /// use datafusion_common::utils::array_into_list_array; + /// use std::sync::Arc; /// /// let list_arr = ListArray::from_iter_primitive::(vec![ /// Some(vec![Some(1), Some(2), Some(3)]), From 89adca85e53d15b40133e4f91b45750be28576b8 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 16 Feb 2024 23:27:48 +0800 Subject: [PATCH 10/10] fix doc Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index d56feb80b18bc..5c9b6866ad2fe 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2137,7 +2137,7 @@ impl ScalarValue { /// Retrieve ScalarValue for each row in `array` /// - /// Example 1: Array + /// Example 1: Array (ScalarValue::Int32) /// ``` /// use datafusion_common::ScalarValue; /// use arrow::array::ListArray; @@ -2167,7 +2167,7 @@ impl ScalarValue { /// assert_eq!(scalar_vec, expected); /// ``` /// - /// Example 2: Nested array + /// Example 2: Nested array (ScalarValue::List) /// ``` /// use datafusion_common::ScalarValue; /// use arrow::array::ListArray;