From 996297ec40d06576cc1e50bfb8afb6380c017ee9 Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Sun, 30 Apr 2023 15:13:40 +0800 Subject: [PATCH 1/2] SPARK-43327 --- .../datasources/FileFormatWriter.scala | 20 +++++++------- .../sql/test/DataFrameReaderWriterSuite.scala | 27 +++++++++++++++++-- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 4f7d5069be1eb..8a8ef03d60d2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -159,6 +159,16 @@ object FileFormatWriter extends Logging { statsTrackers = statsTrackers ) + SQLExecution.checkSQLExecutionId(sparkSession) + + // propagate the description UUID into the jobs, so that committers + // get an ID guaranteed to be unique. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) + + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + committer.setupJob(job) + // We should first sort by partition columns, then bucket id, and finally sorting columns. val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns @@ -181,16 +191,6 @@ object FileFormatWriter extends Logging { } } - SQLExecution.checkSQLExecutionId(sparkSession) - - // propagate the description UUID into the jobs, so that committers - // get an ID guaranteed to be unique. - job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) - - // This call shouldn't be put into the `try` block below because it only initializes and - // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - committer.setupJob(job) - try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { (materializedPlan.execute(), None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index ccdba809292de..41cae6a280af0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.PrimitiveType @@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} @@ -1275,4 +1275,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with } } } + + test("SPARK-43327: location exists when insertoverwrite fails") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + withTable("t", "t1") { + sql("create table t(c1 int) using parquet") + sql("create table t1(c2 long) using parquet") + sql("INSERT OVERWRITE TABLE t1 select 6000044164") + + // spark.sql("CREATE TABLE IF NOT EXISTS t(amt1 int) using ORC") + val identifier = TableIdentifier("t") + val location = spark.sessionState.catalog.getTableMetadata(identifier).location + + intercept[SparkException] { + sql("INSERT OVERWRITE TABLE t select c2 from " + + "(select cast(c2 as int) as c2 from t1 distribute by c2)") + } + // scalastyle:off hadoopconfiguration + val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration) + // scalastyle:on hadoopconfiguration + assert(fs.exists(new Path(location))) + } + } + } } From f999cedcd68c78d3114d4139529cae5ab15560b2 Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Thu, 11 May 2023 20:52:00 +0800 Subject: [PATCH 2/2] FileFormatWriter.scala --- .../spark/sql/execution/datasources/FileFormatWriter.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 8a8ef03d60d2b..e4ec4c05fafaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -167,6 +167,7 @@ object FileFormatWriter extends Logging { // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + // It must be run before `materializeAdaptiveSparkPlan()` committer.setupJob(job) // We should first sort by partition columns, then bucket id, and finally sorting columns.