From 51beb713d3ef2d0209706298e1f9245533bf3bca Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 19 Apr 2016 15:29:47 +0800 Subject: [PATCH 01/17] [SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR. --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 35 +++++++++++ R/pkg/R/generics.R | 4 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 33 ++++++++++ R/pkg/inst/worker/worker.R | 19 +++++- .../scala/org/apache/spark/api/r/RRDD.scala | 2 +- .../org/apache/spark/api/r/RRunner.scala | 8 ++- .../scala/org/apache/spark/api/r/SerDe.scala | 2 +- .../sql/catalyst/plans/logical/object.scala | 55 +++++++++++++++- .../scala/org/apache/spark/sql/Dataset.scala | 18 ++++++ .../org/apache/spark/sql/api/r/SQLUtils.scala | 32 +++++++++- .../spark/sql/execution/SparkStrategies.scala | 3 + .../execution/r/MapPartitionsRWrapper.scala | 62 +++++++++++++++++++ 13 files changed, 263 insertions(+), 11 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ea31baed3d97e..a9f0508e2feb2 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -45,6 +45,7 @@ exportMethods("arrange", "covar_samp", "covar_pop", "crosstab", + "dapply", "describe", "dim", "distinct", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 36aedfae86b33..14def637085e7 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1125,6 +1125,41 @@ setMethod("summarize", agg(x, ...) }) +#' dapply +#' +#' Apply a function to each partition of a DataFrame. +#' +#' @param x A DataFrame +#' @param func A function to be applied to each partition of the DataFrame. +#' @param schema Optional. The schema of the resulting DataFrame after the function is applied. +#' If NULL, the content of the resulting DataFrame is serialized R data. +#' @family DataFrame functions +#' @rdname dapply +#' @name dapply +#' @export +setMethod("dapply", + signature(x = "DataFrame", func = "function"), + function(x, func, schema = NULL) { + if (!is.null(schema)) { + stopifnot(class(schema) == "structType") + } + + packageNamesArr <- serialize(.sparkREnv[[".packages"]], + connection = NULL) + + broadcastArr <- lapply(ls(.broadcastNames), + function(name) { get(name, .broadcastNames) }) + + sdf <- callJStatic( + "org.apache.spark.sql.api.r.SQLUtils", + "dapply", + x@sdf, + serialize(cleanClosure(func), connection = NULL), + packageNamesArr, + broadcastArr, + schema$jobj) + dataFrame(sdf) + }) ############################## RDD Map Functions ################################## # All of the following functions mirror the existing RDD map functions, # diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 62907118ef3b3..b72927f7a0b91 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -446,6 +446,10 @@ setGeneric("covar_samp", function(col1, col2) {standardGeneric("covar_samp") }) #' @export setGeneric("covar_pop", function(col1, col2) {standardGeneric("covar_pop") }) +#' @rdname dapply +#' @export +setGeneric("dapply", function(x, func, schema = NULL) { standardGeneric("dapply") }) + #' @rdname summary #' @export setGeneric("describe", function(x, col, ...) { standardGeneric("describe") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 336068035eaf8..80591be2d63a0 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2017,6 +2017,39 @@ test_that("Histogram", { df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100))) expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1)) }) + +test_that("dapply() on a DataFrame", { + df <- createDataFrame (sqlContext, mtcars) + df1 <- dapply(df, function(x) { x }, schema(df)) + result <- collect(df1) + expected <- mtcars + rownames(expected) <- NULL + expect_identical(expected, result) + + df1 <- dapply(df, function(x) { x + 1 }, schema(df)) + result <- collect(df1) + expected <- mtcars + 1 + rownames(expected) <- NULL + expect_identical(expected, result) + + schema <- structType(structField("a", "double"), structField("b", "double")) + df1 <- dapply(df, function(x) { x[, 1:2, drop = F] }, schema) + result <- collect(df1) + expected <- mtcars[, 1:2, drop = F] + names(expected) <- c("a", "b") + rownames(expected) <- NULL + expect_identical(expected, result) + + df1 <- dapply(df, function(x) { x + 1 }) + schema <- structType(structField("a", "double"), structField("b", "double")) + df2 <- dapply(df1, function(x) { x[, 1:2, drop = F] }, schema) + result <- collect(df2) + expected <- (mtcars + 1)[, 1:2, drop = F] + names(expected) <- c("a", "b") + rownames(expected) <- NULL + expect_identical(expected, result) +}) + unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index b6784dbae3203..f72f7194a304d 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -84,6 +84,10 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) +# If true, working for RDD +# If false, working for DataFrame +isDataFrame <- as.logical(SparkR:::readInt(inputCon)) + isEmpty <- SparkR:::readInt(inputCon) if (isEmpty != 0) { @@ -100,7 +104,20 @@ if (isEmpty != 0) { # Timing reading input data for execution inputElap <- elapsedSecs() - output <- computeFunc(partition, data) + if (isDataFrame) { + if (deserializer == "row") { + # Transform the list of rows into a data.frame + data <- do.call(rbind.data.frame, c(data, stringsAsFactors = FALSE)) + } + output <- computeFunc(data) + if (serializer == "row") { + # Transform the result data.frame back to a list of rows + output <- split(output, seq(nrow(output))) + } + } else { + output <- computeFunc(partition, data) + } + # Timing computing computeElap <- elapsedSecs() diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 606ba6ef867a1..59c8429c80172 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -46,7 +46,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag]( // The parent may be also an RRDD, so we should launch it first. val parentIterator = firstParent[T].iterator(partition, context) - runner.compute(parentIterator, partition.index, context) + runner.compute(parentIterator, partition.index) } } diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala index 07d1fa2c4a9a5..9381116815a0e 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala @@ -38,7 +38,8 @@ private[spark] class RRunner[U]( serializer: String, packageNames: Array[Byte], broadcastVars: Array[Broadcast[Object]], - numPartitions: Int = -1) + numPartitions: Int = -1, + isDataFrame: Boolean = false) extends Logging { private var bootTime: Double = _ private var dataStream: DataInputStream = _ @@ -53,8 +54,7 @@ private[spark] class RRunner[U]( def compute( inputIterator: Iterator[_], - partitionIndex: Int, - context: TaskContext): Iterator[U] = { + partitionIndex: Int): Iterator[U] = { // Timing start bootTime = System.currentTimeMillis / 1000.0 @@ -148,6 +148,8 @@ private[spark] class RRunner[U]( dataOut.writeInt(numPartitions) + dataOut.writeInt(if (isDataFrame) 1 else 0) + if (!iter.hasNext) { dataOut.writeInt(0) } else { diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 8e4e80a24acee..e4932a4192d39 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -459,7 +459,7 @@ private[spark] object SerDe { } -private[r] object SerializationFormats { +private[spark] object SerializationFormats { val BYTE = "byte" val STRING = "string" val ROW = "row" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 4a1bdb0b8ac2e..a6f57a44ad777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.Encoder +import org.apache.spark.SparkException +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.{Encoder, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types._ object CatalystSerde { def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = { @@ -29,13 +31,26 @@ object CatalystSerde { DeserializeToObject(deserializer, generateObjAttr[T], child) } + def deserialize(child: LogicalPlan, encoder: ExpressionEncoder[Row]): DeserializeToObject = { + val deserializer = UnresolvedDeserializer(encoder.deserializer) + DeserializeToObject(deserializer, generateObjAttrForRow(encoder), child) + } + def serialize[T : Encoder](child: LogicalPlan): SerializeFromObject = { SerializeFromObject(encoderFor[T].namedExpressions, child) } + def serialize(child: LogicalPlan, encoder: ExpressionEncoder[Row]): SerializeFromObject = { + SerializeFromObject(encoder.namedExpressions, child) + } + def generateObjAttr[T : Encoder]: Attribute = { AttributeReference("obj", encoderFor[T].deserializer.dataType, nullable = false)() } + + def generateObjAttrForRow(encoder: ExpressionEncoder[Row]): Attribute = { + AttributeReference("obj", encoder.deserializer.dataType, nullable = false)() + } } /** @@ -106,6 +121,42 @@ case class MapPartitions( outputObjAttr: Attribute, child: LogicalPlan) extends UnaryNode with ObjectConsumer with ObjectProducer +object MapPartitionsInR { + def apply( + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Broadcast[Object]], + schema: StructType, + encoder: ExpressionEncoder[Row], + child: LogicalPlan): LogicalPlan = { + // If the content of current DataFrame is serialized R data? + val isSerializedRData = + if (encoder.schema == StructType(Seq(StructField("R", BinaryType)))) true else false + + val deserialized = CatalystSerde.deserialize(child, encoder) + val mapped = MapPartitionsInR( + func, + packageNames, + broadcastVars, + schema, + isSerializedRData, + CatalystSerde.generateObjAttrForRow(RowEncoder(schema)), + deserialized) + CatalystSerde.serialize(mapped, RowEncoder(schema)) + } +} + +case class MapPartitionsInR( + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Broadcast[Object]], + outputSchema: StructType, + isSerializedRData: Boolean, + outputObjAttr: Attribute, + child: LogicalPlan) extends UnaryNode with ObjectConsumer with ObjectProducer { + override lazy val schema = outputSchema +} + object MapElements { def apply[T : Encoder, U : Encoder]( func: AnyRef, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index bcb3fdb8a3d9a..6b2eed29c61a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -31,6 +31,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ import org.apache.spark.api.python.PythonRDD +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ @@ -1980,6 +1981,23 @@ class Dataset[T] private[sql]( mapPartitions(func)(encoder) } + /** + * Returns a new [[DataFrame]] that contains the result of applying a serialized R function + * `func` to each partition. + * + * @group func + */ + private[sql] def mapPartitionsInR( + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Broadcast[Object]], + schema: StructType): DataFrame = { + val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]] + Dataset.ofRows( + sqlContext, + MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan)) + } + /** * :: Experimental :: * (Scala-specific) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 22ded7a4bf5b5..94142ef11288c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -23,12 +23,15 @@ import scala.util.matching.Regex import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.r.SerDe +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext} +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.Encoder import org.apache.spark.sql.types._ -private[r] object SQLUtils { +private[sql] object SQLUtils { SerDe.registerSqlSerDe((readSqlObject, writeSqlObject)) def createSQLContext(jsc: JavaSparkContext): SQLContext = { @@ -111,7 +114,7 @@ private[r] object SQLUtils { } } - private[this] def bytesToRow(bytes: Array[Byte], schema: StructType): Row = { + private[sql] def bytesToRow(bytes: Array[Byte], schema: StructType): Row = { val bis = new ByteArrayInputStream(bytes) val dis = new DataInputStream(bis) val num = SerDe.readInt(dis) @@ -120,7 +123,7 @@ private[r] object SQLUtils { }.toSeq) } - private[this] def rowToRBytes(row: Row): Array[Byte] = { + private[sql] def rowToRBytes(row: Row): Array[Byte] = { val bos = new ByteArrayOutputStream() val dos = new DataOutputStream(bos) @@ -129,6 +132,29 @@ private[r] object SQLUtils { bos.toByteArray() } + // Schema for DataFrame of serialized R data + // TODO: introduce a user defined type for serialized R data. + val SERIALIZED_R_DATA_SCHEMA = StructType(StructField("R", BinaryType) :: Nil) + + /** + * The helper function for dapply() on R side. + */ + def dapply( + df: DataFrame, + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Object], + schema: StructType): DataFrame = { + val bv = broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]]) + val realSchema = + if (schema == null) { + SERIALIZED_R_DATA_SCHEMA + } else { + schema + } + df.mapPartitionsInR(func, packageNames, bv, realSchema) + } + def dfToCols(df: DataFrame): Array[Array[Any]] = { val localDF: Array[Row] = df.collect() val numCols = df.columns.length diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3955c5dc9260b..3e08dc6c783fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,6 +304,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil case logical.MapPartitions(f, objAttr, child) => execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil + case logical.MapPartitionsInR(f, p, b, s, isr, objAttr, child) => + execution.MapPartitionsExec( + execution.r.MapPartitionsRWrapper(f, p, b, s, isr), objAttr, planLater(child)) :: Nil case logical.MapElements(f, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, in, out, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala new file mode 100644 index 0000000000000..a22083fac1f1f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.r + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.api.r.RRunner +import org.apache.spark.api.r.SerializationFormats +import org.apache.spark.sql.api.r.SQLUtils._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.Row + +/** + * Physical plan node that applies the given R function to each partition. + */ +private[sql] case class MapPartitionsRWrapper( + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Broadcast[Object]], + schema: StructType, + isSerializedRData: Boolean) extends (Iterator[Any] => Iterator[Any]) { + def apply(iter: Iterator[Any]): Iterator[Any] = { + val (newIter, deserializer) = + if (!isSerializedRData) { + // Serialize each row into an byte array that can be deserialized in the R worker + (iter.asInstanceOf[Iterator[Row]].map { row => rowToRBytes(row)}, SerializationFormats.ROW) + } else { + (iter.asInstanceOf[Iterator[Row]].map { row => row(0) }, SerializationFormats.BYTE) + } + + val serializer = if (schema != SERIALIZED_R_DATA_SCHEMA) { + SerializationFormats.ROW + } else { + SerializationFormats.BYTE + } + + val runner = new RRunner[Array[Byte]]( + func, deserializer, serializer, packageNames, broadcastVars, isDataFrame = true) + // Partition index is ignored. Dataset has no support for mapPartitionsWithIndex. + val outputIter = runner.compute(newIter, -1) + + if (serializer == SerializationFormats.ROW) { + outputIter.map { bytes => bytesToRow(bytes, schema) } + } else{ + outputIter.map { bytes => Row.fromSeq(bytes :: Nil) } + } + } +} From ccc610cbc64e4813270ed7c8e11c689bdec79177 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 19 Apr 2016 15:37:00 +0800 Subject: [PATCH 02/17] Fix a comment. --- .../apache/spark/sql/execution/r/MapPartitionsRWrapper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index a22083fac1f1f..9715f06cb68de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Row /** - * Physical plan node that applies the given R function to each partition. + * A function wrapper that applies the given R function to each partition. */ private[sql] case class MapPartitionsRWrapper( func: Array[Byte], From 79fac3b474d8f83b10c08dfed74ec5f2d8e7ed8b Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 19 Apr 2016 15:50:36 +0800 Subject: [PATCH 03/17] Fix coding style. --- .../src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../apache/spark/sql/execution/r/MapPartitionsRWrapper.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 94142ef11288c..ba65409f1b760 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -145,7 +145,7 @@ private[sql] object SQLUtils { packageNames: Array[Byte], broadcastVars: Array[Object], schema: StructType): DataFrame = { - val bv = broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]]) + val bv = broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]]) val realSchema = if (schema == null) { SERIALIZED_R_DATA_SCHEMA diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index 9715f06cb68de..82f3b17783900 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.execution.r -import org.apache.spark.broadcast.Broadcast import org.apache.spark.api.r.RRunner import org.apache.spark.api.r.SerializationFormats +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.r.SQLUtils._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Row @@ -55,7 +55,7 @@ private[sql] case class MapPartitionsRWrapper( if (serializer == SerializationFormats.ROW) { outputIter.map { bytes => bytesToRow(bytes, schema) } - } else{ + } else { outputIter.map { bytes => Row.fromSeq(bytes :: Nil) } } } From 908bc28eb485afde09f677729f1524c87a6c74bf Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 20 Apr 2016 13:00:01 +0800 Subject: [PATCH 04/17] Address comments. --- R/pkg/R/DataFrame.R | 12 +++++++++++- R/pkg/inst/worker/worker.R | 3 +++ .../scala/org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../sql/execution/r/MapPartitionsRWrapper.scala | 2 +- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 14def637085e7..7df469b5d7caa 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1137,11 +1137,21 @@ setMethod("summarize", #' @rdname dapply #' @name dapply #' @export +#' \dontrun{ +#' df <- createDataFrame (sqlContext, mtcars) +#' df1 <- dapply(df, function(x) { x }, schema(df)) +#' collect(df1) +#' +#' df1 <- dapply(df, function(x) { x }) +#' df2 <- dapply(df1, function(x) { x + 1 }, schema(df)) +#' collect(df2) +#' } setMethod("dapply", signature(x = "DataFrame", func = "function"), function(x, func, schema = NULL) { if (!is.null(schema)) { stopifnot(class(schema) == "structType") + schema <- schema$jobj } packageNamesArr <- serialize(.sparkREnv[[".packages"]], @@ -1157,7 +1167,7 @@ setMethod("dapply", serialize(cleanClosure(func), connection = NULL), packageNamesArr, broadcastArr, - schema$jobj) + schema) dataFrame(sdf) }) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index f72f7194a304d..15863ca185742 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -108,6 +108,9 @@ if (isEmpty != 0) { if (deserializer == "row") { # Transform the list of rows into a data.frame data <- do.call(rbind.data.frame, c(data, stringsAsFactors = FALSE)) + } else { + # Check to see if data is a valid data.frame + stopifnot(class(data) == "data.frame") } output <- computeFunc(data) if (serializer == "row") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index ba65409f1b760..36173a49250b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -134,7 +134,7 @@ private[sql] object SQLUtils { // Schema for DataFrame of serialized R data // TODO: introduce a user defined type for serialized R data. - val SERIALIZED_R_DATA_SCHEMA = StructType(StructField("R", BinaryType) :: Nil) + val SERIALIZED_R_DATA_SCHEMA = StructType(Seq(StructField("R", BinaryType))) /** * The helper function for dapply() on R side. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index 82f3b17783900..f4530b5704925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -56,7 +56,7 @@ private[sql] case class MapPartitionsRWrapper( if (serializer == SerializationFormats.ROW) { outputIter.map { bytes => bytesToRow(bytes, schema) } } else { - outputIter.map { bytes => Row.fromSeq(bytes :: Nil) } + outputIter.map { bytes => Row.fromSeq(Seq(bytes)) } } } } From 1ecef081d12611213280bca462958f342b301f58 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 20 Apr 2016 13:36:31 +0800 Subject: [PATCH 05/17] Fix roxygen doc error in dapply(). --- R/pkg/R/DataFrame.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 7df469b5d7caa..6b335b714dd40 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1137,6 +1137,7 @@ setMethod("summarize", #' @rdname dapply #' @name dapply #' @export +#' @examples #' \dontrun{ #' df <- createDataFrame (sqlContext, mtcars) #' df1 <- dapply(df, function(x) { x }, schema(df)) From ed296789b9eac5f45cb8bf81ea6720399a18a6e1 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 22 Apr 2016 14:25:59 +0800 Subject: [PATCH 06/17] Address comments. --- R/pkg/R/DataFrame.R | 42 ++++++++++------ R/pkg/R/generics.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 49 +++++++++++-------- .../sql/catalyst/plans/logical/object.scala | 1 - .../execution/r/MapPartitionsRWrapper.scala | 10 ++-- 5 files changed, 62 insertions(+), 42 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 6b335b714dd40..34e6e1aa231ad 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1131,30 +1131,44 @@ setMethod("summarize", #' #' @param x A DataFrame #' @param func A function to be applied to each partition of the DataFrame. -#' @param schema Optional. The schema of the resulting DataFrame after the function is applied. -#' If NULL, the content of the resulting DataFrame is serialized R data. +#' func should have only one parameter, to which a data.frame corresponds +#' to each partition will be passed. +#' The output of func should be a data.frame. +#' @param schema The schema of the resulting DataFrame after the function is applied. +#' It must match the output of func. #' @family DataFrame functions #' @rdname dapply #' @name dapply #' @export #' @examples #' \dontrun{ -#' df <- createDataFrame (sqlContext, mtcars) +#' df <- createDataFrame (sqlContext, iris) #' df1 <- dapply(df, function(x) { x }, schema(df)) #' collect(df1) #' -#' df1 <- dapply(df, function(x) { x }) -#' df2 <- dapply(df1, function(x) { x + 1 }, schema(df)) -#' collect(df2) +#' # filter and add a column +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), +#' c("a", "b", "c")) +#' schema <- structType(structField("a", "integer"), structField("b", "double"), +#' structField("c", "string"), structField("d", "integer")) +#' df1 <- dapply( +#' df, +#' function(x) { +#' y <- x[x[1] > 1, ] +#' y <- cbind(y, y[1] + 1L) +#' }, +#' schema) +#' collect(df1) +#' # the result +#' # a b c d +#' # 1 2 2 2 3 +#' # 2 3 3 3 4 #' } setMethod("dapply", - signature(x = "DataFrame", func = "function"), - function(x, func, schema = NULL) { - if (!is.null(schema)) { - stopifnot(class(schema) == "structType") - schema <- schema$jobj - } - + signature(x = "DataFrame", func = "function", schema = "structType"), + function(x, func, schema) { packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL) @@ -1168,7 +1182,7 @@ setMethod("dapply", serialize(cleanClosure(func), connection = NULL), packageNamesArr, broadcastArr, - schema) + schema$jobj) dataFrame(sdf) }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index b72927f7a0b91..3db8925730af2 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -448,7 +448,7 @@ setGeneric("covar_pop", function(col1, col2) {standardGeneric("covar_pop") }) #' @rdname dapply #' @export -setGeneric("dapply", function(x, func, schema = NULL) { standardGeneric("dapply") }) +setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") }) #' @rdname summary #' @export diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 80591be2d63a0..abe05bcd986ba 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2019,34 +2019,41 @@ test_that("Histogram", { }) test_that("dapply() on a DataFrame", { - df <- createDataFrame (sqlContext, mtcars) + df <- createDataFrame ( + sqlContext, + list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), + c("a", "b", "c")) + ldf <- collect(df) df1 <- dapply(df, function(x) { x }, schema(df)) result <- collect(df1) - expected <- mtcars - rownames(expected) <- NULL - expect_identical(expected, result) - - df1 <- dapply(df, function(x) { x + 1 }, schema(df)) + expect_identical(ldf, result) + + + # Filter and add a column + schema <- structType(structField("a", "integer"), structField("b", "double"), + structField("c", "string"), structField("d", "integer")) + df1 <- dapply( + df, + function(x) { + y <- x[x[1] > 1, ] + y <- cbind(y, y[1] + 1L) + }, + schema) result <- collect(df1) - expected <- mtcars + 1 + expected <- ldf[ldf$a > 1, ] + expected$d <- expected$a + 1L rownames(expected) <- NULL expect_identical(expected, result) - schema <- structType(structField("a", "double"), structField("b", "double")) - df1 <- dapply(df, function(x) { x[, 1:2, drop = F] }, schema) - result <- collect(df1) - expected <- mtcars[, 1:2, drop = F] - names(expected) <- c("a", "b") - rownames(expected) <- NULL - expect_identical(expected, result) - - df1 <- dapply(df, function(x) { x + 1 }) - schema <- structType(structField("a", "double"), structField("b", "double")) - df2 <- dapply(df1, function(x) { x[, 1:2, drop = F] }, schema) + # Remove the added column + df2 <- dapply( + df1, + function(x) { + x[, c(1, 2, 3)] + }, + schema(df)) result <- collect(df2) - expected <- (mtcars + 1)[, 1:2, drop = F] - names(expected) <- c("a", "b") - rownames(expected) <- NULL + expected <- expected[, c(1, 2, 3)] expect_identical(expected, result) }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index a6f57a44ad777..d32bceda9c6d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.SparkException import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.{Encoder, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index f4530b5704925..4d0d6e5548212 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -28,11 +28,11 @@ import org.apache.spark.sql.Row * A function wrapper that applies the given R function to each partition. */ private[sql] case class MapPartitionsRWrapper( - func: Array[Byte], - packageNames: Array[Byte], - broadcastVars: Array[Broadcast[Object]], - schema: StructType, - isSerializedRData: Boolean) extends (Iterator[Any] => Iterator[Any]) { + func: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Broadcast[Object]], + schema: StructType, + isSerializedRData: Boolean) extends (Iterator[Any] => Iterator[Any]) { def apply(iter: Iterator[Any]): Iterator[Any] = { val (newIter, deserializer) = if (!isSerializedRData) { From a3326d7e400734c4099f9eb124e0892fd68d4fa9 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 22 Apr 2016 15:33:28 +0800 Subject: [PATCH 07/17] Allow passing column names to R worker. --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 +++---- R/pkg/inst/worker/worker.R | 6 +++++ .../org/apache/spark/api/r/RRunner.scala | 7 +++++- .../sql/catalyst/plans/logical/object.scala | 12 +++++----- .../spark/sql/execution/SparkStrategies.scala | 4 ++-- .../execution/r/MapPartitionsRWrapper.scala | 24 ++++++++++++------- 6 files changed, 39 insertions(+), 22 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index abe05bcd986ba..ef88c3f3d5c06 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2035,8 +2035,8 @@ test_that("dapply() on a DataFrame", { df1 <- dapply( df, function(x) { - y <- x[x[1] > 1, ] - y <- cbind(y, y[1] + 1L) + y <- x[x$a > 1, ] + y <- cbind(y, y$a + 1L) }, schema) result <- collect(df1) @@ -2049,11 +2049,11 @@ test_that("dapply() on a DataFrame", { df2 <- dapply( df1, function(x) { - x[, c(1, 2, 3)] + x[, c("a", "b", "c")] }, schema(df)) result <- collect(df2) - expected <- expected[, c(1, 2, 3)] + expected <- expected[, c("a", "b", "c")] expect_identical(expected, result) }) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 15863ca185742..9db91ad89e68e 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -88,6 +88,11 @@ numPartitions <- SparkR:::readInt(inputCon) # If false, working for DataFrame isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# If isDataFrame, then read column names +if (isDataFrame) { + colNames <- SparkR:::readObject(inputCon) +} + isEmpty <- SparkR:::readInt(inputCon) if (isEmpty != 0) { @@ -108,6 +113,7 @@ if (isEmpty != 0) { if (deserializer == "row") { # Transform the list of rows into a data.frame data <- do.call(rbind.data.frame, c(data, stringsAsFactors = FALSE)) + names(data) <- colNames } else { # Check to see if data is a valid data.frame stopifnot(class(data) == "data.frame") diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala index 9381116815a0e..24ad689f8321c 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala @@ -39,7 +39,8 @@ private[spark] class RRunner[U]( packageNames: Array[Byte], broadcastVars: Array[Broadcast[Object]], numPartitions: Int = -1, - isDataFrame: Boolean = false) + isDataFrame: Boolean = false, + colNames: Array[String] = null) extends Logging { private var bootTime: Double = _ private var dataStream: DataInputStream = _ @@ -150,6 +151,10 @@ private[spark] class RRunner[U]( dataOut.writeInt(if (isDataFrame) 1 else 0) + if (isDataFrame) { + SerDe.writeObject(dataOut, colNames) + } + if (!iter.hasNext) { dataOut.writeInt(0) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index d32bceda9c6d9..84339f439a666 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -128,29 +128,29 @@ object MapPartitionsInR { schema: StructType, encoder: ExpressionEncoder[Row], child: LogicalPlan): LogicalPlan = { - // If the content of current DataFrame is serialized R data? - val isSerializedRData = - if (encoder.schema == StructType(Seq(StructField("R", BinaryType)))) true else false - val deserialized = CatalystSerde.deserialize(child, encoder) val mapped = MapPartitionsInR( func, packageNames, broadcastVars, + encoder.schema, schema, - isSerializedRData, CatalystSerde.generateObjAttrForRow(RowEncoder(schema)), deserialized) CatalystSerde.serialize(mapped, RowEncoder(schema)) } } +/** + * A relation produced by applying a serialized R function `func` to each partition of the `child`. + * + */ case class MapPartitionsInR( func: Array[Byte], packageNames: Array[Byte], broadcastVars: Array[Broadcast[Object]], + inputSchema: StructType, outputSchema: StructType, - isSerializedRData: Boolean, outputObjAttr: Attribute, child: LogicalPlan) extends UnaryNode with ObjectConsumer with ObjectProducer { override lazy val schema = outputSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3e08dc6c783fa..f2d8a7a70d083 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,9 +304,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil case logical.MapPartitions(f, objAttr, child) => execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil - case logical.MapPartitionsInR(f, p, b, s, isr, objAttr, child) => + case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) => execution.MapPartitionsExec( - execution.r.MapPartitionsRWrapper(f, p, b, s, isr), objAttr, planLater(child)) :: Nil + execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil case logical.MapElements(f, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, in, out, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index 4d0d6e5548212..ee660f9e6c52f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -21,7 +21,7 @@ import org.apache.spark.api.r.RRunner import org.apache.spark.api.r.SerializationFormats import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.r.SQLUtils._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructType, BinaryType, StructField} import org.apache.spark.sql.Row /** @@ -31,30 +31,36 @@ private[sql] case class MapPartitionsRWrapper( func: Array[Byte], packageNames: Array[Byte], broadcastVars: Array[Broadcast[Object]], - schema: StructType, - isSerializedRData: Boolean) extends (Iterator[Any] => Iterator[Any]) { + inputSchema: StructType, + outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) { def apply(iter: Iterator[Any]): Iterator[Any] = { - val (newIter, deserializer) = + // If the content of current DataFrame is serialized R data? + val isSerializedRData = + if (inputSchema == SERIALIZED_R_DATA_SCHEMA) true else false + + val (newIter, deserializer, colNames) = if (!isSerializedRData) { // Serialize each row into an byte array that can be deserialized in the R worker - (iter.asInstanceOf[Iterator[Row]].map { row => rowToRBytes(row)}, SerializationFormats.ROW) + (iter.asInstanceOf[Iterator[Row]].map {row => rowToRBytes(row)}, + SerializationFormats.ROW, inputSchema.fieldNames) } else { - (iter.asInstanceOf[Iterator[Row]].map { row => row(0) }, SerializationFormats.BYTE) + (iter.asInstanceOf[Iterator[Row]].map { row => row(0) }, SerializationFormats.BYTE, null) } - val serializer = if (schema != SERIALIZED_R_DATA_SCHEMA) { + val serializer = if (outputSchema != SERIALIZED_R_DATA_SCHEMA) { SerializationFormats.ROW } else { SerializationFormats.BYTE } val runner = new RRunner[Array[Byte]]( - func, deserializer, serializer, packageNames, broadcastVars, isDataFrame = true) + func, deserializer, serializer, packageNames, broadcastVars, + isDataFrame = true, colNames = colNames) // Partition index is ignored. Dataset has no support for mapPartitionsWithIndex. val outputIter = runner.compute(newIter, -1) if (serializer == SerializationFormats.ROW) { - outputIter.map { bytes => bytesToRow(bytes, schema) } + outputIter.map { bytes => bytesToRow(bytes, outputSchema) } } else { outputIter.map { bytes => Row.fromSeq(Seq(bytes)) } } From 04d44e6a3d3477420414ad2fab126575e33d503f Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 22 Apr 2016 16:06:55 +0800 Subject: [PATCH 08/17] Fix import ordering. --- .../apache/spark/sql/execution/r/MapPartitionsRWrapper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index ee660f9e6c52f..7bfbd4c4217ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -21,8 +21,8 @@ import org.apache.spark.api.r.RRunner import org.apache.spark.api.r.SerializationFormats import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.r.SQLUtils._ -import org.apache.spark.sql.types.{StructType, BinaryType, StructField} import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{StructType, BinaryType, StructField} /** * A function wrapper that applies the given R function to each partition. From 605814e3d873e7c51b2fb187c9c71c29faa12dbc Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 22 Apr 2016 16:25:53 +0800 Subject: [PATCH 09/17] Fix import ordering. --- .../apache/spark/sql/execution/r/MapPartitionsRWrapper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index 7bfbd4c4217ff..dc6f2ef371584 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -22,7 +22,7 @@ import org.apache.spark.api.r.SerializationFormats import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.r.SQLUtils._ import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{StructType, BinaryType, StructField} +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} /** * A function wrapper that applies the given R function to each partition. From fefa98e8fdd63db9bef5f938670889ad34976d56 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 25 Apr 2016 10:56:08 +0800 Subject: [PATCH 10/17] Minor fix. --- R/pkg/R/DataFrame.R | 1 + R/pkg/inst/worker/worker.R | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 34e6e1aa231ad..32dda0d3071cd 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -21,6 +21,7 @@ NULL setOldClass("jobj") +setOldClass("structType") #' @title S4 class that represents a SparkDataFrame #' @description DataFrames can be created using functions like \link{createDataFrame}, diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 9db91ad89e68e..01773a1eb597f 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -116,12 +116,16 @@ if (isEmpty != 0) { names(data) <- colNames } else { # Check to see if data is a valid data.frame + stopifnot(deserializer == "byte") stopifnot(class(data) == "data.frame") } output <- computeFunc(data) if (serializer == "row") { # Transform the result data.frame back to a list of rows output <- split(output, seq(nrow(output))) + } else { + # Serialize the ouput to a byte array + stopifnot(serializer == "byte") } } else { output <- computeFunc(partition, data) From f9efa7ffdcbecd34b6b237888b4d46aa81e2bc0f Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 25 Apr 2016 15:45:59 +0800 Subject: [PATCH 11/17] dapply() changes to be consistent with renaming DataFrame to SparkDataFrame. --- R/pkg/R/DataFrame.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 32dda0d3071cd..01f53d038dfa7 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1130,14 +1130,14 @@ setMethod("summarize", #' #' Apply a function to each partition of a DataFrame. #' -#' @param x A DataFrame -#' @param func A function to be applied to each partition of the DataFrame. +#' @param x A SparkDataFrame +#' @param func A function to be applied to each partition of the SparkDataFrame. #' func should have only one parameter, to which a data.frame corresponds #' to each partition will be passed. #' The output of func should be a data.frame. #' @param schema The schema of the resulting DataFrame after the function is applied. #' It must match the output of func. -#' @family DataFrame functions +#' @family SparkDataFrame functions #' @rdname dapply #' @name dapply #' @export @@ -1168,7 +1168,7 @@ setMethod("summarize", #' # 2 3 3 3 4 #' } setMethod("dapply", - signature(x = "DataFrame", func = "function", schema = "structType"), + signature(x = "SparkDataFrame", func = "function", schema = "structType"), function(x, func, schema) { packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL) From af16c466f52b3ffe68359e58f957f6c55f3fedc4 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 25 Apr 2016 16:02:51 +0800 Subject: [PATCH 12/17] A workaround for codegen compiling error for successive calls to dapply(). --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b26ceba228963..79acd72dbb7e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -157,7 +157,9 @@ object SamplePushDown extends Rule[LogicalPlan] { object EliminateSerialization extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case d @ DeserializeToObject(_, _, s: SerializeFromObject) - if d.outputObjectType == s.inputObjectType => + if d.outputObjectType == s.inputObjectType && + // A workaround for SPARK-14803. Remove this after it is fixed. + !d.outputObjectType.isInstanceOf[ObjectType] => // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) Project(objAttr :: Nil, s.child) From 64395eba8371bf564031d247ec2a55d1aa1161e7 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 25 Apr 2016 16:16:58 +0800 Subject: [PATCH 13/17] Update docs to add an example for dapply(). --- docs/sql-programming-guide.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 9a3db9c3f9b73..a16a6bb1d93ef 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1147,6 +1147,11 @@ parquetFile <- read.parquet(sqlContext, "people.parquet") # Parquet files can also be registered as tables and then used in SQL statements. registerTempTable(parquetFile, "parquetFile") teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +schema <- structType(structField("name", "string")) +teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema) +for (teenName in collect(teenNames)$name) { + cat(teenName, "\n") +} {% endhighlight %} From 21c856c71f208a3e933a217ef1c425b3e882fdac Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 26 Apr 2016 15:44:31 +0800 Subject: [PATCH 14/17] Improve workaround for SPARK-14803 to avoid test failures. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 79acd72dbb7e5..4bc29fbecf7c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -157,13 +157,16 @@ object SamplePushDown extends Rule[LogicalPlan] { object EliminateSerialization extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case d @ DeserializeToObject(_, _, s: SerializeFromObject) - if d.outputObjectType == s.inputObjectType && + if d.outputObjectType == s.inputObjectType => // A workaround for SPARK-14803. Remove this after it is fixed. - !d.outputObjectType.isInstanceOf[ObjectType] => - // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. - val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) - Project(objAttr :: Nil, s.child) - + if (d.outputObjectType.isInstanceOf[ObjectType] && + d.outputObjectType.asInstanceOf[ObjectType].cls == classOf[org.apache.spark.sql.Row]) { + s.child + } else { + // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. + val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) + Project(objAttr :: Nil, s.child) + } case a @ AppendColumns(_, _, _, s: SerializeFromObject) if a.deserializer.dataType == s.inputObjectType => AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child) From b39466cf3ed6fcf856d73d368b4bb67f9aabd1e7 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 27 Apr 2016 14:41:22 +0800 Subject: [PATCH 15/17] Fix build break. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6b2eed29c61a9..9c3f198b6b85b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1994,7 +1994,7 @@ class Dataset[T] private[sql]( schema: StructType): DataFrame = { val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]] Dataset.ofRows( - sqlContext, + sparkSession, MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan)) } From 2264b57a2d5f375eae6520b492a2152be259ccaa Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Thu, 28 Apr 2016 09:59:58 +0800 Subject: [PATCH 16/17] 'stringsAsFactors'argument for rbind is not supported before R 3.2.4. --- R/pkg/inst/worker/worker.R | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 01773a1eb597f..31d10d22f51db 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -112,7 +112,13 @@ if (isEmpty != 0) { if (isDataFrame) { if (deserializer == "row") { # Transform the list of rows into a data.frame - data <- do.call(rbind.data.frame, c(data, stringsAsFactors = FALSE)) + # Note that the optional argument stringsAsFactors for rbind is + # available since R 3.2.4. So we set the global option here. + oldOpt <- getOption("stringsAsFactors") + options(stringsAsFactors = FALSE) + data <- do.call(rbind.data.frame, data) + options(stringsAsFactors = oldOpt) + names(data) <- colNames } else { # Check to see if data is a valid data.frame From 3efe9f5f067bf66d35c1c8243d00f2f1fdb4e6f9 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 29 Apr 2016 09:53:32 +0800 Subject: [PATCH 17/17] Address comments. --- R/pkg/inst/worker/worker.R | 2 -- .../sql/catalyst/optimizer/Optimizer.scala | 18 +++++++++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 31d10d22f51db..40cda0c5ef9c1 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -84,8 +84,6 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -# If true, working for RDD -# If false, working for DataFrame isDataFrame <- as.logical(SparkR:::readInt(inputCon)) # If isDataFrame, then read column names diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4bc29fbecf7c6..07ff05bc2f798 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -158,15 +158,15 @@ object EliminateSerialization extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case d @ DeserializeToObject(_, _, s: SerializeFromObject) if d.outputObjectType == s.inputObjectType => - // A workaround for SPARK-14803. Remove this after it is fixed. - if (d.outputObjectType.isInstanceOf[ObjectType] && - d.outputObjectType.asInstanceOf[ObjectType].cls == classOf[org.apache.spark.sql.Row]) { - s.child - } else { - // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. - val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) - Project(objAttr :: Nil, s.child) - } + // A workaround for SPARK-14803. Remove this after it is fixed. + if (d.outputObjectType.isInstanceOf[ObjectType] && + d.outputObjectType.asInstanceOf[ObjectType].cls == classOf[org.apache.spark.sql.Row]) { + s.child + } else { + // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. + val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) + Project(objAttr :: Nil, s.child) + } case a @ AppendColumns(_, _, _, s: SerializeFromObject) if a.deserializer.dataType == s.inputObjectType => AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)