Skip to content
39 changes: 30 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ val df: DataFrame = sqlContext.read
// Data Source API to write the data back to another table

df.write
.format("com.databricks.spark.redshift")
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()
.option("avrocompression", "snappy")
.mode("error")
.save()
```

#### Python
Expand Down Expand Up @@ -105,12 +106,13 @@ df = sql_context.read \

# Write back to a table
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("error") \
.save()
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("avrocompression", "snappy")
.mode("error") \
.save()
```

#### SQL
Expand All @@ -120,6 +122,7 @@ CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (dbtable 'my_table',
tempdir 's3n://my_bucket/tmp',
avrocompression 'snappy',
url 'jdbc:redshift://host:port/db?user=username&password=pass');
```

Expand Down Expand Up @@ -299,6 +302,24 @@ It may be useful to have some <tt>GRANT</tt> commands or similar run here when l
table, the changes will be reverted and the backup table restored if post actions fail.</p>
</td>
</tr>
<tr>
<td><tt>avrocompression</tt></td>
<td>No</td>
<td>No compression (unless set in Hadoop config)</td>
<td>
<p>Sets the compression codec to use on the Avro data to be loaded into Redshift. This overwrites the <tt>avro.output.codec</tt>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to enumerate the available options here, to save people researching how it's plumbed through. I believe "snappy", "bzip2and "gzip" will all work.

Maybe it's worth us supporting an explicit "none" option for the sake of clarity? Now that I think about it, the empty string / null option to disable compression is a bit unnecessarily subtle?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add the options to the table, I think another valid one is "deflate".
I'm not sure that an empty string is so bad - leaving compression settings alone is not really the same as "none", unless "none" would explicitly disable compression.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to this: https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files only "deflate" and "snappy" are available (and "null" for no compression).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably verify that Redshift supports all of these compression modes. I don't see why it would necessarily support fewer of them, but it's possible. I may be able to take care of this as part of integration testing (I might add a new suite which tests all of the compression options).

key in the Hadoop configuration with the specified value and also sets <tt>mapred.output.compress = true</tt> and
<tt>mapred.output.compression.type = BLOCK</tt>. If left unset (or set to null or an empty string) it will leave
the Hadoop configuration unchanged.</p>
<p>Valid settings are:</p>
<ul>
<li><tt>""</tt> (default): use compression settings from Hadoop config (usually none unless explicitly set).</li>
<li><tt>"snappy"</tt>: use snappy compression.</li>
<li><tt>"deflate"</tt>: use deflate (zlib) compression (better ratio but more CPU intensive than snappy).</li>
<li><tt>"null"</tt>: disable compression.</li>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? It looks like this just leaves the default configuration options unchanged.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would overwrite whatever value was set before with the string "null", which I believe is a dummy codec that does no compression (at least I think I read that somewhere, but can't find it now).

</ul>
</td>
</tr>
</table>

## Additional configuration options
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2015 TouchType Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.redshift

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

/**
* End-to-end tests for loading/unloading data from Redshift using Avro
* compression.
*/
class CompressionIntegrationSuite extends IntegrationSuiteBase {

test("roundtrip save and load with Avro snappy compression") {
val tableName = s"roundtrip_save_and_load$randomSuffix"
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
StructType(StructField("a", IntegerType) :: Nil))
try {
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("avrocompression", "snappy")
.mode(SaveMode.ErrorIfExists)
.save()

assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("avrocompression", "snappy")
.load()
assert(loadedDf.schema.length === 1)
assert(loadedDf.columns === Seq("a"))
checkAnswer(loadedDf, Seq(Row(1)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("roundtrip save and load with Avro deflate compression") {
val tableName = s"roundtrip_save_and_load$randomSuffix"
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
StructType(StructField("a", IntegerType) :: Nil))
try {
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("avrocompression", "deflate")
.mode(SaveMode.ErrorIfExists)
.save()

assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("avrocompression", "deflate")
.load()
assert(loadedDf.schema.length === 1)
assert(loadedDf.columns === Seq("a"))
checkAnswer(loadedDf, Seq(Row(1)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
10 changes: 9 additions & 1 deletion src/main/scala/com/databricks/spark/redshift/Parameters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private[redshift] object Parameters {
"overwrite" -> "false",
"diststyle" -> "EVEN",
"usestagingtable" -> "true",
"postactions" -> ";"
"postactions" -> ";",
"avrocompression" -> ""
)

/**
Expand Down Expand Up @@ -187,5 +188,12 @@ private[redshift] object Parameters {
sessionToken <- parameters.get("temporary_aws_session_token")
) yield new BasicSessionCredentials(accessKey, secretAccessKey, sessionToken)
}

/**
* When nonempty/non-null sets the compression codec to use for writing Avro data.
*
* Defaults to disabled (i.e. whatever is set in Hadoop config).
*/
def avrocompression: String = parameters("avrocompression")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ private[redshift] class RedshiftWriter(
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl)

val tempDir = params.createPerQueryTempDir()

if (params.avrocompression != null && params.avrocompression.nonEmpty) {
val conf = sqlContext.sparkContext.hadoopConfiguration

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of mutating the (effectively) global Hadoop configuration, I think that it would be better to clone this configuration and pass our own local copy throughout the Redshift library. I'm going to make a separate pull request to perform this cleanup.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, there's nowhere else that we mutate this configuration, so this change will have to be part of this patch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it wasn't immediately obvious to me how to copy and pass through a new config, but that would be much nicer than mutating the global config.

conf.set("mapred.output.compress", "true")
conf.set("mapred.output.compression.type", "BLOCK")
conf.set("avro.output.codec", params.avrocompression)
}

try {
if (params.overwrite && params.useStagingTable) {
withStagingTable(conn, params.table.get, stagingTable => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,49 @@ class RedshiftSourceSuite
checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}

test("DefaultSource serializes data as Avro when avrocompression is enabled") {

val params = defaultParams ++ Map(
"postactions" -> "GRANT SELECT ON %s TO jeremy",
"diststyle" -> "KEY",
"distkey" -> "testInt",
"avrocompression" -> "snappy")

val expectedCommands =
Seq("DROP TABLE IF EXISTS test_table_staging_.*".r,
"CREATE TABLE IF NOT EXISTS test_table_staging.* DISTSTYLE KEY DISTKEY \\(testInt\\).*".r,
"COPY test_table_staging_.*".r,
"GRANT SELECT ON test_table_staging.+ TO jeremy".r,
"ALTER TABLE test_table RENAME TO test_table_backup_.*".r,
"ALTER TABLE test_table_staging_.* RENAME TO test_table".r,
"DROP TABLE IF EXISTS test_table_backup.*".r)

val jdbcWrapper = mockJdbcWrapper(params("url"), expectedCommands)

(jdbcWrapper.tableExists _)
.expects(*, "test_table")
.returning(true)
.anyNumberOfTimes()

(jdbcWrapper.schemaString _)
.expects(*)
.returning("schema")
.anyNumberOfTimes()

val relation = RedshiftRelation(
jdbcWrapper, _ => mockS3Client, Parameters.mergeParameters(params), None)(testSqlContext)
relation.asInstanceOf[InsertableRelation].insert(expectedDataDF, true)

// Make sure we wrote the data out ready for Redshift load, in the expected formats
// The data should have been written to a random subdirectory of `tempdir`. Since we clear
// `tempdir` between every unit test, there should only be one directory here.
// Note: this does not actually test that the written files are properly compressed.
assert(s3FileSystem.listStatus(new Path(s3TempDir)).length === 1)
val dirWithAvroFiles = s3FileSystem.listStatus(new Path(s3TempDir)).head.getPath.toUri.toString
val written = testSqlContext.read.format("com.databricks.spark.avro").load(dirWithAvroFiles)
checkAnswer(written, TestUtils.expectedDataWithConvertedTimesAndDates)
}

test("Cannot write table with column names that become ambiguous under case insensitivity") {
val jdbcWrapper = mock[JDBCWrapper]
val mockedConnection = mock[Connection]
Expand Down