From a1304a22889e2e277913af56e89bace1ef38f25b Mon Sep 17 00:00:00 2001 From: karuppayya Date: Tue, 8 Nov 2016 23:20:09 +0530 Subject: [PATCH 1/7] Change to make spark web UI accesible from interpreters page --- .../zeppelin/spark/SparkInterpreter.java | 19 +++++++++++++++++-- .../interpreter/InterpreterGroup.java | 12 ++++++++++++ .../remote/RemoteInterpreterEventClient.java | 5 +++++ .../remote/RemoteInterpreterEventPoller.java | 9 +++++++++ .../RemoteInterpreterProcessListener.java | 3 +++ .../remote/RemoteInterpreterServer.java | 2 +- .../thrift/RemoteInterpreterEventType.java | 8 ++++---- .../RemoteInterpreterOutputTestStream.java | 6 ++++++ .../scheduler/RemoteSchedulerTest.java | 5 +++++ .../zeppelin/rest/InterpreterRestApi.java | 19 +++++++++++++++++++ .../zeppelin/socket/NotebookServer.java | 7 +++++++ .../app/interpreter/interpreter.controller.js | 16 ++++++++++++++++ .../src/app/interpreter/interpreter.html | 4 ++++ .../interpreter/InterpreterFactory.java | 2 ++ .../interpreter/InterpreterSetting.java | 10 ++++++++++ 15 files changed, 120 insertions(+), 7 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 53bf30b9533..19f4b792da1 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -27,8 +27,6 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.*; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; @@ -45,6 +43,7 @@ import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; import org.apache.spark.sql.SQLContext; +import org.apache.spark.ui.SparkUI; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -57,6 +56,7 @@ import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -803,6 +803,10 @@ public void open() { sparkSession = getSparkSession(); } sc = getSparkContext(); + RemoteInterpreterEventClient eventClient = getInterpreterGroup().getEventClient(); + if (eventClient != null) { + eventClient.onMetaInfodReceived(getSparkUIUrl()); + } if (sc.getPoolForName("fair").isEmpty()) { Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR(); int minimumShare = 0; @@ -938,6 +942,17 @@ public void open() { numReferenceOfSparkContext.incrementAndGet(); } + private Map getSparkUIUrl() { + Option sparkUiOption = (Option) Utils.invokeMethod(sc, "ui"); + SparkUI sparkUi = sparkUiOption.get(); + String sparkWebUrl = sparkUi.appUIAddress(); + Map infos = new java.util.HashMap<>(); + if (sparkWebUrl != null) { + infos.put("url", sparkWebUrl); + } + return infos; + } + private Results.Result interpret(String line) { return (Results.Result) Utils.invokeMethod( intp, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 8a1c888a577..697a48bd11e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -22,6 +22,7 @@ import org.apache.log4j.Logger; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.scheduler.Scheduler; @@ -50,6 +51,8 @@ public class InterpreterGroup extends ConcurrentHashMap> interpreters = new ConcurrentHashMap>(); @@ -82,6 +85,11 @@ public InterpreterGroup() { allInterpreterGroups.put(id, this); } + public InterpreterGroup(String interpreterGroupId, RemoteInterpreterEventClient eventClient) { + this(interpreterGroupId); + this.eventClient = eventClient; + } + private static String generateId() { return "InterpreterGroup_" + System.currentTimeMillis() + "_" + new Random().nextInt(); @@ -280,4 +288,8 @@ public boolean isAngularRegistryPushed() { public void setAngularRegistryPushed(boolean angularRegistryPushed) { this.angularRegistryPushed = angularRegistryPushed; } + + public RemoteInterpreterEventClient getEventClient() { + return eventClient; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index c59a6f60ce3..e1e6b2f10d7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -279,6 +279,11 @@ public void onAppStatusUpdate(String noteId, String paragraphId, String appId, S gson.toJson(appendOutput))); } + public void onMetaInfodReceived(Map infos) { + sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS, + gson.toJson(infos))); + } + /** * Wait for eventQueue becomes empty */ diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index f7fee490daa..c1ed6463e8c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -195,6 +195,15 @@ public void run() { String status = appStatusUpdate.get("status"); appListener.onStatusChange(noteId, paragraphId, appId, status); + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { + // on output update + Map metaInfos = gson.fromJson(event.getData(), + new TypeToken>() { + }.getType()); + String id = interpreterGroup.getId(); + int indexOfColon = id.indexOf(":"); + String settingId = id.substring(0, indexOfColon); + listener.onMetaInfosReceived(settingId, metaInfos); } logger.debug("Event from remoteproceess {}", event.getType()); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index da6ac63c51d..d25683f0cf8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -16,10 +16,13 @@ */ package org.apache.zeppelin.interpreter.remote; +import java.util.Map; + /** * Event from remoteInterpreterProcess */ public interface RemoteInterpreterProcessListener { public void onOutputAppend(String noteId, String paragraphId, String output); public void onOutputUpdated(String noteId, String paragraphId, String output); + public void onMetaInfosReceived(String settingId, Map metaInfos); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index cde6a7ba7ef..856de87859e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -153,7 +153,7 @@ public static void main(String[] args) public void createInterpreter(String interpreterGroupId, String noteId, String className, Map properties) throws TException { if (interpreterGroup == null) { - interpreterGroup = new InterpreterGroup(interpreterGroupId); + interpreterGroup = new InterpreterGroup(interpreterGroupId, eventClient); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId()); resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 955461951f3..75692c04216 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -24,9 +24,6 @@ package org.apache.zeppelin.interpreter.thrift; -import java.util.Map; -import java.util.HashMap; -import org.apache.thrift.TEnum; public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { NO_OP(1), @@ -39,7 +36,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { OUTPUT_APPEND(8), OUTPUT_UPDATE(9), ANGULAR_REGISTRY_PUSH(10), - APP_STATUS_UPDATE(11); + APP_STATUS_UPDATE(11), + META_INFOS(12); private final int value; @@ -82,6 +80,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return ANGULAR_REGISTRY_PUSH; case 11: return APP_STATUS_UPDATE; + case 12: + return META_INFOS; default: return null; } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index 2ba62c30172..27e38089f66 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -29,6 +29,7 @@ import java.io.File; import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -154,4 +155,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) { public void onOutputUpdated(String noteId, String paragraphId, String output) { } + + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 49de4a70539..54cc25d8cda 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -299,4 +299,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) { public void onOutputUpdated(String noteId, String paragraphId, String output) { } + + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index c9094ebc7a3..9aa10aaac89 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.rest; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -215,6 +216,24 @@ public Response addRepository(String message) { return new JsonResponse(Status.CREATED).build(); } + /** + * get the property value + */ + @GET + @Path("getmetainfos/{settingId}") + public Response getMetaInfo(@PathParam("settingId") String settingId, + @PathParam("name") String propName) { + String url = null; + InterpreterSetting interpreterSetting = interpreterFactory.get(settingId); + Map infos = interpreterSetting.getInfos(); + if (infos != null) { + url = infos.get("url"); + } + Map respMap = new HashMap<>(); + respMap.put("url", url); + return new JsonResponse<>(Status.OK, respMap).build(); + } + /** * Delete repository * diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8a84587f75c..969186401ae 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1759,5 +1759,12 @@ private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subj .put("interpreterSettings", availableSettings))); } + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + InterpreterSetting interpreterSetting = notebook().getInterpreterFactory() + .get(settingId); + interpreterSetting.setInfos(metaInfos); + } + } diff --git a/zeppelin-web/src/app/interpreter/interpreter.controller.js b/zeppelin-web/src/app/interpreter/interpreter.controller.js index e4bebebd6c6..c25899efee9 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.controller.js +++ b/zeppelin-web/src/app/interpreter/interpreter.controller.js @@ -691,6 +691,22 @@ getRepositories(); }; + $scope.showSparkUI = function(settingId) { + $http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId) + .success(function(data, status, headers, config) { + var url = data.body.url; + if (!url) { + BootstrapDialog.alert({ + message: 'No spark application running' + }); + return; + } + window.open(url, '_blank'); + }).error(function(data, status, headers, config) { + console.log('Error %o %o', status, data.message); + }); + }; + init(); } diff --git a/zeppelin-web/src/app/interpreter/interpreter.html b/zeppelin-web/src/app/interpreter/interpreter.html index abb83d8a65e..e0cebbac867 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.html +++ b/zeppelin-web/src/app/interpreter/interpreter.html @@ -128,6 +128,10 @@

{{setting.name}}

+