From 9e1974e28041abd28b85abb63d5fc0c34e972b7a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 5 May 2025 09:52:27 -0600 Subject: [PATCH 1/3] Add Spark-compatible hex function --- datafusion/spark/src/function/math/hex.rs | 404 ++++++++++++++++++ datafusion/spark/src/function/math/mod.rs | 5 +- .../test_files/spark/math/hex.slt | 26 ++ 3 files changed, 434 insertions(+), 1 deletion(-) create mode 100644 datafusion/spark/src/function/math/hex.rs create mode 100644 datafusion/sqllogictest/test_files/spark/math/hex.slt diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs new file mode 100644 index 0000000000000..965030b6c98fa --- /dev/null +++ b/datafusion/spark/src/function/math/hex.rs @@ -0,0 +1,404 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use crate::function::error_utils::{ + invalid_arg_count_exec_err, unsupported_data_type_exec_err, +}; +use arrow::array::{Array, StringArray}; +use arrow::datatypes::DataType; +use arrow::{ + array::{as_dictionary_array, as_largestring_array, as_string_array}, + datatypes::Int32Type, +}; +use datafusion_common::{ + cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array}, + exec_err, DataFusionError, +}; +use datafusion_expr::Signature; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility}; +use std::fmt::Write; + +/// +#[derive(Debug)] +pub struct SparkHex { + signature: Signature, + aliases: Vec, +} + +impl Default for SparkHex { + fn default() -> Self { + Self::new() + } +} + +impl SparkHex { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + aliases: vec![], + } + } +} + +impl ScalarUDFImpl for SparkHex { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "hex" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type( + &self, + _arg_types: &[DataType], + ) -> datafusion_common::Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + spark_hex(&args.args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn coerce_types( + &self, + arg_types: &[DataType], + ) -> datafusion_common::Result> { + if arg_types.len() != 1 { + return Err(invalid_arg_count_exec_err("expm1", (1, 1), arg_types.len())); + } + match &arg_types[0] { + DataType::Int64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary => Ok(vec![arg_types[0].clone()]), + DataType::Dictionary(key_type, value_type) => match value_type.as_ref() { + DataType::Int64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary => Ok(vec![arg_types[0].clone()]), + other => { + if other.is_numeric() { + Ok(vec![DataType::Dictionary( + key_type.clone(), + Box::new(DataType::Int64), + )]) + } else { + Err(unsupported_data_type_exec_err( + "hex", + "Numeric, String, or Binary", + &arg_types[0], + )) + } + } + }, + other => { + if other.is_numeric() { + Ok(vec![DataType::Int64]) + } else { + Err(unsupported_data_type_exec_err( + "hex", + "Numeric, String, or Binary", + &arg_types[0], + )) + } + } + } + } +} + +fn hex_int64(num: i64) -> String { + format!("{:X}", num) +} + +#[inline(always)] +fn hex_encode>(data: T, lower_case: bool) -> String { + let mut s = String::with_capacity(data.as_ref().len() * 2); + if lower_case { + for b in data.as_ref() { + // Writing to a string never errors, so we can unwrap here. + write!(&mut s, "{b:02x}").unwrap(); + } + } else { + for b in data.as_ref() { + // Writing to a string never errors, so we can unwrap here. + write!(&mut s, "{b:02X}").unwrap(); + } + } + s +} + +#[inline(always)] +fn hex_bytes>(bytes: T) -> Result { + let hex_string = hex_encode(bytes, false); + Ok(hex_string) +} + +/// Spark-compatible `hex` function +pub fn spark_hex(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return Err(DataFusionError::Internal( + "hex expects exactly one argument".to_string(), + )); + } + + let input = match &args[0] { + ColumnarValue::Scalar(value) => ColumnarValue::Array(value.to_array()?), + ColumnarValue::Array(_) => args[0].clone(), + }; + + match &input { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Int64 => { + let array = as_int64_array(array)?; + + let hexed_array: StringArray = + array.iter().map(|v| v.map(hex_int64)).collect(); + + Ok(ColumnarValue::Array(Arc::new(hexed_array))) + } + DataType::Utf8 => { + let array = as_string_array(array); + + let hexed: StringArray = array + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?; + + Ok(ColumnarValue::Array(Arc::new(hexed))) + } + DataType::LargeUtf8 => { + let array = as_largestring_array(array); + + let hexed: StringArray = array + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?; + + Ok(ColumnarValue::Array(Arc::new(hexed))) + } + DataType::Binary => { + let array = as_binary_array(array)?; + + let hexed: StringArray = array + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?; + + Ok(ColumnarValue::Array(Arc::new(hexed))) + } + DataType::FixedSizeBinary(_) => { + let array = as_fixed_size_binary_array(array)?; + + let hexed: StringArray = array + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?; + + Ok(ColumnarValue::Array(Arc::new(hexed))) + } + DataType::Dictionary(_, value_type) => { + let dict = as_dictionary_array::(&array); + + let values = match **value_type { + DataType::Int64 => as_int64_array(dict.values())? + .iter() + .map(|v| v.map(hex_int64)) + .collect::>(), + DataType::Utf8 => as_string_array(dict.values()) + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?, + DataType::Binary => as_binary_array(dict.values())? + .iter() + .map(|v| v.map(hex_bytes).transpose()) + .collect::>()?, + _ => exec_err!( + "hex got an unexpected argument type: {:?}", + array.data_type() + )?, + }; + + let new_values: Vec> = dict + .keys() + .iter() + .map(|key| key.map(|k| values[k as usize].clone()).unwrap_or(None)) + .collect(); + + let string_array_values = StringArray::from(new_values); + + Ok(ColumnarValue::Array(Arc::new(string_array_values))) + } + _ => exec_err!( + "hex got an unexpected argument type: {:?}", + array.data_type() + ), + }, + _ => exec_err!("native hex does not support scalar values at this time"), + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::array::{Int64Array, StringArray}; + use arrow::{ + array::{ + as_string_array, BinaryDictionaryBuilder, PrimitiveDictionaryBuilder, + StringBuilder, StringDictionaryBuilder, + }, + datatypes::{Int32Type, Int64Type}, + }; + use datafusion_expr::ColumnarValue; + + #[test] + fn test_dictionary_hex_utf8() { + let mut input_builder = StringDictionaryBuilder::::new(); + input_builder.append_value("hi"); + input_builder.append_value("bye"); + input_builder.append_null(); + input_builder.append_value("rust"); + let input = input_builder.finish(); + + let mut string_builder = StringBuilder::new(); + string_builder.append_value("6869"); + string_builder.append_value("627965"); + string_builder.append_null(); + string_builder.append_value("72757374"); + let expected = string_builder.finish(); + + let columnar_value = ColumnarValue::Array(Arc::new(input)); + let result = super::spark_hex(&[columnar_value]).unwrap(); + + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; + + let result = as_string_array(&result); + + assert_eq!(result, &expected); + } + + #[test] + fn test_dictionary_hex_int64() { + let mut input_builder = PrimitiveDictionaryBuilder::::new(); + input_builder.append_value(1); + input_builder.append_value(2); + input_builder.append_null(); + input_builder.append_value(3); + let input = input_builder.finish(); + + let mut string_builder = StringBuilder::new(); + string_builder.append_value("1"); + string_builder.append_value("2"); + string_builder.append_null(); + string_builder.append_value("3"); + let expected = string_builder.finish(); + + let columnar_value = ColumnarValue::Array(Arc::new(input)); + let result = super::spark_hex(&[columnar_value]).unwrap(); + + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; + + let result = as_string_array(&result); + + assert_eq!(result, &expected); + } + + #[test] + fn test_dictionary_hex_binary() { + let mut input_builder = BinaryDictionaryBuilder::::new(); + input_builder.append_value("1"); + input_builder.append_value("j"); + input_builder.append_null(); + input_builder.append_value("3"); + let input = input_builder.finish(); + + let mut expected_builder = StringBuilder::new(); + expected_builder.append_value("31"); + expected_builder.append_value("6A"); + expected_builder.append_null(); + expected_builder.append_value("33"); + let expected = expected_builder.finish(); + + let columnar_value = ColumnarValue::Array(Arc::new(input)); + let result = super::spark_hex(&[columnar_value]).unwrap(); + + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; + + let result = as_string_array(&result); + + assert_eq!(result, &expected); + } + + #[test] + fn test_hex_int64() { + let num = 1234; + let hexed = super::hex_int64(num); + assert_eq!(hexed, "4D2".to_string()); + + let num = -1; + let hexed = super::hex_int64(num); + assert_eq!(hexed, "FFFFFFFFFFFFFFFF".to_string()); + } + + #[test] + fn test_spark_hex_int64() { + let int_array = Int64Array::from(vec![Some(1), Some(2), None, Some(3)]); + let columnar_value = ColumnarValue::Array(Arc::new(int_array)); + + let result = super::spark_hex(&[columnar_value]).unwrap(); + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; + + let string_array = as_string_array(&result); + let expected_array = StringArray::from(vec![ + Some("1".to_string()), + Some("2".to_string()), + None, + Some("3".to_string()), + ]); + + assert_eq!(string_array, &expected_array); + } +} diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index c5b007f40d7ff..80bcdc39a41de 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -16,19 +16,22 @@ // under the License. pub mod expm1; +pub mod hex; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; use std::sync::Arc; make_udf_function!(expm1::SparkExpm1, expm1); +make_udf_function!(hex::SparkHex, hex); pub mod expr_fn { use datafusion_functions::export_functions; export_functions!((expm1, "Returns exp(expr) - 1 as a Float64.", arg1)); + export_functions!((hex, "Computes hex value of the given column.", arg1)); } pub fn functions() -> Vec> { - vec![expm1()] + vec![expm1(), hex()] } diff --git a/datafusion/sqllogictest/test_files/spark/math/hex.slt b/datafusion/sqllogictest/test_files/spark/math/hex.slt new file mode 100644 index 0000000000000..907f67d07ee67 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/math/hex.slt @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +query T +SELECT hex('Spark SQL'); +---- +537061726B2053514C + +query T +SELECT hex(1234::INT); +---- +4D2 From 779f39c97989c8fd384c31dc40a943f843058683 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 5 May 2025 09:55:37 -0600 Subject: [PATCH 2/3] fix --- datafusion/spark/src/function/math/hex.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 965030b6c98fa..5cce882cce0af 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -93,7 +93,7 @@ impl ScalarUDFImpl for SparkHex { arg_types: &[DataType], ) -> datafusion_common::Result> { if arg_types.len() != 1 { - return Err(invalid_arg_count_exec_err("expm1", (1, 1), arg_types.len())); + return Err(invalid_arg_count_exec_err("hex", (1, 1), arg_types.len())); } match &arg_types[0] { DataType::Int64 From 1ce03bffffd6c980dc99555499d565190864f5fe Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 7 May 2025 17:05:07 -0400 Subject: [PATCH 3/3] Add array slt tests --- .../sqllogictest/test_files/spark/math/hex.slt | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/sqllogictest/test_files/spark/math/hex.slt b/datafusion/sqllogictest/test_files/spark/math/hex.slt index 907f67d07ee67..24db1a318358a 100644 --- a/datafusion/sqllogictest/test_files/spark/math/hex.slt +++ b/datafusion/sqllogictest/test_files/spark/math/hex.slt @@ -24,3 +24,17 @@ query T SELECT hex(1234::INT); ---- 4D2 + +query T +SELECT hex(a) from VALUES (1234::INT), (NULL), (456::INT) AS t(a); +---- +4D2 +NULL +1C8 + +query T +SELECT hex(a) from VALUES ('foo'), (NULL), ('foobarbaz') AS t(a); +---- +666F6F +NULL +666F6F62617262617A