From f05b108316cb80ae68022b19c250ffd48a9b2411 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 12 Apr 2023 09:57:05 +0800 Subject: [PATCH] Refine the size() calculation of accumulator --- .../src/physical_plan/aggregates/row_hash.rs | 113 ++++++++++++------ .../physical-expr/src/aggregate/average.rs | 2 +- datafusion/physical-expr/src/aggregate/sum.rs | 2 +- 3 files changed, 77 insertions(+), 40 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 3cc24425435ba..1e3420c2de9e3 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -338,6 +338,7 @@ impl GroupedHashAggregateStream { map, group_states, .. } = &mut self.aggr_state; + let mut accumulator_set_init_size = None; for (row, hash) in batch_hashes.into_iter().enumerate() { let entry = map.get_mut(hash, |(_hash, group_idx)| { // verify that a group that we are inserting with hash is @@ -384,13 +385,15 @@ impl GroupedHashAggregateStream { + (std::mem::size_of::() * group_state.indices.capacity()); // Allocation done by normal accumulators - *allocated += (std::mem::size_of::>() - * group_state.accumulator_set.capacity()) - + group_state - .accumulator_set - .iter() - .map(|accu| accu.size()) - .sum::(); + *allocated += *accumulator_set_init_size.get_or_insert_with(|| { + std::mem::size_of::>() + * group_state.accumulator_set.capacity() + + group_state + .accumulator_set + .iter() + .map(|accu| accu.size()) + .sum::() + }); // for hasher function, use precomputed hash value map.insert_accounted( @@ -410,7 +413,7 @@ impl GroupedHashAggregateStream { // Update the accumulator results, according to row_aggr_state. #[allow(clippy::too_many_arguments)] - fn update_accumulators( + fn update_accumulators( &mut self, groups_with_rows: &[usize], offsets: &[usize], @@ -418,8 +421,16 @@ impl GroupedHashAggregateStream { normal_values: &[Vec], row_filter_values: &[Option], normal_filter_values: &[Option], + func_row: F1, + func_normal: F2, allocated: &mut usize, - ) -> Result<()> { + ) -> Result<()> + where + F1: Fn(&mut RowAccumulatorItem, &mut RowAccessor, &[ArrayRef]) -> Result<()>, + F2: Fn(&mut AccumulatorItem, &[ArrayRef]) -> Result<()>, + { + let accumulator_set_pre = + get_accumulator_set_size(groups_with_rows, &self.aggr_state.group_states); // 2.1 for each key in this batch // 2.2 for each aggregation // 2.3 `slice` from each of its arrays the keys' values @@ -446,15 +457,7 @@ impl GroupedHashAggregateStream { RowAccessor::new_from_layout(self.row_aggr_layout.clone()); state_accessor .point_to(0, group_state.aggregation_buffer.as_mut_slice()); - match self.mode { - AggregateMode::Partial => { - accumulator.update_batch(&values, &mut state_accessor) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values, &mut state_accessor) - } - } + func_row(accumulator, &mut state_accessor, &values) })?; // normal accumulators group_state @@ -468,17 +471,7 @@ impl GroupedHashAggregateStream { filter_opt.as_ref(), offsets, )?; - let size_pre = accumulator.size(); - let res = match self.mode { - AggregateMode::Partial => accumulator.update_batch(&values), - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }; - let size_post = accumulator.size(); - *allocated += size_post.saturating_sub(size_pre); - res + func_normal(accumulator, &values) }) // 2.5 .and({ @@ -486,6 +479,9 @@ impl GroupedHashAggregateStream { Ok(()) }) })?; + let accumulator_set_post = + get_accumulator_set_size(groups_with_rows, &self.aggr_state.group_states); + *allocated += accumulator_set_post.saturating_sub(accumulator_set_pre); Ok(()) } @@ -534,15 +530,43 @@ impl GroupedHashAggregateStream { get_optional_filters(&row_filter_values, &batch_indices); let normal_filter_values = get_optional_filters(&normal_filter_values, &batch_indices); - self.update_accumulators( - &groups_with_rows, - &offsets, - &row_values, - &normal_values, - &row_filter_values, - &normal_filter_values, - &mut allocated, - )?; + match self.mode { + AggregateMode::Partial => self.update_accumulators( + &groups_with_rows, + &offsets, + &row_values, + &normal_values, + &row_filter_values, + &normal_filter_values, + |accumulator: &mut RowAccumulatorItem, + state_accessor: &mut RowAccessor, + values: &[ArrayRef]| { + accumulator.update_batch(values, state_accessor) + }, + |accumulator: &mut AccumulatorItem, values: &[ArrayRef]| { + accumulator.update_batch(values) + }, + &mut allocated, + )?, + AggregateMode::FinalPartitioned | AggregateMode::Final => self + .update_accumulators( + &groups_with_rows, + &offsets, + &row_values, + &normal_values, + &row_filter_values, + &normal_filter_values, + |accumulator: &mut RowAccumulatorItem, + state_accessor: &mut RowAccessor, + values: &[ArrayRef]| { + accumulator.merge_batch(values, state_accessor) + }, + |accumulator: &mut AccumulatorItem, values: &[ArrayRef]| { + accumulator.merge_batch(values) + }, + &mut allocated, + )?, + }; } allocated += self .row_converter @@ -552,6 +576,19 @@ impl GroupedHashAggregateStream { } } +fn get_accumulator_set_size( + groups_with_rows: &[usize], + group_states: &[GroupState], +) -> usize { + groups_with_rows.iter().fold(0usize, |acc, group_idx| { + let group_state = &group_states[*group_idx]; + group_state + .accumulator_set + .iter() + .fold(acc, |acc, accumulator| acc + accumulator.size()) + }) +} + /// The state that is built for each output group. #[derive(Debug)] pub struct GroupState { diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index eaf30ea10f82c..c72af7220b850 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -242,7 +242,7 @@ impl Accumulator for AvgAccumulator { } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() + std::mem::size_of_val(self) } } diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 0e302f332f9f4..8bd52dfd26d34 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -289,7 +289,7 @@ impl Accumulator for SumAccumulator { } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() + std::mem::size_of_val(self) } }