Skip to content

Commit e7cd746

Browse files
authored
feat: Use DataFusion's Sha2 and remove Comet's implementation. (#2063)
1 parent 76ea742 commit e7cd746

File tree

8 files changed

+30
-134
lines changed

8 files changed

+30
-134
lines changed

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion::{
4040
prelude::{SessionConfig, SessionContext},
4141
};
4242
use datafusion_comet_proto::spark_operator::Operator;
43+
use datafusion_spark::function::hash::sha2::SparkSha2;
4344
use datafusion_spark::function::math::expm1::SparkExpm1;
4445
use futures::poll;
4546
use futures::stream::StreamExt;
@@ -288,6 +289,7 @@ fn prepare_datafusion_session_context(
288289

289290
// register UDFs from datafusion-spark crate
290291
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
292+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
291293

292294
// Must be the last one to override existing functions with the same name
293295
datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,6 @@ pub fn create_comet_physical_fun(
128128
let func = Arc::new(spark_isnan);
129129
make_comet_scalar_udf!("isnan", func, without data_type)
130130
}
131-
"sha224" => {
132-
let func = Arc::new(spark_sha224);
133-
make_comet_scalar_udf!("sha224", func, without data_type)
134-
}
135-
"sha256" => {
136-
let func = Arc::new(spark_sha256);
137-
make_comet_scalar_udf!("sha256", func, without data_type)
138-
}
139-
"sha384" => {
140-
let func = Arc::new(spark_sha384);
141-
make_comet_scalar_udf!("sha384", func, without data_type)
142-
}
143-
"sha512" => {
144-
let func = Arc::new(spark_sha512);
145-
make_comet_scalar_udf!("sha512", func, without data_type)
146-
}
147131
"date_add" => {
148132
let func = Arc::new(spark_date_add);
149133
make_comet_scalar_udf!("date_add", func, without data_type)

native/spark-expr/src/hash_funcs/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
// under the License.
1717

1818
pub mod murmur3;
19-
mod sha2;
2019
pub(super) mod utils;
2120
mod xxhash64;
2221

2322
pub use murmur3::spark_murmur3_hash;
24-
pub use sha2::{spark_sha224, spark_sha256, spark_sha384, spark_sha512};
2523
pub use xxhash64::spark_xxhash64;

native/spark-expr/src/hash_funcs/sha2.rs

Lines changed: 0 additions & 84 deletions
This file was deleted.

native/spark-expr/src/math_funcs/hex.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,6 @@ fn hex_encode<T: AsRef<[u8]>>(data: T, lower_case: bool) -> String {
5151
s
5252
}
5353

54-
#[inline(always)]
55-
pub(crate) fn hex_strings<T: AsRef<[u8]>>(data: T) -> String {
56-
hex_encode(data, true)
57-
}
58-
5954
#[inline(always)]
6055
fn hex_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<String, std::fmt::Error> {
6156
let hex_string = hex_encode(bytes, false);

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
122122
classOf[Lower] -> CometLower,
123123
classOf[Murmur3Hash] -> CometMurmur3Hash,
124124
classOf[XxHash64] -> CometXxHash64,
125+
classOf[Sha2] -> CometSha2,
125126
classOf[MapKeys] -> CometMapKeys,
126127
classOf[MapEntries] -> CometMapEntries,
127128
classOf[MapValues] -> CometMapValues,
@@ -1542,29 +1543,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
15421543
None
15431544
}
15441545

1545-
case Sha2(left, numBits) =>
1546-
if (!numBits.foldable) {
1547-
withInfo(expr, "non literal numBits is not supported")
1548-
return None
1549-
}
1550-
// it's possible for spark to dynamically compute the number of bits from input
1551-
// expression, however DataFusion does not support that yet.
1552-
val childExpr = exprToProtoInternal(left, inputs, binding)
1553-
val bits = numBits.eval().asInstanceOf[Int]
1554-
val algorithm = bits match {
1555-
case 224 => "sha224"
1556-
case 256 | 0 => "sha256"
1557-
case 384 => "sha384"
1558-
case 512 => "sha512"
1559-
case _ =>
1560-
null
1561-
}
1562-
if (algorithm == null) {
1563-
exprToProtoInternal(Literal(null, StringType), inputs, binding)
1564-
} else {
1565-
scalarFunctionExprToProtoWithReturnType(algorithm, StringType, childExpr)
1566-
}
1567-
15681546
case struct @ CreateNamedStruct(_) =>
15691547
if (struct.names.length != struct.names.distinct.length) {
15701548
withInfo(expr, "CreateNamedStruct with duplicate field names are not supported")

spark/src/main/scala/org/apache/comet/serde/hash.scala

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.apache.comet.serde
2121

22-
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, XxHash64}
23-
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType}
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha2, XxHash64}
23+
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StringType}
2424

2525
import org.apache.comet.CometSparkSessionExtensions.withInfo
2626
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType}
@@ -65,6 +65,29 @@ object CometMurmur3Hash extends CometExpressionSerde {
6565
}
6666
}
6767

68+
object CometSha2 extends CometExpressionSerde {
69+
override def convert(
70+
expr: Expression,
71+
inputs: Seq[Attribute],
72+
binding: Boolean): Option[ExprOuterClass.Expr] = {
73+
if (!HashUtils.isSupportedType(expr)) {
74+
return None
75+
}
76+
77+
// It's possible for spark to dynamically compute the number of bits from input
78+
// expression, however DataFusion does not support that yet.
79+
val sha2Expr = expr.asInstanceOf[Sha2]
80+
if (!sha2Expr.right.foldable) {
81+
withInfo(expr, "For Sha2, non-foldable right argument is not supported")
82+
return None
83+
}
84+
85+
val leftExpr = exprToProtoInternal(sha2Expr.left, inputs, binding)
86+
val numBitsExpr = exprToProtoInternal(sha2Expr.right, inputs, binding)
87+
scalarFunctionExprToProtoWithReturnType("sha2", StringType, leftExpr, numBitsExpr)
88+
}
89+
}
90+
6891
private object HashUtils {
6992
def isSupportedType(expr: Expression): Boolean = {
7093
for (child <- expr.children) {

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1865,7 +1865,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
18651865
|md5(col), md5(cast(a as string)), md5(cast(b as string)),
18661866
|hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
18671867
|xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
1868-
|sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128)
1868+
|sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1)
18691869
|from test
18701870
|""".stripMargin)
18711871
}
@@ -1977,7 +1977,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
19771977
|md5(col), md5(cast(a as string)), --md5(cast(b as string)),
19781978
|hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
19791979
|xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
1980-
|sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128)
1980+
|sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1)
19811981
|from test
19821982
|""".stripMargin)
19831983
}

0 commit comments

Comments (0)