diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index 600b49536aec..dd31993f2bf6 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, B import org.apache.spark.sql.errors.DataTypeErrorsBase import org.apache.spark.sql.internal.SqlApiConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.ops.EncodeTypeOps import org.apache.spark.util.ArrayImplicits._ /** @@ -71,6 +72,9 @@ object RowEncoder extends DataTypeErrorsBase { private[sql] def encoderForDataType(dataType: DataType, lenient: Boolean): AgnosticEncoder[_] = dataType match { + // Types Framework: delegate to EncodeTypeOps for supported types when enabled + case _ if SqlApiConf.get.typesFrameworkEnabled && EncodeTypeOps.supports(dataType) => + EncodeTypeOps(dataType).getEncoder case NullType => NullEncoder case BooleanType => BoxedBooleanEncoder case ByteType => BoxedByteEncoder diff --git a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala index 2e2105c852e6..bedd4afe0ed5 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala @@ -53,6 +53,7 @@ private[sql] trait SqlApiConf { def parserDfaCacheFlushRatio: Double def legacyParameterSubstitutionConstantsOnly: Boolean def legacyIdentifierClauseOnly: Boolean + def typesFrameworkEnabled: Boolean } private[sql] object SqlApiConf { @@ -110,4 +111,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf { override def parserDfaCacheFlushRatio: Double = -1.0 override def legacyParameterSubstitutionConstantsOnly: Boolean = false override def legacyIdentifierClauseOnly: Boolean = false + override def typesFrameworkEnabled: Boolean = false } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/EncodeTypeOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/EncodeTypeOps.scala new file mode 100644 index 000000000000..845f5d82fae2 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/EncodeTypeOps.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types.ops + +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.types.DataType + +/** + * Operations for row encoding and decoding. + * + * PURPOSE: + * Provides the encoder needed for Dataset[T] operations. The encoder handles + * serialization and deserialization of user objects to/from internal rows. + * + * USAGE CONTEXT: + * Used by: + * - RowEncoder.scala - creates encoders for schema fields + * - EncoderUtils.scala - encoder utility functions + * - Spark Connect - client-side encoding + * - Dataset[T] API - all typed dataset operations + * + * @see TimeTypeApiOps for reference implementation + * @since 4.1.0 + */ +trait EncodeTypeOps extends TypeApiOps { + /** + * Returns the AgnosticEncoder for this type. + * + * The encoder handles serialization (external -> internal) and deserialization + * (internal -> external) for Dataset[T] operations. + * + * @return AgnosticEncoder instance (e.g., LocalTimeEncoder for TimeType) + * @example TimeType -> LocalTimeEncoder (handles java.time.LocalTime) + * @example DateType -> LocalDateEncoder or DateEncoder (depending on config) + */ + def getEncoder: AgnosticEncoder[_] +} + +/** + * Companion object providing factory methods for EncodeTypeOps. + */ +object EncodeTypeOps { + /** + * Creates an EncodeTypeOps instance for the given DataType. + * + * @param dt The DataType to get encoding operations for + * @return EncodeTypeOps instance + * @throws SparkException if the type doesn't support EncodeTypeOps + */ + def apply(dt: DataType): EncodeTypeOps = TypeApiOps(dt).asInstanceOf[EncodeTypeOps] + + /** + * Checks if a DataType supports EncodeTypeOps operations. + * + * @param dt The DataType to check + * @return true if the type supports EncodeTypeOps + */ + def supports(dt: DataType): Boolean = TypeApiOps.supports(dt) +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/FormatTypeOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/FormatTypeOps.scala new file mode 100644 index 000000000000..f92d3a021fdd --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/FormatTypeOps.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types.ops + +import org.apache.spark.sql.types.DataType +import org.apache.spark.unsafe.types.UTF8String + +/** + * Operations for formatting values as strings. + * + * PURPOSE: + * Handles string formatting for display and SQL output. This includes formatting + * values for CAST to STRING, EXPLAIN output, SHOW commands, and SQL literals. + * + * USAGE CONTEXT: + * Used by: + * - ToStringBase.scala - CAST(x AS STRING) expressions + * - EXPLAIN output - displaying literal values + * - SHOW commands - displaying column values + * - SQL generation - creating SQL literal representations + * - HiveResult.scala - formatting results for Thrift/JDBC + * + * @see TimeTypeApiOps for reference implementation + * @since 4.1.0 + */ +trait FormatTypeOps extends TypeApiOps { + /** + * Formats an internal value as a display string. + * + * This method converts the internal Catalyst representation to a human-readable + * string suitable for display or CAST to STRING operations. + * + * @param v The internal value (e.g., Long nanoseconds for TimeType) + * @return Formatted string (e.g., "10:30:45.123456") + * @example 37800000000000L -> "10:30:00" (for TIME) + */ + def format(v: Any): String + + /** + * Formats an internal value as a UTF8String. + * + * Convenience method that wraps format() for use in expressions that + * need UTF8String output directly. + * + * @param v The internal value + * @return UTF8String representation + */ + def formatUTF8(v: Any): UTF8String = UTF8String.fromString(format(v)) + + /** + * Formats an internal value as a SQL literal string. + * + * This method produces a string that can be used in SQL statements, + * including the type prefix if appropriate (e.g., "TIME '10:30:00'"). + * + * @param v The internal value + * @return SQL literal string (e.g., "TIME '10:30:00'") + * @example 37800000000000L -> "TIME '10:30:00'" (for TIME) + */ + def toSQLValue(v: Any): String +} + +/** + * Companion object providing factory methods for FormatTypeOps. + */ +object FormatTypeOps { + /** + * Creates a FormatTypeOps instance for the given DataType. + * + * @param dt The DataType to get formatting operations for + * @return FormatTypeOps instance + * @throws SparkException if the type doesn't support FormatTypeOps + */ + def apply(dt: DataType): FormatTypeOps = TypeApiOps(dt).asInstanceOf[FormatTypeOps] + + /** + * Checks if a DataType supports FormatTypeOps operations. + * + * @param dt The DataType to check + * @return true if the type supports FormatTypeOps + */ + def supports(dt: DataType): Boolean = TypeApiOps.supports(dt) +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimeTypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimeTypeApiOps.scala new file mode 100644 index 000000000000..9840b4b5588d --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimeTypeApiOps.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types.ops + +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder +import org.apache.spark.sql.catalyst.util.{FractionTimeFormatter, TimeFormatter} +import org.apache.spark.sql.types.{DataType, TimeType} + +/** + * Client-side (spark-api) operations for TimeType. + * + * This class provides all client-side operations for TIME type including: + * - String formatting (FormatTypeOps) + * - Row encoding/decoding (EncodeTypeOps) + * + * IMPLEMENTATION NOTES: + * - Uses FractionTimeFormatter for consistent formatting with ToStringBase + * - Uses LocalTimeEncoder for Dataset[T] operations with java.time.LocalTime + * - SQL literals use the format: TIME 'HH:mm:ss.ffffff' + * + * RELATIONSHIP TO TimeTypeOps: + * TimeTypeOps (in catalyst package) extends this class to inherit client-side + * operations while adding server-side operations (physical type, literals, etc.). + * + * @param t The TimeType with precision information + * @since 4.1.0 + */ +class TimeTypeApiOps(val t: TimeType) + extends TypeApiOps + with FormatTypeOps + with EncodeTypeOps { + + override def dataType: DataType = t + + // ==================== FormatTypeOps ==================== + + /** + * Formatter for TIME values. + * + * Uses FractionTimeFormatter which: + * - Formats times as HH:mm:ss with fractional seconds + * - Does not output trailing zeros in the fraction + * - Example: "15:00:01.123400" formats as "15:00:01.1234" + */ + @transient + private lazy val timeFormatter: TimeFormatter = new FractionTimeFormatter() + + /** + * Formats a TIME value (nanoseconds since midnight) as a display string. + * + * @param v Long nanoseconds since midnight + * @return Formatted string (e.g., "10:30:45.123456") + */ + override def format(v: Any): String = { + timeFormatter.format(v.asInstanceOf[Long]) + } + + /** + * Formats a TIME value as a SQL literal. + * + * @param v Long nanoseconds since midnight + * @return SQL literal (e.g., "TIME '10:30:45.123456'") + */ + override def toSQLValue(v: Any): String = { + s"TIME '${format(v)}'" + } + + // ==================== EncodeTypeOps ==================== + + /** + * Returns the encoder for java.time.LocalTime. + * + * LocalTimeEncoder handles serialization/deserialization between + * LocalTime objects and internal Long nanosecond representation. + */ + override def getEncoder: AgnosticEncoder[_] = LocalTimeEncoder +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala new file mode 100644 index 000000000000..98a4e9b51b5e --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types.ops + +import org.apache.spark.SparkException +import org.apache.spark.sql.types.{DataType, TimeType} + +/** + * Base trait for client-side (spark-api) type operations. + * + * PURPOSE: + * TypeApiOps handles operations that require spark-api internals (e.g., AgnosticEncoder) + * that are not available in the catalyst package. This separation prevents circular + * dependencies between sql/api and sql/catalyst modules. + * + * USAGE: + * TypeApiOps is used for: + * - Row encoding/decoding (EncodeTypeOps) + * - String formatting (FormatTypeOps) + * + * RELATIONSHIP TO TypeOps: + * - TypeOps (catalyst): Server-side operations - physical types, literals, conversions + * - TypeApiOps (spark-api): Client-side operations - encoding, formatting + * + * For TimeType, TimeTypeOps extends TimeTypeApiOps to inherit both sets of operations. + * + * @see TimeTypeApiOps for a reference implementation + * @since 4.1.0 + */ +trait TypeApiOps extends Serializable { + /** The DataType this Ops instance handles */ + def dataType: DataType +} + +/** + * Factory object for creating TypeApiOps instances. + */ +object TypeApiOps { + /** + * Creates a TypeApiOps instance for the given DataType. + * + * @param dt The DataType to get operations for + * @return TypeApiOps instance for the type + * @throws SparkException if no TypeApiOps implementation exists for the type + */ + def apply(dt: DataType): TypeApiOps = dt match { + case tt: TimeType => new TimeTypeApiOps(tt) + // Future types will be added here + case _ => throw SparkException.internalError( + s"No TypeApiOps implementation for ${dt.typeName}. " + + "This type is not yet supported by the Types Framework.") + } + + /** + * Checks if a DataType is supported by the Types Framework (client-side). + * + * @param dt The DataType to check + * @return true if the type is supported by the framework + */ + def supports(dt: DataType): Boolean = dt match { + case _: TimeType => true + // Future types will be added here + case _ => false + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index b800b84e35ce..b6f5a1ecd280 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -30,6 +30,7 @@ import scala.language.existentials import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.ops.ExternalTypeOps import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -63,6 +64,9 @@ object CatalystTypeConverters { private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = { TypeUtils.failUnsupportedDataType(dataType, SQLConf.get) val converter = dataType match { + // Types Framework: delegate to ExternalTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && ExternalTypeOps.supports(dataType) => + new TypeOpsConverter(dataType) case udt: UserDefinedType[_] => UDTConverter(udt) case arrayType: ArrayType => ArrayConverter(arrayType.elementType) case mapType: MapType => MapConverter(mapType.keyType, mapType.valueType) @@ -150,6 +154,18 @@ object CatalystTypeConverters { override def toScalaImpl(row: InternalRow, column: Int): Any = row.get(column, dataType) } + /** + * Adapter that wraps ExternalTypeOps to implement CatalystTypeConverter. + * Used by the Types Framework to provide type conversion for framework-supported types. + */ + private class TypeOpsConverter(dt: DataType) + extends CatalystTypeConverter[Any, Any, Any] { + private val ops = ExternalTypeOps(dt) + override def toCatalystImpl(scalaValue: Any): Any = ops.toCatalystImpl(scalaValue) + override def toScala(catalystValue: Any): Any = ops.toScala(catalystValue) + override def toScalaImpl(row: InternalRow, column: Int): Any = ops.toScalaImpl(row, column) + } + private case class UDTConverter[A >: Null]( udt: UserDefinedType[A]) extends CatalystTypeConverter[A, A, Any] { // toCatalyst (it calls toCatalystImpl) will do null check. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala index f9bf0ebdfd9a..106a2a7ccc25 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.util.ArrayImplicits._ @@ -170,6 +172,8 @@ object InternalRow { */ @scala.annotation.tailrec def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { + case _ if SQLConf.get.typesFrameworkEnabled && PhyTypeOps.supports(dt) => + PhyTypeOps(dt).getRowWriter(ordinal) case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean]) case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte]) case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala index e7b53344abbd..35a49c1bfc5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala @@ -23,7 +23,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, CalendarIntervalEncoder, NullEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, SparkDecimalEncoder, VariantEncoder} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType} +import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType} import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal} @@ -99,6 +101,9 @@ object EncoderUtils { def dataTypeJavaClass(dt: DataType): Class[_] = { dt match { + // Types Framework: delegate to PhyTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && PhyTypeOps.supports(dt) => + PhyTypeOps(dt).getJavaClass case _: DecimalType => classOf[Decimal] case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType] case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala index 1f755df0516f..1734089e421a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import scala.annotation.tailrec +import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -196,6 +198,9 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen @tailrec private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue = dataType match { + // Types Framework: delegate to PhyTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && PhyTypeOps.supports(dataType) => + PhyTypeOps(dataType).getMutableValue // We use INT for DATE and YearMonthIntervalType internally case IntegerType | DateType | _: YearMonthIntervalType => new MutableInt // We use Long for Timestamp, Timestamp without time zone and DayTimeInterval internally diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala index bc294fd722b3..706a974b90d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.ops.FormatTypeOps import org.apache.spark.unsafe.UTF8StringBuilder import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.util.ArrayImplicits._ @@ -66,6 +67,9 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression => } private def castToString(from: DataType): Any => UTF8String = from match { + // Types Framework: delegate to FormatTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && FormatTypeOps.supports(from) => + acceptAny[Any](v => FormatTypeOps(from).formatUTF8(v)) case CalendarIntervalType => acceptAny[CalendarInterval](i => UTF8String.fromString(i.toString)) case BinaryType => acceptAny[Array[Byte]](binaryFormatter.apply) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 13b1d329f7ec..880bd4ae3cba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.encoders.HashableWeakReference import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, CollationAwareUTF8String, CollationFactory, CollationSupport, MapData, SQLOrderingUtil, UnsafeRowUtils} import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.errors.QueryExecutionErrors @@ -1991,6 +1992,9 @@ object CodeGenerator extends Logging { @tailrec def javaClass(dt: DataType): Class[_] = dt match { + // Types Framework: delegate to PhyTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && PhyTypeOps.supports(dt) => + PhyTypeOps(dt).getJavaClass case BooleanType => java.lang.Boolean.TYPE case ByteType => java.lang.Byte.TYPE case ShortType => java.lang.Short.TYPE diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 6448194f9705..c8a0b36e107d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.trees.TreePattern import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, TRUE_OR_FALSE_LITERAL} import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.types.ops.LiteralTypeOps import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localTimeToNanos} import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE @@ -187,6 +188,9 @@ object Literal { * Create a literal with default value for given DataType */ def default(dataType: DataType): Literal = dataType match { + // Types Framework: delegate to LiteralTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && LiteralTypeOps.supports(dataType) => + LiteralTypeOps(dataType).getDefaultLiteral case NullType => create(null, NullType) case BooleanType => Literal(false) case ByteType => Literal(0.toByte) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala index 2f2b91e0b969..ee0720236de3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala @@ -21,8 +21,10 @@ import scala.reflect.runtime.universe.TypeTag import scala.reflect.runtime.universe.typeTag import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder} +import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType} import org.apache.spark.unsafe.types.{ByteArray, GeographyVal, GeometryVal, UTF8String, VariantVal} import org.apache.spark.util.ArrayImplicits._ @@ -35,6 +37,9 @@ sealed abstract class PhysicalDataType { object PhysicalDataType { def apply(dt: DataType): PhysicalDataType = dt match { + // Types Framework: delegate to PhyTypeOps for supported types when enabled + case _ if SQLConf.get.typesFrameworkEnabled && PhyTypeOps.supports(dt) => + PhyTypeOps(dt).getPhysicalType case NullType => PhysicalNullType case ByteType => PhysicalByteType case ShortType => PhysicalShortType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/ExternalTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/ExternalTypeOps.scala new file mode 100644 index 000000000000..e88326528f6f --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/ExternalTypeOps.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.types.ops + +import javax.annotation.Nullable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.DataType + +/** + * Operations for converting between external (Java/Scala) and internal (Catalyst) representations. + * + * PURPOSE: + * Handles bidirectional conversion between user-facing types (e.g., java.time.LocalTime) + * and internal Catalyst types (e.g., Long representing nanoseconds since midnight). + * This is essential for Dataset[T] operations, UDFs, and collect() operations. + * + * USAGE CONTEXT: + * Used by: + * - CatalystTypeConverters.scala - main conversion utilities + * - Dataset[T] encoders - converting user objects to/from internal rows + * - UDF execution - converting arguments and return values + * - collect() operations - converting results back to user types + * - Cast expressions - some cast operations use external type conversion + * + * TYPE PARAMETERS: + * - ScalaInputType: The external type accepted as input (e.g., LocalTime) + * - ScalaOutputType: The external type produced as output (e.g., LocalTime) + * - CatalystType: The internal representation type (e.g., Long) + * + * In most cases, ScalaInputType and ScalaOutputType are the same type. + * + * @see TimeTypeOps for reference implementation + * @since 4.1.0 + */ +trait ExternalTypeOps extends TypeOps { + /** + * Converts an external (Scala/Java) value to its internal Catalyst representation. + * + * This method handles null checking and Option unwrapping automatically via toCatalyst(). + * Implementations should handle the non-null case in toCatalystImpl(). + * + * @param maybeScalaValue The external value (may be null or Option) + * @return The internal Catalyst representation, or null if input was null/None + */ + final def toCatalyst(@Nullable maybeScalaValue: Any): Any = { + maybeScalaValue match { + case null | None => null + case opt: Some[_] => toCatalystImpl(opt.get) + case other => toCatalystImpl(other) + } + } + + /** + * Converts an external value to its internal representation. + * + * This method is called by toCatalyst() after null checking. + * Implementations should assume the input is non-null. + * + * @param scalaValue The external value (guaranteed non-null) + * @return The internal Catalyst representation + * @example LocalTime.of(10, 30, 0) -> 37800000000000L (nanoseconds since midnight) + */ + def toCatalystImpl(scalaValue: Any): Any + + /** + * Converts an internal Catalyst value to its external representation. + * + * @param catalystValue The internal value (may be null) + * @return The external representation, or null if input was null + * @example 37800000000000L -> LocalTime.of(10, 30, 0) + */ + def toScala(@Nullable catalystValue: Any): Any + + /** + * Extracts a value from an InternalRow and converts to external representation. + * + * This is a convenience method used when reading values from rows. + * The caller should check for null before calling this method. + * + * @param row The InternalRow containing the value + * @param column The column index + * @return The external representation of the value at the given column + */ + def toScalaImpl(row: InternalRow, column: Int): Any + + /** + * Extracts a value from an InternalRow with null checking. + * + * @param row The InternalRow containing the value + * @param column The column index + * @return The external representation, or null if the column is null + */ + final def toScala(row: InternalRow, column: Int): Any = { + if (row.isNullAt(column)) null else toScalaImpl(row, column) + } +} + +/** + * Companion object providing factory methods for ExternalTypeOps. + */ +object ExternalTypeOps { + /** + * Creates an ExternalTypeOps instance for the given DataType. + * + * @param dt The DataType to get external conversion operations for + * @return ExternalTypeOps instance + * @throws SparkException if the type doesn't support ExternalTypeOps + */ + def apply(dt: DataType): ExternalTypeOps = TypeOps(dt).asInstanceOf[ExternalTypeOps] + + /** + * Checks if a DataType supports ExternalTypeOps operations. + * + * @param dt The DataType to check + * @return true if the type supports ExternalTypeOps + */ + def supports(dt: DataType): Boolean = TypeOps.supports(dt) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/LiteralTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/LiteralTypeOps.scala new file mode 100644 index 000000000000..ddb6ee3c2908 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/LiteralTypeOps.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.types.ops + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.types.DataType + +/** + * Operations related to literal creation and code generation. + * + * PURPOSE: + * Provides default literal values and Java literal representations for types. + * This is used when a column needs a default value (e.g., ALTER TABLE ADD COLUMN) + * or when generating code that includes literal values. + * + * USAGE CONTEXT: + * Used by: + * - literals.scala - Literal.default() method + * - CodeGenerator.scala - generating Java literal strings + * - ALTER TABLE ADD COLUMN - default column values + * - Query optimization - constant folding + * + * @see TimeTypeOps for reference implementation + * @since 4.1.0 + */ +trait LiteralTypeOps extends TypeOps { + /** + * Returns the default literal value for this type. + * + * This is used when a default value is needed, such as when adding + * a new column without specifying a default. + * + * @return Literal with the default value and correct type + * @example TimeType -> Literal(0L, TimeType(precision)) representing "00:00:00" + * @example DecimalType(10,2) -> Literal(Decimal(0), DecimalType(10,2)) + */ + def getDefaultLiteral: Literal + + /** + * Returns the Java literal representation for code generation. + * + * When generating Java/Scala code, literal values need to be represented + * as strings that the compiler can parse. This method provides that + * representation. + * + * @param v The internal value to represent + * @return Java literal string (e.g., "123456L" for a Long value) + * @example 123456L -> "123456L" (for TimeType nanoseconds) + * @example Decimal(10.5) -> "Decimal(10.5)" (for DecimalType) + */ + def getJavaLiteral(v: Any): String +} + +/** + * Companion object providing factory methods for LiteralTypeOps. + */ +object LiteralTypeOps { + /** + * Creates a LiteralTypeOps instance for the given DataType. + * + * @param dt The DataType to get literal operations for + * @return LiteralTypeOps instance + * @throws SparkException if the type doesn't support LiteralTypeOps + */ + def apply(dt: DataType): LiteralTypeOps = TypeOps(dt).asInstanceOf[LiteralTypeOps] + + /** + * Checks if a DataType supports LiteralTypeOps operations. + * + * @param dt The DataType to check + * @return true if the type supports LiteralTypeOps + */ + def supports(dt: DataType): Boolean = TypeOps.supports(dt) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/PhyTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/PhyTypeOps.scala new file mode 100644 index 000000000000..c2c72c992144 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/PhyTypeOps.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.types.ops + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.MutableValue +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.DataType + +/** + * Operations related to physical type representation. + * + * PURPOSE: + * Defines how a logical type (e.g., TimeType) is physically represented in memory. + * This includes the physical data type, Java class for code generation, and mutable + * value type for SpecificInternalRow. + * + * USAGE CONTEXT: + * Used by: + * - PhysicalDataType.scala - determines physical type mapping + * - CodeGenerator.scala - determines Java class for codegen + * - SpecificInternalRow.scala - determines MutableValue type + * - ColumnVector operations + * + * @see TimeTypeOps for reference implementation + * @since 4.1.0 + */ +trait PhyTypeOps extends TypeOps { + /** + * Returns the physical data type representation. + * + * The physical type determines how values are stored in memory and accessed + * from InternalRow and ColumnVector. + * + * @return PhysicalDataType (e.g., PhysicalLongType for TimeType) + * @example TimeType -> PhysicalLongType (stored as Long nanoseconds) + * @example DecimalType -> PhysicalDecimalType (stored as Decimal object) + */ + def getPhysicalType: PhysicalDataType + + /** + * Returns the Java class used for code generation. + * + * This class is used when generating Java/Scala code for expressions + * that operate on this type. + * + * @return Java class (e.g., classOf[Long] for TimeType) + */ + def getJavaClass: Class[_] + + /** + * Returns a MutableValue instance for use in SpecificInternalRow. + * + * MutableValue is a mutable wrapper that can hold values of the physical type. + * It's used for efficient row mutation without boxing/unboxing overhead. + * + * @return MutableValue instance (e.g., MutableLong for TimeType) + */ + def getMutableValue: MutableValue + + /** + * Returns a writer function for setting values in an InternalRow. + * + * The writer function takes an InternalRow and a value, and sets the value + * at the given ordinal using the appropriate type-specific setter (e.g., setLong). + * + * Used by InternalRow.getWriter() for interpreted aggregation and row mutation. + * + * @param ordinal The column index to write to + * @return Writer function (InternalRow, Any) => Unit + * @example TimeType -> (input, v) => input.setLong(ordinal, v.asInstanceOf[Long]) + */ + def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit +} + +/** + * Companion object providing factory methods for PhyTypeOps. + */ +object PhyTypeOps { + /** + * Creates a PhyTypeOps instance for the given DataType. + * + * @param dt The DataType to get physical operations for + * @return PhyTypeOps instance + * @throws SparkException if the type doesn't support PhyTypeOps + */ + def apply(dt: DataType): PhyTypeOps = TypeOps(dt).asInstanceOf[PhyTypeOps] + + /** + * Checks if a DataType supports PhyTypeOps operations. + * + * Note: All types in the framework support PhyTypeOps, so this is equivalent + * to TypeOps.supports(dt). This method exists for consistency with the + * check-and-delegate pattern. + * + * @param dt The DataType to check + * @return true if the type supports PhyTypeOps + */ + def supports(dt: DataType): Boolean = TypeOps.supports(dt) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala new file mode 100644 index 000000000000..92d83a969d9e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.types.ops + +import java.time.LocalTime + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Literal, MutableLong, MutableValue} +import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalLongType} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{DataType, TimeType} +import org.apache.spark.sql.types.ops.TimeTypeApiOps + +/** + * Server-side (catalyst) operations for TimeType. + * + * This class provides all type-specific operations for TIME type including: + * - Physical type representation (PhyTypeOps) + * - Literal creation (LiteralTypeOps) + * - External type conversion (ExternalTypeOps) + * + * It also inherits client-side operations from TimeTypeApiOps: + * - String formatting (FormatTypeOps) + * - Row encoding (EncodeTypeOps) + * + * INTERNAL REPRESENTATION: + * TIME values are stored as Long representing nanoseconds since midnight. + * - Range: 0 to 86,399,999,999,999 (00:00:00.000000000 to 23:59:59.999999999) + * - Physical type: PhysicalLongType + * - External type: java.time.LocalTime + * + * PRECISION: + * TimeType supports precision from 0 to 6 (fractional seconds digits). + * The internal representation always uses nanoseconds regardless of precision. + * Precision only affects parsing and display formatting. + * + * EXAMPLE USAGE: + * {{{ + * val ops = TimeTypeOps(TimeType(6)) + * ops.getPhysicalType // Returns PhysicalLongType + * ops.getDefaultLiteral // Returns Literal(0L, TimeType(6)) representing "00:00:00" + * ops.toScala(37800000000000L) // Returns LocalTime.of(10, 30, 0) + * }}} + * + * @param t The TimeType with precision information + * @since 4.1.0 + */ +case class TimeTypeOps(override val t: TimeType) + extends TimeTypeApiOps(t) + with TypeOps + with PhyTypeOps + with LiteralTypeOps + with ExternalTypeOps { + + override def dataType: DataType = t + + // ==================== PhyTypeOps ==================== + + /** + * TIME is physically stored as Long (nanoseconds since midnight). + */ + override def getPhysicalType: PhysicalDataType = PhysicalLongType + + /** + * Java class for code generation is Long. + */ + override def getJavaClass: Class[_] = classOf[Long] + + /** + * MutableLong for SpecificInternalRow operations. + */ + override def getMutableValue: MutableValue = new MutableLong + + /** + * Row writer that uses setLong for TIME's Long physical representation. + */ + override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit = + (input, v) => input.setLong(ordinal, v.asInstanceOf[Long]) + + // ==================== LiteralTypeOps ==================== + + /** + * Default TIME literal is midnight (00:00:00). + * + * Returns Literal(0L, TimeType(precision)) where 0L represents midnight + * (0 nanoseconds since midnight). + */ + override def getDefaultLiteral: Literal = Literal.create(0L, t) + + /** + * Java literal representation for code generation. + * + * @param v Long nanoseconds value + * @return Java literal string (e.g., "37800000000000L") + */ + override def getJavaLiteral(v: Any): String = s"${v}L" + + // ==================== ExternalTypeOps ==================== + + /** + * Converts LocalTime to internal Long nanoseconds representation. + * + * @param scalaValue java.time.LocalTime instance + * @return Long nanoseconds since midnight + */ + override def toCatalystImpl(scalaValue: Any): Any = { + DateTimeUtils.localTimeToNanos(scalaValue.asInstanceOf[LocalTime]) + } + + /** + * Converts internal Long nanoseconds to LocalTime. + * + * @param catalystValue Long nanoseconds since midnight (may be null) + * @return java.time.LocalTime instance, or null if input was null + */ + override def toScala(catalystValue: Any): Any = { + if (catalystValue == null) null + else DateTimeUtils.nanosToLocalTime(catalystValue.asInstanceOf[Long]) + } + + /** + * Extracts TIME value from InternalRow and converts to LocalTime. + * + * @param row The InternalRow containing the TIME value + * @param column The column index + * @return java.time.LocalTime instance + */ + override def toScalaImpl(row: InternalRow, column: Int): Any = { + DateTimeUtils.nanosToLocalTime(row.getLong(column)) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala new file mode 100644 index 000000000000..6740382d83c8 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.types.ops + +import org.apache.spark.SparkException +import org.apache.spark.sql.types.{DataType, TimeType} + +/** + * Base trait for all type operations in the Types Framework. + * + * PURPOSE: + * TypeOps centralizes type-specific operations that were previously scattered across 40+ files + * in pattern matching expressions like `case _: TimeType => ...`. By implementing TypeOps for + * a new type, all integration points automatically work without modifying those files. + * + * USAGE: + * Integration points use the check-and-delegate pattern: + * {{{ + * def getPhysicalType(dt: DataType): PhysicalDataType = dt match { + * case _ if TypeOps.supports(dt) => TypeOps(dt).asInstanceOf[PhyTypeOps].getPhysicalType + * case DateType => PhysicalIntegerType + * // ... legacy types + * } + * }}} + * + * IMPLEMENTATION: + * To add a new type to the framework: + * 1. Create a case class extending TypeOps and the relevant traits (PhyTypeOps, etc.) + * 2. Register it in the TypeOps.apply() and TypeOps.supports() methods below + * 3. No other file modifications needed - all integration points automatically work + * + * @see TimeTypeOps for a reference implementation + * @since 4.1.0 + */ +trait TypeOps extends Serializable { + /** The DataType this Ops instance handles */ + def dataType: DataType +} + +/** + * Factory object for creating TypeOps instances. + * + * Uses pattern matching rather than Set enumeration to support parameterized types + * like TimeType(precision) or DecimalType(precision, scale). + */ +object TypeOps { + /** + * Creates a TypeOps instance for the given DataType. + * + * @param dt The DataType to get operations for + * @return TypeOps instance for the type + * @throws SparkException if no TypeOps implementation exists for the type + */ + def apply(dt: DataType): TypeOps = dt match { + case tt: TimeType => TimeTypeOps(tt) + // Future types will be added here: + // case dt: DecimalType => DecimalTypeOps(dt) + // case _: DurationType => DurationTypeOps(dt) + case _ => throw SparkException.internalError( + s"No TypeOps implementation for ${dt.typeName}. " + + "This type is not yet supported by the Types Framework.") + } + + /** + * Checks if a DataType is supported by the Types Framework. + * + * This method should be used in the check-and-delegate pattern at integration points: + * {{{ + * case _ if TypeOps.supports(dt) => TypeOps(dt).someMethod() + * }}} + * + * @param dt The DataType to check + * @return true if the type is supported by the framework + */ + def supports(dt: DataType): Boolean = dt match { + case _: TimeType => true + // Future types will be added here + case _ => false + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f17199547665..a888a6e0e6c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -592,6 +592,16 @@ object SQLConf { .booleanConf .createWithDefaultFunction(() => Utils.isTesting) + val TYPES_FRAMEWORK_ENABLED = + buildConf("spark.sql.types.framework.enabled") + .internal() + .doc("When true, use the Types Framework for supported types (currently TimeType). " + + "The framework centralizes type-specific operations in Ops classes instead of " + + "scattered pattern matching. When false, use legacy scattered implementation.") + .version("4.1.0") + .booleanConf + .createWithDefaultFunction(() => Utils.isTesting) + val EXTENDED_EXPLAIN_PROVIDERS = buildConf("spark.sql.extendedExplainProviders") .doc("A comma-separated list of classes that implement the" + " org.apache.spark.sql.ExtendedExplainGenerator trait. If provided, Spark will print" + @@ -7065,6 +7075,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def geospatialEnabled: Boolean = getConf(GEOSPATIAL_ENABLED) + def typesFrameworkEnabled: Boolean = getConf(TYPES_FRAMEWORK_ENABLED) + def dataSourceV2JoinPushdown: Boolean = getConf(DATA_SOURCE_V2_JOIN_PUSHDOWN) def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)