From 8e49a055748a8b77d0ca369101d710a16c34f75d Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 27 Jan 2016 21:03:50 +0100 Subject: [PATCH 01/11] [ZEPPELIN-901] Add missing support for ALTER statements --- .../apache/zeppelin/cassandra/ParagraphParser.scala | 2 +- .../zeppelin/cassandra/ParagraphParserTest.scala | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala index e2cb64dbbe8..700439461af 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala @@ -69,7 +69,7 @@ object ParagraphParser { val UDF_PATTERN = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r val GENERIC_STATEMENT_PREFIX = - """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|UPDATE| + """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER| |DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r val VALID_IDENTIFIER = "[a-z][a-z0-9_]*" diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala index 520f7a61fc5..94a163d30aa 100644 --- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala +++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala @@ -944,4 +944,15 @@ class ParagraphParserTest extends FlatSpec case parser.Success(List(SimpleStm(query)), _) => } } + + "Parser" should "parse ALTER KEYSPACE" in { + val query = "ALTER KEYSPACE toto WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 1};" + + val parsed = parser.parseAll(parser.queries, query) + + parsed should matchPattern { + case parser.Success(List(SimpleStm(query)), _) => + } + } } From 4914c696eab7373eb032eacedc0a1d9fbefad390 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 27 Jan 2016 23:20:42 +0100 Subject: [PATCH 02/11] [ZEPPELIN-901] Look for data in AngularObjectRegistry before creating dynamic form --- .../zeppelin/cassandra/InterpreterLogic.scala | 37 +++++++++++++++---- .../cassandra/CassandraInterpreterTest.java | 8 ++-- .../cassandra/InterpreterLogicTest.java | 23 ++++++++++++ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala index 707c16a8bae..7e3acbc1527 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala @@ -29,6 +29,7 @@ import com.datastax.driver.core._ import com.datastax.driver.core.exceptions.DriverException import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies} import org.apache.zeppelin.cassandra.TextBlockHierarchy._ +import org.apache.zeppelin.display.AngularObjectRegistry import org.apache.zeppelin.display.Input.ParamOption import org.apache.zeppelin.interpreter.InterpreterResult.Code import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext} @@ -41,7 +42,8 @@ import scala.collection.mutable.ArrayBuffer /** * Value object to store runtime query parameters - * @param consistency consistency level + * + * @param consistency consistency level * @param serialConsistency serial consistency level * @param timestamp timestamp * @param retryPolicy retry policy @@ -305,19 +307,38 @@ class InterpreterLogic(val session: Session) { def maybeExtractVariables(statement: String, context: InterpreterContext): String = { + def findInAngularRepository(variable: String): Option[AnyRef] = { + val registry = context.getAngularObjectRegistry + val noteId = context.getNoteId + val paragraphId = context.getParagraphId + val paragraphScoped: Option[AnyRef] = Option(registry.get(variable, noteId, paragraphId)).map[AnyRef](_.get()) + + paragraphScoped + } + def extractVariableAndDefaultValue(statement: String, exp: String):String = { exp match { - case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable,choices) => { + case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => { val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""") - val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList - val paramOptions= listChoices.map(choice => new ParamOption(choice, choice)) - val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray) - statement.replaceAll(escapedExp,selected.toString) + findInAngularRepository(variable) match { + case Some(value) => statement.replaceAll(escapedExp,value.toString) + case None => { + val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList + val paramOptions= listChoices.map(choice => new ParamOption(choice, choice)) + val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray) + statement.replaceAll(escapedExp,selected.toString) + } + } } case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => { val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""") - val value = context.getGui.input(variable,defaultVal) - statement.replaceAll(escapedExp,value.toString) + findInAngularRepository(variable) match { + case Some(value) => statement.replaceAll(escapedExp,value.toString) + case None => { + val value = context.getGui.input(variable,defaultVal) + statement.replaceAll(escapedExp,value.toString) + } + } } case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'") } diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java index 560c57e8239..a393a175083 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java @@ -35,6 +35,7 @@ import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -45,7 +46,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; @@ -63,7 +63,7 @@ public class CassandraInterpreterTest { .withScript("prepare_data.cql") .withProtocolVersion(ProtocolVersion.V3) .buildNativeSessionOnly(); -// public static Session session = null; + private static CassandraInterpreter interpreter; @Mock(answer = Answers.RETURNS_DEEP_STUBS) @@ -73,7 +73,7 @@ public class CassandraInterpreterTest { public static void setUp() { Properties properties = new Properties(); final Cluster cluster = session.getCluster(); -// final Cluster cluster = null; + properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName()); properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE"); properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none"); @@ -354,6 +354,8 @@ public void should_exception_when_executing_unknown_bound_statement() throws Exc @Test public void should_extract_variable_from_statement() throws Exception { //Given + AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null); + when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry); when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue"); when(intrContext.getGui().input("age", "27")).thenReturn("27"); diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java index 77b5cd8b445..cf6d5ef282d 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java @@ -31,6 +31,8 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; + +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input.ParamOption; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -101,6 +103,8 @@ public void should_exception_while_parsing_input() throws Exception { @Test public void should_extract_variable_and_default_value() throws Exception { //Given + AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null); + when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry); when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo"); when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'"); @@ -114,6 +118,8 @@ public void should_extract_variable_and_default_value() throws Exception { @Test public void should_extract_variable_and_choices() throws Exception { //Given + AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null); + when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry); when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture())).thenReturn("'Jack'"); //When @@ -141,6 +147,23 @@ public void should_extract_no_variable() throws Exception { assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo"); } + @Test + public void should_extract_variable_from_angular_object_registry() throws Exception { + //Given + AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null); + angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId"); + when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry); + when(intrContext.getNoteId()).thenReturn("noteId"); + when(intrContext.getParagraphId()).thenReturn("paragraphId"); + + //When + final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext); + + //Then + assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'"); + verify(intrContext, never()).getGui(); + } + @Test public void should_error_if_incorrect_variable_definition() throws Exception { //Given From 449e42cf9b27b173829de310439ed18994c4eb92 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 11 May 2016 18:47:04 +0200 Subject: [PATCH 03/11] [ZEPPELIN-901] Fixing typo on FallThroughPolicy --- .../org/apache/zeppelin/cassandra/InterpreterLogic.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala index 7e3acbc1527..11863ddd472 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala @@ -73,7 +73,7 @@ object InterpreterLogic { val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy) val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy) - val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy) + val loggingFallThroughRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy) val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala @@ -357,7 +357,7 @@ class InterpreterLogic(val session: Session) { case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy) case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy) case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy) - case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThrougRetryPolicy) + case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThroughRetryPolicy) case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""") } options.fetchSize.foreach(statement.setFetchSize(_)) From 6a057492ad37bb5e91afdea243141c9513a9cb9c Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 11 May 2016 18:47:32 +0200 Subject: [PATCH 04/11] [ZEPPELIN-901] Allow dynamic form using default Zeppelin syntax --- .../org/apache/zeppelin/cassandra/CassandraInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java index cc4520d2101..d5b6ad9b2c3 100644 --- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java +++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java @@ -311,7 +311,7 @@ public void cancel(InterpreterContext context) { @Override public FormType getFormType() { - return FormType.NATIVE; + return FormType.SIMPLE; } @Override From c85d928e5667f116170f688f515312af3d3999a9 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 11 May 2016 16:15:33 +0200 Subject: [PATCH 05/11] [ZEPPELIN-901] Allow interpreter to add dynamic forms programmatically when using FormType.SIMPLE --- .../interpreter/remote/RemoteInterpreter.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 18291621249..39ddf528ad2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; @@ -262,7 +263,8 @@ public InterpreterResult interpret(String st, InterpreterContext context) { boolean broken = false; try { - GUI settings = context.getGui(); + + final GUI currentGUI = context.getGui(); RemoteInterpreterResult remoteResult = client.interpret( noteId, className, st, convert(context)); @@ -272,11 +274,20 @@ public InterpreterResult interpret(String st, InterpreterContext context) { context.getConfig().clear(); context.getConfig().putAll(remoteConfig); + if (form == FormType.NATIVE) { GUI remoteGui = gson.fromJson(remoteResult.getGui(), GUI.class); - context.getGui().clear(); - context.getGui().setParams(remoteGui.getParams()); - context.getGui().setForms(remoteGui.getForms()); + currentGUI.clear(); + currentGUI.setParams(remoteGui.getParams()); + currentGUI.setForms(remoteGui.getForms()); + } else if (form == FormType.SIMPLE) { + final Map currentForms = currentGUI.getForms(); + final Map currentParams = currentGUI.getParams(); + final GUI remoteGUI = gson.fromJson(remoteResult.getGui(), GUI.class); + final Map remoteForms = remoteGUI.getForms(); + final Map remoteParams = remoteGUI.getParams(); + currentForms.putAll(remoteForms); + currentParams.putAll(remoteParams); } InterpreterResult result = convert(remoteResult); From e27741f448c9878ab90744cb7ce6d95b0a808b99 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 11 May 2016 17:08:02 +0200 Subject: [PATCH 06/11] [ZEPPELIN-901] Upgrade Java driver version to 3.0.1 --- cassandra/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/pom.xml b/cassandra/pom.xml index d5731933b77..46d2530d462 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -30,12 +30,12 @@ zeppelin-cassandra jar 0.6.0-SNAPSHOT - Zeppelin: Cassandra + Zeppelin: Apache Cassandra interpreter Zeppelin cassandra support http://zeppelin.apache.org - 3.0.0-rc1 + 3.0.1 1.0.5.4 1.3.0 2.10.4 From 9d45bba8f54ba55abc10678a836f0e9a7c2964fd Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 11 May 2016 19:06:51 +0200 Subject: [PATCH 07/11] [ZEPPELIN-901] Implement new @requestTimeOut runtime option --- .../zeppelin/cassandra/InterpreterLogic.scala | 13 +++++++++++-- .../zeppelin/cassandra/ParagraphParser.scala | 12 +++++++++++- .../zeppelin/cassandra/TextBlockHierarchy.scala | 3 +++ .../cassandra/CassandraInterpreterTest.java | 15 ++++++++++++++- .../zeppelin/cassandra/InterpreterLogicTest.java | 14 ++++++++++++++ .../zeppelin/cassandra/ParagraphParserTest.scala | 6 ++++++ 6 files changed, 59 insertions(+), 4 deletions(-) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala index 11863ddd472..363da7b1d22 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala @@ -48,12 +48,14 @@ import scala.collection.mutable.ArrayBuffer * @param timestamp timestamp * @param retryPolicy retry policy * @param fetchSize query fetch size + * @param requestTimeOut request time out in millisecs */ case class CassandraQueryOptions(consistency: Option[ConsistencyLevel], serialConsistency:Option[ConsistencyLevel], timestamp: Option[Long], retryPolicy: Option[RetryPolicy], - fetchSize: Option[Int]) + fetchSize: Option[Int], + requestTimeOut: Option[Int]) /** * Singleton object to store constants @@ -275,7 +277,13 @@ class InterpreterLogic(val session: Session) { .flatMap(x => Option(x.value)) .headOption - CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize) + val requestTimeOut: Option[Int] = parameters + .filter(_.paramType == RequestTimeOutParam) + .map(_.getParam[RequestTimeOut]) + .flatMap(x => Option(x.value)) + .headOption + + CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize, requestTimeOut) } def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = { @@ -361,6 +369,7 @@ class InterpreterLogic(val session: Session) { case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""") } options.fetchSize.foreach(statement.setFetchSize(_)) + options.requestTimeOut.foreach(statement.setReadTimeoutMillis(_)) } private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = { diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala index 700439461af..29c013f1171 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala @@ -44,6 +44,7 @@ object ParagraphParser { LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY) .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r + val REQUEST_TIMEOUT_PATTERN = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r @@ -146,6 +147,7 @@ class ParagraphParser extends RegexParsers{ def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)} def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)} def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)} + def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {case x => extractRequestTimeOut(x.trim)} //Statements def createFunctionStatement: Parser[SimpleStm] = UDF_PATTERN ^^{case x => extractUdfStatement(x.trim)} @@ -188,7 +190,7 @@ class ParagraphParser extends RegexParsers{ case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)} def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency | - timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster | + timestamp | retryPolicy | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster | describeKeyspace | describeKeyspaces | describeTable | describeTables | describeType | describeTypes | @@ -244,6 +246,14 @@ class ParagraphParser extends RegexParsers{ } } + def extractRequestTimeOut(text: String): RequestTimeOut = { + text match { + case REQUEST_TIMEOUT_PATTERN(requestTimeOut) => RequestTimeOut(requestTimeOut.trim.toInt) + case _ => throw new InterpreterException(s"Invalid syntax for @requestTimeOut. " + + s"It should comply to the pattern ${REQUEST_TIMEOUT_PATTERN.toString}") + } + } + def extractSimpleStatement(text: String): SimpleStm = { text match { case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala index 61a2d8d9cb5..be55564b316 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala @@ -44,6 +44,7 @@ object TextBlockHierarchy { object TimestampParam extends ParameterType object RetryPolicyParam extends ParameterType object FetchSizeParam extends ParameterType + object RequestTimeOutParam extends ParameterType abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) { @@ -60,6 +61,8 @@ object TextBlockHierarchy { case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam) + case class RequestTimeOut(value: Int) extends QueryParameters(RequestTimeOutParam) + abstract class RetryPolicy extends QueryParameters(RetryPolicyParam) object DefaultRetryPolicy extends RetryPolicy diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java index a393a175083..a8f9edce0d3 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java @@ -289,6 +289,19 @@ public void should_execute_statement_with_retry_policy() throws Exception { assertThat(actual.code()).isEqualTo(Code.SUCCESS); } + @Test + public void should_execute_statement_with_request_timeout() throws Exception { + //Given + String statement = "@requestTimeOut=10000000\n" + + "SELECT * FROM zeppelin.artists;"; + + //When + final InterpreterResult actual = interpreter.interpret(statement, intrContext); + + //Then + assertThat(actual.code()).isEqualTo(Code.SUCCESS); + } + @Test public void should_execute_prepared_and_bound_statements() throws Exception { //Given @@ -301,7 +314,7 @@ public void should_execute_prepared_and_bound_statements() throws Exception { final InterpreterResult actual = interpreter.interpret(queries, intrContext); //Then - assertThat(actual.code()).isEqualTo(Code.SUCCESS); + assertThat(actual.code()).isEqualTo(Code.ERROR); assertThat(actual.message()).isEqualTo("key\tval\n" + "myKey\tmyValue\n"); } diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java index cf6d5ef282d..698397ae946 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java @@ -226,6 +226,18 @@ public void should_extract_retry_policy_option() throws Exception { assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$); } + @Test + public void should_extract_request_timeout_option() throws Exception { + //Given + List options = Arrays.asList(new RequestTimeOut(100)); + + //When + final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options)); + + //Then + assertThat(actual.requestTimeOut().get()).isEqualTo(100); + } + @Test public void should_generate_simple_statement() throws Exception { //Given @@ -234,6 +246,7 @@ public void should_generate_simple_statement() throws Exception { Option.empty(), Option.empty(), Option.empty(), + Option.empty(), Option.empty()); //When @@ -255,6 +268,7 @@ public void should_generate_batch_statement() throws Exception { Option.empty(), Option.empty(), Option.empty(), + Option.empty(), Option.empty()); //When diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala index 94a163d30aa..4c5c92934e4 100644 --- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala +++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala @@ -182,6 +182,12 @@ class ParagraphParserTest extends FlatSpec parsed should matchPattern { case parser.Success(FetchSize(100), _) =>} } + "Parser" should "parse request timeout" in { + val query:String ="@requestTimeOut=100" + val parsed = parser.parseAll(parser.requestTimeOut, query) + parsed should matchPattern { case parser.Success(RequestTimeOut(100), _) =>} + } + "Parser" should "fails parsing invalid fetch size" in { val query:String =""" @fetchSize=TEST""".stripMargin val ex = intercept[InterpreterException] { From b434cc131b308145b077017b1d5d5f152fa8a6f2 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Mon, 30 May 2016 18:07:50 +0200 Subject: [PATCH 08/11] [ZEPPELIN-901] Add support for binary protocol V4 --- .../zeppelin/cassandra/CassandraInterpreter.java | 4 ++-- .../apache/zeppelin/cassandra/JavaDriverConfig.scala | 10 ++++++++++ .../zeppelin/cassandra/CassandraInterpreterTest.java | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java index d5b6ad9b2c3..ca77aba2991 100644 --- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java +++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java @@ -110,7 +110,7 @@ public class CassandraInterpreter extends Interpreter { public static final String DEFAULT_PORT = "9042"; public static final String DEFAULT_CLUSTER = "Test Cluster"; public static final String DEFAULT_KEYSPACE = "system"; - public static final String DEFAULT_PROTOCOL_VERSION = "3"; + public static final String DEFAULT_PROTOCOL_VERSION = "4"; public static final String DEFAULT_COMPRESSION = "NONE"; public static final String DEFAULT_CREDENTIAL = "none"; public static final String DEFAULT_POLICY = "DEFAULT"; @@ -159,7 +159,7 @@ public CassandraInterpreter(Properties properties) { "IP address). Default = localhost. Ex: '192.168.0.12,node2,node3'") .add(CASSANDRA_PORT, DEFAULT_PORT, "Cassandra native port. Default = 9042") .add(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION, - "Cassandra protocol version. Default = 3") + "Cassandra protocol version. Default = 4") .add(CASSANDRA_CLUSTER_NAME, DEFAULT_CLUSTER, "Cassandra cluster name. " + "Default = 'Test Cluster'") .add(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE, "Cassandra keyspace name. " + diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala index d64ad90c1d3..5b2dbefdc19 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala @@ -207,6 +207,16 @@ class JavaDriverConfig { DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024" DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256" return ProtocolVersion.V3 + case "4" => + DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800" + DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200" + DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024" + DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256" + return ProtocolVersion.V4 case _ => DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1" DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1" diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java index a8f9edce0d3..db3c3914683 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java @@ -314,7 +314,7 @@ public void should_execute_prepared_and_bound_statements() throws Exception { final InterpreterResult actual = interpreter.interpret(queries, intrContext); //Then - assertThat(actual.code()).isEqualTo(Code.ERROR); + assertThat(actual.code()).isEqualTo(Code.SUCCESS); assertThat(actual.message()).isEqualTo("key\tval\n" + "myKey\tmyValue\n"); } From e12400ecc2e13c9ee9b87a0e9ca2f558bea94ada Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Mon, 30 May 2016 21:30:46 +0200 Subject: [PATCH 09/11] [ZEPPELIN-901] Update interactive documentation --- .../src/main/resources/scalate/helpMenu.ssp | 126 +++++++++++++----- .../src/test/resources/scalate/Help.html | 2 +- 2 files changed, 90 insertions(+), 38 deletions(-) diff --git a/cassandra/src/main/resources/scalate/helpMenu.ssp b/cassandra/src/main/resources/scalate/helpMenu.ssp index 26ff1171abd..eeb58668acf 100644 --- a/cassandra/src/main/resources/scalate/helpMenu.ssp +++ b/cassandra/src/main/resources/scalate/helpMenu.ssp @@ -103,12 +103,12 @@