-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57033][SQL] Add java.time LocalDateTime/Instant conversion and Dataset roundtrip for nanosecond timestamps #56158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
845c0c9
2d57886
9176040
359e42c
2b58090
fb6e3db
9b826cd
2cc5f85
364835a
466ad92
beff171
9092075
d008377
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf | |
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.sql.types.DayTimeIntervalType._ | ||
| import org.apache.spark.sql.types.YearMonthIntervalType._ | ||
| import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String} | ||
| import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String} | ||
| import org.apache.spark.util.ArrayImplicits._ | ||
| import org.apache.spark.util.collection.Utils | ||
|
|
||
|
|
@@ -88,6 +88,8 @@ object CatalystTypeConverters { | |
| case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter | ||
| case TimestampType => TimestampConverter | ||
| case TimestampNTZType => TimestampNTZConverter | ||
| case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t) | ||
| case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t) | ||
| case dt: DecimalType => new DecimalConverter(dt) | ||
| case BooleanType => BooleanConverter | ||
| case ByteType => ByteConverter | ||
|
|
@@ -298,7 +300,7 @@ object CatalystTypeConverters { | |
| } | ||
| new GenericInternalRow(ar) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why rename the legacy error in this PR?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I do believe it is related to the changes. We postpone this issue every time we add new types. I think it is worth assigning a proper name now. |
||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -357,7 +359,7 @@ object CatalystTypeConverters { | |
| case chr: Char => UTF8String.fromString(chr.toString) | ||
| case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac)) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -383,7 +385,7 @@ object CatalystTypeConverters { | |
| case g: org.apache.spark.sql.types.Geometry if SQLConf.get.geospatialEnabled => | ||
| STUtils.serializeGeomFromWKB(g, dataType) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -408,7 +410,7 @@ object CatalystTypeConverters { | |
| case g: org.apache.spark.sql.types.Geography if SQLConf.get.geospatialEnabled => | ||
| STUtils.serializeGeogFromWKB(g, dataType) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -432,7 +434,7 @@ object CatalystTypeConverters { | |
| case d: Date => DateTimeUtils.fromJavaDate(d) | ||
| case l: LocalDate => DateTimeUtils.localDateToDays(l) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -472,7 +474,7 @@ object CatalystTypeConverters { | |
| case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) | ||
| case i: Instant => DateTimeUtils.instantToMicros(i) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -500,7 +502,7 @@ object CatalystTypeConverters { | |
| override def toCatalystImpl(scalaValue: Any): Any = scalaValue match { | ||
| case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -515,6 +517,50 @@ object CatalystTypeConverters { | |
| DateTimeUtils.microsToLocalDateTime(row.getLong(column)) | ||
| } | ||
|
|
||
| private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType) | ||
| extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] { | ||
| override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { | ||
| case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
| "dataType" -> dataType.sql)) | ||
| } | ||
|
|
||
| override def toScala(catalystValue: TimestampNanosVal): LocalDateTime = | ||
| if (catalystValue == null) null | ||
| else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue) | ||
|
|
||
| override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime = | ||
| DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column)) | ||
| } | ||
|
|
||
| // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`, | ||
| // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is | ||
| // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope | ||
| // here. See SPARK-57033. | ||
| private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType) | ||
| extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] { | ||
| override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { | ||
| case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision) | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
| "dataType" -> dataType.sql)) | ||
| } | ||
|
|
||
| override def toScala(catalystValue: TimestampNanosVal): Instant = | ||
| if (catalystValue == null) null | ||
| else DateTimeUtils.timestampNanosToInstant(catalystValue) | ||
|
|
||
| override def toScalaImpl(row: InternalRow, column: Int): Instant = | ||
| DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column)) | ||
| } | ||
|
|
||
| private class DecimalConverter(dataType: DecimalType) | ||
| extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] { | ||
|
|
||
|
|
@@ -527,7 +573,7 @@ object CatalystTypeConverters { | |
| case d: JavaBigInteger => Decimal(d) | ||
| case d: Decimal => d | ||
| case other => throw new SparkIllegalArgumentException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3219", | ||
| errorClass = "INVALID_EXTERNAL_VALUE", | ||
| messageParameters = scala.collection.immutable.Map( | ||
| "other" -> other.toString, | ||
| "otherClass" -> other.getClass.getCanonicalName, | ||
|
|
@@ -655,6 +701,9 @@ object CatalystTypeConverters { | |
| case ld: LocalDate => LocalDateConverter.toCatalyst(ld) | ||
| case t: LocalTime => TimeConverter.toCatalyst(t) | ||
| case t: Timestamp => TimestampConverter.toCatalyst(t) | ||
| // SPARK-57033: schema-less convertToCatalyst keeps bare `Instant` / `LocalDateTime` on the | ||
| // microsecond converters. The nanosecond path is schema-driven only - users opt in via an | ||
| // explicit `TimestampLTZNanosType` / `TimestampNTZNanosType` column in the schema. | ||
| case i: Instant => InstantConverter.toCatalyst(i) | ||
| case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l) | ||
| case d: BigDecimal => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR's `DatasetSuite`/`JavaDatasetSuite` tests confirm `createDataFrame(rows, schemaWithNanos).collect()` works under classic mode. The same call under Spark Connect throws before it ever reaches `collect()`:
- `sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala:51-77` (`toArrowTypeDefault`) has no `TimestampNTZNanosType`/`TimestampLTZNanosType` case — it falls to `case _ => throw ExecutionErrors.unsupportedDataTypeError(dt)` (lines 75-76). Server-side schema → Arrow type construction fails.
- `sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala:524-525` and `.../ArrowDeserializer.scala:437-439` have no cases for the new `LocalDateTimeNanosEncoder`/`InstantNanosEncoder` — they fall to `case _ => throw new RuntimeException(s"Unsupported Encoder($encoder)/Vector($v) combination.")`. Even if the Arrow schema mapping were added, the encoder ↔ vector dispatch would still fail.

Is the Connect / Arrow integration tracked in a follow-up ticket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - this is intentional. The scope of SPARK-57033 is the classic-mode
java.time <-> internal-row conversion (encoders / CatalystTypeConverters /
Dataset roundtrip), so the Connect + Arrow path is deliberately left out here and
is tracked by follow-up sub-tasks under the umbrella SPARK-56822:
- `ArrowUtils.toArrowType`/`fromArrowType` (the `unsupportedDataTypeError` you hit): SPARK-57159 "Add Arrow type mapping for nanosecond-capable timestamp types".
- `ArrowSerializer`/`ArrowDeserializer` (the `Unsupported Encoder/Vector` `RuntimeException`) plus the end-to-end Connect flow: SPARK-57160 "Add Spark Connect protocol support for nanosecond-capable timestamp types and literals" and SPARK-57161 "Convert nanosecond-capable timestamp types and literals between proto and Catalyst in Spark Connect".