From a33c3b223f34f33154d21f2e3fbcd9319687f5c0 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Wed, 18 Sep 2019 14:22:29 -0700 Subject: [PATCH 1/2] pickup SparkUI address from zeppelin.spark.uiWebUrl --- .../org/apache/zeppelin/spark/SparkInterpreter.java | 11 ----------- .../apache/zeppelin/spark/SparkInterpreterTest.java | 7 ++++--- .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 6 +++++- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 33769be8255..7bacce8e26d 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -60,8 +60,6 @@ public class SparkInterpreter extends AbstractInterpreter { private SparkVersion sparkVersion; private boolean enableSupportedVersionCheck; - private String sparkUrl; - public SparkInterpreter(Properties properties) { super(properties); @@ -109,11 +107,6 @@ public void open() throws InterpreterException { } sqlContext = this.innerInterpreter.getSqlContext(); sparkSession = this.innerInterpreter.getSparkSession(); - sparkUrl = this.innerInterpreter.getSparkUrl(); - String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", ""); - if (!StringUtils.isBlank(sparkUrlProp)) { - sparkUrl = sparkUrlProp; - } SESSION_NUM.incrementAndGet(); } catch (Exception e) { @@ -260,10 +253,6 @@ private List getDependencyFiles() throws InterpreterException { return depFiles; } - public String getSparkUIUrl() { - return sparkUrl; - } - public boolean isUnsupportedSparkVersion() { return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion(); } diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index bceda3a66ba..9a3d471bea6 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -48,6 +48,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SparkInterpreterTest { @@ -89,8 +90,6 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); interpreter.open(); - assertEquals("fake_spark_weburl", interpreter.getSparkUIUrl()); - InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a: String = hello world\n", output); @@ -181,7 +180,9 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int assertEquals("pid_2", captorEvent.getValue().get("paraId")); // spark job url is sent - verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class)); + ArgumentCaptor onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class); + verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture()); + assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl")); // case class result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 421d85aa301..4995276dbb7 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -303,7 +303,11 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected def createZeppelinContext(): Unit = { val sparkShims = SparkShims.getInstance(sc.version, properties) - sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); + if (webUiUrl == null || webUiUrl.trim().length() == 0) { + webUiUrl = sparkUrl; + } + sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get) z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry, From 7e34542f07811572bc9230b10f9f11fcd146e9a2 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Thu, 19 Sep 2019 12:10:17 -0700 Subject: [PATCH 2/2] use StringUtils.isBlank --- .../org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 4995276dbb7..ced1c1fc987 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -23,6 +23,7 @@ import java.net.URLClassLoader import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} import org.apache.zeppelin.interpreter.util.InterpreterOutputStream @@ -304,7 +305,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected def createZeppelinContext(): Unit = { val sparkShims = SparkShims.getInstance(sc.version, properties) var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); - if (webUiUrl == null || webUiUrl.trim().length() == 0) { + if (StringUtils.isBlank(webUiUrl)) { webUiUrl = sparkUrl; } sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)