From 8c4ae5eb7441fd5bc0b06276d5d02a2ebc6de4a0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 27 Oct 2016 14:45:52 -0700 Subject: [PATCH 01/15] Thu Oct 27 14:45:52 PDT 2016 --- .../execution/datasources/DataSource.scala | 2 +- .../datasources/DataSourceStrategy.scala | 27 ++++++++++++------- .../InsertIntoHadoopFsRelationCommand.scala | 3 ++- .../datasources/PartitioningUtils.scala | 12 +++++++++ .../execution/datasources/WriteOutput.scala | 27 ++++++++++++------- 5 files changed, 50 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5b8f05a396241..cbf4f925d4c3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -528,7 +528,7 @@ case class DataSource( columns, bucketSpec, format, - () => Unit, // No existing table needs to be refreshed. + _ => Unit, // No existing table needs to be refreshed. options, data.logicalPlan, mode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index f0bcf94eadc96..34b77cab65def 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, Inte import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} -import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils, ExecutedCommandExec} +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -179,24 +180,30 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { "Cannot overwrite a path that is also being read from.") } + def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { + if (l.catalogTable.isDefined && + l.catalogTable.get.partitionColumnNames.nonEmpty && + l.catalogTable.get.partitionProviderIsHive) { + val metastoreUpdater = AlterTableAddPartitionCommand( + l.catalogTable.get.identifier, + updatedPartitions.map(p => (p, None)), + ifNotExists = true) + metastoreUpdater.run(t.sparkSession) + } + t.location.refresh() + } + val insertCmd = InsertIntoHadoopFsRelationCommand( outputPath, query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver), t.bucketSpec, t.fileFormat, - () => t.location.refresh(), + refreshPartitionsCallback, t.options, query, mode) - if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty && - l.catalogTable.get.partitionProviderIsHive) { - // TODO(ekl) we should be more efficient here and only recover the newly added partitions - val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(l.catalogTable.get.identifier) - Union(insertCmd, recoverPartitionCmd) - } else { - insertCmd - } + insertCmd } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 22dbe7149531c..a1221d0ae6d27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand @@ -40,7 +41,7 @@ case class InsertIntoHadoopFsRelationCommand( partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], fileFormat: FileFormat, - refreshFunction: () => Unit, + refreshFunction: (Seq[TablePartitionSpec]) => Unit, options: Map[String, String], @transient query: LogicalPlan, mode: SaveMode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index f66e8b4e2b551..b51b41869bf06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.util.Shell import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ @@ -244,6 +245,17 @@ object PartitioningUtils { } } + /** + * Given a partition path fragment, e.g. `fieldOne=1/fieldTwo=2`, returns a parsed spec + * for that fragment, e.g. `Map(("fieldOne", "1"), ("fieldTwo", "2"))`. + */ + def parsePathFragment(pathFragment: String): TablePartitionSpec = { + pathFragment.split("/").map { kv => + val pair = kv.split("=", 2) + (unescapePathName(pair(0)), unescapePathName(pair(1))) + }.toMap + } + /** * Normalize the column names in partition specification, w.r.t. the real partition column names * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index bd56e511d0ccf..563d00ead2ca1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow @@ -85,7 +86,7 @@ object WriteOutput extends Logging { hadoopConf: Configuration, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], - refreshFunction: () => Unit, + refreshFunction: (Seq[TablePartitionSpec]) => Unit, options: Map[String, String], isAppend: Boolean): Unit = { @@ -120,7 +121,7 @@ object WriteOutput extends Logging { val committer = setupDriverCommitter(job, outputPath.toString, isAppend) try { - sparkSession.sparkContext.runJob(queryExecution.toRdd, + val updatedPartitions = sparkSession.sparkContext.runJob(queryExecution.toRdd, (taskContext: TaskContext, iter: Iterator[InternalRow]) => { executeTask( description = description, @@ -128,11 +129,11 @@ object WriteOutput extends Logging { sparkPartitionId = taskContext.partitionId(), sparkAttemptNumber = taskContext.attemptNumber(), iterator = iter) - }) + }).distinct.flatten committer.commitJob(job) logInfo(s"Job ${job.getJobID} committed.") - refreshFunction() + refreshFunction(updatedPartitions.map(PartitioningUtils.parsePathFragment)) } catch { case cause: Throwable => logError(s"Aborting job ${job.getJobID}.", cause) committer.abortJob(job, JobStatus.State.FAILED) @@ -147,7 +148,7 @@ object WriteOutput extends Logging { sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, - iterator: Iterator[InternalRow]): Unit = { + iterator: Iterator[InternalRow]): Seq[String] = { val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -187,11 +188,12 @@ object WriteOutput extends Logging { try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { // Execute the task to write rows out - writeTask.execute(iterator) + val outputPaths = writeTask.execute(iterator) writeTask.releaseResources() // Commit the task SparkHadoopMapRedUtil.commitTask(committer, taskAttemptContext, jobId.getId, taskId.getId) + outputPaths })(catchBlock = { // If there is an error, release resource and then abort the task try { @@ -213,7 +215,7 @@ object WriteOutput extends Logging { * automatically trigger task aborts. */ private trait ExecuteWriteTask { - def execute(iterator: Iterator[InternalRow]): Unit + def execute(iterator: Iterator[InternalRow]): Seq[String] def releaseResources(): Unit final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = { @@ -240,11 +242,12 @@ object WriteOutput extends Logging { outputWriter } - override def execute(iter: Iterator[InternalRow]): Unit = { + override def execute(iter: Iterator[InternalRow]): Seq[String] = { while (iter.hasNext) { val internalRow = iter.next() outputWriter.writeInternal(internalRow) } + Nil } override def releaseResources(): Unit = { @@ -327,7 +330,7 @@ object WriteOutput extends Logging { newWriter } - override def execute(iter: Iterator[InternalRow]): Unit = { + override def execute(iter: Iterator[InternalRow]): Seq[String] = { // We should first sort by partition columns, then bucket id, and finally sorting columns. val sortingExpressions: Seq[Expression] = description.partitionColumns ++ bucketIdExpression ++ sortColumns @@ -375,6 +378,7 @@ object WriteOutput extends Logging { // If anything below fails, we should abort the task. var currentKey: UnsafeRow = null + var updatedPartitions: List[String] = Nil while (sortedIterator.next()) { val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] if (currentKey != nextKey) { @@ -386,6 +390,10 @@ object WriteOutput extends Logging { logDebug(s"Writing partition: $currentKey") currentWriter = newOutputWriter(currentKey, getPartitionString) + val partitionStr = getPartitionString(currentKey).getString(0) + if (partitionStr.nonEmpty) { + updatedPartitions ::= partitionStr + } } currentWriter.writeInternal(sortedIterator.getValue) } @@ -393,6 +401,7 @@ object WriteOutput extends Logging { currentWriter.close() currentWriter = null } + updatedPartitions } override def releaseResources(): Unit = { From 2484809e1735a7c3fc875f09c68c12d2cd99dd62 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 27 Oct 2016 17:53:13 -0700 Subject: [PATCH 02/15] Thu Oct 27 17:53:13 PDT 2016 --- .../execution/datasources/WriteOutput.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 563d00ead2ca1..b29bfd3793c18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.{Date, UUID} +import scala.collection.mutable + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -129,7 +131,7 @@ object WriteOutput extends Logging { sparkPartitionId = taskContext.partitionId(), sparkAttemptNumber = taskContext.attemptNumber(), iterator = iter) - }).distinct.flatten + }).flatten.distinct committer.commitJob(job) logInfo(s"Job ${job.getJobID} committed.") @@ -148,7 +150,7 @@ object WriteOutput extends Logging { sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, - iterator: Iterator[InternalRow]): Seq[String] = { + iterator: Iterator[InternalRow]): Set[String] = { val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -215,7 +217,7 @@ object WriteOutput extends Logging { * automatically trigger task aborts. */ private trait ExecuteWriteTask { - def execute(iterator: Iterator[InternalRow]): Seq[String] + def execute(iterator: Iterator[InternalRow]): Set[String] def releaseResources(): Unit final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = { @@ -242,12 +244,12 @@ object WriteOutput extends Logging { outputWriter } - override def execute(iter: Iterator[InternalRow]): Seq[String] = { + override def execute(iter: Iterator[InternalRow]): Set[String] = { while (iter.hasNext) { val internalRow = iter.next() outputWriter.writeInternal(internalRow) } - Nil + Set.empty } override def releaseResources(): Unit = { @@ -330,7 +332,7 @@ object WriteOutput extends Logging { newWriter } - override def execute(iter: Iterator[InternalRow]): Seq[String] = { + override def execute(iter: Iterator[InternalRow]): Set[String] = { // We should first sort by partition columns, then bucket id, and finally sorting columns. val sortingExpressions: Seq[Expression] = description.partitionColumns ++ bucketIdExpression ++ sortColumns @@ -378,7 +380,7 @@ object WriteOutput extends Logging { // If anything below fails, we should abort the task. var currentKey: UnsafeRow = null - var updatedPartitions: List[String] = Nil + val updatedPartitions = mutable.Set[String]() while (sortedIterator.next()) { val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] if (currentKey != nextKey) { @@ -392,7 +394,7 @@ object WriteOutput extends Logging { currentWriter = newOutputWriter(currentKey, getPartitionString) val partitionStr = getPartitionString(currentKey).getString(0) if (partitionStr.nonEmpty) { - updatedPartitions ::= partitionStr + updatedPartitions.add(partitionStr) } } currentWriter.writeInternal(sortedIterator.getValue) @@ -401,7 +403,7 @@ object WriteOutput extends Logging { currentWriter.close() currentWriter = null } - updatedPartitions + updatedPartitions.toSet } override def releaseResources(): Unit = { From 4d967251ce01794f7cdab9f84b70fa5393d1d1f2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 27 Oct 2016 17:53:30 -0700 Subject: [PATCH 03/15] Thu Oct 27 17:53:29 PDT 2016 --- .../spark/sql/execution/datasources/WriteOutput.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index b29bfd3793c18..0eb86fdd6caa8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -392,9 +392,9 @@ object WriteOutput extends Logging { logDebug(s"Writing partition: $currentKey") currentWriter = newOutputWriter(currentKey, getPartitionString) - val partitionStr = getPartitionString(currentKey).getString(0) - if (partitionStr.nonEmpty) { - updatedPartitions.add(partitionStr) + val partitionPath = getPartitionString(currentKey).getString(0) + if (partitionPath.nonEmpty) { + updatedPartitions.add(partitionPath) } } currentWriter.writeInternal(sortedIterator.getValue) From 72c4294bb401ff3795363d3c0bb436bb56844630 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 10:56:49 -0700 Subject: [PATCH 04/15] WIP - commit API --- .../execution/datasources/WriteOutput.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index bd56e511d0ccf..490eac28a735a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ @@ -35,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter} +import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -117,10 +117,12 @@ object WriteOutput extends Logging { SQLExecution.withNewExecutionId(sparkSession, queryExecution) { // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - val committer = setupDriverCommitter(job, outputPath.toString, isAppend) + val committer = new MapReduceFileCommitterProtocol( + setupDriverCommitter(job, outputPath.toString, isAppend)) + committer.setupJob(job) try { - sparkSession.sparkContext.runJob(queryExecution.toRdd, + val commitMsgs = sparkSession.sparkContext.runJob(queryExecution.toRdd, (taskContext: TaskContext, iter: Iterator[InternalRow]) => { executeTask( description = description, @@ -130,12 +132,12 @@ object WriteOutput extends Logging { iterator = iter) }) - committer.commitJob(job) + committer.commitJob(job, commitMsgs) logInfo(s"Job ${job.getJobID} committed.") refreshFunction() } catch { case cause: Throwable => logError(s"Aborting job ${job.getJobID}.", cause) - committer.abortJob(job, JobStatus.State.FAILED) + committer.abortJob(job) throw new SparkException("Job aborted.", cause) } } @@ -147,7 +149,7 @@ object WriteOutput extends Logging { sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, - iterator: Iterator[InternalRow]): Unit = { + iterator: Iterator[InternalRow]): TaskCommitMessage = { val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -166,32 +168,27 @@ object WriteOutput extends Logging { new TaskAttemptContextImpl(hadoopConf, taskAttemptId) } - val committer = newOutputCommitter( - description.outputFormatClass, taskAttemptContext, description.path, description.isAppend) + val committer = new MapReduceFileCommitterProtocol(newOutputCommitter( + description.outputFormatClass, taskAttemptContext, description.path, description.isAppend)) committer.setupTask(taskAttemptContext) // Figure out where we need to write data to for staging. // For FileOutputCommitter it has its own staging path called "work path". - val stagingPath = committer match { - case f: FileOutputCommitter => f.getWorkPath.toString - case _ => description.path - } + val stagingPath = committer.stagingDir.getOrElse(description.path) val writeTask = if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { - new SingleDirectoryWriteTask(description, taskAttemptContext, stagingPath) + new SingleDirectoryWriteTask(description, taskAttemptContext, committer, stagingPath) } else { - new DynamicPartitionWriteTask(description, taskAttemptContext, stagingPath) + new DynamicPartitionWriteTask(description, taskAttemptContext, committer, stagingPath) } try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - // Execute the task to write rows out + // Execute the task to write rows out and commit the task. writeTask.execute(iterator) writeTask.releaseResources() - - // Commit the task - SparkHadoopMapRedUtil.commitTask(committer, taskAttemptContext, jobId.getId, taskId.getId) + committer.commitTask(taskAttemptContext) })(catchBlock = { // If there is an error, release resource and then abort the task try { @@ -226,6 +223,7 @@ object WriteOutput extends Logging { private class SingleDirectoryWriteTask( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, + committer: FileCommitProtocol, stagingPath: String) extends ExecuteWriteTask { private[this] var outputWriter: OutputWriter = { @@ -237,6 +235,7 @@ object WriteOutput extends Logging { dataSchema = description.nonPartitionColumns.toStructType, context = taskAttemptContext) outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType) + committer.addTaskFile(taskAttemptContext, outputWriter.path) outputWriter } @@ -262,6 +261,7 @@ object WriteOutput extends Logging { private class DynamicPartitionWriteTask( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, + committer: FileCommitProtocol, stagingPath: String) extends ExecuteWriteTask { // currentWriter is initialized whenever we see a new key @@ -324,6 +324,7 @@ object WriteOutput extends Logging { dataSchema = description.nonPartitionColumns.toStructType, context = taskAttemptContext) newWriter.initConverter(description.nonPartitionColumns.toStructType) + committer.addTaskFile(taskAttemptContext, newWriter.path) newWriter } From 2a613516dd469bca5ed4d7b0f17f678e9e70e267 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 10:57:18 -0700 Subject: [PATCH 05/15] Add commit protocol itself --- .../datasources/FileCommitProtocol.scala | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala new file mode 100644 index 0000000000000..de4568ded9bf4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + +import org.apache.spark.mapred.SparkHadoopMapRedUtil + + +object FileCommitProtocol { + class TaskCommitMessage(obj: Any) extends Serializable + + object EmptyTaskCommitMessage extends TaskCommitMessage(Unit) +} + + +/** + * An interface to define how a Spark job commits its outputs. + * + * The proper call sequence is: + * + * 1. Driver calls setupJob. + * 2. As part of each task's execution, executor calls setupTask[] and then commitTask + * (or abortTask if task failed). + * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job + * failed to execute (e.g. too many failed tasks), the job should call abortJob. + */ +abstract class FileCommitProtocol { + import FileCommitProtocol._ + + /** + * The temporary location to write to, if available. + * + * If this function returns None, then Spark will always write directly to the final destination. + */ + def stagingDir: Option[String] + + def setupJob(jobContext: JobContext): Unit + + def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit + + def abortJob(jobContext: JobContext): Unit + + def setupTask(taskContext: TaskAttemptContext): Unit + + def addTaskFile(taskContext: TaskAttemptContext, path: String): Unit + + def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage + + def abortTask(taskContext: TaskAttemptContext): Unit +} + + +/** + * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter + * (confusingly from the newer mapreduce API, not the old mapred API). + */ +class MapReduceFileCommitterProtocol(committer: OutputCommitter) extends FileCommitProtocol { + import FileCommitProtocol._ + + def this(outputFormatClass: Class[_ <: OutputFormat[_, _]], taskContext: TaskAttemptContext) = { + this(outputFormatClass.newInstance().getOutputCommitter(taskContext)) + } + + override def stagingDir: Option[String] = committer match { + // For FileOutputCommitter it has its own staging path called "work path". + case f: FileOutputCommitter => Option(f.getWorkPath.toString) + case _ => None + } + + override def setupJob(jobContext: JobContext): Unit = { + committer.setupJob(jobContext) + } + + override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + committer.commitJob(jobContext) + } + + override def abortJob(jobContext: JobContext): Unit = { + committer.abortJob(jobContext, JobStatus.State.FAILED) + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + committer.setupTask(taskContext) + } + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + val attemptId = taskContext.getTaskAttemptID + SparkHadoopMapRedUtil.commitTask( + committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + EmptyTaskCommitMessage + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + committer.abortTask(taskContext) + } + + override def addTaskFile(taskContext: TaskAttemptContext, path: String): Unit = { + // Do nothing + } +} From 6af14b56590a0882800f62a2a2b939ee3715edbb Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 13:46:35 -0700 Subject: [PATCH 06/15] Move output committer instantiation into MapReduceFileCommitterProtocol. --- .../datasources/FileCommitProtocol.scala | 73 ++++++++++++++-- .../execution/datasources/WriteOutput.scala | 87 ++----------------- 2 files changed, 74 insertions(+), 86 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index de4568ded9bf4..758fc8e3327e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -17,10 +17,17 @@ package org.apache.spark.sql.execution.datasources +import java.util.Date + +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.SparkHadoopWriter +import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.sql.internal.SQLConf object FileCommitProtocol { @@ -36,7 +43,7 @@ object FileCommitProtocol { * The proper call sequence is: * * 1. Driver calls setupJob. - * 2. As part of each task's execution, executor calls setupTask[] and then commitTask + * 2. As part of each task's execution, executor calls setupTask and then commitTask * (or abortTask if task failed). * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job * failed to execute (e.g. too many failed tasks), the job should call abortJob. @@ -69,13 +76,53 @@ abstract class FileCommitProtocol { /** * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter - * (confusingly from the newer mapreduce API, not the old mapred API). + * (from the newer mapreduce API, not the old mapred API). + * + * Unlike Hadoop's OutputCommitter, this implementation is serializable. */ -class MapReduceFileCommitterProtocol(committer: OutputCommitter) extends FileCommitProtocol { +class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) + extends FileCommitProtocol with Logging { + import FileCommitProtocol._ - def this(outputFormatClass: Class[_ <: OutputFormat[_, _]], taskContext: TaskAttemptContext) = { - this(outputFormatClass.newInstance().getOutputCommitter(taskContext)) + /** OutputCommitter from Hadoop is not always serializable. */ + @transient private var committer: OutputCommitter = _ + + private def setupCommitter(context: TaskAttemptContext): Unit = { + committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context) + + if (!isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the appending job fails. + // See SPARK-8578 for more details. + val configuration = context.getConfiguration + val clazz = + configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + if (clazz != null) { + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + committer = ctor.newInstance(new Path(path), context) + } else { + // The specified output committer is just an OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + committer = ctor.newInstance() + } + } + } + logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") } override def stagingDir: Option[String] = committer match { @@ -85,6 +132,21 @@ class MapReduceFileCommitterProtocol(committer: OutputCommitter) extends FileCom } override def setupJob(jobContext: JobContext): Unit = { + // Setup IDs + val jobId = SparkHadoopWriter.createJobID(new Date, 0) + val taskId = new TaskID(jobId, TaskType.MAP, 0) + val taskAttemptId = new TaskAttemptID(taskId, 0) + + // Set up the configuration object + jobContext.getConfiguration.set("mapred.job.id", jobId.toString) + jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString) + jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString) + jobContext.getConfiguration.setBoolean("mapred.task.is.map", true) + jobContext.getConfiguration.setInt("mapred.task.partition", 0) + + val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) + setupCommitter(taskAttemptContext) + committer.setupJob(jobContext) } @@ -97,6 +159,7 @@ class MapReduceFileCommitterProtocol(committer: OutputCommitter) extends FileCom } override def setupTask(taskContext: TaskAttemptContext): Unit = { + setupCommitter(taskContext) committer.setupTask(taskContext) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 490eac28a735a..745f7bb7bec25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -22,7 +22,7 @@ import java.util.{Date, UUID} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ @@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter} import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter @@ -54,8 +53,7 @@ object WriteOutput extends Logging { val nonPartitionColumns: Seq[Attribute], val bucketSpec: Option[BucketSpec], val isAppend: Boolean, - val path: String, - val outputFormatClass: Class[_ <: OutputFormat[_, _]]) + val path: String) extends Serializable { assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns), @@ -111,14 +109,12 @@ object WriteOutput extends Logging { nonPartitionColumns = dataColumns, bucketSpec = bucketSpec, isAppend = isAppend, - path = outputPath.toString, - outputFormatClass = job.getOutputFormatClass) + path = outputPath.toString) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - val committer = new MapReduceFileCommitterProtocol( - setupDriverCommitter(job, outputPath.toString, isAppend)) + val committer = new MapReduceFileCommitterProtocol(outputPath.toString, isAppend) committer.setupJob(job) try { @@ -129,6 +125,7 @@ object WriteOutput extends Logging { sparkStageId = taskContext.stageId(), sparkPartitionId = taskContext.partitionId(), sparkAttemptNumber = taskContext.attemptNumber(), + committer, iterator = iter) }) @@ -149,6 +146,7 @@ object WriteOutput extends Logging { sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, + committer: FileCommitProtocol, iterator: Iterator[InternalRow]): TaskCommitMessage = { val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId) @@ -168,8 +166,6 @@ object WriteOutput extends Logging { new TaskAttemptContextImpl(hadoopConf, taskAttemptId) } - val committer = new MapReduceFileCommitterProtocol(newOutputCommitter( - description.outputFormatClass, taskAttemptContext, description.path, description.isAppend)) committer.setupTask(taskAttemptContext) // Figure out where we need to write data to for staging. @@ -403,75 +399,4 @@ object WriteOutput extends Logging { } } } - - private def setupDriverCommitter(job: Job, path: String, isAppend: Boolean): OutputCommitter = { - // Setup IDs - val jobId = SparkHadoopWriter.createJobID(new Date, 0) - val taskId = new TaskID(jobId, TaskType.MAP, 0) - val taskAttemptId = new TaskAttemptID(taskId, 0) - - // Set up the configuration object - job.getConfiguration.set("mapred.job.id", jobId.toString) - job.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - job.getConfiguration.set("mapred.task.id", taskAttemptId.toString) - job.getConfiguration.setBoolean("mapred.task.is.map", true) - job.getConfiguration.setInt("mapred.task.partition", 0) - - val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId) - val outputCommitter = newOutputCommitter( - job.getOutputFormatClass, taskAttemptContext, path, isAppend) - outputCommitter.setupJob(job) - outputCommitter - } - - private def newOutputCommitter( - outputFormatClass: Class[_ <: OutputFormat[_, _]], - context: TaskAttemptContext, - path: String, - isAppend: Boolean): OutputCommitter = { - val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) - - if (isAppend) { - // If we are appending data to an existing dir, we will only use the output committer - // associated with the file output format since it is not safe to use a custom - // committer for appending. For example, in S3, direct parquet output committer may - // leave partial data in the destination dir when the appending job fails. - // See SPARK-8578 for more details - logInfo( - s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + - "for appending.") - defaultOutputCommitter - } else { - val configuration = context.getConfiguration - val clazz = - configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - if (clazz != null) { - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - ctor.newInstance(new Path(path), context) - } else { - // The specified output committer is just an OutputCommitter. - // So, we will use the no-argument constructor. - val ctor = clazz.getDeclaredConstructor() - ctor.newInstance() - } - } else { - // If output committer class is not set, we will use the one associated with the - // file output format. - logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") - defaultOutputCommitter - } - } - } } From 6166093d511e833587d32e398338e2f47ccbcc8a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 13:50:13 -0700 Subject: [PATCH 07/15] Specify that implementations must be serializable. --- .../spark/sql/execution/datasources/FileCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index 758fc8e3327e6..b95d1775bea56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -38,7 +38,7 @@ object FileCommitProtocol { /** - * An interface to define how a Spark job commits its outputs. + * An interface to define how a Spark job commits its outputs. Implementations must be serializable. * * The proper call sequence is: * From 040bbba0bdbd647f963b7a61e18b69fd62565201 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 15:16:05 -0700 Subject: [PATCH 08/15] Specify path --- .../ml/source/libsvm/LibSVMRelation.scala | 17 +++---- .../datasources/FileCommitProtocol.scala | 41 ++++++++++------- .../execution/datasources/OutputWriter.scala | 26 +++-------- .../execution/datasources/WriteOutput.scala | 45 +++++++------------ .../datasources/csv/CSVRelation.scala | 17 +++---- .../datasources/json/JsonFileFormat.scala | 17 +++---- .../parquet/ParquetFileFormat.scala | 8 +++- .../parquet/ParquetOutputWriter.scala | 19 +++----- .../datasources/text/TextFileFormat.scala | 17 +++---- .../spark/sql/hive/orc/OrcFileFormat.scala | 28 +++++------- .../sql/sources/CommitFailureTestSource.scala | 10 ++--- .../sql/sources/SimpleTextRelation.scala | 19 ++++---- 12 files changed, 111 insertions(+), 153 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 5e9e6ff1a5690..cb3ca1b6c4bea 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -41,17 +41,11 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration private[libsvm] class LibSVMOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val compressionExtension = TextOutputWriter.getCompressionExtension(context) - new Path(stagingDir, fileNamePrefix + ".libsvm" + compressionExtension).toString - } - private[this] val buffer = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { @@ -135,11 +129,14 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour dataSchema: StructType): OutputWriterFactory = { new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context) + new LibSVMOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".libsvm" + TextOutputWriter.getCompressionExtension(context) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index b95d1775bea56..630ddc76daba9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources -import java.util.Date +import java.util.{Date, UUID} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -51,13 +51,6 @@ object FileCommitProtocol { abstract class FileCommitProtocol { import FileCommitProtocol._ - /** - * The temporary location to write to, if available. - * - * If this function returns None, then Spark will always write directly to the final destination. - */ - def stagingDir: Option[String] - def setupJob(jobContext: JobContext): Unit def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit @@ -66,7 +59,7 @@ abstract class FileCommitProtocol { def setupTask(taskContext: TaskAttemptContext): Unit - def addTaskFile(taskContext: TaskAttemptContext, path: String): Unit + def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage @@ -88,6 +81,9 @@ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) /** OutputCommitter from Hadoop is not always serializable. */ @transient private var committer: OutputCommitter = _ + /** UUID used to identify the job in file name. */ + private val uuid: String = UUID.randomUUID().toString + private def setupCommitter(context: TaskAttemptContext): Unit = { committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context) @@ -125,10 +121,25 @@ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") } - override def stagingDir: Option[String] = committer match { - // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => Option(f.getWorkPath.toString) - case _ => None + override def addTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val filename = f"part-$split%05d-$uuid$ext" + + val stagingDir: String = committer match { + // For FileOutputCommitter it has its own staging path called "work path". + case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) + case _ => path + } + + dir.map { d => + new Path(new Path(stagingDir, d), filename).toString + }.getOrElse { + new Path(stagingDir, filename).toString + } } override def setupJob(jobContext: JobContext): Unit = { @@ -173,8 +184,4 @@ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) override def abortTask(taskContext: TaskAttemptContext): Unit = { committer.abortTask(taskContext) } - - override def addTaskFile(taskContext: TaskAttemptContext, path: String): Unit = { - // Do nothing - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala index fbf6e96d3f850..a73c8146c1b0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala @@ -30,28 +30,21 @@ import org.apache.spark.sql.types.StructType * to executor side to create actual [[OutputWriter]]s on the fly. */ abstract class OutputWriterFactory extends Serializable { + + /** Returns the file extension to be used when writing files out. */ + def getFileExtension(context: TaskAttemptContext): String + /** * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side * to instantiate new [[OutputWriter]]s. * - * @param stagingDir Base path (directory) of the file to which this [[OutputWriter]] is supposed - * to write. Note that this may not point to the final output file. For - * example, `FileOutputFormat` writes to temporary directories and then merge - * written files back to the final destination. In this case, `path` points to - * a temporary output file under the temporary directory. - * @param fileNamePrefix Prefix of the file name. The returned OutputWriter must make sure this - * prefix is used in the actual file name. For example, if the prefix is - * "part-1-2-3", then the file name must start with "part_1_2_3" but can - * end in arbitrary extension that is deterministic given the configuration - * (i.e. the suffix extension should not depend on any task id, attempt id, - * or partition id). + * @param path Path to write the file. * @param dataSchema Schema of the rows to be written. Partition columns are not included in the * schema if the relation being written is partitioned. * @param context The Hadoop MapReduce task context. */ def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter @@ -77,13 +70,6 @@ abstract class OutputWriterFactory extends Serializable { * executor side. This instance is used to persist rows to this single output file. */ abstract class OutputWriter { - - /** - * The path of the file to be written out. This path should include the staging directory and - * the file name prefix passed into the associated createOutputWriter function. - */ - def path: String - /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 745f7bb7bec25..d9977eb232d86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -168,15 +168,11 @@ object WriteOutput extends Logging { committer.setupTask(taskAttemptContext) - // Figure out where we need to write data to for staging. - // For FileOutputCommitter it has its own staging path called "work path". - val stagingPath = committer.stagingDir.getOrElse(description.path) - val writeTask = if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { - new SingleDirectoryWriteTask(description, taskAttemptContext, committer, stagingPath) + new SingleDirectoryWriteTask(description, taskAttemptContext, committer) } else { - new DynamicPartitionWriteTask(description, taskAttemptContext, committer, stagingPath) + new DynamicPartitionWriteTask(description, taskAttemptContext, committer) } try { @@ -211,7 +207,7 @@ object WriteOutput extends Logging { final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = { val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - f"part-r-$split%05d-$uuid$bucketString" + f"part-$split%05d-$uuid$bucketString" } } @@ -219,19 +215,19 @@ object WriteOutput extends Logging { private class SingleDirectoryWriteTask( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - stagingPath: String) extends ExecuteWriteTask { + committer: FileCommitProtocol) extends ExecuteWriteTask { private[this] var outputWriter: OutputWriter = { - val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId + val tmpFilePath = committer.addTaskTempFile( + taskAttemptContext, + None, + description.outputWriterFactory.getFileExtension(taskAttemptContext)) val outputWriter = description.outputWriterFactory.newInstance( - stagingDir = stagingPath, - fileNamePrefix = filePrefix(split, description.uuid, None), + path = tmpFilePath, dataSchema = description.nonPartitionColumns.toStructType, context = taskAttemptContext) outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType) - committer.addTaskFile(taskAttemptContext, outputWriter.path) outputWriter } @@ -257,8 +253,7 @@ object WriteOutput extends Logging { private class DynamicPartitionWriteTask( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol, - stagingPath: String) extends ExecuteWriteTask { + committer: FileCommitProtocol) extends ExecuteWriteTask { // currentWriter is initialized whenever we see a new key private var currentWriter: OutputWriter = _ @@ -298,29 +293,23 @@ object WriteOutput extends Logging { * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet */ private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = { - val path = - if (description.partitionColumns.nonEmpty) { - val partitionPath = partString(key).getString(0) - new Path(stagingPath, partitionPath).toString - } else { - stagingPath - } + val partDir = + if (description.partitionColumns.isEmpty) None else Option(partString(key).getString(0)) // If the bucket spec is defined, the bucket column is right after the partition columns val bucketId = if (description.bucketSpec.isDefined) { - Some(key.getInt(description.partitionColumns.length)) + BucketingUtils.bucketIdToString(key.getInt(description.partitionColumns.length)) } else { - None + "" } + val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext) - val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId + val path = committer.addTaskTempFile(taskAttemptContext, partDir, ext) val newWriter = description.outputWriterFactory.newInstance( - stagingDir = path, - fileNamePrefix = filePrefix(split, description.uuid, bucketId), + path = path, dataSchema = description.nonPartitionColumns.toStructType, context = taskAttemptContext) newWriter.initConverter(description.nonPartitionColumns.toStructType) - committer.addTaskFile(taskAttemptContext, newWriter.path) newWriter } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index a35cfdb2c234f..a249b9d9d59b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -171,26 +171,23 @@ object CSVRelation extends Logging { private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new CsvOutputWriter(stagingDir, fileNamePrefix, dataSchema, context, params) + new CsvOutputWriter(path, dataSchema, context, params) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".csv" + TextOutputWriter.getCompressionExtension(context) } } private[csv] class CsvOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext, params: CSVOptions) extends OutputWriter with Logging { - override val path: String = { - val compressionExtension = TextOutputWriter.getCompressionExtension(context) - new Path(stagingDir, fileNamePrefix + ".csv" + compressionExtension).toString - } - // create the Generator without separator inserted between 2 records private[this] val text = new Text() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 651fa78a4e924..5a409c04c929d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -83,11 +83,14 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new JsonOutputWriter(stagingDir, parsedOptions, fileNamePrefix, dataSchema, context) + new JsonOutputWriter(path, parsedOptions, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".json" + TextOutputWriter.getCompressionExtension(context) } } } @@ -154,18 +157,12 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } private[json] class JsonOutputWriter( - stagingDir: String, + path: String, options: JSONOptions, - fileNamePrefix: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter with Logging { - override val path: String = { - val compressionExtension = TextOutputWriter.getCompressionExtension(context) - new Path(stagingDir, fileNamePrefix + ".json" + compressionExtension).toString - } - private[this] val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records private[this] val gen = new JacksonGenerator(dataSchema, writer, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 502dd0e8d4cf9..77c83ba38efee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -33,6 +33,7 @@ import org.apache.parquet.{Log => ApacheParquetLog} import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ +import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType import org.slf4j.bridge.SLF4JBridgeHandler @@ -133,10 +134,13 @@ class ParquetFileFormat new OutputWriterFactory { override def newInstance( path: String, - fileNamePrefix: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new ParquetOutputWriter(path, fileNamePrefix, context) + new ParquetOutputWriter(path, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 1300069c42b05..92d4f27be3fd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -89,7 +89,7 @@ private[parquet] class ParquetOutputWriterFactory( * Returns a [[OutputWriter]] that writes data to the give path without using * [[OutputCommitter]]. */ - override def newWriter(path1: String): OutputWriter = new OutputWriter { + override def newWriter(path: String): OutputWriter = new OutputWriter { // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) @@ -99,8 +99,6 @@ private[parquet] class ParquetOutputWriterFactory( // Instance of ParquetRecordWriter that does not use OutputCommitter private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext) - override def path: String = path1 - override def write(row: Row): Unit = { throw new UnsupportedOperationException("call writeInternal") } @@ -127,27 +125,22 @@ private[parquet] class ParquetOutputWriterFactory( /** Disable the use of the older API. */ override def newInstance( path: String, - fileNamePrefix: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { throw new UnsupportedOperationException("this version of newInstance not supported for " + "ParquetOutputWriterFactory") } + + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" + } } // NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[parquet] class ParquetOutputWriter( - stagingDir: String, - fileNamePrefix: String, - context: TaskAttemptContext) +private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val filename = fileNamePrefix + CodecConfig.from(context).getCodec.getExtension + ".parquet" - new Path(stagingDir, filename).toString - } - private val recordWriter: RecordWriter[Void, InternalRow] = { new ParquetOutputFormat[InternalRow]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index d40b5725199a8..8e043960326df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -75,11 +75,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new TextOutputWriter(stagingDir, fileNamePrefix, dataSchema, context) + new TextOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".txt" + TextOutputWriter.getCompressionExtension(context) } } } @@ -124,17 +127,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { } class TextOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val compressionExtension = TextOutputWriter.getCompressionExtension(context) - new Path(stagingDir, fileNamePrefix + ".txt" + compressionExtension).toString - } - private[this] val buffer = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index eba7aa386ade2..7c519a074317a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -83,11 +83,19 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context) + new OrcOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + val compressionExtension: String = { + val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION) + OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") + } + + compressionExtension + ".orc" } } } @@ -210,23 +218,11 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) } private[orc] class OrcOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val compressionExtension: String = { - val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION) - OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") - } - // It has the `.orc` extension at the end because (de)compression tools - // such as gunzip would not be able to decompress this as the compression - // is not applied on this whole file but on each "stream" in ORC format. - new Path(stagingDir, fileNamePrefix + compressionExtension + ".orc").toString - } - private[this] val serializer = new OrcSerializer(dataSchema, context.getConfiguration) // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala index 731540db17eeb..abc7c8cc4db89 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.sources -import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.TaskContext @@ -40,19 +39,16 @@ class CommitFailureTestSource extends SimpleTextSource { dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) { + new SimpleTextOutputWriter(path, context) { var failed = false TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => failed = true SimpleTextRelation.callbackCalled = true } - override val path: String = new Path(stagingDir, fileNamePrefix).toString - override def write(row: Row): Unit = { if (SimpleTextRelation.failWriter) { sys.error("Intentional task writer failure for testing purpose.") @@ -67,6 +63,8 @@ class CommitFailureTestSource extends SimpleTextSource { } } } + + override def getFileExtension(context: TaskAttemptContext): String = "" } override def shortName(): String = "commit-failure-test" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 9896b9bde99c8..64d0ecbeefc98 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -51,12 +51,13 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration) new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) + new SimpleTextOutputWriter(path, context) } + + override def getFileExtension(context: TaskAttemptContext): String = "" } } @@ -120,14 +121,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { } } -class SimpleTextOutputWriter( - stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext) +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { - override val path: String = new Path(stagingDir, fileNamePrefix).toString - private val recordWriter: RecordWriter[NullWritable, Text] = - new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context) + new AppendingTextOutputFormat(path).getRecordWriter(context) override def write(row: Row): Unit = { val serialized = row.toSeq.map { v => @@ -141,15 +139,14 @@ class SimpleTextOutputWriter( } } -class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String) - extends TextOutputFormat[NullWritable, Text] { +class AppendingTextOutputFormat(path: String) extends TextOutputFormat[NullWritable, Text] { val numberFormat = NumberFormat.getInstance() numberFormat.setMinimumIntegerDigits(5) numberFormat.setGroupingUsed(false) override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - new Path(stagingDir, fileNamePrefix) + new Path(path) } } From 51d0919577c71155adb7d4737e9441cede8fe97d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 15:36:46 -0700 Subject: [PATCH 09/15] Add documentation. --- .../datasources/FileCommitProtocol.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index 630ddc76daba9..1abd2575922d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -51,18 +51,54 @@ object FileCommitProtocol { abstract class FileCommitProtocol { import FileCommitProtocol._ + /** + * Setups up a job. Must be called on the driver before any other methods can be invoked. + */ def setupJob(jobContext: JobContext): Unit + /** + * Commits a job after the writes succeed. Must be called on the driver. + */ def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit + /** + * Aborts a job after the writes fail. Must be called on the driver. + * + * Calling this function is a best-effort attempt, because it is possible that the driver + * just crashes (or killed) before it can call abort. + */ def abortJob(jobContext: JobContext): Unit + /** + * Sets up a task within a job. + * Must be called before any other task related methods can be invoked. + */ def setupTask(taskContext: TaskAttemptContext): Unit + /** + * Notifies the commit protocol to add a new file, and gets back the full path that should be + * used. Must be called on the executors when running tasks. + * + * A full file path consists of the following parts: + * 1. the base path + * 2. some sub-directory within the base path, used to specify partitioning + * 3. file prefix, usually some unique job id with the task id + * 4. bucket id + * 5. source specific file extension, e.g. ".snappy.parquet" + * + * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest + * are left to the commit protocol implementation to decide. + */ def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + /** + * Commits a task after the writes succeed. Must be called on the executors when running tasks. + */ def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage + /** + * Aborts a task after the writes have failed. Must be called on the executors when running tasks. + */ def abortTask(taskContext: TaskAttemptContext): Unit } From 2d7d373fe48d18037653c10424c8b1c978160958 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 15:43:54 -0700 Subject: [PATCH 10/15] Make MapReduceFileCommitterProtocol serializable. --- .../spark/sql/execution/datasources/FileCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index 1abd2575922d1..efee4725d5b88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -110,7 +110,7 @@ abstract class FileCommitProtocol { * Unlike Hadoop's OutputCommitter, this implementation is serializable. */ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) - extends FileCommitProtocol with Logging { + extends FileCommitProtocol with Serializable with Logging { import FileCommitProtocol._ From cd23d2f7bdf7a3ef9b93e77a3ae540d553398267 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 17:34:31 -0700 Subject: [PATCH 11/15] Make protocol configurable. --- .../datasources/FileCommitProtocol.scala | 26 ++++++++++++++++- .../execution/datasources/WriteOutput.scala | 5 +++- .../apache/spark/sql/internal/SQLConf.scala | 29 +++++++++++++------ 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index efee4725d5b88..322504cdc9a52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -28,12 +28,36 @@ import org.apache.spark.SparkHadoopWriter import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils object FileCommitProtocol { class TaskCommitMessage(obj: Any) extends Serializable object EmptyTaskCommitMessage extends TaskCommitMessage(Unit) + + /** + * Instantiates a FileCommitProtocol using the given className. + */ + def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = { + try { + val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] + + // First try the one with argument (outputPath: String, isAppend: Boolean). + // If that doesn't exist, try the one with (outputPath: String). + try { + val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean]) + ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean]) + } catch { + case _: NoSuchMethodException => + val ctor = clazz.getDeclaredConstructor(classOf[String]) + ctor.newInstance(outputPath) + } + } catch { + case e: ClassNotFoundException => + throw e + } + } } @@ -114,7 +138,7 @@ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) import FileCommitProtocol._ - /** OutputCommitter from Hadoop is not always serializable. */ + /** OutputCommitter from Hadoop is not serializable so marking it transient. */ @transient private var committer: OutputCommitter = _ /** UUID used to identify the job in file name. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index d9977eb232d86..9ffb20da070eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -114,7 +114,10 @@ object WriteOutput extends Logging { SQLExecution.withNewExecutionId(sparkSession, queryExecution) { // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - val committer = new MapReduceFileCommitterProtocol(outputPath.toString, isAppend) + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + outputPath.toString, + isAppend) committer.setupJob(job) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc31f3bc323f6..00de7df782544 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.execution.datasources.MapReduceFileCommitterProtocol import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -240,9 +241,8 @@ object SQLConf { val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class") .doc("The output committer class used by Parquet. The specified class needs to be a " + "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " + - "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " + - "option must be set in Hadoop Configuration. 2. This option overrides " + - "\"spark.sql.sources.outputCommitterClass\".") + "of org.apache.parquet.hadoop.ParquetOutputCommitter.") + .internal() .stringConf .createWithDefault(classOf[ParquetOutputCommitter].getName) @@ -375,16 +375,17 @@ object SQLConf { .booleanConf .createWithDefault(true) - // The output committer class used by HadoopFsRelation. The specified class needs to be a + // The output committer class used by data sources. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. - // - // NOTE: - // - // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*. - // 2. This option can be overridden by "spark.sql.parquet.output.committer.class". val OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional + val FILE_COMMIT_PROTOCOL_CLASS = + SQLConfigBuilder("spark.sql.sources.commitProtocolClass") + .internal() + .stringConf + .createWithDefault(classOf[MapReduceFileCommitterProtocol].getName) + val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") .doc("The maximum number of files allowed for listing files at driver side. If the number " + @@ -518,6 +519,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_FILE_COMMIT_PROTOCOL_CLASS = + SQLConfigBuilder("spark.sql.streaming.commitProtocolClass") + .internal() + .stringConf + .createWithDefault(classOf[MapReduceFileCommitterProtocol].getName) + val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") .internal() .doc("Whether to delete the expired log files in file stream sink.") @@ -631,6 +638,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) + def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) @@ -741,6 +750,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def partitionColumnTypeInferenceEnabled: Boolean = getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE) + def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS) + def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) From 98a17922be97bb2c6b00c57543a5ee14213a7f97 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 19:52:53 -0700 Subject: [PATCH 12/15] Fix compilation error --- .../apache/spark/sql/execution/datasources/WriteOutput.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 49ab716e6b2db..7d0ee9edb63da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -133,7 +133,7 @@ object WriteOutput extends Logging { sparkAttemptNumber = taskContext.attemptNumber(), committer, iterator = iter) - }).flatten.distinct + }) val commitMsgs = ret.map(_._1) val updatedPartitions = ret.flatMap(_._2).map(PartitioningUtils.parsePathFragment) From 0177ded3357a195f48e8e23923b763937ff60cac Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 20:19:47 -0700 Subject: [PATCH 13/15] Code review --- .../datasources/FileCommitProtocol.scala | 15 +++++++++++---- .../sql/execution/datasources/WriteOutput.scala | 4 ++-- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index 322504cdc9a52..1ce9ae4266c1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -62,7 +62,8 @@ object FileCommitProtocol { /** - * An interface to define how a Spark job commits its outputs. Implementations must be serializable. + * An interface to define how a Spark job commits its outputs. Implementations must be serializable, + * as the committer instance instantiated on the driver will be used for tasks on executors. * * The proper call sequence is: * @@ -103,6 +104,9 @@ abstract class FileCommitProtocol { * Notifies the commit protocol to add a new file, and gets back the full path that should be * used. Must be called on the executors when running tasks. * + * Note that the returned temp file may have an arbitrary path. The commit protocol only + * promises that the file will be at the location specified by the arguments after job commit. + * * A full file path consists of the following parts: * 1. the base path * 2. some sub-directory within the base path, used to specify partitioning @@ -113,7 +117,7 @@ abstract class FileCommitProtocol { * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest * are left to the commit protocol implementation to decide. */ - def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. @@ -122,6 +126,9 @@ abstract class FileCommitProtocol { /** * Aborts a task after the writes have failed. Must be called on the executors when running tasks. + * + * Calling this function is a best-effort attempt, because it is possible that the executor + * just crashes (or killed) before it can call abort. */ def abortTask(taskContext: TaskAttemptContext): Unit } @@ -133,7 +140,7 @@ abstract class FileCommitProtocol { * * Unlike Hadoop's OutputCommitter, this implementation is serializable. */ -class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) +class HadoopCommitProtocolWrapper(path: String, isAppend: Boolean) extends FileCommitProtocol with Serializable with Logging { import FileCommitProtocol._ @@ -181,7 +188,7 @@ class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean) logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") } - override def addTaskTempFile( + override def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 7d0ee9edb63da..8ce8501191a50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -226,7 +226,7 @@ object WriteOutput extends Logging { committer: FileCommitProtocol) extends ExecuteWriteTask { private[this] var outputWriter: OutputWriter = { - val tmpFilePath = committer.addTaskTempFile( + val tmpFilePath = committer.newTaskTempFile( taskAttemptContext, None, description.outputWriterFactory.getFileExtension(taskAttemptContext)) @@ -313,7 +313,7 @@ object WriteOutput extends Logging { } val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext) - val path = committer.addTaskTempFile(taskAttemptContext, partDir, ext) + val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext) val newWriter = description.outputWriterFactory.newInstance( path = path, dataSchema = description.nonPartitionColumns.toStructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 00de7df782544..fd2501aa9400b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.execution.datasources.MapReduceFileCommitterProtocol +import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -384,7 +384,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.sources.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[MapReduceFileCommitterProtocol].getName) + .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") @@ -523,7 +523,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.streaming.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[MapReduceFileCommitterProtocol].getName) + .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") .internal() From 66b79e534435dbcd4474199180a40319771b997b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 20:25:32 -0700 Subject: [PATCH 14/15] Add back distinct --- .../apache/spark/sql/execution/datasources/WriteOutput.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala index 8ce8501191a50..a07855111b401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala @@ -136,7 +136,7 @@ object WriteOutput extends Logging { }) val commitMsgs = ret.map(_._1) - val updatedPartitions = ret.flatMap(_._2).map(PartitioningUtils.parsePathFragment) + val updatedPartitions = ret.flatMap(_._2).distinct.map(PartitioningUtils.parsePathFragment) committer.commitJob(job, commitMsgs) logInfo(s"Job ${job.getJobID} committed.") From 65ba5c14ec976d79fe9ee118807663496d0b7845 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 31 Oct 2016 20:39:25 -0700 Subject: [PATCH 15/15] Indent --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fd2501aa9400b..29e79847aa38b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -521,9 +521,9 @@ object SQLConf { val STREAMING_FILE_COMMIT_PROTOCOL_CLASS = SQLConfigBuilder("spark.sql.streaming.commitProtocolClass") - .internal() - .stringConf - .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) + .internal() + .stringConf + .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") .internal()