Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,15 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("256MB")

// Upper bound (inclusive) on the number of splits produced from one skewed partition.
// Values below 10 are rejected; the default is 1000.
val SKEW_JOIN_MAX_SPLITS_PER_PARTITION =
  buildConf("spark.sql.adaptive.skewJoin.maxSplitsPerPartition")
    .doc(
      s"When '${ADAPTIVE_EXECUTION_ENABLED.key}' and '${SKEW_JOIN_ENABLED.key}' are true, " +
        "the max number (inclusive) of splits from a partition.")
    .version("3.4.0")
    .intConf
    .checkValue(maxSplits => maxSplits >= 10, "The max splits must be no less than 10.")
    .createWithDefault(1000)

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.internal()
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning
def aggregateAttributes: Seq[Attribute]
def initialInputBufferOffset: Int
def resultExpressions: Seq[NamedExpression]
// Skew flag for this aggregate; defaults to false. When it is true, nodeName gains a
// "(skew=true)" suffix and requiredChildDistribution becomes UnspecifiedDistribution.
def isSkew: Boolean = false
// Appends "(skew=true)" to the inherited node name when the skew flag is set.
override def nodeName: String = {
  val baseName = super.nodeName
  if (isSkew) baseName + "(skew=true)" else baseName
}

override def verboseStringWithOperatorId(): String = {
s"""
Expand Down Expand Up @@ -94,6 +96,7 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning

override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case _ if isSkew => UnspecifiedDistribution :: Nil
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) =>
if (isStreaming) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ case class HashAggregateExec(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
child: SparkPlan,
override val isSkew: Boolean = false)
extends AggregateCodegenSupport {

require(Aggregate.supportsHashAggregate(aggregateBufferAttributes))

// Omits the final inherited string argument from the rendered plan string.
// NOTE(review): presumably that trailing element is `isSkew`, the last constructor
// parameter - confirm against the parent's stringArgs.
override def stringArgs: Iterator[Any] = {
  val inheritedArgs = super.stringArgs.toSeq
  inheritedArgs.dropRight(1).iterator
}

override lazy val allAttributes: AttributeSeq =
child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ case class ObjectHashAggregateExec(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
child: SparkPlan,
override val isSkew: Boolean = false)
extends BaseAggregateExec {

// Omits the final inherited string argument from the rendered plan string.
// NOTE(review): presumably that trailing element is `isSkew`, the last constructor
// parameter - confirm against the parent's stringArgs.
override def stringArgs: Iterator[Any] = {
  val inheritedArgs = super.stringArgs.toSeq
  inheritedArgs.dropRight(1).iterator
}

override lazy val allAttributes: AttributeSeq =
child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ case class SortAggregateExec(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
child: SparkPlan,
override val isSkew: Boolean = false)
extends AggregateCodegenSupport
with AliasAwareOutputOrdering {

// Omits the final inherited string argument from the rendered plan string.
// NOTE(review): presumably that trailing element is `isSkew`, the last constructor
// parameter - confirm against the parent's stringArgs.
override def stringArgs: Iterator[Any] = {
  val inheritedArgs = super.stringArgs.toSeq
  inheritedArgs.dropRight(1).iterator
}

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.window
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}

/**
Expand Down Expand Up @@ -87,8 +88,19 @@ case class WindowExec(
windowExpression: Seq[NamedExpression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan)
child: SparkPlan,
isSkew: Boolean = false)
extends WindowExecBase {
// Appends "(skew=true)" to the inherited node name when the skew flag is set.
override def nodeName: String = {
  val baseName = super.nodeName
  if (isSkew) baseName + "(skew=true)" else baseName
}
// Omits the final inherited string argument from the rendered plan string.
// NOTE(review): presumably that trailing element is `isSkew`, the last constructor
// parameter - confirm against the parent's stringArgs.
override def stringArgs: Iterator[Any] = {
  val inheritedArgs = super.stringArgs.toSeq
  inheritedArgs.dropRight(1).iterator
}

// With the skew flag set, no particular child distribution is required;
// otherwise defer to the requirement declared by WindowExecBase.
override def requiredChildDistribution: Seq[Distribution] =
  if (isSkew) UnspecifiedDistribution :: Nil
  else super[WindowExecBase].requiredChildDistribution

protected override def doExecute(): RDD[InternalRow] = {
// Unwrap the window expressions and window frame factories from the map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
assert(
executedPlan.exists {
case WholeStageCodegenExec(
HashAggregateExec(_, _, _, _, _, _, _, _, _: LocalTableScanExec)) => true
HashAggregateExec(_, _, _, _, _, _, _, _, _: LocalTableScanExec, _)) => true
case _ => false
},
"LocalTableScanExec should be within a WholeStageCodegen domain.")
Expand Down
Loading