From 280b941a1adc2ffcd82810a69f3d9e475607b70a Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 28 Apr 2018 16:26:48 -0700 Subject: [PATCH 01/13] Checkpoint changes --- .../sql/catalyst/expressions/SortOrder.scala | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index ff7c98f714905..9d2b5fefe696c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._ abstract sealed class SortDirection { @@ -147,7 +148,40 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { (!child.isAscending && child.nullOrdering == NullsLast) } - override def eval(input: InternalRow): Any = throw new UnsupportedOperationException + override def eval(input: InternalRow): Any = { + val value = child.child.eval(input) + if (value == null) { + return null + } + val prefix = child.child.dataType match { + case BooleanType => + if (value.asInstanceOf[Boolean]) 1L else 0L + case _: IntegralType => + value.asInstanceOf[java.lang.Number].longValue() + case DateType | TimestampType => + value.asInstanceOf[java.lang.Number].longValue() + case FloatType | DoubleType => + val dVal = value.asInstanceOf[java.lang.Number].doubleValue() + DoublePrefixComparator.computePrefix(dVal) + case StringType => + StringPrefixComparator.computePrefix(value.asInstanceOf[UTF8String]) + case BinaryType => + BinaryPrefixComparator.computePrefix(value.asInstanceOf[Array[Byte]]) + case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => + val dtValue = value.asInstanceOf[Decimal] + if (dt.precision <= Decimal.MAX_LONG_DIGITS) { + dtValue + } else { + val p = Decimal.MAX_LONG_DIGITS + val s = p - (dt.precision - dt.scale) + if (dtValue.changePrecision(p, s)) dtValue.toUnscaledLong else Long.MinValue + } + case dt: DecimalType => + val dtValue = value.asInstanceOf[Decimal].toDouble + DoublePrefixComparator.computePrefix(dtValue) + } + prefix + } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val childCode = child.child.genCode(ctx) From 4a810ae8776893ba1d12e620930473fdd61d1f49 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 29 Apr 2018 11:25:26 -0700 Subject: [PATCH 02/13] Checkpoint commit --- .../SortOrderExpressionsSuite.scala | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala new file mode 100644 index 0000000000000..5dcf48fde54d8 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.sql.{Date, Timestamp} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.{BinaryPrefixComparator, DoublePrefixComparator, StringPrefixComparator} + +class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { + + test("SortPrefix") { + val i1 = Literal.create(20132983, IntegerType) + val i2 = Literal.create(-20132983, IntegerType) + val l1 = Literal.create(20132983, LongType) + val l2 = Literal.create(-20132983, LongType) + val millis = 1524954911000L; + val d1 = Literal.create(new java.sql.Date(millis), DateType) + val t1 = Literal.create(new Timestamp(millis), TimestampType) + val f1 = Literal.create(0.7788229f, FloatType) + val f2 = Literal.create(-0.7788229f, FloatType) + val db1 = Literal.create(0.7788229d, DoubleType) + val db2 = Literal.create(-0.7788229d, DoubleType) + val s1 = Literal.create("T", StringType) + val s2 = Literal.create("This is longer than 8 characters", StringType) + val b1 = Literal.create(Array[Byte](12), BinaryType) + val b2 = Literal.create(Array[Byte](12, 17, 99, 0, 0, 0, 2, 3, 0xf4.asInstanceOf[Byte]), + BinaryType) + + checkEvaluation(SortPrefix(SortOrder(i1, Ascending)), 20132983L) + checkEvaluation(SortPrefix(SortOrder(i2, Ascending)), -20132983L) + checkEvaluation(SortPrefix(SortOrder(l1, Ascending)), 20132983L) + checkEvaluation(SortPrefix(SortOrder(l2, Ascending)), -20132983L) + // TODO: Find out why 17649L is the correct number for both eval and codegen paths + checkEvaluation(SortPrefix(SortOrder(d1, Ascending)), 17649L) + checkEvaluation(SortPrefix(SortOrder(t1, Ascending)), millis*1000) + checkEvaluation(SortPrefix(SortOrder(f1, Ascending)), + DoublePrefixComparator.computePrefix(f1.value.asInstanceOf[Float].toDouble)) + checkEvaluation(SortPrefix(SortOrder(f2, Ascending)), + DoublePrefixComparator.computePrefix(f2.value.asInstanceOf[Float].toDouble)) + checkEvaluation(SortPrefix(SortOrder(db1, Ascending)), + DoublePrefixComparator.computePrefix(db1.value.asInstanceOf[Double])) + checkEvaluation(SortPrefix(SortOrder(db2, Ascending)), + DoublePrefixComparator.computePrefix(db2.value.asInstanceOf[Double])) + checkEvaluation(SortPrefix(SortOrder(s1, Ascending)), + StringPrefixComparator.computePrefix(s1.value.asInstanceOf[UTF8String])) + checkEvaluation(SortPrefix(SortOrder(s2, Ascending)), + StringPrefixComparator.computePrefix(s2.value.asInstanceOf[UTF8String])) + checkEvaluation(SortPrefix(SortOrder(b1, Ascending)), + BinaryPrefixComparator.computePrefix(b1.value.asInstanceOf[Array[Byte]])) + checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), + BinaryPrefixComparator.computePrefix(b2.value.asInstanceOf[Array[Byte]])) + } +} From d1a7e220040ba1cfe4facb7d6eef9ae1251768aa Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 29 Apr 2018 12:19:37 -0700 Subject: [PATCH 03/13] Checkpoint commit --- .../apache/spark/sql/catalyst/expressions/SortOrder.scala | 2 +- .../catalyst/expressions/SortOrderExpressionsSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 9d2b5fefe696c..fcf84ff44f490 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -170,7 +170,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => val dtValue = value.asInstanceOf[Decimal] if (dt.precision <= Decimal.MAX_LONG_DIGITS) { - dtValue + dtValue.toUnscaledLong } else { val p = Decimal.MAX_LONG_DIGITS val s = p - (dt.precision - dt.scale) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index 5dcf48fde54d8..602787526d3f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -43,6 +43,9 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper val b1 = Literal.create(Array[Byte](12), BinaryType) val b2 = Literal.create(Array[Byte](12, 17, 99, 0, 0, 0, 2, 3, 0xf4.asInstanceOf[Byte]), BinaryType) + val dec1 = Literal(Decimal(20132983L, 10, 2)) + val dec2 = Literal(Decimal(20132983L, 19, 2)) + val dec3 = Literal(Decimal(20132983L, 21, 2)) checkEvaluation(SortPrefix(SortOrder(i1, Ascending)), 20132983L) checkEvaluation(SortPrefix(SortOrder(i2, Ascending)), -20132983L) @@ -67,5 +70,9 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper BinaryPrefixComparator.computePrefix(b1.value.asInstanceOf[Array[Byte]])) checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), BinaryPrefixComparator.computePrefix(b2.value.asInstanceOf[Array[Byte]])) + checkEvaluation(SortPrefix(SortOrder(dec1, Ascending)), 20132983L) + checkEvaluation(SortPrefix(SortOrder(dec2, Ascending)), 2013298L) + checkEvaluation(SortPrefix(SortOrder(dec3, Ascending)), + DoublePrefixComparator.computePrefix(201329.83d)) } } From 4dbb0b7ae959a6a4f85122a887bab4e1563255f0 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 29 Apr 2018 15:33:21 -0700 Subject: [PATCH 04/13] Comment on testing oddity --- .../sql/catalyst/expressions/SortOrderExpressionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index 602787526d3f1..7a49f1d5d4427 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -51,7 +51,7 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(SortPrefix(SortOrder(i2, Ascending)), -20132983L) checkEvaluation(SortPrefix(SortOrder(l1, Ascending)), 20132983L) checkEvaluation(SortPrefix(SortOrder(l2, Ascending)), -20132983L) - // TODO: Find out why 17649L is the correct number for both eval and codegen paths + // For some reason, the Literal.create code gives us the number of days since the epoch checkEvaluation(SortPrefix(SortOrder(d1, Ascending)), 17649L) checkEvaluation(SortPrefix(SortOrder(t1, Ascending)), millis*1000) checkEvaluation(SortPrefix(SortOrder(f1, Ascending)), From 227b6ac71bcfe6a54051f47dd16aec047b0a98d9 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 30 Apr 2018 14:22:09 -0700 Subject: [PATCH 05/13] Add boolean test for Sortprefix --- .../expressions/SortOrderExpressionsSuite.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index 7a49f1d5d4427..b111a8822d877 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -27,6 +27,8 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.{BinaryPre class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SortPrefix") { + val b1 = Literal.create(false, BooleanType) + val b2 = Literal.create(true, BooleanType) val i1 = Literal.create(20132983, IntegerType) val i2 = Literal.create(-20132983, IntegerType) val l1 = Literal.create(20132983, LongType) @@ -40,13 +42,15 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper val db2 = Literal.create(-0.7788229d, DoubleType) val s1 = Literal.create("T", StringType) val s2 = Literal.create("This is longer than 8 characters", StringType) - val b1 = Literal.create(Array[Byte](12), BinaryType) - val b2 = Literal.create(Array[Byte](12, 17, 99, 0, 0, 0, 2, 3, 0xf4.asInstanceOf[Byte]), + val bin1 = Literal.create(Array[Byte](12), BinaryType) + val bin2 = Literal.create(Array[Byte](12, 17, 99, 0, 0, 0, 2, 3, 0xf4.asInstanceOf[Byte]), BinaryType) val dec1 = Literal(Decimal(20132983L, 10, 2)) val dec2 = Literal(Decimal(20132983L, 19, 2)) val dec3 = Literal(Decimal(20132983L, 21, 2)) + checkEvaluation(SortPrefix(SortOrder(b1, Ascending)), 0L) + checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), 1L) checkEvaluation(SortPrefix(SortOrder(i1, Ascending)), 20132983L) checkEvaluation(SortPrefix(SortOrder(i2, Ascending)), -20132983L) checkEvaluation(SortPrefix(SortOrder(l1, Ascending)), 20132983L) @@ -66,10 +70,10 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper StringPrefixComparator.computePrefix(s1.value.asInstanceOf[UTF8String])) checkEvaluation(SortPrefix(SortOrder(s2, Ascending)), StringPrefixComparator.computePrefix(s2.value.asInstanceOf[UTF8String])) - checkEvaluation(SortPrefix(SortOrder(b1, Ascending)), - BinaryPrefixComparator.computePrefix(b1.value.asInstanceOf[Array[Byte]])) - checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), - BinaryPrefixComparator.computePrefix(b2.value.asInstanceOf[Array[Byte]])) + checkEvaluation(SortPrefix(SortOrder(bin1, Ascending)), + BinaryPrefixComparator.computePrefix(bin1.value.asInstanceOf[Array[Byte]])) + checkEvaluation(SortPrefix(SortOrder(bin2, Ascending)), + BinaryPrefixComparator.computePrefix(bin2.value.asInstanceOf[Array[Byte]])) checkEvaluation(SortPrefix(SortOrder(dec1, Ascending)), 20132983L) checkEvaluation(SortPrefix(SortOrder(dec2, Ascending)), 2013298L) checkEvaluation(SortPrefix(SortOrder(dec3, Ascending)), From bde7b5ea925c8a15e63d2cfadb248c686f0e1e06 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 5 May 2018 03:59:16 +0800 Subject: [PATCH 06/13] Fix local timezone issue with tests --- .../catalyst/expressions/SortOrderExpressionsSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index b111a8822d877..be158c10b24d6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.{Date, Timestamp} +import java.util.TimeZone import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types._ @@ -27,6 +28,9 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.{BinaryPre class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SortPrefix") { + // Explicitly choose a time zone, since Date objects can create different values depending on + // local time zone of the machine on which the test is running + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) val b1 = Literal.create(false, BooleanType) val b2 = Literal.create(true, BooleanType) val i1 = Literal.create(20132983, IntegerType) @@ -57,7 +61,7 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(SortPrefix(SortOrder(l2, Ascending)), -20132983L) // For some reason, the Literal.create code gives us the number of days since the epoch checkEvaluation(SortPrefix(SortOrder(d1, Ascending)), 17649L) - checkEvaluation(SortPrefix(SortOrder(t1, Ascending)), millis*1000) + checkEvaluation(SortPrefix(SortOrder(t1, Ascending)), millis * 1000) checkEvaluation(SortPrefix(SortOrder(f1, Ascending)), DoublePrefixComparator.computePrefix(f1.value.asInstanceOf[Float].toDouble)) checkEvaluation(SortPrefix(SortOrder(f2, Ascending)), From 71007e568496ea9350e10a2a9a9411a863e8441d Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 5 May 2018 06:14:25 +0800 Subject: [PATCH 07/13] Respond to review comments --- .../sql/catalyst/expressions/SortOrder.scala | 62 ++++++++++--------- .../SortOrderExpressionsSuite.scala | 6 +- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index fcf84ff44f490..5c526aa52508d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -148,39 +148,43 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { (!child.isAscending && child.nullOrdering == NullsLast) } + private lazy val calcPrefix: Any => Long = child.child.dataType match { + case BooleanType => (raw: Any) => + if (raw.asInstanceOf[Boolean]) 1 else 0 + case _: IntegralType => + _.asInstanceOf[java.lang.Number].longValue() + case DateType | TimestampType => + _.asInstanceOf[java.lang.Number].longValue() + case FloatType | DoubleType => (raw: Any) => { + val dVal = raw.asInstanceOf[java.lang.Number].doubleValue() + DoublePrefixComparator.computePrefix(dVal) + } + case StringType => (raw: Any) => + StringPrefixComparator.computePrefix(raw.asInstanceOf[UTF8String]) + case BinaryType => (raw: Any) => + BinaryPrefixComparator.computePrefix(raw.asInstanceOf[Array[Byte]]) + case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => (raw: Any) => { + val value = raw.asInstanceOf[Decimal] + if (dt.precision <= Decimal.MAX_LONG_DIGITS) { + value.toUnscaledLong + } else { + val p = Decimal.MAX_LONG_DIGITS + val s = p - (dt.precision - dt.scale) + if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue + } + } + case dt: DecimalType => (raw: Any) => + DoublePrefixComparator.computePrefix(raw.asInstanceOf[Decimal].toDouble) + case dt => (Any) => 0L + } + override def eval(input: InternalRow): Any = { val value = child.child.eval(input) if (value == null) { - return null - } - val prefix = child.child.dataType match { - case BooleanType => - if (value.asInstanceOf[Boolean]) 1L else 0L - case _: IntegralType => - value.asInstanceOf[java.lang.Number].longValue() - case DateType | TimestampType => - value.asInstanceOf[java.lang.Number].longValue() - case FloatType | DoubleType => - val dVal = value.asInstanceOf[java.lang.Number].doubleValue() - DoublePrefixComparator.computePrefix(dVal) - case StringType => - StringPrefixComparator.computePrefix(value.asInstanceOf[UTF8String]) - case BinaryType => - BinaryPrefixComparator.computePrefix(value.asInstanceOf[Array[Byte]]) - case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => - val dtValue = value.asInstanceOf[Decimal] - if (dt.precision <= Decimal.MAX_LONG_DIGITS) { - dtValue.toUnscaledLong - } else { - val p = Decimal.MAX_LONG_DIGITS - val s = p - (dt.precision - dt.scale) - if (dtValue.changePrecision(p, s)) dtValue.toUnscaledLong else Long.MinValue - } - case dt: DecimalType => - val dtValue = value.asInstanceOf[Decimal].toDouble - DoublePrefixComparator.computePrefix(dtValue) + null + } else { + calcPrefix(value) } - prefix } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index be158c10b24d6..4dec10041bbb8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -23,7 +23,7 @@ import java.util.TimeZone import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.{BinaryPrefixComparator, DoublePrefixComparator, StringPrefixComparator} +import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -52,6 +52,8 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper val dec1 = Literal(Decimal(20132983L, 10, 2)) val dec2 = Literal(Decimal(20132983L, 19, 2)) val dec3 = Literal(Decimal(20132983L, 21, 2)) + val list1 = Literal(List(1, 2), ArrayType(IntegerType)) + val nullVal = Literal.create(null, IntegerType) checkEvaluation(SortPrefix(SortOrder(b1, Ascending)), 0L) checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), 1L) @@ -82,5 +84,7 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(SortPrefix(SortOrder(dec2, Ascending)), 2013298L) checkEvaluation(SortPrefix(SortOrder(dec3, Ascending)), DoublePrefixComparator.computePrefix(201329.83d)) + checkEvaluation(SortPrefix(SortOrder(list1, Ascending)), 0L) + checkEvaluation(SortPrefix(SortOrder(nullVal, Ascending)), null) } } From 1060a666463426f2f70f2fe0a4fa7e2b66f22c67 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 5 May 2018 07:20:51 +0800 Subject: [PATCH 08/13] Small cleanup --- .../sql/catalyst/expressions/SortOrder.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 5c526aa52508d..a2b9dd7b85034 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -149,21 +149,21 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { } private lazy val calcPrefix: Any => Long = child.child.dataType match { - case BooleanType => (raw: Any) => + case BooleanType => (raw) => if (raw.asInstanceOf[Boolean]) 1 else 0 - case _: IntegralType => - _.asInstanceOf[java.lang.Number].longValue() + case _: IntegralType => (raw) => + raw.asInstanceOf[java.lang.Number].longValue() case DateType | TimestampType => _.asInstanceOf[java.lang.Number].longValue() - case FloatType | DoubleType => (raw: Any) => { + case FloatType | DoubleType => (raw) => { val dVal = raw.asInstanceOf[java.lang.Number].doubleValue() DoublePrefixComparator.computePrefix(dVal) } - case StringType => (raw: Any) => + case StringType => (raw) => StringPrefixComparator.computePrefix(raw.asInstanceOf[UTF8String]) - case BinaryType => (raw: Any) => + case BinaryType => (raw) => BinaryPrefixComparator.computePrefix(raw.asInstanceOf[Array[Byte]]) - case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => (raw: Any) => { + case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => (raw) => { val value = raw.asInstanceOf[Decimal] if (dt.precision <= Decimal.MAX_LONG_DIGITS) { value.toUnscaledLong @@ -173,9 +173,9 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue } } - case dt: DecimalType => (raw: Any) => + case dt: DecimalType => (raw) => DoublePrefixComparator.computePrefix(raw.asInstanceOf[Decimal].toDouble) - case dt => (Any) => 0L + case _ => (Any) => 0L } override def eval(input: InternalRow): Any = { From dc9a0702584c9fd16a1d25c675aaf2b424cd7623 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 6 May 2018 18:33:02 -0700 Subject: [PATCH 09/13] Fix test --- .../expressions/SortOrderExpressionsSuite.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index 4dec10041bbb8..25ff6e992c7bf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -28,9 +28,7 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SortPrefix") { - // Explicitly choose a time zone, since Date objects can create different values depending on - // local time zone of the machine on which the test is running - TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + val b1 = Literal.create(false, BooleanType) val b2 = Literal.create(true, BooleanType) val i1 = Literal.create(20132983, IntegerType) @@ -38,7 +36,15 @@ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper val l1 = Literal.create(20132983, LongType) val l2 = Literal.create(-20132983, LongType) val millis = 1524954911000L; - val d1 = Literal.create(new java.sql.Date(millis), DateType) + // Explicitly choose a time zone, since Date objects can create different values depending on + // local time zone of the machine on which the test is running + val oldDefaultTZ = TimeZone.getDefault + val d1 = try { + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + Literal.create(new java.sql.Date(millis), DateType) + } finally { + TimeZone.setDefault(oldDefaultTZ) + } val t1 = Literal.create(new Timestamp(millis), TimestampType) val f1 = Literal.create(0.7788229f, FloatType) val f2 = Literal.create(-0.7788229f, FloatType) From 2044aed01789ec4b58b7c18b16e4a6bd5691a23c Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 9 May 2018 02:36:44 +0800 Subject: [PATCH 10/13] Split decimal handling function into two --- .../spark/sql/catalyst/expressions/SortOrder.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index a2b9dd7b85034..f10125ea9ff30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -163,15 +163,13 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { StringPrefixComparator.computePrefix(raw.asInstanceOf[UTF8String]) case BinaryType => (raw) => BinaryPrefixComparator.computePrefix(raw.asInstanceOf[Array[Byte]]) + case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => + _.asInstanceOf[Decimal].toUnscaledLong case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => (raw) => { val value = raw.asInstanceOf[Decimal] - if (dt.precision <= Decimal.MAX_LONG_DIGITS) { - value.toUnscaledLong - } else { - val p = Decimal.MAX_LONG_DIGITS - val s = p - (dt.precision - dt.scale) - if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue - } + val p = Decimal.MAX_LONG_DIGITS + val s = p - (dt.precision - dt.scale) + if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue } case dt: DecimalType => (raw) => DoublePrefixComparator.computePrefix(raw.asInstanceOf[Decimal].toDouble) From b81419f0309bfafbf1ff894597bcdb67c8a5d9b6 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 9 May 2018 04:33:20 +0800 Subject: [PATCH 11/13] Collapse function creation for Integral, Date, and Timestamp types --- .../org/apache/spark/sql/catalyst/expressions/SortOrder.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index f10125ea9ff30..e6da9448908b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -151,10 +151,8 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { private lazy val calcPrefix: Any => Long = child.child.dataType match { case BooleanType => (raw) => if (raw.asInstanceOf[Boolean]) 1 else 0 - case _: IntegralType => (raw) => + case DateType | TimestampType | _: IntegralType => (raw) => raw.asInstanceOf[java.lang.Number].longValue() - case DateType | TimestampType => - _.asInstanceOf[java.lang.Number].longValue() case FloatType | DoubleType => (raw) => { val dVal = raw.asInstanceOf[java.lang.Number].doubleValue() DoublePrefixComparator.computePrefix(dVal) From 28f1b7041ef6b4a233e523476e2a7ab3e9a17a71 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 9 May 2018 04:48:30 +0800 Subject: [PATCH 12/13] Remove extraneous empty line --- .../sql/catalyst/expressions/SortOrderExpressionsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala index 25ff6e992c7bf..cc2e2a993d629 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._ class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SortPrefix") { - val b1 = Literal.create(false, BooleanType) val b2 = Literal.create(true, BooleanType) val i1 = Literal.create(20132983, IntegerType) From 590ba26c54b22de670cc699dcd0e1e48aaf71ab2 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Thu, 10 May 2018 06:26:54 +0800 Subject: [PATCH 13/13] Move out of closure those variable evaluations that need to happen only once --- .../spark/sql/catalyst/expressions/SortOrder.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index e6da9448908b7..040bce96de400 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -163,12 +163,13 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { BinaryPrefixComparator.computePrefix(raw.asInstanceOf[Array[Byte]]) case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => _.asInstanceOf[Decimal].toUnscaledLong - case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => (raw) => { - val value = raw.asInstanceOf[Decimal] + case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => val p = Decimal.MAX_LONG_DIGITS val s = p - (dt.precision - dt.scale) - if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue - } + (raw) => { + val value = raw.asInstanceOf[Decimal] + if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue + } case dt: DecimalType => (raw) => DoublePrefixComparator.computePrefix(raw.asInstanceOf[Decimal].toDouble) case _ => (Any) => 0L