-
Notifications
You must be signed in to change notification settings - Fork 335
feat: add hex scalar function #449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d5af58d
2bfcf25
1caa6fd
385def8
8033c23
8c049a9
1539d6a
ecd2876
ecf57a8
5e17897
88bdcde
e7062c6
129f00a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,306 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use arrow::{ | ||
| array::{as_dictionary_array, as_largestring_array, as_string_array}, | ||
| datatypes::Int32Type, | ||
| }; | ||
| use arrow_array::StringArray; | ||
| use arrow_schema::DataType; | ||
| use datafusion::logical_expr::ColumnarValue; | ||
| use datafusion_common::{ | ||
| cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array}, | ||
| exec_err, DataFusionError, | ||
| }; | ||
| use std::fmt::Write; | ||
|
|
||
| fn hex_int64(num: i64) -> String { | ||
| format!("{:X}", num) | ||
| } | ||
|
|
||
| fn hex_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<String, std::fmt::Error> { | ||
| let bytes = bytes.as_ref(); | ||
| let length = bytes.len(); | ||
| let mut hex_string = String::with_capacity(length * 2); | ||
| for &byte in bytes { | ||
| write!(&mut hex_string, "{:02X}", byte)?; | ||
| } | ||
| Ok(hex_string) | ||
| } | ||
|
|
||
| pub(super) fn spark_hex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> { | ||
| if args.len() != 1 { | ||
| return Err(DataFusionError::Internal( | ||
| "hex expects exactly one argument".to_string(), | ||
| )); | ||
| } | ||
|
|
||
| match &args[0] { | ||
| ColumnarValue::Array(array) => match array.data_type() { | ||
| DataType::Int64 => { | ||
| let array = as_int64_array(array)?; | ||
|
|
||
| let hexed_array: StringArray = array.iter().map(|v| v.map(hex_int64)).collect(); | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(hexed_array))) | ||
| } | ||
| DataType::Utf8 => { | ||
| let array = as_string_array(array); | ||
|
tshauck marked this conversation as resolved.
|
||
|
|
||
| let hexed: StringArray = array | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(hexed))) | ||
| } | ||
| DataType::LargeUtf8 => { | ||
| let array = as_largestring_array(array); | ||
|
|
||
| let hexed: StringArray = array | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(hexed))) | ||
| } | ||
| DataType::Binary => { | ||
| let array = as_binary_array(array)?; | ||
|
|
||
| let hexed: StringArray = array | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(hexed))) | ||
| } | ||
| DataType::FixedSizeBinary(_) => { | ||
| let array = as_fixed_size_binary_array(array)?; | ||
|
|
||
| let hexed: StringArray = array | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(hexed))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Int64) => { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not totally related to this PR. But this seems a bit unexpected. I believe many other codes in scalar_funcs and datafusion doesn't handle the dictionary type specially, such as cc @sunchao @andygrove and @viirya for more inputs.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also ran into issues with this in handling cast operations and ended up flattening the dictionary type first, but just for cast expressions. I agree that we need to look at this and see if there is a more systematic approach we can use.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The trick we used to use is adding We should consolidate the unpacking logic, otherwise we will need to add it every function. Or until that happens we can workaround with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do you have any concrete examples of how this works by any chance? I remember I saw some
Yes, maybe this logic should added in the rust planner side, which can unpack the dictionary automatically if it knows the expression cannot handle dictionary types.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Hmm, it's been a while, I cannot find right away... BTW my comment above is not a blocker. Since this PR already implemented it, we can follow up separately
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only in Comet. I do remember in DataFusion not all functions/expressions support dictionary types. I suspect if there is a systematic approach to deal with it, because I think there is no general approach to process dictionary-encoded inputs for different functions/expressions. For example, some functions can directly work on dictionary values and re-create a new dictionary with updated values, but for some functions, it is impossible so it needs to unpack dictionary first.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW now I remember |
||
| let dict = as_dictionary_array::<Int32Type>(&array); | ||
|
|
||
| let hexed_values = as_int64_array(dict.values())?; | ||
| let values = hexed_values | ||
| .iter() | ||
| .map(|v| v.map(hex_int64)) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| let keys = dict.keys().clone(); | ||
| let mut new_keys = Vec::with_capacity(values.len()); | ||
|
|
||
| for key in keys.iter() { | ||
| let key = key.map(|k| values[k as usize].clone()).unwrap_or(None); | ||
| new_keys.push(key); | ||
| } | ||
|
|
||
| let string_array_values = StringArray::from(new_keys); | ||
|
Comment on lines
+112
to
+120
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, why we don't re-create a new dictionary array of string?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have the same issue currently in cast from string to other types. @viirya Do we have an example somewhere of converting a dictionary array without unpacking?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this case, we can call to construct a new dictionary array with new values.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya, thanks, where I'm a bit unclear is how to have the function return type also be a dictionary. The data type for the hex expression seems to be a Utf8, so I get
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya Am I correct in understanding that this PR is functionally correct but just not as efficient as possible? Perhaps we could consider having a follow-up issue to optimize this to rewrite the dictionary? It seems that we don't have a full example of dictionary rewrite for contributors to follow.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andygrove Yes. It's not efficient but should be correct. |
||
| Ok(ColumnarValue::Array(Arc::new(string_array_values))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Utf8) => { | ||
| let dict = as_dictionary_array::<Int32Type>(&array); | ||
|
|
||
| let hexed_values = as_string_array(dict.values()); | ||
| let values: Vec<Option<String>> = hexed_values | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| let keys = dict.keys().clone(); | ||
|
|
||
| let mut new_keys = Vec::with_capacity(values.len()); | ||
|
|
||
| for key in keys.iter() { | ||
| let key = key.map(|k| values[k as usize].clone()).unwrap_or(None); | ||
| new_keys.push(key); | ||
| } | ||
|
|
||
| let string_array_values = StringArray::from(new_keys); | ||
| Ok(ColumnarValue::Array(Arc::new(string_array_values))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Binary) => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part of dictionary arrays handling can be rewritten to reduce duplicate actually. It can be follow-up though.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #504 |
||
| let dict = as_dictionary_array::<Int32Type>(&array); | ||
|
|
||
| let hexed_values = as_binary_array(dict.values())?; | ||
| let values: Vec<Option<String>> = hexed_values | ||
| .iter() | ||
| .map(|v| v.map(hex_bytes).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| let keys = dict.keys().clone(); | ||
| let mut new_keys = Vec::with_capacity(values.len()); | ||
|
|
||
| for key in keys.iter() { | ||
| let key = key.map(|k| values[k as usize].clone()).unwrap_or(None); | ||
| new_keys.push(key); | ||
| } | ||
|
|
||
| let string_array_values = StringArray::from(new_keys); | ||
| Ok(ColumnarValue::Array(Arc::new(string_array_values))) | ||
| } | ||
| _ => exec_err!( | ||
| "hex got an unexpected argument type: {:?}", | ||
| array.data_type() | ||
| ), | ||
| }, | ||
| _ => exec_err!("native hex does not support scalar values at this time"), | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod test { | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::{ | ||
| array::{ | ||
| as_string_array, BinaryDictionaryBuilder, PrimitiveDictionaryBuilder, StringBuilder, | ||
| StringDictionaryBuilder, | ||
| }, | ||
| datatypes::{Int32Type, Int64Type}, | ||
| }; | ||
| use arrow_array::{Int64Array, StringArray}; | ||
| use datafusion::logical_expr::ColumnarValue; | ||
|
|
||
| #[test] | ||
| fn test_dictionary_hex_utf8() { | ||
| let mut input_builder = StringDictionaryBuilder::<Int32Type>::new(); | ||
| input_builder.append_value("hi"); | ||
| input_builder.append_value("bye"); | ||
| input_builder.append_null(); | ||
| input_builder.append_value("rust"); | ||
| let input = input_builder.finish(); | ||
|
|
||
| let mut string_builder = StringBuilder::new(); | ||
| string_builder.append_value("6869"); | ||
| string_builder.append_value("627965"); | ||
| string_builder.append_null(); | ||
| string_builder.append_value("72757374"); | ||
| let expected = string_builder.finish(); | ||
|
|
||
| let columnar_value = ColumnarValue::Array(Arc::new(input)); | ||
| let result = super::spark_hex(&[columnar_value]).unwrap(); | ||
|
|
||
| let result = match result { | ||
| ColumnarValue::Array(array) => array, | ||
| _ => panic!("Expected array"), | ||
| }; | ||
|
|
||
| let result = as_string_array(&result); | ||
|
|
||
| assert_eq!(result, &expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_dictionary_hex_int64() { | ||
| let mut input_builder = PrimitiveDictionaryBuilder::<Int32Type, Int64Type>::new(); | ||
| input_builder.append_value(1); | ||
| input_builder.append_value(2); | ||
| input_builder.append_null(); | ||
| input_builder.append_value(3); | ||
| let input = input_builder.finish(); | ||
|
|
||
| let mut string_builder = StringBuilder::new(); | ||
| string_builder.append_value("1"); | ||
| string_builder.append_value("2"); | ||
| string_builder.append_null(); | ||
| string_builder.append_value("3"); | ||
| let expected = string_builder.finish(); | ||
|
|
||
| let columnar_value = ColumnarValue::Array(Arc::new(input)); | ||
| let result = super::spark_hex(&[columnar_value]).unwrap(); | ||
|
|
||
| let result = match result { | ||
| ColumnarValue::Array(array) => array, | ||
| _ => panic!("Expected array"), | ||
| }; | ||
|
|
||
| let result = as_string_array(&result); | ||
|
|
||
| assert_eq!(result, &expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_dictionary_hex_binary() { | ||
| let mut input_builder = BinaryDictionaryBuilder::<Int32Type>::new(); | ||
| input_builder.append_value("1"); | ||
| input_builder.append_value("1"); | ||
| input_builder.append_null(); | ||
| input_builder.append_value("3"); | ||
| let input = input_builder.finish(); | ||
|
|
||
| let mut expected_builder = StringBuilder::new(); | ||
| expected_builder.append_value("31"); | ||
| expected_builder.append_value("31"); | ||
| expected_builder.append_null(); | ||
| expected_builder.append_value("33"); | ||
| let expected = expected_builder.finish(); | ||
|
|
||
| let columnar_value = ColumnarValue::Array(Arc::new(input)); | ||
| let result = super::spark_hex(&[columnar_value]).unwrap(); | ||
|
|
||
| let result = match result { | ||
| ColumnarValue::Array(array) => array, | ||
| _ => panic!("Expected array"), | ||
| }; | ||
|
|
||
| let result = as_string_array(&result); | ||
|
|
||
| assert_eq!(result, &expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_hex_int64() { | ||
| let num = 1234; | ||
| let hexed = super::hex_int64(num); | ||
| assert_eq!(hexed, "4D2".to_string()); | ||
|
|
||
| let num = -1; | ||
| let hexed = super::hex_int64(num); | ||
| assert_eq!(hexed, "FFFFFFFFFFFFFFFF".to_string()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_hex_int64() { | ||
| let int_array = Int64Array::from(vec![Some(1), Some(2), None, Some(3)]); | ||
| let columnar_value = ColumnarValue::Array(Arc::new(int_array)); | ||
|
|
||
| let result = super::spark_hex(&[columnar_value]).unwrap(); | ||
| let result = match result { | ||
| ColumnarValue::Array(array) => array, | ||
| _ => panic!("Expected array"), | ||
| }; | ||
|
|
||
| let string_array = as_string_array(&result); | ||
| let expected_array = StringArray::from(vec![ | ||
| Some("1".to_string()), | ||
| Some("2".to_string()), | ||
| None, | ||
| Some("3".to_string()), | ||
| ]); | ||
|
|
||
| assert_eq!(string_array, &expected_array); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1038,6 +1038,22 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("hex") { | ||
| Seq(true, false).foreach { dictionaryEnabled => | ||
| withTempDir { dir => | ||
| val path = new Path(dir.toURI.toString, "hex.parquet") | ||
| makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) | ||
|
|
||
| withParquetTable(path.toString, "tbl") { | ||
| // _9 and _10 (uint8 and uint16) not supported | ||
| checkSparkAnswerAndOperator( | ||
| "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also add scalar tests too
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for bringing this up. I guess I thought scalars weren't supported yet (maybe for UDFs only)? From what I could tell none of the other tests test scalars, i.e. most of them insert into a table, then query the table. E.g., test("Chr") {
Seq(false, true).foreach { dictionary =>
withSQLConf(
"parquet.enable.dictionary" -> dictionary.toString,
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
val table = "test"
withTable(table) {
sql(s"create table $table(col varchar(20)) using parquet")
sql(
s"insert into $table values('65'), ('66'), ('67'), ('68'), ('65'), ('66'), ('67'), ('68')")
checkSparkAnswerAndOperator(s"SELECT chr(col) FROM $table")
}
}
}
}Also running scalar tests seems to fail for the other UDFs I spot checked, e.g., Should they be working w/ UDFs, and it's just nothing else tests for them?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have several scalar tests i.e. Since you have the code to handle scalar, it is best to test them. Or it is also okay to disable scalar for now and put it as TODO.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I agree it's best to test since they're there. I just genuinely thought it wasn't supported given the test coverage, the errors on the other functions, etc. I'll have a look at adding some tests...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm realizing that I'm not going to have time next week and didn't expect this PR to take this long, so I've removed the scalar handling for now and hopefully can follow an example in the future. 88bdcde
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can keep the scalar functions, and it should be pretty straightforward to test scalar input? Namely, it should be something like: The constant literal should be encoded as a ScalarValue.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried that already and got test failures because the native function isn't being recognized w/ scalars. The same output I mentioned w/
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like QueryPlanSerde is failing the case match. Also right now, since the scalar code was removed from rust, it will fail if the native code happen to find scalar values...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, yes, I did also try disabling ConstantFolding to no avail.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("unhex") { | ||
| // When running against Spark 3.2, we include a bug fix for https://issues.apache.org/jira/browse/SPARK-40924 that | ||
| // was added in Spark 3.3, so although Comet's behavior is more correct when running against Spark 3.2, it is not | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.