From f025ae1010fc9cf1990c5aa7b8db5a964b45eb37 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 20 Jan 2019 17:42:47 +0100 Subject: [PATCH 1/9] Using stringToDate/stringToTimestamp --- .../datasources/jdbc/JDBCRelation.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 13ed105004d70..9ad2598bcfab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Date, Timestamp} - import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition @@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType} +import org.apache.spark.unsafe.types.UTF8String /** * Instructions on how to partition the table among workers. 
@@ -174,10 +174,19 @@ private[sql] object JDBCRelation extends Logging { (dialect.quoteIdentifier(column.name), column.dataType) } - private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match { - case _: NumericType => value.toLong - case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong - case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value)) + private def toInternalBoundValue(value: String, columnType: DataType): Long = { + def parse[T](f: UTF8String => Option[T]): T = { + f(UTF8String.fromString(value)).getOrElse { + throw new IllegalArgumentException( + s"Cannot parse the bound value $value as ${columnType.catalogString}") + } + } + columnType match { + case _: NumericType => value.toLong + case DateType => parse(stringToDate).toLong + case TimestampType => + parse(stringToTimestamp(_, getTimeZone(SQLConf.get.sessionLocalTimeZone))) + } } private def toBoundValueInWhereClause( From 6171e9adf97df4183ea6768636d814f81f08e30f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 20 Jan 2019 18:07:20 +0100 Subject: [PATCH 2/9] Add a test --- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 284900b68ae52..122ea765540ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1523,4 +1523,30 @@ class JDBCSuite extends QueryTest assert(e.contains("The driver could not open a JDBC connection. 
" + "Check the URL: jdbc:mysql://localhost/db")) } + + test("parsing timestamp bounds") { + Seq( + ("2018-07-04 03:30:00.0", "2018-07-15 20:50:32.5", "2018-07-27 14:11:05.0"), + ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), + ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456", "2019-01-20T00:10:00.123456"), + ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") + ).foreach { case (lower, middle, upper) => + val df = spark.read.format("jdbc") + .option("url", urlWithUserAndPass) + .option("dbtable", "TEST.DATETIME") + .option("partitionColumn", "t") + .option("lowerBound", lower) + .option("upperBound", upper) + .option("numPartitions", 2) + .load() + + df.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + s""""T" < '$middle' or "T" is null""", + s""""T" >= '$middle'""")) + } + } + } } From 0b610760b40800dbead5dd2b214467529cec730a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 20 Jan 2019 18:23:19 +0100 Subject: [PATCH 3/9] Update test --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 122ea765540ad..b6b12f10fc869 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1526,7 +1526,7 @@ class JDBCSuite extends QueryTest test("parsing timestamp bounds") { Seq( - ("2018-07-04 03:30:00.0", "2018-07-15 20:50:32.5", "2018-07-27 14:11:05.0"), + ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"), ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), ("2019-01-20T00:00:00.123456", 
"2019-01-20 00:05:00.123456", "2019-01-20T00:10:00.123456"), ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") From 8bb4f3a7bcfca0827f5ba20491bb8f04f7abb808 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 21 Jan 2019 18:12:41 +0100 Subject: [PATCH 4/9] Run the test in different time zones --- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b6b12f10fc869..98b73acae4919 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils} import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -1525,27 +1525,34 @@ class JDBCSuite extends QueryTest } test("parsing timestamp bounds") { - Seq( - ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"), - ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), - ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456", "2019-01-20T00:10:00.123456"), - ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") - ).foreach { case (lower, middle, upper) => - val df = spark.read.format("jdbc") - .option("url", urlWithUserAndPass) - .option("dbtable", "TEST.DATETIME") - 
.option("partitionColumn", "t") - .option("lowerBound", lower) - .option("upperBound", upper) - .option("numPartitions", 2) - .load() - - df.logicalPlan match { - case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => - val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet - assert(whereClauses === Set( - s""""T" < '$middle' or "T" is null""", - s""""T" >= '$middle'""")) + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + Seq( + ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"), + ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), + ( + "2019-01-20T00:00:00.123456", + "2019-01-20 00:05:00.123456", + "2019-01-20T00:10:00.123456"), + ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") + ).foreach { case (lower, middle, upper) => + val df = spark.read.format("jdbc") + .option("url", urlWithUserAndPass) + .option("dbtable", "TEST.DATETIME") + .option("partitionColumn", "t") + .option("lowerBound", lower) + .option("upperBound", upper) + .option("numPartitions", 2) + .load() + + df.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + s""""T" < '$middle' or "T" is null""", + s""""T" >= '$middle'""")) + } + } } } } From 7ba3cbbd355b21e73daed3b1a1587a11abd463a0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Jan 2019 10:48:48 +0100 Subject: [PATCH 5/9] Pass time zone as a parameter --- .../execution/datasources/jdbc/JDBCRelation.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 9ad2598bcfab2..c0f78b5273885 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging { val (column, columnType) = verifyAndGetNormalizedPartitionColumn( schema, partitionColumn.get, resolver, jdbcOptions) - val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType) - val upperBoundValue = toInternalBoundValue(upperBound.get, columnType) + val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId) + val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId) JDBCPartitioningInfo( column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get) } @@ -174,7 +174,10 @@ private[sql] object JDBCRelation extends Logging { (dialect.quoteIdentifier(column.name), column.dataType) } - private def toInternalBoundValue(value: String, columnType: DataType): Long = { + private def toInternalBoundValue( + value: String, + columnType: DataType, + timeZoneId: String): Long = { def parse[T](f: UTF8String => Option[T]): T = { f(UTF8String.fromString(value)).getOrElse { throw new IllegalArgumentException( @@ -184,8 +187,7 @@ private[sql] object JDBCRelation extends Logging { columnType match { case _: NumericType => value.toLong case DateType => parse(stringToDate).toLong - case TimestampType => - parse(stringToTimestamp(_, getTimeZone(SQLConf.get.sessionLocalTimeZone))) + case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId))) } } From a0b23ed8bc4376cd8cad27c6887cb7e7579fcf34 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Jan 2019 11:12:59 +0100 Subject: [PATCH 6/9] Upgrade the migration guide --- docs/sql-migration-guide-upgrade.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 5d3d4c6ece39d..5fad87c4adbb5 100644 --- 
a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -43,6 +43,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. + - Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on the Proleptic Gregorian calendar and the time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on the default system time zone. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. 
From e79397067fdd6767059eee9eca94b0e500b1f3af Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Jan 2019 16:02:58 +0100 Subject: [PATCH 7/9] Fix indentation --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 98b73acae4919..859ce6d665a82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1530,9 +1530,7 @@ class JDBCSuite extends QueryTest Seq( ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"), ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), - ( - "2019-01-20T00:00:00.123456", - "2019-01-20 00:05:00.123456", + ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456", "2019-01-20T00:10:00.123456"), ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") ).foreach { case (lower, middle, upper) => From c6df7304bfd614cadc937fed82eaf92baa7d4618 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Jan 2019 16:08:59 +0100 Subject: [PATCH 8/9] Improving test's title --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 859ce6d665a82..f6481d9cd44fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1524,7 +1524,7 @@ class JDBCSuite extends QueryTest "Check the URL: jdbc:mysql://localhost/db")) } - test("parsing timestamp bounds") { + test("support casting patterns for lower/upper bounds of TimestampType") { 
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { Seq( From 4dc4a2afb6e3bee55bcf160268fa9b68ba5a5b53 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Jan 2019 16:48:57 +0100 Subject: [PATCH 9/9] Avoid to extend arguments fully --- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index f6481d9cd44fc..a4dc537d31b7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1544,9 +1544,10 @@ class JDBCSuite extends QueryTest .load() df.logicalPlan match { - case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => - val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet - assert(whereClauses === Set( + case lr: LogicalRelation if lr.relation.isInstanceOf[JDBCRelation] => + val jdbcRelation = lr.relation.asInstanceOf[JDBCRelation] + val whereClauses = jdbcRelation.parts.map(_.asInstanceOf[JDBCPartition].whereClause) + assert(whereClauses.toSet === Set( s""""T" < '$middle' or "T" is null""", s""""T" >= '$middle'""")) }