Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,21 @@ case class BatchScanExec(

val newRows = new InternalRowSet(p.expressions.map(_.dataType))
newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
val oldRows = p.partitionValuesOpt.get

if (oldRows.size != newRows.size) {
throw new SparkException("Data source must have preserved the original partitioning " +
"during runtime filtering: the number of unique partition values obtained " +
s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
val oldRows = p.partitionValuesOpt.get.toSet

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use InternalRowSet?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops you're right! I forgot about InternalRowSet. Let me create a follow-up.

// We require the new number of partition keys to be equal or less than the old number
// of partition keys here. In the case of less than, empty partitions will be added for
// those missing keys that are not present in the new input partitions.
if (oldRows.size < newRows.size) {
throw new SparkException("During runtime filtering, data source must either report " +
"the same number of partition keys, or a subset of partition keys from the " +
s"original. Before: ${oldRows.size} partition keys. After: ${newRows.size} " +
"partition keys")
}

if (!oldRows.forall(newRows.contains)) {
throw new SparkException("Data source must have preserved the original partitioning " +
"during runtime filtering: the number of unique partition values obtained " +
s"through HasPartitionKey remain the same but do not exactly match")
if (!newRows.forall(oldRows.contains)) {
throw new SparkException("During runtime filtering, data source must not report new " +
"partition keys that are not present in the original partitioning.")
}

groupPartitions(newPartitions).get.map(_._2)
Expand All @@ -114,8 +117,21 @@ case class BatchScanExec(
// return an empty RDD with 1 partition if dynamic filtering removed the only split
sparkContext.parallelize(Array.empty[InternalRow], 1)
} else {
var finalPartitions = filteredPartitions

outputPartitioning match {
case p: KeyGroupedPartitioning =>
val partitionMapping = finalPartitions.map(s =>
s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we move this logic to lazy val filteredPartitions?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm any obvious advantage of this? It looks the same to me 😅

@cloud-fan cloud-fan Dec 7, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to centralize the related code.

This empty filling is quite related to the if (oldRows.size < newRows.size) check. We should put them together if possible

@sunchao sunchao Dec 7, 2022

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping to re-use the logic in #38950 too, which is not related to runtime filtering. If moving this close to the if (oldRows.size < newRows.size) check then I may have to duplicate it somewhere else?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah then it makes sense to keep it here.

// Use empty partition for those partition keys that are not present
partitionMapping.getOrElse(partKey, Seq.empty)
}
case _ =>
}

new DataSourceRDD(
sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
sparkContext, finalPartitions, readerFactory, supportsColumnar, customMetrics)
}
postDriverMetrics()
rdd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

// number of unique partitions changed after dynamic filtering - should throw exception
// number of unique partitions changed after dynamic filtering - the gap should be filled
// with empty partitions and the job should still succeed
var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p WHERE " +
s"i.id = p.item_id AND i.price > 40.0")
val e = intercept[Exception](df.collect())
assert(e.getMessage.contains("number of unique partition values"))
checkAnswer(df, Seq(Row(131)))

// dynamic filtering doesn't change partitioning so storage-partitioned join should kick in
df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p WHERE " +
Expand Down