From 0840207349e383de8a788ce4ebef4e64d174d187 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 19 Mar 2025 06:59:16 +0800 Subject: [PATCH 1/8] feat: implement GroupsAccumulator for count(DISTINCT) aggr Signed-off-by: Ruihang Xia --- .../groups_accumulator/accumulate.rs | 8 +- datafusion/functions-aggregate/src/count.rs | 384 +++++++++++++++++- 2 files changed, 383 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index e629e99e1657a..bb1b1bc3387df 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -15,7 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! [`GroupsAccumulator`] helpers: [`NullState`] and [`accumulate_indices`] +//! [`GroupsAccumulator`] helpers: [`NullState`] and [`accumulate_indices`]-like functions. +//! +//! This mod provides various kinds of helper functions to work with [`GroupsAccumulator`], +//! here is a quick summary of the functions provided and their purpose/differences: +//! - [`accumulate`]: Accumulate a single, primitive value per group. +//! - [`accumulate_multiple`]: Accumulate multiple, primitive values per group. +//! - [`accumulate_indices`]: Accumulate indices only (without actual value) per group. //! //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 2d995b4a41793..64b02097311ed 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -41,7 +41,7 @@ use arrow::{ }; use arrow::{ - array::{Array, BooleanArray, Int64Array, PrimitiveArray}, + array::{Array, BooleanArray, Int64Array, ListArray, PrimitiveArray}, buffer::BooleanBuffer, }; use datafusion_common::{ @@ -62,7 +62,10 @@ use datafusion_functions_aggregate_common::aggregate::count_distinct::{ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; use datafusion_physical_expr_common::binary_map::OutputType; +use datafusion_common::cast::as_list_array; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; +use datafusion_common::utils::SingleRowListArrayBuilder; + make_udaf_expr_and_func!( Count, count, @@ -344,20 +347,21 @@ impl AggregateUDFImpl for Count { } fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { - // groups accumulator only supports `COUNT(c1)`, not + // groups accumulator only supports `COUNT(c1)` or `COUNT(distinct c1)`, not // `COUNT(c1, c2)`, etc - if args.is_distinct { - return false; - } args.exprs.len() == 1 } fn create_groups_accumulator( &self, - _args: AccumulatorArgs, + args: AccumulatorArgs, ) -> Result> { // instantiate specialized accumulator - Ok(Box::new(CountGroupsAccumulator::new())) + if args.is_distinct { + Ok(Box::new(DistinctCountGroupsAccumulator::new())) + } else { + Ok(Box::new(CountGroupsAccumulator::new())) + } } fn reverse_expr(&self) -> ReversedUDAF { @@ -752,10 +756,315 @@ impl Accumulator for DistinctCountAccumulator { } } +/// GroupsAccumulator for COUNT DISTINCT operations +#[derive(Debug, Default)] +pub struct DistinctCountGroupsAccumulator { + /// One HashSet per group to track distinct values + distinct_sets: Vec>, +} + +impl DistinctCountGroupsAccumulator { + pub fn new() -> Self { + Self::default() + } + + fn ensure_sets(&mut self, total_num_groups: usize) { + if self.distinct_sets.len() < total_num_groups { + self.distinct_sets + .resize_with(total_num_groups, HashSet::default); + } + } + + // Helper method to encode sets of distinct values into Arrow arrays + fn encode_sets_to_arrays(&self, start: usize, end: usize) -> Result { + let mut arrays: Vec = Vec::with_capacity(end - start); + + // Create a list array for each group + for group_idx in start..end { + if group_idx < self.distinct_sets.len() { + // Convert the set's values to an array + let values: Vec<_> = + self.distinct_sets[group_idx].iter().cloned().collect(); + let values_array = ScalarValue::iter_to_array(values)?; + + // Wrap as a list array + let list_array = + SingleRowListArrayBuilder::new(values_array).build_list_array(); + arrays.push(Arc::new(list_array) as _); + } else { + // Group doesn't exist, create empty list + let empty_array = ScalarValue::iter_to_array(Vec::::new())?; + let list_array = + SingleRowListArrayBuilder::new(empty_array).build_list_array(); + arrays.push(Arc::new(list_array) as _); + } + } + + // If we have no arrays, create empty list + if arrays.is_empty() { + let empty_array = ScalarValue::iter_to_array(Vec::::new())?; + let list_array = + SingleRowListArrayBuilder::new(empty_array).build_list_array(); + return Ok(Arc::new(list_array)); + } + + // If we only have one array, return it + if arrays.len() == 1 { + return Ok(arrays[0].clone()); + } + + // Otherwise concatenate all arrays - use explicit casting to ensure we have dyn Arrays + let array_refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); + Ok(compute::concat(&array_refs)?) + } +} + +impl GroupsAccumulator for DistinctCountGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + self.ensure_sets(total_num_groups); + + let array = &values[0]; + + // Use a pattern similar to accumulate_indices to process rows + // that are not null and pass the filter + let nulls = array.logical_nulls(); + + match (nulls.as_ref(), opt_filter) { + (None, None) => { + // No nulls, no filter - process all rows + for (row_idx, &group_idx) in group_indices.iter().enumerate() { + if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + self.distinct_sets[group_idx].insert(scalar); + } + } + } + (Some(nulls), None) => { + // Has nulls, no filter + for (row_idx, (&group_idx, is_valid)) in + group_indices.iter().zip(nulls.iter()).enumerate() + { + if is_valid { + if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + self.distinct_sets[group_idx].insert(scalar); + } + } + } + } + (None, Some(filter)) => { + // No nulls, has filter + for (row_idx, (&group_idx, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + self.distinct_sets[group_idx].insert(scalar); + } + } + } + } + (Some(nulls), Some(filter)) => { + // Has nulls and filter + let iter = filter + .iter() + .zip(group_indices.iter()) + .zip(nulls.iter()) + .enumerate(); + + for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { + if is_valid && filter_value == Some(true) { + if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + self.distinct_sets[group_idx].insert(scalar); + } + } + } + } + } + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + // Convert counts to Int64Array + let counts = match emit_to { + EmitTo::All => { + let counts: Vec = self + .distinct_sets + .iter() + .map(|set| set.len() as i64) + .collect(); + self.distinct_sets.clear(); + counts + } + EmitTo::First(n) => { + let counts: Vec = self + .distinct_sets + .iter() + .take(n) + .map(|set| set.len() as i64) + .collect(); + self.distinct_sets = self.distinct_sets.split_off(n); + counts + } + }; + + // COUNT DISTINCT never returns nulls + Ok(Arc::new(Int64Array::from(counts))) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!( + values.len(), + 1, + "COUNT DISTINCT merge expects a single state array" + ); + self.ensure_sets(total_num_groups); + + let list_array = as_list_array(&values[0])?; + + // For each group in the incoming batch + for (i, &group_idx) in group_indices.iter().enumerate() { + if i < list_array.len() { + let inner_array = list_array.value(i); + // Add each value to our set for this group + for j in 0..inner_array.len() { + if !inner_array.is_null(j) { + let scalar = ScalarValue::try_from_array(&inner_array, j)?; + self.distinct_sets[group_idx].insert(scalar); + } + } + } + } + + Ok(()) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + // Create arrays that hold the distinct values for each group + let arrays = match emit_to { + EmitTo::All => { + let arrays = self.encode_sets_to_arrays(0, self.distinct_sets.len())?; + self.distinct_sets.clear(); + arrays + } + EmitTo::First(n) => { + let arrays = self.encode_sets_to_arrays(0, n)?; + self.distinct_sets = self.distinct_sets.split_off(n); + arrays + } + }; + + Ok(vec![arrays]) + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + // For a single distinct value per row, create a list array with that value + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + let array = &values[0]; + let num_rows = array.len(); + + // Create list entries for all rows in a batch efficiently + let mut values_vec = Vec::with_capacity(num_rows); + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + // Track which rows will have values (non-null, passes filter) + let mut has_value = vec![false; num_rows]; + let mut total_values = 0; + + // First pass: identify valid rows and calculate space needed + for i in 0..num_rows { + if !array.is_null(i) + && opt_filter.map_or(true, |f| !f.is_null(i) && f.value(i)) + { + has_value[i] = true; + total_values += 1; + } + offsets.push(total_values); + } + + // Second pass: build values array + for i in 0..num_rows { + if has_value[i] { + values_vec.push(ScalarValue::try_from_array(array, i)?); + } + } + + // Build the values array once for all rows + let values_array = ScalarValue::iter_to_array(values_vec)?; + + // Create the list array with the calculated offsets + let offsets_buffer = arrow::buffer::Buffer::from_slice_ref(&offsets); + let list_data = arrow::array::ArrayData::builder(DataType::List(Arc::new( + Field::new("item", values_array.data_type().clone(), true), + ))) + .len(num_rows) + .add_buffer(offsets_buffer) + .add_child_data(values_array.into_data()) + .build()?; + + Ok(vec![Arc::new(ListArray::from(list_data))]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + + fn size(&self) -> usize { + // Base size of the struct + let mut size = size_of::(); + + // Size of the vector holding the HashSets + size += size_of::>>() + + self.distinct_sets.capacity() + * size_of::>(); + + // Estimate HashSet contents size more efficiently + // Instead of iterating through all values which is expensive, use an approximation + for set in &self.distinct_sets { + // Base size of the HashSet + size += set.capacity() * size_of::<(ScalarValue, ())>(); + + // Estimate ScalarValue size using sample-based approach + // Only look at up to 10 items as a sample + let sample_size = 10.min(set.len()); + if sample_size > 0 { + let avg_size = set + .iter() + .take(sample_size) + .map(|v| v.size()) + .sum::() + / sample_size; + + // Extrapolate to the full set + size += avg_size * (set.len() - sample_size); + } + } + + size + } +} + #[cfg(test)] mod tests { use super::*; - use arrow::array::NullArray; + use arrow::array::{Int32Array, NullArray, StringArray}; #[test] fn count_accumulator_nulls() -> Result<()> { @@ -764,4 +1073,63 @@ mod tests { assert_eq!(accumulator.evaluate()?, ScalarValue::Int64(Some(0))); Ok(()) } + + #[test] + fn test_distinct_count_groups_basic() -> Result<()> { + // Create a simple accumulator for Int32 values + let mut accumulator = DistinctCountGroupsAccumulator::new(); + + // Create some test data + let values = vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 1])) as ArrayRef]; + + // Group indices: we have 3 groups + let group_indices = vec![0, 1, 0, 2, 1, 0]; + + // Update the accumulator + accumulator.update_batch(&values, &group_indices, None, 3)?; + + // Evaluate + let result = accumulator.evaluate(EmitTo::All)?; + let counts = result.as_primitive::(); + + // Group 0 should have distinct values [1] (1 appears 3 times) -> count 1 + // Group 1 should have distinct values [2] (2 appears 2 times) -> count 1 + // Group 2 should have distinct values [3] (3 appears 1 time) -> count 1 + assert_eq!(counts.value(0), 1); // Group 0: distinct values 1, 1, 1 -> count 1 + assert_eq!(counts.value(1), 1); // Group 1: distinct values 2, 2 -> count 1 + assert_eq!(counts.value(2), 1); // Group 2: distinct values 3 -> count 1 + + Ok(()) + } + + #[test] + fn test_distinct_count_groups_with_filter() -> Result<()> { + // Create a simple accumulator for string values + let mut accumulator = DistinctCountGroupsAccumulator::new(); + + // Create some test data + let values = vec![ + Arc::new(StringArray::from(vec!["a", "b", "a", "c", "b", "d"])) as ArrayRef, + ]; + + // Group indices: we have 2 groups + let group_indices = vec![0, 0, 0, 1, 1, 1]; + + // Filter: include only some rows + let filter = BooleanArray::from(vec![true, true, false, true, false, true]); + + // Update the accumulator + accumulator.update_batch(&values, &group_indices, Some(&filter), 2)?; + + // Evaluate + let result = accumulator.evaluate(EmitTo::All)?; + let counts = result.as_primitive::(); + + // Group 0 should have ["a", "b"] (filter excludes the second "a") + // Group 1 should have ["c", "d"] (filter excludes "b") + assert_eq!(counts.value(0), 2); + assert_eq!(counts.value(1), 2); + + Ok(()) + } } From 7c166e659a339d77bd490ca77bc71d17024c9c68 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 20 Mar 2025 05:23:49 +0800 Subject: [PATCH 2/8] finalize Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 164 +++++------------- .../sqllogictest/test_files/aggregate.slt | 6 + 2 files changed, 53 insertions(+), 117 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 64b02097311ed..418f7724df537 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,9 +16,12 @@ // under the License. use ahash::RandomState; +use arrow::array::NullArray; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use datafusion_common::stats::Precision; use datafusion_expr::expr::WindowFunction; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; use datafusion_macros::user_doc; use datafusion_physical_expr::expressions; use std::collections::HashSet; @@ -64,7 +67,6 @@ use datafusion_physical_expr_common::binary_map::OutputType; use datafusion_common::cast::as_list_array; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; -use datafusion_common::utils::SingleRowListArrayBuilder; make_udaf_expr_and_func!( Count, @@ -774,49 +776,6 @@ impl DistinctCountGroupsAccumulator { .resize_with(total_num_groups, HashSet::default); } } - - // Helper method to encode sets of distinct values into Arrow arrays - fn encode_sets_to_arrays(&self, start: usize, end: usize) -> Result { - let mut arrays: Vec = Vec::with_capacity(end - start); - - // Create a list array for each group - for group_idx in start..end { - if group_idx < self.distinct_sets.len() { - // Convert the set's values to an array - let values: Vec<_> = - self.distinct_sets[group_idx].iter().cloned().collect(); - let values_array = ScalarValue::iter_to_array(values)?; - - // Wrap as a list array - let list_array = - SingleRowListArrayBuilder::new(values_array).build_list_array(); - arrays.push(Arc::new(list_array) as _); - } else { - // Group doesn't exist, create empty list - let empty_array = ScalarValue::iter_to_array(Vec::::new())?; - let list_array = - SingleRowListArrayBuilder::new(empty_array).build_list_array(); - arrays.push(Arc::new(list_array) as _); - } - } - - // If we have no arrays, create empty list - if arrays.is_empty() { - let empty_array = ScalarValue::iter_to_array(Vec::::new())?; - let list_array = - SingleRowListArrayBuilder::new(empty_array).build_list_array(); - return Ok(Arc::new(list_array)); - } - - // If we only have one array, return it - if arrays.len() == 1 { - return Ok(arrays[0].clone()); - } - - // Otherwise concatenate all arrays - use explicit casting to ensure we have dyn Arrays - let array_refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); - Ok(compute::concat(&array_refs)?) - } } impl GroupsAccumulator for DistinctCountGroupsAccumulator { @@ -952,21 +911,38 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { } fn state(&mut self, emit_to: EmitTo) -> Result> { - // Create arrays that hold the distinct values for each group - let arrays = match emit_to { - EmitTo::All => { - let arrays = self.encode_sets_to_arrays(0, self.distinct_sets.len())?; - self.distinct_sets.clear(); - arrays - } - EmitTo::First(n) => { - let arrays = self.encode_sets_to_arrays(0, n)?; - self.distinct_sets = self.distinct_sets.split_off(n); - arrays - } + let distinct_sets: Vec> = + emit_to.take_needed(&mut self.distinct_sets); + + let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); + offsets.push(0); + let mut curr_len = 0i32; + + let mut value_iter = distinct_sets + .into_iter() + .flat_map(|set| { + // build offset + curr_len += set.len() as i32; + offsets.push(curr_len); + // convert into iter + set.into_iter() + }) + .peekable(); + let data_array: ArrayRef = if value_iter.peek().is_none() { + Arc::new(NullArray::new(0)) as _ + } else { + Arc::new(ScalarValue::iter_to_array(value_iter)?) as _ }; + let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + + let list_array = ListArray::new( + Arc::new(Field::new_list_field(data_array.data_type().clone(), true)), + offset_buffer, + data_array, + None, + ); - Ok(vec![arrays]) + Ok(vec![Arc::new(list_array) as _]) } fn convert_to_state( @@ -976,50 +952,19 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { ) -> Result> { // For a single distinct value per row, create a list array with that value assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - let array = &values[0]; - let num_rows = array.len(); - - // Create list entries for all rows in a batch efficiently - let mut values_vec = Vec::with_capacity(num_rows); - let mut offsets = Vec::with_capacity(num_rows + 1); - offsets.push(0); - - // Track which rows will have values (non-null, passes filter) - let mut has_value = vec![false; num_rows]; - let mut total_values = 0; - - // First pass: identify valid rows and calculate space needed - for i in 0..num_rows { - if !array.is_null(i) - && opt_filter.map_or(true, |f| !f.is_null(i) && f.value(i)) - { - has_value[i] = true; - total_values += 1; - } - offsets.push(total_values); - } - - // Second pass: build values array - for i in 0..num_rows { - if has_value[i] { - values_vec.push(ScalarValue::try_from_array(array, i)?); - } - } - - // Build the values array once for all rows - let values_array = ScalarValue::iter_to_array(values_vec)?; - - // Create the list array with the calculated offsets - let offsets_buffer = arrow::buffer::Buffer::from_slice_ref(&offsets); - let list_data = arrow::array::ArrayData::builder(DataType::List(Arc::new( - Field::new("item", values_array.data_type().clone(), true), - ))) - .len(num_rows) - .add_buffer(offsets_buffer) - .add_child_data(values_array.into_data()) - .build()?; + let values = values[0].clone(); + + let offsets = + OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); + let nulls = filtered_null_mask(opt_filter, &values); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(values.data_type().clone(), true)), + offsets, + values, + nulls, + ); - Ok(vec![Arc::new(ListArray::from(list_data))]) + Ok(vec![Arc::new(list_array)]) } fn supports_convert_to_state(&self) -> bool { @@ -1076,19 +1021,13 @@ mod tests { #[test] fn test_distinct_count_groups_basic() -> Result<()> { - // Create a simple accumulator for Int32 values let mut accumulator = DistinctCountGroupsAccumulator::new(); - - // Create some test data let values = vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 1])) as ArrayRef]; - // Group indices: we have 3 groups + // 3 groups let group_indices = vec![0, 1, 0, 2, 1, 0]; - - // Update the accumulator accumulator.update_batch(&values, &group_indices, None, 3)?; - // Evaluate let result = accumulator.evaluate(EmitTo::All)?; let counts = result.as_primitive::(); @@ -1104,24 +1043,15 @@ mod tests { #[test] fn test_distinct_count_groups_with_filter() -> Result<()> { - // Create a simple accumulator for string values let mut accumulator = DistinctCountGroupsAccumulator::new(); - - // Create some test data let values = vec![ Arc::new(StringArray::from(vec!["a", "b", "a", "c", "b", "d"])) as ArrayRef, ]; - - // Group indices: we have 2 groups + // 2 groups let group_indices = vec![0, 0, 0, 1, 1, 1]; - - // Filter: include only some rows let filter = BooleanArray::from(vec![true, true, false, true, false, true]); - - // Update the accumulator accumulator.update_batch(&values, &group_indices, Some(&filter), 2)?; - // Evaluate let result = accumulator.evaluate(EmitTo::All)?; let counts = result.as_primitive::(); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index bc43f6bc8e61a..28088b56b1024 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2302,6 +2302,12 @@ SELECT count(c1, c2) FROM test query error DataFusion error: This feature is not implemented: COUNT DISTINCT with multiple arguments SELECT count(distinct c1, c2) FROM test +# count(distinct) and count() together +query II +SELECT count(c1), count(distinct c1) FROM test +---- +4 3 + # count_null query III SELECT count(null), count(null, null), count(distinct null) FROM test From f5c0935da7017d7a97d9ffea7d2d611b274ea0a6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 20 Mar 2025 05:58:23 +0800 Subject: [PATCH 3/8] fix clippy Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 418f7724df537..1ca43d054be63 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -952,7 +952,7 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { ) -> Result> { // For a single distinct value per row, create a list array with that value assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - let values = values[0].clone(); + let values = ArrayRef::clone(&values[0]); let offsets = OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); From 2bd4986e8943de9f27006adcff45937a56fb380f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 20 Mar 2025 07:30:35 +0800 Subject: [PATCH 4/8] record data type Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 56 +++++++++------------ 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 1ca43d054be63..5bcd545dbeba7 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,7 +16,6 @@ // under the License. use ahash::RandomState; -use arrow::array::NullArray; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use datafusion_common::stats::Precision; use datafusion_expr::expr::WindowFunction; @@ -212,7 +211,9 @@ impl AggregateUDFImpl for Count { format_state_name(args.name, "count distinct"), // See COMMENTS.md to understand why nullable is set to true Field::new_list_field(args.input_types[0].clone(), true), - false, + // For group count distinct accumulator, null list item stands for an + // empty value set (i.e., all NULL value so far for that group). + true, )]) } else { Ok(vec![Field::new( @@ -360,7 +361,9 @@ impl AggregateUDFImpl for Count { ) -> Result> { // instantiate specialized accumulator if args.is_distinct { - Ok(Box::new(DistinctCountGroupsAccumulator::new())) + Ok(Box::new(DistinctCountGroupsAccumulator::new( + args.exprs[0].data_type(args.schema)?, + ))) } else { Ok(Box::new(CountGroupsAccumulator::new())) } @@ -759,15 +762,19 @@ impl Accumulator for DistinctCountAccumulator { } /// GroupsAccumulator for COUNT DISTINCT operations -#[derive(Debug, Default)] +#[derive(Debug)] pub struct DistinctCountGroupsAccumulator { /// One HashSet per group to track distinct values distinct_sets: Vec>, + data_type: DataType, } impl DistinctCountGroupsAccumulator { - pub fn new() -> Self { - Self::default() + pub fn new(data_type: DataType) -> Self { + Self { + distinct_sets: vec![], + data_type, + } } fn ensure_sets(&mut self, total_num_groups: usize) { @@ -850,30 +857,13 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - // Convert counts to Int64Array - let counts = match emit_to { - EmitTo::All => { - let counts: Vec = self - .distinct_sets - .iter() - .map(|set| set.len() as i64) - .collect(); - self.distinct_sets.clear(); - counts - } - EmitTo::First(n) => { - let counts: Vec = self - .distinct_sets - .iter() - .take(n) - .map(|set| set.len() as i64) - .collect(); - self.distinct_sets = self.distinct_sets.split_off(n); - counts - } - }; + let distinct_sets: Vec> = + emit_to.take_needed(&mut self.distinct_sets); - // COUNT DISTINCT never returns nulls + let counts = distinct_sets + .iter() + .map(|set| set.len() as i64) + .collect::>(); Ok(Arc::new(Int64Array::from(counts))) } @@ -929,14 +919,14 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { }) .peekable(); let data_array: ArrayRef = if value_iter.peek().is_none() { - Arc::new(NullArray::new(0)) as _ + arrow::array::new_empty_array(&self.data_type) as _ } else { Arc::new(ScalarValue::iter_to_array(value_iter)?) as _ }; let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); let list_array = ListArray::new( - Arc::new(Field::new_list_field(data_array.data_type().clone(), true)), + Arc::new(Field::new_list_field(self.data_type.clone(), true)), offset_buffer, data_array, None, @@ -1021,7 +1011,7 @@ mod tests { #[test] fn test_distinct_count_groups_basic() -> Result<()> { - let mut accumulator = DistinctCountGroupsAccumulator::new(); + let mut accumulator = DistinctCountGroupsAccumulator::new(DataType::Int32); let values = vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 1])) as ArrayRef]; // 3 groups @@ -1043,7 +1033,7 @@ mod tests { #[test] fn test_distinct_count_groups_with_filter() -> Result<()> { - let mut accumulator = DistinctCountGroupsAccumulator::new(); + let mut accumulator = DistinctCountGroupsAccumulator::new(DataType::Utf8); let values = vec![ Arc::new(StringArray::from(vec!["a", "b", "a", "c", "b", "d"])) as ArrayRef, ]; From 7bc506d23dd87b3a1a7e95467c9f4adaab00d4b2 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Mar 2025 11:57:23 +0800 Subject: [PATCH 5/8] dedicate accumulator Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 1261 ++++++++++++++++++- 1 file changed, 1256 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 5bcd545dbeba7..94c9d467377eb 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,15 +16,22 @@ // under the License. use ahash::RandomState; -use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::array::{ + ArrowPrimitiveType, BinaryArray, BinaryViewArray, GenericByteArray, + GenericByteViewArray, OffsetSizeTrait, StringArray, StringViewArray, +}; +use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; use datafusion_common::stats::Precision; use datafusion_expr::expr::WindowFunction; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; +use datafusion_functions_aggregate_common::utils::Hashable; use datafusion_macros::user_doc; use datafusion_physical_expr::expressions; use std::collections::HashSet; use std::fmt::Debug; +use std::hash::Hash; +use std::marker::PhantomData; use std::mem::{size_of, size_of_val}; use std::ops::BitAnd; use std::sync::Arc; @@ -64,8 +71,10 @@ use datafusion_functions_aggregate_common::aggregate::count_distinct::{ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; use datafusion_physical_expr_common::binary_map::OutputType; -use datafusion_common::cast::as_list_array; +use arrow::datatypes::{ByteArrayType, GenericStringType}; +use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; +use std::convert::TryFrom; make_udaf_expr_and_func!( Count, @@ -361,9 +370,129 @@ impl AggregateUDFImpl for Count { ) -> Result> { // instantiate specialized accumulator if args.is_distinct { - Ok(Box::new(DistinctCountGroupsAccumulator::new( - args.exprs[0].data_type(args.schema)?, - ))) + if args.exprs.len() > 1 { + return not_impl_err!("COUNT DISTINCT with multiple arguments"); + } + + let data_type = &args.exprs[0].data_type(args.schema)?; + Ok(match data_type { + // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator + DataType::Int8 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int8Type, + >::new(data_type.clone())), + DataType::Int16 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int16Type, + >::new(data_type.clone())), + DataType::Int32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int32Type, + >::new(data_type.clone())), + DataType::Int64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int64Type, + >::new(data_type.clone())), + DataType::UInt8 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt8Type, + >::new(data_type.clone())), + DataType::UInt16 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt16Type, + >::new(data_type.clone())), + DataType::UInt32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt32Type, + >::new(data_type.clone())), + DataType::UInt64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt64Type, + >::new(data_type.clone())), + DataType::Decimal128(_, _) => Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new( + data_type.clone(), + ), + ), + DataType::Decimal256(_, _) => Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new( + data_type.clone(), + ), + ), + + DataType::Date32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Date32Type, + >::new(data_type.clone())), + DataType::Date64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Date64Type, + >::new(data_type.clone())), + DataType::Time32(TimeUnit::Millisecond) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Time32MillisecondType, + >::new(data_type.clone())) + } + DataType::Time32(TimeUnit::Second) => Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new( + data_type.clone(), + ), + ), + DataType::Time64(TimeUnit::Microsecond) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Time64MicrosecondType, + >::new(data_type.clone())) + } + DataType::Time64(TimeUnit::Nanosecond) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Time64NanosecondType, + >::new(data_type.clone())) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + TimestampMicrosecondType, + >::new(data_type.clone())) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + TimestampMillisecondType, + >::new(data_type.clone())) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Box::new(PrimitiveDistinctCountGroupsAccumulator::< + TimestampNanosecondType, + >::new(data_type.clone())) + } + DataType::Timestamp(TimeUnit::Second, _) => Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new( + data_type.clone(), + ), + ), + + DataType::Float16 => Box::new(FloatDistinctCountGroupsAccumulator::< + Float16Type, + >::new(data_type.clone())), + DataType::Float32 => Box::new(FloatDistinctCountGroupsAccumulator::< + Float32Type, + >::new(data_type.clone())), + DataType::Float64 => Box::new(FloatDistinctCountGroupsAccumulator::< + Float64Type, + >::new(data_type.clone())), + + // DataType::Utf8 => Box::new( + // BytesDistinctCountGroupsAccumulator::::new(OutputType::Utf8), + // ), + // DataType::Utf8View => Box::new( + // BytesViewDistinctCountGroupsAccumulator::new(OutputType::Utf8View), + // ), + // DataType::LargeUtf8 => Box::new( + // BytesDistinctCountGroupsAccumulator::::new(OutputType::Utf8), + // ), + // DataType::Binary => Box::new( + // BytesDistinctCountGroupsAccumulator::::new(OutputType::Binary), + // ), + // DataType::BinaryView => Box::new( + // BytesViewDistinctCountGroupsAccumulator::new(OutputType::BinaryView), + // ), + // DataType::LargeBinary => { + // Box::new(BytesDistinctCountGroupsAccumulator::::new( + // OutputType::Binary, + // )) + // } + + // Use the generic accumulator based on `ScalarValue` for all other types + _ => Box::new(DistinctCountGroupsAccumulator::new(data_type.clone())), + }) } else { Ok(Box::new(CountGroupsAccumulator::new())) } @@ -996,6 +1125,1128 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { } } +/// A specialized GroupsAccumulator for count distinct operations with primitive types +/// This is more efficient than the general DistinctCountGroupsAccumulator for primitive types +#[derive(Debug)] +pub struct PrimitiveDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + /// One HashSet per group to track distinct values + distinct_sets: Vec>, + data_type: DataType, +} + +impl PrimitiveDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + pub fn new(data_type: DataType) -> Self { + Self { + distinct_sets: vec![], + data_type, + } + } + + fn ensure_sets(&mut self, total_num_groups: usize) { + if self.distinct_sets.len() < total_num_groups { + self.distinct_sets + .resize_with(total_num_groups, HashSet::default); + } + } +} + +impl GroupsAccumulator for PrimitiveDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send + Debug, + T::Native: Eq + Hash, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + self.ensure_sets(total_num_groups); + + let array = as_primitive_array::(&values[0])?; + let data = array.values(); + + // Implement a manual iteration rather than using accumulate_indices with a closure + // that needs row_index + match (array.logical_nulls(), opt_filter) { + (None, None) => { + // No nulls, no filter - process all rows + for (row_idx, &group_idx) in group_indices.iter().enumerate() { + self.distinct_sets[group_idx].insert(data[row_idx]); + } + } + (Some(nulls), None) => { + // Has nulls, no filter + for (row_idx, (&group_idx, is_valid)) in + group_indices.iter().zip(nulls.iter()).enumerate() + { + if is_valid { + self.distinct_sets[group_idx].insert(data[row_idx]); + } + } + } + (None, Some(filter)) => { + // No nulls, has filter + for (row_idx, (&group_idx, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + self.distinct_sets[group_idx].insert(data[row_idx]); + } + } + } + (Some(nulls), Some(filter)) => { + // Has nulls and filter + let iter = filter + .iter() + .zip(group_indices.iter()) + .zip(nulls.iter()) + .enumerate(); + + for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { + if is_valid && filter_value == Some(true) { + self.distinct_sets[group_idx].insert(data[row_idx]); + } + } + } + } + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let counts = distinct_sets + .iter() + .map(|set| set.len() as i64) + .collect::>(); + + Ok(Arc::new(Int64Array::from(counts))) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!( + values.len(), + 1, + "COUNT DISTINCT merge expects a single state array" + ); + self.ensure_sets(total_num_groups); + + let list_array = as_list_array(&values[0])?; + + // For each group in the incoming batch + for (i, &group_idx) in group_indices.iter().enumerate() { + if i < list_array.len() { + let inner_array = list_array.value(i); + if !inner_array.is_empty() { + // Get the primitive array from the list and extend our set with its values + let primitive_array = as_primitive_array::(&inner_array)?; + self.distinct_sets[group_idx].extend(primitive_array.values()); + } + } + } + + Ok(()) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); + offsets.push(0); + let mut values = Vec::new(); + + // Create the values array by flattening all sets + for set in distinct_sets { + let start_len = values.len(); + values.extend(set.into_iter()); + offsets.push(values.len() as i32); + } + + // Create the primitive array from the flattened values + let values_array = Arc::new( + PrimitiveArray::::from_iter_values(values.into_iter()) + .with_data_type(self.data_type.clone()), + ) as ArrayRef; + + // Create list array with the offsets + let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(self.data_type.clone(), true)), + offset_buffer, + values_array, + None, + ); + + Ok(vec![Arc::new(list_array) as _]) + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + // For a single distinct value per row, create a list array with that value + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + let values = ArrayRef::clone(&values[0]); + + let offsets = + OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); + let nulls = filtered_null_mask(opt_filter, &values); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(values.data_type().clone(), true)), + offsets, + values, + nulls, + ); + + Ok(vec![Arc::new(list_array)]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + + fn size(&self) -> usize { + let mut total_size = std::mem::size_of::(); + + // Size of vector container + total_size += std::mem::size_of::>>(); + + // Size of actual sets and their contents + for set in &self.distinct_sets { + let set_size = std::mem::size_of::>() + + set.capacity() * std::mem::size_of::(); + total_size += set_size; + } + + total_size + } +} + +#[derive(Debug)] +pub struct FloatDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, +{ + /// One HashSet per group to track distinct values + distinct_sets: Vec, RandomState>>, + data_type: DataType, +} + +impl FloatDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, +{ + pub fn new(data_type: DataType) -> Self { + Self { + distinct_sets: vec![], + data_type, + } + } + + fn ensure_sets(&mut self, total_num_groups: usize) { + if self.distinct_sets.len() < total_num_groups { + self.distinct_sets + .resize_with(total_num_groups, HashSet::default); + } + } +} + +impl GroupsAccumulator for FloatDistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send + Debug, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + self.ensure_sets(total_num_groups); + + let array = as_primitive_array::(&values[0])?; + let data = array.values(); + + // Implement a manual iteration rather than using accumulate_indices with a closure + // that needs row_index + match (array.logical_nulls(), opt_filter) { + (None, None) => { + // No nulls, no filter - process all rows + for (row_idx, &group_idx) in group_indices.iter().enumerate() { + self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); + } + } + (Some(nulls), None) => { + // Has nulls, no filter + for (row_idx, (&group_idx, is_valid)) in + group_indices.iter().zip(nulls.iter()).enumerate() + { + if is_valid { + self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); + } + } + } + (None, Some(filter)) => { + // No nulls, has filter + for (row_idx, (&group_idx, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); + } + } + } + (Some(nulls), Some(filter)) => { + // Has nulls and filter + let iter = filter + .iter() + .zip(group_indices.iter()) + .zip(nulls.iter()) + .enumerate(); + + for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { + if is_valid && filter_value == Some(true) { + self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); + } + } + } + } + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let counts = distinct_sets + .iter() + .map(|set| set.len() as i64) + .collect::>(); + + Ok(Arc::new(Int64Array::from(counts))) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!( + values.len(), + 1, + "COUNT DISTINCT merge expects a single state array" + ); + self.ensure_sets(total_num_groups); + + let list_array = as_list_array(&values[0])?; + + // For each group in the incoming batch + for (i, &group_idx) in group_indices.iter().enumerate() { + if i < list_array.len() { + let inner_array = list_array.value(i); + if !inner_array.is_empty() { + // Get the primitive array from the list and extend our set with its values + let primitive_array = as_primitive_array::(&inner_array)?; + self.distinct_sets[group_idx] + .extend(primitive_array.values().iter().map(|v| Hashable(*v))); + } + } + } + + Ok(()) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); + offsets.push(0); + let mut values = Vec::new(); + + // Create the values array by flattening all sets + for set in distinct_sets { + let start_len = values.len(); + values.extend(set.into_iter().map(|v| v.0)); + offsets.push(values.len() as i32); + } + + // Create the primitive array from the flattened values + let values_array = Arc::new( + PrimitiveArray::::from_iter_values(values.into_iter()) + .with_data_type(self.data_type.clone()), + ) as ArrayRef; + + // Create list array with the offsets + let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(self.data_type.clone(), true)), + offset_buffer, + values_array, + None, + ); + + Ok(vec![Arc::new(list_array) as _]) + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + // For a single distinct value per row, create a list array with that value + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + let values = ArrayRef::clone(&values[0]); + + let offsets = + OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); + let nulls = filtered_null_mask(opt_filter, &values); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(values.data_type().clone(), true)), + offsets, + values, + nulls, + ); + + Ok(vec![Arc::new(list_array)]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + + fn size(&self) -> usize { + let mut total_size = std::mem::size_of::(); + + // Size of vector container + total_size += + std::mem::size_of::, RandomState>>>(); + + // Size of actual sets and their contents + for set in &self.distinct_sets { + let set_size = + std::mem::size_of::, RandomState>>() + + set.capacity() * std::mem::size_of::>(); + total_size += set_size; + } + + total_size + } +} + +/// A specialized GroupsAccumulator for count distinct operations with string/binary view types +#[derive(Debug)] +pub struct BytesViewDistinctCountGroupsAccumulator { + /// One HashSet per group to track distinct values + distinct_sets: Vec, RandomState>>, + output_type: OutputType, +} + +impl BytesViewDistinctCountGroupsAccumulator { + pub fn new(output_type: OutputType) -> Self { + Self { + distinct_sets: vec![], + output_type, + } + } + + fn ensure_sets(&mut self, total_num_groups: usize) { + if self.distinct_sets.len() < total_num_groups { + self.distinct_sets + .resize_with(total_num_groups, HashSet::default); + } + } +} + +impl GroupsAccumulator for BytesViewDistinctCountGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + self.ensure_sets(total_num_groups); + + // Handle binary view or string view arrays + if let Some(array) = values[0].as_any().downcast_ref::() { + // Implement a manual iteration rather than using accumulate_indices + match (array.logical_nulls(), opt_filter) { + (None, None) => { + // No nulls, no filter - process all rows + for (row_idx, &group_idx) in group_indices.iter().enumerate() { + let value = array.value(row_idx); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + (Some(nulls), None) => { + // Has nulls, no filter + for (row_idx, (&group_idx, is_valid)) in + group_indices.iter().zip(nulls.iter()).enumerate() + { + if is_valid { + let value = array.value(row_idx); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + (None, Some(filter)) => { + // No nulls, has filter + for (row_idx, (&group_idx, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + let value = array.value(row_idx); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + (Some(nulls), Some(filter)) => { + // Has nulls and filter + let iter = filter + .iter() + .zip(group_indices.iter()) + .zip(nulls.iter()) + .enumerate(); + + for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { + if is_valid && filter_value == Some(true) { + let value = array.value(row_idx); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + } + } else if let Some(array) = values[0].as_any().downcast_ref::() { + // Implement a manual iteration rather than using accumulate_indices + match (array.logical_nulls(), opt_filter) { + (None, None) => { + // No nulls, no filter - process all rows + for (row_idx, &group_idx) in group_indices.iter().enumerate() { + let value = array.value(row_idx).as_bytes(); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + (Some(nulls), None) => { + // Has nulls, no filter + for (row_idx, (&group_idx, is_valid)) in + group_indices.iter().zip(nulls.iter()).enumerate() + { + if is_valid { + let value = array.value(row_idx).as_bytes(); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + (None, Some(filter)) => { + // No nulls, has filter + for (row_idx, (&group_idx, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + let value = array.value(row_idx).as_bytes(); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + (Some(nulls), Some(filter)) => { + // Has nulls and filter + let iter = filter + .iter() + .zip(group_indices.iter()) + .zip(nulls.iter()) + .enumerate(); + + for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { + if is_valid && filter_value == Some(true) { + let value = array.value(row_idx).as_bytes(); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } + } + } else { + return Err(datafusion_common::DataFusionError::Internal(format!( + "Unsupported array type for BytesViewDistinctCountGroupsAccumulator: {:?}", + values[0].data_type() + ))); + } + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let counts = distinct_sets + .iter() + .map(|set| set.len() as i64) + .collect::>(); + + Ok(Arc::new(Int64Array::from(counts))) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!( + values.len(), + 1, + "COUNT DISTINCT merge expects a single state array" + ); + self.ensure_sets(total_num_groups); + + let list_array = as_list_array(&values[0])?; + + // For each group in the incoming batch + for (i, &group_idx) in group_indices.iter().enumerate() { + if i < list_array.len() { + let inner_array = list_array.value(i); + if !inner_array.is_empty() { + // Handle binary view or string view arrays + if let Some(array) = + inner_array.as_any().downcast_ref::() + { + for j in 0..array.len() { + if !array.is_null(j) { + let value = array.value(j); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } else if let Some(array) = + inner_array.as_any().downcast_ref::() + { + for j in 0..array.len() { + if !array.is_null(j) { + let value = array.value(j).as_bytes(); + self.distinct_sets[group_idx].insert(value.to_vec()); + } + } + } else { + return Err(datafusion_common::DataFusionError::Internal(format!( + "Unsupported inner array type for BytesViewDistinctCountGroupsAccumulator: {:?}", + inner_array.data_type() + ))); + } + } + } + } + + Ok(()) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + + let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); + offsets.push(0); + + // Create arrays for views + let inner_array: ArrayRef = + match self.output_type { + OutputType::Utf8View => { + let mut string_values = Vec::new(); + + // Collect all string values from all sets + for set in &distinct_sets { + for v in set { + // Safety: we know the value is valid UTF-8 since it came from a StringViewArray + let s = unsafe { std::str::from_utf8_unchecked(v) }; + string_values.push(s.to_string()); + } + } + + // Use from_iter_values which works correctly + let array = StringViewArray::from_iter_values( + string_values.iter().map(|s| s.as_str()), + ); + Arc::new(array) + } + OutputType::BinaryView => { + let mut bytes_values = Vec::new(); + + // Collect all byte values from all sets + for set in &distinct_sets { + for v in set { + bytes_values.push(v.clone()); + } + } + + // Use from_iter_values which works correctly + let array = BinaryViewArray::from_iter_values( + bytes_values.iter().map(|v| v.as_slice()), + ); + Arc::new(array) + } + _ => return Err(datafusion_common::DataFusionError::Internal( + "Unsupported output type for BytesViewDistinctCountGroupsAccumulator" + .to_string(), + )), + }; + + // Count elements in each set for offsets + for set in distinct_sets { + offsets.push(offsets.last().unwrap() + set.len() as i32); + } + + // Create list array with the offsets + let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(inner_array.data_type().clone(), true)), + offset_buffer, + inner_array, + None, + ); + + Ok(vec![Arc::new(list_array) as _]) + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + // For a single distinct value per row, create a list array with that value + assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); + let values = ArrayRef::clone(&values[0]); + + let offsets = + OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); + let nulls = filtered_null_mask(opt_filter, &values); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(values.data_type().clone(), true)), + offsets, + values, + nulls, + ); + + Ok(vec![Arc::new(list_array)]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + + fn size(&self) -> usize { + let mut total_size = std::mem::size_of::(); + + // Size of vector container + total_size += std::mem::size_of::, RandomState>>>(); + + // Size of actual sets and their contents (similar to BytesDistinctCountGroupsAccumulator) + for set in &self.distinct_sets { + let container_size = std::mem::size_of::, RandomState>>() + + set.capacity() * std::mem::size_of::>(); + + // Add sizes of actual byte vectors + let content_size = set.iter().map(|v| v.capacity()).sum::(); + + total_size += container_size + content_size; + } + + total_size + } +} + +// /// A specialized GroupsAccumulator for count distinct operations with string/binary types (for Utf8 & Binary arrays) +// #[derive(Debug)] +// pub struct BytesDistinctCountGroupsAccumulator { +// /// One HashSet per group to track distinct values +// distinct_sets: Vec, RandomState>>, +// output_type: OutputType, +// _phantom: PhantomData, +// } + +// impl BytesDistinctCountGroupsAccumulator { +// pub fn new(output_type: OutputType) -> Self { +// Self { +// distinct_sets: vec![], +// output_type, +// _phantom: PhantomData, +// } +// } + +// fn ensure_sets(&mut self, total_num_groups: usize) { +// if self.distinct_sets.len() < total_num_groups { +// self.distinct_sets +// .resize_with(total_num_groups, HashSet::default); +// } +// } +// } + +// impl GroupsAccumulator for BytesDistinctCountGroupsAccumulator +// where +// O: 'static + OffsetSizeTrait + Debug, +// { +// fn update_batch( +// &mut self, +// values: &[ArrayRef], +// group_indices: &[usize], +// opt_filter: Option<&BooleanArray>, +// total_num_groups: usize, +// ) -> Result<()> { +// assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); +// self.ensure_sets(total_num_groups); + +// if let Some(array) = values[0] +// .as_any() +// .downcast_ref::>>() +// { +// // String array case +// match (array.logical_nulls(), opt_filter) { +// (None, None) => { +// for (row_idx, &group_idx) in group_indices.iter().enumerate() { +// if row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// (Some(nulls), None) => { +// for (row_idx, (&group_idx, is_valid)) in +// group_indices.iter().zip(nulls.iter()).enumerate() +// { +// if is_valid && row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// (None, Some(filter)) => { +// for (row_idx, (&group_idx, filter_value)) in +// group_indices.iter().zip(filter.iter()).enumerate() +// { +// if let Some(true) = filter_value { +// if row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// } +// (Some(nulls), Some(filter)) => { +// let iter = filter +// .iter() +// .zip(group_indices.iter()) +// .zip(nulls.iter()) +// .enumerate(); + +// for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { +// if is_valid && filter_value == Some(true) && row_idx < array.len() +// { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// } +// } else if let Some(array) = values[0] +// .as_any() +// .downcast_ref::>>() +// { +// // Binary array case +// match (array.logical_nulls(), opt_filter) { +// (None, None) => { +// for (row_idx, &group_idx) in group_indices.iter().enumerate() { +// if row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// (Some(nulls), None) => { +// for (row_idx, (&group_idx, is_valid)) in +// group_indices.iter().zip(nulls.iter()).enumerate() +// { +// if is_valid && row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// (None, Some(filter)) => { +// for (row_idx, (&group_idx, filter_value)) in +// group_indices.iter().zip(filter.iter()).enumerate() +// { +// if let Some(true) = filter_value { +// if row_idx < array.len() { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// } +// (Some(nulls), Some(filter)) => { +// let iter = filter +// .iter() +// .zip(group_indices.iter()) +// .zip(nulls.iter()) +// .enumerate(); + +// for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { +// if is_valid && filter_value == Some(true) && row_idx < array.len() +// { +// let value = array.value(row_idx); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// } +// } else { +// return Err(datafusion_common::DataFusionError::Internal(format!( +// "Cannot process array of type {:?} with BytesDistinctCountGroupsAccumulator", +// values[0].data_type() +// ))); +// } + +// Ok(()) +// } + +// fn evaluate(&mut self, emit_to: EmitTo) -> Result { +// let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + +// let counts = distinct_sets +// .iter() +// .map(|set| set.len() as i64) +// .collect::>(); + +// Ok(Arc::new(Int64Array::from(counts))) +// } + +// fn merge_batch( +// &mut self, +// values: &[ArrayRef], +// group_indices: &[usize], +// _opt_filter: Option<&BooleanArray>, +// total_num_groups: usize, +// ) -> Result<()> { +// assert_eq!( +// values.len(), +// 1, +// "COUNT DISTINCT merge expects a single state array" +// ); +// self.ensure_sets(total_num_groups); + +// let list_array = as_list_array(&values[0])?; + +// // For each group in the incoming batch +// for (i, &group_idx) in group_indices.iter().enumerate() { +// if i < list_array.len() { +// let inner_array = list_array.value(i); +// if !inner_array.is_empty() { +// // Try both String and Binary types +// if let Some(bytes_array) = inner_array +// .as_any() +// .downcast_ref::>>() +// { +// for j in 0..bytes_array.len() { +// if !bytes_array.is_null(j) { +// let value = bytes_array.value(j); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } else if let Some(bytes_array) = inner_array +// .as_any() +// .downcast_ref::>>() +// { +// for j in 0..bytes_array.len() { +// if !bytes_array.is_null(j) { +// let value = bytes_array.value(j); +// self.distinct_sets[group_idx].insert(value.to_vec()); +// } +// } +// } +// } +// } +// } + +// Ok(()) +// } + +// fn state(&mut self, emit_to: EmitTo) -> Result> { +// let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); + +// let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); +// offsets.push(0); + +// // First pass to calculate total byte length and build offsets +// let mut total_bytes_len = 0; +// let mut value_offsets = Vec::new(); +// value_offsets.push(O::default()); + +// let mut total_values = 0; +// for set in &distinct_sets { +// let set_len = set.len(); +// total_values += set_len; + +// for value in set { +// total_bytes_len += value.len(); +// let curr_offset = *value_offsets.last().unwrap(); +// let value_len = O::from_usize(value.len()).ok_or_else(|| { +// datafusion_common::DataFusionError::Internal( +// "Failed to convert offset".to_string(), +// ) +// })?; + +// let mut next_offset = curr_offset; +// next_offset += value_len; +// value_offsets.push(next_offset); +// } + +// offsets.push(offsets.last().unwrap() + set_len as i32); +// } + +// // Create buffer for all bytes concatenated +// let mut bytes = Vec::with_capacity(total_bytes_len); +// for set in distinct_sets { +// for value in set { +// bytes.extend_from_slice(&value); +// } +// } + +// // Create appropriate array based on output type +// let inner_array: ArrayRef = match self.output_type { +// OutputType::Utf8 => { +// // Create a StringArray +// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); +// let values_buffer = Buffer::from_vec(bytes); +// if O::IS_LARGE { +// // If O is i64, use LargeStringArray +// let large_string_array = +// arrow::array::GenericByteArray::>::new( +// offset_buffer.try_into().unwrap(), +// values_buffer, +// None, +// ); +// Arc::new(large_string_array) +// } else { +// // If O is i32, use StringArray +// let string_array = +// arrow::array::GenericByteArray::>::new( +// offset_buffer.try_into().unwrap(), +// values_buffer, +// None, +// ); +// Arc::new(string_array) +// } +// } +// OutputType::Binary => { +// // Create a BinaryArray +// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); +// let values_buffer = Buffer::from_vec(bytes); +// if O::IS_LARGE { +// // If O is i64, use LargeBinaryArray +// let large_binary_array = +// arrow::array::GenericByteArray::>::new( +// offset_buffer.try_into().unwrap(), +// values_buffer, +// None, +// ); +// Arc::new(large_binary_array) +// } else { +// // If O is i32, use BinaryArray +// let binary_array = +// arrow::array::GenericByteArray::>::new( +// offset_buffer.try_into().unwrap(), +// values_buffer, +// None, +// ); +// Arc::new(binary_array) +// } +// } +// _ => { +// return Err(datafusion_common::DataFusionError::Internal( +// "Unsupported output type for BytesDistinctCountGroupsAccumulator" +// .to_string(), +// )) +// } +// }; + +// // Create list array with the offsets +// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); +// let list_array = ListArray::new( +// Arc::new(Field::new_list_field(inner_array.data_type().clone(), true)), +// offset_buffer, +// inner_array, +// None, +// ); + +// Ok(vec![Arc::new(list_array) as _]) +// } + +// fn convert_to_state( +// &self, +// values: &[ArrayRef], +// opt_filter: Option<&BooleanArray>, +// ) -> Result> { +// // For a single distinct value per row, create a list array with that value +// assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); +// let values = ArrayRef::clone(&values[0]); + +// let offsets = +// OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); +// let nulls = filtered_null_mask(opt_filter, &values); +// let list_array = ListArray::new( +// Arc::new(Field::new_list_field(values.data_type().clone(), true)), +// offsets, +// values, +// nulls, +// ); + +// Ok(vec![Arc::new(list_array)]) +// } + +// fn supports_convert_to_state(&self) -> bool { +// true +// } + +// fn size(&self) -> usize { +// let mut total_size = std::mem::size_of::(); + +// // Size of vector container +// total_size += std::mem::size_of::, RandomState>>>(); + +// // Size of actual sets and their contents +// for set in &self.distinct_sets { +// let container_size = std::mem::size_of::, RandomState>>() +// + set.capacity() * std::mem::size_of::>(); + +// // Add sizes of actual byte vectors +// let content_size = set.iter().map(|v| v.capacity()).sum::(); + +// total_size += container_size + content_size; +// } + +// total_size +// } +// } + #[cfg(test)] mod tests { use super::*; From 86390ef231eb455c3ca9e02214d8eab38ce7ca5a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Mar 2025 11:57:39 +0800 Subject: [PATCH 6/8] a new method Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 1343 +------------------ 1 file changed, 64 insertions(+), 1279 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 94c9d467377eb..e7ca46e1d7f85 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -18,9 +18,10 @@ use ahash::RandomState; use arrow::array::{ ArrowPrimitiveType, BinaryArray, BinaryViewArray, GenericByteArray, - GenericByteViewArray, OffsetSizeTrait, StringArray, StringViewArray, + GenericByteViewArray, OffsetSizeTrait, StringArray, StringViewArray, UInt64Array, }; use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; use datafusion_expr::expr::WindowFunction; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; @@ -219,7 +220,8 @@ impl AggregateUDFImpl for Count { Ok(vec![Field::new_list( format_state_name(args.name, "count distinct"), // See COMMENTS.md to understand why nullable is set to true - Field::new_list_field(args.input_types[0].clone(), true), + // Field::new_list_field(args.input_types[0].clone(), true), + Field::new_list_field(DataType::UInt64, true), // For group count distinct accumulator, null list item stands for an // empty value set (i.e., all NULL value so far for that group). true, @@ -234,6 +236,8 @@ impl AggregateUDFImpl for Count { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + panic!("should not create normal accumulator"); + if !acc_args.is_distinct { return Ok(Box::new(CountAccumulator::new())); } @@ -375,124 +379,9 @@ impl AggregateUDFImpl for Count { } let data_type = &args.exprs[0].data_type(args.schema)?; - Ok(match data_type { - // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator - DataType::Int8 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Int8Type, - >::new(data_type.clone())), - DataType::Int16 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Int16Type, - >::new(data_type.clone())), - DataType::Int32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Int32Type, - >::new(data_type.clone())), - DataType::Int64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Int64Type, - >::new(data_type.clone())), - DataType::UInt8 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt8Type, - >::new(data_type.clone())), - DataType::UInt16 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt16Type, - >::new(data_type.clone())), - DataType::UInt32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt32Type, - >::new(data_type.clone())), - DataType::UInt64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt64Type, - >::new(data_type.clone())), - DataType::Decimal128(_, _) => Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new( - data_type.clone(), - ), - ), - DataType::Decimal256(_, _) => Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new( - data_type.clone(), - ), - ), - - DataType::Date32 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Date32Type, - >::new(data_type.clone())), - DataType::Date64 => Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Date64Type, - >::new(data_type.clone())), - DataType::Time32(TimeUnit::Millisecond) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Time32MillisecondType, - >::new(data_type.clone())) - } - DataType::Time32(TimeUnit::Second) => Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new( - data_type.clone(), - ), - ), - DataType::Time64(TimeUnit::Microsecond) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Time64MicrosecondType, - >::new(data_type.clone())) - } - DataType::Time64(TimeUnit::Nanosecond) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - Time64NanosecondType, - >::new(data_type.clone())) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - TimestampMicrosecondType, - >::new(data_type.clone())) - } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - TimestampMillisecondType, - >::new(data_type.clone())) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - Box::new(PrimitiveDistinctCountGroupsAccumulator::< - TimestampNanosecondType, - >::new(data_type.clone())) - } - DataType::Timestamp(TimeUnit::Second, _) => Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new( - data_type.clone(), - ), - ), - - DataType::Float16 => Box::new(FloatDistinctCountGroupsAccumulator::< - Float16Type, - >::new(data_type.clone())), - DataType::Float32 => Box::new(FloatDistinctCountGroupsAccumulator::< - Float32Type, - >::new(data_type.clone())), - DataType::Float64 => Box::new(FloatDistinctCountGroupsAccumulator::< - Float64Type, - >::new(data_type.clone())), - - // DataType::Utf8 => Box::new( - // BytesDistinctCountGroupsAccumulator::::new(OutputType::Utf8), - // ), - // DataType::Utf8View => Box::new( - // BytesViewDistinctCountGroupsAccumulator::new(OutputType::Utf8View), - // ), - // DataType::LargeUtf8 => Box::new( - // BytesDistinctCountGroupsAccumulator::::new(OutputType::Utf8), - // ), - // DataType::Binary => Box::new( - // BytesDistinctCountGroupsAccumulator::::new(OutputType::Binary), - // ), - // DataType::BinaryView => Box::new( - // BytesViewDistinctCountGroupsAccumulator::new(OutputType::BinaryView), - // ), - // DataType::LargeBinary => { - // Box::new(BytesDistinctCountGroupsAccumulator::::new( - // OutputType::Binary, - // )) - // } - - // Use the generic accumulator based on `ScalarValue` for all other types - _ => Box::new(DistinctCountGroupsAccumulator::new(data_type.clone())), - }) + Ok(Box::new(DistinctCountGroupsAccumulator::new( + data_type.clone(), + ))) } else { Ok(Box::new(CountGroupsAccumulator::new())) } @@ -894,8 +783,10 @@ impl Accumulator for DistinctCountAccumulator { #[derive(Debug)] pub struct DistinctCountGroupsAccumulator { /// One HashSet per group to track distinct values - distinct_sets: Vec>, + distinct_sets: Vec>, + random_state: RandomState, data_type: DataType, + batch_hashes: Vec, } impl DistinctCountGroupsAccumulator { @@ -903,6 +794,8 @@ impl DistinctCountGroupsAccumulator { Self { distinct_sets: vec![], data_type, + random_state: RandomState::with_seeds(1, 2, 3, 4), + batch_hashes: vec![], } } @@ -926,6 +819,10 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { self.ensure_sets(total_num_groups); let array = &values[0]; + self.batch_hashes.clear(); + self.batch_hashes.resize(array.len(), 0); + let hashes = + create_hashes(&[array.clone()], &self.random_state, &mut self.batch_hashes)?; // Use a pattern similar to accumulate_indices to process rows // that are not null and pass the filter @@ -935,9 +832,10 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { (None, None) => { // No nulls, no filter - process all rows for (row_idx, &group_idx) in group_indices.iter().enumerate() { - if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - self.distinct_sets[group_idx].insert(scalar); - } + // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + // self.distinct_sets[group_idx].insert(scalar); + // } + self.distinct_sets[group_idx].insert(hashes[row_idx]); } } (Some(nulls), None) => { @@ -946,9 +844,10 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { group_indices.iter().zip(nulls.iter()).enumerate() { if is_valid { - if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - self.distinct_sets[group_idx].insert(scalar); - } + // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + // self.distinct_sets[group_idx].insert(scalar); + // } + self.distinct_sets[group_idx].insert(hashes[row_idx]); } } } @@ -958,9 +857,10 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { group_indices.iter().zip(filter.iter()).enumerate() { if let Some(true) = filter_value { - if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - self.distinct_sets[group_idx].insert(scalar); - } + // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + // self.distinct_sets[group_idx].insert(scalar); + // } + self.distinct_sets[group_idx].insert(hashes[row_idx]); } } } @@ -974,9 +874,10 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { if is_valid && filter_value == Some(true) { - if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - self.distinct_sets[group_idx].insert(scalar); - } + // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { + // self.distinct_sets[group_idx].insert(scalar); + // } + self.distinct_sets[group_idx].insert(hashes[row_idx]); } } } @@ -986,7 +887,7 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let distinct_sets: Vec> = + let distinct_sets: Vec> = emit_to.take_needed(&mut self.distinct_sets); let counts = distinct_sets @@ -1016,11 +917,13 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { for (i, &group_idx) in group_indices.iter().enumerate() { if i < list_array.len() { let inner_array = list_array.value(i); + let inner_array = + inner_array.as_any().downcast_ref::().unwrap(); // Add each value to our set for this group for j in 0..inner_array.len() { if !inner_array.is_null(j) { - let scalar = ScalarValue::try_from_array(&inner_array, j)?; - self.distinct_sets[group_idx].insert(scalar); + // let scalar = ScalarValue::try_from_array(&inner_array, j)?; + self.distinct_sets[group_idx].insert(inner_array.value(j)); } } } @@ -1030,7 +933,7 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { } fn state(&mut self, emit_to: EmitTo) -> Result> { - let distinct_sets: Vec> = + let distinct_sets: Vec> = emit_to.take_needed(&mut self.distinct_sets); let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); @@ -1048,14 +951,17 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { }) .peekable(); let data_array: ArrayRef = if value_iter.peek().is_none() { - arrow::array::new_empty_array(&self.data_type) as _ + // arrow::array::new_empty_array(&self.data_type) as _ + arrow::array::new_empty_array(&DataType::UInt64) as _ } else { - Arc::new(ScalarValue::iter_to_array(value_iter)?) as _ + // Arc::new(ScalarValue::iter_to_array(value_iter)?) as _ + Arc::new(UInt64Array::from_iter_values(value_iter)) }; let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); let list_array = ListArray::new( - Arc::new(Field::new_list_field(self.data_type.clone(), true)), + // Arc::new(Field::new_list_field(self.data_type.clone(), true)), + Arc::new(Field::new_list_field(DataType::UInt64, true)), offset_buffer, data_array, None, @@ -1077,7 +983,8 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); let nulls = filtered_null_mask(opt_filter, &values); let list_array = ListArray::new( - Arc::new(Field::new_list_field(values.data_type().clone(), true)), + // Arc::new(Field::new_list_field(values.data_type().clone(), true)), + Arc::new(Field::new_list_field(DataType::UInt64, true)), offsets, values, nulls, @@ -1095,1158 +1002,36 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { let mut size = size_of::(); // Size of the vector holding the HashSets - size += size_of::>>() - + self.distinct_sets.capacity() - * size_of::>(); + size += size_of::>>() + + self.distinct_sets.capacity() * size_of::>(); // Estimate HashSet contents size more efficiently // Instead of iterating through all values which is expensive, use an approximation for set in &self.distinct_sets { // Base size of the HashSet - size += set.capacity() * size_of::<(ScalarValue, ())>(); + size += set.capacity() * size_of::<(u64, ())>(); // Estimate ScalarValue size using sample-based approach // Only look at up to 10 items as a sample - let sample_size = 10.min(set.len()); - if sample_size > 0 { - let avg_size = set - .iter() - .take(sample_size) - .map(|v| v.size()) - .sum::() - / sample_size; - - // Extrapolate to the full set - size += avg_size * (set.len() - sample_size); - } + // let sample_size = 10.min(set.len()); + // if sample_size > 0 { + // let avg_size = set + // .iter() + // .take(sample_size) + // .map(|v| v.size()) + // .sum::() + // / sample_size; + + // Extrapolate to the full set + // size += avg_size * (set.len() - sample_size); + size += size_of::() * set.len(); + // } } size } } -/// A specialized GroupsAccumulator for count distinct operations with primitive types -/// This is more efficient than the general DistinctCountGroupsAccumulator for primitive types -#[derive(Debug)] -pub struct PrimitiveDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send, - T::Native: Eq + Hash, -{ - /// One HashSet per group to track distinct values - distinct_sets: Vec>, - data_type: DataType, -} - -impl PrimitiveDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send, - T::Native: Eq + Hash, -{ - pub fn new(data_type: DataType) -> Self { - Self { - distinct_sets: vec![], - data_type, - } - } - - fn ensure_sets(&mut self, total_num_groups: usize) { - if self.distinct_sets.len() < total_num_groups { - self.distinct_sets - .resize_with(total_num_groups, HashSet::default); - } - } -} - -impl GroupsAccumulator for PrimitiveDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send + Debug, - T::Native: Eq + Hash, -{ - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - self.ensure_sets(total_num_groups); - - let array = as_primitive_array::(&values[0])?; - let data = array.values(); - - // Implement a manual iteration rather than using accumulate_indices with a closure - // that needs row_index - match (array.logical_nulls(), opt_filter) { - (None, None) => { - // No nulls, no filter - process all rows - for (row_idx, &group_idx) in group_indices.iter().enumerate() { - self.distinct_sets[group_idx].insert(data[row_idx]); - } - } - (Some(nulls), None) => { - // Has nulls, no filter - for (row_idx, (&group_idx, is_valid)) in - group_indices.iter().zip(nulls.iter()).enumerate() - { - if is_valid { - self.distinct_sets[group_idx].insert(data[row_idx]); - } - } - } - (None, Some(filter)) => { - // No nulls, has filter - for (row_idx, (&group_idx, filter_value)) in - group_indices.iter().zip(filter.iter()).enumerate() - { - if let Some(true) = filter_value { - self.distinct_sets[group_idx].insert(data[row_idx]); - } - } - } - (Some(nulls), Some(filter)) => { - // Has nulls and filter - let iter = filter - .iter() - .zip(group_indices.iter()) - .zip(nulls.iter()) - .enumerate(); - - for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { - if is_valid && filter_value == Some(true) { - self.distinct_sets[group_idx].insert(data[row_idx]); - } - } - } - } - - Ok(()) - } - - fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let counts = distinct_sets - .iter() - .map(|set| set.len() as i64) - .collect::>(); - - Ok(Arc::new(Int64Array::from(counts))) - } - - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - _opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!( - values.len(), - 1, - "COUNT DISTINCT merge expects a single state array" - ); - self.ensure_sets(total_num_groups); - - let list_array = as_list_array(&values[0])?; - - // For each group in the incoming batch - for (i, &group_idx) in group_indices.iter().enumerate() { - if i < list_array.len() { - let inner_array = list_array.value(i); - if !inner_array.is_empty() { - // Get the primitive array from the list and extend our set with its values - let primitive_array = as_primitive_array::(&inner_array)?; - self.distinct_sets[group_idx].extend(primitive_array.values()); - } - } - } - - Ok(()) - } - - fn state(&mut self, emit_to: EmitTo) -> Result> { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); - offsets.push(0); - let mut values = Vec::new(); - - // Create the values array by flattening all sets - for set in distinct_sets { - let start_len = values.len(); - values.extend(set.into_iter()); - offsets.push(values.len() as i32); - } - - // Create the primitive array from the flattened values - let values_array = Arc::new( - PrimitiveArray::::from_iter_values(values.into_iter()) - .with_data_type(self.data_type.clone()), - ) as ArrayRef; - - // Create list array with the offsets - let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(self.data_type.clone(), true)), - offset_buffer, - values_array, - None, - ); - - Ok(vec![Arc::new(list_array) as _]) - } - - fn convert_to_state( - &self, - values: &[ArrayRef], - opt_filter: Option<&BooleanArray>, - ) -> Result> { - // For a single distinct value per row, create a list array with that value - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - let values = ArrayRef::clone(&values[0]); - - let offsets = - OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); - let nulls = filtered_null_mask(opt_filter, &values); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(values.data_type().clone(), true)), - offsets, - values, - nulls, - ); - - Ok(vec![Arc::new(list_array)]) - } - - fn supports_convert_to_state(&self) -> bool { - true - } - - fn size(&self) -> usize { - let mut total_size = std::mem::size_of::(); - - // Size of vector container - total_size += std::mem::size_of::>>(); - - // Size of actual sets and their contents - for set in &self.distinct_sets { - let set_size = std::mem::size_of::>() - + set.capacity() * std::mem::size_of::(); - total_size += set_size; - } - - total_size - } -} - -#[derive(Debug)] -pub struct FloatDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send, -{ - /// One HashSet per group to track distinct values - distinct_sets: Vec, RandomState>>, - data_type: DataType, -} - -impl FloatDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send, -{ - pub fn new(data_type: DataType) -> Self { - Self { - distinct_sets: vec![], - data_type, - } - } - - fn ensure_sets(&mut self, total_num_groups: usize) { - if self.distinct_sets.len() < total_num_groups { - self.distinct_sets - .resize_with(total_num_groups, HashSet::default); - } - } -} - -impl GroupsAccumulator for FloatDistinctCountGroupsAccumulator -where - T: ArrowPrimitiveType + Send + Debug, -{ - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - self.ensure_sets(total_num_groups); - - let array = as_primitive_array::(&values[0])?; - let data = array.values(); - - // Implement a manual iteration rather than using accumulate_indices with a closure - // that needs row_index - match (array.logical_nulls(), opt_filter) { - (None, None) => { - // No nulls, no filter - process all rows - for (row_idx, &group_idx) in group_indices.iter().enumerate() { - self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); - } - } - (Some(nulls), None) => { - // Has nulls, no filter - for (row_idx, (&group_idx, is_valid)) in - group_indices.iter().zip(nulls.iter()).enumerate() - { - if is_valid { - self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); - } - } - } - (None, Some(filter)) => { - // No nulls, has filter - for (row_idx, (&group_idx, filter_value)) in - group_indices.iter().zip(filter.iter()).enumerate() - { - if let Some(true) = filter_value { - self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); - } - } - } - (Some(nulls), Some(filter)) => { - // Has nulls and filter - let iter = filter - .iter() - .zip(group_indices.iter()) - .zip(nulls.iter()) - .enumerate(); - - for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { - if is_valid && filter_value == Some(true) { - self.distinct_sets[group_idx].insert(Hashable(data[row_idx])); - } - } - } - } - - Ok(()) - } - - fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let counts = distinct_sets - .iter() - .map(|set| set.len() as i64) - .collect::>(); - - Ok(Arc::new(Int64Array::from(counts))) - } - - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - _opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!( - values.len(), - 1, - "COUNT DISTINCT merge expects a single state array" - ); - self.ensure_sets(total_num_groups); - - let list_array = as_list_array(&values[0])?; - - // For each group in the incoming batch - for (i, &group_idx) in group_indices.iter().enumerate() { - if i < list_array.len() { - let inner_array = list_array.value(i); - if !inner_array.is_empty() { - // Get the primitive array from the list and extend our set with its values - let primitive_array = as_primitive_array::(&inner_array)?; - self.distinct_sets[group_idx] - .extend(primitive_array.values().iter().map(|v| Hashable(*v))); - } - } - } - - Ok(()) - } - - fn state(&mut self, emit_to: EmitTo) -> Result> { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); - offsets.push(0); - let mut values = Vec::new(); - - // Create the values array by flattening all sets - for set in distinct_sets { - let start_len = values.len(); - values.extend(set.into_iter().map(|v| v.0)); - offsets.push(values.len() as i32); - } - - // Create the primitive array from the flattened values - let values_array = Arc::new( - PrimitiveArray::::from_iter_values(values.into_iter()) - .with_data_type(self.data_type.clone()), - ) as ArrayRef; - - // Create list array with the offsets - let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(self.data_type.clone(), true)), - offset_buffer, - values_array, - None, - ); - - Ok(vec![Arc::new(list_array) as _]) - } - - fn convert_to_state( - &self, - values: &[ArrayRef], - opt_filter: Option<&BooleanArray>, - ) -> Result> { - // For a single distinct value per row, create a list array with that value - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - let values = ArrayRef::clone(&values[0]); - - let offsets = - OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); - let nulls = filtered_null_mask(opt_filter, &values); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(values.data_type().clone(), true)), - offsets, - values, - nulls, - ); - - Ok(vec![Arc::new(list_array)]) - } - - fn supports_convert_to_state(&self) -> bool { - true - } - - fn size(&self) -> usize { - let mut total_size = std::mem::size_of::(); - - // Size of vector container - total_size += - std::mem::size_of::, RandomState>>>(); - - // Size of actual sets and their contents - for set in &self.distinct_sets { - let set_size = - std::mem::size_of::, RandomState>>() - + set.capacity() * std::mem::size_of::>(); - total_size += set_size; - } - - total_size - } -} - -/// A specialized GroupsAccumulator for count distinct operations with string/binary view types -#[derive(Debug)] -pub struct BytesViewDistinctCountGroupsAccumulator { - /// One HashSet per group to track distinct values - distinct_sets: Vec, RandomState>>, - output_type: OutputType, -} - -impl BytesViewDistinctCountGroupsAccumulator { - pub fn new(output_type: OutputType) -> Self { - Self { - distinct_sets: vec![], - output_type, - } - } - - fn ensure_sets(&mut self, total_num_groups: usize) { - if self.distinct_sets.len() < total_num_groups { - self.distinct_sets - .resize_with(total_num_groups, HashSet::default); - } - } -} - -impl GroupsAccumulator for BytesViewDistinctCountGroupsAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - self.ensure_sets(total_num_groups); - - // Handle binary view or string view arrays - if let Some(array) = values[0].as_any().downcast_ref::() { - // Implement a manual iteration rather than using accumulate_indices - match (array.logical_nulls(), opt_filter) { - (None, None) => { - // No nulls, no filter - process all rows - for (row_idx, &group_idx) in group_indices.iter().enumerate() { - let value = array.value(row_idx); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - (Some(nulls), None) => { - // Has nulls, no filter - for (row_idx, (&group_idx, is_valid)) in - group_indices.iter().zip(nulls.iter()).enumerate() - { - if is_valid { - let value = array.value(row_idx); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - (None, Some(filter)) => { - // No nulls, has filter - for (row_idx, (&group_idx, filter_value)) in - group_indices.iter().zip(filter.iter()).enumerate() - { - if let Some(true) = filter_value { - let value = array.value(row_idx); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - (Some(nulls), Some(filter)) => { - // Has nulls and filter - let iter = filter - .iter() - .zip(group_indices.iter()) - .zip(nulls.iter()) - .enumerate(); - - for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { - if is_valid && filter_value == Some(true) { - let value = array.value(row_idx); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - } - } else if let Some(array) = values[0].as_any().downcast_ref::() { - // Implement a manual iteration rather than using accumulate_indices - match (array.logical_nulls(), opt_filter) { - (None, None) => { - // No nulls, no filter - process all rows - for (row_idx, &group_idx) in group_indices.iter().enumerate() { - let value = array.value(row_idx).as_bytes(); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - (Some(nulls), None) => { - // Has nulls, no filter - for (row_idx, (&group_idx, is_valid)) in - group_indices.iter().zip(nulls.iter()).enumerate() - { - if is_valid { - let value = array.value(row_idx).as_bytes(); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - (None, Some(filter)) => { - // No nulls, has filter - for (row_idx, (&group_idx, filter_value)) in - group_indices.iter().zip(filter.iter()).enumerate() - { - if let Some(true) = filter_value { - let value = array.value(row_idx).as_bytes(); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - (Some(nulls), Some(filter)) => { - // Has nulls and filter - let iter = filter - .iter() - .zip(group_indices.iter()) - .zip(nulls.iter()) - .enumerate(); - - for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { - if is_valid && filter_value == Some(true) { - let value = array.value(row_idx).as_bytes(); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } - } - } else { - return Err(datafusion_common::DataFusionError::Internal(format!( - "Unsupported array type for BytesViewDistinctCountGroupsAccumulator: {:?}", - values[0].data_type() - ))); - } - - Ok(()) - } - - fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let counts = distinct_sets - .iter() - .map(|set| set.len() as i64) - .collect::>(); - - Ok(Arc::new(Int64Array::from(counts))) - } - - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - _opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!( - values.len(), - 1, - "COUNT DISTINCT merge expects a single state array" - ); - self.ensure_sets(total_num_groups); - - let list_array = as_list_array(&values[0])?; - - // For each group in the incoming batch - for (i, &group_idx) in group_indices.iter().enumerate() { - if i < list_array.len() { - let inner_array = list_array.value(i); - if !inner_array.is_empty() { - // Handle binary view or string view arrays - if let Some(array) = - inner_array.as_any().downcast_ref::() - { - for j in 0..array.len() { - if !array.is_null(j) { - let value = array.value(j); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } else if let Some(array) = - inner_array.as_any().downcast_ref::() - { - for j in 0..array.len() { - if !array.is_null(j) { - let value = array.value(j).as_bytes(); - self.distinct_sets[group_idx].insert(value.to_vec()); - } - } - } else { - return Err(datafusion_common::DataFusionError::Internal(format!( - "Unsupported inner array type for BytesViewDistinctCountGroupsAccumulator: {:?}", - inner_array.data_type() - ))); - } - } - } - } - - Ok(()) - } - - fn state(&mut self, emit_to: EmitTo) -> Result> { - let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - - let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); - offsets.push(0); - - // Create arrays for views - let inner_array: ArrayRef = - match self.output_type { - OutputType::Utf8View => { - let mut string_values = Vec::new(); - - // Collect all string values from all sets - for set in &distinct_sets { - for v in set { - // Safety: we know the value is valid UTF-8 since it came from a StringViewArray - let s = unsafe { std::str::from_utf8_unchecked(v) }; - string_values.push(s.to_string()); - } - } - - // Use from_iter_values which works correctly - let array = StringViewArray::from_iter_values( - string_values.iter().map(|s| s.as_str()), - ); - Arc::new(array) - } - OutputType::BinaryView => { - let mut bytes_values = Vec::new(); - - // Collect all byte values from all sets - for set in &distinct_sets { - for v in set { - bytes_values.push(v.clone()); - } - } - - // Use from_iter_values which works correctly - let array = BinaryViewArray::from_iter_values( - bytes_values.iter().map(|v| v.as_slice()), - ); - Arc::new(array) - } - _ => return Err(datafusion_common::DataFusionError::Internal( - "Unsupported output type for BytesViewDistinctCountGroupsAccumulator" - .to_string(), - )), - }; - - // Count elements in each set for offsets - for set in distinct_sets { - offsets.push(offsets.last().unwrap() + set.len() as i32); - } - - // Create list array with the offsets - let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(inner_array.data_type().clone(), true)), - offset_buffer, - inner_array, - None, - ); - - Ok(vec![Arc::new(list_array) as _]) - } - - fn convert_to_state( - &self, - values: &[ArrayRef], - opt_filter: Option<&BooleanArray>, - ) -> Result> { - // For a single distinct value per row, create a list array with that value - assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); - let values = ArrayRef::clone(&values[0]); - - let offsets = - OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); - let nulls = filtered_null_mask(opt_filter, &values); - let list_array = ListArray::new( - Arc::new(Field::new_list_field(values.data_type().clone(), true)), - offsets, - values, - nulls, - ); - - Ok(vec![Arc::new(list_array)]) - } - - fn supports_convert_to_state(&self) -> bool { - true - } - - fn size(&self) -> usize { - let mut total_size = std::mem::size_of::(); - - // Size of vector container - total_size += std::mem::size_of::, RandomState>>>(); - - // Size of actual sets and their contents (similar to BytesDistinctCountGroupsAccumulator) - for set in &self.distinct_sets { - let container_size = std::mem::size_of::, RandomState>>() - + set.capacity() * std::mem::size_of::>(); - - // Add sizes of actual byte vectors - let content_size = set.iter().map(|v| v.capacity()).sum::(); - - total_size += container_size + content_size; - } - - total_size - } -} - -// /// A specialized GroupsAccumulator for count distinct operations with string/binary types (for Utf8 & Binary arrays) -// #[derive(Debug)] -// pub struct BytesDistinctCountGroupsAccumulator { -// /// One HashSet per group to track distinct values -// distinct_sets: Vec, RandomState>>, -// output_type: OutputType, -// _phantom: PhantomData, -// } - -// impl BytesDistinctCountGroupsAccumulator { -// pub fn new(output_type: OutputType) -> Self { -// Self { -// distinct_sets: vec![], -// output_type, -// _phantom: PhantomData, -// } -// } - -// fn ensure_sets(&mut self, total_num_groups: usize) { -// if self.distinct_sets.len() < total_num_groups { -// self.distinct_sets -// .resize_with(total_num_groups, HashSet::default); -// } -// } -// } - -// impl GroupsAccumulator for BytesDistinctCountGroupsAccumulator -// where -// O: 'static + OffsetSizeTrait + Debug, -// { -// fn update_batch( -// &mut self, -// values: &[ArrayRef], -// group_indices: &[usize], -// opt_filter: Option<&BooleanArray>, -// total_num_groups: usize, -// ) -> Result<()> { -// assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); -// self.ensure_sets(total_num_groups); - -// if let Some(array) = values[0] -// .as_any() -// .downcast_ref::>>() -// { -// // String array case -// match (array.logical_nulls(), opt_filter) { -// (None, None) => { -// for (row_idx, &group_idx) in group_indices.iter().enumerate() { -// if row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// (Some(nulls), None) => { -// for (row_idx, (&group_idx, is_valid)) in -// group_indices.iter().zip(nulls.iter()).enumerate() -// { -// if is_valid && row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// (None, Some(filter)) => { -// for (row_idx, (&group_idx, filter_value)) in -// group_indices.iter().zip(filter.iter()).enumerate() -// { -// if let Some(true) = filter_value { -// if row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// } -// (Some(nulls), Some(filter)) => { -// let iter = filter -// .iter() -// .zip(group_indices.iter()) -// .zip(nulls.iter()) -// .enumerate(); - -// for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { -// if is_valid && filter_value == Some(true) && row_idx < array.len() -// { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// } -// } else if let Some(array) = values[0] -// .as_any() -// .downcast_ref::>>() -// { -// // Binary array case -// match (array.logical_nulls(), opt_filter) { -// (None, None) => { -// for (row_idx, &group_idx) in group_indices.iter().enumerate() { -// if row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// (Some(nulls), None) => { -// for (row_idx, (&group_idx, is_valid)) in -// group_indices.iter().zip(nulls.iter()).enumerate() -// { -// if is_valid && row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// (None, Some(filter)) => { -// for (row_idx, (&group_idx, filter_value)) in -// group_indices.iter().zip(filter.iter()).enumerate() -// { -// if let Some(true) = filter_value { -// if row_idx < array.len() { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// } -// (Some(nulls), Some(filter)) => { -// let iter = filter -// .iter() -// .zip(group_indices.iter()) -// .zip(nulls.iter()) -// .enumerate(); - -// for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { -// if is_valid && filter_value == Some(true) && row_idx < array.len() -// { -// let value = array.value(row_idx); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// } -// } else { -// return Err(datafusion_common::DataFusionError::Internal(format!( -// "Cannot process array of type {:?} with BytesDistinctCountGroupsAccumulator", -// values[0].data_type() -// ))); -// } - -// Ok(()) -// } - -// fn evaluate(&mut self, emit_to: EmitTo) -> Result { -// let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - -// let counts = distinct_sets -// .iter() -// .map(|set| set.len() as i64) -// .collect::>(); - -// Ok(Arc::new(Int64Array::from(counts))) -// } - -// fn merge_batch( -// &mut self, -// values: &[ArrayRef], -// group_indices: &[usize], -// _opt_filter: Option<&BooleanArray>, -// total_num_groups: usize, -// ) -> Result<()> { -// assert_eq!( -// values.len(), -// 1, -// "COUNT DISTINCT merge expects a single state array" -// ); -// self.ensure_sets(total_num_groups); - -// let list_array = as_list_array(&values[0])?; - -// // For each group in the incoming batch -// for (i, &group_idx) in group_indices.iter().enumerate() { -// if i < list_array.len() { -// let inner_array = list_array.value(i); -// if !inner_array.is_empty() { -// // Try both String and Binary types -// if let Some(bytes_array) = inner_array -// .as_any() -// .downcast_ref::>>() -// { -// for j in 0..bytes_array.len() { -// if !bytes_array.is_null(j) { -// let value = bytes_array.value(j); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } else if let Some(bytes_array) = inner_array -// .as_any() -// .downcast_ref::>>() -// { -// for j in 0..bytes_array.len() { -// if !bytes_array.is_null(j) { -// let value = bytes_array.value(j); -// self.distinct_sets[group_idx].insert(value.to_vec()); -// } -// } -// } -// } -// } -// } - -// Ok(()) -// } - -// fn state(&mut self, emit_to: EmitTo) -> Result> { -// let distinct_sets = emit_to.take_needed(&mut self.distinct_sets); - -// let mut offsets = Vec::with_capacity(distinct_sets.len() + 1); -// offsets.push(0); - -// // First pass to calculate total byte length and build offsets -// let mut total_bytes_len = 0; -// let mut value_offsets = Vec::new(); -// value_offsets.push(O::default()); - -// let mut total_values = 0; -// for set in &distinct_sets { -// let set_len = set.len(); -// total_values += set_len; - -// for value in set { -// total_bytes_len += value.len(); -// let curr_offset = *value_offsets.last().unwrap(); -// let value_len = O::from_usize(value.len()).ok_or_else(|| { -// datafusion_common::DataFusionError::Internal( -// "Failed to convert offset".to_string(), -// ) -// })?; - -// let mut next_offset = curr_offset; -// next_offset += value_len; -// value_offsets.push(next_offset); -// } - -// offsets.push(offsets.last().unwrap() + set_len as i32); -// } - -// // Create buffer for all bytes concatenated -// let mut bytes = Vec::with_capacity(total_bytes_len); -// for set in distinct_sets { -// for value in set { -// bytes.extend_from_slice(&value); -// } -// } - -// // Create appropriate array based on output type -// let inner_array: ArrayRef = match self.output_type { -// OutputType::Utf8 => { -// // Create a StringArray -// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); -// let values_buffer = Buffer::from_vec(bytes); -// if O::IS_LARGE { -// // If O is i64, use LargeStringArray -// let large_string_array = -// arrow::array::GenericByteArray::>::new( -// offset_buffer.try_into().unwrap(), -// values_buffer, -// None, -// ); -// Arc::new(large_string_array) -// } else { -// // If O is i32, use StringArray -// let string_array = -// arrow::array::GenericByteArray::>::new( -// offset_buffer.try_into().unwrap(), -// values_buffer, -// None, -// ); -// Arc::new(string_array) -// } -// } -// OutputType::Binary => { -// // Create a BinaryArray -// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); -// let values_buffer = Buffer::from_vec(bytes); -// if O::IS_LARGE { -// // If O is i64, use LargeBinaryArray -// let large_binary_array = -// arrow::array::GenericByteArray::>::new( -// offset_buffer.try_into().unwrap(), -// values_buffer, -// None, -// ); -// Arc::new(large_binary_array) -// } else { -// // If O is i32, use BinaryArray -// let binary_array = -// arrow::array::GenericByteArray::>::new( -// offset_buffer.try_into().unwrap(), -// values_buffer, -// None, -// ); -// Arc::new(binary_array) -// } -// } -// _ => { -// return Err(datafusion_common::DataFusionError::Internal( -// "Unsupported output type for BytesDistinctCountGroupsAccumulator" -// .to_string(), -// )) -// } -// }; - -// // Create list array with the offsets -// let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); -// let list_array = ListArray::new( -// Arc::new(Field::new_list_field(inner_array.data_type().clone(), true)), -// offset_buffer, -// inner_array, -// None, -// ); - -// Ok(vec![Arc::new(list_array) as _]) -// } - -// fn convert_to_state( -// &self, -// values: &[ArrayRef], -// opt_filter: Option<&BooleanArray>, -// ) -> Result> { -// // For a single distinct value per row, create a list array with that value -// assert_eq!(values.len(), 1, "COUNT DISTINCT expects a single argument"); -// let values = ArrayRef::clone(&values[0]); - -// let offsets = -// OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1)); -// let nulls = filtered_null_mask(opt_filter, &values); -// let list_array = ListArray::new( -// Arc::new(Field::new_list_field(values.data_type().clone(), true)), -// offsets, -// values, -// nulls, -// ); - -// Ok(vec![Arc::new(list_array)]) -// } - -// fn supports_convert_to_state(&self) -> bool { -// true -// } - -// fn size(&self) -> usize { -// let mut total_size = std::mem::size_of::(); - -// // Size of vector container -// total_size += std::mem::size_of::, RandomState>>>(); - -// // Size of actual sets and their contents -// for set in &self.distinct_sets { -// let container_size = std::mem::size_of::, RandomState>>() -// + set.capacity() * std::mem::size_of::>(); - -// // Add sizes of actual byte vectors -// let content_size = set.iter().map(|v| v.capacity()).sum::(); - -// total_size += container_size + content_size; -// } - -// total_size -// } -// } - #[cfg(test)] mod tests { use super::*; From ca805fbc4c41f734adb83aba294facc7d8a7bf6f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Mar 2025 12:32:33 +0800 Subject: [PATCH 7/8] also for normal accumulator Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 173 +++++--------------- 1 file changed, 38 insertions(+), 135 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index e7ca46e1d7f85..df35fdbc3cd12 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -23,6 +23,7 @@ use arrow::array::{ use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; +use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_expr::expr::WindowFunction; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; @@ -236,8 +237,6 @@ impl AggregateUDFImpl for Count { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - panic!("should not create normal accumulator"); - if !acc_args.is_distinct { return Ok(Box::new(CountAccumulator::new())); } @@ -246,116 +245,11 @@ impl AggregateUDFImpl for Count { return not_impl_err!("COUNT DISTINCT with multiple arguments"); } - let data_type = &acc_args.exprs[0].data_type(acc_args.schema)?; - Ok(match data_type { - // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator - DataType::Int8 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Int16 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Int32 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Int64 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::UInt8 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::UInt16 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::UInt32 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::UInt64 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Decimal128(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< - Decimal128Type, - >::new(data_type)), - DataType::Decimal256(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< - Decimal256Type, - >::new(data_type)), - - DataType::Date32 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Date64 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Time32(TimeUnit::Millisecond) => Box::new( - PrimitiveDistinctCountAccumulator::::new( - data_type, - ), - ), - DataType::Time32(TimeUnit::Second) => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Time64(TimeUnit::Microsecond) => Box::new( - PrimitiveDistinctCountAccumulator::::new( - data_type, - ), - ), - DataType::Time64(TimeUnit::Nanosecond) => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - DataType::Timestamp(TimeUnit::Microsecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new( - data_type, - ), - ), - DataType::Timestamp(TimeUnit::Millisecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new( - data_type, - ), - ), - DataType::Timestamp(TimeUnit::Nanosecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new( - data_type, - ), - ), - DataType::Timestamp(TimeUnit::Second, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), - - DataType::Float16 => { - Box::new(FloatDistinctCountAccumulator::::new()) - } - DataType::Float32 => { - Box::new(FloatDistinctCountAccumulator::::new()) - } - DataType::Float64 => { - Box::new(FloatDistinctCountAccumulator::::new()) - } - - DataType::Utf8 => { - Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) - } - DataType::Utf8View => { - Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8View)) - } - DataType::LargeUtf8 => { - Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) - } - DataType::Binary => Box::new(BytesDistinctCountAccumulator::::new( - OutputType::Binary, - )), - DataType::BinaryView => Box::new(BytesViewDistinctCountAccumulator::new( - OutputType::BinaryView, - )), - DataType::LargeBinary => Box::new(BytesDistinctCountAccumulator::::new( - OutputType::Binary, - )), - - // Use the generic accumulator based on `ScalarValue` for all other types - _ => Box::new(DistinctCountAccumulator { - values: HashSet::default(), - state_data_type: data_type.clone(), - }), - }) + Ok(Box::new(DistinctCountAccumulator { + values: HashSet::default(), + random_state: RandomState::with_seeds(1, 2, 3, 4), + batch_hashes: vec![], + })) } fn aliases(&self) -> &[String] { @@ -681,8 +575,9 @@ fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize { /// [`BytesDistinctCountAccumulator`] #[derive(Debug)] struct DistinctCountAccumulator { - values: HashSet, - state_data_type: DataType, + values: HashSet, + random_state: RandomState, + batch_hashes: Vec, } impl DistinctCountAccumulator { @@ -691,12 +586,12 @@ impl DistinctCountAccumulator { // not suitable for variable length values like strings or complex types fn fixed_size(&self) -> usize { size_of_val(self) - + (size_of::() * self.values.capacity()) + + (size_of::() * self.values.capacity()) + self .values .iter() .next() - .map(|vals| ScalarValue::size(vals) - size_of_val(vals)) + .map(|vals| 8 - size_of_val(vals)) .unwrap_or(0) + size_of::() } @@ -705,11 +600,11 @@ impl DistinctCountAccumulator { // method is expensive fn full_size(&self) -> usize { size_of_val(self) - + (size_of::() * self.values.capacity()) + + (size_of::() * self.values.capacity()) + self .values .iter() - .map(|vals| ScalarValue::size(vals) - size_of_val(vals)) + .map(|vals| 8 - size_of_val(vals)) .sum::() + size_of::() } @@ -718,10 +613,10 @@ impl DistinctCountAccumulator { impl Accumulator for DistinctCountAccumulator { /// Returns the distinct values seen so far as (one element) ListArray. fn state(&mut self) -> Result> { - let scalars = self.values.iter().cloned().collect::>(); - let arr = - ScalarValue::new_list_nullable(scalars.as_slice(), &self.state_data_type); - Ok(vec![ScalarValue::List(arr)]) + let values = self.values.iter().cloned().collect::>(); + let arr = Arc::new(UInt64Array::from(values)) as _; + let list_scalar = SingleRowListArrayBuilder::new(arr).build_list_scalar(); + Ok(vec![list_scalar]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { @@ -734,13 +629,21 @@ impl Accumulator for DistinctCountAccumulator { return Ok(()); } - (0..arr.len()).try_for_each(|index| { - if !arr.is_null(index) { - let scalar = ScalarValue::try_from_array(arr, index)?; - self.values.insert(scalar); - } - Ok(()) - }) + // (0..arr.len()).try_for_each(|index| { + // if !arr.is_null(index) { + // let scalar = ScalarValue::try_from_array(arr, index)?; + // self.values.insert(scalar); + // } + // Ok(()) + // }) + self.batch_hashes.clear(); + self.batch_hashes.resize(arr.len(), 0); + let hashes = + create_hashes(&[arr.clone()], &self.random_state, &mut self.batch_hashes)?; + for hash in hashes.as_slice() { + self.values.insert(*hash); + } + Ok(()) } /// Merges multiple sets of distinct values into the current set. @@ -761,7 +664,11 @@ impl Accumulator for DistinctCountAccumulator { "Intermediate results of COUNT DISTINCT should always be non null" ); }; - self.update_batch(&[inner_array])?; + // self.update_batch(&[inner_array])?; + let hash_array = inner_array.as_any().downcast_ref::().unwrap(); + for i in 0..hash_array.len() { + self.values.insert(hash_array.value(i)); + } } Ok(()) } @@ -771,11 +678,7 @@ impl Accumulator for DistinctCountAccumulator { } fn size(&self) -> usize { - match &self.state_data_type { - DataType::Boolean | DataType::Null => self.fixed_size(), - d if d.is_primitive() => self.fixed_size(), - _ => self.full_size(), - } + self.fixed_size() } } From 9ecd21fde584117b12912b7966f916e992278e33 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Mar 2025 12:50:23 +0800 Subject: [PATCH 8/8] clean up Signed-off-by: Ruihang Xia --- datafusion/functions-aggregate/src/count.rs | 128 ++++++-------------- 1 file changed, 37 insertions(+), 91 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index df35fdbc3cd12..fb87e713b1c7d 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,24 +16,17 @@ // under the License. use ahash::RandomState; -use arrow::array::{ - ArrowPrimitiveType, BinaryArray, BinaryViewArray, GenericByteArray, - GenericByteViewArray, OffsetSizeTrait, StringArray, StringViewArray, UInt64Array, -}; -use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use arrow::array::UInt64Array; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_expr::expr::WindowFunction; -use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::Hashable; use datafusion_macros::user_doc; use datafusion_physical_expr::expressions; use std::collections::HashSet; use std::fmt::Debug; -use std::hash::Hash; -use std::marker::PhantomData; use std::mem::{size_of, size_of_val}; use std::ops::BitAnd; use std::sync::Arc; @@ -41,14 +34,7 @@ use std::sync::Arc; use arrow::{ array::{ArrayRef, AsArray}, compute, - datatypes::{ - DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, - Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, - Time64NanosecondType, TimeUnit, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, - UInt16Type, UInt32Type, UInt64Type, UInt8Type, - }, + datatypes::{DataType, Field, Int64Type}, }; use arrow::{ @@ -66,17 +52,12 @@ use datafusion_expr::{ use datafusion_expr::{ Expr, ReversedUDAF, StatisticsArgs, TypeSignature, WindowFunctionDefinition, }; -use datafusion_functions_aggregate_common::aggregate::count_distinct::{ - BytesDistinctCountAccumulator, FloatDistinctCountAccumulator, - PrimitiveDistinctCountAccumulator, -}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; -use datafusion_physical_expr_common::binary_map::OutputType; -use arrow::datatypes::{ByteArrayType, GenericStringType}; -use datafusion_common::cast::{as_list_array, as_primitive_array}; +use datafusion_common::cast::as_list_array; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; -use std::convert::TryFrom; + +type HashValueType = u64; make_udaf_expr_and_func!( Count, @@ -272,10 +253,7 @@ impl AggregateUDFImpl for Count { return not_impl_err!("COUNT DISTINCT with multiple arguments"); } - let data_type = &args.exprs[0].data_type(args.schema)?; - Ok(Box::new(DistinctCountGroupsAccumulator::new( - data_type.clone(), - ))) + Ok(Box::new(DistinctCountGroupsAccumulator::new())) } else { Ok(Box::new(CountGroupsAccumulator::new())) } @@ -575,9 +553,9 @@ fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize { /// [`BytesDistinctCountAccumulator`] #[derive(Debug)] struct DistinctCountAccumulator { - values: HashSet, + values: HashSet, random_state: RandomState, - batch_hashes: Vec, + batch_hashes: Vec, } impl DistinctCountAccumulator { @@ -586,28 +564,15 @@ impl DistinctCountAccumulator { // not suitable for variable length values like strings or complex types fn fixed_size(&self) -> usize { size_of_val(self) - + (size_of::() * self.values.capacity()) + + (size_of::() * self.values.capacity()) + self .values .iter() .next() - .map(|vals| 8 - size_of_val(vals)) + .map(|vals| size_of::() - size_of_val(vals)) .unwrap_or(0) + size_of::() } - - // calculates the size as accurately as possible. Note that calling this - // method is expensive - fn full_size(&self) -> usize { - size_of_val(self) - + (size_of::() * self.values.capacity()) - + self - .values - .iter() - .map(|vals| 8 - size_of_val(vals)) - .sum::() - + size_of::() - } } impl Accumulator for DistinctCountAccumulator { @@ -638,8 +603,11 @@ impl Accumulator for DistinctCountAccumulator { // }) self.batch_hashes.clear(); self.batch_hashes.resize(arr.len(), 0); - let hashes = - create_hashes(&[arr.clone()], &self.random_state, &mut self.batch_hashes)?; + let hashes = create_hashes( + &[ArrayRef::clone(arr)], + &self.random_state, + &mut self.batch_hashes, + )?; for hash in hashes.as_slice() { self.values.insert(*hash); } @@ -686,17 +654,21 @@ impl Accumulator for DistinctCountAccumulator { #[derive(Debug)] pub struct DistinctCountGroupsAccumulator { /// One HashSet per group to track distinct values - distinct_sets: Vec>, + distinct_sets: Vec>, random_state: RandomState, - data_type: DataType, - batch_hashes: Vec, + batch_hashes: Vec, +} + +impl Default for DistinctCountGroupsAccumulator { + fn default() -> Self { + Self::new() + } } impl DistinctCountGroupsAccumulator { - pub fn new(data_type: DataType) -> Self { + pub fn new() -> Self { Self { distinct_sets: vec![], - data_type, random_state: RandomState::with_seeds(1, 2, 3, 4), batch_hashes: vec![], } @@ -724,8 +696,11 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { let array = &values[0]; self.batch_hashes.clear(); self.batch_hashes.resize(array.len(), 0); - let hashes = - create_hashes(&[array.clone()], &self.random_state, &mut self.batch_hashes)?; + let hashes = create_hashes( + &[ArrayRef::clone(array)], + &self.random_state, + &mut self.batch_hashes, + )?; // Use a pattern similar to accumulate_indices to process rows // that are not null and pass the filter @@ -735,9 +710,6 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { (None, None) => { // No nulls, no filter - process all rows for (row_idx, &group_idx) in group_indices.iter().enumerate() { - // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - // self.distinct_sets[group_idx].insert(scalar); - // } self.distinct_sets[group_idx].insert(hashes[row_idx]); } } @@ -747,9 +719,6 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { group_indices.iter().zip(nulls.iter()).enumerate() { if is_valid { - // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - // self.distinct_sets[group_idx].insert(scalar); - // } self.distinct_sets[group_idx].insert(hashes[row_idx]); } } @@ -760,9 +729,6 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { group_indices.iter().zip(filter.iter()).enumerate() { if let Some(true) = filter_value { - // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - // self.distinct_sets[group_idx].insert(scalar); - // } self.distinct_sets[group_idx].insert(hashes[row_idx]); } } @@ -777,9 +743,6 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { for (row_idx, ((filter_value, &group_idx), is_valid)) in iter { if is_valid && filter_value == Some(true) { - // if let Ok(scalar) = ScalarValue::try_from_array(array, row_idx) { - // self.distinct_sets[group_idx].insert(scalar); - // } self.distinct_sets[group_idx].insert(hashes[row_idx]); } } @@ -854,16 +817,13 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { }) .peekable(); let data_array: ArrayRef = if value_iter.peek().is_none() { - // arrow::array::new_empty_array(&self.data_type) as _ arrow::array::new_empty_array(&DataType::UInt64) as _ } else { - // Arc::new(ScalarValue::iter_to_array(value_iter)?) as _ Arc::new(UInt64Array::from_iter_values(value_iter)) }; let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); let list_array = ListArray::new( - // Arc::new(Field::new_list_field(self.data_type.clone(), true)), Arc::new(Field::new_list_field(DataType::UInt64, true)), offset_buffer, data_array, @@ -905,30 +865,16 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator { let mut size = size_of::(); // Size of the vector holding the HashSets - size += size_of::>>() - + self.distinct_sets.capacity() * size_of::>(); + size += size_of::>>() + + self.distinct_sets.capacity() + * size_of::>(); // Estimate HashSet contents size more efficiently // Instead of iterating through all values which is expensive, use an approximation for set in &self.distinct_sets { // Base size of the HashSet - size += set.capacity() * size_of::<(u64, ())>(); - - // Estimate ScalarValue size using sample-based approach - // Only look at up to 10 items as a sample - // let sample_size = 10.min(set.len()); - // if sample_size > 0 { - // let avg_size = set - // .iter() - // .take(sample_size) - // .map(|v| v.size()) - // .sum::() - // / sample_size; - - // Extrapolate to the full set - // size += avg_size * (set.len() - sample_size); - size += size_of::() * set.len(); - // } + size += set.capacity() * size_of::<(HashValueType, ())>(); + size += size_of::() * set.len(); } size @@ -950,7 +896,7 @@ mod tests { #[test] fn test_distinct_count_groups_basic() -> Result<()> { - let mut accumulator = DistinctCountGroupsAccumulator::new(DataType::Int32); + let mut accumulator = DistinctCountGroupsAccumulator::new(); let values = vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 1])) as ArrayRef]; // 3 groups @@ -972,7 +918,7 @@ mod tests { #[test] fn test_distinct_count_groups_with_filter() -> Result<()> { - let mut accumulator = DistinctCountGroupsAccumulator::new(DataType::Utf8); + let mut accumulator = DistinctCountGroupsAccumulator::new(); let values = vec![ Arc::new(StringArray::from(vec!["a", "b", "a", "c", "b", "d"])) as ArrayRef, ];