From 0ba70df0483c785aea72bc7a2f0e6989393da90b Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 30 Dec 2014 16:14:12 +0800 Subject: [PATCH 01/19] draft version --- .../apache/spark/sql/json/JSONRelation.scala | 13 +- .../apache/spark/sql/parquet/newParquet.scala | 31 ++-- .../org/apache/spark/sql/sources/ddl.scala | 141 ++++++++++++++++-- .../apache/spark/sql/sources/interfaces.scala | 5 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 114 ++++---------- 5 files changed, 187 insertions(+), 117 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index fc70c183437f6..52b79c1ca8c29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -18,31 +18,38 @@ package org.apache.spark.sql.json import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.sources._ private[sql] class DefaultSource extends RelationProvider { /** Returns a new base relation with the given parameters. 
*/ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio)(sqlContext) + JSONRelation(fileName, samplingRatio, schema)(sqlContext) } } -private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)( +private[sql] case class JSONRelation( + fileName: String, + samplingRatio: Double, + userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) extends TableScan { private def baseRDD = sqlContext.sparkContext.textFile(fileName) override val schema = + userSpecifiedSchema.getOrElse( JsonRDD.inferSchema( baseRDD, samplingRatio, sqlContext.columnNameOfCorruptRecord) + ) override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 2e0c6c51c00e5..a237d27794a9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,22 +22,21 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate - import parquet.hadoop.ParquetInputFormat import parquet.hadoop.util.ContextUtil import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{Partition => SparkPartition, Logging} import org.apache.spark.rdd.{NewHadoopPartition, RDD} - -import 
org.apache.spark.sql.{SQLConf, Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType} +import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType} import org.apache.spark.sql.sources._ +import org.apache.spark.sql.{SQLConf, SQLContext} import scala.collection.JavaConversions._ + /** * Allows creation of parquet based tables using the syntax * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option @@ -48,11 +47,12 @@ class DefaultSource extends RelationProvider { /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) - ParquetRelation2(path)(sqlContext) + ParquetRelation2(path, schema)(sqlContext) } } @@ -82,7 +82,9 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files: * discovery. */ @DeveloperApi -case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) +case class ParquetRelation2( + path: String, + userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) extends CatalystScan with Logging { def sparkContext = sqlContext.sparkContext @@ -133,12 +135,13 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum - val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. 
- ParquetTypesConverter.readSchemaFromFile( - partitions.head.files.head.getPath, - Some(sparkContext.hadoopConfiguration), - sqlContext.isParquetBinaryAsString)) - + val dataSchema = userSpecifiedSchema.getOrElse( + StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. + ParquetTypesConverter.readSchemaFromFile( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + sqlContext.isParquetBinaryAsString)) + ) val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 8a66ac31f2dfb..69fa64affd961 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -17,16 +17,16 @@ package org.apache.spark.sql.sources +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.{RegexParsers, PackratParsers} + import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.util.Utils - -import scala.language.implicitConversions -import scala.util.parsing.combinator.lexical.StdLexical -import scala.util.parsing.combinator.syntactical.StandardTokenParsers -import scala.util.parsing.combinator.PackratParsers - import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical @@ -49,6 +49,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) + // data types + protected val STRING = 
Keyword("STRING") + protected val DOUBLE = Keyword("DOUBLE") + protected val BOOLEAN = Keyword("BOOLEAN") + protected val FLOAT = Keyword("FLOAT") + protected val INT = Keyword("INT") + protected val TINYINT = Keyword("TINYINT") + protected val SMALLINT = Keyword("SMALLINT") + protected val BIGINT = Keyword("BIGINT") + protected val BINARY = Keyword("BINARY") + protected val DECIMAL = Keyword("DECIMAL") + protected val DATE = Keyword("DATE") + protected val TIMESTAMP = Keyword("TIMESTAMP") + protected val VARCHAR = Keyword("VARCHAR") + protected val CREATE = Keyword("CREATE") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") @@ -67,15 +82,30 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val ddl: Parser[LogicalPlan] = createTable /** - * CREATE TEMPORARY TABLE avroTable + * `CREATE TEMPORARY TABLE avroTable * USING org.apache.spark.sql.avro - * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro") + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...) 
+ * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` */ protected lazy val createTable: Parser[LogicalPlan] = - CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { + ( CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { case tableName ~ provider ~ opts => - CreateTableUsing(tableName, provider, opts) + CreateTableUsing(tableName, Seq.empty, provider, opts) + } + | + CREATE ~ TEMPORARY ~ TABLE ~> ident + ~ tableCols ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { + case tableName ~ tableColumns ~ provider ~ opts => + CreateTableUsing(tableName, tableColumns, provider, opts) } + ) + + protected lazy val metastoreTypes = new MetastoreTypes + + protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" protected lazy val options: Parser[Map[String, String]] = "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } @@ -83,10 +113,98 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")} protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) } + + protected lazy val column: Parser[StructField] = + ident ~ ident ^^ { case name ~ typ => + StructField(name, metastoreTypes.toDataType(typ)) + } +} + +/** + * :: DeveloperApi :: + * Provides a parser for data types. 
+ */ +@DeveloperApi +private[sql] class MetastoreTypes extends RegexParsers { + protected lazy val primitiveType: Parser[DataType] = + "string" ^^^ StringType | + "float" ^^^ FloatType | + "int" ^^^ IntegerType | + "tinyint" ^^^ ByteType | + "smallint" ^^^ ShortType | + "double" ^^^ DoubleType | + "bigint" ^^^ LongType | + "binary" ^^^ BinaryType | + "boolean" ^^^ BooleanType | + fixedDecimalType | // Hive 0.13+ decimal with precision/scale + "decimal" ^^^ DecimalType.Unlimited | // Hive 0.12 decimal with no precision/scale + "date" ^^^ DateType | + "timestamp" ^^^ TimestampType | + "varchar\\((\\d+)\\)".r ^^^ StringType + + protected lazy val fixedDecimalType: Parser[DataType] = + ("decimal" ~> "(" ~> "\\d+".r) ~ ("," ~> "\\d+".r <~ ")") ^^ { + case precision ~ scale => + DecimalType(precision.toInt, scale.toInt) + } + + protected lazy val arrayType: Parser[DataType] = + "array" ~> "<" ~> dataType <~ ">" ^^ { + case tpe => ArrayType(tpe) + } + + protected lazy val mapType: Parser[DataType] = + "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { + case t1 ~ _ ~ t2 => MapType(t1, t2) + } + + protected lazy val structField: Parser[StructField] = + "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { + case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) + } + + protected lazy val structType: Parser[DataType] = + "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ { + case fields => new StructType(fields) + } + + private[sql] lazy val dataType: Parser[DataType] = + arrayType | + mapType | + structType | + primitiveType + + def toDataType(metastoreType: String): DataType = parseAll(dataType, metastoreType) match { + case Success(result, _) => result + case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") + } + + def toMetastoreType(dt: DataType): String = dt match { + case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" + case StructType(fields) => + s"struct<${fields.map(f => 
s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" + case MapType(keyType, valueType, _) => + s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" + case StringType => "string" + case FloatType => "float" + case IntegerType => "int" + case ByteType => "tinyint" + case ShortType => "smallint" + case DoubleType => "double" + case LongType => "bigint" + case BinaryType => "binary" + case BooleanType => "boolean" + case DateType => "date" + case d: DecimalType => "decimal" + case TimestampType => "timestamp" + case NullType => "void" + case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) + } } private[sql] case class CreateTableUsing( tableName: String, + tableCols: Seq[StructField], provider: String, options: Map[String, String]) extends RunnableCommand { @@ -100,7 +218,8 @@ private[sql] case class CreateTableUsing( } } val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) + val relation = dataSource.createRelation( + sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 02eff80456dbe..5f9e8a35ef84e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -41,7 +41,10 @@ trait RelationProvider { * Note: the parameters' keywords are case insensitive and this insensitivity is enforced * by the Map that is passed to the function. 
*/ - def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation + def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b31a3ec25096b..accdaf591b5ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,12 +20,7 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} -import org.apache.spark.sql.execution.SparkPlan - -import scala.util.parsing.combinator.RegexParsers - import org.apache.hadoop.util.ReflectionUtils - import org.apache.hadoop.hive.metastore.TableType import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} @@ -36,7 +31,6 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.Logging -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -44,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.sources.MetastoreTypes import org.apache.spark.util.Utils /* Implicit conversions */ @@ -386,88 +381,6 @@ private[hive] case class InsertIntoHiveTable( } } -/** - * :: DeveloperApi :: - * Provides conversions between Spark SQL data types and Hive Metastore types. 
- */ -@DeveloperApi -object HiveMetastoreTypes extends RegexParsers { - protected lazy val primitiveType: Parser[DataType] = - "string" ^^^ StringType | - "float" ^^^ FloatType | - "int" ^^^ IntegerType | - "tinyint" ^^^ ByteType | - "smallint" ^^^ ShortType | - "double" ^^^ DoubleType | - "bigint" ^^^ LongType | - "binary" ^^^ BinaryType | - "boolean" ^^^ BooleanType | - fixedDecimalType | // Hive 0.13+ decimal with precision/scale - "decimal" ^^^ DecimalType.Unlimited | // Hive 0.12 decimal with no precision/scale - "date" ^^^ DateType | - "timestamp" ^^^ TimestampType | - "varchar\\((\\d+)\\)".r ^^^ StringType - - protected lazy val fixedDecimalType: Parser[DataType] = - ("decimal" ~> "(" ~> "\\d+".r) ~ ("," ~> "\\d+".r <~ ")") ^^ { - case precision ~ scale => - DecimalType(precision.toInt, scale.toInt) - } - - protected lazy val arrayType: Parser[DataType] = - "array" ~> "<" ~> dataType <~ ">" ^^ { - case tpe => ArrayType(tpe) - } - - protected lazy val mapType: Parser[DataType] = - "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { - case t1 ~ _ ~ t2 => MapType(t1, t2) - } - - protected lazy val structField: Parser[StructField] = - "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { - case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) - } - - protected lazy val structType: Parser[DataType] = - "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ { - case fields => new StructType(fields) - } - - protected lazy val dataType: Parser[DataType] = - arrayType | - mapType | - structType | - primitiveType - - def toDataType(metastoreType: String): DataType = parseAll(dataType, metastoreType) match { - case Success(result, _) => result - case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") - } - - def toMetastoreType(dt: DataType): String = dt match { - case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" - case StructType(fields) => - s"struct<${fields.map(f => 
s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" - case MapType(keyType, valueType, _) => - s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" - case StringType => "string" - case FloatType => "float" - case IntegerType => "int" - case ByteType => "tinyint" - case ShortType => "smallint" - case DoubleType => "double" - case LongType => "bigint" - case BinaryType => "binary" - case BooleanType => "boolean" - case DateType => "date" - case d: DecimalType => HiveShim.decimalMetastoreString(d) - case TimestampType => "timestamp" - case NullType => "void" - case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) - } -} - private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) @@ -545,3 +458,28 @@ private[hive] case class MetastoreRelation /** An attribute map for determining the ordinal for non-partition columns. */ val columnOrdinals = AttributeMap(attributes.zipWithIndex) } + + +object HiveMetastoreTypes extends MetastoreTypes { + override def toMetastoreType(dt: DataType): String = dt match { + case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" + case StructType(fields) => + s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" + case MapType(keyType, valueType, _) => + s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" + case StringType => "string" + case FloatType => "float" + case IntegerType => "int" + case ByteType => "tinyint" + case ShortType => "smallint" + case DoubleType => "double" + case LongType => "bigint" + case BinaryType => "binary" + case BooleanType => "boolean" + case DateType => "date" + case d: DecimalType => HiveShim.decimalMetastoreString(d) + case TimestampType => "timestamp" + case NullType => "void" + case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) + } +} From 7787ec713dfd398bef28c0319f4df12f009fa128 Mon Sep 17 
00:00:00 2001 From: wangfei Date: Tue, 30 Dec 2014 16:29:48 +0800 Subject: [PATCH 02/19] added SchemaRelationProvider --- .../apache/spark/sql/json/JSONRelation.scala | 4 +-- .../apache/spark/sql/parquet/newParquet.scala | 4 +-- .../org/apache/spark/sql/sources/ddl.scala | 3 ++- .../apache/spark/sql/sources/interfaces.scala | 27 ++++++++++++++++++- .../spark/sql/sources/FilteredScanSuite.scala | 5 ++-- .../spark/sql/sources/PrunedScanSuite.scala | 5 ++-- .../spark/sql/sources/TableScanSuite.scala | 5 ++-- 7 files changed, 41 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 52b79c1ca8c29..8f835b8517e61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -21,12 +21,12 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.sources._ -private[sql] class DefaultSource extends RelationProvider { +private[sql] class DefaultSource extends SchemaRelationProvider { /** Returns a new base relation with the given parameters. 
*/ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + schema: Option[StructType] = None): BaseRelation = { val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index a237d27794a9e..6a4a41686eb28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -43,12 +43,12 @@ import scala.collection.JavaConversions._ * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ -class DefaultSource extends RelationProvider { +class DefaultSource extends SchemaRelationProvider { /** Returns a new base relation with the given parameters. 
*/ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + schema: Option[StructType] = None): BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 69fa64affd961..258729c475b6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -217,7 +217,8 @@ private[sql] case class CreateTableUsing( sys.error(s"Failed to load class for data source: $provider") } } - val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + val dataSource = + clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] val relation = dataSource.createRelation( sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 5f9e8a35ef84e..a23e53e5bf9b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -41,10 +41,35 @@ trait RelationProvider { * Note: the parameters' keywords are case insensitive and this insensitivity is enforced * by the Map that is passed to the function. */ + def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation +} + +/** + * :: DeveloperApi :: + * Implemented by objects that produce relations for a specific kind of data source. When + * Spark SQL is given a DDL operation with a USING clause specified and, optionally, a user-defined + * schema, this interface is used to pass in the parameters specified by a user.
+ * + * Users may specify the fully qualified class name of a given data source. When that class is + * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for + * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the + * data source 'org.apache.spark.sql.json.DefaultSource' + * + * A new instance of this class will be instantiated each time a DDL call is made. + */ +@DeveloperApi +trait SchemaRelationProvider { + /** + * Returns a new base relation with the given parameters and user defined schema. + * Note: the parameters' keywords are case insensitive and this insensitivity is enforced + * by the Map that is passed to the function. + */ def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation + schema: Option[StructType] = None): BaseRelation } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 939b3c0c66de7..7ebcc1227e5cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -21,10 +21,11 @@ import scala.language.existentials import org.apache.spark.sql._ -class FilteredScanSource extends RelationProvider { +class FilteredScanSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType] = None): BaseRelation = { SimpleFilteredScan(parameters("from").toInt, parameters("to").toInt)(sqlContext) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index fee2e22611cdc..db6684ba22ab8 100644 ---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.sources import org.apache.spark.sql._ -class PrunedScanSource extends RelationProvider { +class PrunedScanSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType] = None): BaseRelation = { SimplePrunedScan(parameters("from").toInt, parameters("to").toInt)(sqlContext) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 3cd7b0115d567..63e2648178e22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -21,10 +21,11 @@ import org.apache.spark.sql._ class DefaultSource extends SimpleScanSource -class SimpleScanSource extends RelationProvider { +class SimpleScanSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType] = None): BaseRelation = { SimpleScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext) } } From 9bf12f89ca829677f308aae34f082513f71c7971 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 30 Dec 2014 17:02:12 +0800 Subject: [PATCH 03/19] adding test case --- .../org/apache/spark/sql/sources/ddl.scala | 15 -- .../apache/spark/sql/sources/interfaces.scala | 2 +- .../org/apache/spark/sql/QueryTest.scala | 2 +- .../spark/sql/sources/NewTableScanSuite.scala | 136 ++++++++++++++++++ 4 files changed, 138 insertions(+), 17 deletions(-) create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 258729c475b6c..12c66e24115ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -49,21 +49,6 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) - // data types - protected val STRING = Keyword("STRING") - protected val DOUBLE = Keyword("DOUBLE") - protected val BOOLEAN = Keyword("BOOLEAN") - protected val FLOAT = Keyword("FLOAT") - protected val INT = Keyword("INT") - protected val TINYINT = Keyword("TINYINT") - protected val SMALLINT = Keyword("SMALLINT") - protected val BIGINT = Keyword("BIGINT") - protected val BINARY = Keyword("BINARY") - protected val DECIMAL = Keyword("DECIMAL") - protected val DATE = Keyword("DATE") - protected val TIMESTAMP = Keyword("TIMESTAMP") - protected val VARCHAR = Keyword("VARCHAR") - protected val CREATE = Keyword("CREATE") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index a23e53e5bf9b2..3f53dbc952c81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType} +import org.apache.spark.sql.{Row, SQLContext, StructType} import 
org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 3d9f0cbf80fe7..a4ea95c07360e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -70,7 +70,7 @@ class QueryTest extends PlanTest { """.stripMargin) } - if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { + if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { // issues here, sparkAnswer may be GenericRow[] fail(s""" |Results do not match for query: |${rdd.logicalPlan} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala new file mode 100644 index 0000000000000..0a58c979ca7b2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala @@ -0,0 +1,136 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql._ +import java.sql.{Timestamp, Date} +import org.apache.spark.sql.execution.RDDConversions + +case class PrimaryData( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean, + decimalField: BigDecimal, + date: Date, + timestampField: Timestamp) + +class AllDataTypesScanSource extends SchemaRelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType] = None): BaseRelation = { + AllDataTypesScan(parameters("from").toInt, parameters("TO").toInt, schema)(sqlContext) + } +} + +case class AllDataTypesScan( + from: Int, + to: Int, + userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) + extends TableScan { + + override def schema = userSpecifiedSchema.get + + override def buildScan() = { + val rdd = sqlContext.sparkContext.parallelize(from to to).map { i => + PrimaryData( + i.toString, + i, + i.toLong, + i.toFloat, + i.toDouble, + i.toShort, + i.toByte, + true, + BigDecimal(i), + new Date(12345), + new Timestamp(12345)) + } + + RDDConversions.productToRowRdd(rdd, schema) + } + +} + +class NewTableScanSuite extends DataSourceTest { + import caseInsensisitiveContext._ + + var records = (1 to 10).map { i => + Row( + i.toString, + i, + i.toLong, + i.toFloat, + i.toDouble, + i.toShort, + i.toByte, + true, + BigDecimal(i), + new Date(12345), + new Timestamp(12345)) + }.toSeq + + before { + sql( + """ + |CREATE TEMPORARY TABLE oneToTen(stringField string, intField int, longField bigint, + |floatField float, doubleField double, shortField smallint, byteField tinyint, + |booleanField boolean, decimalField decimal, dateField date, timestampField timestamp) + |USING org.apache.spark.sql.sources.AllDataTypesScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + + sqlTest( 
+ "SELECT * FROM oneToTen", + records) + + sqlTest( + "SELECT stringField FROM oneToTen", + (1 to 10).map(i =>Row(i.toString)).toSeq) + + sqlTest( + "SELECT intField FROM oneToTen WHERE intField < 5", + (1 to 4).map(Row(_)).toSeq) + + sqlTest( + "SELECT longField * 2 FROM oneToTen", + (1 to 10).map(i => Row(i * 2.toLong)).toSeq) + + sqlTest( + """SELECT a.floatField, b.floatField FROM oneToTen a JOIN oneToTen b + |ON a.floatField = b.floatField + 1""".stripMargin, + (2 to 10).map(i => Row(i.toFloat, i - 1.toFloat)).toSeq) + + sqlTest( + "SELECT distinct(a.dateField) FROM oneToTen a", + Some(new Date(12345)).map(Row(_)).toSeq) + + sqlTest( + "SELECT distinct(a.timestampField) FROM oneToTen a", + Some(new Timestamp(12345)).map(Row(_)).toSeq) + +} From 83b6fc331f471205f7f6a72c4c408f84a944108e Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 31 Dec 2014 00:14:31 +0800 Subject: [PATCH 04/19] minor fix --- .../apache/spark/sql/json/JSONRelation.scala | 3 +-- .../org/apache/spark/sql/sources/ddl.scala | 21 ++++++++++++++----- .../apache/spark/sql/sources/interfaces.scala | 4 +--- .../org/apache/spark/sql/QueryTest.scala | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 8f835b8517e61..ab013885b7086 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -43,8 +43,7 @@ private[sql] case class JSONRelation( private def baseRDD = sqlContext.sparkContext.textFile(fileName) - override val schema = - userSpecifiedSchema.getOrElse( + override val schema = userSpecifiedSchema.getOrElse( JsonRDD.inferSchema( baseRDD, samplingRatio, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 12c66e24115ad..42f5d81933593 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -100,9 +100,15 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) } protected lazy val column: Parser[StructField] = - ident ~ ident ^^ { case name ~ typ => + ( ident ~ ident ^^ { case name ~ typ => StructField(name, metastoreTypes.toDataType(typ)) } + | + ident ~ ("decimal" ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { + case name ~ precision ~ scale => + StructField(name, DecimalType(precision.toInt, scale.toInt)) + } + ) } /** @@ -121,8 +127,8 @@ private[sql] class MetastoreTypes extends RegexParsers { "bigint" ^^^ LongType | "binary" ^^^ BinaryType | "boolean" ^^^ BooleanType | - fixedDecimalType | // Hive 0.13+ decimal with precision/scale - "decimal" ^^^ DecimalType.Unlimited | // Hive 0.12 decimal with no precision/scale + fixedDecimalType | // decimal with precision/scale + "decimal" ^^^ DecimalType.Unlimited | // decimal with no precision/scale "date" ^^^ DateType | "timestamp" ^^^ TimestampType | "varchar\\((\\d+)\\)".r ^^^ StringType @@ -204,8 +210,13 @@ private[sql] case class CreateTableUsing( } val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - val relation = dataSource.createRelation( - sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + val relation = if(tableCols.isEmpty) { + dataSource.createRelation( + sqlContext, new CaseInsensitiveMap(options)) + } else { + dataSource.createRelation( + sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + } sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
index 3f53dbc952c81..6ae6dc9395062 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -41,9 +41,7 @@ trait RelationProvider { * Note: the parameters' keywords are case insensitive and this insensitivity is enforced * by the Map that is passed to the function. */ - def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation + def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index a4ea95c07360e..3d9f0cbf80fe7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -70,7 +70,7 @@ class QueryTest extends PlanTest { """.stripMargin) } - if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { // issues here, sparkAnswer may be GenericRow[] + if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { fail(s""" |Results do not match for query: |${rdd.logicalPlan} From 44eb70cda9049a68d7a3a4a4ca74e5bc41f04991 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 31 Dec 2014 00:30:44 +0800 Subject: [PATCH 05/19] fix decimal parser issue --- sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 3 ++- .../scala/org/apache/spark/sql/sources/NewTableScanSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 42f5d81933593..0fb8597ded062 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -50,6 +50,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi 
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) protected val CREATE = Keyword("CREATE") + protected val DECIMAL = Keyword("DECIMAL") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") protected val USING = Keyword("USING") @@ -104,7 +105,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi StructField(name, metastoreTypes.toDataType(typ)) } | - ident ~ ("decimal" ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { + ident ~ (DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { case name ~ precision ~ scale => StructField(name, DecimalType(precision.toInt, scale.toInt)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala index 0a58c979ca7b2..c8095b336f8e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala @@ -95,7 +95,7 @@ class NewTableScanSuite extends DataSourceTest { """ |CREATE TEMPORARY TABLE oneToTen(stringField string, intField int, longField bigint, |floatField float, doubleField double, shortField smallint, byteField tinyint, - |booleanField boolean, decimalField decimal, dateField date, timestampField timestamp) + |booleanField boolean, decimalField decimal(10,2), dateField date, timestampField timestamp) |USING org.apache.spark.sql.sources.AllDataTypesScanSource |OPTIONS ( | From '1', From 02a662c4cb3605b3abc7033ad14e3b7400c30964 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 31 Dec 2014 00:38:45 +0800 Subject: [PATCH 06/19] style issue --- .../main/scala/org/apache/spark/sql/sources/interfaces.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
index 6ae6dc9395062..1ad82ecbb6ee6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -47,8 +47,9 @@ trait RelationProvider { /** * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source. When - * Spark SQL is given a DDL operation with a USING clause specified and user defined schema optionally, - * this interface is used to pass in the parameters specified by a user. + * Spark SQL is given a DDL operation with + * 1. USING clause: to specify the implemented SchemaRelationProvider + * 2. User defined schema: users can define schema optionally when create table * * Users may specify the fully qualified class name of a given data source. When that class is * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for From 445b57bc8dca3753be5dfeecc964be072933898a Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 31 Dec 2014 07:12:41 +0800 Subject: [PATCH 07/19] address comments --- .../main/scala/org/apache/spark/sql/sources/ddl.scala | 11 +++++++---- .../apache/spark/sql/sources/FilteredScanSuite.scala | 5 ++--- .../apache/spark/sql/sources/PrunedScanSuite.scala | 5 ++--- .../org/apache/spark/sql/sources/TableScanSuite.scala | 5 ++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 0fb8597ded062..e5caa1c84c90b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -209,12 +209,15 @@ private[sql] case class CreateTableUsing( sys.error(s"Failed to load class for data source: $provider") } } - val dataSource = - clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] val relation = if(tableCols.isEmpty) { - 
dataSource.createRelation( - sqlContext, new CaseInsensitiveMap(options)) + val dataSource = + clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + + dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) } else { + val dataSource = + clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + dataSource.createRelation( sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7ebcc1227e5cb..939b3c0c66de7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -21,11 +21,10 @@ import scala.language.existentials import org.apache.spark.sql._ -class FilteredScanSource extends SchemaRelationProvider { +class FilteredScanSource extends RelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String], - schema: Option[StructType] = None): BaseRelation = { + parameters: Map[String, String]): BaseRelation = { SimpleFilteredScan(parameters("from").toInt, parameters("to").toInt)(sqlContext) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index db6684ba22ab8..fee2e22611cdc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -19,11 +19,10 @@ package org.apache.spark.sql.sources import org.apache.spark.sql._ -class PrunedScanSource extends SchemaRelationProvider { +class PrunedScanSource extends RelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String], - schema: 
Option[StructType] = None): BaseRelation = { + parameters: Map[String, String]): BaseRelation = { SimplePrunedScan(parameters("from").toInt, parameters("to").toInt)(sqlContext) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 63e2648178e22..3cd7b0115d567 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -21,11 +21,10 @@ import org.apache.spark.sql._ class DefaultSource extends SimpleScanSource -class SimpleScanSource extends SchemaRelationProvider { +class SimpleScanSource extends RelationProvider { override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String], - schema: Option[StructType] = None): BaseRelation = { + parameters: Map[String, String]): BaseRelation = { SimpleScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext) } } From cf982d2759c61988f1612380a950942cd41e734a Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 31 Dec 2014 08:01:13 +0800 Subject: [PATCH 08/19] fixed test failure --- .../org/apache/spark/sql/sources/ddl.scala | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index e5caa1c84c90b..7fc7bf6b15dfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -209,17 +209,22 @@ private[sql] case class CreateTableUsing( sys.error(s"Failed to load class for data source: $provider") } } - val relation = if(tableCols.isEmpty) { - val dataSource = - clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - - dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - } else { - val 
dataSource = - clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - - dataSource.createRelation( - sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.RelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + if(tableCols.isEmpty) { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + } else { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation( + sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + } } sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) From 91ad91bc1b9d6152219dd799cfdf9d8bf60a1c11 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 6 Jan 2015 17:29:36 -0800 Subject: [PATCH 09/19] Parse data types in DDLParser. 
--- .../org/apache/spark/sql/sources/ddl.scala | 143 ++++++++---------- .../spark/sql/sources/NewTableScanSuite.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 8 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 5 +- 4 files changed, 74 insertions(+), 88 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 7fc7bf6b15dfa..28d59645c0d5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.sources import scala.language.implicitConversions import scala.util.parsing.combinator.syntactical.StandardTokenParsers -import scala.util.parsing.combinator.{RegexParsers, PackratParsers} +import scala.util.parsing.combinator.PackratParsers import org.apache.spark.Logging -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.RunnableCommand @@ -44,18 +43,43 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi } } + def parseType(input: String): DataType = { + phrase(dataType)(new lexical.Scanner(input)) match { + case Success(r, x) => r + case x => + sys.error(s"Unsupported dataType: $x") + } + } + protected case class Keyword(str: String) protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) protected val CREATE = Keyword("CREATE") - protected val DECIMAL = Keyword("DECIMAL") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") protected val USING = Keyword("USING") protected val OPTIONS = Keyword("OPTIONS") + // Data types. 
+ protected val STRING = Keyword("STRING") + protected val FLOAT = Keyword("FLOAT") + protected val INT = Keyword("INT") + protected val TINYINT = Keyword("TINYINT") + protected val SMALLINT = Keyword("SMALLINT") + protected val DOUBLE = Keyword("DOUBLE") + protected val BIGINT = Keyword("BIGINT") + protected val BINARY = Keyword("BINARY") + protected val BOOLEAN = Keyword("BOOLEAN") + protected val DECIMAL = Keyword("DECIMAL") + protected val DATE = Keyword("DATE") + protected val TIMESTAMP = Keyword("TIMESTAMP") + protected val VARCHAR = Keyword("VARCHAR") + protected val ARRAY = Keyword("ARRAY") + protected val MAP = Keyword("MAP") + protected val STRUCT = Keyword("STRUCT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = this.getClass @@ -77,20 +101,15 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` */ protected lazy val createTable: Parser[LogicalPlan] = - ( CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case tableName ~ provider ~ opts => - CreateTableUsing(tableName, Seq.empty, provider, opts) - } - | + ( CREATE ~ TEMPORARY ~ TABLE ~> ident - ~ tableCols ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case tableName ~ tableColumns ~ provider ~ opts => - CreateTableUsing(tableName, tableColumns, provider, opts) + ~ (tableCols).? 
~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { + case tableName ~ columns ~ provider ~ opts => + val tblColumns = if(columns.isEmpty) Seq.empty else columns.get + CreateTableUsing(tableName, tblColumns, provider, opts) } ) - protected lazy val metastoreTypes = new MetastoreTypes - protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" protected lazy val options: Parser[Map[String, String]] = @@ -101,96 +120,62 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) } protected lazy val column: Parser[StructField] = - ( ident ~ ident ^^ { case name ~ typ => - StructField(name, metastoreTypes.toDataType(typ)) + ident ~ dataType ^^ { case columnName ~ typ => + StructField(cleanIdentifier(columnName), typ) } - | - ident ~ (DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { - case name ~ precision ~ scale => - StructField(name, DecimalType(precision.toInt, scale.toInt)) - } - ) -} -/** - * :: DeveloperApi :: - * Provides a parser for data types. 
- */ -@DeveloperApi -private[sql] class MetastoreTypes extends RegexParsers { protected lazy val primitiveType: Parser[DataType] = - "string" ^^^ StringType | - "float" ^^^ FloatType | - "int" ^^^ IntegerType | - "tinyint" ^^^ ByteType | - "smallint" ^^^ ShortType | - "double" ^^^ DoubleType | - "bigint" ^^^ LongType | - "binary" ^^^ BinaryType | - "boolean" ^^^ BooleanType | - fixedDecimalType | // decimal with precision/scale - "decimal" ^^^ DecimalType.Unlimited | // decimal with no precision/scale - "date" ^^^ DateType | - "timestamp" ^^^ TimestampType | - "varchar\\((\\d+)\\)".r ^^^ StringType + STRING ^^^ StringType | + BINARY ^^^ BinaryType | + BOOLEAN ^^^ BooleanType | + TINYINT ^^^ ByteType | + SMALLINT ^^^ ShortType | + INT ^^^ IntegerType | + BIGINT ^^^ LongType | + FLOAT ^^^ FloatType | + DOUBLE ^^^ DoubleType | + fixedDecimalType | // decimal with precision/scale + DECIMAL ^^^ DecimalType.Unlimited | // decimal with no precision/scale + DATE ^^^ DateType | + TIMESTAMP ^^^ TimestampType | + VARCHAR ~ "(" ~ numericLit ~ ")" ^^^ StringType protected lazy val fixedDecimalType: Parser[DataType] = - ("decimal" ~> "(" ~> "\\d+".r) ~ ("," ~> "\\d+".r <~ ")") ^^ { - case precision ~ scale => - DecimalType(precision.toInt, scale.toInt) + (DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { + case precision ~ scale => DecimalType(precision.toInt, scale.toInt) } protected lazy val arrayType: Parser[DataType] = - "array" ~> "<" ~> dataType <~ ">" ^^ { + ARRAY ~> "<" ~> dataType <~ ">" ^^ { case tpe => ArrayType(tpe) } protected lazy val mapType: Parser[DataType] = - "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { + MAP ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { case t1 ~ _ ~ t2 => MapType(t1, t2) } protected lazy val structField: Parser[StructField] = - "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { - case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) + ident ~ ":" ~ dataType ^^ { + case fieldName ~ _ ~ tpe => 
StructField(cleanIdentifier(fieldName), tpe, nullable = true) } protected lazy val structType: Parser[DataType] = - "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ { + STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { case fields => new StructType(fields) } private[sql] lazy val dataType: Parser[DataType] = arrayType | - mapType | - structType | - primitiveType - - def toDataType(metastoreType: String): DataType = parseAll(dataType, metastoreType) match { - case Success(result, _) => result - case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") - } - - def toMetastoreType(dt: DataType): String = dt match { - case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" - case StructType(fields) => - s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" - case MapType(keyType, valueType, _) => - s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" - case StringType => "string" - case FloatType => "float" - case IntegerType => "int" - case ByteType => "tinyint" - case ShortType => "smallint" - case DoubleType => "double" - case LongType => "bigint" - case BinaryType => "binary" - case BooleanType => "boolean" - case DateType => "date" - case d: DecimalType => "decimal" - case TimestampType => "timestamp" - case NullType => "void" - case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) + mapType | + structType | + primitiveType + + protected val escapedIdentifier = "`([^`]+)`".r + /** Strips backticks from ident if present */ + protected def cleanIdentifier(ident: String): String = ident match { + case escapedIdentifier(i) => i + case plainIdent => plainIdent } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala index c8095b336f8e8..b860ca302cee1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala @@ -93,9 +93,9 @@ class NewTableScanSuite extends DataSourceTest { before { sql( """ - |CREATE TEMPORARY TABLE oneToTen(stringField string, intField int, longField bigint, - |floatField float, doubleField double, shortField smallint, byteField tinyint, - |booleanField boolean, decimalField decimal(10,2), dateField date, timestampField timestamp) + |CREATE TEMPORARY TABLE oneToTen(stringField stRIng, intField iNt, longField Bigint, + |floatField flOat, doubleField doubLE, shortField smaLlint, byteField tinyint, + |booleanField boolean, decimalField decimal(10,2), dateField dAte, timestampField tiMestamp) |USING org.apache.spark.sql.sources.AllDataTypesScanSource |OPTIONS ( | From '1', diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index accdaf591b5ea..74584c72cebbf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.sources.MetastoreTypes import org.apache.spark.util.Utils /* Implicit conversions */ @@ -438,7 +437,7 @@ private[hive] case class MetastoreRelation implicit class SchemaAttribute(f: FieldSchema) { def toAttribute = AttributeReference( f.getName, - HiveMetastoreTypes.toDataType(f.getType), + sqlContext.ddlParser.parseType(f.getType), // Since data can be dumped in randomly with no validation, everything is nullable. 
nullable = true )(qualifiers = Seq(alias.getOrElse(tableName))) @@ -459,9 +458,8 @@ private[hive] case class MetastoreRelation val columnOrdinals = AttributeMap(attributes.zipWithIndex) } - -object HiveMetastoreTypes extends MetastoreTypes { - override def toMetastoreType(dt: DataType): String = dt match { +object HiveMetastoreTypes { + def toMetastoreType(dt: DataType): String = dt match { case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" case StructType(fields) => s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 86535f8dd4f58..041a36f1295ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.sources.DDLParser import org.apache.spark.sql.test.ExamplePointUDT class HiveMetastoreCatalogSuite extends FunSuite { @@ -27,7 +28,9 @@ class HiveMetastoreCatalogSuite extends FunSuite { test("struct field should accept underscore in sub-column name") { val metastr = "struct" - val datatype = HiveMetastoreTypes.toDataType(metastr) + val ddlParser = new DDLParser + + val datatype = ddlParser.parseType(metastr) assert(datatype.isInstanceOf[StructType]) } From 8dfbf7a062bee5ef7c0e0c10c30ac411578ce389 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 7 Jan 2015 11:37:07 +0800 Subject: [PATCH 10/19] more tests for complex data type --- .../spark/sql/sources/NewTableScanSuite.scala | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala index b860ca302cee1..8272c57c29131 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql._ import java.sql.{Timestamp, Date} import org.apache.spark.sql.execution.RDDConversions -case class PrimaryData( +case class AllDataTypesData( stringField: String, intField: Int, longField: Long, @@ -32,7 +32,10 @@ case class PrimaryData( booleanField: Boolean, decimalField: BigDecimal, date: Date, - timestampField: Timestamp) + timestampField: Timestamp, + arrayFiled: Seq[Int], + mapField: Map[Int, String], + structField: Row) class AllDataTypesScanSource extends SchemaRelationProvider { override def createRelation( @@ -53,7 +56,7 @@ case class AllDataTypesScan( override def buildScan() = { val rdd = sqlContext.sparkContext.parallelize(from to to).map { i => - PrimaryData( + AllDataTypesData( i.toString, i, i.toLong, @@ -64,7 +67,10 @@ case class AllDataTypesScan( true, BigDecimal(i), new Date(12345), - new Timestamp(12345)) + new Timestamp(12345), + Seq(i, i+1), + Map(i -> i.toString), + Row(i, i.toString)) } RDDConversions.productToRowRdd(rdd, schema) @@ -87,7 +93,10 @@ class NewTableScanSuite extends DataSourceTest { true, BigDecimal(i), new Date(12345), - new Timestamp(12345)) + new Timestamp(12345), + Seq(i, i+1), + Map(i -> i.toString), + Row(i, i.toString)) }.toSeq before { @@ -95,7 +104,9 @@ class NewTableScanSuite extends DataSourceTest { """ |CREATE TEMPORARY TABLE oneToTen(stringField stRIng, intField iNt, longField Bigint, |floatField flOat, doubleField doubLE, shortField smaLlint, byteField tinyint, - |booleanField boolean, decimalField decimal(10,2), dateField dAte, timestampField tiMestamp) + |booleanField boolean, decimalField decimal(10,2), dateField dAte, + |timestampField tiMestamp, arrayField Array, mapField MAP, + 
|structField StRuct) |USING org.apache.spark.sql.sources.AllDataTypesScanSource |OPTIONS ( | From '1', @@ -108,6 +119,10 @@ class NewTableScanSuite extends DataSourceTest { "SELECT * FROM oneToTen", records) + sqlTest( + "SELECT count(*) FROM oneToTen", + 10) + sqlTest( "SELECT stringField FROM oneToTen", (1 to 10).map(i =>Row(i.toString)).toSeq) @@ -133,4 +148,16 @@ class NewTableScanSuite extends DataSourceTest { "SELECT distinct(a.timestampField) FROM oneToTen a", Some(new Timestamp(12345)).map(Row(_)).toSeq) + sqlTest( + "SELECT distinct(arrayField) FROM oneToTen a where intField=1", + Some(Seq(1, 2)).map(Row(_)).toSeq) + + sqlTest( + "SELECT distinct(mapField) FROM oneToTen a where intField=1", + Some(Map(1 -> 1.toString)).map(Row(_)).toSeq) + + sqlTest( + "SELECT distinct(structField) FROM oneToTen a where intField=1", + Some(Row(1, "1")).map(Row(_)).toSeq) + } From d02547f1c2df677fc9b76bd5cf052e8fd700a2b5 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 7 Jan 2015 12:48:29 +0800 Subject: [PATCH 11/19] fix HiveCompatibilitySuite test failure --- .../main/scala/org/apache/spark/sql/sources/ddl.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 28d59645c0d5f..f5b72f3c4ca52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -161,9 +161,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi } protected lazy val structType: Parser[DataType] = - STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { - case fields => new StructType(fields) - } + (STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { + case fields => new StructType(fields) + }) | + (STRUCT ~> "<>" ^^ { + case fields => new StructType(Nil) + }) private[sql] lazy val dataType: Parser[DataType] = arrayType | From 
b621c8f8885dbcc67dc34c53fbef79346900cb9c Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 7 Jan 2015 16:43:41 +0800 Subject: [PATCH 12/19] minor refactory --- .../apache/spark/sql/sources/commands.scala | 61 +++++++++++++++++++ .../org/apache/spark/sql/sources/ddl.scala | 41 ------------- 2 files changed, 61 insertions(+), 41 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala new file mode 100644 index 0000000000000..b8c10c1eb0d23 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types.{StructType, StructField} +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.util.Utils + +private[sql] case class CreateTableUsing( + tableName: String, + tableCols: Seq[StructField], + provider: String, + options: Map[String, String]) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + val loader = Utils.getContextOrSparkClassLoader + val clazz: Class[_] = try loader.loadClass(provider) catch { + case cnf: java.lang.ClassNotFoundException => + try loader.loadClass(provider + ".DefaultSource") catch { + case cnf: java.lang.ClassNotFoundException => + sys.error(s"Failed to load class for data source: $provider") + } + } + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.RelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + if(tableCols.isEmpty) { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + } else { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation( + sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + } + } + + sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index f5b72f3c4ca52..1d1af2795958b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -22,10 +22,7 @@ import 
scala.util.parsing.combinator.syntactical.StandardTokenParsers import scala.util.parsing.combinator.PackratParsers import org.apache.spark.Logging -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical @@ -182,44 +179,6 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi } } -private[sql] case class CreateTableUsing( - tableName: String, - tableCols: Seq[StructField], - provider: String, - options: Map[String, String]) extends RunnableCommand { - - def run(sqlContext: SQLContext) = { - val loader = Utils.getContextOrSparkClassLoader - val clazz: Class[_] = try loader.loadClass(provider) catch { - case cnf: java.lang.ClassNotFoundException => - try loader.loadClass(provider + ".DefaultSource") catch { - case cnf: java.lang.ClassNotFoundException => - sys.error(s"Failed to load class for data source: $provider") - } - } - val relation = clazz.newInstance match { - case dataSource: org.apache.spark.sql.sources.RelationProvider => - dataSource - .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => - if(tableCols.isEmpty) { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - } else { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation( - sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) - } - } - - sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) - Seq.empty - } -} - /** * Builds a map in which keys are case insensitive */ From f1cffe4a9c4f52eafbda04aba191b30cdcfee3a4 Mon Sep 17 00:00:00 
2001 From: Yin Huai Date: Wed, 7 Jan 2015 14:01:19 -0800 Subject: [PATCH 13/19] Revert "minor refactory" This reverts commit b621c8f8885dbcc67dc34c53fbef79346900cb9c. --- .../apache/spark/sql/sources/commands.scala | 61 ------------------- .../org/apache/spark/sql/sources/ddl.scala | 41 +++++++++++++ 2 files changed, 41 insertions(+), 61 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala deleted file mode 100644 index b8c10c1eb0d23..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.sources - -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.types.{StructType, StructField} -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.util.Utils - -private[sql] case class CreateTableUsing( - tableName: String, - tableCols: Seq[StructField], - provider: String, - options: Map[String, String]) extends RunnableCommand { - - def run(sqlContext: SQLContext) = { - val loader = Utils.getContextOrSparkClassLoader - val clazz: Class[_] = try loader.loadClass(provider) catch { - case cnf: java.lang.ClassNotFoundException => - try loader.loadClass(provider + ".DefaultSource") catch { - case cnf: java.lang.ClassNotFoundException => - sys.error(s"Failed to load class for data source: $provider") - } - } - val relation = clazz.newInstance match { - case dataSource: org.apache.spark.sql.sources.RelationProvider => - dataSource - .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => - if(tableCols.isEmpty) { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - } else { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation( - sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) - } - } - - sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) - Seq.empty - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 1d1af2795958b..f5b72f3c4ca52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -22,7 +22,10 @@ import 
scala.util.parsing.combinator.syntactical.StandardTokenParsers import scala.util.parsing.combinator.PackratParsers import org.apache.spark.Logging +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical @@ -179,6 +182,44 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi } } +private[sql] case class CreateTableUsing( + tableName: String, + tableCols: Seq[StructField], + provider: String, + options: Map[String, String]) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + val loader = Utils.getContextOrSparkClassLoader + val clazz: Class[_] = try loader.loadClass(provider) catch { + case cnf: java.lang.ClassNotFoundException => + try loader.loadClass(provider + ".DefaultSource") catch { + case cnf: java.lang.ClassNotFoundException => + sys.error(s"Failed to load class for data source: $provider") + } + } + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.RelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + if(tableCols.isEmpty) { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + } else { + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation( + sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) + } + } + + sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) + Seq.empty + } +} + /** * Builds a map in which keys are case insensitive */ From f5c22b0cd2601531e453682bf99b20983f7fc7ec Mon Sep 17 00:00:00 
2001 From: Yin Huai Date: Wed, 7 Jan 2015 16:01:18 -0800 Subject: [PATCH 14/19] Refactor code and update test cases. --- .../apache/spark/sql/json/JSONRelation.scala | 2 +- .../apache/spark/sql/parquet/newParquet.scala | 2 +- .../org/apache/spark/sql/sources/ddl.scala | 29 ++- .../apache/spark/sql/sources/interfaces.scala | 2 +- .../spark/sql/sources/NewTableScanSuite.scala | 163 --------------- .../spark/sql/sources/TableScanSuite.scala | 187 ++++++++++++++++++ 6 files changed, 201 insertions(+), 184 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index ab013885b7086..49a9382037a84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -26,7 +26,7 @@ private[sql] class DefaultSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType] = None): BaseRelation = { + schema: Option[StructType]): BaseRelation = { val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 6a4a41686eb28..506be8ccde6b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -48,7 +48,7 @@ class DefaultSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType] = None): BaseRelation = { + schema: Option[StructType]): 
BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index f5b72f3c4ca52..457cbbb39abd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -64,14 +64,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi // Data types. protected val STRING = Keyword("STRING") - protected val FLOAT = Keyword("FLOAT") - protected val INT = Keyword("INT") + protected val BINARY = Keyword("BINARY") + protected val BOOLEAN = Keyword("BOOLEAN") protected val TINYINT = Keyword("TINYINT") protected val SMALLINT = Keyword("SMALLINT") - protected val DOUBLE = Keyword("DOUBLE") + protected val INT = Keyword("INT") protected val BIGINT = Keyword("BIGINT") - protected val BINARY = Keyword("BINARY") - protected val BOOLEAN = Keyword("BOOLEAN") + protected val FLOAT = Keyword("FLOAT") + protected val DOUBLE = Keyword("DOUBLE") protected val DECIMAL = Keyword("DECIMAL") protected val DATE = Keyword("DATE") protected val TIMESTAMP = Keyword("TIMESTAMP") @@ -105,8 +105,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (tableCols).? 
~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { case tableName ~ columns ~ provider ~ opts => - val tblColumns = if(columns.isEmpty) Seq.empty else columns.get - CreateTableUsing(tableName, tblColumns, provider, opts) + val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) + CreateTableUsing(tableName, userSpecifiedSchema, provider, opts) } ) @@ -184,7 +184,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi private[sql] case class CreateTableUsing( tableName: String, - tableCols: Seq[StructField], + userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String]) extends RunnableCommand { @@ -203,16 +203,9 @@ private[sql] case class CreateTableUsing( .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options)) case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => - if(tableCols.isEmpty) { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - } else { - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation( - sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols))) - } + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema) } sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 1ad82ecbb6ee6..97157c868cc90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -68,7 +68,7 @@ trait SchemaRelationProvider { def createRelation( sqlContext: SQLContext, parameters: 
Map[String, String], - schema: Option[StructType] = None): BaseRelation + schema: Option[StructType]): BaseRelation } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala deleted file mode 100644 index 8272c57c29131..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.spark.sql.sources - -import org.apache.spark.sql._ -import java.sql.{Timestamp, Date} -import org.apache.spark.sql.execution.RDDConversions - -case class AllDataTypesData( - stringField: String, - intField: Int, - longField: Long, - floatField: Float, - doubleField: Double, - shortField: Short, - byteField: Byte, - booleanField: Boolean, - decimalField: BigDecimal, - date: Date, - timestampField: Timestamp, - arrayFiled: Seq[Int], - mapField: Map[Int, String], - structField: Row) - -class AllDataTypesScanSource extends SchemaRelationProvider { - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String], - schema: Option[StructType] = None): BaseRelation = { - AllDataTypesScan(parameters("from").toInt, parameters("TO").toInt, schema)(sqlContext) - } -} - -case class AllDataTypesScan( - from: Int, - to: Int, - userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) - extends TableScan { - - override def schema = userSpecifiedSchema.get - - override def buildScan() = { - val rdd = sqlContext.sparkContext.parallelize(from to to).map { i => - AllDataTypesData( - i.toString, - i, - i.toLong, - i.toFloat, - i.toDouble, - i.toShort, - i.toByte, - true, - BigDecimal(i), - new Date(12345), - new Timestamp(12345), - Seq(i, i+1), - Map(i -> i.toString), - Row(i, i.toString)) - } - - RDDConversions.productToRowRdd(rdd, schema) - } - -} - -class NewTableScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ - - var records = (1 to 10).map { i => - Row( - i.toString, - i, - i.toLong, - i.toFloat, - i.toDouble, - i.toShort, - i.toByte, - true, - BigDecimal(i), - new Date(12345), - new Timestamp(12345), - Seq(i, i+1), - Map(i -> i.toString), - Row(i, i.toString)) - }.toSeq - - before { - sql( - """ - |CREATE TEMPORARY TABLE oneToTen(stringField stRIng, intField iNt, longField Bigint, - |floatField flOat, doubleField doubLE, shortField smaLlint, byteField tinyint, - |booleanField 
boolean, decimalField decimal(10,2), dateField dAte, - |timestampField tiMestamp, arrayField Array, mapField MAP, - |structField StRuct) - |USING org.apache.spark.sql.sources.AllDataTypesScanSource - |OPTIONS ( - | From '1', - | To '10' - |) - """.stripMargin) - } - - sqlTest( - "SELECT * FROM oneToTen", - records) - - sqlTest( - "SELECT count(*) FROM oneToTen", - 10) - - sqlTest( - "SELECT stringField FROM oneToTen", - (1 to 10).map(i =>Row(i.toString)).toSeq) - - sqlTest( - "SELECT intField FROM oneToTen WHERE intField < 5", - (1 to 4).map(Row(_)).toSeq) - - sqlTest( - "SELECT longField * 2 FROM oneToTen", - (1 to 10).map(i => Row(i * 2.toLong)).toSeq) - - sqlTest( - """SELECT a.floatField, b.floatField FROM oneToTen a JOIN oneToTen b - |ON a.floatField = b.floatField + 1""".stripMargin, - (2 to 10).map(i => Row(i.toFloat, i - 1.toFloat)).toSeq) - - sqlTest( - "SELECT distinct(a.dateField) FROM oneToTen a", - Some(new Date(12345)).map(Row(_)).toSeq) - - sqlTest( - "SELECT distinct(a.timestampField) FROM oneToTen a", - Some(new Timestamp(12345)).map(Row(_)).toSeq) - - sqlTest( - "SELECT distinct(arrayField) FROM oneToTen a where intField=1", - Some(Seq(1, 2)).map(Row(_)).toSeq) - - sqlTest( - "SELECT distinct(mapField) FROM oneToTen a where intField=1", - Some(Map(1 -> 1.toString)).map(Row(_)).toSeq) - - sqlTest( - "SELECT distinct(structField) FROM oneToTen a where intField=1", - Some(Row(1, "1")).map(Row(_)).toSeq) - -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 3cd7b0115d567..26191a8a5c769 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.sources +import java.sql.{Timestamp, Date} + import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.types.DecimalType class 
DefaultSource extends SimpleScanSource @@ -38,9 +41,77 @@ case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) override def buildScan() = sqlContext.sparkContext.parallelize(from to to).map(Row(_)) } +class AllDataTypesScanSource extends SchemaRelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { + AllDataTypesScan(parameters("from").toInt, parameters("TO").toInt, schema)(sqlContext) + } +} + +case class AllDataTypesScan( + from: Int, + to: Int, + userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) + extends TableScan { + + override def schema = userSpecifiedSchema.get + + override def buildScan() = { + sqlContext.sparkContext.parallelize(from to to).map { i => + Row( + s"str_$i", + s"str_$i".getBytes(), + i % 2 == 0, + i.toByte, + i.toShort, + i, + i.toLong, + i.toFloat, + i.toDouble, + BigDecimal(i), + BigDecimal(i), + new Date(10000 + i), + new Timestamp(20000 + i), + s"varchar_$i", + Seq(i, i + 1), + Seq(Map(s"str_$i" -> Row(i.toLong))), + Map(i -> i.toString), + Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), + Row(i, i.toString), + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + } + } +} + class TableScanSuite extends DataSourceTest { import caseInsensisitiveContext._ + var tableWithSchemaExpected = (1 to 10).map { i => + Row( + s"str_$i", + s"str_$i", + i % 2 == 0, + i.toByte, + i.toShort, + i, + i.toLong, + i.toFloat, + i.toDouble, + BigDecimal(i), + BigDecimal(i), + new Date(10000 + i), + new Timestamp(20000 + i), + s"varchar_$i", + Seq(i, i + 1), + Seq(Map(s"str_$i" -> Row(i.toLong))), + Map(i -> i.toString), + Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), + Row(i, i.toString), + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + }.toSeq + before { sql( """ @@ -51,6 +122,37 @@ class TableScanSuite extends DataSourceTest { | To '10' |) 
""".stripMargin) + + sql( + """ + |CREATE TEMPORARY TABLE tableWithSchema ( + |stringField stRIng, + |binaryField binary, + |booleanField boolean, + |byteField tinyint, + |shortField smaLlint, + |intField iNt, + |longField Bigint, + |floatField flOat, + |doubleField doubLE, + |decimalField1 decimal, + |decimalField2 decimal(9,2), + |dateField dAte, + |timestampField tiMestamp, + |varcharField varchaR(12), + |arrayFieldSimple Array, + |arrayFieldComplex Array>>, + |mapFieldSimple MAP, + |mapFieldComplex Map, Struct>, + |structFieldSimple StRuct, + |structFieldComplex StRuct, Value:struct>> + |) + |USING org.apache.spark.sql.sources.AllDataTypesScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) } sqlTest( @@ -73,6 +175,91 @@ class TableScanSuite extends DataSourceTest { "SELECT a.i, b.i FROM oneToTen a JOIN oneToTen b ON a.i = b.i + 1", (2 to 10).map(i => Row(i, i - 1)).toSeq) + test("Schema and all fields") { + val expectedSchema = StructType( + StructField("stringField", StringType, true) :: + StructField("binaryField", BinaryType, true) :: + StructField("booleanField", BooleanType, true) :: + StructField("byteField", ByteType, true) :: + StructField("shortField", ShortType, true) :: + StructField("intField", IntegerType, true) :: + StructField("longField", LongType, true) :: + StructField("floatField", FloatType, true) :: + StructField("doubleField", DoubleType, true) :: + StructField("decimalField1", DecimalType.Unlimited, true) :: + StructField("decimalField2", DecimalType(9, 2), true) :: + StructField("dateField", DateType, true) :: + StructField("timestampField", TimestampType, true) :: + StructField("varcharField", StringType, true) :: + StructField("arrayFieldSimple", ArrayType(IntegerType), true) :: + StructField("arrayFieldComplex", + ArrayType( + MapType(StringType, StructType(StructField("key", LongType, true) :: Nil))), true) :: + StructField("mapFieldSimple", MapType(IntegerType, StringType), true) :: + 
StructField("mapFieldComplex", + MapType( + MapType(StringType, FloatType), + StructType(StructField("key", LongType, true) :: Nil)), true) :: + StructField("structFieldSimple", + StructType( + StructField("key", IntegerType, true) :: + StructField("Value", StringType, true) :: Nil), true) :: + StructField("structFieldComplex", + StructType( + StructField("key", ArrayType(StringType), true) :: + StructField("Value", + StructType( + StructField("value", ArrayType(DateType), true) :: Nil), true) :: Nil), true) :: Nil + ) + + assert(expectedSchema == table("tableWithSchema").schema) + + checkAnswer( + sql( + """SELECT + | stringField, + | cast(binaryField as string), + | booleanField, + | byteField, + | shortField, + | intField, + | longField, + | floatField, + | doubleField, + | decimalField1, + | decimalField2, + | dateField, + | timestampField, + | varcharField, + | arrayFieldSimple, + | arrayFieldComplex, + | mapFieldSimple, + | mapFieldComplex, + | structFieldSimple, + | structFieldComplex FROM tableWithSchema""".stripMargin), + tableWithSchemaExpected + ) + } + + sqlTest( + "SELECT count(*) FROM tableWithSchema", + 10) + + sqlTest( + "SELECT stringField FROM tableWithSchema", + (1 to 10).map(i => Row(s"str_$i")).toSeq) + + sqlTest( + "SELECT intField FROM tableWithSchema WHERE intField < 5", + (1 to 4).map(Row(_)).toSeq) + + sqlTest( + "SELECT longField * 2 FROM tableWithSchema", + (1 to 10).map(i => Row(i * 2.toLong)).toSeq) + + sqlTest( + "SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where intField=1", + Seq(Seq(1, 2))) test("Caching") { // Cached Query Execution From 50a03b04347e87655f637e3c29e50966c118e2c9 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 8 Jan 2015 13:18:31 -0800 Subject: [PATCH 15/19] Use JsonRDD.nullTypeToStringType to convert NullType to StringType. 
--- .../scala/org/apache/spark/sql/json/JSONRelation.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 49a9382037a84..47da6f5e5237b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -44,11 +44,11 @@ private[sql] case class JSONRelation( private def baseRDD = sqlContext.sparkContext.textFile(fileName) override val schema = userSpecifiedSchema.getOrElse( - JsonRDD.inferSchema( - baseRDD, - samplingRatio, - sqlContext.columnNameOfCorruptRecord) - ) + JsonRDD.nullTypeToStringType( + JsonRDD.inferSchema( + baseRDD, + samplingRatio, + sqlContext.columnNameOfCorruptRecord))) override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) From baf79b5dd3c3b946728042beaeb49cd301b33cdf Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 8 Jan 2015 13:25:16 -0800 Subject: [PATCH 16/19] Test special characters quoted by backticks. 
--- .../spark/sql/sources/TableScanSuite.scala | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 26191a8a5c769..a0e0172a4a548 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -72,7 +72,7 @@ case class AllDataTypesScan( i.toDouble, BigDecimal(i), BigDecimal(i), - new Date(10000 + i), + new Date((i + 1) * 8640000), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -80,7 +80,7 @@ case class AllDataTypesScan( Map(i -> i.toString), Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), - Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date((i + 2) * 8640000))))) } } } @@ -101,7 +101,7 @@ class TableScanSuite extends DataSourceTest { i.toDouble, BigDecimal(i), BigDecimal(i), - new Date(10000 + i), + new Date((i + 1) * 8640000), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -109,7 +109,7 @@ class TableScanSuite extends DataSourceTest { Map(i -> i.toString), Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), - Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date((i + 2) * 8640000))))) }.toSeq before { @@ -126,13 +126,13 @@ class TableScanSuite extends DataSourceTest { sql( """ |CREATE TEMPORARY TABLE tableWithSchema ( - |stringField stRIng, + |`string$%Field` stRIng, |binaryField binary, - |booleanField boolean, - |byteField tinyint, + |`booleanField` boolean, + |ByteField tinyint, |shortField smaLlint, - |intField iNt, - |longField Bigint, + |int_Field iNt, + |`longField_:,<>=+/~^` Bigint, |floatField flOat, |doubleField doubLE, |decimalField1 decimal, @@ -145,7 
+145,7 @@ class TableScanSuite extends DataSourceTest { |mapFieldSimple MAP, |mapFieldComplex Map, Struct>, |structFieldSimple StRuct, - |structFieldComplex StRuct, Value:struct>> + |structFieldComplex StRuct, Value:struct<`value_(2)`:Array>> |) |USING org.apache.spark.sql.sources.AllDataTypesScanSource |OPTIONS ( @@ -177,13 +177,13 @@ class TableScanSuite extends DataSourceTest { test("Schema and all fields") { val expectedSchema = StructType( - StructField("stringField", StringType, true) :: + StructField("string$%Field", StringType, true) :: StructField("binaryField", BinaryType, true) :: StructField("booleanField", BooleanType, true) :: - StructField("byteField", ByteType, true) :: + StructField("ByteField", ByteType, true) :: StructField("shortField", ShortType, true) :: - StructField("intField", IntegerType, true) :: - StructField("longField", LongType, true) :: + StructField("int_Field", IntegerType, true) :: + StructField("longField_:,<>=+/~^", LongType, true) :: StructField("floatField", FloatType, true) :: StructField("doubleField", DoubleType, true) :: StructField("decimalField1", DecimalType.Unlimited, true) :: @@ -209,7 +209,8 @@ class TableScanSuite extends DataSourceTest { StructField("key", ArrayType(StringType), true) :: StructField("Value", StructType( - StructField("value", ArrayType(DateType), true) :: Nil), true) :: Nil), true) :: Nil + StructField("value_(2)", ArrayType(DateType), true) :: Nil), true) :: Nil), true) :: + Nil ) assert(expectedSchema == table("tableWithSchema").schema) @@ -217,13 +218,13 @@ class TableScanSuite extends DataSourceTest { checkAnswer( sql( """SELECT - | stringField, + | `string$%Field`, | cast(binaryField as string), | booleanField, | byteField, | shortField, - | intField, - | longField, + | int_Field, + | `longField_:,<>=+/~^`, | floatField, | doubleField, | decimalField1, @@ -246,21 +247,25 @@ class TableScanSuite extends DataSourceTest { 10) sqlTest( - "SELECT stringField FROM tableWithSchema", + "SELECT 
`string$%Field` FROM tableWithSchema", (1 to 10).map(i => Row(s"str_$i")).toSeq) sqlTest( - "SELECT intField FROM tableWithSchema WHERE intField < 5", + "SELECT int_Field FROM tableWithSchema WHERE int_Field < 5", (1 to 4).map(Row(_)).toSeq) sqlTest( - "SELECT longField * 2 FROM tableWithSchema", + "SELECT `longField_:,<>=+/~^` * 2 FROM tableWithSchema", (1 to 10).map(i => Row(i * 2.toLong)).toSeq) sqlTest( - "SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where intField=1", + "SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where int_Field=1", Seq(Seq(1, 2))) + sqlTest( + "SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema", + (1 to 10).map(i => Row(Seq(new Date((i + 2) * 8640000)))).toSeq) + test("Caching") { // Cached Query Execution cacheTable("oneToTen") From a852b100b5fc6ddd6a19271f01c8df12c00553a6 Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 9 Jan 2015 10:19:00 +0800 Subject: [PATCH 17/19] remove cleanIdentifier --- .../main/scala/org/apache/spark/sql/sources/ddl.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 457cbbb39abd6..991e8d888c110 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -121,7 +121,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val column: Parser[StructField] = ident ~ dataType ^^ { case columnName ~ typ => - StructField(cleanIdentifier(columnName), typ) + StructField(columnName, typ) } protected lazy val primitiveType: Parser[DataType] = @@ -157,7 +157,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val structField: Parser[StructField] = ident ~ ":" ~ dataType ^^ { - case fieldName ~ _ ~ tpe => 
StructField(cleanIdentifier(fieldName), tpe, nullable = true) + case fieldName ~ _ ~ tpe => StructField(fieldName, tpe, nullable = true) } protected lazy val structType: Parser[DataType] = @@ -173,13 +173,6 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi mapType | structType | primitiveType - - protected val escapedIdentifier = "`([^`]+)`".r - /** Strips backticks from ident if present */ - protected def cleanIdentifier(ident: String): String = ident match { - case escapedIdentifier(i) => i - case plainIdent => plainIdent - } } private[sql] case class CreateTableUsing( From 65e9c73958f7e28f55ae40c0c1136a5c28e2a66b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 9 Jan 2015 15:52:00 -0800 Subject: [PATCH 18/19] Revert all changes since applying a given schema has not been testd. --- .../apache/spark/sql/parquet/newParquet.scala | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 506be8ccde6b3..2e0c6c51c00e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate + import parquet.hadoop.ParquetInputFormat import parquet.hadoop.util.ContextUtil import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{Partition => SparkPartition, Logging} import org.apache.spark.rdd.{NewHadoopPartition, RDD} -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate + +import org.apache.spark.sql.{SQLConf, Row, SQLContext} 
import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.{SQLConf, SQLContext} import scala.collection.JavaConversions._ - /** * Allows creation of parquet based tables using the syntax * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ -class DefaultSource extends SchemaRelationProvider { +class DefaultSource extends RelationProvider { /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + parameters: Map[String, String]): BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) - ParquetRelation2(path, schema)(sqlContext) + ParquetRelation2(path)(sqlContext) } } @@ -82,9 +82,7 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files: * discovery. */ @DeveloperApi -case class ParquetRelation2( - path: String, - userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) +case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) extends CatalystScan with Logging { def sparkContext = sqlContext.sparkContext @@ -135,13 +133,12 @@ case class ParquetRelation2( override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum - val dataSchema = userSpecifiedSchema.getOrElse( - StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. 
- ParquetTypesConverter.readSchemaFromFile( - partitions.head.files.head.getPath, - Some(sparkContext.hadoopConfiguration), - sqlContext.isParquetBinaryAsString)) - ) + val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. + ParquetTypesConverter.readSchemaFromFile( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + sqlContext.isParquetBinaryAsString)) + val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) From 38f634e978e73faac5173fb9fbb22ed48c8c7f3f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 9 Jan 2015 15:57:13 -0800 Subject: [PATCH 19/19] Remove Option from createRelation. --- .../apache/spark/sql/json/JSONRelation.scala | 19 +++++++++--- .../org/apache/spark/sql/sources/ddl.scala | 31 +++++++++++++------ .../apache/spark/sql/sources/interfaces.scala | 2 +- .../spark/sql/sources/TableScanSuite.scala | 6 ++-- 4 files changed, 41 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 47da6f5e5237b..a9a6696cb15e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -21,16 +21,27 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.sources._ -private[sql] class DefaultSource extends SchemaRelationProvider { - /** Returns a new base relation with the given parameters. */ +private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider { + + /** Returns a new base relation with the parameters. 
*/ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) + + JSONRelation(fileName, samplingRatio, None)(sqlContext) + } + + /** Returns a new base relation with the given schema and parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + schema: StructType): BaseRelation = { val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio, schema)(sqlContext) + JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 991e8d888c110..7f0fd73aa721c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -190,15 +190,28 @@ private[sql] case class CreateTableUsing( sys.error(s"Failed to load class for data source: $provider") } } - val relation = clazz.newInstance match { - case dataSource: org.apache.spark.sql.sources.RelationProvider => - dataSource - .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => - dataSource - .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] - .createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema) + + val relation = userSpecifiedSchema match { + case Some(schema: StructType) => { + clazz.newInstance match { + case dataSource: 
org.apache.spark.sql.sources.SchemaRelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options), schema) + case _ => + sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.") + } + } + case None => { + clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.RelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + case _ => + sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.") + } + } } sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 97157c868cc90..990f7e0e74bcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -68,7 +68,7 @@ trait SchemaRelationProvider { def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation + schema: StructType): BaseRelation } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index a0e0172a4a548..605190f5ae6a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -45,7 +45,7 @@ class AllDataTypesScanSource extends SchemaRelationProvider { override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + schema: StructType): BaseRelation = { AllDataTypesScan(parameters("from").toInt, parameters("TO").toInt, schema)(sqlContext) } 
} @@ -53,10 +53,10 @@ class AllDataTypesScanSource extends SchemaRelationProvider { case class AllDataTypesScan( from: Int, to: Int, - userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) + userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext) extends TableScan { - override def schema = userSpecifiedSchema.get + override def schema = userSpecifiedSchema override def buildScan() = { sqlContext.sparkContext.parallelize(from to to).map { i =>