diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 582cb6b06c5..a4ebd7858c6 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -315,6 +315,7 @@ public void appendOutput(String message) throws IOException { @Override public InterpreterResult interpret(String st, InterpreterContext context) { SparkInterpreter sparkInterpreter = getSparkInterpreter(); + sparkInterpreter.populateSparkWebUrl(context); if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) { return new InterpreterResult(Code.ERROR, "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported"); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 53bf30b9533..26faca90310 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -27,8 +27,6 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.*; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; @@ -45,6 +43,7 @@ import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; import org.apache.spark.sql.SQLContext; +import org.apache.spark.ui.SparkUI; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -112,6 +111,7 @@ public class SparkInterpreter extends Interpreter { private SparkOutputStream out; private SparkDependencyResolver dep; + private String sparkUrl; /** * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) @@ -938,6 +938,13 @@ public void open() { numReferenceOfSparkContext.incrementAndGet(); } + private String getSparkUIUrl() { + Option sparkUiOption = (Option) Utils.invokeMethod(sc, "ui"); + SparkUI sparkUi = sparkUiOption.get(); + String sparkWebUrl = sparkUi.appUIAddress(); + return sparkWebUrl; + } + private Results.Result interpret(String line) { return (Results.Result) Utils.invokeMethod( intp, @@ -946,6 +953,20 @@ private Results.Result interpret(String line) { new Object[] {line}); } + public void populateSparkWebUrl(InterpreterContext ctx) { + if (sparkUrl == null) { + sparkUrl = getSparkUIUrl(); + Map infos = new java.util.HashMap<>(); + if (sparkUrl != null) { + infos.put("url", sparkUrl); + logger.info("Sending metainfos to Zeppelin server: {}", infos.toString()); + if (ctx != null && ctx.getClient() != null) { + ctx.getClient().onMetaInfosReceived(infos); + } + } + } + } + private List currentClassPath() { List paths = classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); @@ -1085,7 +1106,7 @@ public InterpreterResult interpret(String line, InterpreterContext context) { return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString() + " is not supported"); } - + populateSparkWebUrl(context); z.setInterpreterContext(context); if (line == null || line.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 06139496020..15ce6581698 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -97,6 +97,7 @@ public void open() { @Override public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) { + getSparkInterpreter().populateSparkWebUrl(interpreterContext); String imageWidth = getProperty("zeppelin.R.image.width"); String[] sl = lines.split("\n"); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index fc8923c4172..e6fe137273a 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -96,6 +96,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } + sparkInterpreter.populateSparkWebUrl(context); sqlc = getSparkInterpreter().getSQLContext(); SparkContext sc = sqlc.sparkContext(); if (concurrentSQL()) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index bf3cfcb557d..f8c9032e291 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -20,10 +20,12 @@ import java.util.List; import java.util.Map; -import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; +import org.apache.zeppelin.interpreter.remote.RemoteEventClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.resource.ResourcePool; /** @@ -57,6 +59,7 @@ public static void remove() { private ResourcePool resourcePool; private List runners; private String className; + private RemoteEventClientWrapper client; public InterpreterContext(String noteId, String paragraphId, @@ -83,6 +86,22 @@ public InterpreterContext(String noteId, this.out = out; } + public InterpreterContext(String noteId, + String paragraphId, + String paragraphTitle, + String paragraphText, + AuthenticationInfo authenticationInfo, + Map config, + GUI gui, + AngularObjectRegistry angularObjectRegistry, + ResourcePool resourcePool, + List contextRunners, + InterpreterOutput output, + RemoteInterpreterEventClient eventClient) { + this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui, + angularObjectRegistry, resourcePool, contextRunners, output); + this.client = new RemoteEventClient(eventClient); + } public String getNoteId() { return noteId; @@ -131,4 +150,8 @@ public String getClassName() { public void setClassName(String className) { this.className = className; } + + public RemoteEventClientWrapper getClient() { + return client; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java new file mode 100644 index 00000000000..3585a59eae4 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java @@ -0,0 +1,24 @@ +package org.apache.zeppelin.interpreter.remote; + +import java.util.Map; + +/** + * + * Wrapper arnd RemoteInterpreterEventClient + * to expose methods in the client + * + */ +public class RemoteEventClient implements RemoteEventClientWrapper { + + private RemoteInterpreterEventClient client; + + public RemoteEventClient(RemoteInterpreterEventClient client) { + this.client = client; + } + + @Override + public void onMetaInfosReceived(Map infos) { + client.onMetaInfosReceived(infos); + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java new file mode 100644 index 00000000000..339f7714a21 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java @@ -0,0 +1,15 @@ +package org.apache.zeppelin.interpreter.remote; + +import java.util.Map; + +/** + * + * Wrapper interface for RemoterInterpreterEventClient + * to expose only required methods from EventClient + * + */ +public interface RemoteEventClientWrapper { + + public void onMetaInfosReceived(Map infos); + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index c59a6f60ce3..ae38ee84c31 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -279,6 +279,11 @@ public void onAppStatusUpdate(String noteId, String paragraphId, String appId, S gson.toJson(appendOutput))); } + public void onMetaInfosReceived(Map infos) { + sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS, + gson.toJson(infos))); + } + /** * Wait for eventQueue becomes empty */ diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index f7fee490daa..b75e5fae6b1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -195,6 +195,14 @@ public void run() { String status = appStatusUpdate.get("status"); appListener.onStatusChange(noteId, paragraphId, appId, status); + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { + Map metaInfos = gson.fromJson(event.getData(), + new TypeToken>() { + }.getType()); + String id = interpreterGroup.getId(); + int indexOfColon = id.indexOf(":"); + String settingId = id.substring(0, indexOfColon); + listener.onMetaInfosReceived(settingId, metaInfos); } logger.debug("Event from remoteproceess {}", event.getType()); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index da6ac63c51d..d25683f0cf8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -16,10 +16,13 @@ */ package org.apache.zeppelin.interpreter.remote; +import java.util.Map; + /** * Event from remoteInterpreterProcess */ public interface RemoteInterpreterProcessListener { public void onOutputAppend(String noteId, String paragraphId, String output); public void onOutputUpdated(String noteId, String paragraphId, String output); + public void onMetaInfosReceived(String settingId, Map metaInfos); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index cde6a7ba7ef..4d6f3bae7d1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -552,7 +552,7 @@ private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutp gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), interpreterGroup.getResourcePool(), - contextRunners, output); + contextRunners, output, eventClient); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 955461951f3..75692c04216 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -24,9 +24,6 @@ package org.apache.zeppelin.interpreter.thrift; -import java.util.Map; -import java.util.HashMap; -import org.apache.thrift.TEnum; public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { NO_OP(1), @@ -39,7 +36,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { OUTPUT_APPEND(8), OUTPUT_UPDATE(9), ANGULAR_REGISTRY_PUSH(10), - APP_STATUS_UPDATE(11); + APP_STATUS_UPDATE(11), + META_INFOS(12); private final int value; @@ -82,6 +80,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return ANGULAR_REGISTRY_PUSH; case 11: return APP_STATUS_UPDATE; + case 12: + return META_INFOS; default: return null; } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index 2ba62c30172..27e38089f66 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -29,6 +29,7 @@ import java.io.File; import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -154,4 +155,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) { public void onOutputUpdated(String noteId, String paragraphId, String output) { } + + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 49de4a70539..54cc25d8cda 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -299,4 +299,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) { public void onOutputUpdated(String noteId, String paragraphId, String output) { } + + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index c9094ebc7a3..cb50a8a9a77 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -18,9 +18,12 @@ package org.apache.zeppelin.rest; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; + +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -28,6 +31,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -215,6 +219,31 @@ public Response addRepository(String message) { return new JsonResponse(Status.CREATED).build(); } + /** + * get the metainfo property value + */ + @GET + @Path("getmetainfos/{settingId}") + public Response getMetaInfo(@Context HttpServletRequest req, + @PathParam("settingId") String settingId) { + String propName = req.getParameter("propName"); + if (propName == null) { + return new JsonResponse<>(Status.BAD_REQUEST).build(); + } + String propValue = null; + InterpreterSetting interpreterSetting = interpreterFactory.get(settingId); + Map infos = interpreterSetting.getInfos(); + if (infos != null) { + propValue = infos.get(propName); + } + Map respMap = new HashMap<>(); + respMap.put(propName, propValue); + logger.debug("Get meta info"); + logger.debug("Interpretersetting Id: {}, property Name:{}, property value: {}", settingId, + propName, propValue); + return new JsonResponse<>(Status.OK, respMap).build(); + } + /** * Delete repository * diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8a84587f75c..969186401ae 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1759,5 +1759,12 @@ private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subj .put("interpreterSettings", availableSettings))); } + @Override + public void onMetaInfosReceived(String settingId, Map metaInfos) { + InterpreterSetting interpreterSetting = notebook().getInterpreterFactory() + .get(settingId); + interpreterSetting.setInfos(metaInfos); + } + } diff --git a/zeppelin-web/src/app/interpreter/interpreter.controller.js b/zeppelin-web/src/app/interpreter/interpreter.controller.js index e4bebebd6c6..69dfbd4431f 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.controller.js +++ b/zeppelin-web/src/app/interpreter/interpreter.controller.js @@ -691,6 +691,22 @@ getRepositories(); }; + $scope.showSparkUI = function(settingId) { + $http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId + '?propName=url') + .success(function(data, status, headers, config) { + var url = data.body.url; + if (!url) { + BootstrapDialog.alert({ + message: 'No spark application running' + }); + return; + } + window.open(url, '_blank'); + }).error(function(data, status, headers, config) { + console.log('Error %o %o', status, data.message); + }); + }; + init(); } diff --git a/zeppelin-web/src/app/interpreter/interpreter.html b/zeppelin-web/src/app/interpreter/interpreter.html index abb83d8a65e..e0cebbac867 100644 --- a/zeppelin-web/src/app/interpreter/interpreter.html +++ b/zeppelin-web/src/app/interpreter/interpreter.html @@ -128,6 +128,10 @@

{{setting.name}}

+