impl state.
Rachelint committed Jan 29, 2025
commit 7f10006434a4bb038c3b17f401f2460bb75005e5
70 changes: 51 additions & 19 deletions datafusion/functions-aggregate/src/median.rs
@@ -22,8 +22,9 @@ use std::sync::Arc;

use arrow::array::{
downcast_integer, ArrowNumericType, BooleanArray, GenericListBuilder,
GenericListViewArray,
GenericListViewArray, ListArray, ListBuilder, PrimitiveArray, PrimitiveBuilder,
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::{
array::{ArrayRef, AsArray},
datatypes::{
@@ -235,21 +236,23 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
}
}

/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
/// The median groups accumulator accumulates the raw input values
///
/// For calculating the accurate medians of groups, we need to store all values
/// of each group before the final evaluation.
/// Values in each group will be stored in a `Vec<T>`, so the total group values
/// will actually be organized as a `Vec<Vec<T>>`.
Contributor:
Given it is important to track the median values for each group separately, I don't really see a way around Vec/Vec -- I think it is the simplest version and will have pretty reasonable performance.
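For intuition, here is a minimal standalone sketch of the `Vec<Vec<T>>` layout under discussion -- plain std Rust with hypothetical stand-in functions, ignoring arrow arrays, nulls, filters, and empty groups:

```rust
// Minimal sketch of per-group median state as Vec<Vec<T>>.
// `update_batch` and `evaluate` here are hypothetical stand-ins for the
// accumulator methods, not DataFusion's actual signatures.

fn update_batch(
    group_values: &mut Vec<Vec<i64>>,
    group_indices: &[usize],
    values: &[i64],
    total_num_groups: usize,
) {
    // One inner Vec per group; values are only appended here.
    group_values.resize(total_num_groups, Vec::new());
    for (&g, &v) in group_indices.iter().zip(values) {
        group_values[g].push(v);
    }
}

fn evaluate(group_values: &mut [Vec<i64>]) -> Vec<i64> {
    // Sorting is deferred until the final evaluation.
    group_values
        .iter_mut()
        .map(|vals| {
            vals.sort_unstable();
            vals[vals.len() / 2] // simplified: upper median, no averaging
        })
        .collect()
}

fn main() {
    let mut state: Vec<Vec<i64>> = Vec::new();
    update_batch(&mut state, &[0, 1, 0, 1, 0], &[3, 10, 1, 20, 2], 2);
    // group 0 holds [3, 1, 2], group 1 holds [10, 20]
    assert_eq!(evaluate(&mut state), vec![2, 20]);
}
```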

Contributor Author:
Yes, I tried to avoid using Vec<Vec<T>> so as to avoid copying from the Vec<Vec<T>> into the result Vec<T>, but it is hard to do.

///
/// In partial aggregation stage, the `values` of each group are emitted as a
/// `ListArray` by `state` (one list element per group).
///
/// The intermediate state is represented as a List of scalar values updated by
/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
#[derive(Debug)]
struct MedianGroupAccumulator<T: ArrowNumericType + Send> {
struct MedianGroupsAccumulator<T: ArrowNumericType + Send> {
data_type: DataType,
group_values: Vec<Vec<T::Native>>,
Contributor @korowa (Jan 28, 2025):
Just wondering -- using Vec<Vec<>> as state storage doesn't seem to differ much from what the regular accumulator does, but this PR still introduces a noticeable performance improvement. Are there any other optimizations that could be applied to the regular accumulator?

P.S. Asking just because when I was doing more or less the same for count distinct (PR), the performance of the GroupsAccumulator with Vec<HashSet<>> was not that significant compared to the regular accumulator with HashSet<> state.

Contributor:

I think among other things, the intermediate state management (creating ListArrays directly rather than from ScalarValue) probably helps a lot:

https://github.com/apache/datafusion/blob/6c9355d5be8b6045865fed67cb6d028b2dfc2e06/datafusion/functions-aggregate/src/median.rs#L200-L199

There is also an extra allocation per group when using the groups accumulator adapter.

That being said, it is a fair question how much better the existing MedianAccumulator could be if it built the ListArrays directly, as this PR does 🤔
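As a rough illustration of the "build the ListArray directly" point -- a sketch only, reusing the same arrow-rs APIs the diff below uses (`ListArray::new`, `OffsetBuffer`, `ScalarBuffer`, `Field::new_list_field`), not DataFusion's actual code:

```rust
use std::sync::Arc;
use arrow::array::{Array, Int64Array, ListArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};

fn main() {
    // Per-group values: group 0 -> [1, 2, 3], group 1 -> [10, 20].
    let groups: Vec<Vec<i64>> = vec![vec![1, 2, 3], vec![10, 20]];

    // One pass to build the offsets [0, 3, 5]...
    let mut offsets = Vec::with_capacity(groups.len() + 1);
    offsets.push(0i32);
    let mut len = 0i32;
    for g in &groups {
        len += g.len() as i32;
        offsets.push(len);
    }

    // ...and one flattened child array; no per-group ScalarValue
    // allocations anywhere.
    let flat: Vec<i64> = groups.into_iter().flatten().collect();
    let child = Int64Array::new(ScalarBuffer::from(flat), None);

    let list = ListArray::new(
        Arc::new(Field::new_list_field(DataType::Int64, false)),
        OffsetBuffer::new(ScalarBuffer::from(offsets)),
        Arc::new(child),
        None,
    );
    assert_eq!(list.len(), 2);
}
```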

Contributor Author @Rachelint (Jan 29, 2025):

@korowa I think the point mentioned by @alamb is an important part of the improvement.

Here are some other points from my side:

  • In GroupsAccumulatorAdapter::update_batch, we need to reorder the input batch and then use slice to split the reordered batch afterwards. These two operations may not be cheap (see the sketch after this list):

    let values = take_arrays(values, &batch_indices, None)?;
    let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;

    // invoke each accumulator with the appropriate rows, first
    // pulling the input arguments for this group into their own
    // RecordBatch(es)
    let iter = groups_with_rows.iter().zip(offsets.windows(2));

    let mut sizes_pre = 0;
    let mut sizes_post = 0;
    for (&group_idx, offsets) in iter {
        let state = &mut self.states[group_idx];
        sizes_pre += state.size();

        let values_to_accumulate = slice_and_maybe_filter(
            &values,
            opt_filter.as_ref().map(|f| f.as_boolean()),
            offsets,
        )?;
        f(state.accumulator.as_mut(), &values_to_accumulate)?;

        // clear out the state so they are empty for next
        // iteration
        state.indices.clear();
        sizes_post += state.size();
    }

  • In GroupsAccumulatorAdapter::merge_batch, a similar problem exists and may be even more serious, because there we need to reorder a ListArray.

  • And in GroupsAccumulatorAdapter::state, extra allocations exist, as mentioned by @alamb.
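To make the first bullet concrete, a hypothetical, dependency-free sketch contrasting the adapter's reorder-then-slice pattern with the direct scatter a native GroupsAccumulator can do (simplified: plain i64 values, no filters; not the adapter's real code):

```rust
// Adapter-style: reorder the batch so each group's rows are contiguous
// (standing in for `take_arrays`), then cut out one slice per group
// (standing in for `slice_and_maybe_filter`). Both steps copy data.
fn adapter_style(values: &[i64], group_indices: &[usize], num_groups: usize) -> Vec<Vec<i64>> {
    let mut row_ids: Vec<usize> = (0..values.len()).collect();
    row_ids.sort_by_key(|&r| group_indices[r]);
    let reordered: Vec<i64> = row_ids.iter().map(|&r| values[r]).collect();

    let mut counts = vec![0usize; num_groups];
    for &g in group_indices {
        counts[g] += 1;
    }

    let mut out = Vec::with_capacity(num_groups);
    let mut start = 0;
    for c in counts {
        out.push(reordered[start..start + c].to_vec()); // extra copy per group
        start += c;
    }
    out
}

// Native GroupsAccumulator-style: one scatter pass, no reorder, no slices.
fn direct_style(values: &[i64], group_indices: &[usize], num_groups: usize) -> Vec<Vec<i64>> {
    let mut out = vec![Vec::new(); num_groups];
    for (&g, &v) in group_indices.iter().zip(values) {
        out[g].push(v);
    }
    out
}

fn main() {
    let (values, groups) = ([3i64, 10, 1, 20, 2], [0usize, 1, 0, 1, 0]);
    assert_eq!(
        adapter_style(&values, &groups, 2),
        direct_style(&values, &groups, 2)
    );
}
```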

Contributor Author:
@korowa does that mean that implementing a groups accumulator for distinct count did not yield an obvious improvement?
That is really surprising to me; I am studying #8721

Contributor @korowa (Jan 29, 2025):

There were some improvements, but the overall results for ClickBench Q9 (the query I was mostly looking at) were roughly x2.63 for the GroupsAccumulator and x2.30 for the regular Accumulator -- about a 13-15% overall difference, which is not as massive as this PR's results.

However, maybe things have changed in the GroupsAccumulator implementation, and now even a plain Vec<HashSet<>> will be way faster.

UPD: and, yes, maybe producing the state, as pointed out by @alamb above, was (at least partially) the cause of the insignificant improvement -- in count distinct it was implemented via ListArray::from_iter_primitive (commit), instead of being built from a single flattened array and its offsets.
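For comparison with the flattened-array approach sketched earlier, a tiny sketch (assuming arrow-rs; not the count-distinct PR's actual code) of the per-group `ListArray::from_iter_primitive` construction:

```rust
use arrow::array::{Array, ListArray};
use arrow::datatypes::Int64Type;

fn main() {
    let groups: Vec<Vec<i64>> = vec![vec![1, 2, 3], vec![10, 20]];

    // Each inner Vec becomes its own list item; every value is wrapped
    // in Option and an intermediate Vec is built per group, instead of
    // one flattened child array plus offsets.
    let list = ListArray::from_iter_primitive::<Int64Type, _, _>(
        groups
            .into_iter()
            .map(|g| Some(g.into_iter().map(Some).collect::<Vec<_>>())),
    );
    assert_eq!(list.len(), 2);
}
```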

Contributor Author:

Seems really worth digging into the reason more deeply.

null_state: NullState,
}

impl<T: ArrowNumericType + Send> MedianGroupAccumulator<T> {
impl<T: ArrowNumericType + Send> MedianGroupsAccumulator<T> {
pub fn new(data_type: DataType) -> Self {
Self {
data_type,
@@ -259,7 +262,7 @@ impl<T: ArrowNumericType + Send> MedianGroupAccumulator<T> {
}
}

impl<T: ArrowNumericType + Send> GroupsAccumulator for MedianGroupAccumulator<T> {
impl<T: ArrowNumericType + Send> GroupsAccumulator for MedianGroupsAccumulator<T> {
fn update_batch(
&mut self,
values: &[ArrayRef],
@@ -295,7 +298,7 @@ impl<T: ArrowNumericType + Send> GroupsAccumulator for MedianGroupAccumulator<T>
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");

// The merged values should be organized as a `ListArray` like:
// The merged values should be organized as a non-nullable `ListArray` like:
//
// ```text
// group 0: [1, 2, 3]
@@ -306,26 +309,55 @@ impl<T: ArrowNumericType + Send> GroupsAccumulator for MedianGroupAccumulator<T>
// ```
//
let input_group_values = values[0].as_list::<i32>();
assert!(input_group_values.null_count() == 0);

// Ensure `group_values` is big enough
self.group_values.resize(total_num_groups, Vec::new());

// Adds the counts with the partial counts
// Extend values to related groups
group_indices
.iter()
.zip(input_group_values.iter())
.for_each(|(&group_index, values_opt)| {
if let Some(values) = values_opt {
let values = values.as_primitive::<T>();
self.group_values[group_index].extend(values.values().iter());
}
let values = values_opt.unwrap();
let values = values.as_primitive::<T>();
self.group_values[group_index].extend(values.values().iter());
});

Ok(())
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
todo!()
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let emit_group_values = emit_to.take_needed(&mut self.group_values);

// Build offsets
let mut offsets = Vec::with_capacity(emit_group_values.len() + 1);
offsets.push(0);
let mut cur_len = 0;
for group_value in &emit_group_values {
cur_len += group_value.len() as i32;
offsets.push(cur_len);
}
let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));

// Build inner array
let flatten_group_values =
emit_group_values.into_iter().flatten().collect::<Vec<_>>();
let group_values_array =
PrimitiveArray::<T>::new(ScalarBuffer::from(flatten_group_values), None);

// Build the result list array
let result_list_array = ListArray::new(
Arc::new(Field::new_list_field(self.data_type.clone(), false)),
offsets,
Arc::new(group_values_array),
None,
);

Ok(vec![Arc::new(result_list_array)])
}

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
todo!()
}
