diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 71dbf1a1ebce3..6757b89358729 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3729,6 +3729,12 @@ ], "sqlState" : "42K0N" }, + "INVALID_EXTERNAL_VALUE" : { + "message" : [ + "The value () of the type () cannot be converted to the type." + ], + "sqlState" : "42K0N" + }, "INVALID_EXTRACT_BASE_FIELD_TYPE" : { "message" : [ "Can't extract a value from . Need a complex type [STRUCT, ARRAY, MAP] but got ." @@ -11487,11 +11493,6 @@ "Must be 2 children: " ] }, - "_LEGACY_ERROR_TEMP_3219" : { - "message" : [ - "The value () of the type () cannot be converted to the type." - ] - }, "_LEGACY_ERROR_TEMP_3220" : { "message" : [ "The value () of the type () cannot be converted to an array of " diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md index fe1b8724d8d61..e0d36631dec0e 100644 --- a/docs/sql-ref-datatypes.md +++ b/docs/sql-ref-datatypes.md @@ -54,7 +54,7 @@ Spark SQL and DataFrames support the following data types: - `TimestampNTZType`: Timestamp without time zone(TIMESTAMP_NTZ). It represents values comprising values of fields year, month, day, hour, minute, and second. All operations are performed without taking any time zone into account. - Note: TIMESTAMP in Spark is a user-specified alias associated with one of the TIMESTAMP_LTZ and TIMESTAMP_NTZ variations. Users can set the default timestamp type as `TIMESTAMP_LTZ`(default value) or `TIMESTAMP_NTZ` via the configuration `spark.sql.timestampType`. - - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. Enable the preview feature with `SET spark.sql.timestampNanosTypes.enabled=true;` before using these types in schemas or SQL. + - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. In schema-driven Dataset/DataFrame conversion, Spark maps `TimestampNTZNanosType` to `java.time.LocalDateTime` and `TimestampLTZNanosType` to `java.time.Instant`; values with more sub-micro digits than declared by `precision` are floor-truncated to that precision. Enable the preview feature with `SET spark.sql.timestampNanosTypes.enabled=true;` before using these types in schemas or SQL. * Interval types - `YearMonthIntervalType(startField, endField)`: Represents a year-month interval which is made up of a contiguous subset of the following fields: diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala index 20949c188cb81..57c15de4f0db4 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala @@ -257,6 +257,14 @@ object AgnosticEncoders { case class InstantEncoder(override val lenientSerialization: Boolean) extends LeafEncoder[Instant](TimestampType) case object LocalDateTimeEncoder extends LeafEncoder[LocalDateTime](TimestampNTZType) + // Nanosecond-precision counterparts of `LocalDateTimeEncoder` / `InstantEncoder(false)`. + // They are used by `RowEncoder` when the schema declares a `TimestampNTZNanosType(p)` or + // `TimestampLTZNanosType(p)` column, so Dataset create/collect roundtrips preserve full + // nanosecond precision. See SPARK-57033. + case class LocalDateTimeNanosEncoder(precision: Int) + extends LeafEncoder[LocalDateTime](TimestampNTZNanosType(precision)) + case class InstantNanosEncoder(precision: Int) + extends LeafEncoder[Instant](TimestampLTZNanosType(precision)) case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType()) case class SparkDecimalEncoder(dt: DecimalType) extends LeafEncoder[Decimal](dt) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index bad673672188c..705d5d8f11b1d 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -21,8 +21,8 @@ import scala.collection.mutable import scala.reflect.classTag import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder} -import org.apache.spark.sql.errors.DataTypeErrorsBase +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase} import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.ops.TypeApiOps @@ -50,6 +50,8 @@ import org.apache.spark.util.ArrayImplicits._ * TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true * * TimestampNTZType -> java.time.LocalDateTime + * TimestampNTZNanosType -> java.time.LocalDateTime + * TimestampLTZNanosType -> java.time.Instant * TimeType -> java.time.LocalTime * * DayTimeIntervalType -> java.time.Duration @@ -97,6 +99,14 @@ object RowEncoder extends DataTypeErrorsBase { case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient) case TimestampType => TimestampEncoder(lenient) case TimestampNTZType => LocalDateTimeEncoder + // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` / + // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033). + case t: TimestampNTZNanosType => + DataTypeErrors.checkTimestampNanosTypesEnabled() + LocalDateTimeNanosEncoder(t.precision) + case t: TimestampLTZNanosType => + DataTypeErrors.checkTimestampNanosTypesEnabled() + InstantNanosEncoder(t.precision) case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient) case DateType => DateEncoder(lenient) case _: TimeType => LocalTimeEncoder diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index 9684737a22865..597a96c548ce1 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, rebaseJulianToGregorianDays, rebaseJulianToGregorianMicros} import org.apache.spark.sql.errors.ExecutionErrors import org.apache.spark.sql.types.{DateType, TimestampType, TimeType} -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String} import org.apache.spark.util.SparkClassUtils trait SparkDateTimeUtils { @@ -208,6 +208,82 @@ trait SparkDateTimeUtils { instantToMicros(localDateTime.toInstant(ZoneOffset.UTC)) } + /** + * Truncates the sub-microsecond nanosecond part to the given timestamp precision `p` in [7, 9]. + * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the last two. + * + * The input is the already-extracted `nanosWithinMicro` component (`0..999`), so truncation is + * independent of the epoch sign of the original timestamp value. + * + * Precisions outside `[7, 9]` are passed through unchanged because the surrounding timestamp + * nanos types validate the bound. + */ + private def truncateNanosWithinMicroToPrecision(nanosWithinMicro: Int, precision: Int): Int = { + precision match { + case 7 => (nanosWithinMicro / 100) * 100 + case 8 => (nanosWithinMicro / 10) * 10 + case _ => nanosWithinMicro + } + } + + /** + * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, nanosWithinMicro)` + * pair used by `TimestampNTZNanosType(precision)` (interpreted at UTC). `epochMicros` comes + * from [[localDateTimeToMicros]] (which is floor toward `-inf` for the integral micro part); + * the last three decimal digits of `localDateTime.getNano` (`[0, 999]`) become + * `nanosWithinMicro` after dropping `(9 - precision)` low digits. + * + * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down + * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless + * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same + * flooring will be the basis of the future `CAST(... AS TIMESTAMP_NTZ(precision))` rule. + */ + def localDateTimeToTimestampNanos( + localDateTime: LocalDateTime, + precision: Int): TimestampNanosVal = { + val epochMicros = localDateTimeToMicros(localDateTime) + val rawNanosWithinMicro = localDateTime.getNano % NANOS_PER_MICROS.toInt + val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision) + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort) + } + + /** + * Reverse of [[localDateTimeToTimestampNanos]]: rebuilds a `java.time.LocalDateTime` (at UTC) + * from a `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses + * the second boundary. + */ + def timestampNanosToLocalDateTime(v: TimestampNanosVal): LocalDateTime = { + microsToLocalDateTime(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong) + } + + /** + * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used + * by `TimestampLTZNanosType(precision)`. `epochMicros` comes from [[instantToMicros]] (floor + * toward `-inf` for the integral micro part); the last three decimal digits of + * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping `(9 - precision)` low + * digits. + * + * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down + * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless + * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same + * flooring will be the basis of the future `CAST(... AS TIMESTAMP_LTZ(precision))` rule. + */ + def instantToTimestampNanos(instant: Instant, precision: Int): TimestampNanosVal = { + val epochMicros = instantToMicros(instant) + val rawNanosWithinMicro = instant.getNano % NANOS_PER_MICROS.toInt + val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision) + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort) + } + + /** + * Reverse of [[instantToTimestampNanos]]: rebuilds a `java.time.Instant` from a + * `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses the + * second boundary. + */ + def timestampNanosToInstant(v: TimestampNanosVal): Instant = { + microsToInstant(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong) + } + /** * Converts the local date to the number of days since 1970-01-01. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index be66f851e361c..de131f1d58e0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.DayTimeIntervalType._ import org.apache.spark.sql.types.YearMonthIntervalType._ -import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String} +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.collection.Utils @@ -88,6 +88,8 @@ object CatalystTypeConverters { case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter case TimestampType => TimestampConverter case TimestampNTZType => TimestampNTZConverter + case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t) + case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t) case dt: DecimalType => new DecimalConverter(dt) case BooleanType => BooleanConverter case ByteType => ByteConverter @@ -298,7 +300,7 @@ object CatalystTypeConverters { } new GenericInternalRow(ar) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -357,7 +359,7 @@ object CatalystTypeConverters { case chr: Char => UTF8String.fromString(chr.toString) case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac)) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -383,7 +385,7 @@ object CatalystTypeConverters { case g: org.apache.spark.sql.types.Geometry if SQLConf.get.geospatialEnabled => STUtils.serializeGeomFromWKB(g, dataType) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -408,7 +410,7 @@ object CatalystTypeConverters { case g: org.apache.spark.sql.types.Geography if SQLConf.get.geospatialEnabled => STUtils.serializeGeogFromWKB(g, dataType) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -432,7 +434,7 @@ object CatalystTypeConverters { case d: Date => DateTimeUtils.fromJavaDate(d) case l: LocalDate => DateTimeUtils.localDateToDays(l) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -472,7 +474,7 @@ object CatalystTypeConverters { case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) case i: Instant => DateTimeUtils.instantToMicros(i) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -500,7 +502,7 @@ object CatalystTypeConverters { override def toCatalystImpl(scalaValue: Any): Any = scalaValue match { case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -515,6 +517,50 @@ object CatalystTypeConverters { DateTimeUtils.microsToLocalDateTime(row.getLong(column)) } + private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType) + extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] { + override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { + case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision) + case other => throw new SparkIllegalArgumentException( + errorClass = "INVALID_EXTERNAL_VALUE", + messageParameters = scala.collection.immutable.Map( + "other" -> other.toString, + "otherClass" -> other.getClass.getCanonicalName, + "dataType" -> dataType.sql)) + } + + override def toScala(catalystValue: TimestampNanosVal): LocalDateTime = + if (catalystValue == null) null + else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue) + + override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime = + DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column)) + } + + // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`, + // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is + // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope + // here. See SPARK-57033. + private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType) + extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] { + override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { + case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision) + case other => throw new SparkIllegalArgumentException( + errorClass = "INVALID_EXTERNAL_VALUE", + messageParameters = scala.collection.immutable.Map( + "other" -> other.toString, + "otherClass" -> other.getClass.getCanonicalName, + "dataType" -> dataType.sql)) + } + + override def toScala(catalystValue: TimestampNanosVal): Instant = + if (catalystValue == null) null + else DateTimeUtils.timestampNanosToInstant(catalystValue) + + override def toScalaImpl(row: InternalRow, column: Int): Instant = + DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column)) + } + private class DecimalConverter(dataType: DecimalType) extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] { @@ -527,7 +573,7 @@ object CatalystTypeConverters { case d: JavaBigInteger => Decimal(d) case d: Decimal => d case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -655,6 +701,9 @@ object CatalystTypeConverters { case ld: LocalDate => LocalDateConverter.toCatalyst(ld) case t: LocalTime => TimeConverter.toCatalyst(t) case t: Timestamp => TimestampConverter.toCatalyst(t) + // SPARK-57033: schema-less convertToCatalyst keeps bare `Instant` / `LocalDateTime` on the + // microsecond converters. The nanosecond path is schema-driven only - users opt in via an + // explicit `TimestampLTZNanosType` / `TimestampNTZNanosType` column in the schema. case i: Instant => InstantConverter.toCatalyst(i) case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l) case d: BigDecimal => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala index 8bd162afd56db..17ba4fe4203b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.{expressions => exprs} import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder} import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast} import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption} @@ -176,6 +176,24 @@ object DeserializerBuildHelper { returnNullable = false) } + def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + ObjectType(classOf[java.time.LocalDateTime]), + "timestampNanosToLocalDateTime", + path :: Nil, + returnNullable = false) + } + + def createDeserializerForInstantNanos(path: Expression): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + ObjectType(classOf[java.time.Instant]), + "timestampNanosToInstant", + path :: Nil, + returnNullable = false) + } + def createDeserializerForLocalTime(path: Expression): Expression = { StaticInvoke( DateTimeUtils.getClass, @@ -356,6 +374,10 @@ object DeserializerBuildHelper { createDeserializerForInstant(path) case LocalDateTimeEncoder => createDeserializerForLocalDateTime(path) + case _: LocalDateTimeNanosEncoder => + createDeserializerForLocalDateTimeNanos(path) + case _: InstantNanosEncoder => + createDeserializerForInstantNanos(path) case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled => throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError() case LocalTimeEncoder => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala index 37a4efc657391..72337a4b21854 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala @@ -22,7 +22,7 @@ import scala.language.existentials import org.apache.spark.sql.catalyst.{expressions => exprs} import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor} import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData} import org.apache.spark.sql.catalyst.expressions.objects._ @@ -166,6 +166,28 @@ object SerializerBuildHelper { returnNullable = false) } + def createSerializerForLocalDateTimeNanos( + inputObject: Expression, + precision: Int): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + TimestampNTZNanosType(precision), + "localDateTimeToTimestampNanos", + inputObject :: Literal(precision) :: Nil, + returnNullable = false) + } + + def createSerializerForInstantNanos( + inputObject: Expression, + precision: Int): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + TimestampLTZNanosType(precision), + "instantToTimestampNanos", + inputObject :: Literal(precision) :: Nil, + returnNullable = false) + } + def createSerializerForJavaLocalDate(inputObject: Expression): Expression = { StaticInvoke( DateTimeUtils.getClass, @@ -376,6 +398,8 @@ object SerializerBuildHelper { case TimestampEncoder(false) => createSerializerForSqlTimestamp(input) case InstantEncoder(false) => createSerializerForJavaInstant(input) case LocalDateTimeEncoder => createSerializerForLocalDateTime(input) + case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p) + case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p) case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled => throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError() case LocalTimeEncoder => createSerializerForLocalTime(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala index 0fce96c159979..f65203aed8c71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType} import org.apache.spark.sql.catalyst.types.ops.TypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType} -import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType} +import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String, VariantVal} /** * :: DeveloperApi :: @@ -107,6 +107,8 @@ object EncoderUtils { case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType] case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType] case _: TimeType => classOf[PhysicalLongType.InternalType] + case _: TimestampNTZNanosType => classOf[TimestampNanosVal] + case _: TimestampLTZNanosType => classOf[TimestampNanosVal] case _: StringType => classOf[UTF8String] case _: StructType => classOf[InternalRow] case _: ArrayType => classOf[ArrayData] @@ -126,6 +128,8 @@ object EncoderUtils { case BinaryType => classOf[Array[Byte]] case _: StringType => classOf[UTF8String] case CalendarIntervalType => classOf[CalendarInterval] + case _: TimestampNTZNanosType => classOf[TimestampNanosVal] + case _: TimestampLTZNanosType => classOf[TimestampNanosVal] case _: StructType => classOf[InternalRow] case _: ArrayType => classOf[ArrayData] case _: MapType => classOf[MapData] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 222465d82c029..dbb9bc04914a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.DayTimeIntervalType._ import org.apache.spark.sql.types.YearMonthIntervalType._ -import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String} +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String} class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { @@ -109,7 +109,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(structType)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", @@ -149,7 +149,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(decimalType)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", @@ -169,7 +169,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(StringType)(0.1) }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "0.1", "otherClass" -> "java.lang.Double", @@ -250,6 +250,244 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { } } + test("SPARK-57033: converting java.time.LocalDateTime to TimestampNTZNanosType") { + Seq( + "0001-01-01T00:00:00", + "1582-10-02T01:02:03.04", + "1582-12-31T23:59:59.999999999", + "1970-01-01T00:00:00.000000001", + "1972-12-31T23:59:59.123456789", + "2019-02-26T16:56:00.123456789", + "9999-12-31T23:59:59.999999999").foreach { text => + val input = LocalDateTime.parse(text) + Seq(7, 8, 9).foreach { p => + val dt = TimestampNTZNanosType(p) + val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(input, p) + assert(result === expected) + } + } + } + + test("SPARK-57033: converting TimestampNTZNanosType to java.time.LocalDateTime") { + Seq( + "1582-10-02T01:02:03.04", + "1582-12-31T23:59:59.999999999", + "1970-01-01T00:00:00.000000001", + "1972-12-31T23:59:59.123456789", + "2019-02-26T16:56:00.123456789", + "9999-12-31T23:59:59.999999999").foreach { text => + val ldt = LocalDateTime.parse(text) + val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9) + Seq(7, 8, 9).foreach { p => + val dt = TimestampNTZNanosType(p) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === ldt) + } + } + } + + test("SPARK-57033: converting java.time.Instant to TimestampLTZNanosType") { + Seq( + "0001-01-01T00:00:00Z", + "1582-10-02T01:02:03.04Z", + "1582-12-31T23:59:59.999999999Z", + "1970-01-01T00:00:00.000000001Z", + "1972-12-31T23:59:59.123456789Z", + "2019-02-26T16:56:00.123456789Z", + "9999-12-31T23:59:59.999999999Z").foreach { text => + val input = Instant.parse(text) + Seq(7, 8, 9).foreach { p => + val dt = TimestampLTZNanosType(p) + val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) + val expected = DateTimeUtils.instantToTimestampNanos(input, p) + assert(result === expected) + } + } + } + + test("SPARK-57033: converting TimestampLTZNanosType to java.time.Instant") { + Seq( + "1582-10-02T01:02:03.04Z", + "1582-12-31T23:59:59.999999999Z", + "1970-01-01T00:00:00.000000001Z", + "1972-12-31T23:59:59.123456789Z", + "2019-02-26T16:56:00.123456789Z", + "9999-12-31T23:59:59.999999999Z").foreach { text => + val instant = Instant.parse(text) + val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) + Seq(7, 8, 9).foreach { p => + val dt = TimestampLTZNanosType(p) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant) + } + } + } + + test("SPARK-57033: TimestampLTZNanosType -> Instant ignores java8 API flag") { + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) + val dt = TimestampLTZNanosType() + Seq("true", "false").foreach { flag => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) { + assert(CatalystTypeConverters.createToCatalystConverter(dt)(instant) === v) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant) + } + } + } + + test("SPARK-57033: TimestampNanosConverter null handling in rows") { + val schema = StructType( + StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: + StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil) + val toCat = CatalystTypeConverters.createToCatalystConverter(schema) + val toScala = CatalystTypeConverters.createToScalaConverter(schema) + // Reference value to ensure non-null cells are kept as-is. + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.987654321Z") + val row = Row(ldt, instant) + val catalystRow = toCat(row).asInstanceOf[InternalRow] + assert(catalystRow.getTimestampNTZNanos(0) === + DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)) + assert(catalystRow.getTimestampLTZNanos(1) === + DateTimeUtils.instantToTimestampNanos(instant, precision = 9)) + assert(toScala(catalystRow) === row) + // Null row. + val nullRow = Row.fromSeq(Seq(null, null)) + assert(toScala(toCat(nullRow)) === nullRow) + } + + test("SPARK-57033: TimestampNTZNanosType converter truncates sub-micro to precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") + val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) + Seq(ldt, negativeEpochLdt).foreach { input => + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampNTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " + + s"got ${v.nanosWithinMicro}") + } + } + } + + test("SPARK-57033: TimestampLTZNanosType converter truncates sub-micro to precision") { + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") + val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) + Seq(instant, negativeEpochInstant).foreach { input => + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampLTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " + + s"got ${v.nanosWithinMicro}") + } + } + } + + test("SPARK-57033: TimestampNTZNanosType converter rejects wrong external type") { + val dt = TimestampNTZNanosType(9) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)("not-a-LocalDateTime") + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> "not-a-LocalDateTime", + "otherClass" -> "java.lang.String", + "dataType" -> "TIMESTAMP_NTZ(9)")) + // An `Instant` is also not accepted - the NTZ nano converter is wall-clock only. + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(instant) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> instant.toString, + "otherClass" -> "java.time.Instant", + "dataType" -> "TIMESTAMP_NTZ(9)")) + } + + test("SPARK-57033: TimestampLTZNanosType converter rejects wrong external type") { + val dt = TimestampLTZNanosType(9) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)("not-an-Instant") + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> "not-an-Instant", + "otherClass" -> "java.lang.String", + "dataType" -> "TIMESTAMP_LTZ(9)")) + // A `LocalDateTime` is also not accepted - LTZ requires an absolute instant. + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(ldt) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> ldt.toString, + "otherClass" -> "java.time.LocalDateTime", + "dataType" -> "TIMESTAMP_LTZ(9)")) + // The legacy `java.sql.Timestamp` external type is intentionally out of scope for + // `TimestampLTZNanosType` (see SPARK-57033). Verify it is rejected, not silently + // accepted via a fallback. + val ts = java.sql.Timestamp.from(Instant.parse("2019-02-26T16:56:00.123456789Z")) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(ts) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> ts.toString, + "otherClass" -> "java.sql.Timestamp", + "dataType" -> "TIMESTAMP_LTZ(9)")) + } + + test("SPARK-57033: nested nanos timestamp types in arrays / maps / structs") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.987654321Z") + val ldtNano = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9) + val instantNano = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) + + // Array of TimestampNTZNanosType. + val ntzArrayType = ArrayType(TimestampNTZNanosType(9), containsNull = true) + val ldts = Seq(ldt, null, ldt) + val catArr = CatalystTypeConverters.createToCatalystConverter(ntzArrayType)(ldts) + .asInstanceOf[GenericArrayData] + assert(catArr.numElements() === 3) + assert(catArr.get(0, TimestampNTZNanosType(9)) === ldtNano) + assert(catArr.isNullAt(1)) + assert(catArr.get(2, TimestampNTZNanosType(9)) === ldtNano) + assert(CatalystTypeConverters.createToScalaConverter(ntzArrayType)(catArr) === ldts) + + // Map of (String -> TimestampLTZNanosType). + val ltzMapType = MapType(StringType, TimestampLTZNanosType(9), valueContainsNull = true) + val instants = Map[String, Instant]("a" -> instant, "b" -> null) + val catMap = CatalystTypeConverters.createToCatalystConverter(ltzMapType)(instants) + val backMap = CatalystTypeConverters.createToScalaConverter(ltzMapType)(catMap) + .asInstanceOf[Map[Any, Any]] + assert(backMap("a") === instant) + assert(backMap("b") == null) + + // Struct with both nano fields plus a non-nano field, and a null row. + val nestedStruct = StructType( + StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: + StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: + StructField("name", StringType, nullable = true) :: Nil) + val outerStruct = StructType(StructField("inner", nestedStruct, nullable = true) :: Nil) + val outerToCat = CatalystTypeConverters.createToCatalystConverter(outerStruct) + val outerToScala = CatalystTypeConverters.createToScalaConverter(outerStruct) + val outerRow = Row(Row(ldt, instant, "abc")) + val nullOuterRow = Row(null) + assert(outerToScala(outerToCat(outerRow)) === outerRow) + assert(outerToScala(outerToCat(nullOuterRow)) === nullOuterRow) + } + test("converting java.time.LocalDate to DateType") { Seq( "0101-02-16", @@ -579,7 +817,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(gt)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", @@ -592,7 +830,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(gt)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 09247a459b9c5..69f4995220fe2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders import scala.collection.mutable import scala.util.Random -import org.apache.spark.SparkRuntimeException +import org.apache.spark.{SparkException, SparkRuntimeException} import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest @@ -381,6 +381,76 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { assert(readback.get(0) === localDateTime) } + test("SPARK-57033: encoding/decoding TimestampNTZNanosType to/from java.time.LocalDateTime") { + val inputs = Seq( + java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + java.time.LocalDateTime.parse("1969-12-31T23:59:59.123456789")) + for (localDateTime <- inputs) { + for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampNTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(localDateTime)) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p) + assert(row.getTimestampNTZNanos(0) === expected) + val readback = fromRow(encoder, row) + assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected)) + } + } + } + + test("SPARK-57033: encoding/decoding TimestampLTZNanosType to/from java.time.Instant") { + val inputs = Seq( + java.time.Instant.parse("2019-02-26T16:56:00.123456789Z"), + java.time.Instant.parse("1969-12-31T23:59:59.123456789Z")) + for (instant <- inputs) { + for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampLTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(instant)) + val expected = DateTimeUtils.instantToTimestampNanos(instant, p) + assert(row.getTimestampLTZNanos(0) === expected) + val readback = fromRow(encoder, row) + assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected)) + } + } + } + + test("SPARK-57033: encoding/decoding TimestampLTZNanosType ignores java8 API flag") { + val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z") + val schema = new StructType().add("t", TimestampLTZNanosType()) + Seq("true", "false").foreach { flag => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) { + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(instant)) + assert(row.getTimestampLTZNanos(0) === + DateTimeUtils.instantToTimestampNanos(instant, precision = 9)) + val readback = fromRow(encoder, row) + assert(readback.get(0) === instant) + } + } + } + + test("SPARK-57033: RowEncoder rejects nanos timestamp types when feature flag is off") { + Seq( + new StructType().add("t", TimestampNTZNanosType()), + new StructType().add("t", TimestampLTZNanosType()) + ).foreach { schema => + withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") { + checkError( + exception = intercept[SparkException] { + ExpressionEncoder(schema) + }, + condition = "FEATURE_NOT_ENABLED", + parameters = Map( + "featureName" -> "Nanosecond-precision timestamp types", + "configKey" -> "spark.sql.timestampNanosTypes.enabled", + "configValue" -> "true" + ) + ) + } + } + } + test("encoding/decoding DateType to/from java.time.LocalDate") { withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") { val schema = new StructType().add("d", DateType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 4810ec69bb96e..47eb4a1e3e3cb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLConf import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND} import org.apache.spark.sql.types.Decimal -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String} class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { @@ -1851,4 +1851,150 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { timeBucketYMInterval(1, Long.MinValue, 0L, utc) } } + + test("SPARK-57033: java.time.LocalDateTime <-> TimestampNanosVal roundtrip") { + val cases = Seq( + LocalDateTime.parse("0001-01-01T00:00:00"), + LocalDateTime.parse("1582-10-02T01:02:03.04"), + LocalDateTime.parse("1582-12-31T23:59:59.999999999"), + LocalDateTime.parse("1969-12-31T23:59:59.999999999"), + LocalDateTime.parse("1970-01-01T00:00:00"), + LocalDateTime.parse("1970-01-01T00:00:00.000000001"), + LocalDateTime.parse("1970-01-01T00:00:00.123456789"), + LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + LocalDateTime.parse("9999-12-31T23:59:59.999999999")) + for (ldt <- cases) { + val v = localDateTimeToTimestampNanos(ldt, precision = 9) + assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, + s"nanosWithinMicro out of range for $ldt: $v") + assert(v.nanosWithinMicro === (ldt.getNano % 1000).toShort) + assert(v.epochMicros === DateTimeUtils.localDateTimeToMicros(ldt)) + assert(timestampNanosToLocalDateTime(v) === ldt) + } + } + + test("SPARK-57033: java.time.Instant <-> TimestampNanosVal roundtrip") { + val cases = Seq( + Instant.EPOCH, + Instant.parse("0001-01-01T00:00:00Z"), + Instant.parse("1582-10-02T01:02:03.04Z"), + Instant.parse("1582-12-31T23:59:59.999999999Z"), + Instant.parse("1969-12-31T23:59:59.999999999Z"), + Instant.parse("1970-01-01T00:00:00.000000001Z"), + Instant.parse("2019-02-26T16:56:00.123456789Z"), + Instant.parse("9999-12-31T23:59:59.999999999Z")) + for (i <- cases) { + val v = instantToTimestampNanos(i, precision = 9) + assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, + s"nanosWithinMicro out of range for $i: $v") + assert(v.nanosWithinMicro === (i.getNano % 1000).toShort) + assert(v.epochMicros === instantToMicros(i)) + assert(timestampNanosToInstant(v) === i) + } + } + + test("SPARK-57033: TimestampNanosVal random roundtrip") { + val rnd = new scala.util.Random(0) + // Random instants across the valid range with every possible sub-micro digit. + val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond + val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond + for (_ <- 0 until 200) { + val secs = min + math.abs(rnd.nextLong()) % (max - min + 1) + val nano = rnd.nextInt(1_000_000_000) + val i = Instant.ofEpochSecond(secs, nano.toLong) + val v = instantToTimestampNanos(i, precision = 9) + assert(timestampNanosToInstant(v) === i) + val ldt = LocalDateTime.ofInstant(i, ZoneOffset.UTC) + val v2 = localDateTimeToTimestampNanos(ldt, precision = 9) + assert(timestampNanosToLocalDateTime(v2) === ldt) + // Internal layout matches direct decomposition. + assert(v === TimestampNanosVal.fromParts(instantToMicros(i), (nano % 1000).toShort)) + } + } + + test("SPARK-57033: localDateTimeToTimestampNanos truncates sub-micro to precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + // precision 9 keeps all 3 sub-micro digits, 8 drops the last, 7 drops the last two. + assert(localDateTimeToTimestampNanos(ldt, precision = 9).nanosWithinMicro === 789) + assert(localDateTimeToTimestampNanos(ldt, precision = 8).nanosWithinMicro === 780) + assert(localDateTimeToTimestampNanos(ldt, precision = 7).nanosWithinMicro === 700) + // epochMicros is unaffected by sub-micro truncation. + val expectedMicros = DateTimeUtils.localDateTimeToMicros(ldt) + for (p <- 7 to 9) { + assert(localDateTimeToTimestampNanos(ldt, precision = p).epochMicros === expectedMicros) + } + + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 9).nanosWithinMicro === 789) + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 8).nanosWithinMicro === 780) + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 7).nanosWithinMicro === 700) + val negativeExpectedMicros = DateTimeUtils.localDateTimeToMicros(negativeEpochLdt) + for (p <- 7 to 9) { + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = p).epochMicros === + negativeExpectedMicros) + } + } + + test("SPARK-57033: instantToTimestampNanos truncates sub-micro to precision") { + val i = Instant.parse("2019-02-26T16:56:00.123456789Z") + assert(instantToTimestampNanos(i, precision = 9).nanosWithinMicro === 789) + assert(instantToTimestampNanos(i, precision = 8).nanosWithinMicro === 780) + assert(instantToTimestampNanos(i, precision = 7).nanosWithinMicro === 700) + val expectedMicros = instantToMicros(i) + for (p <- 7 to 9) { + assert(instantToTimestampNanos(i, precision = p).epochMicros === expectedMicros) + } + + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") + assert(instantToTimestampNanos(negativeEpochInstant, precision = 9).nanosWithinMicro === 789) + assert(instantToTimestampNanos(negativeEpochInstant, precision = 8).nanosWithinMicro === 780) + assert(instantToTimestampNanos(negativeEpochInstant, precision = 7).nanosWithinMicro === 700) + val negativeExpectedMicros = instantToMicros(negativeEpochInstant) + for (p <- 7 to 9) { + assert(instantToTimestampNanos(negativeEpochInstant, precision = p).epochMicros === + negativeExpectedMicros) + } + } + + test("SPARK-57033: random roundtrip across precisions floors to the precision step") { + val rnd = new scala.util.Random(0) + val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond + val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond + // For each random instant, verify both helpers floor the original nanosecond value + // (toward `-inf`) to the precision step `10^(9 - p)` ns. The whole-instant nanosecond + // count overflows `Long` for far-future dates, so we check the floor on the components + // instead: `epochMicros` is invariant across precisions (matches `instantToMicros`) and + // the sub-micro nanosecond residual is floored to the precision step. + for (_ <- 0 until 10) { + val secs = min + math.abs(rnd.nextLong()) % (max - min + 1) + val nano = rnd.nextInt(1_000_000_000) + val instant = Instant.ofEpochSecond(secs, nano.toLong) + val ldt = LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + val expectedMicros = instantToMicros(instant) + val rawSubMicro = (nano % 1000).toShort + for (p <- 7 to 9) { + val step = math.pow(10, 9 - p).toLong.toShort + val expectedSubMicro = ((rawSubMicro / step) * step).toShort + + val ltz = instantToTimestampNanos(instant, p) + assert(ltz.epochMicros === expectedMicros, + s"LTZ p=$p instant=$instant epochMicros=${ltz.epochMicros}") + assert(ltz.nanosWithinMicro === expectedSubMicro, + s"LTZ p=$p instant=$instant nanosWithinMicro=${ltz.nanosWithinMicro}") + // Roundtrip preserves the truncated value. + val ltzBack = timestampNanosToInstant(ltz) + assert(instantToMicros(ltzBack) === expectedMicros) + assert(ltzBack.getNano % 1000 === expectedSubMicro) + + val ntz = localDateTimeToTimestampNanos(ldt, p) + assert(ntz.epochMicros === expectedMicros, + s"NTZ p=$p ldt=$ldt epochMicros=${ntz.epochMicros}") + assert(ntz.nanosWithinMicro === expectedSubMicro, + s"NTZ p=$p ldt=$ldt nanosWithinMicro=${ntz.nanosWithinMicro}") + val ntzBack = timestampNanosToLocalDateTime(ntz) + assert(DateTimeUtils.localDateTimeToMicros(ntzBack) === expectedMicros) + assert(ntzBack.getNano % 1000 === expectedSubMicro) + } + } + } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 62d44e0af8b08..416c92602b833 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -789,6 +789,24 @@ public void testLocalDateTimeEncoder() { Assertions.assertEquals(data, ds.collectAsList()); } + @Test + public void testTimestampNanosRowEncoder() { + final StructType schema = new StructType() + .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply()) + .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply()); + LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789"); + Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z"); + List rows = Arrays.asList(create(ldt, instant), create(null, null)); + Dataset ds = spark.createDataset(rows, Encoders.row(schema)); + Assertions.assertEquals(schema, ds.schema()); + List collected = ds.collectAsList(); + Assertions.assertEquals(2, collected.size()); + Assertions.assertEquals(ldt, collected.get(0).get(0)); + Assertions.assertEquals(instant, collected.get(0).get(1)); + Assertions.assertTrue(collected.get(1).isNullAt(0)); + Assertions.assertTrue(collected.get(1).isNullAt(1)); + } + @Test public void testDurationEncoder() { Encoder encoder = Encoders.DURATION(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 879569045b6d1..dc930af874908 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDateTime} import scala.collection.immutable.HashSet import scala.collection.mutable @@ -2631,6 +2632,49 @@ class DatasetSuite extends SharedSparkSession assert(Seq(localDateTime).toDS().head() === localDateTime) } + test("SPARK-57033: Dataset[Row] roundtrip preserves nanosecond precision") { + val schema = new StructType() + .add("ntz", TimestampNTZNanosType(9), nullable = true) + .add("ltz", TimestampLTZNanosType(9), nullable = true) + val rows = Seq( + Row( + LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + Instant.parse("2019-02-26T16:56:00.987654321Z")), + Row( + LocalDateTime.parse("9999-12-31T23:59:59.999999999"), + Instant.parse("9999-12-31T23:59:59.999999999Z")), + Row(null, null)) + val df = spark.createDataFrame(rows.asJava, schema) + assert(df.schema === schema) + checkAnswer(df, rows) + } + + test("SPARK-57033: Dataset[Row] roundtrip truncates sub-micro to declared precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") + // At p=7 the last two sub-micro digits are dropped (789 -> 700); + // at p=8 only the last one is dropped (789 -> 780). + val expectedSubMicro = Map(7 -> 700, 8 -> 780) + + for (p <- 7 to 8) { + val schema = new StructType() + .add("ntz", TimestampNTZNanosType(p), nullable = true) + .add("ltz", TimestampLTZNanosType(p), nullable = true) + val rows = Seq( + Row(ldt, instant), + Row(negativeEpochLdt, negativeEpochInstant)) + val df = spark.createDataFrame(rows.asJava, schema) + assert(df.schema === schema) + val drop = 789 - expectedSubMicro(p) + val expected = Seq( + Row(ldt.minusNanos(drop), instant.minusNanos(drop)), + Row(negativeEpochLdt.minusNanos(drop), negativeEpochInstant.minusNanos(drop))) + checkAnswer(df, expected) + } + } + test("SPARK-34605: implicit encoder for java.time.Duration") { val duration = java.time.Duration.ofMinutes(10) assert(spark.range(1).map { _ => duration }.head() === duration)