From ac06b0ca28d1da81fadbe0742a199b5e7b0de1ec Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 1 Sep 2018 15:22:10 -0700 Subject: [PATCH 1/6] [SPARK-25306][SQL] Use cache to speed up `createFilter` --- .../datasources/orc/OrcFilters.scala | 59 +++++++++++++++---- .../datasources/orc/OrcFilterSuite.scala | 16 ++++- .../spark/sql/hive/orc/OrcFilters.scala | 53 ++++++++++++++--- .../sql/hive/orc/HiveOrcFilterSuite.scala | 16 ++++- 4 files changed, 119 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index c4c3b3053a3b1..5dd35196df93a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -17,10 +17,14 @@ package org.apache.spark.sql.execution.datasources.orc +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader} import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory} import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder import org.apache.orc.storage.serde2.io.HiveDecimalWritable +import org.apache.spark.SparkEnv import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ @@ -54,7 +58,37 @@ import org.apache.spark.sql.types._ * builder methods mentioned above can only be found in test code, where all tested filters are * known to be convertible. */ -private[orc] object OrcFilters { +private[sql] object OrcFilters { + + case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType]) + + private val defaultCacheExpireTimeout = TimeUnit.SECONDS.toSeconds(20) + + lazy val cacheExpireTimeout: Long = + Option(SparkEnv.get).map(_.conf.getTimeAsSeconds( + "spark.sql.orc.cache.sarg.timeout", + s"${defaultCacheExpireTimeout}s")).getOrElse(defaultCacheExpireTimeout) + + private lazy val searchArgumentCache = CacheBuilder.newBuilder() + .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS) + .build( + new CacheLoader[FilterWithTypeMap, Option[Builder]]() { + override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = { + buildSearchArgument( + typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder()) + } + }) + + private def getOrBuildSearchArgumentWithNewBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter): Option[Builder] = { + // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled. + if (cacheExpireTimeout > 0) { + searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap)) + } else { + buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder()) + } + } /** * Create ORC filter as a SearchArgument instance. @@ -66,12 +100,19 @@ private[orc] object OrcFilters { // collect all convertible ones to build the final `SearchArgument`. val convertibleFilters = for { filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter) } yield filter for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And) + conjunction <- convertibleFilters.reduceOption { (x, y) => + val newFilter = org.apache.spark.sql.sources.And(x, y) + if (cacheExpireTimeout > 0) { + // Build in a bottom-up manner + getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter) + } + newFilter + } // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() @@ -127,8 +168,6 @@ private[orc] object OrcFilters { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - def newBuilder = SearchArgumentFactory.newBuilder() - def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) @@ -144,23 +183,23 @@ private[orc] object OrcFilters { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, child) negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 8680b86517b19..84d950ae7b610 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.datasources.orc import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ - import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.scalatest.concurrent.TimeLimits +import org.scalatest.time.SpanSugar._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -39,7 +40,7 @@ import org.apache.spark.sql.types._ * - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package. * - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package. */ -class OrcFilterSuite extends OrcTest with SharedSQLContext { +class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { private def checkFilterPredicate( df: DataFrame, @@ -383,4 +384,13 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } } + + test("createFilter should not hang") { + import org.apache.spark.sql.sources._ + val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) + val filters = (1 to 500).map(LessThan("a", _)).toArray[Filter] + failAfter(2 seconds) { + OrcFilters.createFilter(schema, filters) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index d9efd0cb457cd..e24c2af5c665a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.hive.orc +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader} import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory} import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder +import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -55,6 +59,32 @@ import org.apache.spark.sql.types._ * known to be convertible. */ private[orc] object OrcFilters extends Logging { + case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType]) + + private lazy val cacheExpireTimeout = + org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout + + private lazy val searchArgumentCache = CacheBuilder.newBuilder() + .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS) + .build( + new CacheLoader[FilterWithTypeMap, Option[Builder]]() { + override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = { + buildSearchArgument( + typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder()) + } + }) + + private def getOrBuildSearchArgumentWithNewBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter): Option[Builder] = { + // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled. + if (cacheExpireTimeout > 0) { + searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap)) + } else { + buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder()) + } + } + def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap @@ -62,12 +92,19 @@ private[orc] object OrcFilters extends Logging { // collect all convertible ones to build the final `SearchArgument`. val convertibleFilters = for { filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter) } yield filter for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption(And) + conjunction <- convertibleFilters.reduceOption { (x, y) => + val newFilter = org.apache.spark.sql.sources.And(x, y) + if (cacheExpireTimeout > 0) { + // Build in a bottom-up manner + getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter) + } + newFilter + } // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() @@ -77,8 +114,6 @@ private[orc] object OrcFilters extends Logging { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - def newBuilder = SearchArgumentFactory.newBuilder() - def isSearchableType(dataType: DataType): Boolean = dataType match { // Only the values in the Spark types below can be recognized by // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. @@ -98,23 +133,23 @@ private[orc] object OrcFilters extends Logging { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) + _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, child) negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 283037caf4a9b..9f991cb147a18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.orc import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ - import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.scalatest.concurrent.TimeLimits +import org.scalatest.time.SpanSugar._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -36,7 +37,7 @@ import org.apache.spark.sql.types._ /** * A test suite that tests Hive ORC filter API based filter pushdown optimization. */ -class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { +class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits { override val orcImp: String = "hive" @@ -384,4 +385,13 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )).get.toString } } + + test("createFilter should not hang") { + import org.apache.spark.sql.sources._ + val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) + val filters = (1 to 500).map(LessThan("a", _)).toArray[Filter] + failAfter(2 seconds) { + OrcFilters.createFilter(schema, filters) + } + } } From 7720134706f08b8a22993c378f9d3097eeae425b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 3 Sep 2018 12:40:30 -0700 Subject: [PATCH 2/6] Revert "[SPARK-25306][SQL] Use cache to speed up `createFilter`" This reverts commit ac06b0ca28d1da81fadbe0742a199b5e7b0de1ec. --- .../datasources/orc/OrcFilters.scala | 59 ++++--------------- .../datasources/orc/OrcFilterSuite.scala | 16 +---- .../spark/sql/hive/orc/OrcFilters.scala | 53 +++-------------- .../sql/hive/orc/HiveOrcFilterSuite.scala | 16 +---- 4 files changed, 25 insertions(+), 119 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 5dd35196df93a..c4c3b3053a3b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -17,14 +17,10 @@ package org.apache.spark.sql.execution.datasources.orc -import java.util.concurrent.TimeUnit - -import com.google.common.cache.{CacheBuilder, CacheLoader} import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory} import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder import org.apache.orc.storage.serde2.io.HiveDecimalWritable -import org.apache.spark.SparkEnv import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ @@ -58,37 +54,7 @@ import org.apache.spark.sql.types._ * builder methods mentioned above can only be found in test code, where all tested filters are * known to be convertible. */ -private[sql] object OrcFilters { - - case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType]) - - private val defaultCacheExpireTimeout = TimeUnit.SECONDS.toSeconds(20) - - lazy val cacheExpireTimeout: Long = - Option(SparkEnv.get).map(_.conf.getTimeAsSeconds( - "spark.sql.orc.cache.sarg.timeout", - s"${defaultCacheExpireTimeout}s")).getOrElse(defaultCacheExpireTimeout) - - private lazy val searchArgumentCache = CacheBuilder.newBuilder() - .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS) - .build( - new CacheLoader[FilterWithTypeMap, Option[Builder]]() { - override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = { - buildSearchArgument( - typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder()) - } - }) - - private def getOrBuildSearchArgumentWithNewBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter): Option[Builder] = { - // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled. - if (cacheExpireTimeout > 0) { - searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap)) - } else { - buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder()) - } - } +private[orc] object OrcFilters { /** * Create ORC filter as a SearchArgument instance. @@ -100,19 +66,12 @@ private[sql] object OrcFilters { // collect all convertible ones to build the final `SearchArgument`. val convertibleFilters = for { filter <- filters - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter) + _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) } yield filter for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption { (x, y) => - val newFilter = org.apache.spark.sql.sources.And(x, y) - if (cacheExpireTimeout > 0) { - // Build in a bottom-up manner - getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter) - } - newFilter - } + conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() @@ -168,6 +127,8 @@ private[sql] object OrcFilters { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + def newBuilder = SearchArgumentFactory.newBuilder() + def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) @@ -183,23 +144,23 @@ private[sql] object OrcFilters { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) + _ <- buildSearchArgument(dataTypeMap, left, newBuilder) + _ <- buildSearchArgument(dataTypeMap, right, newBuilder) lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) + _ <- buildSearchArgument(dataTypeMap, left, newBuilder) + _ <- buildSearchArgument(dataTypeMap, right, newBuilder) lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, child) + _ <- buildSearchArgument(dataTypeMap, child, newBuilder) negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 84d950ae7b610..8680b86517b19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.datasources.orc import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.scalatest.concurrent.TimeLimits -import org.scalatest.time.SpanSugar._ import scala.collection.JavaConverters._ +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} + import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ @@ -40,7 +39,7 @@ import org.apache.spark.sql.types._ * - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package. * - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package. */ -class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { +class OrcFilterSuite extends OrcTest with SharedSQLContext { private def checkFilterPredicate( df: DataFrame, @@ -384,13 +383,4 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { )).get.toString } } - - test("createFilter should not hang") { - import org.apache.spark.sql.sources._ - val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) - val filters = (1 to 500).map(LessThan("a", _)).toArray[Filter] - failAfter(2 seconds) { - OrcFilters.createFilter(schema, filters) - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index e24c2af5c665a..d9efd0cb457cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -17,13 +17,9 @@ package org.apache.spark.sql.hive.orc -import java.util.concurrent.TimeUnit - -import com.google.common.cache.{CacheBuilder, CacheLoader} import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory} import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder -import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -59,32 +55,6 @@ import org.apache.spark.sql.types._ * known to be convertible. */ private[orc] object OrcFilters extends Logging { - case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType]) - - private lazy val cacheExpireTimeout = - org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout - - private lazy val searchArgumentCache = CacheBuilder.newBuilder() - .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS) - .build( - new CacheLoader[FilterWithTypeMap, Option[Builder]]() { - override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = { - buildSearchArgument( - typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder()) - } - }) - - private def getOrBuildSearchArgumentWithNewBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter): Option[Builder] = { - // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled. - if (cacheExpireTimeout > 0) { - searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap)) - } else { - buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder()) - } - } - def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap @@ -92,19 +62,12 @@ private[orc] object OrcFilters extends Logging { // collect all convertible ones to build the final `SearchArgument`. val convertibleFilters = for { filter <- filters - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter) + _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) } yield filter for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption { (x, y) => - val newFilter = org.apache.spark.sql.sources.And(x, y) - if (cacheExpireTimeout > 0) { - // Build in a bottom-up manner - getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter) - } - newFilter - } + conjunction <- convertibleFilters.reduceOption(And) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() @@ -114,6 +77,8 @@ private[orc] object OrcFilters extends Logging { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + def newBuilder = SearchArgumentFactory.newBuilder() + def isSearchableType(dataType: DataType): Boolean = dataType match { // Only the values in the Spark types below can be recognized by // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. @@ -133,23 +98,23 @@ private[orc] object OrcFilters extends Logging { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) + _ <- buildSearchArgument(dataTypeMap, left, newBuilder) + _ <- buildSearchArgument(dataTypeMap, right, newBuilder) lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, left) - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, right) + _ <- buildSearchArgument(dataTypeMap, left, newBuilder) + _ <- buildSearchArgument(dataTypeMap, right, newBuilder) lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, child) + _ <- buildSearchArgument(dataTypeMap, child, newBuilder) negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 9f991cb147a18..283037caf4a9b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.hive.orc import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.scalatest.concurrent.TimeLimits -import org.scalatest.time.SpanSugar._ import scala.collection.JavaConverters._ +import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} + import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ @@ -37,7 +36,7 @@ import org.apache.spark.sql.types._ /** * A test suite that tests Hive ORC filter API based filter pushdown optimization. */ -class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits { +class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { override val orcImp: String = "hive" @@ -385,13 +384,4 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits )).get.toString } } - - test("createFilter should not hang") { - import org.apache.spark.sql.sources._ - val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) - val filters = (1 to 500).map(LessThan("a", _)).toArray[Filter] - failAfter(2 seconds) { - OrcFilters.createFilter(schema, filters) - } - } } From 4acbaf8be9e572c5cdbc61c49b488e8aef9e646b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 3 Sep 2018 13:11:11 -0700 Subject: [PATCH 3/6] Address comments --- .../execution/datasources/orc/OrcFilters.scala | 16 ++++++++++++++-- .../datasources/orc/OrcFilterSuite.scala | 14 +++++++++++++- .../apache/spark/sql/hive/orc/OrcFilters.scala | 3 ++- .../spark/sql/hive/orc/HiveOrcFilterSuite.scala | 14 +++++++++++++- 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index c4c3b3053a3b1..574492e900756 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -54,7 +54,7 @@ import org.apache.spark.sql.types._ * builder methods mentioned above can only be found in test code, where all tested filters are * known to be convertible. */ -private[orc] object OrcFilters { +private[sql] object OrcFilters { /** * Create ORC filter as a SearchArgument instance. @@ -71,12 +71,24 @@ private[orc] object OrcFilters { for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And) + conjunction <- buildTree(convertibleFilters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() } + def buildTree(filters: Seq[Filter]): Option[Filter] = { + import org.apache.spark.sql.sources.And + filters match { + case Seq() => None + case Seq(filter) => Some(filter) + case Seq(filter1, filter2) => Some(And(filter1, filter2)) + case _ => // length > 2 + val (left, right) = filters.splitAt(filters.length / 2) + Some(And(buildTree(left).get, buildTree(right).get)) + } + } + /** * Return true if this is a searchable type in ORC. * Both CharType and VarcharType are cleaned at AstBuilder. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 8680b86517b19..4798a59ea007d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -23,6 +23,9 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.scalatest.concurrent.TimeLimits +import org.scalatest.time.SpanSugar._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -39,7 +42,7 @@ import org.apache.spark.sql.types._ * - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package. * - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package. */ -class OrcFilterSuite extends OrcTest with SharedSQLContext { +class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { private def checkFilterPredicate( df: DataFrame, @@ -383,4 +386,13 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } } + + test("SPARK-25306 createFilter should not hang") { + import org.apache.spark.sql.sources._ + val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) + val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter] + failAfter(2 seconds) { + OrcFilters.createFilter(schema, filters) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index d9efd0cb457cd..35f6e01cf2da1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory} import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -67,7 +68,7 @@ private[orc] object OrcFilters extends Logging { for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- convertibleFilters.reduceOption(And) + conjunction <- buildTree(convertibleFilters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 283037caf4a9b..5cf2904d26ede 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -23,6 +23,9 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.scalatest.concurrent.TimeLimits +import org.scalatest.time.SpanSugar._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -36,7 +39,7 @@ import org.apache.spark.sql.types._ /** * A test suite that tests Hive ORC filter API based filter pushdown optimization. */ -class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { +class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits { override val orcImp: String = "hive" @@ -384,4 +387,13 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )).get.toString } } + + test("SPARK-25306 createFilter should not hang") { + import org.apache.spark.sql.sources._ + val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) + val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter] + failAfter(2 seconds) { + OrcFilters.createFilter(schema, filters) + } + } } From 5c46693e58e0f71fe8e67dce16f4b8c783c80aa6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 3 Sep 2018 21:25:34 -0700 Subject: [PATCH 4/6] Address comments --- .../FilterPushdownBenchmark-results.txt | 34 +++++++++++++++ .../datasources/orc/OrcFilters.scala | 43 +++++++------------ .../benchmark/FilterPushdownBenchmark.scala | 18 ++++++++ .../datasources/orc/OrcFilterSuite.scala | 14 +----- .../spark/sql/hive/orc/OrcFilters.scala | 21 +++------ .../sql/hive/orc/HiveOrcFilterSuite.scala | 14 +----- 6 files changed, 74 insertions(+), 70 deletions(-) diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt index 2215ed91e2018..1b58392b323df 100644 --- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt +++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt @@ -702,3 +702,37 @@ Parquet Vectorized (Pushdown) 11766 / 11927 1.3 7 Native ORC Vectorized 12101 / 12301 1.3 769.3 1.0X Native ORC Vectorized (Pushdown) 11983 / 12651 1.3 761.9 1.0X + +================================================================================================ +Pushdown benchmark with many filters +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +Select 1 row with 1 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +Parquet Vectorized 167 / 183 0.0 166581369.0 1.0X +Parquet Vectorized (Pushdown) 148 / 157 0.0 148497299.0 1.1X +Native ORC Vectorized 142 / 151 0.0 142053680.0 1.2X +Native ORC Vectorized (Pushdown) 142 / 148 0.0 142490700.0 1.2X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +Select 1 row with 250 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +Parquet Vectorized 1135 / 1147 0.0 1134508548.0 1.0X +Parquet Vectorized (Pushdown) 1432 / 1442 0.0 1431916497.0 0.8X +Native ORC Vectorized 1122 / 1128 0.0 1121722239.0 1.0X +Native ORC Vectorized (Pushdown) 1175 / 1182 0.0 1175152267.0 1.0X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +Select 1 row with 500 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +Parquet Vectorized 4480 / 4517 0.0 4480162380.0 1.0X +Parquet Vectorized (Pushdown) 5191 / 5222 0.0 5191121525.0 0.9X +Native ORC Vectorized 4474 / 4485 0.0 4474218663.0 1.0X +Native ORC Vectorized (Pushdown) 4704 / 4721 0.0 4704080940.0 1.0X diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 574492e900756..42db72b25c909 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution.datasources.orc -import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory} +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder +import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder import org.apache.orc.storage.serde2.io.HiveDecimalWritable -import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.{And, Filter} import org.apache.spark.sql.types._ /** @@ -55,30 +56,7 @@ import org.apache.spark.sql.types._ * known to be convertible. */ private[sql] object OrcFilters { - - /** - * Create ORC filter as a SearchArgument instance. - */ - def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { - val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. - val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) - } yield filter - - for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) - } yield builder.build() - } - - def buildTree(filters: Seq[Filter]): Option[Filter] = { - import org.apache.spark.sql.sources.And + private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = { filters match { case Seq() => None case Seq(filter) => Some(filter) @@ -89,6 +67,17 @@ private[sql] object OrcFilters { } } + /** + * Create ORC filter as a SearchArgument instance. + */ + def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { + val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + + buildTree(filters.filter(buildSearchArgument(dataTypeMap, _, newBuilder).isDefined)) + .flatMap(buildSearchArgument(dataTypeMap, _, newBuilder)) + .map(_.build) + } + /** * Return true if this is a searchable type in ORC. * Both CharType and VarcharType are cleaned at AstBuilder. @@ -139,8 +128,6 @@ private[sql] object OrcFilters { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - def newBuilder = SearchArgumentFactory.newBuilder() - def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index bdb60b44750c7..dcde363ce7ab6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -398,6 +398,24 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter } } } + + test(s"Pushdown benchmark with many filters") { + val numRows = 1 + val width = 500 + + withTempPath { dir => + val columns = (1 to width).map(i => s"id c$i") + val df = spark.range(1).selectExpr(columns: _*) + withTempTable("orcTable", "patquetTable") { + saveAsTable(df, dir) + Seq(1, 250, 500).foreach { numFilter => + val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") + // Note: InferFiltersFromConstraints will add more filters to this given filters + filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr) + } + } + } + } } trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 4798a59ea007d..8680b86517b19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -23,9 +23,6 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.scalatest.concurrent.TimeLimits -import org.scalatest.time.SpanSugar._ -import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -42,7 +39,7 @@ import org.apache.spark.sql.types._ * - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package. * - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package. */ -class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { +class OrcFilterSuite extends OrcTest with SharedSQLContext { private def checkFilterPredicate( df: DataFrame, @@ -386,13 +383,4 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits { )).get.toString } } - - test("SPARK-25306 createFilter should not hang") { - import org.apache.spark.sql.sources._ - val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) - val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter] - failAfter(2 seconds) { - OrcFilters.createFilter(schema, filters) - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 35f6e01cf2da1..1703da8395fbb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.hive.orc -import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory} +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree @@ -59,27 +60,15 @@ private[orc] object OrcFilters extends Logging { def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. - val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder()) - } yield filter - - for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder()) - } yield builder.build() + buildTree(filters.filter(buildSearchArgument(dataTypeMap, _, newBuilder).isDefined)) + .flatMap(buildSearchArgument(dataTypeMap, _, newBuilder)) + .map(_.build) } private def buildSearchArgument( dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - def newBuilder = SearchArgumentFactory.newBuilder() - def isSearchableType(dataType: DataType): Boolean = dataType match { // Only the values in the Spark types below can be recognized by // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 5cf2904d26ede..283037caf4a9b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -23,9 +23,6 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.scalatest.concurrent.TimeLimits -import org.scalatest.time.SpanSugar._ -import scala.collection.JavaConverters._ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -39,7 +36,7 @@ import org.apache.spark.sql.types._ /** * A test suite that tests Hive ORC filter API based filter pushdown optimization. */ -class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits { +class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { override val orcImp: String = "hive" @@ -387,13 +384,4 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits )).get.toString } } - - test("SPARK-25306 createFilter should not hang") { - import org.apache.spark.sql.sources._ - val schema = new StructType(Array(StructField("a", IntegerType, nullable = true))) - val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter] - failAfter(2 seconds) { - OrcFilters.createFilter(schema, filters) - } - } } From 4a372a328b33961a16ae6ad69bb58ba0720e9b63 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 3 Sep 2018 23:31:18 -0700 Subject: [PATCH 5/6] fix typo --- .../benchmark/FilterPushdownBenchmark.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index dcde363ce7ab6..41087f1a97174 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -242,7 +242,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter ignore("Pushdown for many distinct value case") { withTempPath { dir => - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { Seq(true, false).foreach { useStringForValue => prepareTable(dir, numRows, width, useStringForValue) if (useStringForValue) { @@ -259,7 +259,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter withTempPath { dir => val numDistinctValues = 200 - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { prepareStringDictTable(dir, numRows, numDistinctValues, width) runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string") } @@ -268,7 +268,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter ignore("Pushdown benchmark for StringStartsWith") { withTempPath { dir => - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { prepareTable(dir, numRows, width, true) Seq( "value like '10%'", @@ -296,7 +296,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter monotonically_increasing_id() } val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt)) - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(s"value = $mid").foreach { whereExpr => @@ -320,7 +320,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter ignore("Pushdown benchmark for InSet -> InFilters") { withTempPath { dir => - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { prepareTable(dir, numRows, width, false) Seq(5, 10, 50, 100).foreach { count => Seq(10, 50, 90).foreach { distribution => @@ -341,7 +341,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter val df = spark.range(numRows).selectExpr(columns: _*) .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType)) .orderBy("value") - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})") @@ -373,7 +373,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter val columns = (1 to width).map(i => s"CAST(id AS string) c$i") val df = spark.range(numRows).selectExpr(columns: _*) .withColumn("value", monotonically_increasing_id().cast(TimestampType)) - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr => @@ -406,7 +406,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter withTempPath { dir => val columns = (1 to width).map(i => s"id c$i") val df = spark.range(1).selectExpr(columns: _*) - withTempTable("orcTable", "patquetTable") { + withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(1, 250, 500).foreach { numFilter => val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") From 3cd444306c3b8b6c42a74b7cfb0755b8ce209c84 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 4 Sep 2018 01:55:46 -0700 Subject: [PATCH 6/6] Use old style. --- .../FilterPushdownBenchmark-results.txt | 24 +++++++++---------- .../datasources/orc/OrcFilters.scala | 16 ++++++++++--- .../spark/sql/hive/orc/OrcFilters.scala | 16 ++++++++++--- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt index 1b58392b323df..a75a15c99328a 100644 --- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt +++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt @@ -712,27 +712,27 @@ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz Select 1 row with 1 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 167 / 183 0.0 166581369.0 1.0X -Parquet Vectorized (Pushdown) 148 / 157 0.0 148497299.0 1.1X -Native ORC Vectorized 142 / 151 0.0 142053680.0 1.2X -Native ORC Vectorized (Pushdown) 142 / 148 0.0 142490700.0 1.2X +Parquet Vectorized 158 / 182 0.0 158442969.0 1.0X +Parquet Vectorized (Pushdown) 150 / 158 0.0 149718289.0 1.1X +Native ORC Vectorized 141 / 148 0.0 141259852.0 1.1X +Native ORC Vectorized (Pushdown) 142 / 147 0.0 142016472.0 1.1X Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz Select 1 row with 250 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 1135 / 1147 0.0 1134508548.0 1.0X -Parquet Vectorized (Pushdown) 1432 / 1442 0.0 1431916497.0 0.8X -Native ORC Vectorized 1122 / 1128 0.0 1121722239.0 1.0X -Native ORC Vectorized (Pushdown) 1175 / 1182 0.0 1175152267.0 1.0X +Parquet Vectorized 1013 / 1026 0.0 1013194322.0 1.0X +Parquet Vectorized (Pushdown) 1326 / 1332 0.0 1326301956.0 0.8X +Native ORC Vectorized 1005 / 1010 0.0 1005266379.0 1.0X +Native ORC Vectorized (Pushdown) 1068 / 1071 0.0 1067964993.0 0.9X Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz Select 1 row with 500 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 4480 / 4517 0.0 4480162380.0 1.0X -Parquet Vectorized (Pushdown) 5191 / 5222 0.0 5191121525.0 0.9X -Native ORC Vectorized 4474 / 4485 0.0 4474218663.0 1.0X -Native ORC Vectorized (Pushdown) 4704 / 4721 0.0 4704080940.0 1.0X +Parquet Vectorized 3598 / 3614 0.0 3598001202.0 1.0X +Parquet Vectorized (Pushdown) 4282 / 4333 0.0 4281849770.0 0.8X +Native ORC Vectorized 3594 / 3619 0.0 3593551548.0 1.0X +Native ORC Vectorized (Pushdown) 3834 / 3840 0.0 3834240570.0 0.9X diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 42db72b25c909..dbafc468c6c40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -73,9 +73,19 @@ private[sql] object OrcFilters { def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - buildTree(filters.filter(buildSearchArgument(dataTypeMap, _, newBuilder).isDefined)) - .flatMap(buildSearchArgument(dataTypeMap, _, newBuilder)) - .map(_.build) + // First, tries to convert each filter individually to see whether it's convertible, and then + // collect all convertible ones to build the final `SearchArgument`. + val convertibleFilters = for { + filter <- filters + _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) + } yield filter + + for { + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + } yield builder.build() } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 1703da8395fbb..aee9cb58a031e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -60,9 +60,19 @@ private[orc] object OrcFilters extends Logging { def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - buildTree(filters.filter(buildSearchArgument(dataTypeMap, _, newBuilder).isDefined)) - .flatMap(buildSearchArgument(dataTypeMap, _, newBuilder)) - .map(_.build) + // First, tries to convert each filter individually to see whether it's convertible, and then + // collect all convertible ones to build the final `SearchArgument`. + val convertibleFilters = for { + filter <- filters + _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) + } yield filter + + for { + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + } yield builder.build() } private def buildSearchArgument(