From 5687a3b5527817c809244305468bfe4968bedcec Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 28 May 2016 05:03:06 +0000 Subject: [PATCH 01/13] Try to push down filter at RowGroups level for parquet reader. --- .../expressions/namedExpressions.scala | 8 +++ .../sql/catalyst/expressions/package.scala | 2 +- .../datasources/FileSourceStrategy.scala | 9 ++- .../parquet/ParquetFileFormat.scala | 61 ++----------------- 4 files changed, 22 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 306a99d5a37bf..c06a1ea356093 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,6 +292,14 @@ case class AttributeReference( } } + def withMetadata(newMetadata: Metadata): AttributeReference = { + if (metadata == newMetadata) { + this + } else { + AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) + } + } + override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 23baa6f7837fb..08d3e5782c001 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -89,7 +89,7 @@ package object expressions { implicit class AttributeSeq(attrs: Seq[Attribute]) { /** Creates a StructType with a schema matching this `Seq[Attribute]`. 
*/ def toStructType: StructType = { - StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable))) + StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 350508c1d9f4c..6b7205e3146d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = - l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) + l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => + files.dataSchema.find(_.name == c.name).map { f => + c match { + case a: AttributeReference => a.withMetadata(f.metadata) + case _ => c + } + }.getOrElse(c) + } // Partition keys are not available in the statistics of the files. val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b47d41e1661f0..2bccf75edf4f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -344,6 +344,11 @@ private[sql] class ParquetFileFormat val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) + // Try to push down filters when filter push-down is enabled. 
+ // Notice: This push-down is RowGroups level, not individual records. + pushed.foreach { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _) + } val parquetReader = if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) @@ -578,62 +583,6 @@ private[sql] object ParquetFileFormat extends Logging { } } - /** This closure sets various Parquet configurations at both driver side and executor side. */ - private[parquet] def initializeLocalJobFunc( - requiredColumns: Array[String], - filters: Array[Filter], - dataSchema: StructType, - parquetBlockSize: Long, - useMetadataCache: Boolean, - parquetFilterPushDown: Boolean, - assumeBinaryIsString: Boolean, - assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { - val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) - - // Try to push down filters when filter push-down is enabled. - if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. 
- .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) - .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) - } - - conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - CatalystSchemaConverter.checkFieldNames(requestedSchema).json - }) - - conf.set( - CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) - - // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) - - // Sets flags for `CatalystSchemaConverter` - conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) - conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - - overrideMinSplitSize(parquetBlockSize, conf) - } - - /** This closure sets input paths at the driver side. */ - private[parquet] def initializeDriverSideJobFunc( - inputFiles: Array[FileStatus], - parquetBlockSize: Long)(job: Job): Unit = { - // We side the input paths at the driver side. - logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}") - if (inputFiles.nonEmpty) { - FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) - } - - overrideMinSplitSize(parquetBlockSize, job.getConfiguration) - } - private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { From a8bae96f9da8476de42dde1c7251a67136f3a25d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Aug 2016 14:32:17 +0800 Subject: [PATCH 02/13] Add regression test. 
--- .../SpecificParquetRecordReaderBase.java | 13 +++++ .../parquet/ParquetFileFormat.scala | 27 +--------- .../parquet/ParquetFilterSuite.scala | 52 +++++++++++++++++++ 3 files changed, 67 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index d823275d857b4..1167955dd1a23 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -81,6 +81,12 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader minSplitSize) { - val message = - s"Parquet's block size (row group size) is larger than " + - s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " + - s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " + - s"$parquetBlockSize." 
- logDebug(message) - conf.set("mapred.min.split.size", parquetBlockSize.toString) - conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString) - } - } - private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 84fdcfea3c8f3..37ee195f3ff74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -17,11 +17,20 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.File import java.nio.charset.StandardCharsets +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.JobID +import org.apache.hadoop.mapreduce.TaskAttemptID +import org.apache.hadoop.mapreduce.TaskID +import org.apache.hadoop.mapreduce.TaskType +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} +import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -557,4 +566,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(df.filter("_1 IS NOT NULL").count() === 4) } } + + test("Fiters should be pushed down for vectorized Parquet reader at row group level") { + import testImplicits._ + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + withTempPath { dir => 
+ val path = s"${dir.getCanonicalPath}/table" + (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) + + val requiredSchema = StructType(Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", IntegerType, nullable = false) + )) + + val hadoopConf = new Configuration() + hadoopConf.setInt("parquet.block.size", 100) + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) + + val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100)) + assert(filters.isDefined) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(hadoopConf, attemptId) + + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, filters.get) + + new File(path).listFiles().filter(f => f.getPath().endsWith(".parquet")).map { f => + val file = new File(f.getPath()) + val split = new org.apache.parquet.hadoop.ParquetInputSplit( + new Path(f.getPath), 0L, file.length(), file.length(), Array.empty, null) + + val vectorizedReader = new VectorizedParquetRecordReader() + vectorizedReader.initialize(split, hadoopAttemptContext) + assert(vectorizedReader.getRowGroupCount == 0) + } + } + } + } + } } From 50095a575de11e5f3f8918148abc7c339c2f0728 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Aug 2016 15:09:00 +0800 Subject: [PATCH 03/13] Don't need two SQLConf settings. 
--- .../parquet/ParquetFilterSuite.scala | 70 +++++++++---------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index fad11c87ae48f..ffa08304fb58c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -539,43 +539,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("Fiters should be pushed down for vectorized Parquet reader at row group level") { import testImplicits._ - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table" - (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) - - val requiredSchema = StructType(Seq( - StructField("a", IntegerType, nullable = false), - StructField("b", IntegerType, nullable = false) - )) - - val hadoopConf = new Configuration() - hadoopConf.setInt("parquet.block.size", 100) - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) - hadoopConf.set( - ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - ParquetSchemaConverter.checkFieldNames(requiredSchema).json) - - val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100)) - assert(filters.isDefined) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(hadoopConf, attemptId) - - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, filters.get) - - new File(path).listFiles().filter(f => f.getPath().endsWith(".parquet")).map { f 
=> - val file = new File(f.getPath()) - val split = new org.apache.parquet.hadoop.ParquetInputSplit( - new Path(f.getPath), 0L, file.length(), file.length(), Array.empty, null) - - val vectorizedReader = new VectorizedParquetRecordReader() - vectorizedReader.initialize(split, hadoopAttemptContext) - assert(vectorizedReader.getRowGroupCount == 0) - } - } + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table" + (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) + + val requiredSchema = StructType(Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", IntegerType, nullable = false) + )) + + val hadoopConf = new Configuration() + hadoopConf.setInt("parquet.block.size", 1) + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) + + val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100)) + assert(filters.isDefined) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(hadoopConf, attemptId) + + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, filters.get) + + new File(path).listFiles().filter(f => f.getPath().endsWith(".parquet")).map { f => + val file = new File(f.getPath()) + val split = new org.apache.parquet.hadoop.ParquetInputSplit( + new Path(f.getPath), 0L, file.length(), file.length(), Array.empty, null) + + val vectorizedReader = new VectorizedParquetRecordReader() + vectorizedReader.initialize(split, hadoopAttemptContext) + assert(vectorizedReader.getRowGroupCount == 0) } } } From 246129c2f73a8a79247be4d0f36737af227b0111 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Aug 2016 16:34:04 +0800 Subject: [PATCH 04/13] Improve test cases and revert the change not needed now. 
--- .../expressions/namedExpressions.scala | 8 - .../datasources/FileSourceStrategy.scala | 11 +- .../parquet/ParquetFilterSuite.scala | 157 ++++++++++-------- 3 files changed, 87 insertions(+), 89 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c06a1ea356093..306a99d5a37bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,14 +292,6 @@ case class AttributeReference( } } - def withMetadata(newMetadata: Metadata): AttributeReference = { - if (metadata == newMetadata) { - this - } else { - AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) - } - } - override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 4e654d911c730..32aa4713ebdbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -86,15 +86,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") - val dataColumns = l.resolve(fsRelation.dataSchema, - fsRelation.sparkSession.sessionState.analyzer.resolver).map { c => - fsRelation.dataSchema.find(_.name == c.name).map { f => - c match { - case a: AttributeReference => a.withMetadata(f.metadata) - case _ => c - } - }.getOrElse(c) - } 
+ val dataColumns = + l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) // Partition keys are not available in the statistics of the files. val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index ffa08304fb58c..608ecd3457c05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -377,73 +377,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { import testImplicits._ - - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { - withTempPath { dir => - val pathOne = s"${dir.getCanonicalPath}/table1" - (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne) - val pathTwo = s"${dir.getCanonicalPath}/table2" - (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo) - - // If the "c = 1" filter gets pushed down, this query will throw an exception which - // Parquet emits. This is a Parquet issue (PARQUET-389). - val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") - checkAnswer( - df, - Row(1, "1", null)) - - // The fields "a" and "c" only exist in one Parquet file. 
- assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - - val pathThree = s"${dir.getCanonicalPath}/table3" - df.write.parquet(pathThree) - - // We will remove the temporary metadata when writing Parquet file. - val schema = spark.read.parquet(pathThree).schema - assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) - - val pathFour = s"${dir.getCanonicalPath}/table4" - val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") - dfStruct.select(struct("a").as("s")).write.parquet(pathFour) - - val pathFive = s"${dir.getCanonicalPath}/table5" - val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b") - dfStruct2.select(struct("c").as("s")).write.parquet(pathFive) - - // If the "s.c = 1" filter gets pushed down, this query will throw an exception which - // Parquet emits. - val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1") - .selectExpr("s") - checkAnswer(dfStruct3, Row(Row(null, 1))) - - // The fields "s.a" and "s.c" only exist in one Parquet file. - val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType] - assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - - val pathSix = s"${dir.getCanonicalPath}/table6" - dfStruct3.write.parquet(pathSix) - - // We will remove the temporary metadata when writing Parquet file. - val forPathSix = spark.read.parquet(pathSix).schema - assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) - - // sanity test: make sure optional metadata field is not wrongly set. 
- val pathSeven = s"${dir.getCanonicalPath}/table7" - (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven) - val pathEight = s"${dir.getCanonicalPath}/table8" - (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight) - - val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") - checkAnswer( - df2, - Row(1, "1")) - - // The fields "a" and "b" exist in both two Parquet files. No metadata is set. - assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField)) - assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField)) + Seq("true", "false").map { vectorized => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + withTempPath { dir => + val pathOne = s"${dir.getCanonicalPath}/table1" + (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne) + val pathTwo = s"${dir.getCanonicalPath}/table2" + (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo) + + // If the "c = 1" filter gets pushed down, this query will throw an exception which + // Parquet emits. This is a Parquet issue (PARQUET-389). + val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") + checkAnswer( + df, + Row(1, "1", null)) + + // The fields "a" and "c" only exist in one Parquet file. + assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + + val pathThree = s"${dir.getCanonicalPath}/table3" + df.write.parquet(pathThree) + + // We will remove the temporary metadata when writing Parquet file. 
+ val schema = spark.read.parquet(pathThree).schema + assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) + + val pathFour = s"${dir.getCanonicalPath}/table4" + val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") + dfStruct.select(struct("a").as("s")).write.parquet(pathFour) + + val pathFive = s"${dir.getCanonicalPath}/table5" + val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b") + dfStruct2.select(struct("c").as("s")).write.parquet(pathFive) + + // If the "s.c = 1" filter gets pushed down, this query will throw an exception which + // Parquet emits. + val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1") + .selectExpr("s") + checkAnswer(dfStruct3, Row(Row(null, 1))) + + // The fields "s.a" and "s.c" only exist in one Parquet file. + val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType] + assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + + val pathSix = s"${dir.getCanonicalPath}/table6" + dfStruct3.write.parquet(pathSix) + + // We will remove the temporary metadata when writing Parquet file. + val forPathSix = spark.read.parquet(pathSix).schema + assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) + + // sanity test: make sure optional metadata field is not wrongly set. + val pathSeven = s"${dir.getCanonicalPath}/table7" + (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven) + val pathEight = s"${dir.getCanonicalPath}/table8" + (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight) + + val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") + checkAnswer( + df2, + Row(1, "1")) + + // The fields "a" and "b" exist in both two Parquet files. No metadata is set. 
+ assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField)) + assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField)) + } } } } @@ -549,7 +551,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex )) val hadoopConf = new Configuration() - hadoopConf.setInt("parquet.block.size", 1) + + // Set up parquet block size to make sure many row groups produced. + hadoopConf.setInt("parquet.block.size", 8) hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, @@ -559,10 +563,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(filters.isDefined) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = + val hadoopAttemptContext1 = + new TaskAttemptContextImpl(hadoopConf, attemptId) + val hadoopAttemptContext2 = new TaskAttemptContextImpl(hadoopConf, attemptId) - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, filters.get) + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext1.getConfiguration, filters.get) new File(path).listFiles().filter(f => f.getPath().endsWith(".parquet")).map { f => val file = new File(f.getPath()) @@ -570,8 +576,15 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex new Path(f.getPath), 0L, file.length(), file.length(), Array.empty, null) val vectorizedReader = new VectorizedParquetRecordReader() - vectorizedReader.initialize(split, hadoopAttemptContext) - assert(vectorizedReader.getRowGroupCount == 0) + // Use the context with filters pushed down. + vectorizedReader.initialize(split, hadoopAttemptContext1) + // Row groups are filtered. + assert(vectorizedReader.getRowGroupCount() == 0) + + // Use the context without filters pushed down. 
+ vectorizedReader.initialize(split, hadoopAttemptContext2) + // Row groups are not filtered. + assert(vectorizedReader.getRowGroupCount() > 0) } } } From f7baf41882748bd67e25cd299f3d04da002b342b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 5 Aug 2016 12:54:06 +0800 Subject: [PATCH 05/13] Improve test case. --- .../spark/sql/execution/ExistingRDD.scala | 25 +++++++- .../parquet/ParquetFileFormat.scala | 7 +++ .../parquet/ParquetFilterSuite.scala | 62 ++++++------------- 3 files changed, 47 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 79d9114ff39ae..1662637df5d8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -338,11 +338,15 @@ private[sql] case class FileSourceScanExec( DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"), DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", ")) + // Only for test purpose to get the number of row groups to read in vectorized Parquet reader. + // Since the relation is not serialized, we obtain the file format object here. 
+ private val fileFormat = relation.fileFormat + private lazy val inputRDD: RDD[InternalRow] = { val selectedPartitions = relation.location.listFiles(partitionFilters) - val readFile: (PartitionedFile) => Iterator[InternalRow] = - relation.fileFormat.buildReaderWithPartitionValues( + val readFile: (PartitionedFile) => Iterator[InternalRow] = { + val func = fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, @@ -351,6 +355,19 @@ private[sql] case class FileSourceScanExec( options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) + (file: PartitionedFile) => { + val iter = func(file) + // Only for test purpose. + // Once the vectorized Parquet reader is initialized in the above method, we can read its + // variable numRowGroups. + if (fileFormat.isInstanceOf[ParquetSource]) { + val numRowGroups = longMetric("numRowGroups") + numRowGroups.add(fileFormat.asInstanceOf[ParquetSource].numRowGroups) + } + iter + } + } + relation.bucketSpec match { case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) @@ -365,7 +382,9 @@ private[sql] case class FileSourceScanExec( private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), + // Only for test purpose. 
+ "numRowGroups" -> SQLMetrics.createMetric(sparkContext, "number of row groups to read")) protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c1fa4f25d17c5..5fcd708a7ce80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -366,6 +366,10 @@ private[sql] class ParquetFileFormat val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") + + // For test purpose, getting the number to row groups this vectorized reader to read. + numRowGroups = vectorizedReader.getRowGroupCount() + vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { vectorizedReader.enableReturningBatches() @@ -415,6 +419,9 @@ private[sql] class ParquetFileFormat sqlContext.sessionState.newHadoopConf(), options) } + + // Only for test purpose. 
+ private[sql] var numRowGroups: Int = -1 } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 608ecd3457c05..a93df356596c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -541,50 +542,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("Fiters should be pushed down for vectorized Parquet reader at row group level") { import testImplicits._ - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table" - (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) - - val requiredSchema = StructType(Seq( - StructField("a", IntegerType, nullable = false), - StructField("b", IntegerType, nullable = false) - )) - - val hadoopConf = new Configuration() - - // Set up parquet block size to make sure many row groups produced. 
- hadoopConf.setInt("parquet.block.size", 8) - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) - hadoopConf.set( - ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - ParquetSchemaConverter.checkFieldNames(requiredSchema).json) - - val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100)) - assert(filters.isDefined) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext1 = - new TaskAttemptContextImpl(hadoopConf, attemptId) - val hadoopAttemptContext2 = - new TaskAttemptContextImpl(hadoopConf, attemptId) - - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext1.getConfiguration, filters.get) - - new File(path).listFiles().filter(f => f.getPath().endsWith(".parquet")).map { f => - val file = new File(f.getPath()) - val split = new org.apache.parquet.hadoop.ParquetInputSplit( - new Path(f.getPath), 0L, file.length(), file.length(), Array.empty, null) - - val vectorizedReader = new VectorizedParquetRecordReader() - // Use the context with filters pushed down. - vectorizedReader.initialize(split, hadoopAttemptContext1) - // Row groups are filtered. - assert(vectorizedReader.getRowGroupCount() == 0) - - // Use the context without filters pushed down. - vectorizedReader.initialize(split, hadoopAttemptContext2) - // Row groups are not filtered. 
- assert(vectorizedReader.getRowGroupCount() > 0) + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table" + (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) + + Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) { + val df = spark.read.parquet(path).filter("a < 100") + df.collect() + val source = df.queryExecution.sparkPlan.collect { + case f: FileSourceScanExec => f + }.head + assert(func(source.longMetric("numRowGroups").value)) + } + } } } } From 3c7afaa680aca0d508b6770d863bc164d2bb78bd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 5 Aug 2016 13:03:47 +0800 Subject: [PATCH 06/13] Add SQL metrics for number of row groups for test purpose. --- .../sql/execution/DataSourceScanExec.scala | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 1e749b3dfcffb..34c365ef79d43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -186,11 +186,15 @@ private[sql] case class FileSourceScanExec( "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), "InputPaths" -> relation.location.paths.mkString(", ")) + // Only for test purpose to get the number of row groups to read in vectorized Parquet reader. + // Since the relation is not serialized, we obtain the file format object here. 
+ private val fileFormat = relation.fileFormat + private lazy val inputRDD: RDD[InternalRow] = { val selectedPartitions = relation.location.listFiles(partitionFilters) - val readFile: (PartitionedFile) => Iterator[InternalRow] = - relation.fileFormat.buildReaderWithPartitionValues( + val readFile: (PartitionedFile) => Iterator[InternalRow] = { + val func = fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, @@ -199,6 +203,19 @@ private[sql] case class FileSourceScanExec( options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) + (file: PartitionedFile) => { + val iter = func(file) + // Only for test purpose. + // Once the vectorized Parquet reader is initialized in the above method, we can read its + // variable numRowGroups. + if (fileFormat.isInstanceOf[ParquetSource]) { + val numRowGroups = longMetric("numRowGroups") + numRowGroups.add(fileFormat.asInstanceOf[ParquetSource].numRowGroups) + } + iter + } + } + relation.bucketSpec match { case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) @@ -213,7 +230,9 @@ private[sql] case class FileSourceScanExec( private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), + // Only for test purpose. 
+ "numRowGroups" -> SQLMetrics.createMetric(sparkContext, "number of row groups to read")) protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { From ea81fdcb0744f938f083e287bbd32d61b83af8d2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 5 Aug 2016 14:36:15 +0800 Subject: [PATCH 07/13] A few file format are not serializable. --- .../spark/sql/execution/DataSourceScanExec.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 34c365ef79d43..f755450d8fb48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -188,13 +188,19 @@ private[sql] case class FileSourceScanExec( // Only for test purpose to get the number of row groups to read in vectorized Parquet reader. // Since the relation is not serialized, we obtain the file format object here. - private val fileFormat = relation.fileFormat + private val fileFormat: ParquetSource = + // A few file formats are not serializable... + if (relation.fileFormat.isInstanceOf[ParquetSource]) { + relation.fileFormat.asInstanceOf[ParquetSource] + } else { + null + } private lazy val inputRDD: RDD[InternalRow] = { val selectedPartitions = relation.location.listFiles(partitionFilters) val readFile: (PartitionedFile) => Iterator[InternalRow] = { - val func = fileFormat.buildReaderWithPartitionValues( + val func = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, @@ -208,9 +214,9 @@ private[sql] case class FileSourceScanExec( // Only for test purpose. 
// Once the vectorized Parquet reader is initialized in the above method, we can read its // variable numRowGroups. - if (fileFormat.isInstanceOf[ParquetSource]) { + if (fileFormat != null) { val numRowGroups = longMetric("numRowGroups") - numRowGroups.add(fileFormat.asInstanceOf[ParquetSource].numRowGroups) + numRowGroups.add(fileFormat.numRowGroups) } iter } From 2d3480381317bba06274e4ea899bc8d98d5cb82c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 6 Aug 2016 10:30:34 +0800 Subject: [PATCH 08/13] Improve the approach to update accumulator. --- .../sql/execution/DataSourceScanExec.scala | 29 ++----------------- .../parquet/ParquetFileFormat.scala | 15 ++++++++-- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f755450d8fb48..b3feefdb751fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -186,21 +186,11 @@ private[sql] case class FileSourceScanExec( "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), "InputPaths" -> relation.location.paths.mkString(", ")) - // Only for test purpose to get the number of row groups to read in vectorized Parquet reader. - // Since the relation is not serialized, we obtain the file format object here. - private val fileFormat: ParquetSource = - // A few file formats are not serializable... 
- if (relation.fileFormat.isInstanceOf[ParquetSource]) { - relation.fileFormat.asInstanceOf[ParquetSource] - } else { - null - } - private lazy val inputRDD: RDD[InternalRow] = { val selectedPartitions = relation.location.listFiles(partitionFilters) - val readFile: (PartitionedFile) => Iterator[InternalRow] = { - val func = relation.fileFormat.buildReaderWithPartitionValues( + val readFile: (PartitionedFile) => Iterator[InternalRow] = + relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, @@ -209,19 +199,6 @@ private[sql] case class FileSourceScanExec( options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) - (file: PartitionedFile) => { - val iter = func(file) - // Only for test purpose. - // Once the vectorized Parquet reader is initialized in the above method, we can read its - // variable numRowGroups. - if (fileFormat != null) { - val numRowGroups = longMetric("numRowGroups") - numRowGroups.add(fileFormat.numRowGroups) - } - iter - } - } - relation.bucketSpec match { case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) @@ -238,7 +215,7 @@ private[sql] case class FileSourceScanExec( Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), // Only for test purpose. 
- "numRowGroups" -> SQLMetrics.createMetric(sparkContext, "number of row groups to read")) + "numRowGroups" -> SQLMetrics.createMetric(sparkContext, "numRowGroups")) protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 0238dbbb22673..09683737d5963 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType import org.slf4j.bridge.SLF4JBridgeHandler -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -368,7 +369,15 @@ private[sql] class ParquetFileFormat logDebug(s"Appending $partitionSchema ${file.partitionValues}") // For test purpose, getting the number to row groups this vectorized reader to read. 
- numRowGroups = vectorizedReader.getRowGroupCount() + val taskContext = TaskContext.get() + if (taskContext != null) { + val accu = taskContext.taskMetrics.externalAccums.find { acc => + acc.name.isDefined && acc.name.get == accuNameForNumRowGroup + } + if (accu.isDefined) { + accu.get.asInstanceOf[SQLMetric].add(vectorizedReader.getRowGroupCount().toLong) + } + } vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { @@ -427,7 +436,7 @@ private[sql] class ParquetFileFormat } // Only for test purpose. - private[sql] var numRowGroups: Int = -1 + private val accuNameForNumRowGroup = "numRowGroups" } /** From 462edc758370d7407e447ec6e9fe144f472f3c4e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 8 Aug 2016 10:49:46 +0800 Subject: [PATCH 09/13] Add a method in TaskMetrics to look up for accumulator by name. --- .../scala/org/apache/spark/executor/TaskMetrics.scala | 9 +++++++++ .../datasources/parquet/ParquetFileFormat.scala | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 5bb505bf09f17..dd149a919fe55 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable { } private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums + + /** + * Looks for a registered accumulator by accumulator name. 
+ */ + private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = { + accumulators.find { acc => + acc.name.isDefined && acc.name.get == name + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 09683737d5963..9f99312828db6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -368,12 +368,12 @@ private[sql] class ParquetFileFormat vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") - // For test purpose, getting the number to row groups this vectorized reader to read. + // For test purpose. + // If the predefined accumulator exists, the row group number to read will be updated + // to the accumulator. So we can check if the row groups are filtered or not in test case. val taskContext = TaskContext.get() if (taskContext != null) { - val accu = taskContext.taskMetrics.externalAccums.find { acc => - acc.name.isDefined && acc.name.get == accuNameForNumRowGroup - } + val accu = taskContext.taskMetrics.lookForAccumulatorByName(accuNameForNumRowGroup) if (accu.isDefined) { accu.get.asInstanceOf[SQLMetric].add(vectorizedReader.getRowGroupCount().toLong) } From 0b38ba18bb51f4e6ee9dabe00c377601ae32777e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 9 Aug 2016 12:19:46 +0800 Subject: [PATCH 10/13] Add comments. 
--- .../org/apache/spark/util/AccumulatorV2.scala | 12 ++++++++ .../SpecificParquetRecordReaderBase.java | 29 +++++++++++-------- .../sql/execution/DataSourceScanExec.scala | 4 +-- .../parquet/ParquetFileFormat.scala | 17 +---------- .../parquet/ParquetFilterSuite.scala | 15 ++++++---- 5 files changed, 41 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 044dd69cc92c7..8eef62ad8e819 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -23,6 +23,8 @@ import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ + import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext} import org.apache.spark.scheduler.AccumulableInfo @@ -257,6 +259,16 @@ private[spark] object AccumulatorContext { originals.clear() } + /** + * Looks for a registered accumulator by accumulator name. 
+ */ + private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = { + originals.values().asScala.find { ref => + val acc = ref.get + acc.name.isDefined && acc.name.get == name + }.map(_.get) + } + // Identifier for distinguishing SQL metrics from other accumulators private[spark] val SQL_ACCUM_IDENTIFIER = "sql" } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 9add100026b62..520c9f17d980b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Set; +import scala.Option; + import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; @@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; +import org.apache.spark.util.AccumulatorV2; +import org.apache.spark.util.LongAccumulator; /** * Base class for custom RecordReaders for Parquet that directly materialize to `T`. 
@@ -82,12 +88,6 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader> accu = (Option>) taskContext.taskMetrics() + .lookForAccumulatorByName("numRowGroups"); + if (accu.isDefined()) { + ((LongAccumulator)accu.get()).add((long)blocks.size()); + } + } } - /** - * Returns the total number of row groups to read. For test purpose only. - */ - public int getRowGroupCount() { return rowGroupCount; } - /** * Returns the list of files at 'path' recursively. This skips files that are ignored normally * by MapReduce. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index b3feefdb751fc..1e749b3dfcffb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -213,9 +213,7 @@ private[sql] case class FileSourceScanExec( private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), - // Only for test purpose. 
- "numRowGroups" -> SQLMetrics.createMetric(sparkContext, "numRowGroups")) + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 9f99312828db6..18b95bb312d11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType import org.slf4j.bridge.SLF4JBridgeHandler -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -367,18 +367,6 @@ private[sql] class ParquetFileFormat val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") - - // For test purpose. - // If the predefined accumulator exists, the row group number to read will be updated - // to the accumulator. So we can check if the row groups are filtered or not in test case. 
- val taskContext = TaskContext.get() - if (taskContext != null) { - val accu = taskContext.taskMetrics.lookForAccumulatorByName(accuNameForNumRowGroup) - if (accu.isDefined) { - accu.get.asInstanceOf[SQLMetric].add(vectorizedReader.getRowGroupCount().toLong) - } - } - vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { vectorizedReader.enableReturningBatches() @@ -434,9 +422,6 @@ private[sql] class ParquetFileFormat sqlContext.sessionState.newHadoopConf(), options) } - - // Only for test purpose. - private val accuNameForNumRowGroup = "numRowGroups" } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index a93df356596c4..c6eb798cbf1a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.util.{AccumulatorContext, LongAccumulator} /** * A test suite that tests Parquet filter2 API based filter pushdown optimization. 
@@ -551,12 +552,16 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) => withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) { + val accu = new LongAccumulator + accu.register(sparkContext, Some("numRowGroups")) + val df = spark.read.parquet(path).filter("a < 100") - df.collect() - val source = df.queryExecution.sparkPlan.collect { - case f: FileSourceScanExec => f - }.head - assert(func(source.longMetric("numRowGroups").value)) + df.foreachPartition(_.foreach(v => accu.add(0))) + df.collect + + val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups") + assert(numRowGroups.isDefined) + assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value)) } } } From cee74b7b9cba73a91d9120add0cfe8e3226f19a6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 9 Aug 2016 12:29:05 +0800 Subject: [PATCH 11/13] Remove unneeded changes. 
--- .../datasources/parquet/ParquetFilterSuite.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c6eb798cbf1a6..af43ad3213da6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -17,26 +17,16 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.File import java.nio.charset.StandardCharsets -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.JobID -import org.apache.hadoop.mapreduce.TaskAttemptID -import org.apache.hadoop.mapreduce.TaskID -import org.apache.hadoop.mapreduce.TaskType -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} -import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf From a2ba343420493f9ca1caf393c17c1e763e0e231b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 10 Aug 2016 13:03:14 +0800 Subject: [PATCH 12/13] Prevent accumulator from being released early. 
--- .../src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 2 +- .../execution/datasources/parquet/ParquetFilterSuite.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 925f9ee80a733..d130a37db5b5d 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -265,7 +265,7 @@ private[spark] object AccumulatorContext { private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = { originals.values().asScala.find { ref => val acc = ref.get - acc.name.isDefined && acc.name.get == name + acc != null && acc.name.isDefined && acc.name.get == name }.map(_.get) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index af43ad3213da6..ea1087a2000e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -552,6 +552,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups") assert(numRowGroups.isDefined) assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value)) + // As accumulator is referred by weak reference in AccumulatorContext, + // sometimes `accu` will be early released if JVM finds it is not used anymore. + // This optimization causes previous asserts failed. So adding an additional assert + // on `accu` here. 
+ assert(accu != null) } } } From ca074f11cb51f792c603dcdf3c07502525b9d134 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 10 Aug 2016 20:43:06 +0800 Subject: [PATCH 13/13] Remove previous accumulator. --- .../execution/datasources/parquet/ParquetFilterSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index ea1087a2000e2..4246b54c21f0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -552,11 +552,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups") assert(numRowGroups.isDefined) assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value)) - // As accumulator is referred by weak reference in AccumulatorContext, - // sometimes `accu` will be early released if JVM finds it is not used anymore. - // This optimization causes previous asserts failed. So adding an additional assert - // on `accu` here. - assert(accu != null) + AccumulatorContext.remove(accu.id) } } }