diff --git a/README.md b/README.md index 6aac5715ba9..454bf309f5d 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Yarn ### Run ./bin/zeppelin-daemon.sh start - browse localhost:8080 in your browser. 8081 port should be accessible for websocket connection. + browse localhost:8080 in your browser. For configuration details check __./conf__ subdirectory. diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index c2294cbd445..ac43216f8bd 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -28,22 +28,7 @@ zeppelin.server.port 8080 - Server port. port+1 is used for web socket. - - - - zeppelin.websocket.addr - 0.0.0.0 - Testing websocket address - - - - - zeppelin.websocket.port - -1 - Testing websocket port + Server port. diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index e85a3ae21da..2b43e1b0260 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -95,12 +95,6 @@ ${cxf.version} - - org.java-websocket - Java-WebSocket - 1.3.0 - - com.wordnik @@ -297,19 +291,9 @@ - org.atmosphere - atmosphere-jersey - 2.2.0 - - - com.sun.jersey - jersey-server - - - javax.ws.rs - javax.ws.rs-api - - + com.sun.jersey + jersey-servlet + 1.13 diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java deleted file mode 100644 index 7a314614faf..00000000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.server; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.util.resource.Resource; - -/** - * Simple servlet to dynamically set the Websocket port - * in the JavaScript sent to the client - */ -public class AppScriptServlet extends DefaultServlet { - - // Hash containing the possible scripts that contain the getPort() - // function originally defined in app.js - private static Set scriptPaths = new HashSet( - Arrays.asList( - "/scripts/scripts.js", - "/components/baseUrl/baseUrl.js" - ) - ); - - private int websocketPort; - - public AppScriptServlet(int websocketPort) { - this.websocketPort = websocketPort; - } - - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException, - IOException { - - // Process all requests not for the app script to the parent - // class - String uri = request.getRequestURI(); - if (!scriptPaths.contains(uri)) { - super.doGet(request, response); - return; - } - - // Read the script file chunk by chunk - Resource scriptFile = getResource(uri); - InputStream is = scriptFile.getInputStream(); - StringBuffer script = new StringBuffer(); - byte[] buffer = new byte[1024]; - while (is.available() > 0) { - int numRead = is.read(buffer); - if (numRead <= 0) { - break; - } - script.append(new String(buffer, 0, numRead, "UTF-8")); - } - - // Replace the getPort function to return the proper value - String startReplaceString = "/* @preserve AppScriptServlet - getPort */"; - String endReplaceString = "/* @preserve AppScriptServlet - close */"; - - int startIndex = script.indexOf(startReplaceString); - int endIndex = script.indexOf(endReplaceString, startIndex); - - if (startIndex >= 0 && endIndex >= 0) { - String replaceString = "this.getPort=function(){return " + websocketPort + "};"; - script.replace(startIndex, endIndex + endReplaceString.length(), replaceString); - } - - response.getWriter().println(script.toString()); - } -} - diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 2bd23bbd5b9..ad1d9078952 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -26,6 +26,7 @@ import javax.net.ssl.SSLContext; import javax.servlet.DispatcherType; +import javax.servlet.Servlet; import javax.ws.rs.core.Application; import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; @@ -40,13 +41,14 @@ import org.apache.zeppelin.rest.ZeppelinRestApi; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.socket.NotebookServer; -import org.apache.zeppelin.socket.SslWebSocketServerFactory; +import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.bio.SocketConnector; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.session.SessionHandler; -import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; +import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -83,8 +85,6 @@ public static void main(String[] args) throws Exception { conf.setProperty("args", args); jettyServer = setupJettyServer(conf); - notebookServer = setupNotebookServer(conf); - notebookServer.start(); // REST api final ServletContextHandler restApi = setupRestApiContextHandler(); @@ -93,17 +93,18 @@ public static void main(String[] args) throws Exception { */ final ServletContextHandler swagger = setupSwaggerContextHandler(conf); + // Notebook server + final ServletContextHandler notebook = setupNotebookServer(conf); + // Web UI - LOG.info("Create zeppelin websocket on {}:{}", notebookServer.getAddress() - .getAddress(), notebookServer.getPort()); - final WebAppContext webApp = setupWebAppContext(conf, notebookServer.getPort()); + final WebAppContext webApp = setupWebAppContext(conf); //Below is commented since zeppelin-docs module is removed. //final WebAppContext webAppSwagg = setupWebAppSwagger(conf); // add all handlers ContextHandlerCollection contexts = new ContextHandlerCollection(); //contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg}); - contexts.setHandlers(new Handler[]{swagger, restApi, webApp}); + contexts.setHandlers(new Handler[]{swagger, restApi, notebook, webApp}); jettyServer.setHandler(contexts); LOG.info("Start zeppelin server"); @@ -114,10 +115,7 @@ public static void main(String[] args) throws Exception { @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); try { - notebook.getInterpreterFactory().close(); - jettyServer.stop(); - notebookServer.stop(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -142,12 +140,12 @@ public static void main(String[] args) throws Exception { private static Server setupJettyServer(ZeppelinConfiguration conf) throws Exception { - SocketConnector connector; + AbstractConnector connector; if (conf.useSsl()) { - connector = new SslSocketConnector(getSslContextFactory(conf)); + connector = new SslSelectChannelConnector(getSslContextFactory(conf)); } else { - connector = new SocketConnector(); + connector = new SelectChannelConnector(); } // Set some timeout options to make debugging easier. @@ -163,20 +161,22 @@ private static Server setupJettyServer(ZeppelinConfiguration conf) return server; } - private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf) + private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) throws Exception { - NotebookServer server = new NotebookServer(conf.getWebSocketAddress(), conf.getWebSocketPort()); + notebookServer = new NotebookServer(); + final ServletHolder servletHolder = new ServletHolder(notebookServer); + servletHolder.setInitParameter("maxTextMessageSize", "1024000"); - // Default WebSocketServer uses unencrypted connector, so only need to - // change the connector if SSL should be used. - if (conf.useSsl()) { - SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf)); - wsf.setNeedClientAuth(conf.useClientAuth()); - server.setWebSocketFactory(wsf); - } + final ServletContextHandler cxfContext = new ServletContextHandler( + ServletContextHandler.SESSIONS); - return server; + cxfContext.setSessionHandler(new SessionHandler()); + cxfContext.setContextPath("/"); + cxfContext.addServlet(servletHolder, "/ws/*"); + cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", + EnumSet.allOf(DispatcherType.class)); + return cxfContext; } private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) @@ -257,7 +257,7 @@ private static ServletContextHandler setupSwaggerContextHandler( } private static WebAppContext setupWebAppContext( - ZeppelinConfiguration conf, int websocketPort) { + ZeppelinConfiguration conf) { WebAppContext webApp = new WebAppContext(); File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR)); @@ -273,7 +273,7 @@ private static WebAppContext setupWebAppContext( } // Explicit bind to root webApp.addServlet( - new ServletHolder(new AppScriptServlet(websocketPort)), + new ServletHolder(new DefaultServlet()), "/*" ); return webApp; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 90a2a95e9f7..ed35ea1b0ca 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import javax.servlet.http.HttpServletRequest; + import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -39,9 +41,8 @@ import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; -import org.java_websocket.WebSocket; -import org.java_websocket.handshake.ClientHandshake; -import org.java_websocket.server.WebSocketServer; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,40 +55,36 @@ * * @author anthonycorbacho */ -public class NotebookServer extends WebSocketServer implements - JobListenerFactory, AngularObjectRegistryListener { +public class NotebookServer extends WebSocketServlet implements + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { - private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - private static final String DEFAULT_ADDR = "0.0.0.0"; - private static final int DEFAULT_PORT = 8282; + private static final Logger LOG = LoggerFactory + .getLogger(NotebookServer.class); Gson gson = new Gson(); - Map> noteSocketMap = new HashMap>(); - List connectedSockets = new LinkedList(); - - public NotebookServer() { - super(new InetSocketAddress(DEFAULT_ADDR, DEFAULT_PORT)); - } - - public NotebookServer(String address, int port) { - super(new InetSocketAddress(address, port)); - } + Map> noteSocketMap = new HashMap>(); + List connectedSockets = new LinkedList(); private Notebook notebook() { return ZeppelinServer.notebook; } @Override - public void onOpen(WebSocket conn, ClientHandshake handshake) { - LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); + public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { + return new NotebookSocket(req, protocol, this); + } + + @Override + public void onOpen(NotebookSocket conn) { + LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), + conn.getRequest().getRemotePort()); synchronized (connectedSockets) { connectedSockets.add(conn); } } @Override - public void onMessage(WebSocket conn, String msg) { + public void onMessage(NotebookSocket conn, String msg) { Notebook notebook = notebook(); try { Message messagereceived = deserializeMessage(msg); @@ -132,7 +129,7 @@ public void onMessage(WebSocket conn, String msg) { break; case PING: pong(); - break; + break; case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; @@ -146,17 +143,9 @@ public void onMessage(WebSocket conn, String msg) { } @Override - public void onClose(WebSocket conn, int code, String reason, boolean remote) { - LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); - removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } - } - - @Override - public void onError(WebSocket conn, Exception message) { + public void onClose(NotebookSocket conn, int code, String reason) { + LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { connectedSockets.remove(conn); @@ -172,12 +161,13 @@ private String serializeMessage(Message m) { return gson.toJson(m); } - private void addConnectionToNote(String noteId, WebSocket socket) { + private void addConnectionToNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a single note. - List socketList = noteSocketMap.get(noteId); + removeConnectionFromAllNote(socket); // make sure a socket relates only a + // single note. + List socketList = noteSocketMap.get(noteId); if (socketList == null) { - socketList = new LinkedList(); + socketList = new LinkedList(); noteSocketMap.put(noteId, socketList); } @@ -187,9 +177,9 @@ private void addConnectionToNote(String noteId, WebSocket socket) { } } - private void removeConnectionFromNote(String noteId, WebSocket socket) { + private void removeConnectionFromNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - List socketList = noteSocketMap.get(noteId); + List socketList = noteSocketMap.get(noteId); if (socketList != null) { socketList.remove(socket); } @@ -198,11 +188,11 @@ private void removeConnectionFromNote(String noteId, WebSocket socket) { private void removeNote(String noteId) { synchronized (noteSocketMap) { - List socketList = noteSocketMap.remove(noteId); + List socketList = noteSocketMap.remove(noteId); } } - private void removeConnectionFromAllNote(WebSocket socket) { + private void removeConnectionFromAllNote(NotebookSocket socket) { synchronized (noteSocketMap) { Set keys = noteSocketMap.keySet(); for (String noteId : keys) { @@ -211,12 +201,12 @@ private void removeConnectionFromAllNote(WebSocket socket) { } } - private String getOpenNoteId(WebSocket socket) { + private String getOpenNoteId(NotebookSocket socket) { String id = null; synchronized (noteSocketMap) { Set keys = noteSocketMap.keySet(); for (String noteId : keys) { - List sockets = noteSocketMap.get(noteId); + List sockets = noteSocketMap.get(noteId); if (sockets.contains(socket)) { id = noteId; } @@ -225,7 +215,8 @@ private String getOpenNoteId(WebSocket socket) { return id; } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, + Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -240,23 +231,31 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message private void broadcast(String noteId, Message m) { synchronized (noteSocketMap) { - List socketLists = noteSocketMap.get(noteId); + List socketLists = noteSocketMap.get(noteId); if (socketLists == null || socketLists.size() == 0) { return; } LOG.info("SEND >> " + m.op); - for (WebSocket conn : socketLists) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : socketLists) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } private void broadcastAll(Message m) { synchronized (connectedSockets) { - for (WebSocket conn : connectedSockets) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } @@ -278,7 +277,8 @@ private void broadcastNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) { + private void sendNote(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -296,7 +296,8 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); - Map config = (Map) fromMessage.get("config"); + Map config = (Map) fromMessage + .get("config"); if (noteId == null) { return; } @@ -319,7 +320,8 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) } } - private boolean isCronUpdated(Map configA, Map configB) { + private boolean isCronUpdated(Map configA, + Map configB) { boolean cronUpdated = false; if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron").equals(configB.get("cron"))) { @@ -352,14 +354,16 @@ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) broadcastNoteList(); } - private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void updateParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - Map params = (Map) fromMessage.get("params"); - Map config = (Map) fromMessage.get("config"); + Map params = (Map) fromMessage + .get("params"); + Map config = (Map) fromMessage + .get("config"); final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.settings.setParams(params); @@ -370,8 +374,8 @@ private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMess broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } - private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void removeParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -385,7 +389,8 @@ private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMess } } - private void completion(WebSocket conn, Notebook notebook, Message fromMessage) { + private void completion(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -404,6 +409,7 @@ private void completion(WebSocket conn, Notebook notebook, Message fromMessage) /** * When angular object updated from client + * * @param conn * @param notebook * @param fromMessage @@ -417,12 +423,12 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, AngularObject ao = null; boolean global = false; - - + // propagate change to (Remote) AngularObjectRegistry Note note = notebook.getNote(noteId); if (note != null) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; @@ -433,7 +439,7 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry - ao = angularObjectRegistry.get(varName, noteId); + ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { // then try global registry ao = angularObjectRegistry.get(varName, null); @@ -454,26 +460,29 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, } } } - - if (global) { // broadcast change to all web session that uses related interpreter. + + if (global) { // broadcast change to all web session that uses related + // interpreter. for (Note n : notebook.getAllNotes()) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } - + if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); - this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + this.broadcast( + n.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); } } } - } else { // broadcast to all web session for the note + } else { // broadcast to all web session for the note this.broadcast( note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) @@ -482,24 +491,25 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, } } - - private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void moveParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); + final int newIndex = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.moveParagraph(paragraphId, newIndex); note.persist(); broadcastNote(note); } - private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); + private void insertParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { + final int index = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); @@ -507,9 +517,8 @@ private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMess broadcastNote(note); } - - private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void cancelParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -520,8 +529,8 @@ private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMess p.abort(); } - private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void runParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -531,13 +540,16 @@ private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage String text = (String) fromMessage.get("paragraph"); p.setText(text); p.setTitle((String) fromMessage.get("title")); - Map params = (Map) fromMessage.get("params"); + Map params = (Map) fromMessage + .get("params"); p.settings.setParams(params); - Map config = (Map) fromMessage.get("config"); + Map config = (Map) fromMessage + .get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one - boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId()); + boolean isTheLastParagraph = note.getLastParagraph().getId() + .equals(p.getId()); if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { note.addParagraph(); } @@ -546,12 +558,12 @@ private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage try { note.run(paragraphId); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Exception from run", ex); if (p != null) { - p.setReturn(new InterpreterResult( - InterpreterResult.Code.ERROR, ex.getMessage()), ex); + p.setReturn( + new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), + ex); p.setStatus(Status.ERROR); } } @@ -572,12 +584,15 @@ public ParagraphJobListener(NotebookServer notebookServer, Note note) { @Override public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast(note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); + notebookServer.broadcast( + note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", + job.progress())); } @Override - public void beforeStatusChange(Job job, Status before, Status after) {} + public void beforeStatusChange(Job job, Status before, Status after) { + } @Override public void afterStatusChange(Job job, Status before, Status after) { @@ -606,19 +621,22 @@ public JobListener getParagraphJobListener(Note note) { private void pong() { } - private void sendAllAngularObjects(Note note, WebSocket conn) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } for (InterpreterSetting intpSetting : settings) { - AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry(); + AngularObjectRegistry registry = intpSetting.getInterpreterGroup() + .getAngularObjectRegistry(); List objects = registry.getAllWithGlobal(note.id()); for (AngularObject object : objects) { conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId()) + .put("angularObject", object) + .put("interpreterGroupId", + intpSetting.getInterpreterGroup().getId()) .put("noteId", note.id()))); } } @@ -641,23 +659,25 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { if (object.getNoteId() != null && !note.id().equals(object.getNoteId())) { continue; } - + List intpSettings = note.getNoteReplLoader() .getInterpreterSettings(); - if (intpSettings.isEmpty()) continue; + if (intpSettings.isEmpty()) + continue; for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { - broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + broadcast( + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } - } + } } - @Override public void onRemove(String interpreterGroupId, String name, String noteId) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java new file mode 100644 index 00000000000..aceea456b28 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.websocket.WebSocket; + +/** + * Notebook websocket + */ +public class NotebookSocket implements WebSocket.OnTextMessage{ + + private Connection connection; + private NotebookSocketListener listener; + private HttpServletRequest request; + private String protocol; + + + public NotebookSocket(HttpServletRequest req, String protocol, + NotebookSocketListener listener) { + this.listener = listener; + this.request = req; + this.protocol = protocol; + } + + @Override + public void onClose(int closeCode, String message) { + listener.onClose(this, closeCode, message); + } + + @Override + public void onOpen(Connection connection) { + this.connection = connection; + listener.onOpen(this); + } + + @Override + public void onMessage(String message) { + listener.onMessage(this, message); + } + + + public HttpServletRequest getRequest() { + return request; + } + + public String getProtocol() { + return protocol; + } + + public void send(String serializeMessage) throws IOException { + connection.sendMessage(serializeMessage); + } + + +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java new file mode 100644 index 00000000000..77fed6ed7b1 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +/** + * NoteboookSocket listener + */ +public interface NotebookSocketListener { + public void onClose(NotebookSocket socket, int code, String message); + public void onOpen(NotebookSocket socket); + public void onMessage(NotebookSocket socket, String message); +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java deleted file mode 100644 index f44dc1f2ce8..00000000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.socket; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.concurrent.ExecutorService; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.java_websocket.SSLSocketChannel2; -import org.java_websocket.server.DefaultSSLWebSocketServerFactory; - -/** - * Extension of the java_websocket library's DefaultSslWebSocketServerFactory - * to require client side authentication during the SSL handshake - */ -public class SslWebSocketServerFactory - extends DefaultSSLWebSocketServerFactory { - - protected boolean needClientAuth; - - public SslWebSocketServerFactory(SSLContext sslcontext) { - super(sslcontext); - initAttributes(); - } - - public SslWebSocketServerFactory( - SSLContext sslcontext, - ExecutorService exec) { - - super(sslcontext, exec); - initAttributes(); - } - - protected void initAttributes() { - this.needClientAuth = false; - } - - @Override - public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key) - throws IOException { - - SSLEngine sslEngine = sslcontext.createSSLEngine(); - sslEngine.setUseClientMode(false); - sslEngine.setNeedClientAuth(needClientAuth); - return new SSLSocketChannel2( channel, sslEngine, exec, key ); - } - - public boolean getNeedClientAuth() { - return needClientAuth; - } - - public void setNeedClientAuth(boolean needClientAuth) { - this.needClientAuth = needClientAuth; - } -} - diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java index 779396cb917..b170a951efa 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java @@ -150,6 +150,20 @@ boolean endToEndTestEnabled() { return null != System.getenv("CI"); } + boolean waitForText(final String txt, final By by) { + try { + new WebDriverWait(driver, 5).until(new ExpectedCondition() { + @Override + public Boolean apply(WebDriver d) { + return txt.equals(driver.findElement(by).getText()); + } + }); + return true; + } catch (TimeoutException e) { + return false; + } + } + @Test public void testAngularDisplay() throws InterruptedException{ if (!endToEndTestEnabled()) { @@ -176,8 +190,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(1, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind variable @@ -190,8 +204,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* @@ -206,8 +220,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(3, "FINISHED"); // check expected text - assertEquals("myVar=1", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=1", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Click element @@ -216,8 +230,8 @@ public void testAngularDisplay() throws InterruptedException{ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_2_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_2_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Register watcher @@ -242,13 +256,13 @@ public void testAngularDisplay() throws InterruptedException{ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_3_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_3_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); waitForParagraph(3, "FINISHED"); // check expected text by watcher - assertEquals("myVar=3", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=3", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Unbind @@ -261,8 +275,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(5, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind again and see rebind works. @@ -272,8 +286,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); System.out.println("testCreateNotebook Test executed"); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 744c1e0ce33..393dc7bcabc 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -187,7 +187,6 @@ private static boolean isActiveSparkHome(File dir) { protected static void shutDown() throws Exception { if (!wasRunning) { LOG.info("Terminating test Zeppelin..."); - ZeppelinServer.notebookServer.stop(); ZeppelinServer.jettyServer.stop(); executor.shutdown(); diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js index 662d88fa000..f5eb2df5970 100644 --- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js +++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js @@ -15,36 +15,7 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { - /** Get the current port of the websocket - * - * When running Zeppelin, the body of this function will be dynamically - * overridden with the AppScriptServlet from zeppelin-site.xml config value. - * - * If the config value is not defined, it defaults to the HTTP port + 1 - * - * In the case of running "grunt serve", this function will appear - * as is. - */ - - /* @preserve AppScriptServlet - getPort */ this.getPort = function() { - var port = Number(location.port); - if (location.protocol !== 'https:' && !port) { - port = 80; - } else if (location.protocol === 'https:' && !port) { - port = 443; - } else if (port === 3333 || port === 9000) { - port = 8080; - } - return port + 1; - }; - /* @preserve AppScriptServlet - close */ - - this.getWebsocketProtocol = function() { - return location.protocol === 'https:' ? 'wss' : 'ws'; - }; - - this.getRestApiBase = function() { var port = Number(location.port); if (!port) { port = 80; @@ -52,15 +23,24 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { port = 443; } } - + //Exception for when running locally via grunt if (port === 3333 || port === 9000) { port = 8080; } - return location.protocol + '//' + location.hostname + ':' + port + skipTrailingSlash(location.pathname) + '/api'; + return port; + }; + + this.getWebsocketUrl = function() { + var wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + return wsProtocol + '//' + location.hostname + ':' + this.getPort() + '/ws'; }; - + + this.getRestApiBase = function() { + return location.protocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/api'; + }; + var skipTrailingSlash = function(path) { return path.replace(/\/$/, ''); }; -}); \ No newline at end of file +}); diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 731266f8071..6d9f17742f4 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -16,7 +16,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) { var websocketCalls = {}; - websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort()); + websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl()); websocketCalls.ws.onOpen(function() { console.log('Websocket created'); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index d5c81550ec2..2be744bb366 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -268,25 +268,6 @@ public int getServerPort() { return getInt(ConfVars.ZEPPELIN_PORT); } - public String getWebSocketAddress() { - return getString(ConfVars.ZEPPELIN_WEBSOCKET_ADDR); - } - - public int getWebSocketPort() { - int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT); - int serverPort = getServerPort(); - - if (port < 0) { - if (serverPort <= 0) { - return 0; - } else { - return serverPort + 1; - } - } else { - return port; - } - } - public String getKeyStorePath() { return getRelativeDir( String.format("%s/%s", @@ -389,9 +370,6 @@ public static enum ConfVars { ZEPPELIN_HOME("zeppelin.home", "../"), ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"), ZEPPELIN_PORT("zeppelin.server.port", 8080), - // negative websocket port denotes that server port + 1 should be used - ZEPPELIN_WEBSOCKET_ADDR("zeppelin.websocket.addr", "0.0.0.0"), - ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1), ZEPPELIN_SSL("zeppelin.ssl", false), ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),