From 8cc4899d30356371f4a977e91666237f5b1685a0 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Mon, 21 Jun 2021 22:05:35 -0700 Subject: [PATCH 1/4] Introduce new API to FileCommitProtocol to allow flexible file naming --- .../internal/io/FileCommitProtocol.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index d9d7b06cdb8ce..a7d0bf7b68cbe 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -90,8 +90,38 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. */ + @deprecated("use newTaskFile(taskContext, dir, spec)", "3.2.0") def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + /** + * Notifies the commit protocol to add a new file, and gets back the full path that should be + * used. Must be called on the executors when running tasks. + * + * Note that the returned temp file may have an arbitrary path. The commit protocol only + * promises that the file will be at the location specified by the arguments after job commit. + * + * The "dir" parameter specifies the sub-directory within the base path, used to specify + * partitioning. The "spec" parameter specifies the file name. The rest are left to the commit + * protocol implementation to decide. + * + * Important: it is the caller's responsibility to add uniquely identifying content to "spec" + * if a task is going to write out multiple files to the same dir. The file commit protocol only + * guarantees that files written by different tasks will not conflict. 
+ * + * This API should be implemented and called, instead of deprecated + * [[newTaskTempFile(taskContest, dir, ext)]]. Provide a default implementation here to be + * backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0. + */ + def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { + if (spec.prefix.isEmpty) { + newTaskTempFile(taskContext, dir, spec.ext) + } else { + throw new UnsupportedOperationException(s"${getClass.getSimpleName}.newTaskTempFile does " + + s"not support file name prefix: ${spec.prefix}") + } + } + /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. @@ -100,9 +130,38 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. */ + @deprecated("use newTaskTempFileAbsPath(taskContext, absoluteDir, spec)", "3.2.0") def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + /** + * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. + * Depending on the implementation, there may be weaker guarantees around adding files this way. + * + * The "absoluteDir" parameter specifies the final absolute directory of file. The "spec" + * parameter specifies the file name. The rest are left to the commit protocol implementation to + * decide. + * + * Important: it is the caller's responsibility to add uniquely identifying content to "spec" + * if a task is going to write out multiple files to the same dir. The file commit protocol only + * guarantees that files written by different tasks will not conflict. 
+ * + * This API should be implemented and called, instead of deprecated + * [[newTaskTempFileAbsPath(taskContest, absoluteDir, ext)]]. Provide a default implementation + * here to be backward compatible with custom [[FileCommitProtocol]] implementations before + * Spark 3.2.0. + */ + def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { + if (spec.prefix.isEmpty) { + newTaskTempFileAbsPath(taskContext, absoluteDir, spec.ext) + } else { + throw new UnsupportedOperationException( + s"${getClass.getSimpleName}.newTaskTempFileAbsPath does not support file name prefix: " + + s"${spec.prefix}") + } + } + /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. */ @@ -174,3 +233,12 @@ object FileCommitProtocol extends Logging { new Path(path, ".spark-staging-" + jobId) } } + +/** + * The specification for Spark output file name. + * This is used by [[FileCommitProtocol]] to create full path of file. + * + * @param prefix Prefix of file. + * @param ext Extension of file. + */ +final case class FileNameSpec(prefix: String, ext: String) From b88a9f65f47d1b5cb7b74fa1537ab5afdb06c9bc Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 22 Jun 2021 18:27:11 -0700 Subject: [PATCH 2/4] Address comment of ext --- .../org/apache/spark/internal/io/FileCommitProtocol.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index a7d0bf7b68cbe..e1fd599eeff34 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -239,6 +239,6 @@ object FileCommitProtocol extends Logging { * This is used by [[FileCommitProtocol]] to create full path of file. * * @param prefix Prefix of file. 
- * @param ext Extension of file. + * @param suffix Suffix of file. */ -final case class FileNameSpec(prefix: String, ext: String) +final case class FileNameSpec(prefix: String, suffix: String) From b24f40de3ab625c9b9058a5490be55c6ce26c392 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 22 Jun 2021 18:43:48 -0700 Subject: [PATCH 3/4] Fix code for ext --- .../org/apache/spark/internal/io/FileCommitProtocol.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index e1fd599eeff34..dfaf1385dee87 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -115,7 +115,7 @@ abstract class FileCommitProtocol extends Logging { def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { - newTaskTempFile(taskContext, dir, spec.ext) + newTaskTempFile(taskContext, dir, spec.suffix) } else { throw new UnsupportedOperationException(s"${getClass.getSimpleName}.newTaskTempFile does " + s"not support file name prefix: ${spec.prefix}") @@ -154,7 +154,7 @@ abstract class FileCommitProtocol extends Logging { def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { if (spec.prefix.isEmpty) { - newTaskTempFileAbsPath(taskContext, absoluteDir, spec.ext) + newTaskTempFileAbsPath(taskContext, absoluteDir, spec.suffix) } else { throw new UnsupportedOperationException( s"${getClass.getSimpleName}.newTaskTempFileAbsPath does not support file name prefix: " + From 2ec6fc61c561999aa7680d6dfa4441b1a6cf7ff5 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Jun 2021 11:34:36 -0700 Subject: [PATCH 4/4] Remove deprecated annotation for existing APIs --- 
.../org/apache/spark/internal/io/FileCommitProtocol.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index dfaf1385dee87..6465cc7df6dd4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -90,7 +90,6 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. */ - @deprecated("use newTaskFile(taskContext, dir, spec)", "3.2.0") def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String /** @@ -108,7 +107,7 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. * - * This API should be implemented and called, instead of deprecated + * This API should be implemented and called, instead of * [[newTaskTempFile(taskContest, dir, ext)]]. Provide a default implementation here to be * backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0. */ @@ -130,7 +129,6 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. 
*/ - @deprecated("use newTaskTempFileAbsPath(taskContext, absoluteDir, spec)", "3.2.0") def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String @@ -146,7 +144,7 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. * - * This API should be implemented and called, instead of deprecated + * This API should be implemented and called, instead of * [[newTaskTempFileAbsPath(taskContest, absoluteDir, ext)]]. Provide a default implementation * here to be backward compatible with custom [[FileCommitProtocol]] implementations before * Spark 3.2.0.