Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0755e67
Add operation metrics for UPDATE queries in DSv2
ZiyaZa Apr 1, 2026
7336546
Add comment
ZiyaZa Apr 2, 2026
e58f003
Address comments
ZiyaZa Apr 2, 2026
b8fcfb4
Remove IncrementMetric, compute metrics via additional attribute
ZiyaZa Apr 7, 2026
5e12e94
Revert unnecessary change
ZiyaZa Apr 7, 2026
6eb4407
Address comments
ZiyaZa Apr 8, 2026
9757ba7
Address comments
ZiyaZa Apr 14, 2026
072df0f
Add 2 more tests
ZiyaZa Apr 14, 2026
2b100a3
-1 if missing, add RowLevelWriteExec
ZiyaZa Apr 14, 2026
f54923b
Replace __is_updated with operation column
ZiyaZa Apr 14, 2026
c70e1da
Fix ReplaceData DML without metadata attributes not projecting out th…
ZiyaZa Apr 16, 2026
8b37604
Rename WRITE_OPERATION and WRITE_WITH_METADATA_OPERATION
ZiyaZa Apr 16, 2026
f69f4e6
Tests without metadata attributes
ZiyaZa Apr 16, 2026
f70cc22
Fix comment
ZiyaZa Apr 16, 2026
cae5e1e
Remove WRITE_OPERATION and instead use fine-grained operations
ZiyaZa Apr 17, 2026
67726d4
Merge branch 'dml-no-metadata' into dsv2-update-metrics
ZiyaZa Apr 17, 2026
268c29c
Resolve conflicts
ZiyaZa Apr 17, 2026
69b2c42
Address comments
ZiyaZa Apr 18, 2026
d73381d
Merge branch 'dml-no-metadata' into dsv2-update-metrics
ZiyaZa Apr 18, 2026
f22c999
Merge branch 'master' into dsv2-update-metrics
ZiyaZa Apr 20, 2026
551a701
DELETE metrics for WriteDelta
ZiyaZa Apr 17, 2026
0d6c5f4
DELETE metrics for ReplaceData
ZiyaZa Apr 17, 2026
ddcba05
Merge branch 'master' into dsv2-delete-metrics
ZiyaZa Apr 22, 2026
670a0c4
Resolve conflicts
ZiyaZa Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Evolving;

/**
* Provides an informational summary of the DELETE operation producing write.
*
* @since 4.2.0
*/
@Evolving
public interface DeleteSummary extends WriteSummary {

/**
* Returns the number of rows deleted, or -1 if not found.
*/
long numDeletedRows();

/**
* Returns the number of rows copied unmodified, or -1 if not found.
*/
long numCopiedRows();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write

/**
* Implementation of [[DeleteSummary]] that provides DELETE operation summary.
*/
private[sql] case class DeleteSummaryImpl(
numDeletedRows: Long,
numCopiedRows: Long)
extends DeleteSummary {
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, DELETE_
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
Expand Down Expand Up @@ -334,6 +334,26 @@ case class ReplaceDataExec(
override protected def withNewChildInternal(newChild: SparkPlan): ReplaceDataExec = {
copy(query = newChild)
}

override protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
if (rowLevelCommand == DELETE) {
// DELETE ReplaceData plans filter out the deleted rows early in the plan, and they don't
// reach this node. We need to calculate this value as numScannedRows - numCopiedRows.
val numScannedRows = collectFirst(query) {
case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only scan type we have today?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataSourceV2ScanExecBase has 4 child scans, seemingly only this is used for DELETEs. The others scans seem to be used only for streaming reads.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't expect this to happen right? Should we log a warning if we cannot find batchscanexec, or combine it with one of the assertions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently if we can't find such a scan node, we set the metric as -1. Is this good enough, what do you think? It kind of fits the theme we have been going with so far, that if there is some problem, we use -1 as the metric value. If any metric value is -1, it means we have a problem somewhere (excluding insert-only merges where this is intentional at the moment, but we'll fix that). When we discussed this previously, we decided against throwing an error and letting connectors handle it, but we didn't discuss logging.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A case to consider is when the optimizer squashes and replaces the scan with an empty relation. This can happen in multiple cases. What is our behavior in that case?

getMetricValue(b.metrics, "numOutputRows")
}
val numCopiedRows = getMetricValue(metrics, "numCopiedRows")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a reasonable approach to handle DELETE. Is there a metric for the overall number of output rows? Can we add some sanity checks that we only had copied rows and its count matches the number of output rows?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do this using totalNumRowsAccumulator.value and verify if the numbers in WriteSummary matches what we expect. Since this can be done for other commands too, let's do this in a follow-up PR for all commands.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to the idea (check whether it matches numCopiedRows in ReplaceData , and numDeletedRows in WriteDelta case)

val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) {
numScannedRows.get - numCopiedRows

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can scanned rows be less than numCopiedRows? This looks like it needs a sanity check?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would mean the plan is creating new rows, and it shouldn't do that. We can add a sanity check, together with others in a follow-up PR. See #55428 (comment). This also has dependency on #55371.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric values can get overcounted on retries. The scan and the write can be executed in different stages, so can have different retries, so technically you can get an overcounted numCopiedRows to turn this negative. Using the metrics infrastructure from #55371 that I want to get it would fix that.

} else {
// One of the metrics couldn't be found, also mark numDeletedRows as not found.
-1L
}
metrics("numDeletedRows").set(numDeletedRows)
}
super.getWriteSummary(query)
}
}

/**
Expand Down Expand Up @@ -426,13 +446,17 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
Map(
"numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"),
"numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
case DELETE =>
Map(
"numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"),
"numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
case _ => Map.empty
}

/**
* Returns the value of the named metric, or -1 if the metric is not found.
*/
private def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = {
protected def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = {
metrics.get(name).map(_.value).getOrElse(-1L)
}

Expand All @@ -456,7 +480,9 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
getMetricValue(operationMetrics, "numUpdatedRows"),
getMetricValue(operationMetrics, "numCopiedRows")))
case DELETE =>
None
Some(DeleteSummaryImpl(
getMetricValue(operationMetrics, "numDeletedRows"),
getMetricValue(operationMetrics, "numCopiedRows")))
}
}
}
Expand Down Expand Up @@ -744,13 +770,15 @@ case class DeltaWritingSparkTask(
override protected def write(
writer: DeltaWriter[InternalRow], iter: java.util.Iterator[InternalRow]): Unit = {
var numUpdatedRows = 0L
var numDeletedRows = 0L

while (iter.hasNext) {
val row = iter.next()
val operation = row.getInt(0)

operation match {
case DELETE_OPERATION =>
numDeletedRows += 1L
rowIdProjection.project(row)
writer.delete(null, rowIdProjection)

Expand All @@ -777,6 +805,7 @@ case class DeltaWritingSparkTask(
}

operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows))
operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows))
}
}

Expand All @@ -792,13 +821,15 @@ case class DeltaWithMetadataWritingSparkTask(
override protected def write(
writer: DeltaWriter[InternalRow], iter: java.util.Iterator[InternalRow]): Unit = {
var numUpdatedRows = 0L
var numDeletedRows = 0L

while (iter.hasNext) {
val row = iter.next()
val operation = row.getInt(0)

operation match {
case DELETE_OPERATION =>
numDeletedRows += 1L
rowIdProjection.project(row)
metadataProjection.project(row)
writer.delete(metadataProjection, rowIdProjection)
Expand Down Expand Up @@ -828,6 +859,7 @@ case class DeltaWithMetadataWritingSparkTask(
}

operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows))
operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows))
}
}

Expand Down
Loading