From 717dc8a5a6071decd82bbbbff1fb04d0730a1b7c Mon Sep 17 00:00:00 2001 From: felixcheung Date: Mon, 4 Jan 2016 17:32:38 -0800 Subject: [PATCH 1/4] migrate from deprecated API --- R/pkg/R/DataFrame.R | 33 +++++++++++------------ R/pkg/R/SQLContext.R | 4 ++- R/pkg/R/column.R | 2 +- R/pkg/R/utils.R | 9 +++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 25 ++++++++--------- 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 0cfa12b997d69..c126f9efb475a 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -458,7 +458,10 @@ setMethod("registerTempTable", setMethod("insertInto", signature(x = "DataFrame", tableName = "character"), function(x, tableName, overwrite = FALSE) { - callJMethod(x@sdf, "insertInto", tableName, overwrite) + jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append")) + write <- callJMethod(x@sdf, "write") + write <- callJMethod(write, "mode", jmode) + callJMethod(write, "insertInto", tableName) }) #' Cache @@ -1948,18 +1951,15 @@ setMethod("write.df", source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } - allModes <- c("append", "overwrite", "error", "ignore") - # nolint start - if (!(mode %in% allModes)) { - stop('mode should be one of "append", "overwrite", "error", "ignore"') - } - # nolint end - jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) + jmode <- convertToJSaveMode(mode) options <- varargsToEnv(...) if (!is.null(path)) { options[["path"]] <- path } - callJMethod(df@sdf, "save", source, jmode, options) + write <- callJMethod(df@sdf, "write") + write <- callJMethod(write, "format", source) + write <- callJMethod(write, "mode", jmode) + write <- callJMethod(write, "save", path) }) #' @rdname write.df @@ -2013,15 +2013,14 @@ setMethod("saveAsTable", source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } - allModes <- c("append", "overwrite", "error", "ignore") - # nolint start - if (!(mode %in% allModes)) { - stop('mode should be one of "append", "overwrite", "error", "ignore"') - } - # nolint end - jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) + jmode <- convertToJSaveMode(mode) options <- varargsToEnv(...) - callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options) + + write <- callJMethod(df@sdf, "write") + write <- callJMethod(write, "format", source) + write <- callJMethod(write, "mode", jmode) + write <- callJMethod(write, "options", options) + callJMethod(write, "saveAsTable", tableName) }) #' summary diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 9243d70e66f75..3242d29123bcd 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -256,9 +256,11 @@ jsonFile <- function(sqlContext, path) { # TODO: support schema jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { + .Deprecated("read.json") rdd <- serializeToString(rdd) if (is.null(schema)) { - sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio) + read <- callJMethod(sqlContext, "read") + sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"), samplingRatio) dataFrame(sdf) } else { stop("not implemented") diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 356bcee3cf5c6..3ffd9a9890b2e 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -209,7 +209,7 @@ setMethod("cast", setMethod("%in%", signature(x = "Column"), function(x, table) { - jc <- callJMethod(x@jc, "in", as.list(table)) + jc <- callJMethod(x@jc, "isin", as.list(table)) return(column(jc)) }) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 43105aaa38424..aa386e5da933b 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -641,3 +641,12 @@ assignNewEnv <- function(data) { splitString <- function(input) { Filter(nzchar, unlist(strsplit(input, ",|\\s"))) } + +convertToJSaveMode <- function(mode) { + allModes <- c("append", "overwrite", "error", "ignore") + if (!(mode %in% allModes)) { + stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint + } + jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) + jmode +} diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 9e5d0ebf60720..39315f0ca8b56 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -420,18 +420,19 @@ test_that("read/write json files", { unlink(jsonPath3) }) -test_that("jsonRDD() on a RDD with json string", { - rdd <- parallelize(sc, mockLines) - expect_equal(count(rdd), 3) - df <- jsonRDD(sqlContext, rdd) - expect_is(df, "DataFrame") - expect_equal(count(df), 3) - - rdd2 <- flatMap(rdd, function(x) c(x, x)) - df <- jsonRDD(sqlContext, rdd2) - expect_is(df, "DataFrame") - expect_equal(count(df), 6) -}) +# test_that("jsonRDD() on a RDD with json string", { +# rdd <- parallelize(sc, mockLines) +# expect_equal(count(rdd), 3) +# # Suppress warnings because jsonRDD is deprecated +# df <- suppressWarnings(jsonRDD(sqlContext, rdd)) +# expect_is(df, "DataFrame") +# expect_equal(count(df), 3) +# +# rdd2 <- flatMap(rdd, function(x) c(x, x)) +# df <- suppressWarnings(jsonRDD(sqlContext, rdd2)) +# expect_is(df, "DataFrame") +# expect_equal(count(df), 6) +# }) test_that("test cache, uncache and clearCache", { df <- read.json(sqlContext, jsonPath) From ea9e8851c66b389774e56f302b05314f3fe9e12a Mon Sep 17 00:00:00 2001 From: felixcheung Date: Mon, 4 Jan 2016 18:54:11 -0800 Subject: [PATCH 2/4] remove jsonRDD tests for now --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 39315f0ca8b56..4620f6261e50d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -420,19 +420,6 @@ test_that("read/write json files", { unlink(jsonPath3) }) -# test_that("jsonRDD() on a RDD with json string", { -# rdd <- parallelize(sc, mockLines) -# expect_equal(count(rdd), 3) -# # Suppress warnings because jsonRDD is deprecated -# df <- suppressWarnings(jsonRDD(sqlContext, rdd)) -# expect_is(df, "DataFrame") -# expect_equal(count(df), 3) -# -# rdd2 <- flatMap(rdd, function(x) c(x, x)) -# df <- suppressWarnings(jsonRDD(sqlContext, rdd2)) -# expect_is(df, "DataFrame") -# expect_equal(count(df), 6) -# }) test_that("test cache, uncache and clearCache", { df <- read.json(sqlContext, jsonPath) From d8d9b21b9c030c379e6de021404925169ef78e79 Mon Sep 17 00:00:00 2001 From: felixcheung Date: Mon, 4 Jan 2016 18:56:39 -0800 Subject: [PATCH 3/4] reenable test --- dev/run-tests.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dev/run-tests.py b/dev/run-tests.py index acc9450586fe3..8726889cbc777 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -425,13 +425,12 @@ def run_build_tests(): def run_sparkr_tests(): - # set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS") + set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS") - # if which("R"): - # run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")]) - # else: - # print("Ignoring SparkR tests as R was not found in PATH") - pass + if which("R"): + run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")]) + else: + print("Ignoring SparkR tests as R was not found in PATH") def parse_opts(): From dc59f8472c95ec9ccc4182882c46ffd205498766 Mon Sep 17 00:00:00 2001 From: felixcheung Date: Mon, 4 Jan 2016 20:43:24 -0800 Subject: [PATCH 4/4] fix parquetFile and jsonRDD --- R/pkg/R/SQLContext.R | 8 +++----- R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 ++++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 3242d29123bcd..ccc683d86a3e5 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -260,7 +260,8 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { rdd <- serializeToString(rdd) if (is.null(schema)) { read <- callJMethod(sqlContext, "read") - sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"), samplingRatio) + # samplingRatio is deprecated + sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd")) dataFrame(sdf) } else { stop("not implemented") @@ -291,10 +292,7 @@ read.parquet <- function(sqlContext, path) { # TODO: Implement saveasParquetFile and write examples for both parquetFile <- function(sqlContext, ...) { .Deprecated("read.parquet") - # Allow the user to have a more flexible definiton of the text file path - paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x))) - sdf <- callJMethod(sqlContext, "parquetFile", paths) - dataFrame(sdf) + read.parquet(sqlContext, unlist(list(...))) } #' SQL Query diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 4620f6261e50d..ebe8faa34cf7d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -420,6 +420,18 @@ test_that("read/write json files", { unlink(jsonPath3) }) +test_that("jsonRDD() on a RDD with json string", { + rdd <- parallelize(sc, mockLines) + expect_equal(count(rdd), 3) + df <- suppressWarnings(jsonRDD(sqlContext, rdd)) + expect_is(df, "DataFrame") + expect_equal(count(df), 3) + + rdd2 <- flatMap(rdd, function(x) c(x, x)) + df <- suppressWarnings(jsonRDD(sqlContext, rdd2)) + expect_is(df, "DataFrame") + expect_equal(count(df), 6) +}) test_that("test cache, uncache and clearCache", { df <- read.json(sqlContext, jsonPath)