From b7fdc9357fadf72becea547e4ef9e4a0d0dd2026 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 24 Jul 2019 14:53:49 +0800 Subject: [PATCH 1/3] Assignable Cast --- .../sql/catalyst/analysis/Analyzer.scala | 35 ++++++++++++++- .../spark/sql/catalyst/expressions/Cast.scala | 42 ++++++++++++++++++ .../sql/catalyst/expressions/CastSuite.scala | 43 +++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e55cdfedd3234..f3225ec652219 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -174,6 +174,7 @@ class Analyzer( ResolveDeserializer :: ResolveNewInstance :: ResolveUpCast :: + ResolveAssignableCast :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: @@ -2454,7 +2455,7 @@ class Analyzer( } else { // always add an UpCast. it will be removed in the optimizer if it is unnecessary. Some(Alias( - UpCast(queryExpr, tableAttr.dataType), tableAttr.name + AssignableCast(queryExpr, tableAttr.dataType), tableAttr.name )( explicitMetadata = Option(tableAttr.metadata) )) @@ -2665,6 +2666,38 @@ class Analyzer( } } } + + /** + * Replace the [[AssignableCast]] expression by [[Cast]], and throw exceptions if the cast is + * not assignable. + */ + object ResolveAssignableCast extends Rule[LogicalPlan] { + private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = { + val fromStr = from match { + case l: LambdaVariable => "array element" + case e => e.sql + } + throw new AnalysisException(s"Cannot assign $fromStr from " + + s"${from.dataType.catalogString} to ${to.catalogString}.\n" + + "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") + + "You can add an explicit cast to the input data.") + } + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case p if !p.childrenResolved => p + case p if p.resolved => p + + case p => p transformExpressions { + case u @ AssignableCast(child, _, _) if !child.resolved => u + + case AssignableCast(child, dataType, walkedTypePath) + if !Cast.canAssign(child.dataType, dataType) => + fail(child, dataType, walkedTypePath) + + case AssignableCast(child, dataType, _) => Cast(child, dataType.asNullable) + } + } + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 969128838eba4..0031c70a8dbe8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -154,6 +154,39 @@ object Cast { case _ => false } + /** + * Returns true iff we can assign the `from` type to `to` type according to the store assignment + * rules of ANSI SQL. For example, string -> int, array -> string are not assignable cast. + */ + def canAssign(from: DataType, to: DataType): Boolean = (from, to) match { + case _ if from == to => true + + case (_: NumericType, _: NumericType) => true + case (_: AtomicType, StringType) => true + case (NullType, _) => true + case (DateType, TimestampType) => true + case (TimestampType, DateType) => true + // Spark supports casting between long and timestamp, please see `longToTimestamp` and + // `timestampToLong` for details. + case (TimestampType, LongType) => true + case (LongType, TimestampType) => true + + case (ArrayType(fromType, fn), ArrayType(toType, tn)) => + resolvableNullability(fn, tn) && canAssign(fromType, toType) + + case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) => + resolvableNullability(fn, tn) && canAssign(fromKey, toKey) && canAssign(fromValue, toValue) + + case (StructType(fromFields), StructType(toFields)) => + fromFields.length == toFields.length && + fromFields.zip(toFields).forall { + case (f1, f2) => + resolvableNullability(f1.nullable, f2.nullable) && canAssign(f1.dataType, f2.dataType) + } + + case _ => false + } + private def legalNumericPrecedence(from: DataType, to: DataType): Boolean = { val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) @@ -1407,3 +1440,12 @@ case class UpCast(child: Expression, dataType: DataType, walkedTypePath: Seq[Str extends UnaryExpression with Unevaluable { override lazy val resolved = false } + +/** + * Cast the child expression to the target data type, but will throw error if the cast violates + * the store assignment rules of ANSI SQL, e.g. string -> int, array -> string. + */ +case class AssignableCast(child: Expression, dataType: DataType, walkedTypePath: Seq[String] = Nil) + extends UnaryExpression with Unevaluable { + override lazy val resolved = false +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 4d667fd61ae01..6f71fedd455de 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -1004,6 +1004,49 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("Assignable cast") { + import DataTypeTestUtils.numericTypes + + numericTypes.foreach { from => + assert(!Cast.canAssign(StringType, from)) + assert(!Cast.canAssign(ArrayType(StringType), ArrayType(from))) + assert(!Cast.canAssign(MapType(from, StringType), MapType(from, from))) + assert(!Cast.canAssign(MapType(StringType, from), MapType(from, from))) + assert(!Cast.canAssign(StructType(Seq(new StructField("f", StringType))), + StructType(Seq(new StructField("f", from))))) + numericTypes.foreach { to => + assert(Cast.canAssign(from, to)) + assert(Cast.canAssign(ArrayType(from), ArrayType(to))) + assert(Cast.canAssign(MapType(from, to), MapType(to, from))) + assert(Cast.canAssign(StructType(Seq(new StructField("f", from))), + StructType(Seq(new StructField("f", to))))) + } + } + + val dataTypes = Seq(IntegerType, BinaryType, CalendarIntervalType, ArrayType(LongType), + MapType(StringType, StringType), StructType(Seq(new StructField("f", DoubleType)))) + + dataTypes.foreach { from => + assert(Cast.canAssign(NullType, from)) + from match { + case _: AtomicType => assert(Cast.canAssign(from, StringType)) + case _ => assert(!Cast.canAssign(from, StringType)) + } + dataTypes.foreach { to => + if (from == to) { + assert(Cast.canAssign(from, to)) + } else { + assert(!Cast.canAssign(from, to)) + } + } + } + + assert(Cast.canAssign(DateType, TimestampType)) + assert(Cast.canAssign(TimestampType, DateType)) + assert(Cast.canAssign(TimestampType, LongType)) + assert(Cast.canAssign(LongType, TimestampType)) + } + test("SPARK-27671: cast from nested null type in struct") { import DataTypeTestUtils._ From 84c92004db9fe0a5d40dfd84794b8833de099156 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 24 Jul 2019 20:35:01 +0800 Subject: [PATCH 2/3] simplify code --- .../sql/catalyst/analysis/Analyzer.scala | 35 +-------------- .../spark/sql/catalyst/expressions/Cast.scala | 42 ------------------ .../org/apache/spark/sql/types/DataType.scala | 21 +++++++-- .../sql/catalyst/expressions/CastSuite.scala | 43 ------------------- 4 files changed, 19 insertions(+), 122 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f3225ec652219..85fcdb7c1704a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -174,7 +174,6 @@ class Analyzer( ResolveDeserializer :: ResolveNewInstance :: ResolveUpCast :: - ResolveAssignableCast :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: @@ -2455,7 +2454,7 @@ class Analyzer( } else { // always add an UpCast. it will be removed in the optimizer if it is unnecessary. Some(Alias( - AssignableCast(queryExpr, tableAttr.dataType), tableAttr.name + Cast(queryExpr, tableAttr.dataType), tableAttr.name )( explicitMetadata = Option(tableAttr.metadata) )) @@ -2666,38 +2665,6 @@ class Analyzer( } } } - - /** - * Replace the [[AssignableCast]] expression by [[Cast]], and throw exceptions if the cast is - * not assignable. - */ - object ResolveAssignableCast extends Rule[LogicalPlan] { - private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = { - val fromStr = from match { - case l: LambdaVariable => "array element" - case e => e.sql - } - throw new AnalysisException(s"Cannot assign $fromStr from " + - s"${from.dataType.catalogString} to ${to.catalogString}.\n" + - "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") + - "You can add an explicit cast to the input data.") - } - - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case p if !p.childrenResolved => p - case p if p.resolved => p - - case p => p transformExpressions { - case u @ AssignableCast(child, _, _) if !child.resolved => u - - case AssignableCast(child, dataType, walkedTypePath) - if !Cast.canAssign(child.dataType, dataType) => - fail(child, dataType, walkedTypePath) - - case AssignableCast(child, dataType, _) => Cast(child, dataType.asNullable) - } - } - } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 0031c70a8dbe8..969128838eba4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -154,39 +154,6 @@ object Cast { case _ => false } - /** - * Returns true iff we can assign the `from` type to `to` type according to the store assignment - * rules of ANSI SQL. For example, string -> int, array -> string are not assignable cast. - */ - def canAssign(from: DataType, to: DataType): Boolean = (from, to) match { - case _ if from == to => true - - case (_: NumericType, _: NumericType) => true - case (_: AtomicType, StringType) => true - case (NullType, _) => true - case (DateType, TimestampType) => true - case (TimestampType, DateType) => true - // Spark supports casting between long and timestamp, please see `longToTimestamp` and - // `timestampToLong` for details. - case (TimestampType, LongType) => true - case (LongType, TimestampType) => true - - case (ArrayType(fromType, fn), ArrayType(toType, tn)) => - resolvableNullability(fn, tn) && canAssign(fromType, toType) - - case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) => - resolvableNullability(fn, tn) && canAssign(fromKey, toKey) && canAssign(fromValue, toValue) - - case (StructType(fromFields), StructType(toFields)) => - fromFields.length == toFields.length && - fromFields.zip(toFields).forall { - case (f1, f2) => - resolvableNullability(f1.nullable, f2.nullable) && canAssign(f1.dataType, f2.dataType) - } - - case _ => false - } - private def legalNumericPrecedence(from: DataType, to: DataType): Boolean = { val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) @@ -1440,12 +1407,3 @@ case class UpCast(child: Expression, dataType: DataType, walkedTypePath: Seq[Str extends UnaryExpression with Unevaluable { override lazy val resolved = false } - -/** - * Cast the child expression to the target data type, but will throw error if the cast violates - * the store assignment rules of ANSI SQL, e.g. string -> int, array -> string. - */ -case class AssignableCast(child: Expression, dataType: DataType, walkedTypePath: Seq[String] = Nil) - extends UnaryExpression with Unevaluable { - override lazy val resolved = false -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index a35e971d08823..514d0543c0a66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -442,14 +442,16 @@ object DataType { fieldCompatible case (w: AtomicType, r: AtomicType) => - if (!Cast.canUpCast(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + if (!canWriteAtomicType(w, r)) { + addError(s"Cannot assign '$context': $w to $r") false } else { true } - case (w, r) if w.sameType(r) && !w.isInstanceOf[NullType] => + case (NullType, _) => true + + case (w, r) if w.sameType(r) => true case (w, r) => @@ -457,4 +459,17 @@ object DataType { false } } + + private def canWriteAtomicType(from: AtomicType, to: AtomicType): Boolean = { + case _ if from == to => true + case (_: NumericType, _: NumericType) => true + case (_, StringType) => true + case (DateType, TimestampType) => true + case (TimestampType, DateType) => true + // Spark supports casting between long and timestamp, please see `longToTimestamp` and + // `timestampToLong` for details. + case (TimestampType, LongType) => true + case (LongType, TimestampType) => true + case _ => false + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 6f71fedd455de..4d667fd61ae01 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -1004,49 +1004,6 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } } - test("Assignable cast") { - import DataTypeTestUtils.numericTypes - - numericTypes.foreach { from => - assert(!Cast.canAssign(StringType, from)) - assert(!Cast.canAssign(ArrayType(StringType), ArrayType(from))) - assert(!Cast.canAssign(MapType(from, StringType), MapType(from, from))) - assert(!Cast.canAssign(MapType(StringType, from), MapType(from, from))) - assert(!Cast.canAssign(StructType(Seq(new StructField("f", StringType))), - StructType(Seq(new StructField("f", from))))) - numericTypes.foreach { to => - assert(Cast.canAssign(from, to)) - assert(Cast.canAssign(ArrayType(from), ArrayType(to))) - assert(Cast.canAssign(MapType(from, to), MapType(to, from))) - assert(Cast.canAssign(StructType(Seq(new StructField("f", from))), - StructType(Seq(new StructField("f", to))))) - } - } - - val dataTypes = Seq(IntegerType, BinaryType, CalendarIntervalType, ArrayType(LongType), - MapType(StringType, StringType), StructType(Seq(new StructField("f", DoubleType)))) - - dataTypes.foreach { from => - assert(Cast.canAssign(NullType, from)) - from match { - case _: AtomicType => assert(Cast.canAssign(from, StringType)) - case _ => assert(!Cast.canAssign(from, StringType)) - } - dataTypes.foreach { to => - if (from == to) { - assert(Cast.canAssign(from, to)) - } else { - assert(!Cast.canAssign(from, to)) - } - } - } - - assert(Cast.canAssign(DateType, TimestampType)) - assert(Cast.canAssign(TimestampType, DateType)) - assert(Cast.canAssign(TimestampType, LongType)) - assert(Cast.canAssign(LongType, TimestampType)) - } - test("SPARK-27671: cast from nested null type in struct") { import DataTypeTestUtils._ From 1abcc9d8a228fbceaabb24de9deb0c92c6298087 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 24 Jul 2019 20:52:55 +0800 Subject: [PATCH 3/3] fix build --- .../src/main/scala/org/apache/spark/sql/types/DataType.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 514d0543c0a66..e51d7eb5e2a51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -460,7 +460,7 @@ object DataType { } } - private def canWriteAtomicType(from: AtomicType, to: AtomicType): Boolean = { + private def canWriteAtomicType(from: AtomicType, to: AtomicType): Boolean = (from, to) match { case _ if from == to => true case (_: NumericType, _: NumericType) => true case (_, StringType) => true