From c725692b7973a1d961a82a5f5a1f395c769d6875 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 4 Nov 2024 21:13:20 +0800 Subject: [PATCH 1/7] [GLUTEN-7749][VL] Trim ISOControl characters in string for casting to integral type --- .../backendsapi/velox/VeloxSparkPlanExecApi.scala | 10 ++++++++++ .../org/apache/spark/sql/GlutenDataFrameSuite.scala | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 81564a44011..c9cb7769082 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -723,6 +723,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val trimParaSepStr = "\u2029" // Needs to be trimmed for casting to float/double/decimal val trimSpaceStr = ('\u0000' to '\u0020').toList.mkString + // ISOControl characters, refer java.lang.Character.isISOControl(int) + val isoControlControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString // scalastyle:on nonascii c.dataType match { case BinaryType | _: ArrayType | _: MapType | _: StructType | _: UserDefinedType[_] => @@ -735,6 +737,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { case _ => c } + case ByteType | ShortType | IntegerType | LongType => + c.child.dataType match { + case StringType => + val trimNode = StringTrim(c.child, Some(Literal(trimWhitespaceStr + isoControlControlStr))) + c.withNewChildren(Seq(trimNode)).asInstanceOf[Cast] + case _ => + c + } case _ => c.child.dataType match { case StringType if GlutenConfig.getConf.castFromVarcharAddTrimNode => diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala index 4008f862e17..f3c8dc0bd22 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala @@ -329,12 +329,12 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait { } // scalastyle:off nonascii - Seq(" 123", "123 ", " 123 ", "\u2000123\n\n\n", "123\r\r\r", "123\f\f\f", "123\u000C") + Seq(" 123", "123 ", " 123 ", "\u2000123\n\n\n", "123\r\r\r", "123\f\f\f", "123\u000C", "123\u0000") .toDF("col1") .createOrReplaceTempView("t1") // scalastyle:on nonascii val expectedIntResult = Row(123) :: Row(123) :: - Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Nil + Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Nil var df = spark.sql("select cast(col1 as int) from t1") checkResult(df, expectedIntResult) df = spark.sql("select cast(col1 as long) from t1") From 8483256e3bda50f1aac97591a14f7c9a7a7acb58 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 4 Nov 2024 21:29:43 +0800 Subject: [PATCH 2/7] respect castFromVarcharAddTrimNode --- .../apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index c9cb7769082..d6973fec4cc 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -739,7 +739,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { } case ByteType | ShortType | IntegerType | LongType => c.child.dataType match { - case StringType => + case StringType if GlutenConfig.getConf.castFromVarcharAddTrimNode => val trimNode = StringTrim(c.child, Some(Literal(trimWhitespaceStr + isoControlControlStr))) c.withNewChildren(Seq(trimNode)).asInstanceOf[Cast] case _ => From 2bd230197067b520541328277b5cdbf46dc7f988 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 4 Nov 2024 21:33:06 +0800 Subject: [PATCH 3/7] enable castFromVarcharAddTrimNode for test --- .../spark/sql/GlutenDataFrameSuite.scala | 71 ++++++++++--------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala index f3c8dc0bd22..662dbd4ee2a 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{ProjectExecTransformer, WholeStageTransformer} import org.apache.spark.SparkException @@ -323,41 +324,43 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait { } testGluten("Allow leading/trailing whitespace in string before casting") { - def checkResult(df: DataFrame, expectedResult: Seq[Row]): Unit = { - checkAnswer(df, expectedResult) - assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) - } + withSQLConf(GlutenConfig.CAST_FROM_VARCHAR_ADD_TRIM_NODE.key -> "true") { + def checkResult(df: DataFrame, expectedResult: Seq[Row]): Unit = { + checkAnswer(df, expectedResult) + assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + } - // scalastyle:off nonascii - Seq(" 123", "123 ", " 123 ", "\u2000123\n\n\n", "123\r\r\r", "123\f\f\f", "123\u000C", "123\u0000") - .toDF("col1") - .createOrReplaceTempView("t1") - // scalastyle:on nonascii - val expectedIntResult = Row(123) :: Row(123) :: - Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Nil - var df = spark.sql("select cast(col1 as int) from t1") - checkResult(df, expectedIntResult) - df = spark.sql("select cast(col1 as long) from t1") - checkResult(df, expectedIntResult) - - Seq(" 123.5", "123.5 ", " 123.5 ", "123.5\n\n\n", "123.5\r\r\r", "123.5\f\f\f", "123.5\u000C") - .toDF("col1") - .createOrReplaceTempView("t1") - val expectedFloatResult = Row(123.5) :: Row(123.5) :: - Row(123.5) :: Row(123.5) :: Row(123.5) :: Row(123.5) :: Row(123.5) :: Nil - df = spark.sql("select cast(col1 as float) from t1") - checkResult(df, expectedFloatResult) - df = spark.sql("select cast(col1 as double) from t1") - checkResult(df, expectedFloatResult) - - // scalastyle:off nonascii - val rawData = - Seq(" abc", "abc ", " abc ", "\u2000abc\n\n\n", "abc\r\r\r", "abc\f\f\f", "abc\u000C") - // scalastyle:on nonascii - rawData.toDF("col1").createOrReplaceTempView("t1") - val expectedBinaryResult = rawData.map(d => Row(d.getBytes(StandardCharsets.UTF_8))).seq - df = spark.sql("select cast(col1 as binary) from t1") - checkResult(df, expectedBinaryResult) + // scalastyle:off nonascii + Seq(" 123", "123 ", " 123 ", "\u2000123\n\n\n", "123\r\r\r", "123\f\f\f", "123\u000C", "123\u0000") + .toDF("col1") + .createOrReplaceTempView("t1") + // scalastyle:on nonascii + val expectedIntResult = Row(123) :: Row(123) :: + Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Row(123) :: Nil + var df = spark.sql("select cast(col1 as int) from t1") + checkResult(df, expectedIntResult) + df = spark.sql("select cast(col1 as long) from t1") + checkResult(df, expectedIntResult) + + Seq(" 123.5", "123.5 ", " 123.5 ", "123.5\n\n\n", "123.5\r\r\r", "123.5\f\f\f", "123.5\u000C") + .toDF("col1") + .createOrReplaceTempView("t1") + val expectedFloatResult = Row(123.5) :: Row(123.5) :: + Row(123.5) :: Row(123.5) :: Row(123.5) :: Row(123.5) :: Row(123.5) :: Nil + df = spark.sql("select cast(col1 as float) from t1") + checkResult(df, expectedFloatResult) + df = spark.sql("select cast(col1 as double) from t1") + checkResult(df, expectedFloatResult) + + // scalastyle:off nonascii + val rawData = + Seq(" abc", "abc ", " abc ", "\u2000abc\n\n\n", "abc\r\r\r", "abc\f\f\f", "abc\u000C") + // scalastyle:on nonascii + rawData.toDF("col1").createOrReplaceTempView("t1") + val expectedBinaryResult = rawData.map(d => Row(d.getBytes(StandardCharsets.UTF_8))).seq + df = spark.sql("select cast(col1 as binary) from t1") + checkResult(df, expectedBinaryResult) + } } private def withExpr(newExpr: Expression): Column = new Column(newExpr) From ddf0e5712797b2777ad0471089d331f4bcdd46d8 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 4 Nov 2024 21:48:01 +0800 Subject: [PATCH 4/7] refactor --- .../velox/VeloxSparkPlanExecApi.scala | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index d6973fec4cc..77c60a1eeae 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -726,37 +726,25 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { // ISOControl characters, refer java.lang.Character.isISOControl(int) val isoControlControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString // scalastyle:on nonascii - c.dataType match { - case BinaryType | _: ArrayType | _: MapType | _: StructType | _: UserDefinedType[_] => - c - case FloatType | DoubleType | _: DecimalType => - c.child.dataType match { - case StringType if GlutenConfig.getConf.castFromVarcharAddTrimNode => - val trimNode = StringTrim(c.child, Some(Literal(trimSpaceStr))) - c.withNewChildren(Seq(trimNode)).asInstanceOf[Cast] - case _ => - c - } - case ByteType | ShortType | IntegerType | LongType => - c.child.dataType match { - case StringType if GlutenConfig.getConf.castFromVarcharAddTrimNode => - val trimNode = StringTrim(c.child, Some(Literal(trimWhitespaceStr + isoControlControlStr))) - c.withNewChildren(Seq(trimNode)).asInstanceOf[Cast] - case _ => - c - } - case _ => - c.child.dataType match { - case StringType if GlutenConfig.getConf.castFromVarcharAddTrimNode => - val trimNode = StringTrim( - c.child, - Some( - Literal(trimWhitespaceStr + - trimSpaceSepStr + trimLineSepStr + trimParaSepStr))) - c.withNewChildren(Seq(trimNode)).asInstanceOf[Cast] - case _ => - c + if (GlutenConfig.getConf.castFromVarcharAddTrimNode && c.child.dataType == StringType) { + val trimStr = c.dataType match { + case BinaryType | _: ArrayType | _: MapType | _: StructType | _: UserDefinedType[_] => + None + case FloatType | DoubleType | _: DecimalType => + Some(trimSpaceStr) + case ByteType | ShortType | IntegerType | LongType => + Some((trimWhitespaceStr + isoControlControlStr).toSet.mkString) + case _ => + Some(trimWhitespaceStr + trimSpaceSepStr + trimLineSepStr + trimParaSepStr) + } + trimStr + .map { + trim => + c.withNewChildren(Seq(StringTrim(c.child, Some(Literal(trim))))).asInstanceOf[Cast] } + .getOrElse(c) + } else { + c } } From 15f8cfbbf58ecc1d0c4241cb957a36e0379b34fd Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 5 Nov 2024 10:07:54 +0800 Subject: [PATCH 5/7] fix style --- .../org/apache/spark/sql/GlutenDataFrameSuite.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala index 662dbd4ee2a..3b2db7117f4 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameSuite.scala @@ -327,11 +327,20 @@ class GlutenDataFrameSuite extends DataFrameSuite with GlutenSQLTestsTrait { withSQLConf(GlutenConfig.CAST_FROM_VARCHAR_ADD_TRIM_NODE.key -> "true") { def checkResult(df: DataFrame, expectedResult: Seq[Row]): Unit = { checkAnswer(df, expectedResult) - assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) + assert( + find(df.queryExecution.executedPlan)(_.isInstanceOf[ProjectExecTransformer]).isDefined) } // scalastyle:off nonascii - Seq(" 123", "123 ", " 123 ", "\u2000123\n\n\n", "123\r\r\r", "123\f\f\f", "123\u000C", "123\u0000") + Seq( + " 123", + "123 ", + " 123 ", + "\u2000123\n\n\n", + "123\r\r\r", + "123\f\f\f", + "123\u000C", + "123\u0000") .toDF("col1") .createOrReplaceTempView("t1") // scalastyle:on nonascii From 00eff4c6bfc8bed087fc2852862a2b418c090656 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 5 Nov 2024 16:00:09 +0800 Subject: [PATCH 6/7] address comment --- .../gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 77c60a1eeae..c72a615be6e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -732,10 +732,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { None case FloatType | DoubleType | _: DecimalType => Some(trimSpaceStr) - case ByteType | ShortType | IntegerType | LongType => - Some((trimWhitespaceStr + isoControlControlStr).toSet.mkString) case _ => - Some(trimWhitespaceStr + trimSpaceSepStr + trimLineSepStr + trimParaSepStr) + Some( + (trimWhitespaceStr + trimSpaceSepStr + trimLineSepStr + + trimParaSepStr + isoControlControlStr).toSet.mkString + ) } trimStr .map { From 945dfddeaa9bac8021cddc3a87a895503d5d2db1 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 6 Nov 2024 11:31:45 +0800 Subject: [PATCH 7/7] fix typo --- .../gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index c72a615be6e..7901374f6bf 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -724,7 +724,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { // Needs to be trimmed for casting to float/double/decimal val trimSpaceStr = ('\u0000' to '\u0020').toList.mkString // ISOControl characters, refer java.lang.Character.isISOControl(int) - val isoControlControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString + val isoControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to '\u009F')).toList.mkString // scalastyle:on nonascii if (GlutenConfig.getConf.castFromVarcharAddTrimNode && c.child.dataType == StringType) { val trimStr = c.dataType match { @@ -735,7 +735,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { case _ => Some( (trimWhitespaceStr + trimSpaceSepStr + trimLineSepStr - + trimParaSepStr + isoControlControlStr).toSet.mkString + + trimParaSepStr + isoControlStr).toSet.mkString ) } trimStr