From ba3477a6afb2fd29df5e88124599a37453ab480c Mon Sep 17 00:00:00 2001 From: Tinkoff DWH Date: Mon, 27 Feb 2017 14:41:58 +0500 Subject: [PATCH 1/5] [ZEPPELIN-1988] add property "precode" to JDBCInterpreter --- docs/interpreter/jdbc.md | 4 ++ .../apache/zeppelin/jdbc/JDBCInterpreter.java | 52 ++++++++++++++----- .../main/resources/interpreter-setting.json | 6 +++ .../zeppelin/jdbc/JDBCInterpreterTest.java | 44 ++++++++++++++++ .../zeppelin/interpreter/Constants.java | 2 + 5 files changed, 96 insertions(+), 12 deletions(-) diff --git a/docs/interpreter/jdbc.md b/docs/interpreter/jdbc.md index 32adcba94be..09f1b7206e0 100644 --- a/docs/interpreter/jdbc.md +++ b/docs/interpreter/jdbc.md @@ -167,6 +167,10 @@ There are more JDBC interpreter properties you can specify like below. default.jceks.credentialKey jceks credential key + + zeppelin.interpreter.precode + Some SQL which executes while opening connection + You can also add more properties by using this [method](http://docs.oracle.com/javase/7/docs/api/java/sql/DriverManager.html#getConnection%28java.lang.String,%20java.util.Properties%29). diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index c43e3920ea5..a8b24d2bcb6 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -14,14 +14,7 @@ */ package org.apache.zeppelin.jdbc; -import static org.apache.commons.lang.StringUtils.containsIgnoreCase; -import static org.apache.commons.lang.StringUtils.isEmpty; -import static org.apache.commons.lang.StringUtils.isNotEmpty; -import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.sql.Connection; import java.sql.DriverManager; @@ -37,11 +30,11 @@ import java.util.Properties; import java.util.Set; -import com.google.common.base.Throwables; import org.apache.commons.dbcp2.ConnectionFactory; import org.apache.commons.dbcp2.DriverManagerConnectionFactory; import org.apache.commons.dbcp2.PoolableConnectionFactory; import org.apache.commons.dbcp2.PoolingDriver; +import org.apache.commons.lang.StringUtils; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.hadoop.conf.Configuration; @@ -49,7 +42,10 @@ import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl; @@ -61,9 +57,14 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; + +import static org.apache.commons.lang.StringUtils.containsIgnoreCase; +import static org.apache.commons.lang.StringUtils.isEmpty; +import static org.apache.commons.lang.StringUtils.isNotEmpty; +import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; +import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY; /** * JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ, @@ -340,6 +341,7 @@ private Connection getConnectionFromPool(String url, String user, String propert if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) { createConnectionPool(url, user, propertyKey, properties); + executePreCode(DriverManager.getConnection(jdbcDriver)); } return DriverManager.getConnection(jdbcDriver); } @@ -540,6 +542,33 @@ protected ArrayList splitSqlQueries(String sql) { return queries; } + private void executePreCode(Connection connection) { + String precode = getProperty(ZEPPELIN_PRECODE_PROPERTY_KEY); + if (StringUtils.isNotEmpty(precode)) { + logger.info("Run SQL precode '{}'", precode); + try { + Statement statement = connection.createStatement(); + statement.execute(precode); + if (!connection.getAutoCommit()) { + connection.commit(); + } + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + logger.error("Cannot create precode statement", e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + logger.error("Cannot close connection of precode", e); + } + } + } + } + } + private InterpreterResult executeSql(String propertyKey, String sql, InterpreterContext interpreterContext) { Connection connection; @@ -761,4 +790,3 @@ int getMaxConcurrentConnection() { } } } - diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index 20a900f9d79..9fb18269cbb 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -63,6 +63,12 @@ "propertyName": "zeppelin.jdbc.principal", "defaultValue": "", "description": "Kerberos principal" + }, + "zeppelin.interpreter.precode": { + "envName": null, + "propertyName": "zeppelin.interpreter.precode", + "defaultValue": "", + "description": "SQL which executes while opening connection" } }, "editor": { diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index 9a041f923fd..214c304b2c7 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Properties; +import org.apache.zeppelin.interpreter.Constants; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -43,6 +44,9 @@ import org.junit.Test; import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter; + +import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY; + /** * JDBC interpreter unit tests */ @@ -386,4 +390,44 @@ public void testMultiTenant() throws SQLException, IOException { assertNull(user2JDBC2Conf.getPropertyMap("default").get("password")); jdbc2.close(); } + + @Test + public void testPrecode() throws SQLException, IOException { + Properties properties = new Properties(); + properties.setProperty("default.driver", "org.h2.Driver"); + properties.setProperty("default.url", getJdbcConnection()); + properties.setProperty("default.user", ""); + properties.setProperty("default.password", ""); + properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "SET @testVariable=1"); + JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); + jdbcInterpreter.open(); + + String sqlQuery = "select @testVariable"; + + InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext); + + assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); + assertEquals("@TESTVARIABLE\n1\n", interpreterResult.message().get(0).getData()); + } + + @Test + public void testIncorrectPrecode() throws SQLException, IOException { + Properties properties = new Properties(); + properties.setProperty("default.driver", "org.h2.Driver"); + properties.setProperty("default.url", getJdbcConnection()); + properties.setProperty("default.user", ""); + properties.setProperty("default.password", ""); + properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "incorrect command"); + JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); + jdbcInterpreter.open(); + + String sqlQuery = "select 1"; + + InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext); + + assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); + assertEquals("1\n1\n", interpreterResult.message().get(0).getData()); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java index 9115a98385f..16cececee51 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java @@ -26,6 +26,8 @@ public class Constants { public static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host"; + public static final String ZEPPELIN_PRECODE_PROPERTY_KEY = "zeppelin.interpreter.precode"; + public static final String EXISTING_PROCESS = "existing_process"; public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914; From 9d37bc46e3308c54011f16c903a9451582c16c13 Mon Sep 17 00:00:00 2001 From: Tinkoff DWH Date: Tue, 28 Feb 2017 14:37:31 +0500 Subject: [PATCH 2/5] [ZEPPELIN-1988] fix --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a8b24d2bcb6..4cebbc2e01e 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -341,7 +341,9 @@ private Connection getConnectionFromPool(String url, String user, String propert if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) { createConnectionPool(url, user, propertyKey, properties); - executePreCode(DriverManager.getConnection(jdbcDriver)); + try (Connection connection = DriverManager.getConnection(jdbcDriver)) { + executePreCode(connection); + } } return DriverManager.getConnection(jdbcDriver); } @@ -546,25 +548,13 @@ private void executePreCode(Connection connection) { String precode = getProperty(ZEPPELIN_PRECODE_PROPERTY_KEY); if (StringUtils.isNotEmpty(precode)) { logger.info("Run SQL precode '{}'", precode); - try { - Statement statement = connection.createStatement(); + try (Statement statement = connection.createStatement()) { statement.execute(precode); if (!connection.getAutoCommit()) { connection.commit(); } - if (statement != null) { - statement.close(); - } } catch (SQLException e) { logger.error("Cannot create precode statement", e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error("Cannot close connection of precode", e); - } - } } } } From 66d6ae4040e58ef08da57785757b8d681d53ad86 Mon Sep 17 00:00:00 2001 From: Tinkoff DWH Date: Tue, 28 Feb 2017 15:42:46 +0500 Subject: [PATCH 3/5] [ZEPPELIN-1988] fixes of review items --- docs/interpreter/jdbc.md | 2 +- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 ++++------ jdbc/src/main/resources/interpreter-setting.json | 4 ++-- .../apache/zeppelin/jdbc/JDBCInterpreterTest.java | 12 +++++------- .../org/apache/zeppelin/interpreter/Constants.java | 2 -- 5 files changed, 12 insertions(+), 18 deletions(-) diff --git a/docs/interpreter/jdbc.md b/docs/interpreter/jdbc.md index 09f1b7206e0..346fcbb2f3e 100644 --- a/docs/interpreter/jdbc.md +++ b/docs/interpreter/jdbc.md @@ -168,7 +168,7 @@ There are more JDBC interpreter properties you can specify like below. jceks credential key - zeppelin.interpreter.precode + zeppelin.jdbc.precode Some SQL which executes while opening connection diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 4cebbc2e01e..5e42381ecd5 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -64,7 +64,6 @@ import static org.apache.commons.lang.StringUtils.isEmpty; import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; -import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY; /** * JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ, @@ -104,6 +103,7 @@ public class JDBCInterpreter extends Interpreter { static final String PASSWORD_KEY = "password"; static final String JDBC_JCEKS_FILE = "jceks.file"; static final String JDBC_JCEKS_CREDENTIAL_KEY = "jceks.credentialKey"; + static final String ZEPPELIN_JDBC_PRECODE_KEY = "zeppelin.jdbc.precode"; static final String DOT = "."; private static final char WHITESPACE = ' '; @@ -342,7 +342,7 @@ private Connection getConnectionFromPool(String url, String user, String propert if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) { createConnectionPool(url, user, propertyKey, properties); try (Connection connection = DriverManager.getConnection(jdbcDriver)) { - executePreCode(connection); + executePrecode(connection); } } return DriverManager.getConnection(jdbcDriver); @@ -544,8 +544,8 @@ protected ArrayList splitSqlQueries(String sql) { return queries; } - private void executePreCode(Connection connection) { - String precode = getProperty(ZEPPELIN_PRECODE_PROPERTY_KEY); + private void executePrecode(Connection connection) throws SQLException { + String precode = getProperty(ZEPPELIN_JDBC_PRECODE_KEY); if (StringUtils.isNotEmpty(precode)) { logger.info("Run SQL precode '{}'", precode); try (Statement statement = connection.createStatement()) { @@ -553,8 +553,6 @@ private void executePreCode(Connection connection) { if (!connection.getAutoCommit()) { connection.commit(); } - } catch (SQLException e) { - logger.error("Cannot create precode statement", e); } } } diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index 9fb18269cbb..6134243502e 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -64,9 +64,9 @@ "defaultValue": "", "description": "Kerberos principal" }, - "zeppelin.interpreter.precode": { + "zeppelin.jdbc.precode": { "envName": null, - "propertyName": "zeppelin.interpreter.precode", + "propertyName": "zeppelin.jdbc.precode", "defaultValue": "", "description": "SQL which executes while opening connection" } diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index 214c304b2c7..197c368154c 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Properties; -import org.apache.zeppelin.interpreter.Constants; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -45,7 +44,7 @@ import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter; -import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY; +import static org.apache.zeppelin.jdbc.JDBCInterpreter.ZEPPELIN_JDBC_PRECODE_KEY; /** * JDBC interpreter unit tests @@ -398,7 +397,7 @@ public void testPrecode() throws SQLException, IOException { properties.setProperty("default.url", getJdbcConnection()); properties.setProperty("default.user", ""); properties.setProperty("default.password", ""); - properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "SET @testVariable=1"); + properties.setProperty(ZEPPELIN_JDBC_PRECODE_KEY, "SET @testVariable=1"); JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); jdbcInterpreter.open(); @@ -418,7 +417,7 @@ public void testIncorrectPrecode() throws SQLException, IOException { properties.setProperty("default.url", getJdbcConnection()); properties.setProperty("default.user", ""); properties.setProperty("default.password", ""); - properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "incorrect command"); + properties.setProperty(ZEPPELIN_JDBC_PRECODE_KEY, "incorrect command"); JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); jdbcInterpreter.open(); @@ -426,8 +425,7 @@ public void testIncorrectPrecode() throws SQLException, IOException { InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); - assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); - assertEquals("1\n1\n", interpreterResult.message().get(0).getData()); + assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code()); + assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType()); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java index 16cececee51..9115a98385f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java @@ -26,8 +26,6 @@ public class Constants { public static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host"; - public static final String ZEPPELIN_PRECODE_PROPERTY_KEY = "zeppelin.interpreter.precode"; - public static final String EXISTING_PROCESS = "existing_process"; public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914; From 42ffcb7f7545be17af104b291f628370a728527b Mon Sep 17 00:00:00 2001 From: Tinkoff DWH Date: Wed, 1 Mar 2017 13:50:45 +0500 Subject: [PATCH 4/5] [ZEPPELIN-1988] fix condition --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 5e42381ecd5..c3b901eb0c3 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -546,7 +546,7 @@ protected ArrayList splitSqlQueries(String sql) { private void executePrecode(Connection connection) throws SQLException { String precode = getProperty(ZEPPELIN_JDBC_PRECODE_KEY); - if (StringUtils.isNotEmpty(precode)) { + if (StringUtils.isNotBlank(precode)) { logger.info("Run SQL precode '{}'", precode); try (Statement statement = connection.createStatement()) { statement.execute(precode); From cd46cce05e29736aada319dc2237632691e2b853 Mon Sep 17 00:00:00 2001 From: Tinkoff DWH Date: Sun, 5 Mar 2017 13:00:21 +0500 Subject: [PATCH 5/5] [ZEPPELIN-1988] trim precode --- jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index c3b901eb0c3..d4952246c91 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -547,6 +547,7 @@ protected ArrayList splitSqlQueries(String sql) { private void executePrecode(Connection connection) throws SQLException { String precode = getProperty(ZEPPELIN_JDBC_PRECODE_KEY); if (StringUtils.isNotBlank(precode)) { + precode = StringUtils.trim(precode); logger.info("Run SQL precode '{}'", precode); try (Statement statement = connection.createStatement()) { statement.execute(precode);