diff --git a/.travis.yml b/.travis.yml
index ac80e8a6959..24a55c583ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -51,6 +51,13 @@ script:
- ./testing/startSparkCluster.sh 1.1.1 2.3
- SPARK_HOME=./spark-1.1.1-bin-hadoop2.3 mvn verify -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark'
- ./testing/stopSparkCluster.sh 1.1.1 2.3
+
+# CassandraSpark 1.3
+ - mvn clean package -DskipTests -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark'
+ - mvn package -Pbuild-distr -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B
+ - ./testing/startSparkCluster.sh 1.3.1 2.3
+ - SPARK_HOME=./spark-1.3.1-bin-hadoop2.3 mvn verify -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark'
+ - ./testing/stopSparkCluster.sh 1.3.1 2.3
after_failure:
- cat target/rat.txt
diff --git a/spark/pom.xml b/spark/pom.xml
index b9327634818..cf9e8e84de6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -38,6 +38,7 @@
1.4.0
2.10.4
2.10
+ **/CassandraSparkSqlInterpreterTest.java
2.3.0
${hadoop.version}
@@ -53,6 +54,7 @@
+
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/
@@ -494,6 +496,26 @@
junit
test
+
+
+ org.cassandraunit
+ cassandra-unit
+ 2.1.3.1
+ test
+
+
+
+ com.datastax.cassandra
+ *
+
+
+
+ org.apache.cassandra
+ cassandra-all
+
+
+
@@ -514,7 +536,7 @@
com.datastax.spark
spark-cassandra-connector_${scala.binary.version}
- 1.1.1
+ 1.1.2
org.joda
@@ -526,6 +548,7 @@
1.1.1
2.2.3-shaded-protobuf
+
@@ -542,8 +565,14 @@
cassandra-spark-1.2
1.2.1
+
+
+ org.apache.cassandra
+ cassandra-all
+ 2.1.8
+
com.datastax.spark
spark-cassandra-connector_${scala.binary.version}
@@ -574,15 +603,22 @@
cassandra-spark-1.3
1.3.0
+
+
+
+ org.apache.cassandra
+ cassandra-all
+ 2.1.8
+
com.datastax.spark
spark-cassandra-connector_${scala.binary.version}
-
- 1.3.0-SNAPSHOT
+
+ 1.3.0-M2
org.joda
@@ -893,6 +929,9 @@
maven-surefire-plugin
2.17
+
+ ${exclude.tests}
+
1
false
-Xmx1024m -XX:MaxPermSize=256m
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index aec6d16d55a..030f28dd29a 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -102,6 +102,8 @@ public class SparkInterpreter extends Interpreter {
getSystemDefault("SPARK_YARN_JAR", "spark.yarn.jar", ""),
"The location of the Spark jar file. If you use yarn as a cluster, "
+ "we should set this value")
+ .add("zeppelin.spark.useCassandraContext", "false",
+ "Use CassandraContext instead of SQLContext if it is true")
.add("zeppelin.spark.useHiveContext",
getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT",
"zeppelin.spark.useHiveContext", "true"),
@@ -164,30 +166,8 @@ private boolean useHiveContext() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
}
- public SQLContext getSQLContext() {
- if (sqlc == null) {
- if (useHiveContext()) {
- String name = "org.apache.spark.sql.hive.HiveContext";
- Constructor> hc;
- try {
- hc = getClass().getClassLoader().loadClass(name)
- .getConstructor(SparkContext.class);
- sqlc = (SQLContext) hc.newInstance(getSparkContext());
- } catch (NoSuchMethodException | SecurityException
- | ClassNotFoundException | InstantiationException
- | IllegalAccessException | IllegalArgumentException
- | InvocationTargetException e) {
- logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
- // when hive dependency is not loaded, it'll fail.
- // in this case SQLContext can be used.
- sqlc = new SQLContext(getSparkContext());
- }
- } else {
- sqlc = new SQLContext(getSparkContext());
- }
- }
-
- return sqlc;
+ private boolean useCassandraContext() {
+ return Boolean.parseBoolean(getProperty("zeppelin.spark.useCassandraContext"));
}
public DependencyResolver getDependencyResolver() {
@@ -214,6 +194,45 @@ private DepInterpreter getDepInterpreter() {
return null;
}
+ private SQLContext loadCustomContext(final String contextName) {
+ Constructor> hc;
+ SQLContext context;
+ try {
+ hc = getClass().getClassLoader().loadClass(contextName)
+ .getConstructor(SparkContext.class);
+ context = (SQLContext) hc.newInstance(getSparkContext());
+ } catch (NoSuchMethodException | SecurityException
+ | ClassNotFoundException | InstantiationException
+ | IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException e) {
+ logger.warn("Can't create " + contextName + ". Fallback to SQLContext", e);
+ // when hive dependency is not loaded, it'll fail.
+ // in this case SQLContext can be used.
+ context = new SQLContext(getSparkContext());
+ }
+ return context;
+ }
+
+ public SQLContext getSQLContext() {
+ if (sqlc == null) {
+ if (useCassandraContext() && useHiveContext())
+ throw new InterpreterException("Cassandra and Hive context are both enabled, " +
+ "please enable only one");
+
+ if (useCassandraContext()) {
+ sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext");
+ logger.debug("Loading Cassandra SQL Context");
+ } else if (useHiveContext()) {
+ sqlc = loadCustomContext("org.apache.spark.sql.hive.HiveContext");
+ logger.debug("Loading Hive SQL Context");
+ } else {
+ sqlc = new SQLContext(getSparkContext());
+ logger.debug("Loading Standard SQL Context");
+ }
+ }
+ return sqlc;
+ }
+
public SparkContext createSparkContext() {
System.err.println("------ Create new SparkContext " + getProperty("master") + " -------");
diff --git a/spark/src/main/resources/cassandra.cql b/spark/src/main/resources/cassandra.cql
new file mode 100644
index 00000000000..ab79cc2069f
--- /dev/null
+++ b/spark/src/main/resources/cassandra.cql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE test (
+ name text,
+ age int,
+ PRIMARY KEY(name));
+
+INSERT INTO test (name, age) values ('moon', 33);
+INSERT INTO test (name, age) values ('jobs', 51);
+INSERT INTO test (name, age) values ('gates', 51);
+INSERT INTO test (name, age) values ('park', 34);
\ No newline at end of file
diff --git a/spark/src/main/resources/log4j-embedded-cassandra.properties b/spark/src/main/resources/log4j-embedded-cassandra.properties
new file mode 100644
index 00000000000..df58384071a
--- /dev/null
+++ b/spark/src/main/resources/log4j-embedded-cassandra.properties
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=ERROR,stdout,HColumnFamilyLogger
+
+# stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{3} - %m%n
+log4j.appender.stdout.follow=true
+
+log4j.appender.HColumnFamilyLogger=org.apache.log4j.ConsoleAppender
+log4j.appender.HColumnFamilyLogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.HColumnFamilyLogger.layout.ConversionPattern=%m%n
+log4j.category.HColumnFamilyLogger=DEBUG
+#log4j.category.org.apache=INFO, stdout
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java
new file mode 100644
index 00000000000..1d0519628f3
--- /dev/null
+++ b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.cassandraunit.CassandraCQLUnit;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class CassandraSparkSqlInterpreterTest {
+
+ private SparkSqlInterpreter sql;
+ private SparkInterpreter repl;
+ private InterpreterContext context;
+ private InterpreterGroup intpGroup;
+
+ @Rule
+ public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("cassandra.cql","sparkkeyspace"));
+
+
+ @Before
+ public void setUp() throws Exception {
+ Properties p = new Properties();
+ p.setProperty("zeppelin.spark.useCassandraContext", "true");
+ p.setProperty("zeppelin.spark.useHiveContext", "false");
+ p.setProperty("spark.cassandra.connection.host", "127.0.0.1");
+ p.setProperty("spark.cassandra.connection.port", "9142");
+
+ if (repl == null) {
+
+ if (SparkInterpreterTest.repl == null) {
+ repl = new SparkInterpreter(p);
+ repl.open();
+ SparkInterpreterTest.repl = repl;
+ } else {
+ repl = SparkInterpreterTest.repl;
+ }
+
+ sql = new SparkSqlInterpreter(p);
+
+ intpGroup = new InterpreterGroup();
+ intpGroup.add(repl);
+ intpGroup.add(sql);
+ sql.setInterpreterGroup(intpGroup);
+ sql.open();
+ }
+ context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void test() {
+ InterpreterResult ret = sql.interpret("select name, age from sparkkeyspace.test where age < 40", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(Type.TABLE, ret.type());
+ assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
+
+ assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select * FROM sparkkeyspace.test as t1 INNER JOIN sparkkeyspace.test as t2 on t1.name = t2.name", context).code());
+ }
+}
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index bb818fd2c56..ff4b65e9fd1 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -44,6 +44,7 @@ public class SparkSqlInterpreterTest {
@Before
public void setUp() throws Exception {
Properties p = new Properties();
+ p.setProperty("zeppelin.spark.useHiveContext", "true");
if (repl == null) {