From c010a4ecd985d2c7a5b22c30504b3bee876ea3df Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 15:56:29 +0300 Subject: [PATCH 01/23] Support row skipping in Avro datasource --- .../spark/sql/avro/AvroDataToCatalyst.scala | 2 +- .../spark/sql/avro/AvroDeserializer.scala | 15 ++++++++------- .../apache/spark/sql/avro/AvroFileFormat.scala | 18 ++++++++++-------- .../v2/avro/AvroPartitionReaderFactory.scala | 18 ++++++++++-------- .../avro/AvroCatalystDataConversionSuite.scala | 2 +- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 79c72057c5823..3374db73a5afa 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -98,7 +98,7 @@ case class AvroDataToCatalyst( try { decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) result = reader.read(result, decoder) - deserializer.deserialize(result) + deserializer.deserialize(result).get } catch { // There could be multiple possible exceptions here, e.g. java.io.IOException, // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 1d18594fd349c..359a3319f2548 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -61,10 +61,10 @@ class AvroDeserializer( private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( datetimeRebaseMode, "Avro") - private val converter: Any => Any = rootCatalystType match { + private val converter: Any => Option[Any] = rootCatalystType match { // A shortcut for empty schema. case st: StructType if st.isEmpty => - (data: Any) => InternalRow.empty + (data: Any) => Some(InternalRow.empty) case st: StructType => val resultRow = new SpecificInternalRow(st.map(_.dataType)) @@ -72,8 +72,8 @@ class AvroDeserializer( val writer = getRecordWriter(rootAvroType, st, Nil) (data: Any) => { val record = data.asInstanceOf[GenericRecord] - writer(fieldUpdater, record) - resultRow + val skipRow = writer(fieldUpdater, record) + if (skipRow) None else Some(resultRow) } case _ => @@ -82,11 +82,11 @@ class AvroDeserializer( val writer = newWriter(rootAvroType, rootCatalystType, Nil) (data: Any) => { writer(fieldUpdater, 0, data) - tmpRow.get(0, rootCatalystType) + Some(tmpRow.get(0, rootCatalystType)) } } - def deserialize(data: Any): Any = converter(data) + def deserialize(data: Any): Option[Any] = converter(data) /** * Creates a writer to write avro values to Catalyst values at the given ordinal with the given @@ -315,7 +315,7 @@ class AvroDeserializer( private def getRecordWriter( avroType: Schema, sqlType: StructType, - path: List[String]): (CatalystDataUpdater, GenericRecord) => Unit = { + path: List[String]): (CatalystDataUpdater, GenericRecord) => Boolean = { val validFieldIndexes = ArrayBuffer.empty[Int] val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit] @@ -354,6 +354,7 @@ class AvroDeserializer( fieldWriters(i)(fieldUpdater, record.get(validFieldIndexes(i))) i += 1 } + false } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 59d54bc433f8b..67d59f46e61ad 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -133,26 +133,28 @@ private[sql] class AvroFileFormat extends FileFormat new Iterator[InternalRow] { private[this] var completed = false + private[this] var nextRow: Option[InternalRow] = None override def hasNext: Boolean = { - if (completed) { - false - } else { + do { val r = reader.hasNext && !reader.pastSync(stop) if (!r) { reader.close() completed = true + nextRow = None + } else { + val record = reader.next() + nextRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] } - r - } + } while (!completed && nextRow.isEmpty) + + nextRow.isDefined } override def next(): InternalRow = { - if (!hasNext) { + nextRow.getOrElse { throw new NoSuchElementException("next on empty iterator") } - val record = reader.next() - deserializer.deserialize(record).asInstanceOf[InternalRow] } } } else { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 15918f46a83bb..e3b61a81af9bc 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -96,26 +96,28 @@ case class AvroPartitionReaderFactory( val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false + private[this] var nextRow: Option[InternalRow] = None override def next(): Boolean = { - if (completed) { - false - } else { + do { val r = reader.hasNext && !reader.pastSync(stop) if (!r) { reader.close() completed = true + nextRow = None + } else { + val record = reader.next() + nextRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] } - r - } + } while (!completed && nextRow.isEmpty) + + nextRow.isDefined } override def get(): InternalRow = { - if (!next) { + nextRow.getOrElse { throw new NoSuchElementException("next on empty iterator") } - val record = reader.next() - deserializer.deserialize(record).asInstanceOf[InternalRow] } override def close(): Unit = reader.close() diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index c8a1f670bda9e..bce2b6236f946 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -293,7 +293,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite def checkDeserialization(data: GenericData.Record, expected: Any): Unit = { assert(checkResult( expected, - deserializer.deserialize(data), + deserializer.deserialize(data).get, dataType, exprNullable = false )) } From ab17bd0e96e05ddef0930a5bad5a0fb09eaecd76 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 19:20:13 +0300 Subject: [PATCH 02/23] Apply NoopFilters so far --- .../apache/spark/sql/avro/AvroDeserializer.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 359a3319f2548..f79d3886c496f 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -30,7 +30,7 @@ import org.apache.avro.Schema.Type._ import org.apache.avro.generic._ import org.apache.avro.util.Utf8 -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters} import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY @@ -69,7 +69,8 @@ class AvroDeserializer( case st: StructType => val resultRow = new SpecificInternalRow(st.map(_.dataType)) val fieldUpdater = new RowUpdater(resultRow) - val writer = getRecordWriter(rootAvroType, st, Nil) + val applyFilters = new NoopFilters().skipRow(resultRow, _) + val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters) (data: Any) => { val record = data.asInstanceOf[GenericRecord] val skipRow = writer(fieldUpdater, record) @@ -178,7 +179,7 @@ class AvroDeserializer( updater.setDecimal(ordinal, decimal) case (RECORD, st: StructType) => - val writeRecord = getRecordWriter(avroType, st, path) + val writeRecord = getRecordWriter(avroType, st, path, applyFilters = _ => false) (updater, ordinal, value) => val row = new SpecificInternalRow(st) writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord]) @@ -315,7 +316,8 @@ class AvroDeserializer( private def getRecordWriter( avroType: Schema, sqlType: StructType, - path: List[String]): (CatalystDataUpdater, GenericRecord) => Boolean = { + path: List[String], + applyFilters: Int => Boolean): (CatalystDataUpdater, GenericRecord) => Boolean = { val validFieldIndexes = ArrayBuffer.empty[Int] val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit] @@ -350,11 +352,13 @@ class AvroDeserializer( (fieldUpdater, record) => { var i = 0 - while (i < validFieldIndexes.length) { + var skipRow = false + while (i < validFieldIndexes.length && !skipRow) { fieldWriters(i)(fieldUpdater, record.get(validFieldIndexes(i))) + skipRow = applyFilters(i) i += 1 } - false + skipRow } } From 665865d250c14b4d65433b6c6b10e099359889f7 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 22:17:52 +0300 Subject: [PATCH 03/23] Pushdown real filters --- .../spark/sql/avro/AvroDeserializer.scala | 15 ++++++----- .../spark/sql/avro/AvroFileFormat.scala | 6 ++++- .../v2/avro/AvroPartitionReaderFactory.scala | 10 +++++-- .../apache/spark/sql/v2/avro/AvroScan.scala | 24 ++++++++++++++--- .../spark/sql/v2/avro/AvroScanBuilder.scala | 27 ++++++++++++++++--- .../org/apache/spark/sql/avro/AvroSuite.scala | 4 ++- 6 files changed, 69 insertions(+), 17 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index f79d3886c496f..2fc277066e2be 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -30,7 +30,7 @@ import org.apache.avro.Schema.Type._ import org.apache.avro.generic._ import org.apache.avro.util.Utf8 -import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters} +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY @@ -45,12 +45,15 @@ import org.apache.spark.unsafe.types.UTF8String class AvroDeserializer( rootAvroType: Schema, rootCatalystType: DataType, - datetimeRebaseMode: LegacyBehaviorPolicy.Value) { + datetimeRebaseMode: LegacyBehaviorPolicy.Value, + filters: StructFilters) { def this(rootAvroType: Schema, rootCatalystType: DataType) { - this(rootAvroType, rootCatalystType, - LegacyBehaviorPolicy.withName( - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))) + this( + rootAvroType, + rootCatalystType, + LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)), + new NoopFilters) } private lazy val decimalConversions = new DecimalConversion() @@ -69,7 +72,7 @@ class AvroDeserializer( case st: StructType => val resultRow = new SpecificInternalRow(st.map(_.dataType)) val fieldUpdater = new RowUpdater(resultRow) - val applyFilters = new NoopFilters().skipRow(resultRow, _) + val applyFilters = filters.skipRow(resultRow, _) val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters) (data: Any) => { val record = data.asInstanceOf[GenericRecord] diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 67d59f46e61ad..eb9815dcc75fd 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -34,6 +34,7 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv.CSVFilters import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, Filter} @@ -129,7 +130,10 @@ private[sql] class AvroFileFormat extends FileFormat SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode) + userProvidedSchema.getOrElse(reader.getSchema), + requiredSchema, + datetimeRebaseMode, + new CSVFilters(filters, requiredSchema)) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index e3b61a81af9bc..514a893c30ca1 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -31,10 +31,12 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv.CSVFilters import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -54,7 +56,8 @@ case class AvroPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - parsedOptions: AvroOptions) extends FilePartitionReaderFactory with Logging { + parsedOptions: AvroOptions, + filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value @@ -92,7 +95,10 @@ case class AvroPartitionReaderFactory( reader.asInstanceOf[DataFileReader[_]].getMetaString, SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode) + userProvidedSchema.getOrElse(reader.getSchema), + readDataSchema, + datetimeRebaseMode, + new CSVFilters(filters, readDataSchema)) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala index fe7315c739296..e94bef2f8bebe 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -37,6 +38,7 @@ case class AvroScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], partitionFilters: Seq[Expression] = Seq.empty, dataFilters: Seq[Expression] = Seq.empty) extends FileScan { override def isSplitable(path: Path): Boolean = true @@ -50,8 +52,14 @@ case class AvroScan( val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf) // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. - AvroPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, parsedOptions) + AvroPartitionReaderFactory( + sparkSession.sessionState.conf, + broadcastedConf, + dataSchema, + readDataSchema, + readPartitionSchema, + parsedOptions, + pushedFilters) } override def withFilters( @@ -59,10 +67,18 @@ case class AvroScan( this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { - case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options - + case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options && + equivalentFilters(pushedFilters, a.pushedFilters) case _ => false } override def hashCode(): Int = super.hashCode() + + override def description(): String = { + super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]") + } + + override def getMetaData(): Map[String, String] = { + super.getMetaData() ++ Map("PushedFilers" -> seqToString(pushedFilters)) + } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala index e36c71ef4b1f7..40df0e7b79d4b 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.v2.avro import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.catalyst.StructFilters +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -29,8 +31,27 @@ class AvroScanBuilder ( schema: StructType, dataSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters { + override def build(): Scan = { - AvroScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options) + AvroScan( + sparkSession, + fileIndex, + dataSchema, + readDataSchema(), + readPartitionSchema(), + options, + pushedFilters()) } + + private var _pushedFilters: Array[Filter] = Array.empty + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + if (sparkSession.sessionState.conf.csvFilterPushDown) { + _pushedFilters = StructFilters.pushedFilters(filters, dataSchema) + } + filters + } + + override def pushedFilters(): Array[Filter] = _pushedFilters } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 83a7ef0061fb2..dca6a2debcac8 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1920,6 +1920,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { |Format: avro |Location: InMemoryFileIndex\\[.*\\] |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\] + |PushedFilers: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\] |ReadSchema: struct\\ |""".stripMargin.trim spark.range(10) @@ -1933,7 +1934,8 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { .format("avro") .load(basePath).where($"id" > 1 && $"value" > 2) val normalizedOutput = getNormalizedExplain(df, FormattedMode) - assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1) + assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1, + normalizedOutput) } } } From 33d9383838bbc5587f352f3b31a51773da6ff58a Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 22:25:01 +0300 Subject: [PATCH 04/23] Add a benchmark --- .../benchmark/AvroReadBenchmark.scala | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index dc9606f405191..771346492ed0f 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.time.Instant import scala.util.Random import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -36,6 +39,8 @@ import org.apache.spark.sql.types._ * }}} */ object AvroReadBenchmark extends SqlBasedBenchmark { + import spark.implicits._ + def withTempTable(tableNames: String*)(f: => Unit): Unit = { try f finally tableNames.foreach(spark.catalog.dropTempView) } @@ -186,6 +191,51 @@ object AvroReadBenchmark extends SqlBasedBenchmark { } } + private def filtersPushdownBenchmark(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output) + val colsNum = 100 + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", TimestampType)) + val schema = StructType(StructField("key", IntegerType) +: fields) + def columns(): Seq[Column] = { + val ts = Seq.tabulate(colsNum) { i => + lit(Instant.ofEpochSecond(i * 12345678)).as(s"col$i") + } + ($"id" % 1000).as("key") +: ts + } + withTempPath { path => + spark.range(rowsNum).select(columns(): _*) + .write + .format("avro") + .save(path.getAbsolutePath) + def readback = { + spark.read + .schema(schema) + .format("avro") + .load(path.getAbsolutePath) + } + + benchmark.addCase("w/o filters", numIters) { _ => + readback.noop() + } + + def withFilter(configEnabled: Boolean): Unit = { + withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { + readback.filter($"key" === 0).noop() + } + } + + benchmark.addCase("pushdown disabled", numIters) { _ => + withFilter(configEnabled = false) + } + + benchmark.addCase("w/ filters", numIters) { _ => + withFilter(configEnabled = true) + } + + benchmark.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("SQL Single Numeric Column Scan") { Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach { dataType => @@ -211,5 +261,8 @@ object AvroReadBenchmark extends SqlBasedBenchmark { columnsBenchmark(1024 * 1024 * 1, 200) columnsBenchmark(1024 * 1024 * 1, 300) } + // Benchmark pushdown filters that refer to top-level columns. + // TODO (SPARK-XXXXX): Add benchmarks for filters with nested column attributes. + filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters = 3) } } From ad21524ce073bc962d0b6c54ff030344ac6ed5c3 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 22:41:32 +0300 Subject: [PATCH 05/23] Fix the benchmark --- .../spark/sql/execution/benchmark/AvroReadBenchmark.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index 771346492ed0f..c7fc4559d1175 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -195,7 +195,7 @@ object AvroReadBenchmark extends SqlBasedBenchmark { val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output) val colsNum = 100 val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", TimestampType)) - val schema = StructType(StructField("key", IntegerType) +: fields) + val schema = StructType(StructField("key", LongType) +: fields) def columns(): Seq[Column] = { val ts = Seq.tabulate(colsNum) { i => lit(Instant.ofEpochSecond(i * 12345678)).as(s"col$i") @@ -263,6 +263,6 @@ object AvroReadBenchmark extends SqlBasedBenchmark { } // Benchmark pushdown filters that refer to top-level columns. // TODO (SPARK-XXXXX): Add benchmarks for filters with nested column attributes. - filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters = 3) + filtersPushdownBenchmark(rowsNum = 1000 * 1000, numIters = 3) } } From 0ea0ef7f67428012db8674e7b7582582e478bea2 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 23:02:43 +0300 Subject: [PATCH 06/23] Make timestamp read more expensive --- .../benchmark/AvroReadBenchmark.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index c7fc4559d1175..b183a89301292 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -198,15 +198,17 @@ object AvroReadBenchmark extends SqlBasedBenchmark { val schema = StructType(StructField("key", LongType) +: fields) def columns(): Seq[Column] = { val ts = Seq.tabulate(colsNum) { i => - lit(Instant.ofEpochSecond(i * 12345678)).as(s"col$i") + lit(Instant.ofEpochSecond(-30610224000L + i * 123456)).as(s"col$i") } ($"id" % 1000).as("key") +: ts } withTempPath { path => - spark.range(rowsNum).select(columns(): _*) - .write - .format("avro") - .save(path.getAbsolutePath) + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> "LEGACY") { + spark.range(rowsNum).select(columns(): _*) + .write + .format("avro") + .save(path.getAbsolutePath) + } def readback = { spark.read .schema(schema) @@ -215,17 +217,23 @@ object AvroReadBenchmark extends SqlBasedBenchmark { } benchmark.addCase("w/o filters", numIters) { _ => - readback.noop() + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY") { + readback.noop() + } } def withFilter(configEnabled: Boolean): Unit = { - withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { + withSQLConf( + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY", + SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { readback.filter($"key" === 0).noop() } } benchmark.addCase("pushdown disabled", numIters) { _ => - withFilter(configEnabled = false) + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY") { + withFilter(configEnabled = false) + } } benchmark.addCase("w/ filters", numIters) { _ => From cf083e2e234c5221eb70a6f74c5f5f006c30b691 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 20:24:02 +0000 Subject: [PATCH 07/23] Update AvroReadBenchmark-results.txt --- .../benchmarks/AvroReadBenchmark-results.txt | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/external/avro/benchmarks/AvroReadBenchmark-results.txt b/external/avro/benchmarks/AvroReadBenchmark-results.txt index 0ab611a0f9a29..3108a9c8e13fe 100644 --- a/external/avro/benchmarks/AvroReadBenchmark-results.txt +++ b/external/avro/benchmarks/AvroReadBenchmark-results.txt @@ -2,121 +2,129 @@ SQL Single Numeric Column Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 3049 3071 32 5.2 193.8 1.0X +Sum 2841 2846 7 5.5 180.6 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2982 2992 13 5.3 189.6 1.0X +Sum 2777 2799 30 5.7 176.6 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2984 2989 7 5.3 189.7 1.0X +Sum 2730 2753 33 5.8 173.6 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 3262 3353 128 4.8 207.4 1.0X +Sum 3278 3284 9 4.8 208.4 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2716 2723 10 5.8 172.7 1.0X +Sum 2801 2805 6 5.6 178.1 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single DOUBLE Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2868 2870 3 5.5 182.4 1.0X +Sum 2976 2984 12 5.3 189.2 1.0X ================================================================================================ Int and String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of columns 4714 4739 35 2.2 449.6 1.0X +Sum of columns 4674 4686 17 2.2 445.8 1.0X ================================================================================================ Partitioned Table Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Data column 3257 3286 41 4.8 207.1 1.0X -Partition column 3258 3277 27 4.8 207.2 1.0X -Both columns 3399 3405 9 4.6 216.1 1.0X +Data column 3273 3284 17 4.8 208.1 1.0X +Partition column 2934 2935 2 5.4 186.6 1.1X +Both columns 3395 3405 14 4.6 215.8 1.0X ================================================================================================ Repeated String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 3292 3316 33 3.2 314.0 1.0X +Sum of string length 3340 3353 19 3.1 318.5 1.0X ================================================================================================ String with Nulls Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 5450 5456 9 1.9 519.7 1.0X +Sum of string length 5484 5493 12 1.9 523.0 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 4410 4435 35 2.4 420.6 1.0X +Sum of string length 3817 3833 22 2.7 364.0 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 3074 3122 68 3.4 293.2 1.0X +Sum of string length 2340 2354 20 4.5 223.2 1.0X ================================================================================================ Single Column Scan From Wide Columns ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 5120 5136 23 0.2 4882.7 1.0X +Sum of single column 4709 4719 14 0.2 4491.1 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 9952 10002 71 0.1 9490.7 1.0X +Sum of single column 9159 9171 18 0.1 8734.3 1.0X -OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 14973 14978 7 0.1 14279.8 1.0X +Sum of single column 13645 13751 151 0.1 13012.8 1.0X +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +w/o filters 9215 9309 146 0.1 9215.2 1.0X +pushdown disabled 9535 9637 96 0.1 9534.9 1.0X +w/ filters 3969 3994 22 0.3 3969.5 2.3X + From 5c4c1779e60c51a7dd8fcbce49d93dc76de1a606 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 17 Jul 2020 20:46:51 +0000 Subject: [PATCH 08/23] Update AvroReadBenchmark-jdk11-results.txt --- .../AvroReadBenchmark-jdk11-results.txt | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt b/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt index 3c1b5af0d5986..b70b1446f6f64 100644 --- a/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt +++ b/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt @@ -2,121 +2,129 @@ SQL Single Numeric Column Scan ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2689 2694 7 5.8 170.9 1.0X +Sum 2872 2936 90 5.5 182.6 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2741 2759 26 5.7 174.2 1.0X +Sum 2810 2838 40 5.6 178.6 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2736 2748 17 5.7 173.9 1.0X +Sum 2901 2922 30 5.4 184.4 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 3305 3317 17 4.8 210.2 1.0X +Sum 3387 3391 5 4.6 215.4 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2904 2952 68 5.4 184.6 1.0X +Sum 2890 2960 99 5.4 183.7 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz SQL Single DOUBLE Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 3090 3093 4 5.1 196.5 1.0X +Sum 3067 3088 30 5.1 195.0 1.0X ================================================================================================ Int and String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of columns 5351 5365 20 2.0 510.3 1.0X +Sum of columns 4736 4818 116 2.2 451.7 1.0X ================================================================================================ Partitioned Table Scan ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Data column 3278 3288 14 4.8 208.4 1.0X -Partition column 3149 3193 62 5.0 200.2 1.0X -Both columns 3198 3204 7 4.9 203.4 1.0X +Data column 3383 3400 23 4.6 215.1 1.0X +Partition column 2949 2959 14 5.3 187.5 1.1X +Both columns 3522 3545 33 4.5 223.9 1.0X ================================================================================================ Repeated String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 3435 3438 5 3.1 327.6 1.0X +Sum of string length 3332 3355 32 3.1 317.7 1.0X ================================================================================================ String with Nulls Scan ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 5634 5650 23 1.9 537.3 1.0X +Sum of string length 5588 5652 90 1.9 532.9 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 4725 4752 39 2.2 450.6 1.0X +Sum of string length 3858 3865 9 2.7 368.0 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 3550 3566 23 3.0 338.6 1.0X +Sum of string length 2562 2571 12 4.1 244.3 1.0X ================================================================================================ Single Column Scan From Wide Columns ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 5271 5279 11 0.2 5027.0 1.0X +Sum of single column 5241 5243 3 0.2 4998.0 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 10393 10516 174 0.1 9911.3 1.0X +Sum of single column 10178 10185 10 0.1 9706.5 1.0X -OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 15330 15343 19 0.1 14619.6 1.0X +Sum of single column 15201 15232 44 0.1 14496.4 1.0X +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +w/o filters 9614 9669 54 0.1 9614.1 1.0X +pushdown disabled 10077 10141 66 0.1 10077.2 1.0X +w/ filters 4681 4713 29 0.2 4681.5 2.1X + From eb0c983d8534385e1a8e68ac294c887ac7650a74 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 18 Jul 2020 21:28:32 +0300 Subject: [PATCH 09/23] Tests for Avro deserializer --- .../AvroCatalystDataConversionSuite.scala | 81 ++++++++++++++++--- 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index bce2b6236f946..2a3c0ef7fac15 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -26,11 +26,15 @@ import org.apache.avro.message.{BinaryMessageDecoder, BinaryMessageEncoder} import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.{RandomDataGenerator, Row} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopFilters, StructFilters} +import org.apache.spark.sql.catalyst.csv.CSVFilters import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.{EqualTo, Not} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String class AvroCatalystDataConversionSuite extends SparkFunSuite with SharedSparkSession @@ -272,6 +276,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite assert(message == "Cannot convert Catalyst type StringType to Avro type \"long\".") } + private def checkDeserialization( + schema: Schema, + data: GenericData.Record, + expected: Option[Any], + filters: StructFilters = new NoopFilters): Unit = { + val dataType = SchemaConverters.toSqlType(schema).dataType + val deserializer = new AvroDeserializer( + schema, + dataType, + SQLConf.LegacyBehaviorPolicy.CORRECTED, + filters) + val deserialized = deserializer.deserialize(data) + expected match { + case None => assert(deserialized == None) + case Some(d) => + assert(checkResult(d, deserialized.get, dataType, exprNullable = false)) + } + } + test("avro array can be generic java collection") { val jsonFormatSchema = """ @@ -287,30 +310,62 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite |} """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) - val dataType = SchemaConverters.toSqlType(avroSchema).dataType - val deserializer = new AvroDeserializer(avroSchema, dataType) - - def checkDeserialization(data: GenericData.Record, expected: Any): Unit = { - assert(checkResult( - expected, - deserializer.deserialize(data).get, - dataType, exprNullable = false - )) - } def validateDeserialization(array: java.util.Collection[Integer]): Unit = { val data = new GenericRecordBuilder(avroSchema) .set("array", array) .build() val expected = InternalRow(new GenericArrayData(new util.ArrayList[Any](array))) - checkDeserialization(data, expected) + checkDeserialization(avroSchema, data, Some(expected)) val reEncoded = new BinaryMessageDecoder[GenericData.Record](new GenericData(), avroSchema) .decode(new BinaryMessageEncoder(new GenericData(), avroSchema).encode(data)) - checkDeserialization(reEncoded, expected) + checkDeserialization(avroSchema, reEncoded, Some(expected)) } validateDeserialization(Collections.emptySet()) validateDeserialization(util.Arrays.asList(1, null, 3)) } + + test("filter pushdown to Avro deserializer") { + val schema = + """ + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "Age", "type": "int"}, + | {"name": "Name", "type": "string"} + | ] + |} + """.stripMargin + val avroSchema = new Schema.Parser().parse(schema) + val sqlSchema = new StructType().add("Age", "int").add("Name", "string") + val data = new GenericRecordBuilder(avroSchema) + .set("Age", 39) + .set("Name", "Maxim") + .build() + val expectedRow = Some(InternalRow(39, UTF8String.fromString("Maxim"))) + + checkDeserialization(avroSchema, data, expectedRow) + withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "true") { + checkDeserialization( + avroSchema, + data, + expectedRow, + new CSVFilters(Seq(EqualTo("Age", 39)), sqlSchema)) + checkDeserialization( + avroSchema, + data, + None, + new CSVFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) + } + withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "false") { + checkDeserialization( + avroSchema, + data, + expectedRow, + new CSVFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) + } + } } From 72022bb1534980462fa95a89b188e2410a956bca Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 18 Jul 2020 21:36:34 +0300 Subject: [PATCH 10/23] Set JIRA ID in the benchmark --- .../spark/sql/execution/benchmark/AvroReadBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index b183a89301292..6ce0e73662c17 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -270,7 +270,7 @@ object AvroReadBenchmark extends SqlBasedBenchmark { columnsBenchmark(1024 * 1024 * 1, 300) } // Benchmark pushdown filters that refer to top-level columns. - // TODO (SPARK-XXXXX): Add benchmarks for filters with nested column attributes. + // TODO (SPARK-32328): Add benchmarks for filters with nested column attributes. filtersPushdownBenchmark(rowsNum = 1000 * 1000, numIters = 3) } } From 072eab0a7c35f5827b1c59d884fe3eea18251bc9 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 18 Jul 2020 23:14:00 +0300 Subject: [PATCH 11/23] CSVFilters -> OrderedFilters --- .../apache/spark/sql/avro/AvroFileFormat.scala | 5 ++--- .../sql/v2/avro/AvroPartitionReaderFactory.scala | 5 ++--- .../avro/AvroCatalystDataConversionSuite.scala | 9 ++++----- .../CSVFilters.scala => OrderedFilters.scala} | 15 +++++++-------- .../spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- ...ltersSuite.scala => OrderedFiltersSuite.scala} | 7 +++---- 6 files changed, 20 insertions(+), 25 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/{csv/CSVFilters.scala => OrderedFilters.scala} (89%) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/{csv/CSVFiltersSuite.scala => OrderedFiltersSuite.scala} (83%) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index eb9815dcc75fd..3c335472879af 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -33,8 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.CSVFilters +import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, Filter} @@ -133,7 +132,7 @@ private[sql] class AvroFileFormat extends FileFormat userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode, - new CSVFilters(filters, requiredSchema)) + new OrderedFilters(filters, requiredSchema)) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 514a893c30ca1..fae73da49828a 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -30,8 +30,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.CSVFilters +import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues} @@ -98,7 +97,7 @@ case class AvroPartitionReaderFactory( userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode, - new CSVFilters(filters, readDataSchema)) + new OrderedFilters(filters, readDataSchema)) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 2a3c0ef7fac15..7f890bf11c260 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -26,8 +26,7 @@ import org.apache.avro.message.{BinaryMessageDecoder, BinaryMessageEncoder} import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.{RandomDataGenerator, Row} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopFilters, StructFilters} -import org.apache.spark.sql.catalyst.csv.CSVFilters +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopFilters, OrderedFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} import org.apache.spark.sql.internal.SQLConf @@ -353,19 +352,19 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite avroSchema, data, expectedRow, - new CSVFilters(Seq(EqualTo("Age", 39)), sqlSchema)) + new OrderedFilters(Seq(EqualTo("Age", 39)), sqlSchema)) checkDeserialization( avroSchema, data, None, - new CSVFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) + new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) } withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "false") { checkDeserialization( avroSchema, data, expectedRow, - new CSVFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) + new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala similarity index 89% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala index d2cb2c4d8134a..535e118f6d322 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala @@ -15,23 +15,22 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst.csv +package org.apache.spark.sql.catalyst -import org.apache.spark.sql.catalyst.{InternalRow, StructFilters} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType /** - * An instance of the class compiles filters to predicates and allows to - * apply the predicates to an internal row with partially initialized values - * converted from parsed CSV fields. + * An instance of the class compiles filters to predicates and sorts them in + * the order which allows to apply the predicates to an internal row with partially + * initialized values, for instance converted from parsed CSV fields. * - * @param filters The filters pushed down to CSV datasource. + * @param filters The filters pushed down to a datasource. * @param requiredSchema The schema with only fields requested by the upper layer. */ -class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) +class OrderedFilters(filters: Seq[sources.Filter], requiredSchema: StructType) extends StructFilters(filters, requiredSchema) { /** * Converted filters to predicates and grouped by maximum field index @@ -94,7 +93,7 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) predicate != null && !predicate.eval(row) } - // CSV filters are applied sequentially, and no need to track which filter references + // The filters are applied sequentially, and no need to track which filter references // point out to already set row values. The `reset()` method is trivial because // the filters don't have any states. def reset(): Unit = {} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 898b963fd0ab5..fa9f1034f069b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -25,7 +25,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT @@ -98,7 +98,7 @@ class UnivocityParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) - private val csvFilters = new CSVFilters(filters, requiredSchema) + private val csvFilters = new OrderedFilters(filters, requiredSchema) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala similarity index 83% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala index 21bef20d7d4d9..b156cb52e921c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst.csv +package org.apache.spark.sql.catalyst -import org.apache.spark.sql.catalyst.{StructFilters, StructFiltersSuite} import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -class CSVFiltersSuite extends StructFiltersSuite { +class OrderedFiltersSuite extends StructFiltersSuite { override def createFilters(filters: Seq[sources.Filter], schema: StructType): StructFilters = { - new CSVFilters(filters, schema) + new OrderedFilters(filters, schema) } } From 7eb3f50199b214d95f2b81576fbb6480f0d81e96 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 19 Jul 2020 11:19:38 +0300 Subject: [PATCH 12/23] Move config checking from OrderedFilters --- .../spark/sql/catalyst/OrderedFilters.scala | 50 +++++++++---------- .../sql/catalyst/csv/UnivocityParser.scala | 9 +++- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala index 535e118f6d322..a4b322d2bd8ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala @@ -47,33 +47,31 @@ class OrderedFilters(filters: Seq[sources.Filter], requiredSchema: StructType) private val predicates: Array[BasePredicate] = { val len = requiredSchema.fields.length val groupedPredicates = Array.fill[BasePredicate](len)(null) - if (SQLConf.get.csvFilterPushDown) { - val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) - for (filter <- filters) { - val refs = filter.references - val index = if (refs.isEmpty) { - // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references - // Filters w/o refs always return the same result. Taking into account - // that predicates are combined via `And`, we can apply such filters only - // once at the position 0. - 0 - } else { - // readSchema must contain attributes of all filters. - // Accordingly, `fieldIndex()` returns a valid index always. - refs.map(requiredSchema.fieldIndex).max - } - groupedFilters(index) :+= filter + val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) + for (filter <- filters) { + val refs = filter.references + val index = if (refs.isEmpty) { + // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references + // Filters w/o refs always return the same result. Taking into account + // that predicates are combined via `And`, we can apply such filters only + // once at the position 0. + 0 + } else { + // readSchema must contain attributes of all filters. + // Accordingly, `fieldIndex()` returns a valid index always. + refs.map(requiredSchema.fieldIndex).max } - if (len > 0 && !groupedFilters(0).isEmpty) { - // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` - // can be evaluated faster that others. We put them in front of others. - val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) - groupedFilters(0) = literals ++ others - } - for (i <- 0 until len) { - if (!groupedFilters(i).isEmpty) { - groupedPredicates(i) = toPredicate(groupedFilters(i)) - } + groupedFilters(index) :+= filter + } + if (len > 0 && !groupedFilters(0).isEmpty) { + // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` + // can be evaluated faster that others. We put them in front of others. + val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) + groupedFilters(0) = literals ++ others + } + for (i <- 0 until len) { + if (!groupedFilters(i).isEmpty) { + groupedPredicates(i) = toPredicate(groupedFilters(i)) } } groupedPredicates diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index fa9f1034f069b..b5c14a193ddee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -25,10 +25,11 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -98,7 +99,11 @@ class UnivocityParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) - private val csvFilters = new OrderedFilters(filters, requiredSchema) + private val csvFilters = if (SQLConf.get.csvFilterPushDown) { + new OrderedFilters(filters, requiredSchema) + } else { + new NoopFilters + } // Retrieve the raw record string. private def getCurrentInput: UTF8String = { From dc2f2a7ceba6dfcfab6e964466ce9b10f15dc289 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 19 Jul 2020 11:25:40 +0300 Subject: [PATCH 13/23] Add the SQL config spark.sql.avro.filterPushdown.enabled --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f0d0a601ff196..48a43e67ca89a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2549,6 +2549,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val AVRO_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.avro.filterPushdown.enabled") + .doc("When true, enable filter pushdown to Avro datasource.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val ADD_PARTITION_BATCH_SIZE = buildConf("spark.sql.addPartitionInBatch.size") .internal() @@ -3263,6 +3269,8 @@ class SQLConf extends Serializable with Logging { def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED) + def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED) + def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) def legacyAllowCastNumericToTimestamp: Boolean = From fdfd2ca39f47010fec6d5692e3387e8ba7fe95c8 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 19 Jul 2020 11:40:47 +0300 Subject: [PATCH 14/23] Check the flag spark.sql.avro.filterPushdown.enabled --- .../spark/sql/avro/AvroFileFormat.scala | 10 +++++-- .../v2/avro/AvroPartitionReaderFactory.scala | 11 +++++-- .../spark/sql/v2/avro/AvroScanBuilder.scala | 2 +- .../AvroCatalystDataConversionSuite.scala | 29 +++++++------------ .../benchmark/AvroReadBenchmark.scala | 2 +- 5 files changed, 29 insertions(+), 25 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 3c335472879af..f517c28066b46 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, Filter} @@ -128,11 +128,17 @@ private[sql] class AvroFileFormat extends FileFormat reader.asInstanceOf[DataFileReader[_]].getMetaString, SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) + val avroFilters = if (SQLConf.get.avroFilterPushDown) { + new OrderedFilters(filters, requiredSchema) + } else { + new NoopFilters + } + val deserializer = new AvroDeserializer( userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode, - new OrderedFilters(filters, requiredSchema)) + avroFilters) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index fae73da49828a..d66ce3ef12ea8 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -30,7 +30,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions} -import org.apache.spark.sql.catalyst.{InternalRow, OrderedFilters} +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues} @@ -93,11 +93,18 @@ case class AvroPartitionReaderFactory( val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( reader.asInstanceOf[DataFileReader[_]].getMetaString, SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) + + val avroFilters = if (SQLConf.get.avroFilterPushDown) { + new OrderedFilters(filters, readDataSchema) + } else { + new NoopFilters + } + val deserializer = new AvroDeserializer( userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode, - new OrderedFilters(filters, readDataSchema)) + avroFilters) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala index 40df0e7b79d4b..9420608bb22ce 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala @@ -47,7 +47,7 @@ class AvroScanBuilder ( private var _pushedFilters: Array[Filter] = Array.empty override def pushFilters(filters: Array[Filter]): Array[Filter] = { - if (sparkSession.sessionState.conf.csvFilterPushDown) { + if (sparkSession.sessionState.conf.avroFilterPushDown) { _pushedFilters = StructFilters.pushedFilters(filters, dataSchema) } filters diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 7f890bf11c260..8f539ec8fcfec 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -347,24 +347,15 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite val expectedRow = Some(InternalRow(39, UTF8String.fromString("Maxim"))) checkDeserialization(avroSchema, data, expectedRow) - withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "true") { - checkDeserialization( - avroSchema, - data, - expectedRow, - new OrderedFilters(Seq(EqualTo("Age", 39)), sqlSchema)) - checkDeserialization( - avroSchema, - data, - None, - new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) - } - withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "false") { - checkDeserialization( - avroSchema, - data, - expectedRow, - new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) - } + checkDeserialization( + avroSchema, + data, + expectedRow, + new OrderedFilters(Seq(EqualTo("Age", 39)), sqlSchema)) + checkDeserialization( + avroSchema, + data, + None, + new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) } } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index 6ce0e73662c17..00c3ec7614cc4 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -225,7 +225,7 @@ object AvroReadBenchmark extends SqlBasedBenchmark { def withFilter(configEnabled: Boolean): Unit = { withSQLConf( SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY", - SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { + SQLConf.AVRO_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { readback.filter($"key" === 0).noop() } } From 92efd4bd44988281a0339b08ea9ac4495a637569 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 19 Jul 2020 12:02:40 +0300 Subject: [PATCH 15/23] Add JIRA id to AvroCatalystDataConversionSuite --- .../apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 8f539ec8fcfec..2d3209f8daa26 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -326,7 +326,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite validateDeserialization(util.Arrays.asList(1, null, 3)) } - test("filter pushdown to Avro deserializer") { + test("SPARK-32346: filter pushdown to Avro deserializer") { val schema = """ |{ From 668e4972c7aa3f344228f687410c2427d483591b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 19 Jul 2020 12:12:42 +0300 Subject: [PATCH 16/23] Add a test to AvroSuite to check the SQL config --- .../org/apache/spark/sql/avro/AvroSuite.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index dca6a2debcac8..46fe9b2c44529 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1938,4 +1938,34 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { normalizedOutput) } } + + test("SPARK-32346: filters pushdown to Avro datasource v2") { + Seq(true, false).foreach { filtersPushdown => + withSQLConf(SQLConf.AVRO_FILTER_PUSHDOWN_ENABLED.key -> filtersPushdown.toString) { + withTempPath { dir => + Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) + .toDF("value", "p1", "p2") + .write + .format("avro") + .save(dir.getCanonicalPath) + val df = spark + .read + .format("avro") + .load(dir.getCanonicalPath) + .where("value = 'a'") + + val fileScan = df.queryExecution.executedPlan collectFirst { + case BatchScanExec(_, f: AvroScan) => f + } + assert(fileScan.nonEmpty) + if (filtersPushdown) { + assert(fileScan.get.pushedFilters.nonEmpty) + } else { + assert(fileScan.get.pushedFilters.isEmpty) + } + checkAnswer(df, Row("a", 1, 2)) + } + } + } + } } From fc48d131d3e0873509b9c07768a1f42844670e3a Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 20 Jul 2020 23:14:27 +0300 Subject: [PATCH 17/23] Add an assert --- .../scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 3374db73a5afa..285a30bcd046e 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -98,7 +98,10 @@ case class AvroDataToCatalyst( try { decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) result = reader.read(result, decoder) - deserializer.deserialize(result).get + val deserialized = deserializer.deserialize(result) + assert(deserialized.isDefined, + "Avro deserializer cannot return an empty result because filters are not pushed down") + deserialized.get } catch { // There could be multiple possible exceptions here, e.g. java.io.IOException, // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. From 74f8f1b05d5b062d84ebf8f844daeea71dcc87bb Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 20 Jul 2020 23:16:27 +0300 Subject: [PATCH 18/23] Add a comment to AvroReadBenchmark --- .../apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index 00c3ec7614cc4..fde858e0a7419 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -203,6 +203,7 @@ object AvroReadBenchmark extends SqlBasedBenchmark { ($"id" % 1000).as("key") +: ts } withTempPath { path => + // Write and read timestamp in the LEGACY mode to make timestamp conversions more expensive withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> "LEGACY") { spark.range(rowsNum).select(columns(): _*) .write From 19e3321e2dcdf94501c6b5a1c85d59cad8ea028b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 21 Jul 2020 15:32:33 +0300 Subject: [PATCH 19/23] Add comment for nested records --- .../main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 2fc277066e2be..29385b78e3490 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -182,6 +182,8 @@ class AvroDeserializer( updater.setDecimal(ordinal, decimal) case (RECORD, st: StructType) => + // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328. + // We can always return `false` from `applyFilters` for nested records. val writeRecord = getRecordWriter(avroType, st, path, applyFilters = _ => false) (updater, ordinal, value) => val row = new SpecificInternalRow(st) From 438e634fe01a8fcc212751ee1c98d616ac7f31be Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 21 Jul 2020 16:44:50 +0300 Subject: [PATCH 20/23] Put common code to RowReader --- .../spark/sql/avro/AvroFileFormat.scala | 43 +++++------------ .../org/apache/spark/sql/avro/AvroUtils.scala | 36 ++++++++++++++- .../v2/avro/AvroPartitionReaderFactory.scala | 46 +++++-------------- 3 files changed, 58 insertions(+), 67 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index f517c28066b46..fa4b6b829bdde 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -122,7 +122,6 @@ private[sql] class AvroFileFormat extends FileFormat } reader.sync(file.start) - val stop = file.start + file.length val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( reader.asInstanceOf[DataFileReader[_]].getMetaString, @@ -134,37 +133,17 @@ private[sql] class AvroFileFormat extends FileFormat new NoopFilters } - val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), - requiredSchema, - datetimeRebaseMode, - avroFilters) - - new Iterator[InternalRow] { - private[this] var completed = false - private[this] var nextRow: Option[InternalRow] = None - - override def hasNext: Boolean = { - do { - val r = reader.hasNext && !reader.pastSync(stop) - if (!r) { - reader.close() - completed = true - nextRow = None - } else { - val record = reader.next() - nextRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] - } - } while (!completed && nextRow.isEmpty) - - nextRow.isDefined - } - - override def next(): InternalRow = { - nextRow.getOrElse { - throw new NoSuchElementException("next on empty iterator") - } - } + new Iterator[InternalRow] with AvroUtils.RowReader { + override val fileReader = reader + override val deserializer = new AvroDeserializer( + userProvidedSchema.getOrElse(reader.getSchema), + requiredSchema, + datetimeRebaseMode, + avroFilters) + override val stopPosition = file.start + file.length + + override def hasNext: Boolean = hasNextRow + override def next(): InternalRow = nextRow } } else { Iterator.empty diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 70dcd58a600fc..51cc51e70cd18 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.avro import java.io.{FileNotFoundException, IOException} import org.apache.avro.Schema +import org.apache.avro.file.{DataFileReader, FileReader} import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC} -import org.apache.avro.file.DataFileReader import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.mapred.{AvroOutputFormat, FsInput} import org.apache.avro.mapreduce.AvroJob @@ -32,6 +32,7 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -161,4 +162,37 @@ object AvroUtils extends Logging { "No Avro files found. If files don't have .avro extension, set ignoreExtension to true") } } + + // The trait provides iterator-like interface for reading records from an Avro file, + // deserializing and returning them as internal rows. + trait RowReader { + protected val fileReader: FileReader[GenericRecord] + protected val deserializer: AvroDeserializer + protected val stopPosition: Long + + private[this] var completed = false + private[this] var currentRow: Option[InternalRow] = None + + def hasNextRow: Boolean = { + do { + val r = fileReader.hasNext && !fileReader.pastSync(stopPosition) + if (!r) { + fileReader.close() + completed = true + currentRow = None + } else { + val record = fileReader.next() + currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] + } + } while (!completed && currentRow.isEmpty) + + currentRow.isDefined + } + + def nextRow: InternalRow = { + currentRow.getOrElse { + throw new NoSuchElementException("next on empty iterator") + } + } + } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index d66ce3ef12ea8..1e6c382041efb 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions} +import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroUtils} import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile} @@ -88,7 +88,6 @@ case class AvroPartitionReaderFactory( } reader.sync(partitionedFile.start) - val stop = partitionedFile.start + partitionedFile.length val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( reader.asInstanceOf[DataFileReader[_]].getMetaString, @@ -100,38 +99,17 @@ case class AvroPartitionReaderFactory( new NoopFilters } - val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), - readDataSchema, - datetimeRebaseMode, - avroFilters) - - val fileReader = new PartitionReader[InternalRow] { - private[this] var completed = false - private[this] var nextRow: Option[InternalRow] = None - - override def next(): Boolean = { - do { - val r = reader.hasNext && !reader.pastSync(stop) - if (!r) { - reader.close() - completed = true - nextRow = None - } else { - val record = reader.next() - nextRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] - } - } while (!completed && nextRow.isEmpty) - - nextRow.isDefined - } - - override def get(): InternalRow = { - nextRow.getOrElse { - throw new NoSuchElementException("next on empty iterator") - } - } - + val fileReader = new PartitionReader[InternalRow] with AvroUtils.RowReader { + override val fileReader = reader + override val deserializer = new AvroDeserializer( + userProvidedSchema.getOrElse(reader.getSchema), + readDataSchema, + datetimeRebaseMode, + avroFilters) + override val stopPosition = partitionedFile.start + partitionedFile.length + + override def next(): Boolean = hasNextRow + override def get(): InternalRow = nextRow override def close(): Unit = reader.close() } new PartitionReaderWithPartitionValues(fileReader, readDataSchema, From 0756a2d4751a4acd91026e3d9c1eff7865a2f3b4 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 22 Jul 2020 16:20:46 +0300 Subject: [PATCH 21/23] !isEmpty -> nonEmpty --- .../scala/org/apache/spark/sql/catalyst/OrderedFilters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala index a4b322d2bd8ae..2d7fcddd885fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala @@ -63,14 +63,14 @@ class OrderedFilters(filters: Seq[sources.Filter], requiredSchema: StructType) } groupedFilters(index) :+= filter } - if (len > 0 && !groupedFilters(0).isEmpty) { + if (len > 0 && groupedFilters(0).nonEmpty) { // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` // can be evaluated faster that others. We put them in front of others. val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) groupedFilters(0) = literals ++ others } for (i <- 0 until len) { - if (!groupedFilters(i).isEmpty) { + if (groupedFilters(i).nonEmpty) { groupedPredicates(i) = toPredicate(groupedFilters(i)) } } From a20c04c4c7f6d77304d80e2478ea6910bbf01469 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 22 Jul 2020 16:39:48 +0300 Subject: [PATCH 22/23] Remove an unused import --- .../scala/org/apache/spark/sql/catalyst/OrderedFilters.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala index 2d7fcddd885fa..6f970948cb4d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType From f6b2a9ae8f0fd9426f96cd214c8dc1f2f636ef98 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 22 Jul 2020 17:29:04 +0300 Subject: [PATCH 23/23] Add an assert to skipRow() --- .../scala/org/apache/spark/sql/catalyst/OrderedFilters.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala index 6f970948cb4d3..b7c8a0140ea66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala @@ -86,6 +86,8 @@ class OrderedFilters(filters: Seq[sources.Filter], requiredSchema: StructType) * otherwise `false` if at least one of the filters returns `false`. */ def skipRow(row: InternalRow, index: Int): Boolean = { + assert(0 <= index && index < requiredSchema.fields.length, + "Index is out of the valid range: it must point out to a field of the required schema.") val predicate = predicates(index) predicate != null && !predicate.eval(row) }