Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
import org.apache.spark.sql.internal.SQLConf._
Expand Down Expand Up @@ -414,7 +415,12 @@ class ParquetFileFormat
}
}

override def supportDataType(dataType: DataType): Boolean = dataType match {
override def supportDataType(dataType: DataType): Boolean = {
  // Types Framework comes first: when ParquetTypeOps knows this type, its verdict is final.
  val frameworkVerdict = ParquetTypeOps(dataType).map(ops => ops.supportDataType)
  // Otherwise defer to the original pattern match, evaluated only when it is actually needed.
  frameworkVerdict.getOrElse(supportDataTypeDefault(dataType))
}

private def supportDataTypeDefault(dataType: DataType): Boolean = dataType match {
// GeoSpatial data types in Parquet are limited only to types with supported SRIDs.
case g: GeometryType => GeometryType.isSridSupported(g.srid)
case g: GeographyType => GeographyType.isSridSupported(g.srid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -215,32 +216,15 @@ object ParquetReadSupport extends Logging {
caseSensitive: Boolean,
useFieldId: Boolean,
returnNullStructIfAllFieldsMissing: Boolean): Type = {
val newParquetType = catalystType match {
case t: ArrayType if ParquetSchemaConverter.isComplexType(t.elementType) =>
// Only clips array types with nested type as element type.
clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive, useFieldId,
returnNullStructIfAllFieldsMissing)

case t: MapType
if ParquetSchemaConverter.isComplexType(t.keyType) ||
ParquetSchemaConverter.isComplexType(t.valueType) =>
// Only clips map types with nested key type or value type
clipParquetMapType(
parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, useFieldId,
returnNullStructIfAllFieldsMissing)

case t: StructType if VariantMetadata.isVariantStruct(t) =>
clipVariantSchema(parquetType.asGroupType(), t, returnNullStructIfAllFieldsMissing)

case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, useFieldId,
returnNullStructIfAllFieldsMissing)

case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
// to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.
parquetType
}
// Types Framework: framework FIRST for struct-backed types that declare
// parquetStructSchema. Primitive framework types (parquetStructSchema = None)
// fall through to clipParquetTypeDefault, which returns parquetType unchanged for them.
val newParquetType = ParquetTypeOps(catalystType)
.flatMap(_.parquetStructSchema)
.map(st => clipParquetGroup(parquetType.asGroupType(), st, caseSensitive, useFieldId,
returnNullStructIfAllFieldsMissing))
.getOrElse(clipParquetTypeDefault(parquetType, catalystType, caseSensitive, useFieldId,
returnNullStructIfAllFieldsMissing))

if (useFieldId && parquetType.getId != null) {
newParquetType.withId(parquetType.getId.intValue())
Expand All @@ -249,6 +233,38 @@ object ParquetReadSupport extends Logging {
}
}

/**
 * Fallback clipping logic that dispatches on the Catalyst type:
 *  - arrays whose element type is complex are clipped via `clipParquetListType`;
 *  - maps whose key or value type is complex are clipped via `clipParquetMapType`;
 *  - variant structs are clipped via `clipVariantSchema`;
 *  - every other struct is clipped via `clipParquetGroup`;
 *  - anything else (UDTs, primitive types) is returned as-is.
 */
private def clipParquetTypeDefault(
    parquetType: Type,
    catalystType: DataType,
    caseSensitive: Boolean,
    useFieldId: Boolean,
    returnNullStructIfAllFieldsMissing: Boolean): Type = {
  // Shorthand for the nested-type test shared by the array and map guards.
  def isNested(dt: DataType): Boolean = ParquetSchemaConverter.isComplexType(dt)

  // A `def`, not a `val`: the group view is requested only on branches that actually clip.
  def group = parquetType.asGroupType()

  catalystType match {
    // Arrays are clipped only when their element type is itself nested.
    case arrayType: ArrayType if isNested(arrayType.elementType) =>
      clipParquetListType(
        group, arrayType.elementType, caseSensitive, useFieldId,
        returnNullStructIfAllFieldsMissing)

    // Maps are clipped only when the key type or the value type is nested.
    case mapType: MapType if isNested(mapType.keyType) || isNested(mapType.valueType) =>
      clipParquetMapType(
        group, mapType.keyType, mapType.valueType, caseSensitive, useFieldId,
        returnNullStructIfAllFieldsMissing)

    case variantStruct: StructType if VariantMetadata.isVariantStruct(variantStruct) =>
      clipVariantSchema(group, variantStruct, returnNullStructIfAllFieldsMissing)

    case struct: StructType =>
      clipParquetGroup(
        group, struct, caseSensitive, useFieldId, returnNullStructIfAllFieldsMissing)

    case _ =>
      // Neither UDTs nor primitive types are clipped. A clipped UDT might no longer map onto
      // the desired user-space type, so UDTs must stay out of schema merging.
      parquetType
  }
}

/**
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
* of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, VariantMetadata}
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{BinaryView, TimestampNanosVal, UTF8String, VariantVal}
Expand Down Expand Up @@ -306,6 +307,20 @@ private[parquet] class ParquetRowConverter(
parquetType: Type,
catalystType: DataType,
updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
// Types Framework: framework FIRST, original match as fallback.
// Passes all ParquetRowConverter constructor params to the extended newConverter overload
// so struct-backed types can create recursive converters.
ParquetTypeOps(catalystType)
.map(_.newConverter(
parquetType, updater, schemaConverter, convertTz,
datetimeRebaseSpec, int96RebaseSpec))
.getOrElse(newConverterDefault(parquetType, catalystType, updater))
}

private def newConverterDefault(
parquetType: Type,
catalystType: DataType,
updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = {
parquetType.getLogicalTypeAnnotation match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm => SparkEdgeInterpolationAlgorithm}
Expand Down Expand Up @@ -341,8 +342,7 @@ class ParquetToSparkSchemaConverter(
case time: TimeLogicalTypeAnnotation
if time.getUnit == TimeUnit.MICROS && !time.isAdjustedToUTC =>
TimeType(TimeType.MICROS_PRECISION)
case _ =>
illegalType()
case _ => illegalType()
}

case INT96 =>
Expand Down Expand Up @@ -655,8 +655,15 @@ class SparkToParquetSchemaConverter(
field: StructField,
repetition: Type.Repetition,
inShredded: Boolean): Type = {
// Types Framework: framework FIRST, original match as fallback.
ParquetTypeOps(field.dataType).map(_.convertToParquetType(field.name, repetition, inShredded))
.getOrElse(convertFieldDefault(field, repetition, inShredded))
}

field.dataType match {
private def convertFieldDefault(
field: StructField,
repetition: Type.Repetition,
inShredded: Boolean): Type = field.dataType match {
// ===================
// Simple atomic types
// ===================
Expand Down Expand Up @@ -936,7 +943,6 @@ class SparkToParquetSchemaConverter(
case _ =>
throw QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field)
}
}
}

private[sql] object ParquetSchemaConverter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUt
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED
Expand Down Expand Up @@ -206,7 +207,12 @@ object ParquetUtils extends Logging {
sqlConf.parquetVectorizedReaderEnabled &&
schema.forall(f => isBatchReadSupported(sqlConf, f.dataType))

def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = {
  // Types Framework comes first: a type known to ParquetTypeOps answers for itself.
  val frameworkAnswer = ParquetTypeOps(dt).map(ops => ops.isBatchReadSupported(sqlConf))
  // No ops for this type: defer to the original match, evaluated only in that case.
  frameworkAnswer.getOrElse(isBatchReadSupportedDefault(sqlConf, dt))
}

private def isBatchReadSupportedDefault(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
false
case _: AtomicType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils, STUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.types.variant.Variant
Expand Down Expand Up @@ -204,6 +205,19 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// `inShredded` indicates whether the current traversal is nested within a shredded Variant
// schema. This affects how timestamp values are written.
private def makeWriter(dataType: DataType, inShredded: Boolean): ValueWriter = {
  // Types Framework hook: a type known to ParquetTypeOps supplies its own writer; every other
  // type is served by the original match in makeWriterDefault.
  //
  // Two details matter here:
  //  - recordConsumer is still null when init() first calls makeWriter; it is only assigned
  //    later, in prepareForWrite(). The ops method therefore receives a supplier
  //    (`() => recordConsumer`) that reads the var field lazily, mirroring how the existing
  //    writer closures capture the field reference rather than its value.
  //  - The recursion callback re-enters makeWriter (framework first), so sub-fields of
  //    struct-backed types also go through the framework, consistent with schema conversion.
  val frameworkWriter = ParquetTypeOps(dataType).map { ops =>
    ops.makeWriter(
      () => recordConsumer,
      childType => makeWriter(childType, inShredded))
  }
  frameworkWriter.getOrElse(makeWriterDefault(dataType, inShredded))
}

private def makeWriterDefault(dataType: DataType, inShredded: Boolean): ValueWriter =
dataType match {
case NullType => // No values of NullType should ever be written, as all values are null.
(_: SpecializedGetters, _: Int) => throw SparkUnsupportedOperationException()
Expand Down Expand Up @@ -358,7 +372,6 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {

case _ => throw SparkException.internalError(s"Unsupported data type $dataType.")
}
}

private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
assert(
Expand Down
Loading