diff --git a/datafusion/common/src/scalar/cache.rs b/datafusion/common/src/scalar/cache.rs new file mode 100644 index 0000000000000..f1476a518774b --- /dev/null +++ b/datafusion/common/src/scalar/cache.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Array caching utilities for scalar values + +use std::iter::repeat_n; +use std::sync::{Arc, LazyLock, Mutex}; + +use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray}; +use arrow::datatypes::{ + ArrowDictionaryKeyType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, + UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; + +/// Maximum number of rows to cache to be conservative on memory usage +const MAX_CACHE_SIZE: usize = 1024 * 1024; + +/// Cache for dictionary key arrays to avoid repeated allocations +/// when the same size is used frequently. +/// +/// Similar to PartitionColumnProjector's ZeroBufferGenerators, this cache +/// stores key arrays for different dictionary key types. The cache is +/// limited to 1 entry per type (the last size used) to prevent memory leaks +/// for extremely large array requests. +#[derive(Debug)] +struct KeyArrayCache { + cache: Option<(usize, bool, PrimitiveArray)>, // (num_rows, is_null, key_array) +} + +impl Default for KeyArrayCache { + fn default() -> Self { + Self { cache: None } + } +} + +impl KeyArrayCache { + /// Get or create a cached key array for the given number of rows and null status + fn get_or_create(&mut self, num_rows: usize, is_null: bool) -> PrimitiveArray { + // Check cache size limit to prevent memory leaks + if num_rows > MAX_CACHE_SIZE { + // For very large arrays, don't cache them - just create and return + return self.create_key_array(num_rows, is_null); + } + + match &self.cache { + Some((cached_num_rows, cached_is_null, cached_array)) + if *cached_num_rows == num_rows && *cached_is_null == is_null => + { + // Cache hit: reuse existing array if same size and null status + cached_array.clone() + } + _ => { + // Cache miss: create new array and cache it + let key_array = self.create_key_array(num_rows, is_null); + self.cache = Some((num_rows, is_null, key_array.clone())); + key_array + } + } + } + + /// Create a new key array with the specified number of rows and null status + fn create_key_array(&self, num_rows: usize, is_null: bool) -> PrimitiveArray { + let key_array: PrimitiveArray = repeat_n( + if is_null { + None + } else { + Some(K::default_value()) + }, + num_rows, + ) + .collect(); + key_array + } +} + +/// Cache for null arrays to avoid repeated allocations +/// when the same size is used frequently. +#[derive(Debug, Default)] +struct NullArrayCache { + cache: Option<(usize, ArrayRef)>, // (num_rows, null_array) +} + +impl NullArrayCache { + /// Get or create a cached null array for the given number of rows + fn get_or_create(&mut self, num_rows: usize) -> ArrayRef { + // Check cache size limit to prevent memory leaks + if num_rows > MAX_CACHE_SIZE { + // For very large arrays, don't cache them - just create and return + return new_null_array(&DataType::Null, num_rows); + } + + match &self.cache { + Some((cached_num_rows, cached_array)) if *cached_num_rows == num_rows => { + // Cache hit: reuse existing array if same size + Arc::clone(cached_array) + } + _ => { + // Cache miss: create new array and cache it + let null_array = new_null_array(&DataType::Null, num_rows); + self.cache = Some((num_rows, Arc::clone(&null_array))); + null_array + } + } + } +} + +/// Global cache for dictionary key arrays and null arrays +#[derive(Debug, Default)] +struct ArrayCaches { + cache_i8: KeyArrayCache, + cache_i16: KeyArrayCache, + cache_i32: KeyArrayCache, + cache_i64: KeyArrayCache, + cache_u8: KeyArrayCache, + cache_u16: KeyArrayCache, + cache_u32: KeyArrayCache, + cache_u64: KeyArrayCache, + null_cache: NullArrayCache, +} + +static ARRAY_CACHES: LazyLock> = + LazyLock::new(|| Mutex::new(ArrayCaches::default())); + +/// Get the global cache for arrays +fn get_array_caches() -> &'static Mutex { + &ARRAY_CACHES +} + +/// Get or create a cached null array for the given number of rows +pub(crate) fn get_or_create_cached_null_array(num_rows: usize) -> ArrayRef { + let cache = get_array_caches(); + let mut caches = cache.lock().unwrap(); + caches.null_cache.get_or_create(num_rows) +} + +/// Get or create a cached key array for a specific key type +pub(crate) fn get_or_create_cached_key_array( + num_rows: usize, + is_null: bool, +) -> PrimitiveArray { + let cache = get_array_caches(); + let mut caches = cache.lock().unwrap(); + + // Use the DATA_TYPE to dispatch to the correct cache, similar to original implementation + match K::DATA_TYPE { + DataType::Int8 => { + let array = caches.cache_i8.get_or_create(num_rows, is_null); + // Convert using ArrayData to avoid unsafe transmute + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::Int16 => { + let array = caches.cache_i16.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::Int32 => { + let array = caches.cache_i32.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::Int64 => { + let array = caches.cache_i64.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::UInt8 => { + let array = caches.cache_u8.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::UInt16 => { + let array = caches.cache_u16.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::UInt32 => { + let array = caches.cache_u32.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + DataType::UInt64 => { + let array = caches.cache_u64.get_or_create(num_rows, is_null); + let array_data = array.to_data(); + PrimitiveArray::::from(array_data) + } + _ => { + // Fallback for unsupported types - create array directly without caching + let key_array: PrimitiveArray = repeat_n( + if is_null { + None + } else { + Some(K::default_value()) + }, + num_rows, + ) + .collect(); + key_array + } + } +} diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 75a341bcb1359..3d488b130dbfd 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -17,6 +17,7 @@ //! [`ScalarValue`]: stores single values +mod cache; mod consts; mod struct_builder; @@ -81,6 +82,7 @@ use arrow::datatypes::{ UInt32Type, UInt64Type, UInt8Type, UnionFields, UnionMode, DECIMAL128_MAX_PRECISION, }; use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions}; +use cache::{get_or_create_cached_key_array, get_or_create_cached_null_array}; use chrono::{Duration, NaiveDate}; use half::f16; pub use struct_builder::ScalarStructBuilder; @@ -864,15 +866,9 @@ fn dict_from_scalar( let values_array = value.to_array_of_size(1)?; // Create a key array with `size` elements, each of 0 - let key_array: PrimitiveArray = repeat_n( - if value.is_null() { - None - } else { - Some(K::default_value()) - }, - size, - ) - .collect(); + // Use cache to avoid repeated allocations for the same size + let key_array: PrimitiveArray = + get_or_create_cached_key_array::(size, value.is_null()); // create a new DictionaryArray // @@ -2677,7 +2673,7 @@ impl ScalarValue { _ => unreachable!("Invalid dictionary keys type: {:?}", key_type), } } - ScalarValue::Null => new_null_array(&DataType::Null, size), + ScalarValue::Null => get_or_create_cached_null_array(size), }) }