From ee6fe7118be350fdf964fa6d8adf96d17247c409 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 1 Feb 2025 21:31:33 +0800 Subject: [PATCH 1/2] improve median() no grouping case --- datafusion/functions-aggregate/src/median.rs | 26 ++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index defbbe737a9de..79efa082e3fbc 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -242,14 +242,26 @@ impl Debug for MedianAccumulator { impl Accumulator for MedianAccumulator { fn state(&mut self) -> Result> { - let all_values = self - .all_values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(*x), &self.data_type)) - .collect::>>()?; + // Convert `all_values` to `ListArray` and return a single List ScalarValue - let arr = ScalarValue::new_list_nullable(&all_values, &self.data_type); - Ok(vec![ScalarValue::List(arr)]) + // Build offsets + let offsets = + OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32])); + + // Build inner array + let values_array = + PrimitiveArray::::new(ScalarBuffer::from(self.all_values.clone()), None) + .with_data_type(self.data_type.clone()); + + // Build the result list array + let list_array = ListArray::new( + Arc::new(Field::new_list_field(self.data_type.clone(), true)), + offsets, + Arc::new(values_array), + None, + ); + + Ok(vec![ScalarValue::List(Arc::new(list_array))]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { From ab4eeaa459d82830f29d5711847070e826563aa7 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 1 Feb 2025 22:13:13 +0800 Subject: [PATCH 2/2] review --- datafusion/functions-aggregate/src/median.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index 79efa082e3fbc..ba6b63260e068 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -249,9 +249,11 @@ impl Accumulator for MedianAccumulator { OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32])); // Build inner array - let values_array = - PrimitiveArray::::new(ScalarBuffer::from(self.all_values.clone()), None) - .with_data_type(self.data_type.clone()); + let values_array = PrimitiveArray::::new( + ScalarBuffer::from(std::mem::take(&mut self.all_values)), + None, + ) + .with_data_type(self.data_type.clone()); // Build the result list array let list_array = ListArray::new(