215 changes: 215 additions & 0 deletions datafusion/common/src/scalar/cache.rs
@@ -0,0 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Array caching utilities for scalar values

use std::iter::repeat_n;
use std::sync::{Arc, LazyLock, Mutex};

use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray};
use arrow::datatypes::{
ArrowDictionaryKeyType, DataType, Int16Type, Int32Type, Int64Type, Int8Type,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};

/// Maximum number of rows to cache; larger requests bypass the cache to keep memory usage bounded
const MAX_CACHE_SIZE: usize = 1024 * 1024;

/// Cache for dictionary key arrays to avoid repeated allocations
/// when the same size is requested frequently.
///
/// Similar to PartitionColumnProjector's ZeroBufferGenerators, this cache
/// stores key arrays for the different dictionary key types. It holds at
/// most one entry per key type (the last size used), and requests larger
/// than `MAX_CACHE_SIZE` bypass the cache, so the memory retained by the
/// cache stays bounded.
#[derive(Debug)]
struct KeyArrayCache<K: ArrowDictionaryKeyType> {
cache: Option<(usize, bool, PrimitiveArray<K>)>, // (num_rows, is_null, key_array)
}

impl<K: ArrowDictionaryKeyType> Default for KeyArrayCache<K> {
fn default() -> Self {
Self { cache: None }
}
}

impl<K: ArrowDictionaryKeyType> KeyArrayCache<K> {
/// Get or create a cached key array for the given number of rows and null status
fn get_or_create(&mut self, num_rows: usize, is_null: bool) -> PrimitiveArray<K> {
// Requests above MAX_CACHE_SIZE bypass the cache to keep memory usage bounded
if num_rows > MAX_CACHE_SIZE {
// For very large arrays, don't cache; just create and return
return self.create_key_array(num_rows, is_null);
}

match &self.cache {
Some((cached_num_rows, cached_is_null, cached_array))
if *cached_num_rows == num_rows && *cached_is_null == is_null =>
{
// Cache hit: reuse existing array if same size and null status
cached_array.clone()
}
_ => {
// Cache miss: create new array and cache it
let key_array = self.create_key_array(num_rows, is_null);
self.cache = Some((num_rows, is_null, key_array.clone()));
key_array
}
}
}

/// Create a new key array with the specified number of rows and null status
fn create_key_array(&self, num_rows: usize, is_null: bool) -> PrimitiveArray<K> {
let key_array: PrimitiveArray<K> = repeat_n(
if is_null {
None
} else {
Some(K::default_value())
},
num_rows,
)
.collect();
key_array
}
}

/// Cache for null arrays to avoid repeated allocations
/// when the same size is used frequently.
#[derive(Debug, Default)]
struct NullArrayCache {
cache: Option<(usize, ArrayRef)>, // (num_rows, null_array)
}

impl NullArrayCache {
/// Get or create a cached null array for the given number of rows
fn get_or_create(&mut self, num_rows: usize) -> ArrayRef {
// Requests above MAX_CACHE_SIZE bypass the cache to keep memory usage bounded
if num_rows > MAX_CACHE_SIZE {
// For very large arrays, don't cache; just create and return
return new_null_array(&DataType::Null, num_rows);
}

match &self.cache {
Some((cached_num_rows, cached_array)) if *cached_num_rows == num_rows => {
// Cache hit: reuse existing array if same size
Arc::clone(cached_array)
}
_ => {
// Cache miss: create new array and cache it
let null_array = new_null_array(&DataType::Null, num_rows);
self.cache = Some((num_rows, Arc::clone(&null_array)));
null_array
}
}
}
}

/// Global cache for dictionary key arrays and null arrays
#[derive(Debug, Default)]
struct ArrayCaches {
cache_i8: KeyArrayCache<Int8Type>,
cache_i16: KeyArrayCache<Int16Type>,
cache_i32: KeyArrayCache<Int32Type>,
cache_i64: KeyArrayCache<Int64Type>,
cache_u8: KeyArrayCache<UInt8Type>,
cache_u16: KeyArrayCache<UInt16Type>,
cache_u32: KeyArrayCache<UInt32Type>,
cache_u64: KeyArrayCache<UInt64Type>,
null_cache: NullArrayCache,
}

static ARRAY_CACHES: LazyLock<Mutex<ArrayCaches>> =
LazyLock::new(|| Mutex::new(ArrayCaches::default()));

/// Get the global cache for arrays
fn get_array_caches() -> &'static Mutex<ArrayCaches> {
&ARRAY_CACHES
}

/// Get or create a cached null array for the given number of rows
pub(crate) fn get_or_create_cached_null_array(num_rows: usize) -> ArrayRef {
let cache = get_array_caches();
let mut caches = cache.lock().unwrap();
caches.null_cache.get_or_create(num_rows)
}

/// Get or create a cached key array for a specific key type
pub(crate) fn get_or_create_cached_key_array<K: ArrowDictionaryKeyType>(
num_rows: usize,
is_null: bool,
) -> PrimitiveArray<K> {
let cache = get_array_caches();
let mut caches = cache.lock().unwrap();

// Dispatch on K::DATA_TYPE to select the per-type cache
match K::DATA_TYPE {
DataType::Int8 => {
let array = caches.cache_i8.get_or_create(num_rows, is_null);
// Convert using ArrayData to avoid unsafe transmute
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::Int16 => {
let array = caches.cache_i16.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::Int32 => {
let array = caches.cache_i32.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::Int64 => {
let array = caches.cache_i64.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::UInt8 => {
let array = caches.cache_u8.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::UInt16 => {
let array = caches.cache_u16.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::UInt32 => {
let array = caches.cache_u32.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
DataType::UInt64 => {
let array = caches.cache_u64.get_or_create(num_rows, is_null);
let array_data = array.to_data();
PrimitiveArray::<K>::from(array_data)
}
_ => {
// Fallback for any other key type: create the array directly without caching
let key_array: PrimitiveArray<K> = repeat_n(
if is_null {
None
} else {
Some(K::default_value())
},
num_rows,
)
.collect();
key_array
}
}
}
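The diff above is the whole of cache.rs. For review purposes, here is a minimal unit-test sketch (not part of the PR) of the caching behavior it promises, assuming it were placed in a `#[cfg(test)] mod tests` at the bottom of cache.rs; the test names are illustrative only.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::Int32Type;

    #[test]
    fn null_array_is_reused_for_repeated_size() {
        // Two requests for the same size should return clones of the same
        // cached Arc instead of allocating twice. (The cache is process-wide,
        // so a concurrently running test that hits the null cache with a
        // different size could evict this entry.)
        let a = get_or_create_cached_null_array(8);
        let b = get_or_create_cached_null_array(8);
        assert!(Arc::ptr_eq(&a, &b));

        // A different size replaces the single cached entry.
        let c = get_or_create_cached_null_array(16);
        assert_eq!(c.len(), 16);
        assert!(!Arc::ptr_eq(&a, &c));
    }

    #[test]
    fn key_array_matches_requested_shape() {
        // Non-null request: every key points at dictionary index 0.
        let keys = get_or_create_cached_key_array::<Int32Type>(4, false);
        assert_eq!(keys.len(), 4);
        assert!(keys.values().iter().all(|&k| k == 0));

        // Null request: every key slot is null.
        let null_keys = get_or_create_cached_key_array::<Int32Type>(4, true);
        assert_eq!(null_keys.len(), 4);
        assert_eq!(null_keys.null_count(), 4);
    }
}
```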
16 changes: 6 additions & 10 deletions datafusion/common/src/scalar/mod.rs
@@ -17,6 +17,7 @@

//! [`ScalarValue`]: stores single values

+mod cache;
mod consts;
mod struct_builder;

@@ -81,6 +82,7 @@ use arrow::datatypes::{
UInt32Type, UInt64Type, UInt8Type, UnionFields, UnionMode, DECIMAL128_MAX_PRECISION,
};
use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions};
+use cache::{get_or_create_cached_key_array, get_or_create_cached_null_array};
use chrono::{Duration, NaiveDate};
use half::f16;
pub use struct_builder::ScalarStructBuilder;
@@ -864,15 +866,9 @@ fn dict_from_scalar<K: ArrowDictionaryKeyType>(
let values_array = value.to_array_of_size(1)?;

// Create a key array with `size` elements, each of 0
-    let key_array: PrimitiveArray<K> = repeat_n(
-        if value.is_null() {
-            None
-        } else {
-            Some(K::default_value())
-        },
-        size,
-    )
-    .collect();
+    // Use cache to avoid repeated allocations for the same size
+    let key_array: PrimitiveArray<K> =
+        get_or_create_cached_key_array::<K>(size, value.is_null());

// create a new DictionaryArray
//
@@ -2677,7 +2673,7 @@ impl ScalarValue {
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
}
}
-            ScalarValue::Null => new_null_array(&DataType::Null, size),
+            ScalarValue::Null => get_or_create_cached_null_array(size),
})
}

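For context, the caller-visible effect of the mod.rs changes: repeated `ScalarValue::to_array_of_size` calls for `ScalarValue::Null` with the same size now return clones of one cached array, and dictionary scalars reuse the cached all-zero key array. A rough, editor-added sketch (not part of the PR), assuming `datafusion-common` and `arrow` as dependencies:

```rust
use std::sync::Arc;

use arrow::array::Array;
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

fn main() -> Result<()> {
    // ScalarValue::Null: the second call for the same size is now served
    // from the process-wide cache instead of allocating a fresh NullArray.
    let a = ScalarValue::Null.to_array_of_size(4096)?;
    let b = ScalarValue::Null.to_array_of_size(4096)?;
    assert_eq!(a.len(), 4096);
    assert!(a.is_null(0) && b.is_null(0));
    // The two ArrayRefs point at the same cached allocation.
    assert!(Arc::ptr_eq(&a, &b));

    // Dictionary scalars benefit as well: the all-zero key array for a given
    // length is built once and reused across calls; only the values array and
    // the DictionaryArray wrapper are rebuilt.
    let dict = ScalarValue::Dictionary(
        Box::new(DataType::Int32),
        Box::new(ScalarValue::from("hello")),
    );
    let arr = dict.to_array_of_size(4096)?;
    assert_eq!(arr.len(), 4096);
    Ok(())
}
```

Because each cache holds only the most recently used size per type, workloads that alternate between many distinct sizes still allocate on most calls; the win is the common case of one repeated batch size, bounded above by `MAX_CACHE_SIZE` rows.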