From aefcdf332e324125ef2eb6bf5700b9a4e9c00d14 Mon Sep 17 00:00:00 2001 From: Ajith Date: Sun, 29 Dec 2019 12:38:56 +0530 Subject: [PATCH 1/5] Support LOCAL keyword in INSERT OVERWRITE DIRECTORY --- .../spark/sql/execution/SparkSqlParser.scala | 5 ----- .../apache/spark/sql/sources/InsertSuite.scala | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index aa139cb6b0c3b..ada0962838a6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -761,11 +761,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { */ override def visitInsertOverwriteDir( ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) { - if (ctx.LOCAL != null) { - throw new ParseException( - "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx) - } - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) var storage = DataSource.buildStorageFormatFromOptions(options) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index bcff30a51c3f5..d64f181013d94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -820,6 +820,21 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } } } + + test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") { + withTempDir { dir => + val path = dir.toURI.getPath + sql(s"""create table tab1 ( a int) location '$path'""") + sql("insert into tab1 values(1)") + checkAnswer(sql("select * from tab1"), Seq(1).map(i => Row(i))) + sql("create table tab2 ( a int)") + sql("insert into tab2 values(2)") + checkAnswer(sql("select * from tab2"), Seq(2).map(i => Row(i))) + sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""") + sql("refresh table tab1") + checkAnswer(sql("select * from tab1"), Seq(2).map(i => Row(i))) + } + } } class FileExistingTestFileSystem extends RawLocalFileSystem { From 37caeba8b672faef32d3e544e84a0d99f73f3657 Mon Sep 17 00:00:00 2001 From: Ajith Date: Mon, 30 Dec 2019 14:46:19 +0530 Subject: [PATCH 2/5] Force location scheme to be file:// incase of insert overwrite local directory --- .../org/apache/spark/sql/execution/SparkSqlParser.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ada0962838a6b..b443d678050c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import java.util.Locale +import javax.ws.rs.core.UriBuilder import scala.collection.JavaConverters._ @@ -772,7 +773,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } if (!path.isEmpty) { - val customLocation = Some(CatalogUtils.stringToURI(path)) + val customLocation = if (ctx.LOCAL() != null) { + // force scheme to be file rather than fs.default.name + Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build()) + } else { + Some(CatalogUtils.stringToURI(path)) + } storage = storage.copy(locationUri = customLocation) } From a18da2e098422421b7e7836f5b235a52ff5e904f Mon Sep 17 00:00:00 2001 From: Ajith Date: Mon, 30 Dec 2019 19:31:44 +0530 Subject: [PATCH 3/5] Fix testcase --- .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index d64f181013d94..a3911ca3c6bb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -822,7 +822,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") { - withTempDir { dir => + withTempPath { dir => val path = dir.toURI.getPath sql(s"""create table tab1 ( a int) location '$path'""") sql("insert into tab1 values(1)") From 9e849d1eb44d820de9da29d0577bbfcafa2c25f7 Mon Sep 17 00:00:00 2001 From: Ajith Date: Fri, 17 Jan 2020 14:11:37 +0530 Subject: [PATCH 4/5] Align with Hive behaviour --- .../spark/sql/execution/SparkSqlParser.scala | 20 +++++++++++++------ .../spark/sql/sources/InsertSuite.scala | 8 ++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b443d678050c6..806bc9f0717f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -773,15 +773,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } if (!path.isEmpty) { - val customLocation = if (ctx.LOCAL() != null) { - // force scheme to be file rather than fs.default.name - Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build()) - } else { - Some(CatalogUtils.stringToURI(path)) - } + val customLocation = Some(CatalogUtils.stringToURI(path)) storage = storage.copy(locationUri = customLocation) } + if (ctx.LOCAL() != null) { + // assert if directory is local when LOCAL keyword is mentioned + val scheme = Option(storage.locationUri.get.getScheme) + scheme match { + case None => + // force scheme to be file rather than fs.default.name + val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build()) + storage = storage.copy(locationUri = loc) + case Some(pathScheme) if (!pathScheme.equals("file")) => + throw new ParseException("LOCAL is supported only with file: scheme", ctx) + } + } + val provider = ctx.tableProvider.multipartIdentifier.getText (false, storage, Some(provider)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index a3911ca3c6bb9..0101803561c90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SharedSparkSession @@ -835,6 +836,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { checkAnswer(sql("select * from tab1"), Seq(2).map(i => Row(i))) } } + + test("SPARK-29174 fail LOCAL in INSERT OVERWRITE DIRECT remote path") { + val message = intercept[ParseException] { + sql("insert overwrite local directory 'hdfs:/abcd' using parquet select 1") + }.getMessage + assert(message.contains("LOCAL is supported only with file: scheme")) + } } class FileExistingTestFileSystem extends RawLocalFileSystem { From 6ddf84a68d9590f3ac7d2cd8d1b30ada6b699c64 Mon Sep 17 00:00:00 2001 From: Ajith Date: Mon, 17 Feb 2020 16:49:31 +0530 Subject: [PATCH 5/5] Update the comment --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 806bc9f0717f7..078813b7d631d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -754,7 +754,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * * Expected format: * {{{ - * INSERT OVERWRITE DIRECTORY + * INSERT OVERWRITE [LOCAL] DIRECTORY * [path] * [OPTIONS table_property_list] * select_statement;