From 9b30c2b2660d3087b67c7ed2682782587999d04d Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 25 May 2021 12:09:16 +0200 Subject: [PATCH 1/2] reduce memory needed for concat --- arrow/src/array/transform/mod.rs | 55 ++++++++++++++++++++++++++++- arrow/src/compute/kernels/concat.rs | 19 ++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs index e7ec41e97a75..cd51d3c06203 100644 --- a/arrow/src/array/transform/mod.rs +++ b/arrow/src/array/transform/mod.rs @@ -21,6 +21,7 @@ use crate::{ error::{ArrowError, Result}, util::bit_util, }; +use std::mem; use super::{ data::{into_buffers, new_buffers}, @@ -341,7 +342,59 @@ impl<'a> MutableArrayData<'a> { use_nulls = true; }; - let [buffer1, buffer2] = new_buffers(data_type, capacity); + // We can prevent reallocation by precomputing the needed size. + // This is faster and more memory efficient. + let [buffer1, buffer2] = match data_type { + DataType::LargeUtf8 => { + // offsets + let mut buffer = + MutableBuffer::new((1 + capacity) * mem::size_of::()); + // safety: `unsafe` code assumes that this buffer is initialized with one element + buffer.push(0i64); + let str_values_size = arrays + .iter() + .map(|data| { + // get the length of the value buffer + let buf_len = data.buffers()[1].len(); + // find the offset of the buffer + // this returns a slice of offsets, starting from the offset of the array + // so we can take the first value + let offset = data.buffer::(0)[0]; + buf_len - offset as usize + }) + .sum::(); + + [ + buffer, + MutableBuffer::new(str_values_size * mem::size_of::()), + ] + } + DataType::Utf8 => { + // offsets + let mut buffer = + MutableBuffer::new((1 + capacity) * mem::size_of::()); + // safety: `unsafe` code assumes that this buffer is initialized with one element + buffer.push(0i32); + let str_values_size = arrays + .iter() + .map(|data| { + // get the length of the value buffer + let buf_len = data.buffers()[1].len(); + // find the offset of the buffer + // this returns a slice of offsets, starting from the offset of the array + // so we can take the first value + let offset = data.buffer::(0)[0]; + buf_len - offset as usize + }) + .sum::(); + + [ + buffer, + MutableBuffer::new(str_values_size * mem::size_of::()), + ] + } + _ => new_buffers(data_type, capacity), + }; let child_data = match &data_type { DataType::Null diff --git a/arrow/src/compute/kernels/concat.rs b/arrow/src/compute/kernels/concat.rs index 35ff183ed91c..83140c8fd646 100644 --- a/arrow/src/compute/kernels/concat.rs +++ b/arrow/src/compute/kernels/concat.rs @@ -452,4 +452,23 @@ mod tests { let concat = concat_dictionary(input_1, input_2); assert_eq!(concat, expected); } + + #[test] + fn test_concat_string_sizes() -> Result<()> { + let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect(); + let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect(); + let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + // 150 * 3 = 450 + // 150 * 3 = 450 + // 3 * 3 = 9 + // ------------+ + // 909 + // closest 64 byte aligned cap = 960 + + let arr = concat(&[&a, &b, &c])?; + // this would have been 1280 if we did not precompute the value lengths. + assert_eq!(arr.data().buffers()[1].capacity(), 960); + + Ok(()) + } } From b3bfb5d529b7a9018741f579a717d86e71824c50 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 26 May 2021 08:25:09 +0200 Subject: [PATCH 2/2] reuse code for str allocation buffer --- arrow/src/array/transform/mod.rs | 82 +++++++++++++------------------- 1 file changed, 34 insertions(+), 48 deletions(-) diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs index cd51d3c06203..5611671568f5 100644 --- a/arrow/src/array/transform/mod.rs +++ b/arrow/src/array/transform/mod.rs @@ -27,6 +27,7 @@ use super::{ data::{into_buffers, new_buffers}, ArrayData, }; +use crate::array::StringOffsetSizeTrait; mod boolean; mod fixed_binary; @@ -325,6 +326,37 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls { }) } +fn preallocate_str_buffer( + capacity: usize, + arrays: &[&ArrayData], +) -> [MutableBuffer; 2] { + // offsets + let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); + // safety: `unsafe` code assumes that this buffer is initialized with one element + if Offset::is_large() { + buffer.push(0i64); + } else { + buffer.push(0i32) + } + let str_values_size = arrays + .iter() + .map(|data| { + // get the length of the value buffer + let buf_len = data.buffers()[1].len(); + // find the offset of the buffer + // this returns a slice of offsets, starting from the offset of the array + // so we can take the first value + let offset = data.buffer::(0)[0]; + buf_len - offset.to_usize().unwrap() + }) + .sum::(); + + [ + buffer, + MutableBuffer::new(str_values_size * mem::size_of::()), + ] +} + impl<'a> MutableArrayData<'a> { /// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an /// [ArrayData] from multiple `arrays`. @@ -345,54 +377,8 @@ impl<'a> MutableArrayData<'a> { // We can prevent reallocation by precomputing the needed size. // This is faster and more memory efficient. let [buffer1, buffer2] = match data_type { - DataType::LargeUtf8 => { - // offsets - let mut buffer = - MutableBuffer::new((1 + capacity) * mem::size_of::()); - // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.push(0i64); - let str_values_size = arrays - .iter() - .map(|data| { - // get the length of the value buffer - let buf_len = data.buffers()[1].len(); - // find the offset of the buffer - // this returns a slice of offsets, starting from the offset of the array - // so we can take the first value - let offset = data.buffer::(0)[0]; - buf_len - offset as usize - }) - .sum::(); - - [ - buffer, - MutableBuffer::new(str_values_size * mem::size_of::()), - ] - } - DataType::Utf8 => { - // offsets - let mut buffer = - MutableBuffer::new((1 + capacity) * mem::size_of::()); - // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.push(0i32); - let str_values_size = arrays - .iter() - .map(|data| { - // get the length of the value buffer - let buf_len = data.buffers()[1].len(); - // find the offset of the buffer - // this returns a slice of offsets, starting from the offset of the array - // so we can take the first value - let offset = data.buffer::(0)[0]; - buf_len - offset as usize - }) - .sum::(); - - [ - buffer, - MutableBuffer::new(str_values_size * mem::size_of::()), - ] - } + DataType::LargeUtf8 => preallocate_str_buffer::(capacity, &arrays), + DataType::Utf8 => preallocate_str_buffer::(capacity, &arrays), _ => new_buffers(data_type, capacity), };