diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java new file mode 100644 index 0000000000000..76ece79b09a96 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write; + +import org.apache.spark.annotation.Evolving; + +/** + * Provides an informational summary of the DELETE operation that produced the write. + * + * @since 4.2.0 + */ +@Evolving +public interface DeleteSummary extends WriteSummary { + + /** + * Returns the number of rows deleted, or -1 if the count is unavailable. + */ + long numDeletedRows(); + + /** + * Returns the number of rows copied unmodified, or -1 if the count is unavailable. 
+ */ + long numCopiedRows(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala new file mode 100644 index 0000000000000..b96bf86a57681 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write + +/** + * Implementation of [[DeleteSummary]] that carries the row counts of a DELETE operation. 
+ */ +private[sql] case class DeleteSummaryImpl( + numDeletedRows: Long, + numCopiedRows: Long) + extends DeleteSummary { +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 45143e496a19c..6bb1eb6f4b6d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, DELETE_ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.connector.write.RowLevelOperation.Command._ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} @@ -334,6 +334,26 @@ case class ReplaceDataExec( override protected def withNewChildInternal(newChild: SparkPlan): ReplaceDataExec = { copy(query = newChild) } + + override protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = { + if 
(rowLevelCommand == DELETE) { + // DELETE ReplaceData plans filter out the deleted rows early in the plan, so they never + // reach this node. Derive numDeletedRows as numScannedRows - numCopiedRows instead. + val numScannedRows = collectFirst(query) { + case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] => + getMetricValue(b.metrics, "numOutputRows") + } + val numCopiedRows = getMetricValue(metrics, "numCopiedRows") + val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) { + numScannedRows.get - numCopiedRows + } else { + // The scan or one of the metrics wasn't found; mark numDeletedRows as not found too. + -1L + } + metrics("numDeletedRows").set(numDeletedRows) + } + super.getWriteSummary(query) + } } /** @@ -426,13 +446,17 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { Map( "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"), "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) + case DELETE => + Map( + "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"), + "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) case _ => Map.empty } /** * Returns the value of the named metric, or -1 if the metric is not found. 
*/ - private def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = { + protected def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = { metrics.get(name).map(_.value).getOrElse(-1L) } @@ -456,7 +480,9 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { getMetricValue(operationMetrics, "numUpdatedRows"), getMetricValue(operationMetrics, "numCopiedRows"))) case DELETE => - None + Some(DeleteSummaryImpl( + getMetricValue(operationMetrics, "numDeletedRows"), + getMetricValue(operationMetrics, "numCopiedRows"))) } } } @@ -744,6 +770,7 @@ case class DeltaWritingSparkTask( override protected def write( writer: DeltaWriter[InternalRow], iter: java.util.Iterator[InternalRow]): Unit = { var numUpdatedRows = 0L + var numDeletedRows = 0L while (iter.hasNext) { val row = iter.next() @@ -751,6 +778,7 @@ case class DeltaWritingSparkTask( operation match { case DELETE_OPERATION => + numDeletedRows += 1L rowIdProjection.project(row) writer.delete(null, rowIdProjection) @@ -777,6 +805,7 @@ case class DeltaWritingSparkTask( } operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } } @@ -792,6 +821,7 @@ case class DeltaWithMetadataWritingSparkTask( override protected def write( writer: DeltaWriter[InternalRow], iter: java.util.Iterator[InternalRow]): Unit = { var numUpdatedRows = 0L + var numDeletedRows = 0L while (iter.hasNext) { val row = iter.next() @@ -799,6 +829,7 @@ case class DeltaWithMetadataWritingSparkTask( operation match { case DELETE_OPERATION => + numDeletedRows += 1L rowIdProjection.project(row) metadataProjection.project(row) writer.delete(metadataProjection, rowIdProjection) @@ -828,6 +859,7 @@ case class DeltaWithMetadataWritingSparkTask( } operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala index 0f7f4cefe2feb..2682487e51ba0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.CheckInvariant import org.apache.spark.sql.catalyst.plans.logical.Filter +import org.apache.spark.sql.connector.catalog.InMemoryTable +import org.apache.spark.sql.connector.write.DeleteSummary import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec} abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { @@ -28,6 +30,24 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { protected def enforceCheckConstraintOnDelete: Boolean = true + protected def deltaDelete: Boolean = false + + protected def getDeleteSummary(): DeleteSummary = { + val t = catalog.loadTable(ident).asInstanceOf[InMemoryTable] + t.commits.last.writeSummary.get.asInstanceOf[DeleteSummary] + } + + protected def checkDeleteMetrics( + numDeletedRows: Long, + numCopiedRows: Long): Unit = { + val summary = getDeleteSummary() + assert(summary.numDeletedRows() === numDeletedRows, + s"Expected numDeletedRows=$numDeletedRows, got ${summary.numDeletedRows()}") + val expectedCopied = if (deltaDelete) 0L else numCopiedRows + assert(summary.numCopiedRows() === expectedCopied, + s"Expected numCopiedRows=$expectedCopied, got ${summary.numCopiedRows()}") + } + test("delete from table containing added column with default value") { createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep": "hr" }""") @@ -65,6 +85,7 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase { Row(3, "software", "initial-text"), Row(4, "hr", "initial-text"), Row(6, "hr", "new-text"))) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 3) } test("delete from table with table constraints") { @@ -90,10 +111,13 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(2, 4, "eng"), Row(3, 6, "eng"))) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + sql(s"DELETE FROM $tableNameAsString WHERE pk >=3") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(2, 4, "eng"))) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } test("delete from table containing struct column with default value") { @@ -151,6 +175,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"DELETE FROM $tableNameAsString WHERE id <= 1") checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) + + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) } test("delete with basic filters") { @@ -165,6 +191,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) + + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } test("delete with aliases") { @@ -177,6 +205,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"DELETE FROM $tableNameAsString AS t WHERE t.id <= 1 OR t.dep = 'hr'") checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Nil) + + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0) } test("delete with IN predicates") { @@ -191,6 +221,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Row(3, null, "hr") :: Nil) + + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } test("delete 
with NOT IN predicates") { @@ -205,12 +237,14 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (1, 10)") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } test("delete with conditions on nested columns") { @@ -224,10 +258,12 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, Row(2, "v2"), "software") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) sql(s"DELETE FROM $tableNameAsString t WHERE t.complex.c1 = id") checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } test("delete with IN subqueries") { @@ -255,6 +291,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 4, "id": 1, "dep": "hr" } @@ -276,6 +313,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(5, -1, "hr") :: Row(4, 1, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 6, "id": null, "dep": "hr" } @@ -297,6 +335,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(6, null, "hr") :: Nil) + 
checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 3) } } @@ -320,6 +359,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } @@ -346,6 +386,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) sql( s"""DELETE FROM $tableNameAsString @@ -354,6 +395,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 1) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 4, "id": 1, "dep": "hr" } @@ -374,6 +416,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, "hardware") :: Nil) + checkDeleteMetrics(numDeletedRows = 3, numCopiedRows = 0) sql( s"""DELETE FROM $tableNameAsString @@ -420,6 +463,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) sql( s"""DELETE FROM $tableNameAsString t @@ -430,6 +474,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) sql( s"""DELETE FROM $tableNameAsString t @@ -440,6 +485,7 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) sql( s"""DELETE FROM $tableNameAsString t @@ -452,6 +498,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Nil) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) } } @@ -480,6 +527,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) sql( s"""DELETE FROM $tableNameAsString t @@ -488,6 +536,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) sql( s"""DELETE FROM $tableNameAsString t @@ -498,6 +547,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } } @@ -521,6 +571,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } @@ -553,6 +604,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(3, 2, "hardware") :: Row(4, 3, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2) // verify the view reflects the changes in the table checkAnswer(sql("SELECT * FROM temp"), Nil) @@ -598,6 +650,7 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 200) :: Nil) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0) } test("delete with subquery cannot be converted into delete with filters") { @@ -617,6 +670,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 200) :: Row(3, 3, 100) :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala index 9046123ddbd3f..15d259d44a4fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala @@ -25,6 +25,8 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { import testImplicits._ + override protected def deltaDelete: Boolean = true + override protected lazy val extraTableProps: java.util.Map[String, String] = { val props = new java.util.HashMap[String, String]() props.put("supports-deltas", "true") @@ -52,6 +54,7 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE)))) checkLastWriteLog(deleteWriteLogEntry(id = 1, metadata = Row("hr", null))) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } test("delete with subquery handles metadata columns correctly") { @@ -83,6 +86,7 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE)))) checkLastWriteLog(deleteWriteLogEntry(id = 1, metadata = Row("hr", null))) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } } @@ -136,6 +140,7 @@ 
class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "us", "software") :: Row(3, 3, "canada", "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } test("delete does not double plan table") { @@ -162,5 +167,6 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 150, "software") :: Row(3, 3, 120, "hr") :: Nil) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala index 73407d640923a..b33cf87402b3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.connector class DeltaBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase { + override protected def deltaDelete: Boolean = true + override protected def extraTableProps: java.util.Map[String, String] = { val props = new java.util.HashMap[String, String]() props.put("supports-deltas", "true") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala index 2f922295010ff..3889b0d172adc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala @@ -47,6 +47,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkLastWriteLog( writeWithMetadataLogEntry(metadata = Row("hr", 1), 
data = Row(3, 3, "hr")), writeWithMetadataLogEntry(metadata = Row("hr", 2), data = Row(4, 4, "hr"))) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 2) } test("delete with nondeterministic conditions") { @@ -86,6 +87,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("hr")) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } test("delete with subqueries and runtime group filtering") { @@ -118,6 +120,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(1, 300, "hr") :: Row(3, 120, "hr") :: Row(4, 150, "software") :: Nil) checkReplacedPartitions(Seq("software")) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } @@ -166,6 +169,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("hr")) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } @@ -199,6 +203,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("hr")) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) } } @@ -229,6 +234,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("software", "hr")) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 2) } } }