From 4ef3d0bcbffbf2104026edddf6489c87eb3f852b Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Thu, 23 Jul 2015 18:13:58 +0100
Subject: [PATCH 01/12] Parameter to set Avro compression
---
README.md | 10 ++++++++++
.../com/databricks/spark/redshift/Parameters.scala | 10 +++++++++-
.../com/databricks/spark/redshift/RedshiftWriter.scala | 6 ++++++
3 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index e23debf4..5ed41889 100644
--- a/README.md
+++ b/README.md
@@ -279,6 +279,16 @@ It may be useful to have some GRANT commands or similar run here when l
table, the changes will be reverted and the backup table restored if post actions fail.
+
+ | avrocompression |
+ No |
+ snappy |
+
+ Sets the compression codec to use on the Avro data to be loaded into Redshift. This overwrites the avro.output.codec
+key in the Hadoop configuration with the specified value. To disable this and use the value set in the Hadoop configuration,
+set this to null or an empty string.
+ |
+
## AWS Credentials
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 4be90554..f7d2a250 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -39,7 +39,8 @@ private[redshift] object Parameters extends Logging {
"overwrite" -> "false",
"diststyle" -> "EVEN",
"usestagingtable" -> "true",
- "postactions" -> ";"
+ "postactions" -> ";",
+ "avrocompression" -> "snappy"
)
/**
@@ -210,5 +211,12 @@ private[redshift] object Parameters extends Logging {
credentials
}
}
+
+ /**
+ * When nonempty/non-null sets the compression codec to use for writing Avro data.
+ *
+ * Defaults to snappy.
+ */
+ def avrocompression = parameters("avrocompression")
}
}
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 59f28998..aab22586 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -180,6 +180,12 @@ class RedshiftWriter(jdbcWrapper: JDBCWrapper) extends Logging {
def saveToRedshift(sqlContext: SQLContext, data: DataFrame, params: MergedParameters) : Unit = {
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, new Properties()).apply()
+ if (params.avrocompression != null && params.avrocompression.nonEmpty) {
+ val conf = sqlContext.sparkContext.hadoopConfiguration
+ conf.set("mapred.output.compress", "true")
+ conf.set("mapred.output.compression.type", "BLOCK")
+ conf.set("avro.output.codec", params.avrocompression)
+ }
try {
if (params.overwrite && params.useStagingTable) {
withStagingTable(conn, params, table => {
From 8069aa56cdcf679e15e4e17639d1037f76c028bc Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Thu, 23 Jul 2015 18:25:53 +0100
Subject: [PATCH 02/12] Default to not munging Hadoop config, but include in
examples (and clean up example syntax)
---
README.md | 47 ++++++++++---------
.../spark/redshift/Parameters.scala | 2 +-
2 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/README.md b/README.md
index 5ed41889..3d791d23 100644
--- a/README.md
+++ b/README.md
@@ -50,21 +50,22 @@ val sqlContext = new SQLContext(sc)
// Get some data from a Redshift table
val df: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
- .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
- .option("dbtable" -> "my_table")
- .option("tempdir" -> "s3://path/for/temp/data")
+ .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass")
+ .option("dbtable", "my_table")
+ .option("tempdir", "s3://path/for/temp/data")
.load()
// Apply some transformations to the data as per normal, then you can use the
// Data Source API to write the data back to another table
df.write
- .format("com.databricks.spark.redshift")
- .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
- .option("dbtable" -> "my_table_copy")
- .option("tempdir" -> "s3://path/for/temp/data")
- .mode("error")
- .save()
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass")
+ .option("dbtable", "my_table_copy")
+ .option("tempdir", "s3://path/for/temp/data")
+ .option("avrocompression", "snappy")
+ .mode("error")
+ .save()
```
#### Python
@@ -78,19 +79,20 @@ sql_context = SQLContext(sc)
# Read data from a table
df = sql_context.read \
.format("com.databricks.spark.redshift") \
- .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
- .option("dbtable" -> "my_table") \
- .option("tempdir" -> "s3://path/for/temp/data") \
+ .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass") \
+ .option("dbtable", "my_table") \
+ .option("tempdir", "s3://path/for/temp/data") \
.load()
# Write back to a table
df.write \
- .format("com.databricks.spark.redshift")
- .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
- .option("dbtable" -> "my_table_copy") \
- .option("tempdir" -> "s3://path/for/temp/data") \
- .mode("error")
- .save()
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass") \
+ .option("dbtable", "my_table_copy") \
+ .option("tempdir", "s3://path/for/temp/data") \
+ .option("avrocompression", "snappy")
+ .mode("error")
+ .save()
```
#### SQL
@@ -100,7 +102,8 @@ CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (dbtable 'my_table',
tempdir 's3://my_bucket/tmp',
- url 'jdbc:redshift://host:port/db?user=username&password=pass');
+ avrocompression 'snappy',
+ url 'jdbc:postgresql://host:port/db?user=username&password=pass');
```
### Scala helper functions
@@ -282,11 +285,11 @@ table, the changes will be reverted and the backup table restored if post action
| avrocompression |
No |
- snappy |
+ No compression (unless set in Hadoop config) |
Sets the compression codec to use on the Avro data to be loaded into Redshift. This overwrites the avro.output.codec
-key in the Hadoop configuration with the specified value. To disable this and use the value set in the Hadoop configuration,
-set this to null or an empty string.
+key in the Hadoop configuration with the specified value. If left unset (or set to null or an empty string) it will leave
+the Hadoop configuration unchanged.
|
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index f7d2a250..4caf0096 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -40,7 +40,7 @@ private[redshift] object Parameters extends Logging {
"diststyle" -> "EVEN",
"usestagingtable" -> "true",
"postactions" -> ";",
- "avrocompression" -> "snappy"
+ "avrocompression" -> ""
)
/**
From c1fa793f67790d0755eee0475be24f53881df3b9 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Fri, 24 Jul 2015 09:09:19 +0100
Subject: [PATCH 03/12] Improve readme
---
README.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 3d791d23..4e05c44c 100644
--- a/README.md
+++ b/README.md
@@ -288,7 +288,8 @@ table, the changes will be reverted and the backup table restored if post action
No compression (unless set in Hadoop config) |
Sets the compression codec to use on the Avro data to be loaded into Redshift. This overwrites the avro.output.codec
-key in the Hadoop configuration with the specified value. If left unset (or set to null or an empty string) it will leave
+key in the Hadoop configuration with the specified value and also sets mapred.output.compress = true and
+mapred.output.compression.type = BLOCK. If left unset (or set to null or an empty string) it will leave
the Hadoop configuration unchanged.
|
From 93721664583b171f541240983d1c2c5761bf3620 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Fri, 24 Jul 2015 09:40:30 +0100
Subject: [PATCH 04/12] Test things don't break with compression enabled (not
sure how to test if data is actually compressed)
---
.../spark/redshift/RedshiftSourceSuite.scala | 51 +++++++++++++++++++
1 file changed, 51 insertions(+)
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index 7d6da05d..ac6069c6 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -272,6 +272,57 @@ class RedshiftSourceSuite
checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}
+ test("DefaultSource serializes data as Avro when avrocompression is enabled") {
+
+ val testSqlContext = new SQLContext(sc)
+
+ val jdbcUrl = "jdbc:postgresql://foo/bar"
+ val params =
+ Map("url" -> jdbcUrl,
+ "tempdir" -> tempDir,
+ "dbtable" -> "test_table",
+ "aws_access_key_id" -> "test1",
+ "aws_secret_access_key" -> "test2",
+ "postactions" -> "GRANT SELECT ON %s TO jeremy",
+ "diststyle" -> "KEY",
+ "distkey" -> "testInt",
+ "avrocompression" -> "snappy")
+
+ val rdd = sc.parallelize(expectedData.toSeq)
+ val df = testSqlContext.createDataFrame(rdd, TestUtils.testSchema)
+
+ val expectedCommands =
+ Seq("DROP TABLE IF EXISTS test_table_staging_.*".r,
+ "CREATE TABLE IF NOT EXISTS test_table_staging.* DISTSTYLE KEY DISTKEY \\(testInt\\).*".r,
+ "COPY test_table_staging_.*".r,
+ "GRANT SELECT ON test_table_staging.+ TO jeremy".r,
+ "ALTER TABLE test_table RENAME TO test_table_backup_.*".r,
+ "ALTER TABLE test_table_staging_.* RENAME TO test_table".r,
+ "DROP TABLE test_table_backup.*".r)
+
+ val jdbcWrapper = mockJdbcWrapper(jdbcUrl, expectedCommands)
+
+ (jdbcWrapper.tableExists _)
+ .expects(*, "test_table")
+ .returning(true)
+ .anyNumberOfTimes()
+
+ (jdbcWrapper.schemaString _)
+ .expects(*, jdbcUrl)
+ .returning("schema")
+ .anyNumberOfTimes()
+
+ val relation = RedshiftRelation(jdbcWrapper, Parameters.mergeParameters(params), None)(testSqlContext)
+ relation.asInstanceOf[InsertableRelation].insert(df, true)
+
+ // Make sure we wrote the data out ready for Redshift load, in the expected formats
+ val written = testSqlContext.read.format("com.databricks.spark.avro").load(tempDir)
+ written.collect() zip expectedData foreach {
+ case (loaded, expected) =>
+ loaded shouldBe expected
+ }
+ }
+
test("Failed copies are handled gracefully when using a staging table") {
val params = defaultParams ++ Map("usestagingtable" -> "true")
From d7010f8c69093c17b279c3977d1a167c06ef191a Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Fri, 24 Jul 2015 09:42:46 +0100
Subject: [PATCH 05/12] Update avrocompression comment
---
src/main/scala/com/databricks/spark/redshift/Parameters.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 4caf0096..9f3b9d81 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -215,7 +215,7 @@ private[redshift] object Parameters extends Logging {
/**
* When nonempty/non-null sets the compression codec to use for writing Avro data.
*
- * Defaults to snappy.
+ * Defaults to disabled (i.e. whatever is set in Hadoop config).
*/
def avrocompression = parameters("avrocompression")
}
From 1bda5a0dd3f5806516e1e95d7859b9a470ad5614 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Fri, 24 Jul 2015 10:10:17 +0100
Subject: [PATCH 06/12] Improve readme
---
README.md | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/README.md b/README.md
index 4e05c44c..962c11e2 100644
--- a/README.md
+++ b/README.md
@@ -291,6 +291,13 @@ table, the changes will be reverted and the backup table restored if post action
key in the Hadoop configuration with the specified value and also sets mapred.output.compress = true and
mapred.output.compression.type = BLOCK. If left unset (or set to null or an empty string) it will leave
the Hadoop configuration unchanged.
+Valid settings are:
+
+ - "" (default): use compression settings from Hadoop config (usually none unless explicitly set).
+ - "snappy": use snappy compression.
+ - "deflate": use deflate (zlib) compression (better ratio but more CPU intensive than snappy).
+ - "null": disable compression.
+
From 5227c6df7d5f23fed8cab8d70b4c70666df36d24 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Tue, 25 Aug 2015 09:48:25 +0100
Subject: [PATCH 07/12] fix tests
---
.../spark/redshift/RedshiftSourceSuite.scala | 50 +++++++++----------
1 file changed, 24 insertions(+), 26 deletions(-)
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index ac6069c6..e8cbd4cd 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -274,22 +274,17 @@ class RedshiftSourceSuite
test("DefaultSource serializes data as Avro when avrocompression is enabled") {
- val testSqlContext = new SQLContext(sc)
-
- val jdbcUrl = "jdbc:postgresql://foo/bar"
- val params =
- Map("url" -> jdbcUrl,
- "tempdir" -> tempDir,
- "dbtable" -> "test_table",
- "aws_access_key_id" -> "test1",
- "aws_secret_access_key" -> "test2",
- "postactions" -> "GRANT SELECT ON %s TO jeremy",
- "diststyle" -> "KEY",
- "distkey" -> "testInt",
- "avrocompression" -> "snappy")
-
- val rdd = sc.parallelize(expectedData.toSeq)
- val df = testSqlContext.createDataFrame(rdd, TestUtils.testSchema)
+ //val testSqlContext = new SQLContext(sc)
+
+ //val jdbcUrl = "jdbc:postgresql://foo/bar"
+ val params = defaultParams ++ Map(
+ "postactions" -> "GRANT SELECT ON %s TO jeremy",
+ "diststyle" -> "KEY",
+ "distkey" -> "testInt",
+ "avrocompression" -> "snappy")
+
+ //val rdd = sc.parallelize(TestUtils.expectedData.toSeq)
+ //val df = testSqlContext.createDataFrame(rdd, TestUtils.testSchema)
val expectedCommands =
Seq("DROP TABLE IF EXISTS test_table_staging_.*".r,
@@ -298,9 +293,9 @@ class RedshiftSourceSuite
"GRANT SELECT ON test_table_staging.+ TO jeremy".r,
"ALTER TABLE test_table RENAME TO test_table_backup_.*".r,
"ALTER TABLE test_table_staging_.* RENAME TO test_table".r,
- "DROP TABLE test_table_backup.*".r)
+ "DROP TABLE IF EXISTS test_table_backup.*".r)
- val jdbcWrapper = mockJdbcWrapper(jdbcUrl, expectedCommands)
+ val jdbcWrapper = mockJdbcWrapper(params("url"), expectedCommands)
(jdbcWrapper.tableExists _)
.expects(*, "test_table")
@@ -308,19 +303,22 @@ class RedshiftSourceSuite
.anyNumberOfTimes()
(jdbcWrapper.schemaString _)
- .expects(*, jdbcUrl)
+ .expects(*)
.returning("schema")
.anyNumberOfTimes()
- val relation = RedshiftRelation(jdbcWrapper, Parameters.mergeParameters(params), None)(testSqlContext)
- relation.asInstanceOf[InsertableRelation].insert(df, true)
+ val relation =
+ RedshiftRelation(jdbcWrapper, Parameters.mergeParameters(params), None)(testSqlContext)
+ relation.asInstanceOf[InsertableRelation].insert(expectedDataDF, true)
// Make sure we wrote the data out ready for Redshift load, in the expected formats
- val written = testSqlContext.read.format("com.databricks.spark.avro").load(tempDir)
- written.collect() zip expectedData foreach {
- case (loaded, expected) =>
- loaded shouldBe expected
- }
+ // The data should have been written to a random subdirectory of `tempdir`. Since we clear
+ // `tempdir` between every unit test, there should only be one directory here.
+ // Note: this does not actually test that the written files are properly compressed.
+ assert(tempDir.list().length === 1)
+ val dirWithAvroFiles = tempDir.listFiles().head.toURI.toString
+ val written = testSqlContext.read.format("com.databricks.spark.avro").load(dirWithAvroFiles)
+ checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}
test("Failed copies are handled gracefully when using a staging table") {
From e94e807e7f46d1d51317c164c204245f7e63bab2 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Tue, 25 Aug 2015 09:58:39 +0100
Subject: [PATCH 08/12] fix scalastyle
---
src/main/scala/com/databricks/spark/redshift/Parameters.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 9f3b9d81..5dde2afd 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -217,6 +217,6 @@ private[redshift] object Parameters extends Logging {
*
* Defaults to disabled (i.e. whatever is set in Hadoop config).
*/
- def avrocompression = parameters("avrocompression")
+ def avrocompression: String = parameters("avrocompression")
}
}
From 0db1708be6ec0d29698cd7b173cd92df940b148c Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Tue, 25 Aug 2015 10:06:38 +0100
Subject: [PATCH 09/12] Remove commented out code
---
.../com/databricks/spark/redshift/RedshiftSourceSuite.scala | 6 ------
1 file changed, 6 deletions(-)
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index e8cbd4cd..9749fbc3 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -274,18 +274,12 @@ class RedshiftSourceSuite
test("DefaultSource serializes data as Avro when avrocompression is enabled") {
- //val testSqlContext = new SQLContext(sc)
-
- //val jdbcUrl = "jdbc:postgresql://foo/bar"
val params = defaultParams ++ Map(
"postactions" -> "GRANT SELECT ON %s TO jeremy",
"diststyle" -> "KEY",
"distkey" -> "testInt",
"avrocompression" -> "snappy")
- //val rdd = sc.parallelize(TestUtils.expectedData.toSeq)
- //val df = testSqlContext.createDataFrame(rdd, TestUtils.testSchema)
-
val expectedCommands =
Seq("DROP TABLE IF EXISTS test_table_staging_.*".r,
"CREATE TABLE IF NOT EXISTS test_table_staging.* DISTSTYLE KEY DISTKEY \\(testInt\\).*".r,
From ef056261c0f1a4a71c29eba113d35038825c86eb Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Tue, 25 Aug 2015 23:37:22 +0100
Subject: [PATCH 10/12] Use redshift JDBC subprotocol
---
README.md | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 962c11e2..71577498 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ val sqlContext = new SQLContext(sc)
// Get some data from a Redshift table
val df: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
- .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table")
.option("tempdir", "s3://path/for/temp/data")
.load()
@@ -60,7 +60,7 @@ val df: DataFrame = sqlContext.read
df.write
.format("com.databricks.spark.redshift")
- .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3://path/for/temp/data")
.option("avrocompression", "snappy")
@@ -79,7 +79,7 @@ sql_context = SQLContext(sc)
# Read data from a table
df = sql_context.read \
.format("com.databricks.spark.redshift") \
- .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass") \
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table") \
.option("tempdir", "s3://path/for/temp/data") \
.load()
@@ -87,7 +87,7 @@ df = sql_context.read \
# Write back to a table
df.write \
.format("com.databricks.spark.redshift")
- .option("url", "jdbc:postgresql://redshifthost:5439/database?user=username&password=pass") \
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3://path/for/temp/data") \
.option("avrocompression", "snappy")
@@ -103,7 +103,7 @@ USING com.databricks.spark.redshift
OPTIONS (dbtable 'my_table',
tempdir 's3://my_bucket/tmp',
avrocompression 'snappy',
- url 'jdbc:postgresql://host:port/db?user=username&password=pass');
+ url 'jdbc:redshift://host:port/db?user=username&password=pass');
```
### Scala helper functions
From 8a42daf14e4f90a9feaeb10143a77ac52ec63edb Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Wed, 9 Sep 2015 10:40:39 +0100
Subject: [PATCH 11/12] fix tests
---
.../databricks/spark/redshift/RedshiftSourceSuite.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index 7ca36715..29dd1b28 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -377,16 +377,16 @@ class RedshiftSourceSuite
.returning("schema")
.anyNumberOfTimes()
- val relation =
- RedshiftRelation(jdbcWrapper, Parameters.mergeParameters(params), None)(testSqlContext)
+ val relation = RedshiftRelation(
+ jdbcWrapper, _ => mockS3Client, Parameters.mergeParameters(params), None)(testSqlContext)
relation.asInstanceOf[InsertableRelation].insert(expectedDataDF, true)
// Make sure we wrote the data out ready for Redshift load, in the expected formats
// The data should have been written to a random subdirectory of `tempdir`. Since we clear
// `tempdir` between every unit test, there should only be one directory here.
// Note: this does not actually test that the written files are properly compressed.
- assert(tempDir.list().length === 1)
- val dirWithAvroFiles = tempDir.listFiles().head.toURI.toString
+ assert(s3FileSystem.listStatus(new Path(s3TempDir)).length === 1)
+ val dirWithAvroFiles = s3FileSystem.listStatus(new Path(s3TempDir)).head.getPath.toUri.toString
val written = testSqlContext.read.format("com.databricks.spark.avro").load(dirWithAvroFiles)
checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}
From 8d785d38bb6c594f88f4985c6f5ef8df42459e22 Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Wed, 9 Sep 2015 10:49:13 +0100
Subject: [PATCH 12/12] Start of compression integration suite
---
.../CompressionIntegrationSuite.scala | 89 +++++++++++++++++++
1 file changed, 89 insertions(+)
create mode 100644 src/it/scala/com/databricks/spark/redshift/CompressionIntegrationSuite.scala
diff --git a/src/it/scala/com/databricks/spark/redshift/CompressionIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/CompressionIntegrationSuite.scala
new file mode 100644
index 00000000..317a73ca
--- /dev/null
+++ b/src/it/scala/com/databricks/spark/redshift/CompressionIntegrationSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 TouchType Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.redshift
+
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/**
+ * End-to-end tests for loading/unloading data from Redshift using Avro
+ * compression.
+ */
+class CompressionIntegrationSuite extends IntegrationSuiteBase {
+
+ test("roundtrip save and load with Avro snappy compression") {
+ val tableName = s"roundtrip_save_and_load$randomSuffix"
+ val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
+ StructType(StructField("a", IntegerType) :: Nil))
+ try {
+ df.write
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .option("avrocompression", "snappy")
+ .mode(SaveMode.ErrorIfExists)
+ .save()
+
+ assert(DefaultJDBCWrapper.tableExists(conn, tableName))
+ val loadedDf = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .option("avrocompression", "snappy")
+ .load()
+ assert(loadedDf.schema.length === 1)
+ assert(loadedDf.columns === Seq("a"))
+ checkAnswer(loadedDf, Seq(Row(1)))
+ } finally {
+ conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
+ conn.commit()
+ }
+ }
+
+ test("roundtrip save and load with Avro deflate compression") {
+ val tableName = s"roundtrip_save_and_load$randomSuffix"
+ val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
+ StructType(StructField("a", IntegerType) :: Nil))
+ try {
+ df.write
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .option("avrocompression", "deflate")
+ .mode(SaveMode.ErrorIfExists)
+ .save()
+
+ assert(DefaultJDBCWrapper.tableExists(conn, tableName))
+ val loadedDf = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .option("avrocompression", "deflate")
+ .load()
+ assert(loadedDf.schema.length === 1)
+ assert(loadedDf.columns === Seq("a"))
+ checkAnswer(loadedDf, Seq(Row(1)))
+ } finally {
+ conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
+ conn.commit()
+ }
+ }
+}