From 4e61fc4ce555e03460c13107b63e2df65b77a537 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 29 Jun 2021 21:09:07 -0700 Subject: [PATCH 1/3] Add Unstable annotation to newly added methods of FileCommitProtocol --- .../spark/internal/io/FileCommitProtocol.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 6465cc7df6dd4..17587242fff78 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -20,6 +20,7 @@ package org.apache.spark.internal.io import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ +import org.apache.spark.annotation.Unstable import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -41,6 +42,9 @@ import org.apache.spark.util.Utils * (or abortTask if task failed). * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job * failed to execute (e.g. too many failed tasks), the job should call abortJob. + * + * NOTE: this class is exposed as an API considering the usage of many downstream custom + * implementations, but will be subject to be changed and/or moved. */ abstract class FileCommitProtocol extends Logging { import FileCommitProtocol._ @@ -107,10 +111,9 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. * - * This API should be implemented and called, instead of - * [[newTaskTempFile(taskContest, dir, ext)]]. Provide a default implementation here to be - * backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0. + * @since 3.2.0 */ + @Unstable def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { @@ -144,11 +147,9 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. * - * This API should be implemented and called, instead of - * [[newTaskTempFileAbsPath(taskContest, absoluteDir, ext)]]. Provide a default implementation - * here to be backward compatible with custom [[FileCommitProtocol]] implementations before - * Spark 3.2.0. + * @since 3.2.0 */ + @Unstable def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { From 4fb511356384367bd803663068ef12e323fbbf06 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 29 Jun 2021 21:15:41 -0700 Subject: [PATCH 2/3] Mark the whole class as Unstable --- .../org/apache/spark/internal/io/FileCommitProtocol.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 17587242fff78..5a27856fdb9dd 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -46,6 +46,7 @@ import org.apache.spark.util.Utils * NOTE: this class is exposed as an API considering the usage of many downstream custom * implementations, but will be subject to be changed and/or moved. */ +@Unstable abstract class FileCommitProtocol extends Logging { import FileCommitProtocol._ @@ -113,7 +114,6 @@ abstract class FileCommitProtocol extends Logging { * * @since 3.2.0 */ - @Unstable def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { @@ -149,7 +149,6 @@ abstract class FileCommitProtocol extends Logging { * * @since 3.2.0 */ - @Unstable def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { From d41ba14b7789e8d0806d248b199b983dfa4946ce Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 29 Jun 2021 21:18:49 -0700 Subject: [PATCH 3/3] Update the note with @note --- .../scala/org/apache/spark/internal/io/FileCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 5a27856fdb9dd..5cd7397ea358f 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -43,7 +43,7 @@ import org.apache.spark.util.Utils * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job * failed to execute (e.g. too many failed tasks), the job should call abortJob. * - * NOTE: this class is exposed as an API considering the usage of many downstream custom + * @note This class is exposed as an API considering the usage of many downstream custom * implementations, but will be subject to be changed and/or moved. */ @Unstable