From a80b57b54e36677c188a333d23b86de349301001 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 Jun 2018 12:58:16 -0700 Subject: [PATCH 1/2] [SPARK-24552][core][sql] Use unique id instead of attempt number for writes. This passes a unique attempt id instead of attempt number to v2 data sources and hadoop APIs, because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted. --- .../org/apache/spark/internal/io/SparkHadoopWriter.scala | 6 +++++- .../sql/execution/datasources/v2/WriteToDataSourceV2.scala | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index abf39213fa0d2..c22f90ed2f8e1 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. + // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. + val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber + executeTask( context = context, config = config, jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkAttemptNumber = context.attemptNumber, + sparkAttemptNumber = attemptId, committer = committer, iterator = iter) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index d02faacd9c19a..aa6218796bba5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -123,7 +123,10 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { - val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. + // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. + val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber + val dataWriter = writeTask.createDataWriter(context.partitionId(), attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { From f9b134e5fb50434a39e13d471403d1fced3e7638 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 Jun 2018 16:37:09 -0700 Subject: [PATCH 2/2] Typo. --- .../scala/org/apache/spark/internal/io/SparkHadoopWriter.scala | 2 +- .../sql/execution/datasources/v2/WriteToDataSourceV2.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index c22f90ed2f8e1..9ebd0aa301592 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,7 +76,7 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { - // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers. // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index aa6218796bba5..e4d2639632174 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -123,7 +123,7 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { - // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers. // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber val dataWriter = writeTask.createDataWriter(context.partitionId(), attemptId)