From 845c0c902656af557d18d5c4e79bbc0a103f2d46 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 27 May 2026 23:29:10 +0200 Subject: [PATCH 01/13] Add java.time Conversion and Dataset Roundtrip for Nanosecond Timestamps --- .../catalyst/encoders/AgnosticEncoder.scala | 8 ++ .../sql/catalyst/encoders/RowEncoder.scala | 6 +- .../catalyst/util/SparkDateTimeUtils.scala | 46 +++++++- .../sql/catalyst/CatalystTypeConverters.scala | 48 +++++++- .../catalyst/DeserializerBuildHelper.scala | 24 +++- .../sql/catalyst/SerializerBuildHelper.scala | 26 ++++- .../sql/catalyst/encoders/EncoderUtils.scala | 8 +- .../CatalystTypeConvertersSuite.scala | 105 ++++++++++++++++++ .../catalyst/encoders/RowEncoderSuite.scala | 41 +++++++ .../catalyst/util/DateTimeUtilsSuite.scala | 62 ++++++++++- .../apache/spark/sql/JavaDatasetSuite.java | 23 ++++ .../org/apache/spark/sql/DatasetSuite.scala | 26 +++++ 12 files changed, 415 insertions(+), 8 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala index 20949c188cb81..57c15de4f0db4 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala @@ -257,6 +257,14 @@ object AgnosticEncoders { case class InstantEncoder(override val lenientSerialization: Boolean) extends LeafEncoder[Instant](TimestampType) case object LocalDateTimeEncoder extends LeafEncoder[LocalDateTime](TimestampNTZType) + // Nanosecond-precision counterparts of `LocalDateTimeEncoder` / `InstantEncoder(false)`. + // They are used by `RowEncoder` when the schema declares a `TimestampNTZNanosType(p)` or + // `TimestampLTZNanosType(p)` column, so Dataset create/collect roundtrips preserve full + // nanosecond precision. See SPARK-57033. 
+ case class LocalDateTimeNanosEncoder(precision: Int) + extends LeafEncoder[LocalDateTime](TimestampNTZNanosType(precision)) + case class InstantNanosEncoder(precision: Int) + extends LeafEncoder[Instant](TimestampLTZNanosType(precision)) case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType()) case class SparkDecimalEncoder(dt: DecimalType) extends LeafEncoder[Decimal](dt) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index bad673672188c..a657075719aef 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.reflect.classTag import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, 
LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder} import org.apache.spark.sql.errors.DataTypeErrorsBase import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types._ @@ -50,6 +50,8 @@ import org.apache.spark.util.ArrayImplicits._ * TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true * * TimestampNTZType -> java.time.LocalDateTime + * TimestampNTZNanosType -> java.time.LocalDateTime + * TimestampLTZNanosType -> java.time.Instant * TimeType -> java.time.LocalTime * * DayTimeIntervalType -> java.time.Duration @@ -97,6 +99,8 @@ object RowEncoder extends DataTypeErrorsBase { case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient) case TimestampType => TimestampEncoder(lenient) case TimestampNTZType => LocalDateTimeEncoder + case t: TimestampNTZNanosType => LocalDateTimeNanosEncoder(t.precision) + case t: TimestampLTZNanosType => InstantNanosEncoder(t.precision) case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient) case DateType => DateEncoder(lenient) case _: TimeType => LocalTimeEncoder diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index 9684737a22865..ee45c44ab830d 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, rebaseJulianToGregorianDays, rebaseJulianToGregorianMicros} import org.apache.spark.sql.errors.ExecutionErrors import org.apache.spark.sql.types.{DateType, 
TimestampType, TimeType} -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String} import org.apache.spark.util.SparkClassUtils trait SparkDateTimeUtils { @@ -208,6 +208,50 @@ trait SparkDateTimeUtils { instantToMicros(localDateTime.toInstant(ZoneOffset.UTC)) } + /** + * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, nanosWithinMicro)` + * pair used by `TimestampNTZNanosType`. The sub-microsecond part of `localDateTime.getNano` + * (its last three decimal digits, in `[0, 999]`) is preserved as `nanosWithinMicro`; the rest + * of the local date-time becomes `epochMicros` via [[localDateTimeToMicros]] (i.e. interpreted + * at UTC). The conversion is lossless within the valid range of `TimestampNTZNanosType`. + */ + def localDateTimeToTimestampNanos(localDateTime: LocalDateTime): TimestampNanosVal = { + val epochMicros = localDateTimeToMicros(localDateTime) + val nanosWithinMicro = (localDateTime.getNano % NANOS_PER_MICROS).toShort + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro) + } + + /** + * Reverse of [[localDateTimeToTimestampNanos]]: rebuilds a `java.time.LocalDateTime` (at UTC) + * from a `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses + * the second boundary. + */ + def timestampNanosToLocalDateTime(v: TimestampNanosVal): LocalDateTime = { + microsToLocalDateTime(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong) + } + + /** + * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used + * by `TimestampLTZNanosType`. `epochMicros` is computed via [[instantToMicros]] (floor division + * of `instant.getNano` to micros), and the last three sub-micro nanosecond digits are kept as + * `nanosWithinMicro` in `[0, 999]`. The conversion is lossless within the valid range of + * `TimestampLTZNanosType`. 
+ */ + def instantToTimestampNanos(instant: Instant): TimestampNanosVal = { + val epochMicros = instantToMicros(instant) + val nanosWithinMicro = (instant.getNano % NANOS_PER_MICROS).toShort + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro) + } + + /** + * Reverse of [[instantToTimestampNanos]]: rebuilds a `java.time.Instant` from a + * `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses the + * second boundary. + */ + def timestampNanosToInstant(v: TimestampNanosVal): Instant = { + microsToInstant(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong) + } + /** * Converts the local date to the number of days since 1970-01-01. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index be66f851e361c..c9f72c1f3e12b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.DayTimeIntervalType._ import org.apache.spark.sql.types.YearMonthIntervalType._ -import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String} +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.collection.Utils @@ -88,6 +88,8 @@ object CatalystTypeConverters { case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter case TimestampType => TimestampConverter case TimestampNTZType => TimestampNTZConverter + case _: TimestampNTZNanosType => TimestampNTZNanosConverter + case _: TimestampLTZNanosType => TimestampLTZNanosConverter case dt: DecimalType => new DecimalConverter(dt) case BooleanType => 
BooleanConverter case ByteType => ByteConverter @@ -515,6 +517,50 @@ object CatalystTypeConverters { DateTimeUtils.microsToLocalDateTime(row.getLong(column)) } + private object TimestampNTZNanosConverter + extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] { + override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { + case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l) + case other => throw new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_3219", + messageParameters = scala.collection.immutable.Map( + "other" -> other.toString, + "otherClass" -> other.getClass.getCanonicalName, + "dataType" -> TimestampNTZNanosType().sql)) + } + + override def toScala(catalystValue: TimestampNanosVal): LocalDateTime = + if (catalystValue == null) null + else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue) + + override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime = + DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column)) + } + + // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`, + // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is + // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope + // here. See SPARK-57033. 
+ private object TimestampLTZNanosConverter + extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] { + override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { + case i: Instant => DateTimeUtils.instantToTimestampNanos(i) + case other => throw new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_3219", + messageParameters = scala.collection.immutable.Map( + "other" -> other.toString, + "otherClass" -> other.getClass.getCanonicalName, + "dataType" -> TimestampLTZNanosType().sql)) + } + + override def toScala(catalystValue: TimestampNanosVal): Instant = + if (catalystValue == null) null + else DateTimeUtils.timestampNanosToInstant(catalystValue) + + override def toScalaImpl(row: InternalRow, column: Int): Instant = + DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column)) + } + private class DecimalConverter(dataType: DecimalType) extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala index 8bd162afd56db..17ba4fe4203b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.{expressions => exprs} import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, 
JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder} import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast} import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption} @@ -176,6 +176,24 @@ object DeserializerBuildHelper { returnNullable = false) } + def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + ObjectType(classOf[java.time.LocalDateTime]), + 
"timestampNanosToLocalDateTime", + path :: Nil, + returnNullable = false) + } + + def createDeserializerForInstantNanos(path: Expression): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + ObjectType(classOf[java.time.Instant]), + "timestampNanosToInstant", + path :: Nil, + returnNullable = false) + } + def createDeserializerForLocalTime(path: Expression): Expression = { StaticInvoke( DateTimeUtils.getClass, @@ -356,6 +374,10 @@ object DeserializerBuildHelper { createDeserializerForInstant(path) case LocalDateTimeEncoder => createDeserializerForLocalDateTime(path) + case _: LocalDateTimeNanosEncoder => + createDeserializerForLocalDateTimeNanos(path) + case _: InstantNanosEncoder => + createDeserializerForInstantNanos(path) case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled => throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError() case LocalTimeEncoder => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala index 37a4efc657391..490ed632d0243 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala @@ -22,7 +22,7 @@ import scala.language.existentials import org.apache.spark.sql.catalyst.{expressions => exprs} import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, 
GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder} import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor} import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData} import org.apache.spark.sql.catalyst.expressions.objects._ @@ -166,6 +166,28 @@ object SerializerBuildHelper { returnNullable = false) } + def createSerializerForLocalDateTimeNanos( + inputObject: Expression, + precision: Int): Expression = { + StaticInvoke( + DateTimeUtils.getClass, + TimestampNTZNanosType(precision), + "localDateTimeToTimestampNanos", + inputObject :: Nil, + returnNullable = false) + } + + def createSerializerForInstantNanos( + inputObject: Expression, + precision: Int): Expression = { + StaticInvoke( + 
DateTimeUtils.getClass, + TimestampLTZNanosType(precision), + "instantToTimestampNanos", + inputObject :: Nil, + returnNullable = false) + } + def createSerializerForJavaLocalDate(inputObject: Expression): Expression = { StaticInvoke( DateTimeUtils.getClass, @@ -376,6 +398,8 @@ object SerializerBuildHelper { case TimestampEncoder(false) => createSerializerForSqlTimestamp(input) case InstantEncoder(false) => createSerializerForJavaInstant(input) case LocalDateTimeEncoder => createSerializerForLocalDateTime(input) + case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p) + case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p) case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled => throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError() case LocalTimeEncoder => createSerializerForLocalTime(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala index 0fce96c159979..f65203aed8c71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType} import org.apache.spark.sql.catalyst.types.ops.TypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType} -import 
org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType} +import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String, VariantVal} /** * :: DeveloperApi :: @@ -107,6 +107,8 @@ object EncoderUtils { case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType] case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType] case _: TimeType => classOf[PhysicalLongType.InternalType] + case _: TimestampNTZNanosType => classOf[TimestampNanosVal] + case _: TimestampLTZNanosType => classOf[TimestampNanosVal] case _: StringType => classOf[UTF8String] case _: StructType => classOf[InternalRow] case _: ArrayType => classOf[ArrayData] @@ -126,6 +128,8 @@ object EncoderUtils { case BinaryType => classOf[Array[Byte]] case _: StringType => classOf[UTF8String] case CalendarIntervalType => classOf[CalendarInterval] + case _: TimestampNTZNanosType => classOf[TimestampNanosVal] + case _: TimestampLTZNanosType => classOf[TimestampNanosVal] case _: StructType => classOf[InternalRow] case _: ArrayType => classOf[ArrayData] case _: MapType => classOf[MapData] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 222465d82c029..4c6d6cf6d674f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -250,6 +250,111 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { } } + test("SPARK-57033: converting java.time.LocalDateTime to TimestampNTZNanosType") { + Seq( + "0001-01-01T00:00:00", + "1582-10-02T01:02:03.04", + "1582-12-31T23:59:59.999999999", + "1970-01-01T00:00:00.000000001", + "1972-12-31T23:59:59.123456789", + "2019-02-26T16:56:00.123456789", + "9999-12-31T23:59:59.999999999").foreach { text => + val input = LocalDateTime.parse(text) + Seq(7, 8, 9).foreach { p => + val dt = TimestampNTZNanosType(p) + val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(input) + assert(result === expected) + } + } + } + + test("SPARK-57033: converting TimestampNTZNanosType to java.time.LocalDateTime") { + Seq( + "1582-10-02T01:02:03.04", + "1582-12-31T23:59:59.999999999", + "1970-01-01T00:00:00.000000001", + "1972-12-31T23:59:59.123456789", + "2019-02-26T16:56:00.123456789", + "9999-12-31T23:59:59.999999999").foreach { text => + val ldt = LocalDateTime.parse(text) + val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt) + Seq(7, 8, 9).foreach { p => + val dt = TimestampNTZNanosType(p) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === ldt) + } + } + } + + test("SPARK-57033: converting java.time.Instant to TimestampLTZNanosType") { + Seq( + "0001-01-01T00:00:00Z", + "1582-10-02T01:02:03.04Z", + "1582-12-31T23:59:59.999999999Z", + "1970-01-01T00:00:00.000000001Z", + "1972-12-31T23:59:59.123456789Z", + "2019-02-26T16:56:00.123456789Z", + "9999-12-31T23:59:59.999999999Z").foreach { text => + val input = Instant.parse(text) + Seq(7, 8, 9).foreach { p => + val dt = TimestampLTZNanosType(p) + val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) + val expected = DateTimeUtils.instantToTimestampNanos(input) + assert(result === expected) + } + } 
+ } + + test("SPARK-57033: converting TimestampLTZNanosType to java.time.Instant") { + Seq( + "1582-10-02T01:02:03.04Z", + "1582-12-31T23:59:59.999999999Z", + "1970-01-01T00:00:00.000000001Z", + "1972-12-31T23:59:59.123456789Z", + "2019-02-26T16:56:00.123456789Z", + "9999-12-31T23:59:59.999999999Z").foreach { text => + val instant = Instant.parse(text) + val v = DateTimeUtils.instantToTimestampNanos(instant) + Seq(7, 8, 9).foreach { p => + val dt = TimestampLTZNanosType(p) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant) + } + } + } + + test("SPARK-57033: TimestampLTZNanosType -> Instant ignores java8 API flag") { + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val v = DateTimeUtils.instantToTimestampNanos(instant) + val dt = TimestampLTZNanosType() + Seq("true", "false").foreach { flag => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) { + assert(CatalystTypeConverters.createToCatalystConverter(dt)(instant) === v) + assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant) + } + } + } + + test("SPARK-57033: TimestampNanosConverter null handling in rows") { + val schema = StructType( + StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: + StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil) + val toCat = CatalystTypeConverters.createToCatalystConverter(schema) + val toScala = CatalystTypeConverters.createToScalaConverter(schema) + // Reference value to ensure non-null cells are kept as-is. + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.987654321Z") + val row = Row(ldt, instant) + val catalystRow = toCat(row).asInstanceOf[InternalRow] + assert(catalystRow.getTimestampNTZNanos(0) === + DateTimeUtils.localDateTimeToTimestampNanos(ldt)) + assert(catalystRow.getTimestampLTZNanos(1) === + DateTimeUtils.instantToTimestampNanos(instant)) + assert(toScala(catalystRow) === row) + // Null row. 
+ val nullRow = Row.fromSeq(Seq(null, null)) + assert(toScala(toCat(nullRow)) === nullRow) + } + test("converting java.time.LocalDate to DateType") { Seq( "0101-02-16", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 09247a459b9c5..ca54494c92bee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -381,6 +381,47 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { assert(readback.get(0) === localDateTime) } + test("SPARK-57033: encoding/decoding TimestampNTZNanosType to/from java.time.LocalDateTime") { + val localDateTime = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789") + for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampNTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(localDateTime)) + assert(row.getTimestampNTZNanos(0) === + DateTimeUtils.localDateTimeToTimestampNanos(localDateTime)) + val readback = fromRow(encoder, row) + assert(readback.get(0) === localDateTime) + } + } + + test("SPARK-57033: encoding/decoding TimestampLTZNanosType to/from java.time.Instant") { + val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z") + for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampLTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(instant)) + assert(row.getTimestampLTZNanos(0) === + DateTimeUtils.instantToTimestampNanos(instant)) + val readback = fromRow(encoder, row) + assert(readback.get(0) === instant) + } + } + + test("SPARK-57033: encoding/decoding 
TimestampLTZNanosType ignores java8 API flag") { + val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z") + val schema = new StructType().add("t", TimestampLTZNanosType()) + Seq("true", "false").foreach { flag => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) { + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(instant)) + assert(row.getTimestampLTZNanos(0) === + DateTimeUtils.instantToTimestampNanos(instant)) + val readback = fromRow(encoder, row) + assert(readback.get(0) === instant) + } + } + } + test("encoding/decoding DateType to/from java.time.LocalDate") { withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") { val schema = new StructType().add("d", DateType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 4810ec69bb96e..d8857d31fafd6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLConf import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND} import org.apache.spark.sql.types.Decimal -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String} class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { @@ -1851,4 +1851,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { timeBucketYMInterval(1, Long.MinValue, 0L, utc) } } + + test("SPARK-57033: java.time.LocalDateTime <-> TimestampNanosVal roundtrip") { + val cases = Seq( + LocalDateTime.parse("0001-01-01T00:00:00"), + LocalDateTime.parse("1582-10-02T01:02:03.04"), + 
LocalDateTime.parse("1582-12-31T23:59:59.999999999"), + LocalDateTime.parse("1969-12-31T23:59:59.999999999"), + LocalDateTime.parse("1970-01-01T00:00:00"), + LocalDateTime.parse("1970-01-01T00:00:00.000000001"), + LocalDateTime.parse("1970-01-01T00:00:00.123456789"), + LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + LocalDateTime.parse("9999-12-31T23:59:59.999999999")) + for (ldt <- cases) { + val v = localDateTimeToTimestampNanos(ldt) + assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, + s"nanosWithinMicro out of range for $ldt: $v") + assert(v.nanosWithinMicro === (ldt.getNano % 1000).toShort) + assert(v.epochMicros === DateTimeUtils.localDateTimeToMicros(ldt)) + assert(timestampNanosToLocalDateTime(v) === ldt) + } + } + + test("SPARK-57033: java.time.Instant <-> TimestampNanosVal roundtrip") { + val cases = Seq( + Instant.EPOCH, + Instant.parse("0001-01-01T00:00:00Z"), + Instant.parse("1582-10-02T01:02:03.04Z"), + Instant.parse("1582-12-31T23:59:59.999999999Z"), + Instant.parse("1969-12-31T23:59:59.999999999Z"), + Instant.parse("1970-01-01T00:00:00.000000001Z"), + Instant.parse("2019-02-26T16:56:00.123456789Z"), + Instant.parse("9999-12-31T23:59:59.999999999Z")) + for (i <- cases) { + val v = instantToTimestampNanos(i) + assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, + s"nanosWithinMicro out of range for $i: $v") + assert(v.nanosWithinMicro === (i.getNano % 1000).toShort) + assert(v.epochMicros === instantToMicros(i)) + assert(timestampNanosToInstant(v) === i) + } + } + + test("SPARK-57033: TimestampNanosVal random roundtrip") { + val rnd = new scala.util.Random(0) + // Random instants across the valid range with every possible sub-micro digit. 
+ val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond + val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond + for (_ <- 0 until 200) { + val secs = min + math.abs(rnd.nextLong()) % (max - min + 1) + val nano = rnd.nextInt(1_000_000_000) + val i = Instant.ofEpochSecond(secs, nano.toLong) + val v = instantToTimestampNanos(i) + assert(timestampNanosToInstant(v) === i) + val ldt = LocalDateTime.ofInstant(i, ZoneOffset.UTC) + val v2 = localDateTimeToTimestampNanos(ldt) + assert(timestampNanosToLocalDateTime(v2) === ldt) + // Internal layout matches direct decomposition. + assert(v === TimestampNanosVal.fromParts(instantToMicros(i), (nano % 1000).toShort)) + } + } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 62d44e0af8b08..a8093137c4700 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -789,6 +789,29 @@ public void testLocalDateTimeEncoder() { Assertions.assertEquals(data, ds.collectAsList()); } + @Test + public void testTimestampNanosRowEncoder() { + spark.conf().set("spark.sql.timestampNanosTypes.enabled", "true"); + try { + final StructType schema = new StructType() + .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply()) + .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply()); + LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789"); + Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z"); + List rows = Arrays.asList(create(ldt, instant), create(null, null)); + Dataset ds = spark.createDataset(rows, Encoders.row(schema)); + Assertions.assertEquals(schema, ds.schema()); + List collected = ds.collectAsList(); + Assertions.assertEquals(2, collected.size()); + Assertions.assertEquals(ldt, collected.get(0).get(0)); + Assertions.assertEquals(instant, 
collected.get(0).get(1)); + Assertions.assertTrue(collected.get(1).isNullAt(0)); + Assertions.assertTrue(collected.get(1).isNullAt(1)); + } finally { + spark.conf().unset("spark.sql.timestampNanosTypes.enabled"); + } + } + @Test public void testDurationEncoder() { Encoder encoder = Encoders.DURATION(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 879569045b6d1..6ffdf2c93c5e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2631,6 +2631,32 @@ class DatasetSuite extends SharedSparkSession assert(Seq(localDateTime).toDS().head() === localDateTime) } + test("SPARK-57033: Dataset[Row] roundtrip preserves nanosecond precision") { + withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { + val ldt = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val ldt2 = java.time.LocalDateTime.parse("9999-12-31T23:59:59.999999999") + val instant = java.time.Instant.parse("2019-02-26T16:56:00.987654321Z") + val instant2 = java.time.Instant.parse("9999-12-31T23:59:59.999999999Z") + val schema = StructType( + StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: + StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil) + val rows = java.util.Arrays.asList( + Row(ldt, instant), + Row(ldt2, instant2), + Row(null, null)) + val df = spark.createDataFrame(rows, schema) + assert(df.schema === schema) + val collected = df.collect() + assert(collected.length === 3) + assert(collected(0).get(0) === ldt) + assert(collected(0).get(1) === instant) + assert(collected(1).get(0) === ldt2) + assert(collected(1).get(1) === instant2) + assert(collected(2).isNullAt(0)) + assert(collected(2).isNullAt(1)) + } + } + test("SPARK-34605: implicit encoder for java.time.Duration") { val duration = java.time.Duration.ofMinutes(10) assert(spark.range(1).map { _ 
=> duration }.head() === duration) From 2d5788643e59750a014a745fffcf8c587e44ea22 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 10:49:13 +0200 Subject: [PATCH 02/13] Report column precision in nanos converter type-mismatch error The TimestampNTZNanosConverter / TimestampLTZNanosConverter were object singletons rendering `TimestampNTZNanosType().sql` / `TimestampLTZNanosType().sql` in their type-mismatch errors, which always read as `timestamp_ntz(9)` / `timestamp_ltz(9)` regardless of the column's declared precision. Thread the matched data type into the converter (mirroring DecimalConverter) so the error reflects the actual precision. Co-authored-by: Isaac --- .../spark/sql/catalyst/CatalystTypeConverters.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index c9f72c1f3e12b..32fefaf65378d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -88,8 +88,8 @@ object CatalystTypeConverters { case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter case TimestampType => TimestampConverter case TimestampNTZType => TimestampNTZConverter - case _: TimestampNTZNanosType => TimestampNTZNanosConverter - case _: TimestampLTZNanosType => TimestampLTZNanosConverter + case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t) + case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t) case dt: DecimalType => new DecimalConverter(dt) case BooleanType => BooleanConverter case ByteType => ByteConverter @@ -517,7 +517,7 @@ object CatalystTypeConverters { DateTimeUtils.microsToLocalDateTime(row.getLong(column)) } - private object TimestampNTZNanosConverter + private class 
TimestampNTZNanosConverter(dataType: TimestampNTZNanosType) extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l) @@ -526,7 +526,7 @@ object CatalystTypeConverters { messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, - "dataType" -> TimestampNTZNanosType().sql)) + "dataType" -> dataType.sql)) } override def toScala(catalystValue: TimestampNanosVal): LocalDateTime = @@ -541,7 +541,7 @@ object CatalystTypeConverters { // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope // here. See SPARK-57033. - private object TimestampLTZNanosConverter + private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType) extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { case i: Instant => DateTimeUtils.instantToTimestampNanos(i) @@ -550,7 +550,7 @@ object CatalystTypeConverters { messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, - "dataType" -> TimestampLTZNanosType().sql)) + "dataType" -> dataType.sql)) } override def toScala(catalystValue: TimestampNanosVal): Instant = From 9176040fbe5a436263065d3ce35a2ffc802a31bc Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 10:58:43 +0200 Subject: [PATCH 03/13] Rename _LEGACY_ERROR_TEMP_3219 to INVALID_EXTERNAL_VALUE Promote the converter type-mismatch error to a proper named class. Pairs with the existing INVALID_EXTERNAL_TYPE (used by the ValidateExternalType codegen path) and reuses its sqlState 42K0N. 
Updates the 10 call sites in CatalystTypeConverters and the 5 test references in CatalystTypeConvertersSuite. Co-authored-by: Isaac --- .../resources/error/error-conditions.json | 11 +++++----- .../sql/catalyst/CatalystTypeConverters.scala | 20 +++++++++---------- .../CatalystTypeConvertersSuite.scala | 10 +++++----- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 71dbf1a1ebce3..6757b89358729 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3729,6 +3729,12 @@ ], "sqlState" : "42K0N" }, + "INVALID_EXTERNAL_VALUE" : { + "message" : [ + "The value (<other>) of the type (<otherClass>) cannot be converted to the <dataType> type." + ], + "sqlState" : "42K0N" + }, "INVALID_EXTRACT_BASE_FIELD_TYPE" : { "message" : [ "Can't extract a value from . Need a complex type [STRUCT, ARRAY, MAP] but got ." @@ -11487,11 +11493,6 @@ "Must be 2 children: " ] }, - "_LEGACY_ERROR_TEMP_3219" : { - "message" : [ - "The value (<other>) of the type (<otherClass>) cannot be converted to the <dataType> type."
- ] - }, "_LEGACY_ERROR_TEMP_3220" : { "message" : [ "The value () of the type () cannot be converted to an array of " diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 32fefaf65378d..ad3fe677bcbf3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -300,7 +300,7 @@ object CatalystTypeConverters { } new GenericInternalRow(ar) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -359,7 +359,7 @@ object CatalystTypeConverters { case chr: Char => UTF8String.fromString(chr.toString) case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac)) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -385,7 +385,7 @@ object CatalystTypeConverters { case g: org.apache.spark.sql.types.Geometry if SQLConf.get.geospatialEnabled => STUtils.serializeGeomFromWKB(g, dataType) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -410,7 +410,7 @@ object CatalystTypeConverters { case g: org.apache.spark.sql.types.Geography if SQLConf.get.geospatialEnabled => STUtils.serializeGeogFromWKB(g, dataType) case other => throw new SparkIllegalArgumentException( - 
errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -434,7 +434,7 @@ object CatalystTypeConverters { case d: Date => DateTimeUtils.fromJavaDate(d) case l: LocalDate => DateTimeUtils.localDateToDays(l) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -474,7 +474,7 @@ object CatalystTypeConverters { case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) case i: Instant => DateTimeUtils.instantToMicros(i) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -502,7 +502,7 @@ object CatalystTypeConverters { override def toCatalystImpl(scalaValue: Any): Any = scalaValue match { case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -522,7 +522,7 @@ object CatalystTypeConverters { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -546,7 +546,7 
@@ object CatalystTypeConverters { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { case i: Instant => DateTimeUtils.instantToTimestampNanos(i) case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, @@ -573,7 +573,7 @@ object CatalystTypeConverters { case d: JavaBigInteger => Decimal(d) case d: Decimal => d case other => throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3219", + errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( "other" -> other.toString, "otherClass" -> other.getClass.getCanonicalName, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 4c6d6cf6d674f..fbfa4ec643892 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -109,7 +109,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(structType)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", @@ -149,7 +149,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(decimalType)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> 
"java.lang.String", @@ -169,7 +169,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(StringType)(0.1) }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "0.1", "otherClass" -> "java.lang.Double", @@ -684,7 +684,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(gt)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", @@ -697,7 +697,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { exception = intercept[SparkIllegalArgumentException] { CatalystTypeConverters.createToCatalystConverter(gt)("test") }, - condition = "_LEGACY_ERROR_TEMP_3219", + condition = "INVALID_EXTERNAL_VALUE", parameters = Map( "other" -> "test", "otherClass" -> "java.lang.String", From 359e42c2ed4cd73a47c725924bd5ab2870293c21 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 11:42:38 +0200 Subject: [PATCH 04/13] Truncate sub-micro nanos to column precision at converter boundary localDateTimeToTimestampNanos / instantToTimestampNanos previously stored all three sub-micro digits regardless of the target precision, so a value entering Spark through the encoder/converter path could read back differently than the same value cast via SQL CAST(... AS TIMESTAMP_NTZ(p)) / TIMESTAMP_LTZ(p). Take the target precision as a parameter and floor-truncate the sub-micro nanos to match SQL CAST semantics: p=9 lossless, p=8 drops 1 digit, p=7 drops 2 digits. Thread the precision through CatalystTypeConverters (via the dataType already captured by the per-column converter) and SerializerBuildHelper (via a Literal added to the StaticInvoke args). 
Co-authored-by: Isaac --- .../catalyst/util/SparkDateTimeUtils.scala | 55 +++++++++++++------ .../sql/catalyst/CatalystTypeConverters.scala | 4 +- .../sql/catalyst/SerializerBuildHelper.scala | 4 +- .../CatalystTypeConvertersSuite.scala | 40 +++++++++++--- .../catalyst/encoders/RowEncoderSuite.scala | 14 ++--- .../catalyst/util/DateTimeUtilsSuite.scala | 32 +++++++++-- 6 files changed, 110 insertions(+), 39 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index ee45c44ab830d..c391cb83ed721 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -208,17 +208,37 @@ trait SparkDateTimeUtils { instantToMicros(localDateTime.toInstant(ZoneOffset.UTC)) } + /** + * Truncates the sub-microsecond nanosecond part to the given timestamp precision `p` in [7, 9]. + * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the last two; precisions + * outside that range are passed through unchanged because the surrounding types already + * validate the bound. + */ + private def truncateNanosWithinMicroToPrecision(nanosWithinMicro: Int, precision: Int): Int = { + precision match { + case 7 => (nanosWithinMicro / 100) * 100 + case 8 => (nanosWithinMicro / 10) * 10 + case _ => nanosWithinMicro + } + } + /** * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, nanosWithinMicro)` - * pair used by `TimestampNTZNanosType`. The sub-microsecond part of `localDateTime.getNano` - * (its last three decimal digits, in `[0, 999]`) is preserved as `nanosWithinMicro`; the rest - * of the local date-time becomes `epochMicros` via [[localDateTimeToMicros]] (i.e. interpreted - * at UTC). The conversion is lossless within the valid range of `TimestampNTZNanosType`. 
- */ - def localDateTimeToTimestampNanos(localDateTime: LocalDateTime): TimestampNanosVal = { + * pair used by `TimestampNTZNanosType(precision)`. The sub-microsecond part of + * `localDateTime.getNano` (its last three decimal digits, in `[0, 999]`) is truncated to the + * target precision (floor toward zero of the fractional second) and stored as + * `nanosWithinMicro`; the rest of the local date-time becomes `epochMicros` via + * [[localDateTimeToMicros]] (i.e. interpreted at UTC). At `precision = 9` the conversion is + * lossless within the valid range; at 7 / 8 the lower 2 / 1 sub-micro digits are dropped to + * match `CAST(... AS TIMESTAMP_NTZ(p))` semantics. + */ + def localDateTimeToTimestampNanos( + localDateTime: LocalDateTime, + precision: Int): TimestampNanosVal = { val epochMicros = localDateTimeToMicros(localDateTime) - val nanosWithinMicro = (localDateTime.getNano % NANOS_PER_MICROS).toShort - TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro) + val rawNanosWithinMicro = localDateTime.getNano % NANOS_PER_MICROS.toInt + val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision) + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort) } /** @@ -232,15 +252,18 @@ trait SparkDateTimeUtils { /** * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used - * by `TimestampLTZNanosType`. `epochMicros` is computed via [[instantToMicros]] (floor division - * of `instant.getNano` to micros), and the last three sub-micro nanosecond digits are kept as - * `nanosWithinMicro` in `[0, 999]`. The conversion is lossless within the valid range of - * `TimestampLTZNanosType`. - */ - def instantToTimestampNanos(instant: Instant): TimestampNanosVal = { + * by `TimestampLTZNanosType(precision)`. 
`epochMicros` is computed via [[instantToMicros]] + * (floor division of `instant.getNano` to micros); the last three sub-micro nanosecond digits + * are truncated to the target precision (floor toward zero of the fractional second) and kept + * as `nanosWithinMicro` in `[0, 999]`. At `precision = 9` the conversion is lossless within the + * valid range; at 7 / 8 the lower 2 / 1 sub-micro digits are dropped to match + * `CAST(... AS TIMESTAMP_LTZ(p))` semantics. + */ + def instantToTimestampNanos(instant: Instant, precision: Int): TimestampNanosVal = { val epochMicros = instantToMicros(instant) - val nanosWithinMicro = (instant.getNano % NANOS_PER_MICROS).toShort - TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro) + val rawNanosWithinMicro = instant.getNano % NANOS_PER_MICROS.toInt + val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision) + TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index ad3fe677bcbf3..1598eb065a5fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -520,7 +520,7 @@ object CatalystTypeConverters { private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType) extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { - case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l) + case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision) case other => throw new SparkIllegalArgumentException( errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( @@ -544,7 +544,7 
@@ object CatalystTypeConverters { private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType) extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] { override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match { - case i: Instant => DateTimeUtils.instantToTimestampNanos(i) + case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision) case other => throw new SparkIllegalArgumentException( errorClass = "INVALID_EXTERNAL_VALUE", messageParameters = scala.collection.immutable.Map( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala index 490ed632d0243..72337a4b21854 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala @@ -173,7 +173,7 @@ object SerializerBuildHelper { DateTimeUtils.getClass, TimestampNTZNanosType(precision), "localDateTimeToTimestampNanos", - inputObject :: Nil, + inputObject :: Literal(precision) :: Nil, returnNullable = false) } @@ -184,7 +184,7 @@ object SerializerBuildHelper { DateTimeUtils.getClass, TimestampLTZNanosType(precision), "instantToTimestampNanos", - inputObject :: Nil, + inputObject :: Literal(precision) :: Nil, returnNullable = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index fbfa4ec643892..6b5154afc9884 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import 
org.apache.spark.sql.types.DayTimeIntervalType._ import org.apache.spark.sql.types.YearMonthIntervalType._ -import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String} +import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String} class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { @@ -263,7 +263,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { Seq(7, 8, 9).foreach { p => val dt = TimestampNTZNanosType(p) val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) - val expected = DateTimeUtils.localDateTimeToTimestampNanos(input) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(input, p) assert(result === expected) } } @@ -278,7 +278,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { "2019-02-26T16:56:00.123456789", "9999-12-31T23:59:59.999999999").foreach { text => val ldt = LocalDateTime.parse(text) - val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt) + val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9) Seq(7, 8, 9).foreach { p => val dt = TimestampNTZNanosType(p) assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === ldt) @@ -299,7 +299,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { Seq(7, 8, 9).foreach { p => val dt = TimestampLTZNanosType(p) val result = CatalystTypeConverters.createToCatalystConverter(dt)(input) - val expected = DateTimeUtils.instantToTimestampNanos(input) + val expected = DateTimeUtils.instantToTimestampNanos(input, p) assert(result === expected) } } @@ -314,7 +314,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { "2019-02-26T16:56:00.123456789Z", "9999-12-31T23:59:59.999999999Z").foreach { text => val instant = Instant.parse(text) - val v = DateTimeUtils.instantToTimestampNanos(instant) + val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) Seq(7, 8, 9).foreach { p => val dt = 
TimestampLTZNanosType(p) assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant) @@ -324,7 +324,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { test("SPARK-57033: TimestampLTZNanosType -> Instant ignores java8 API flag") { val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") - val v = DateTimeUtils.instantToTimestampNanos(instant) + val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) val dt = TimestampLTZNanosType() Seq("true", "false").foreach { flag => withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) { @@ -346,15 +346,39 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { val row = Row(ldt, instant) val catalystRow = toCat(row).asInstanceOf[InternalRow] assert(catalystRow.getTimestampNTZNanos(0) === - DateTimeUtils.localDateTimeToTimestampNanos(ldt)) + DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)) assert(catalystRow.getTimestampLTZNanos(1) === - DateTimeUtils.instantToTimestampNanos(instant)) + DateTimeUtils.instantToTimestampNanos(instant, precision = 9)) assert(toScala(catalystRow) === row) // Null row. 
val nullRow = Row.fromSeq(Seq(null, null)) assert(toScala(toCat(nullRow)) === nullRow) } + test("SPARK-57033: TimestampNTZNanosType converter truncates sub-micro to precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampNTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(ldt) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + } + } + + test("SPARK-57033: TimestampLTZNanosType converter truncates sub-micro to precision") { + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampLTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(instant) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + } + } + test("converting java.time.LocalDate to DateType") { Seq( "0101-02-16", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index ca54494c92bee..98b77a0c0d99d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -387,10 +387,10 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { val schema = new StructType().add("t", TimestampNTZNanosType(p)) val encoder = ExpressionEncoder(schema).resolveAndBind() val row = toRow(encoder, Row(localDateTime)) - assert(row.getTimestampNTZNanos(0) === - 
DateTimeUtils.localDateTimeToTimestampNanos(localDateTime)) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p) + assert(row.getTimestampNTZNanos(0) === expected) val readback = fromRow(encoder, row) - assert(readback.get(0) === localDateTime) + assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected)) } } @@ -400,10 +400,10 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { val schema = new StructType().add("t", TimestampLTZNanosType(p)) val encoder = ExpressionEncoder(schema).resolveAndBind() val row = toRow(encoder, Row(instant)) - assert(row.getTimestampLTZNanos(0) === - DateTimeUtils.instantToTimestampNanos(instant)) + val expected = DateTimeUtils.instantToTimestampNanos(instant, p) + assert(row.getTimestampLTZNanos(0) === expected) val readback = fromRow(encoder, row) - assert(readback.get(0) === instant) + assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected)) } } @@ -415,7 +415,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { val encoder = ExpressionEncoder(schema).resolveAndBind() val row = toRow(encoder, Row(instant)) assert(row.getTimestampLTZNanos(0) === - DateTimeUtils.instantToTimestampNanos(instant)) + DateTimeUtils.instantToTimestampNanos(instant, precision = 9)) val readback = fromRow(encoder, row) assert(readback.get(0) === instant) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index d8857d31fafd6..8256b9caa7d17 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -1864,7 +1864,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { LocalDateTime.parse("2019-02-26T16:56:00.123456789"), LocalDateTime.parse("9999-12-31T23:59:59.999999999")) 
for (ldt <- cases) { - val v = localDateTimeToTimestampNanos(ldt) + val v = localDateTimeToTimestampNanos(ldt, precision = 9) assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, s"nanosWithinMicro out of range for $ldt: $v") assert(v.nanosWithinMicro === (ldt.getNano % 1000).toShort) @@ -1884,7 +1884,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { Instant.parse("2019-02-26T16:56:00.123456789Z"), Instant.parse("9999-12-31T23:59:59.999999999Z")) for (i <- cases) { - val v = instantToTimestampNanos(i) + val v = instantToTimestampNanos(i, precision = 9) assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999, s"nanosWithinMicro out of range for $i: $v") assert(v.nanosWithinMicro === (i.getNano % 1000).toShort) @@ -1902,13 +1902,37 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { val secs = min + math.abs(rnd.nextLong()) % (max - min + 1) val nano = rnd.nextInt(1_000_000_000) val i = Instant.ofEpochSecond(secs, nano.toLong) - val v = instantToTimestampNanos(i) + val v = instantToTimestampNanos(i, precision = 9) assert(timestampNanosToInstant(v) === i) val ldt = LocalDateTime.ofInstant(i, ZoneOffset.UTC) - val v2 = localDateTimeToTimestampNanos(ldt) + val v2 = localDateTimeToTimestampNanos(ldt, precision = 9) assert(timestampNanosToLocalDateTime(v2) === ldt) // Internal layout matches direct decomposition. assert(v === TimestampNanosVal.fromParts(instantToMicros(i), (nano % 1000).toShort)) } } + + test("SPARK-57033: localDateTimeToTimestampNanos truncates sub-micro to precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + // precision 9 keeps all 3 sub-micro digits, 8 drops the last, 7 drops the last two. 
+ assert(localDateTimeToTimestampNanos(ldt, precision = 9).nanosWithinMicro === 789) + assert(localDateTimeToTimestampNanos(ldt, precision = 8).nanosWithinMicro === 780) + assert(localDateTimeToTimestampNanos(ldt, precision = 7).nanosWithinMicro === 700) + // epochMicros is unaffected by sub-micro truncation. + val expectedMicros = DateTimeUtils.localDateTimeToMicros(ldt) + for (p <- 7 to 9) { + assert(localDateTimeToTimestampNanos(ldt, precision = p).epochMicros === expectedMicros) + } + } + + test("SPARK-57033: instantToTimestampNanos truncates sub-micro to precision") { + val i = Instant.parse("2019-02-26T16:56:00.123456789Z") + assert(instantToTimestampNanos(i, precision = 9).nanosWithinMicro === 789) + assert(instantToTimestampNanos(i, precision = 8).nanosWithinMicro === 780) + assert(instantToTimestampNanos(i, precision = 7).nanosWithinMicro === 700) + val expectedMicros = instantToMicros(i) + for (p <- 7 to 9) { + assert(instantToTimestampNanos(i, precision = p).epochMicros === expectedMicros) + } + } } From 2b580901c29b977c2101bddb503abece42272a35 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 13:03:33 +0200 Subject: [PATCH 05/13] Simplify nanos Dataset[Row] roundtrip test - Drop the withSQLConf(TIMESTAMP_NANOS_TYPES_ENABLED) wrapper: TIMESTAMP_NANOS_TYPES_ENABLED defaults to true under Utils.isTesting. - Import java.time.{Instant, LocalDateTime} and inline the values directly into the input Seq. - Build the schema with StructType().add().add() instead of StructField :: lists. - Use Seq(...).asJava for the input list and checkAnswer for the collected-rows assertions. 
Co-authored-by: Isaac --- .../org/apache/spark/sql/DatasetSuite.scala | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 6ffdf2c93c5e3..adcca27de4730 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDateTime} import scala.collection.immutable.HashSet import scala.collection.mutable @@ -2632,29 +2633,20 @@ class DatasetSuite extends SharedSparkSession } test("SPARK-57033: Dataset[Row] roundtrip preserves nanosecond precision") { - withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { - val ldt = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789") - val ldt2 = java.time.LocalDateTime.parse("9999-12-31T23:59:59.999999999") - val instant = java.time.Instant.parse("2019-02-26T16:56:00.987654321Z") - val instant2 = java.time.Instant.parse("9999-12-31T23:59:59.999999999Z") - val schema = StructType( - StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: - StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil) - val rows = java.util.Arrays.asList( - Row(ldt, instant), - Row(ldt2, instant2), - Row(null, null)) - val df = spark.createDataFrame(rows, schema) - assert(df.schema === schema) - val collected = df.collect() - assert(collected.length === 3) - assert(collected(0).get(0) === ldt) - assert(collected(0).get(1) === instant) - assert(collected(1).get(0) === ldt2) - assert(collected(1).get(1) === instant2) - assert(collected(2).isNullAt(0)) - assert(collected(2).isNullAt(1)) - } + val schema = new StructType() + .add("ntz", TimestampNTZNanosType(9), nullable = true) + .add("ltz", 
TimestampLTZNanosType(9), nullable = true) + val rows = Seq( + Row( + LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + Instant.parse("2019-02-26T16:56:00.987654321Z")), + Row( + LocalDateTime.parse("9999-12-31T23:59:59.999999999"), + Instant.parse("9999-12-31T23:59:59.999999999Z")), + Row(null, null)) + val df = spark.createDataFrame(rows.asJava, schema) + assert(df.schema === schema) + checkAnswer(df, rows) } test("SPARK-34605: implicit encoder for java.time.Duration") { From fb6e3db0d880e2b402f7704834774dd936f55f85 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 13:28:26 +0200 Subject: [PATCH 06/13] Add negative-epoch precision coverage for nanos timestamp converters. This extends SPARK-57033 tests to validate sub-micro truncation and roundtrip behavior for pre-epoch LocalDateTime and Instant values across precisions 7, 8, and 9. --- .../CatalystTypeConvertersSuite.scala | 30 ++++++++----- .../catalyst/encoders/RowEncoderSuite.scala | 44 +++++++++++-------- .../catalyst/util/DateTimeUtilsSuite.scala | 20 +++++++++ 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 6b5154afc9884..6b74a1db36e58 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -357,25 +357,31 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { test("SPARK-57033: TimestampNTZNanosType converter truncates sub-micro to precision") { val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) - cases.foreach { case (p, expectedNanosWithinMicro) => - val dt = 
TimestampNTZNanosType(p) - val v = CatalystTypeConverters.createToCatalystConverter(dt)(ldt) - .asInstanceOf[TimestampNanosVal] - assert(v.nanosWithinMicro === expectedNanosWithinMicro, - s"precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + Seq(ldt, negativeEpochLdt).foreach { input => + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampNTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + } } } test("SPARK-57033: TimestampLTZNanosType converter truncates sub-micro to precision") { val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") val cases = Map(7 -> 700, 8 -> 780, 9 -> 789) - cases.foreach { case (p, expectedNanosWithinMicro) => - val dt = TimestampLTZNanosType(p) - val v = CatalystTypeConverters.createToCatalystConverter(dt)(instant) - .asInstanceOf[TimestampNanosVal] - assert(v.nanosWithinMicro === expectedNanosWithinMicro, - s"precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + Seq(instant, negativeEpochInstant).foreach { input => + cases.foreach { case (p, expectedNanosWithinMicro) => + val dt = TimestampLTZNanosType(p) + val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) + .asInstanceOf[TimestampNanosVal] + assert(v.nanosWithinMicro === expectedNanosWithinMicro, + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 98b77a0c0d99d..66dba915b5d15 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -382,28 +382,36 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { } test("SPARK-57033: encoding/decoding TimestampNTZNanosType to/from java.time.LocalDateTime") { - val localDateTime = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789") - for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) { - val schema = new StructType().add("t", TimestampNTZNanosType(p)) - val encoder = ExpressionEncoder(schema).resolveAndBind() - val row = toRow(encoder, Row(localDateTime)) - val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p) - assert(row.getTimestampNTZNanos(0) === expected) - val readback = fromRow(encoder, row) - assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected)) + val inputs = Seq( + java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789"), + java.time.LocalDateTime.parse("1969-12-31T23:59:59.123456789")) + for (localDateTime <- inputs) { + for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampNTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(localDateTime)) + val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p) + assert(row.getTimestampNTZNanos(0) === expected) + val readback = fromRow(encoder, row) + assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected)) + } } } test("SPARK-57033: encoding/decoding TimestampLTZNanosType to/from java.time.Instant") { - val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z") - for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) { - val schema = new StructType().add("t", TimestampLTZNanosType(p)) - 
val encoder = ExpressionEncoder(schema).resolveAndBind() - val row = toRow(encoder, Row(instant)) - val expected = DateTimeUtils.instantToTimestampNanos(instant, p) - assert(row.getTimestampLTZNanos(0) === expected) - val readback = fromRow(encoder, row) - assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected)) + val inputs = Seq( + java.time.Instant.parse("2019-02-26T16:56:00.123456789Z"), + java.time.Instant.parse("1969-12-31T23:59:59.123456789Z")) + for (instant <- inputs) { + for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) { + val schema = new StructType().add("t", TimestampLTZNanosType(p)) + val encoder = ExpressionEncoder(schema).resolveAndBind() + val row = toRow(encoder, Row(instant)) + val expected = DateTimeUtils.instantToTimestampNanos(instant, p) + assert(row.getTimestampLTZNanos(0) === expected) + val readback = fromRow(encoder, row) + assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected)) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 8256b9caa7d17..6d45f85d5f734 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -1923,6 +1923,16 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { for (p <- 7 to 9) { assert(localDateTimeToTimestampNanos(ldt, precision = p).epochMicros === expectedMicros) } + + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 9).nanosWithinMicro === 789) + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 8).nanosWithinMicro === 780) + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 
7).nanosWithinMicro === 700) + val negativeExpectedMicros = DateTimeUtils.localDateTimeToMicros(negativeEpochLdt) + for (p <- 7 to 9) { + assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = p).epochMicros === + negativeExpectedMicros) + } } test("SPARK-57033: instantToTimestampNanos truncates sub-micro to precision") { @@ -1934,5 +1944,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { for (p <- 7 to 9) { assert(instantToTimestampNanos(i, precision = p).epochMicros === expectedMicros) } + + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") + assert(instantToTimestampNanos(negativeEpochInstant, precision = 9).nanosWithinMicro === 789) + assert(instantToTimestampNanos(negativeEpochInstant, precision = 8).nanosWithinMicro === 780) + assert(instantToTimestampNanos(negativeEpochInstant, precision = 7).nanosWithinMicro === 700) + val negativeExpectedMicros = instantToMicros(negativeEpochInstant) + for (p <- 7 to 9) { + assert(instantToTimestampNanos(negativeEpochInstant, precision = p).epochMicros === + negativeExpectedMicros) + } } } From 9b826cd60463ffb9582852834a7332c9c16c3707 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 13:37:24 +0200 Subject: [PATCH 07/13] Fix scalastyle line length in nanos converter tests. Split long assertion messages in CatalystTypeConvertersSuite to satisfy the 100-character line limit without changing test behavior. 
--- .../spark/sql/catalyst/CatalystTypeConvertersSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index 6b74a1db36e58..c14c7319c7a90 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -365,7 +365,8 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) .asInstanceOf[TimestampNanosVal] assert(v.nanosWithinMicro === expectedNanosWithinMicro, - s"input=$input, precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " + + s"got ${v.nanosWithinMicro}") } } } @@ -380,7 +381,8 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { val v = CatalystTypeConverters.createToCatalystConverter(dt)(input) .asInstanceOf[TimestampNanosVal] assert(v.nanosWithinMicro === expectedNanosWithinMicro, - s"input=$input, precision=$p: expected $expectedNanosWithinMicro, got ${v.nanosWithinMicro}") + s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " + + s"got ${v.nanosWithinMicro}") } } } From 2cc5f856010e71ea0b64d746e8da1a78a2061069 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 14:06:42 +0200 Subject: [PATCH 08/13] Document external mapping and precision truncation for nanos timestamp types. Clarify in the SQL data types reference that schema-driven Dataset/DataFrame conversion maps TimestampNTZNanosType -> java.time.LocalDateTime and TimestampLTZNanosType -> java.time.Instant, and that values with more sub-micro digits than declared by precision are floor-truncated. 
Expand the truncateNanosWithinMicroToPrecision comment in SparkDateTimeUtils to spell out p=7/8/9 behavior and to note that truncation is independent of epoch sign. --- docs/sql-ref-datatypes.md | 2 +- .../spark/sql/catalyst/util/SparkDateTimeUtils.scala | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md index fe1b8724d8d61..e0d36631dec0e 100644 --- a/docs/sql-ref-datatypes.md +++ b/docs/sql-ref-datatypes.md @@ -54,7 +54,7 @@ Spark SQL and DataFrames support the following data types: - `TimestampNTZType`: Timestamp without time zone(TIMESTAMP_NTZ). It represents values comprising values of fields year, month, day, hour, minute, and second. All operations are performed without taking any time zone into account. - Note: TIMESTAMP in Spark is a user-specified alias associated with one of the TIMESTAMP_LTZ and TIMESTAMP_NTZ variations. Users can set the default timestamp type as `TIMESTAMP_LTZ`(default value) or `TIMESTAMP_NTZ` via the configuration `spark.sql.timestampType`. - - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. Enable the preview feature with `SET spark.sql.timestampNanosTypes.enabled=true;` before using these types in schemas or SQL. + - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. 
In schema-driven Dataset/DataFrame conversion, Spark maps `TimestampNTZNanosType` to `java.time.LocalDateTime` and `TimestampLTZNanosType` to `java.time.Instant`; values with more sub-micro digits than declared by `precision` are floor-truncated to that precision. Enable the preview feature with `SET spark.sql.timestampNanosTypes.enabled=true;` before using these types in schemas or SQL. * Interval types - `YearMonthIntervalType(startField, endField)`: Represents a year-month interval which is made up of a contiguous subset of the following fields: diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index c391cb83ed721..6721ff4e40605 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -210,9 +210,13 @@ trait SparkDateTimeUtils { /** * Truncates the sub-microsecond nanosecond part to the given timestamp precision `p` in [7, 9]. - * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the last two; precisions - * outside that range are passed through unchanged because the surrounding types already - * validate the bound. + * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the last two. + * + * The input is the already-extracted `nanosWithinMicro` component (`0..999`), so truncation is + * independent of the epoch sign of the original timestamp value. + * + * Precisions outside `[7, 9]` are passed through unchanged because the surrounding timestamp + * nanos types validate the bound. 
*/ private def truncateNanosWithinMicroToPrecision(nanosWithinMicro: Int, precision: Int): Int = { precision match { From 364835ada718e45d04b6288203fdf56b76388dfc Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 14:46:43 +0200 Subject: [PATCH 09/13] Tighten nanos timestamp converter docs and tests Polish the floor-truncation wording in localDateTimeToTimestampNanos / instantToTimestampNanos and add follow-up review coverage: INVALID_EXTERNAL_VALUE checkError tests for both nano converters, nested Array/Map/Struct tests with nano element types, and a precision 7 / 8 Dataset[Row] roundtrip test exercising sub-micro truncation on both positive- and negative-epoch inputs. --- .../catalyst/util/SparkDateTimeUtils.scala | 33 +++--- .../CatalystTypeConvertersSuite.scala | 101 ++++++++++++++++++ .../org/apache/spark/sql/DatasetSuite.scala | 26 +++++ 3 files changed, 147 insertions(+), 13 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index 6721ff4e40605..78a711bc8feb9 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -228,13 +228,16 @@ trait SparkDateTimeUtils { /** * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, nanosWithinMicro)` - * pair used by `TimestampNTZNanosType(precision)`. The sub-microsecond part of - * `localDateTime.getNano` (its last three decimal digits, in `[0, 999]`) is truncated to the - * target precision (floor toward zero of the fractional second) and stored as - * `nanosWithinMicro`; the rest of the local date-time becomes `epochMicros` via - * [[localDateTimeToMicros]] (i.e. interpreted at UTC). 
At `precision = 9` the conversion is - * lossless within the valid range; at 7 / 8 the lower 2 / 1 sub-micro digits are dropped to - * match `CAST(... AS TIMESTAMP_NTZ(p))` semantics. + * pair used by `TimestampNTZNanosType(precision)` (interpreted at UTC). `epochMicros` comes + * from [[localDateTimeToMicros]] (which is floor toward `-inf` for the integral micro part); + * the last three decimal digits of `localDateTime.getNano` (`[0, 999]`) become + * `nanosWithinMicro` after dropping `(9 - precision)` low digits. + * + * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded + * down to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is + * lossless within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. + * The same flooring will be the basis of the future `CAST(... AS TIMESTAMP_NTZ(precision))` + * rule. */ def localDateTimeToTimestampNanos( localDateTime: LocalDateTime, @@ -256,12 +259,16 @@ trait SparkDateTimeUtils { /** * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used - * by `TimestampLTZNanosType(precision)`. `epochMicros` is computed via [[instantToMicros]] - * (floor division of `instant.getNano` to micros); the last three sub-micro nanosecond digits - * are truncated to the target precision (floor toward zero of the fractional second) and kept - * as `nanosWithinMicro` in `[0, 999]`. At `precision = 9` the conversion is lossless within the - * valid range; at 7 / 8 the lower 2 / 1 sub-micro digits are dropped to match - * `CAST(... AS TIMESTAMP_LTZ(p))` semantics. + * by `TimestampLTZNanosType(precision)`. `epochMicros` comes from [[instantToMicros]] (floor + * toward `-inf` for the integral micro part); the last three decimal digits of + * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping `(9 - precision)` + * low digits. 
+ * + * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded + * down to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is + * lossless within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. + * The same flooring will be the basis of the future `CAST(... AS TIMESTAMP_LTZ(precision))` + * rule. */ def instantToTimestampNanos(instant: Instant, precision: Int): TimestampNanosVal = { val epochMicros = instantToMicros(instant) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index c14c7319c7a90..dbb9bc04914a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -387,6 +387,107 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper { } } + test("SPARK-57033: TimestampNTZNanosType converter rejects wrong external type") { + val dt = TimestampNTZNanosType(9) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)("not-a-LocalDateTime") + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> "not-a-LocalDateTime", + "otherClass" -> "java.lang.String", + "dataType" -> "TIMESTAMP_NTZ(9)")) + // An `Instant` is also not accepted - the NTZ nano converter is wall-clock only. 
+ val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(instant) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> instant.toString, + "otherClass" -> "java.time.Instant", + "dataType" -> "TIMESTAMP_NTZ(9)")) + } + + test("SPARK-57033: TimestampLTZNanosType converter rejects wrong external type") { + val dt = TimestampLTZNanosType(9) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)("not-an-Instant") + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> "not-an-Instant", + "otherClass" -> "java.lang.String", + "dataType" -> "TIMESTAMP_LTZ(9)")) + // A `LocalDateTime` is also not accepted - LTZ requires an absolute instant. + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(ldt) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> ldt.toString, + "otherClass" -> "java.time.LocalDateTime", + "dataType" -> "TIMESTAMP_LTZ(9)")) + // The legacy `java.sql.Timestamp` external type is intentionally out of scope for + // `TimestampLTZNanosType` (see SPARK-57033). Verify it is rejected, not silently + // accepted via a fallback. 
+ val ts = java.sql.Timestamp.from(Instant.parse("2019-02-26T16:56:00.123456789Z")) + checkError( + exception = intercept[SparkIllegalArgumentException] { + CatalystTypeConverters.createToCatalystConverter(dt)(ts) + }, + condition = "INVALID_EXTERNAL_VALUE", + parameters = Map( + "other" -> ts.toString, + "otherClass" -> "java.sql.Timestamp", + "dataType" -> "TIMESTAMP_LTZ(9)")) + } + + test("SPARK-57033: nested nanos timestamp types in arrays / maps / structs") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.987654321Z") + val ldtNano = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9) + val instantNano = DateTimeUtils.instantToTimestampNanos(instant, precision = 9) + + // Array of TimestampNTZNanosType. + val ntzArrayType = ArrayType(TimestampNTZNanosType(9), containsNull = true) + val ldts = Seq(ldt, null, ldt) + val catArr = CatalystTypeConverters.createToCatalystConverter(ntzArrayType)(ldts) + .asInstanceOf[GenericArrayData] + assert(catArr.numElements() === 3) + assert(catArr.get(0, TimestampNTZNanosType(9)) === ldtNano) + assert(catArr.isNullAt(1)) + assert(catArr.get(2, TimestampNTZNanosType(9)) === ldtNano) + assert(CatalystTypeConverters.createToScalaConverter(ntzArrayType)(catArr) === ldts) + + // Map of (String -> TimestampLTZNanosType). + val ltzMapType = MapType(StringType, TimestampLTZNanosType(9), valueContainsNull = true) + val instants = Map[String, Instant]("a" -> instant, "b" -> null) + val catMap = CatalystTypeConverters.createToCatalystConverter(ltzMapType)(instants) + val backMap = CatalystTypeConverters.createToScalaConverter(ltzMapType)(catMap) + .asInstanceOf[Map[Any, Any]] + assert(backMap("a") === instant) + assert(backMap("b") == null) + + // Struct with both nano fields plus a non-nano field, and a null row. 
+ val nestedStruct = StructType( + StructField("ntz", TimestampNTZNanosType(9), nullable = true) :: + StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: + StructField("name", StringType, nullable = true) :: Nil) + val outerStruct = StructType(StructField("inner", nestedStruct, nullable = true) :: Nil) + val outerToCat = CatalystTypeConverters.createToCatalystConverter(outerStruct) + val outerToScala = CatalystTypeConverters.createToScalaConverter(outerStruct) + val outerRow = Row(Row(ldt, instant, "abc")) + val nullOuterRow = Row(null) + assert(outerToScala(outerToCat(outerRow)) === outerRow) + assert(outerToScala(outerToCat(nullOuterRow)) === nullOuterRow) + } + test("converting java.time.LocalDate to DateType") { Seq( "0101-02-16", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index adcca27de4730..dc930af874908 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2649,6 +2649,32 @@ class DatasetSuite extends SharedSparkSession checkAnswer(df, rows) } + test("SPARK-57033: Dataset[Row] roundtrip truncates sub-micro to declared precision") { + val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789") + val instant = Instant.parse("2019-02-26T16:56:00.123456789Z") + val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789") + val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z") + // At p=7 the last two sub-micro digits are dropped (789 -> 700); + // at p=8 only the last one is dropped (789 -> 780). 
+ val expectedSubMicro = Map(7 -> 700, 8 -> 780) + + for (p <- 7 to 8) { + val schema = new StructType() + .add("ntz", TimestampNTZNanosType(p), nullable = true) + .add("ltz", TimestampLTZNanosType(p), nullable = true) + val rows = Seq( + Row(ldt, instant), + Row(negativeEpochLdt, negativeEpochInstant)) + val df = spark.createDataFrame(rows.asJava, schema) + assert(df.schema === schema) + val drop = 789 - expectedSubMicro(p) + val expected = Seq( + Row(ldt.minusNanos(drop), instant.minusNanos(drop)), + Row(negativeEpochLdt.minusNanos(drop), negativeEpochInstant.minusNanos(drop))) + checkAnswer(df, expected) + } + } + test("SPARK-34605: implicit encoder for java.time.Duration") { val duration = java.time.Duration.ofMinutes(10) assert(spark.range(1).map { _ => duration }.head() === duration) From 466ad9205046fd44f9ebf04c6fecc389133e1e9a Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 15:23:50 +0200 Subject: [PATCH 10/13] Annotate nanos-only design choices and fuzz precision floor Document why bare LocalDateTime / Instant in convertToCatalyst stays on the microsecond converters, and why RowEncoder does not honor lenient serialization for nanos types: both nanosecond paths are schema-driven and out of scope for legacy java.sql.Timestamp / java.sql.Date external types under SPARK-57033. Add a 200-instant random fuzz in DateTimeUtilsSuite that walks precisions 7..9 for both helpers and asserts the documented invariant on each component (epochMicros invariant under truncation, sub-micro nanos floored to the precision step). 
--- .../sql/catalyst/encoders/RowEncoder.scala | 2 + .../sql/catalyst/CatalystTypeConverters.scala | 3 ++ .../catalyst/util/DateTimeUtilsSuite.scala | 42 +++++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index a657075719aef..859b5a3a9a0d6 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -99,6 +99,8 @@ object RowEncoder extends DataTypeErrorsBase { case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient) case TimestampType => TimestampEncoder(lenient) case TimestampNTZType => LocalDateTimeEncoder + // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` / + // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033). 
case t: TimestampNTZNanosType => LocalDateTimeNanosEncoder(t.precision) case t: TimestampLTZNanosType => InstantNanosEncoder(t.precision) case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 1598eb065a5fe..de131f1d58e0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -701,6 +701,9 @@ object CatalystTypeConverters { case ld: LocalDate => LocalDateConverter.toCatalyst(ld) case t: LocalTime => TimeConverter.toCatalyst(t) case t: Timestamp => TimestampConverter.toCatalyst(t) + // SPARK-57033: schema-less convertToCatalyst keeps bare `Instant` / `LocalDateTime` on the + // microsecond converters. The nanosecond path is schema-driven only - users opt in via an + // explicit `TimestampLTZNanosType` / `TimestampNTZNanosType` column in the schema. 
case i: Instant => InstantConverter.toCatalyst(i) case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l) case d: BigDecimal => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 6d45f85d5f734..47eb4a1e3e3cb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -1955,4 +1955,46 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { negativeExpectedMicros) } } + + test("SPARK-57033: random roundtrip across precisions floors to the precision step") { + val rnd = new scala.util.Random(0) + val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond + val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond + // For each random instant, verify both helpers floor the original nanosecond value + // (toward `-inf`) to the precision step `10^(9 - p)` ns. The whole-instant nanosecond + // count overflows `Long` for far-future dates, so we check the floor on the components + // instead: `epochMicros` is invariant across precisions (matches `instantToMicros`) and + // the sub-micro nanosecond residual is floored to the precision step. 
+ for (_ <- 0 until 10) { + val secs = min + math.abs(rnd.nextLong()) % (max - min + 1) + val nano = rnd.nextInt(1_000_000_000) + val instant = Instant.ofEpochSecond(secs, nano.toLong) + val ldt = LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + val expectedMicros = instantToMicros(instant) + val rawSubMicro = (nano % 1000).toShort + for (p <- 7 to 9) { + val step = math.pow(10, 9 - p).toLong.toShort + val expectedSubMicro = ((rawSubMicro / step) * step).toShort + + val ltz = instantToTimestampNanos(instant, p) + assert(ltz.epochMicros === expectedMicros, + s"LTZ p=$p instant=$instant epochMicros=${ltz.epochMicros}") + assert(ltz.nanosWithinMicro === expectedSubMicro, + s"LTZ p=$p instant=$instant nanosWithinMicro=${ltz.nanosWithinMicro}") + // Roundtrip preserves the truncated value. + val ltzBack = timestampNanosToInstant(ltz) + assert(instantToMicros(ltzBack) === expectedMicros) + assert(ltzBack.getNano % 1000 === expectedSubMicro) + + val ntz = localDateTimeToTimestampNanos(ldt, p) + assert(ntz.epochMicros === expectedMicros, + s"NTZ p=$p ldt=$ldt epochMicros=${ntz.epochMicros}") + assert(ntz.nanosWithinMicro === expectedSubMicro, + s"NTZ p=$p ldt=$ldt nanosWithinMicro=${ntz.nanosWithinMicro}") + val ntzBack = timestampNanosToLocalDateTime(ntz) + assert(DateTimeUtils.localDateTimeToMicros(ntzBack) === expectedMicros) + assert(ntzBack.getNano % 1000 === expectedSubMicro) + } + } + } } From beff171996aaf70a1c572462f3be25bdc9c55b9c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 20:23:59 +0200 Subject: [PATCH 11/13] Drop redundant conf set/unset in testTimestampNanosRowEncoder timestampNanosTypes.enabled defaults to Utils.isTesting (true) in tests, so the explicit spark.conf().set / try-finally / unset is unnecessary. 
Co-authored-by: Max Gekk --- .../apache/spark/sql/JavaDatasetSuite.java | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index a8093137c4700..416c92602b833 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -791,25 +791,20 @@ public void testLocalDateTimeEncoder() { @Test public void testTimestampNanosRowEncoder() { - spark.conf().set("spark.sql.timestampNanosTypes.enabled", "true"); - try { - final StructType schema = new StructType() - .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply()) - .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply()); - LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789"); - Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z"); - List<Row> rows = Arrays.asList(create(ldt, instant), create(null, null)); - Dataset<Row> ds = spark.createDataset(rows, Encoders.row(schema)); - Assertions.assertEquals(schema, ds.schema()); - List<Row> collected = ds.collectAsList(); - Assertions.assertEquals(2, collected.size()); - Assertions.assertEquals(ldt, collected.get(0).get(0)); - Assertions.assertEquals(instant, collected.get(0).get(1)); - Assertions.assertTrue(collected.get(1).isNullAt(0)); - Assertions.assertTrue(collected.get(1).isNullAt(1)); - } finally { - spark.conf().unset("spark.sql.timestampNanosTypes.enabled"); - } + final StructType schema = new StructType() + .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply()) + .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply()); + LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789"); + Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z"); + List<Row> rows = Arrays.asList(create(ldt, instant),
create(null, null)); + Dataset<Row> ds = spark.createDataset(rows, Encoders.row(schema)); + Assertions.assertEquals(schema, ds.schema()); + List<Row> collected = ds.collectAsList(); + Assertions.assertEquals(2, collected.size()); + Assertions.assertEquals(ldt, collected.get(0).get(0)); + Assertions.assertEquals(instant, collected.get(0).get(1)); + Assertions.assertTrue(collected.get(1).isNullAt(0)); + Assertions.assertTrue(collected.get(1).isNullAt(1)); } @Test From 9092075998374e519a83e68149b438f0b95074e2 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 20:29:34 +0200 Subject: [PATCH 12/13] Gate RowEncoder nanos cases on timestampNanosTypes.enabled Without the check, Encoders.row(schema) / ExpressionEncoder(schema) would succeed with nanos timestamp types even when the feature flag is off, bypassing the gate that CatalystTypeConverters already enforces. Call DataTypeErrors.checkTimestampNanosTypesEnabled() in both nanos cases and add a negative test for the flag-off path. Co-authored-by: Max Gekk --- .../sql/catalyst/encoders/RowEncoder.scala | 10 +++++--- .../catalyst/encoders/RowEncoderSuite.scala | 23 ++++++++++++++++++- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index 859b5a3a9a0d6..705d5d8f11b1d 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -22,7 +22,7 @@ import scala.reflect.classTag import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, 
GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder} -import org.apache.spark.sql.errors.DataTypeErrorsBase +import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase} import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.ops.TypeApiOps @@ -101,8 +101,12 @@ object RowEncoder extends DataTypeErrorsBase { case TimestampNTZType => LocalDateTimeEncoder // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` / // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033). - case t: TimestampNTZNanosType => LocalDateTimeNanosEncoder(t.precision) - case t: TimestampLTZNanosType => InstantNanosEncoder(t.precision) + case t: TimestampNTZNanosType => + DataTypeErrors.checkTimestampNanosTypesEnabled() + LocalDateTimeNanosEncoder(t.precision) + case t: TimestampLTZNanosType => + DataTypeErrors.checkTimestampNanosTypesEnabled() + InstantNanosEncoder(t.precision) case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient) case DateType => DateEncoder(lenient) case _: TimeType => LocalTimeEncoder diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 66dba915b5d15..69f4995220fe2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders import scala.collection.mutable import 
scala.util.Random -import org.apache.spark.SparkRuntimeException +import org.apache.spark.{SparkException, SparkRuntimeException} import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest @@ -430,6 +430,27 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { } } + test("SPARK-57033: RowEncoder rejects nanos timestamp types when feature flag is off") { + Seq( + new StructType().add("t", TimestampNTZNanosType()), + new StructType().add("t", TimestampLTZNanosType()) + ).foreach { schema => + withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") { + checkError( + exception = intercept[SparkException] { + ExpressionEncoder(schema) + }, + condition = "FEATURE_NOT_ENABLED", + parameters = Map( + "featureName" -> "Nanosecond-precision timestamp types", + "configKey" -> "spark.sql.timestampNanosTypes.enabled", + "configValue" -> "true" + ) + ) + } + } + } + test("encoding/decoding DateType to/from java.time.LocalDate") { withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") { val schema = new StructType().add("d", DateType) From d00837749957241c5352805a4dea2cd2cb44bf84 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 28 May 2026 20:31:01 +0200 Subject: [PATCH 13/13] Apply scalafmt to SparkDateTimeUtils Co-authored-by: Max Gekk --- .../catalyst/util/SparkDateTimeUtils.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index 78a711bc8feb9..597a96c548ce1 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -233,11 +233,10 @@ trait SparkDateTimeUtils { * the last three decimal digits of 
`localDateTime.getNano` (`[0, 999]`) become * `nanosWithinMicro` after dropping `(9 - precision)` low digits. * - * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded - * down to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is - * lossless within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. - * The same flooring will be the basis of the future `CAST(... AS TIMESTAMP_NTZ(precision))` - * rule. + * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down + * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless + * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same + * flooring will be the basis of the future `CAST(... AS TIMESTAMP_NTZ(precision))` rule. */ def localDateTimeToTimestampNanos( localDateTime: LocalDateTime, @@ -261,14 +260,13 @@ trait SparkDateTimeUtils { * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used * by `TimestampLTZNanosType(precision)`. `epochMicros` comes from [[instantToMicros]] (floor * toward `-inf` for the integral micro part); the last three decimal digits of - * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping `(9 - precision)` - * low digits. + * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping `(9 - precision)` low + * digits. * - * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded - * down to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is - * lossless within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. - * The same flooring will be the basis of the future `CAST(... AS TIMESTAMP_LTZ(precision))` - * rule. 
+ * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down + * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless + * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same + * flooring will be the basis of the future `CAST(... AS TIMESTAMP_LTZ(precision))` rule. */ def instantToTimestampNanos(instant: Instant, precision: Int): TimestampNanosVal = { val epochMicros = instantToMicros(instant)