From 8070497e569c37a8f14607e00ec8d47c7df723b4 Mon Sep 17 00:00:00 2001 From: turbofei Date: Tue, 10 Sep 2019 21:00:40 +0800 Subject: [PATCH 01/11] [SPARK-29037] Make staging dir identified with applicationId --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 11ce608f52ee2..08ae2f45643e7 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -85,11 +86,13 @@ class HadoopMapReduceCommitProtocol( */ @transient private var partitionPaths: mutable.Set[String] = null + private val appId = SparkEnv.get.conf.getAppId + /** * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. */ - private def stagingDir = new Path(path, ".spark-staging-" + jobId) + private def stagingDir = new Path(path, s".spark-staging-${appId}-${jobId}") protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() From 3e8b69bf790cd7aee206f32ab7c32afdce6927f6 Mon Sep 17 00:00:00 2001 From: turbofei Date: Sun, 15 Sep 2019 14:44:56 +0800 Subject: [PATCH 02/11] [SPARK-29037] Spark gives duplicate result when an application was killed --- .../internal/io/HadoopMapReduceCommitProtocol.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 08ae2f45643e7..570cb4e3f8126 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -86,13 +85,11 @@ class HadoopMapReduceCommitProtocol( */ @transient private var partitionPaths: mutable.Set[String] = null - private val appId = SparkEnv.get.conf.getAppId - /** * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. */ - private def stagingDir = new Path(path, s".spark-staging-${appId}-${jobId}") + private def stagingDir = new Path(path, ".spark-staging-" + jobId) protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() @@ -167,7 +164,9 @@ class HadoopMapReduceCommitProtocol( } override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { - committer.commitJob(jobContext) + if (!dynamicPartitionOverwrite) { + committer.commitJob(jobContext) + } if (hasValidPath) { val (allAbsPathFiles, allPartitionPaths) = From 82d2173b0f630b58b7e81bd8387f51201a452f06 Mon Sep 17 00:00:00 2001 From: turbofei Date: Sun, 15 Sep 2019 16:15:02 +0800 Subject: [PATCH 03/11] unit test --- .../sql/hive/execution/HiveQuerySuite.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 53798e0ac2727..e0be7a3ae1bb7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -22,6 +22,8 @@ import java.net.URI import java.sql.Timestamp import java.util.{Locale, TimeZone} +import com.google.common.io.Files + import scala.util.Try import org.apache.commons.lang3.{JavaVersion, SystemUtils} @@ -38,6 +40,7 @@ import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SQLTestUtils case class TestData(a: Int, b: String) @@ -1224,6 +1227,34 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd } } } + + test("SPARK-29037: Spark gives duplicate result when an application was killed") { + withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) { + withTable("test") { + sql("create table test(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)") + sql("insert overwrite table test partition(p1=1,p2) select 1,3") + val df = sql("select id from test where p1=1 and p2=3").collect() + assertResult(Array(Row(1)))(df) + + val warehouse = SQLConf.get.warehousePath + val tblPath = Array(warehouse, "test").mkString(File.separator) + val taskAttemptPath = Array(tblPath, "_temporary", "0", "task_20190914232019_0000_m_000000", + "p1=1", "p2=3").mkString(File.separator) + new File(taskAttemptPath).mkdirs() + + val tblResult = new File(Array(tblPath, "p1=1", "p2=3").mkString(File.separator)) + val tFile = tblResult.list((_: File, name: String) => !name.startsWith(".")).apply(0) + val from = new File(tblResult.getAbsolutePath + File.separator + tFile) + val to = new File(taskAttemptPath + File.separator + tFile) + Files.copy(from, to) + + sql("insert overwrite table test partition(p1=1,p2) select 2, 3") + assert(tblResult.list((_: File, name: String) => !name.startsWith(".")).size === 1) + val df2 = sql("select id from test where p1=1 and p2=3").collect() + assertResult(Array(Row(2)))(df2) + } + } + } } // for SPARK-2180 test From c3aff4e595c8d4cf38c13eba3388a6c6e5bd8207 Mon Sep 17 00:00:00 2001 From: turbofei Date: Sun, 15 Sep 2019 16:32:54 +0800 Subject: [PATCH 04/11] fix code style --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e0be7a3ae1bb7..ce7e2967c1735 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -22,10 +22,9 @@ import java.net.URI import java.sql.Timestamp import java.util.{Locale, TimeZone} -import com.google.common.io.Files - import scala.util.Try +import com.google.common.io.Files import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.BeforeAndAfter From 4e445506515af58bc2fe94176782df32df97bd6d Mon Sep 17 00:00:00 2001 From: turbofei Date: Mon, 16 Sep 2019 07:04:26 +0800 Subject: [PATCH 05/11] move ut to sql/core --- .../org/apache/spark/sql/SQLQuerySuite.scala | 31 +++++++++++++++++++ .../sql/hive/execution/HiveQuerySuite.scala | 30 ------------------ 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 80c1e24bfa568..9c2eb82902e3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -22,6 +22,8 @@ import java.net.{MalformedURLException, URL} import java.sql.{Date, Timestamp} import java.util.concurrent.atomic.AtomicBoolean +import com.google.common.io.Files + import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils @@ -33,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext} import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ @@ -3192,6 +3195,34 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100")))) } } + + test("SPARK-29037: Spark gives duplicate result when an application was killed") { + withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) { + withTable("test") { + sql("create table test(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)") + sql("insert overwrite table test partition(p1=1,p2) select 1,3") + val df = sql("select id from test where p1=1 and p2=3") + checkAnswer(df, Array(Row(1))) + + val warehouse = SQLConf.get.warehousePath.split(":").last + val tblPath = Array(warehouse, "test").mkString(File.separator) + val taskAttemptPath = Array(tblPath, "_temporary", "0", "task_20190914232019_0000_m_000000", + "p1=1", "p2=3").mkString(File.separator) + new File(taskAttemptPath).mkdirs() + + val tblResult = new File(Array(tblPath, "p1=1", "p2=3").mkString(File.separator)) + val tFile = tblResult.list((_: File, name: String) => !name.startsWith(".")).apply(0) + val from = new File(tblResult.getAbsolutePath + File.separator + tFile) + val to = new File(taskAttemptPath + File.separator + tFile) + Files.copy(from, to) + + sql("insert overwrite table test partition(p1=1,p2) select 2, 3") + assert(tblResult.list((_: File, name: String) => !name.startsWith(".")).size === 1) + val df2 = sql("select id from test where p1=1 and p2=3") + checkAnswer(df2, Array(Row(2))) + } + } + } } case class Foo(bar: Option[String]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index ce7e2967c1735..53798e0ac2727 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -24,7 +24,6 @@ import java.util.{Locale, TimeZone} import scala.util.Try -import com.google.common.io.Files import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.BeforeAndAfter @@ -39,7 +38,6 @@ import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SQLTestUtils case class TestData(a: Int, b: String) @@ -1226,34 +1224,6 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd } } } - - test("SPARK-29037: Spark gives duplicate result when an application was killed") { - withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) { - withTable("test") { - sql("create table test(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)") - sql("insert overwrite table test partition(p1=1,p2) select 1,3") - val df = sql("select id from test where p1=1 and p2=3").collect() - assertResult(Array(Row(1)))(df) - - val warehouse = SQLConf.get.warehousePath - val tblPath = Array(warehouse, "test").mkString(File.separator) - val taskAttemptPath = Array(tblPath, "_temporary", "0", "task_20190914232019_0000_m_000000", - "p1=1", "p2=3").mkString(File.separator) - new File(taskAttemptPath).mkdirs() - - val tblResult = new File(Array(tblPath, "p1=1", "p2=3").mkString(File.separator)) - val tFile = tblResult.list((_: File, name: String) => !name.startsWith(".")).apply(0) - val from = new File(tblResult.getAbsolutePath + File.separator + tFile) - val to = new File(taskAttemptPath + File.separator + tFile) - Files.copy(from, to) - - sql("insert overwrite table test partition(p1=1,p2) select 2, 3") - assert(tblResult.list((_: File, name: String) => !name.startsWith(".")).size === 1) - val df2 = sql("select id from test where p1=1 and p2=3").collect() - assertResult(Array(Row(2)))(df2) - } - } - } } // for SPARK-2180 test From 656668e301d04bb0759221705ea93027155cf60e Mon Sep 17 00:00:00 2001 From: turbofei Date: Mon, 16 Sep 2019 07:57:01 +0800 Subject: [PATCH 06/11] fix ut --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9c2eb82902e3e..cf904f6b62ea3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3205,7 +3205,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df, Array(Row(1))) val warehouse = SQLConf.get.warehousePath.split(":").last - val tblPath = Array(warehouse, "test").mkString(File.separator) + val tblPath = Array(warehouse, "org.apache.spark.sql.SQLQuerySuite", "test") + .mkString(File.separator) val taskAttemptPath = Array(tblPath, "_temporary", "0", "task_20190914232019_0000_m_000000", "p1=1", "p2=3").mkString(File.separator) new File(taskAttemptPath).mkdirs() From 804ac8ce1460c95818a3b5339dc6c4f5651c0556 Mon Sep 17 00:00:00 2001 From: turbofei Date: Mon, 16 Sep 2019 12:07:41 +0800 Subject: [PATCH 07/11] also skip setupJob and abortJob when dynamicPartitionOverwrite is true --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 570cb4e3f8126..af7f45b19072c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -160,7 +160,9 @@ class HadoopMapReduceCommitProtocol( val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) - committer.setupJob(jobContext) + if (!dynamicPartitionOverwrite) { + committer.setupJob(jobContext) + } } override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { @@ -217,7 +219,9 @@ class HadoopMapReduceCommitProtocol( */ override def abortJob(jobContext: JobContext): Unit = { try { - committer.abortJob(jobContext, JobStatus.State.FAILED) + if (!dynamicPartitionOverwrite) { + committer.abortJob(jobContext, JobStatus.State.FAILED) + } } catch { case e: IOException => logWarning(s"Exception while aborting ${jobContext.getJobID}", e) From 3275685db7bc4ef597d60d6c510362f9c41ba12f Mon Sep 17 00:00:00 2001 From: turbofei Date: Tue, 17 Sep 2019 11:01:02 +0800 Subject: [PATCH 08/11] just to triger a build --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index af7f45b19072c..b095e1bef8768 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -161,7 +161,7 @@ class HadoopMapReduceCommitProtocol( val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) if (!dynamicPartitionOverwrite) { - committer.setupJob(jobContext) + committer.setupJob(jobContext ) } } From 144ec6337e63f9100e0fb1cb800d776ffc25b796 Mon Sep 17 00:00:00 2001 From: turbofei Date: Tue, 17 Sep 2019 11:01:41 +0800 Subject: [PATCH 09/11] just to triger a new building --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index b095e1bef8768..af7f45b19072c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -161,7 +161,7 @@ class HadoopMapReduceCommitProtocol( val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) if (!dynamicPartitionOverwrite) { - committer.setupJob(jobContext ) + committer.setupJob(jobContext) } } From 979fbe478b0ce1ef84ea090aac60b803a67d6c28 Mon Sep 17 00:00:00 2001 From: turbofei Date: Tue, 17 Sep 2019 16:06:21 +0800 Subject: [PATCH 10/11] add comment --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index af7f45b19072c..4081e1c1a1aa6 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -160,6 +160,8 @@ class HadoopMapReduceCommitProtocol( val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) + // For dynamic partition overwrite, it has specific job attempt path, so we don't need + // committer.setupJob here. Same with the commitJob and abortJob operations below. if (!dynamicPartitionOverwrite) { committer.setupJob(jobContext) } From f0b6c844b02ec8349137fa3f2a6773b2e94e941b Mon Sep 17 00:00:00 2001 From: turbofei Date: Wed, 18 Sep 2019 01:09:45 +0800 Subject: [PATCH 11/11] add insert datasource operations conflict check when dynamicPartitionOverwrite is false firstly --- .../org/apache/spark/SparkException.scala | 6 ++++ .../io/HadoopMapReduceCommitProtocol.scala | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 4ad9a0cc4b103..6c2eb8defc587 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -43,3 +43,9 @@ private[spark] case class SparkUserAppException(exitCode: Int) */ private[spark] case class ExecutorDeadException(message: String) extends SparkException(message) + +/** + * Exception thrown when several InsertDataSource operations are conflicted. + */ +private[spark] case class InsertDataSourceConflictException(message: String) + extends SparkException(message) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 4081e1c1a1aa6..f945c16783dff 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -19,6 +19,7 @@ package org.apache.spark.internal.io import java.io.IOException import java.util.{Date, UUID} +import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.util.Try @@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.InsertDataSourceConflictException import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -91,6 +93,31 @@ class HadoopMapReduceCommitProtocol( */ private def stagingDir = new Path(path, ".spark-staging-" + jobId) + // The job attempt path when dynamicPartitionOverwrite is false, + // please keep it consistent with `FileOutputCommitter`.`getJobAttemptPath`. + private def staticJobAttemptPath = new Path(path, "_temporary") + + private def checkStaticInsertConflict(jobContext: JobContext): Unit = { + val fs = new Path(path).getFileSystem(jobContext.getConfiguration) + if (fs.exists(staticJobAttemptPath)) { + val fileStatus = fs.getFileStatus(staticJobAttemptPath) + val accessTime = new Date(fileStatus.getAccessTime) + val modificationTime = new Date(fileStatus.getModificationTime) + val lastedTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - + fileStatus.getModificationTime) + throw InsertDataSourceConflictException( + s""" + | The staging dir for non DynamicPartitionOverwrite is existed, its create time is + | $accessTime, and its modified time is $modificationTime, already $lastedTime seconds + | from now. There may be two possibilities: + | 1. Another InsertDataSource operation is executing, you need wait for it to complete. + | 2. The staging dir is belong to a killed application and not be cleaned up gracefully, + | please refer to its modification time and it need be cleaned up manually. + | Please process this staging dir according to above information. + |""".stripMargin) + } + } + protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() // If OutputFormat is Configurable, we should set conf to it. @@ -163,6 +190,7 @@ class HadoopMapReduceCommitProtocol( // For dynamic partition overwrite, it has specific job attempt path, so we don't need // committer.setupJob here. Same with the commitJob and abortJob operations below. if (!dynamicPartitionOverwrite) { + checkStaticInsertConflict(jobContext) committer.setupJob(jobContext) } }