diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index cf2626c2d63e..733e55a8609e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} import org.apache.spark.sql.internal.SQLConf._ @@ -414,7 +415,12 @@ class ParquetFileFormat } } - override def supportDataType(dataType: DataType): Boolean = dataType match { + override def supportDataType(dataType: DataType): Boolean = + // Types Framework: framework FIRST, original match as fallback. + ParquetTypeOps(dataType).map(_.supportDataType) + .getOrElse(supportDataTypeDefault(dataType)) + + private def supportDataTypeDefault(dataType: DataType): Boolean = dataType match { // GeoSpatial data types in Parquet are limited only to types with supported SRIDs. case g: GeometryType => GeometryType.isSridSupported(g.srid) case g: GeographyType => GeographyType.isSridSupported(g.srid) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 7ee5b4d224b3..e92858f6a1e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.VariantMetadata +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.types._ @@ -215,32 +216,15 @@ object ParquetReadSupport extends Logging { caseSensitive: Boolean, useFieldId: Boolean, returnNullStructIfAllFieldsMissing: Boolean): Type = { - val newParquetType = catalystType match { - case t: ArrayType if ParquetSchemaConverter.isComplexType(t.elementType) => - // Only clips array types with nested type as element type. - clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive, useFieldId, - returnNullStructIfAllFieldsMissing) - - case t: MapType - if ParquetSchemaConverter.isComplexType(t.keyType) || - ParquetSchemaConverter.isComplexType(t.valueType) => - // Only clips map types with nested key type or value type - clipParquetMapType( - parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, useFieldId, - returnNullStructIfAllFieldsMissing) - - case t: StructType if VariantMetadata.isVariantStruct(t) => - clipVariantSchema(parquetType.asGroupType(), t, returnNullStructIfAllFieldsMissing) - - case t: StructType => - clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, useFieldId, - returnNullStructIfAllFieldsMissing) - - case _ => - // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able - // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. - parquetType - } + // Types Framework: framework FIRST for struct-backed types that declare + // parquetStructSchema. Primitive framework types (parquetStructSchema = None) + // fall through to *Default which returns parquetType unchanged. + val newParquetType = ParquetTypeOps(catalystType) + .flatMap(_.parquetStructSchema) + .map(st => clipParquetGroup(parquetType.asGroupType(), st, caseSensitive, useFieldId, + returnNullStructIfAllFieldsMissing)) + .getOrElse(clipParquetTypeDefault(parquetType, catalystType, caseSensitive, useFieldId, + returnNullStructIfAllFieldsMissing)) if (useFieldId && parquetType.getId != null) { newParquetType.withId(parquetType.getId.intValue()) @@ -249,6 +233,38 @@ object ParquetReadSupport extends Logging { } } + private def clipParquetTypeDefault( + parquetType: Type, + catalystType: DataType, + caseSensitive: Boolean, + useFieldId: Boolean, + returnNullStructIfAllFieldsMissing: Boolean): Type = catalystType match { + case t: ArrayType if ParquetSchemaConverter.isComplexType(t.elementType) => + // Only clips array types with nested type as element type. + clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive, useFieldId, + returnNullStructIfAllFieldsMissing) + + case t: MapType + if ParquetSchemaConverter.isComplexType(t.keyType) || + ParquetSchemaConverter.isComplexType(t.valueType) => + // Only clips map types with nested key type or value type + clipParquetMapType( + parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, useFieldId, + returnNullStructIfAllFieldsMissing) + + case t: StructType if VariantMetadata.isVariantStruct(t) => + clipVariantSchema(parquetType.asGroupType(), t, returnNullStructIfAllFieldsMissing) + + case t: StructType => + clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, useFieldId, + returnNullStructIfAllFieldsMissing) + + case _ => + // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able + // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. + parquetType + } + /** * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 6c9485dc6fc8..c3016d929ac9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{DataSourceUtils, VariantMetadata} +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{BinaryView, TimestampNanosVal, UTF8String, VariantVal} @@ -306,6 +307,20 @@ private[parquet] class ParquetRowConverter( parquetType: Type, catalystType: DataType, updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { + // Types Framework: framework FIRST, original match as fallback. + // Passes all ParquetRowConverter constructor params to the extended newConverter overload + // so struct-backed types can create recursive converters. + ParquetTypeOps(catalystType) + .map(_.newConverter( + parquetType, updater, schemaConverter, convertTz, + datetimeRebaseSpec, int96RebaseSpec)) + .getOrElse(newConverterDefault(parquetType, catalystType, updater)) + } + + private def newConverterDefault( + parquetType: Type, + catalystType: DataType, + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = { parquetType.getLogicalTypeAnnotation match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index c479c37b89fd..ba5261200464 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.VariantMetadata +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm => SparkEdgeInterpolationAlgorithm} @@ -341,8 +342,7 @@ class ParquetToSparkSchemaConverter( case time: TimeLogicalTypeAnnotation if time.getUnit == TimeUnit.MICROS && !time.isAdjustedToUTC => TimeType(TimeType.MICROS_PRECISION) - case _ => - illegalType() + case _ => illegalType() } case INT96 => @@ -655,8 +655,15 @@ class SparkToParquetSchemaConverter( field: StructField, repetition: Type.Repetition, inShredded: Boolean): Type = { + // Types Framework: framework FIRST, original match as fallback. + ParquetTypeOps(field.dataType).map(_.convertToParquetType(field.name, repetition, inShredded)) + .getOrElse(convertFieldDefault(field, repetition, inShredded)) + } - field.dataType match { + private def convertFieldDefault( + field: StructField, + repetition: Type.Repetition, + inShredded: Boolean): Type = field.dataType match { // =================== // Simple atomic types // =================== @@ -936,7 +943,6 @@ class SparkToParquetSchemaConverter( case _ => throw QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field) } - } } private[sql] object ParquetSchemaConverter { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index c60754813994..bafc3a9ea52f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUt import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED @@ -206,7 +207,12 @@ object ParquetUtils extends Logging { sqlConf.parquetVectorizedReaderEnabled && schema.forall(f => isBatchReadSupported(sqlConf, f.dataType)) - def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match { + def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = + // Types Framework: framework FIRST, original match as fallback. + ParquetTypeOps(dt).map(_.isBatchReadSupported(sqlConf)) + .getOrElse(isBatchReadSupportedDefault(sqlConf, dt)) + + private def isBatchReadSupportedDefault(sqlConf: SQLConf, dt: DataType): Boolean = dt match { case _: TimestampNTZNanosType | _: TimestampLTZNanosType => false case _: AtomicType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index d7fd5991c75f..18c0a47facda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils, STUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.types._ import org.apache.spark.types.variant.Variant @@ -204,6 +205,19 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // `inShredded` indicates whether the current traversal is nested within a shredded Variant // schema. This affects how timestamp values are written. private def makeWriter(dataType: DataType, inShredded: Boolean): ValueWriter = { + // Types Framework: framework FIRST, original match as fallback. + // The recursive callback passes makeWriter (framework-first) so that sub-fields of + // struct-backed types also go through the framework, consistent with schema conversion. + // NOTE: recordConsumer is null during init() when makeWriter is first called - + // it's set later in prepareForWrite(). The existing code works because closures + // over `this.recordConsumer` (a var field) capture the var reference, not its value. + // We wrap in a lambda to achieve the same lazy evaluation for the ops method. + ParquetTypeOps(dataType) + .map(_.makeWriter(() => recordConsumer, makeWriter(_, inShredded))) + .getOrElse(makeWriterDefault(dataType, inShredded)) + } + + private def makeWriterDefault(dataType: DataType, inShredded: Boolean): ValueWriter = dataType match { case NullType => // No values of NullType should ever be written, as all values are null. (_: SpecializedGetters, _: Int) => throw SparkUnsupportedOperationException() @@ -358,7 +372,6 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case _ => throw SparkException.internalError(s"Unsupported data type $dataType.") } - } private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = { assert( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/ParquetTypeOps.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/ParquetTypeOps.scala new file mode 100644 index 000000000000..76296500a792 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/ParquetTypeOps.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet.types.ops + +import java.time.ZoneId + +import org.apache.parquet.io.api.{Converter, RecordConsumer} +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.execution.datasources.parquet.{HasParentContainerUpdater, ParentContainerUpdater, ParquetToSparkSchemaConverter} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, StructType, TimeType} + +/** + * Optional trait for Parquet storage format integration in the Types Framework. + * + * Implement this trait to enable Parquet read/write support for a framework type. Each framework + * type that supports Parquet provides a concrete implementation and registers it in the companion + * object's apply() method. + * + * The trait covers these Parquet concerns: + * - Schema conversion: Spark DataType -> Parquet schema type (write path) + * - Value write: writing values to Parquet RecordConsumer + * - Row-based read: creating Parquet converters for reading into InternalRow + * - Type gates: declaring Parquet support (supportDataType) and the vectorized-read + * capability flag (isBatchReadSupported) + * - Schema clipping: declaring internal struct schema for column pruning + * + * NOT yet on the trait (deferred to follow-ups): vectorized-read batch updaters and + * filter-pushdown predicates. Only the isBatchReadSupported capability gate exists today; + * the actual vectorized updater and filter predicate hooks are not implemented here. + * + * DISPATCH PATTERN: Framework FIRST at all integration sites. Each Parquet infrastructure + * method wraps itself with: + * {{{ + * ParquetTypeOps(dt).map(_.method(...)).getOrElse(methodDefault(dt, ...)) + * }}} + * The original code is extracted to a *Default method unchanged. For a framework-managed type + * the ops handles it; for any other type ParquetTypeOps(dt) is None and the *Default fallback + * executes the original code path. + * + * STRUCT-BACKED TYPES: Types stored as Parquet groups should override the + * extended newConverter overload (which provides schemaConverter/convertTz/rebase specs for + * recursive field conversion) and declare parquetStructSchema for column pruning. + * + * @see TimeTypeParquetOps for a reference implementation (primitive Long-backed type) + * @since 4.3.0 + */ +private[parquet] trait ParquetTypeOps extends Serializable { + + // ==================== Schema Conversion ==================== + + /** + * Converts this Spark DataType to a Parquet schema Type (for the write path). + * + * For primitive types: returns a PrimitiveType with the appropriate annotation. + * For struct-backed types: returns a GroupType with sub-fields and a logical type annotation. + * + * @param fieldName the column/field name in the Parquet schema + * @param repetition REQUIRED, OPTIONAL, or REPEATED + * @param inShredded whether the field is nested within a shredded Variant schema + * @return the Parquet Type for this DataType + */ + def convertToParquetType( + fieldName: String, repetition: Repetition, inShredded: Boolean = false): Type + + // ==================== Value Write ==================== + + /** + * Creates a value writer that writes values of this type to a Parquet RecordConsumer. + * + * The writer is a closure that captures the RecordConsumer and is called once per row during + * Parquet file writing. The RecordConsumer is instance-specific (one per output file). + * + * IMPORTANT: The RecordConsumer is passed as a lazy supplier (() => RecordConsumer) because + * makeWriter is called during ParquetWriteSupport.init(), when recordConsumer is still null. + * It gets set later in prepareForWrite(). The closure must evaluate the supplier at write + * time, not at creation time. The existing infrastructure closures work because they capture + * `this.recordConsumer` (a var field - Scala closures over vars capture the reference, not + * the value). The supplier lambda achieves the same lazy evaluation for ops code. + * + * For primitive types: directly calls recordConsumer.addLong/addBinary/etc. with any + * necessary conversion (e.g., nanos to micros for TimeType). + * + * @param recordConsumer lazy supplier for the Parquet output stream (null during init) + * @param makeFieldWriter callback into ParquetWriteSupport for recursive field writer creation + * (struct-backed types use this to create writers for sub-fields) + * @return a closure that writes a value from a row at the given ordinal + */ + def makeWriter( + recordConsumer: () => RecordConsumer, + makeFieldWriter: DataType => (SpecializedGetters, Int) => Unit + ): (SpecializedGetters, Int) => Unit + + // ==================== Row-Based Read ==================== + + /** + * Creates a Parquet Converter for reading values of this type (simple version). + * + * Primitive types override this method. The converter typically extends + * ParquetPrimitiveConverter and overrides addLong/addBinary/etc. to perform any necessary + * conversion (e.g., micros to nanos for TimeType). + * + * WARNING: Struct-backed types must override the EXTENDED overload below instead. This simple + * version does not provide the schemaConverter, timezone, or rebase specs needed for recursive + * field conversion. Overriding only this method for a struct-backed type will compile but + * produce incorrect behavior at runtime (missing timezone conversion, calendar rebasing, etc.). + * + * @param parquetType the Parquet schema type for this field + * @param updater the parent container to set converted values into + * @return a Converter that reads Parquet values into the parent container + */ + def newConverter( + parquetType: Type, + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater + + /** + * Creates a Parquet Converter for reading values of this type (extended version). + * + * Struct-backed types override this method to receive the extra context needed for recursive + * field conversion (schemaConverter for nested type resolution, timezone for timestamp + * conversion, rebase specs for calendar rebasing). + * + * Default delegates to the simple version - primitive types inherit this default and + * ignore the extra parameters. + * + * @param parquetType the Parquet schema type for this field + * @param updater the parent container to set converted values into + * @param schemaConverter for resolving nested Parquet schemas to Spark types + * @param convertTz timezone for timestamp conversion + * @param datetimeRebaseSpec calendar rebasing spec for datetime types + * @param int96RebaseSpec calendar rebasing spec for INT96 timestamps + * @return a Converter that reads Parquet values into the parent container + */ + def newConverter( + parquetType: Type, + updater: ParentContainerUpdater, + schemaConverter: ParquetToSparkSchemaConverter, + convertTz: Option[ZoneId], + datetimeRebaseSpec: RebaseSpec, + int96RebaseSpec: RebaseSpec): Converter with HasParentContainerUpdater = + newConverter(parquetType, updater) + + // ==================== Type Gates ==================== + + /** + * Whether this type is supported by the Parquet data source. + * Used by ParquetFileFormat.supportDataType. + */ + def supportDataType: Boolean = true + + /** + * Whether vectorized (batch) reading is supported for this type. + * Used by ParquetUtils.isBatchReadSupported. Default is false - types must opt in + * by overriding to true. When false, Spark uses the row-based read path (newConverter) + * which is always available. + * + * PRECONDITION: there is no framework vectorized-read hook yet, so returning true is only + * safe for a type the legacy Java vectorized path (ParquetVectorUpdaterFactory / + * VectorizedColumnReader) already handles. A new type that returns true without that + * legacy support would route into a factory that does not recognize it. TimeType is safe + * here precisely because the legacy path handles it; until the vectorized integration + * (follow-up) lands, other types should leave this false. + * + * @param sqlConf the active SQL configuration + */ + def isBatchReadSupported(sqlConf: SQLConf): Boolean = false + + // ==================== Schema Clipping (Struct-Backed Types) ==================== + + /** + * The Parquet-level struct schema for column pruning. + * + * Struct-backed types (stored as a Parquet GROUP) return the field schema so that + * ParquetReadSupport.clipParquetType can prune sub-fields based on the query's + * requested columns. + * + * This is independent of PhysicalDataType - Parquet storage representation may differ from + * internal row representation. A type could be PhysicalBinaryType in rows but a GROUP in + * Parquet (e.g., a type stored as binary in rows but as a GROUP with metadata fields on disk). + * + * Primitive types return None (no sub-fields to clip). + */ + def parquetStructSchema: Option[StructType] = None +} + +/** + * Factory object for creating ParquetTypeOps instances. + * + * Provides forward lookup (DataType -> ops) for framework-first dispatch at Parquet + * integration sites. apply() returns Some only for framework-managed types, so callers + * fall back to the legacy path for everything else. + */ +private[parquet] object ParquetTypeOps { + + /** + * Returns a ParquetTypeOps instance for the given DataType, if supported. + * + * Returns None if the type has no Parquet ops. + * This is the single registration point for all Parquet type operations. + */ + def apply(dt: DataType): Option[ParquetTypeOps] = { + dt match { + case tt: TimeType => Some(TimeTypeParquetOps(tt)) + // Add new types here - single registration point + case _ => None + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOps.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOps.scala new file mode 100644 index 000000000000..7f05361d8f6c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOps.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet.types.ops + +import org.apache.parquet.io.api.{Converter, RecordConsumer} +import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types} +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.parquet.{HasParentContainerUpdater, ParentContainerUpdater, ParquetPrimitiveConverter} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, TimeType} + +/** + * Parquet operations for TimeType. + * + * TimeType is a primitive Long-backed type stored in Parquet as INT64 with the + * TIME(isAdjustedToUTC=false, unit=MICROS) logical type annotation. + * + * IMPORTANT - internal vs Parquet representation: + * - Spark internal: nanoseconds since midnight (Long) + * - Parquet storage: microseconds since midnight (INT64) + * - Write path: nanos -> micros (DateTimeUtils.nanosToMicros) + * - Read path: micros -> nanos (DateTimeUtils.microsToNanos) + * + * @param t the TimeType with precision information + * @since 4.3.0 + */ +case class TimeTypeParquetOps(t: TimeType) extends ParquetTypeOps { + + // ==================== Schema Conversion ==================== + + override def convertToParquetType( + fieldName: String, repetition: Repetition, inShredded: Boolean): Type = + Types.primitive(INT64, repetition) + .as(LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS)) + .named(fieldName) + + // ==================== Value Write ==================== + + override def makeWriter( + recordConsumer: () => RecordConsumer, + makeFieldWriter: DataType => (SpecializedGetters, Int) => Unit + ): (SpecializedGetters, Int) => Unit = + // Evaluate the supplier at write time (not creation time) because recordConsumer + // is null during init() and set later in prepareForWrite(). + (row: SpecializedGetters, ordinal: Int) => + recordConsumer().addLong(DateTimeUtils.nanosToMicros(row.getLong(ordinal))) + + // ==================== Row-Based Read ==================== + + override def newConverter( + parquetType: Type, + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { + // Framework-first dispatch in ParquetRowConverter routes here whenever the + // requested Spark type is TimeType, regardless of the actual Parquet encoding. + // Without this guard, files whose column is raw INT64, INT64 TIME(NANOS), + // INT64 TIMESTAMP(MICROS), INT32 TIME(MILLIS), etc. would silently decode as + // microsToNanos(value) and produce wrong results. Mirrors the inline guard + // that existed in ParquetRowConverter before the framework dispatch. + TimeTypeParquetOps.requireCompatibleParquetType(t, parquetType) + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + this.updater.setLong(DateTimeUtils.microsToNanos(value)) + } + } + } + + // ==================== Vectorized Read ==================== + + override def isBatchReadSupported(sqlConf: SQLConf): Boolean = true +} + +private[ops] object TimeTypeParquetOps { + + /** + * Validates that a Parquet field can be decoded as TimeType. TimeType is stored + * as INT64 with TIME(MICROS, isAdjustedToUTC=false). Any other encoding (raw + * INT64, INT64 TIME(NANOS), INT32 TIME(MILLIS), INT64 TIMESTAMP(_), decimal- + * annotated, etc.) cannot be decoded as TimeType - throw the same error as + * the legacy ParquetRowConverter path so reads fail loudly instead of + * silently misinterpreting bytes. + */ + private[ops] def requireCompatibleParquetType( + sparkType: TimeType, parquetType: Type): Unit = { + val ok = parquetType.isPrimitive && + parquetType.asPrimitiveType.getPrimitiveTypeName == INT64 && + (parquetType.getLogicalTypeAnnotation match { + case t: LogicalTypeAnnotation.TimeLogicalTypeAnnotation => + t.getUnit == TimeUnit.MICROS && !t.isAdjustedToUTC + case _ => false + }) + if (!ok) { + throw QueryExecutionErrors.cannotCreateParquetConverterForDataTypeError( + sparkType, parquetType.toString) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOpsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOpsSuite.scala new file mode 100644 index 000000000000..9a21b5e3f4bb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOpsSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet.types.ops + +import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types} +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{INT32, INT64} +import org.apache.parquet.schema.Type.Repetition.REQUIRED + +import org.apache.spark.{SparkFunSuite, SparkRuntimeException} +import org.apache.spark.sql.types.TimeType + +/** + * Unit tests for [[TimeTypeParquetOps.requireCompatibleParquetType]]. + * + * TimeType is stored in Parquet as INT64 TIME(MICROS, isAdjustedToUTC=false). + * The read-path guard accepts only that canonical encoding and rejects every + * other primitive/annotation combination so that reading fails loudly rather + * than silently mis-decoding (e.g. interpreting NANOS as MICROS, which would + * be off by 1000x). + * + * Note: rejecting isAdjustedToUTC=true is stricter than the legacy + * ParquetRowConverter guard, which accepts that encoding. This is a known, + * intentional divergence between the framework and legacy paths for this + * single case; reconciling it (either by relaxing the framework guard or + * tightening the legacy one) is tracked by SPARK-57416. + */ +class TimeTypeParquetOpsSuite extends SparkFunSuite { + + private val timeMicros = TimeType(TimeType.MICROS_PRECISION) + + // ---------- accept ---------- + + test("accepts INT64 TIME(MICROS, isAdjustedToUTC=false) - the canonical encoding") { + val field = Types.primitive(INT64, REQUIRED) + .as(LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS)) + .named("c") + // Must not throw. + TimeTypeParquetOps.requireCompatibleParquetType(timeMicros, field) + } + + // ---------- the four primary reject paths ---------- + + test("rejects raw INT64 with no logical type annotation") { + val field = Types.primitive(INT64, REQUIRED).named("c") + assertRejects(timeMicros, field) + } + + test("rejects INT64 TIME(NANOS, isAdjustedToUTC=false)") { + val field = Types.primitive(INT64, REQUIRED) + .as(LogicalTypeAnnotation.timeType(false, TimeUnit.NANOS)) + .named("c") + assertRejects(timeMicros, field) + } + + test("rejects INT32 TIME(MILLIS, isAdjustedToUTC=false)") { + // Per Parquet spec TIME(MILLIS) is INT32; the primitive-type guard catches it. + val field = Types.primitive(INT32, REQUIRED) + .as(LogicalTypeAnnotation.timeType(false, TimeUnit.MILLIS)) + .named("c") + assertRejects(timeMicros, field) + } + + test("rejects INT64 TIME(MICROS, isAdjustedToUTC=true)") { + // The intended framework behavior is to reject this encoding: the canonical + // TimeType representation is local-time (isAdjustedToUTC=false). The legacy + // ParquetRowConverter guard accepts the encoding, so this is a known, + // intentional framework-vs-legacy divergence; reconciliation is tracked by + // SPARK-57416. + val field = Types.primitive(INT64, REQUIRED) + .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MICROS)) + .named("c") + assertRejects(timeMicros, field) + } + + // ---------- additional rejects for full reject-set coverage ---------- + + test("rejects INT64 TIMESTAMP(MICROS) - wrong annotation kind") { + val field = Types.primitive(INT64, REQUIRED) + .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS)) + .named("c") + assertRejects(timeMicros, field) + } + + test("rejects INT64 DECIMAL - wrong annotation kind") { + val field = Types.primitive(INT64, REQUIRED) + .as(LogicalTypeAnnotation.decimalType(2, 18)) + .named("c") + assertRejects(timeMicros, field) + } + + test("rejects non-primitive (group) type") { + val field: Type = Types.buildGroup(REQUIRED).named("c") + assertRejects(timeMicros, field) + } + + // Note: a "BINARY with TIME(MICROS) annotation" combination is impossible to + // construct - the parquet-mr Types builder itself rejects it with + // IllegalStateException("TIME(MICROS,false) can only annotate INT64"). So the + // wrong-primitive branch of requireCompatibleParquetType is unreachable for + // the TIME annotation; the raw-INT64 / TIMESTAMP / DECIMAL / group tests + // above already exercise the !isPrimitive and "non-TIME annotation" branches. + + // ---------- helper ---------- + + private def assertRejects(sparkType: TimeType, field: Type): Unit = { + val ex = intercept[SparkRuntimeException] { + TimeTypeParquetOps.requireCompatibleParquetType(sparkType, field) + } + assert(ex.getCondition === "PARQUET_CONVERSION_FAILURE.UNSUPPORTED", + s"expected PARQUET_CONVERSION_FAILURE.UNSUPPORTED, got ${ex.getCondition}") + } +}