From a7b82aaaffbd58d14c3eae221ea5179a88b0caf3 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 28 Jul 2015 12:41:31 +0900 Subject: [PATCH 1/8] Initial implementation of Websocket inside of Jetty server --- .../zeppelin/server/AppScriptServlet.java | 9 +- .../zeppelin/server/ZeppelinServer.java | 51 ++- .../zeppelin/socket/NotebookServer.java | 324 ++++++++++-------- .../zeppelin/socket/NotebookSocket.java | 55 +++ .../socket/NotebookSocketListener.java | 10 + .../socket/SslWebSocketServerFactory.java | 76 ---- .../zeppelin/rest/AbstractTestRestApi.java | 1 - .../src/components/baseUrl/baseUrl.service.js | 4 +- .../websocketEvents.factory.js | 2 +- 9 files changed, 267 insertions(+), 265 deletions(-) create mode 100644 zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java create mode 100644 zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java delete mode 100644 zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java index 7a314614faf..d126ccd6142 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java @@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.util.resource.Resource; @@ -45,10 +46,7 @@ public class AppScriptServlet extends DefaultServlet { ) ); - private int websocketPort; - - public AppScriptServlet(int websocketPort) { - this.websocketPort = websocketPort; + public AppScriptServlet() { } @Override @@ -85,7 +83,8 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) int endIndex = script.indexOf(endReplaceString, startIndex); if (startIndex >= 0 && endIndex >= 0) { - String replaceString = "this.getPort=function(){return " + websocketPort + "};"; + Server server = ZeppelinServer.jettyServer; + String replaceString = "this.getPort=function(){return " + ZeppelinServer.jettyServer.getConnectors()[0].getLocalPort() + "};"; script.replace(startIndex, endIndex + endReplaceString.length(), replaceString); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 2bd23bbd5b9..756d9a8b32f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -40,13 +40,13 @@ import org.apache.zeppelin.rest.ZeppelinRestApi; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.socket.NotebookServer; -import org.apache.zeppelin.socket.SslWebSocketServerFactory; +import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.bio.SocketConnector; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.session.SessionHandler; -import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -83,8 +83,6 @@ public static void main(String[] args) throws Exception { conf.setProperty("args", args); jettyServer = setupJettyServer(conf); - notebookServer = setupNotebookServer(conf); - notebookServer.start(); // REST api final ServletContextHandler restApi = setupRestApiContextHandler(); @@ -92,18 +90,19 @@ public static void main(String[] args) throws Exception { * But the rest of swagger is configured here */ final ServletContextHandler swagger = setupSwaggerContextHandler(conf); + + // Notebook server + final ServletContextHandler notebook = setupNotebookServer(conf); // Web UI - LOG.info("Create zeppelin websocket on {}:{}", notebookServer.getAddress() - .getAddress(), notebookServer.getPort()); - final WebAppContext webApp = setupWebAppContext(conf, notebookServer.getPort()); + final WebAppContext webApp = setupWebAppContext(conf); //Below is commented since zeppelin-docs module is removed. //final WebAppContext webAppSwagg = setupWebAppSwagger(conf); // add all handlers ContextHandlerCollection contexts = new ContextHandlerCollection(); //contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg}); - contexts.setHandlers(new Handler[]{swagger, restApi, webApp}); + contexts.setHandlers(new Handler[]{swagger, restApi, notebook, webApp}); jettyServer.setHandler(contexts); LOG.info("Start zeppelin server"); @@ -114,10 +113,7 @@ public static void main(String[] args) throws Exception { @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); try { - notebook.getInterpreterFactory().close(); - jettyServer.stop(); - notebookServer.stop(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -142,12 +138,12 @@ public static void main(String[] args) throws Exception { private static Server setupJettyServer(ZeppelinConfiguration conf) throws Exception { - SocketConnector connector; + AbstractConnector connector; if (conf.useSsl()) { - connector = new SslSocketConnector(getSslContextFactory(conf)); + connector = new SslSelectChannelConnector(getSslContextFactory(conf)); } else { - connector = new SocketConnector(); + connector = new SelectChannelConnector(); } // Set some timeout options to make debugging easier. @@ -163,20 +159,19 @@ private static Server setupJettyServer(ZeppelinConfiguration conf) return server; } - private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf) + private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) throws Exception { - NotebookServer server = new NotebookServer(conf.getWebSocketAddress(), conf.getWebSocketPort()); + final ServletHolder notebookServer = new ServletHolder("ws", NotebookServer.class); - // Default WebSocketServer uses unencrypted connector, so only need to - // change the connector if SSL should be used. - if (conf.useSsl()) { - SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf)); - wsf.setNeedClientAuth(conf.useClientAuth()); - server.setWebSocketFactory(wsf); - } - - return server; + final ServletContextHandler cxfContext = new ServletContextHandler( + ServletContextHandler.SESSIONS); + cxfContext.setSessionHandler(new SessionHandler()); + cxfContext.setContextPath("/"); + cxfContext.addServlet(notebookServer, "/ws/*"); + cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", + EnumSet.allOf(DispatcherType.class)); + return cxfContext; } private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) @@ -257,7 +252,7 @@ private static ServletContextHandler setupSwaggerContextHandler( } private static WebAppContext setupWebAppContext( - ZeppelinConfiguration conf, int websocketPort) { + ZeppelinConfiguration conf) { WebAppContext webApp = new WebAppContext(); File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR)); @@ -273,7 +268,7 @@ private static WebAppContext setupWebAppContext( } // Explicit bind to root webApp.addServlet( - new ServletHolder(new AppScriptServlet(websocketPort)), + new ServletHolder(new AppScriptServlet()), "/*" ); return webApp; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 90a2a95e9f7..d0c672e3d85 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import javax.servlet.http.HttpServletRequest; + import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -39,9 +41,8 @@ import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; -import org.java_websocket.WebSocket; -import org.java_websocket.handshake.ClientHandshake; -import org.java_websocket.server.WebSocketServer; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,91 +55,87 @@ * * @author anthonycorbacho */ -public class NotebookServer extends WebSocketServer implements - JobListenerFactory, AngularObjectRegistryListener { +public class NotebookServer extends WebSocketServlet implements + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { - private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - private static final String DEFAULT_ADDR = "0.0.0.0"; - private static final int DEFAULT_PORT = 8282; + private static final Logger LOG = LoggerFactory + .getLogger(NotebookServer.class); Gson gson = new Gson(); - Map> noteSocketMap = new HashMap>(); - List connectedSockets = new LinkedList(); - - public NotebookServer() { - super(new InetSocketAddress(DEFAULT_ADDR, DEFAULT_PORT)); - } - - public NotebookServer(String address, int port) { - super(new InetSocketAddress(address, port)); - } + Map> noteSocketMap = new HashMap>(); + List connectedSockets = new LinkedList(); private Notebook notebook() { return ZeppelinServer.notebook; } @Override - public void onOpen(WebSocket conn, ClientHandshake handshake) { - LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); + public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { + return new NotebookSocket(req, protocol, this); + } + + @Override + public void onOpen(NotebookSocket conn) { + LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), + conn.getRequest().getRemotePort()); synchronized (connectedSockets) { connectedSockets.add(conn); } } @Override - public void onMessage(WebSocket conn, String msg) { + public void onMessage(NotebookSocket conn, String msg) { Notebook notebook = notebook(); try { Message messagereceived = deserializeMessage(msg); LOG.info("RECEIVE << " + messagereceived.op); /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - broadcastNoteList(); - break; - case GET_NOTE: - sendNote(conn, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, notebook); - break; - case DEL_NOTE: - removeNote(conn, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, notebook, messagereceived); - break; - case PING: - pong(); - break; - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, notebook, messagereceived); - break; - default: - broadcastNoteList(); - break; + case LIST_NOTES: + broadcastNoteList(); + break; + case GET_NOTE: + sendNote(conn, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, notebook); + break; + case DEL_NOTE: + removeNote(conn, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, notebook, messagereceived); + break; + case PING: + pong(); + break; + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, notebook, messagereceived); + break; + default: + broadcastNoteList(); + break; } } catch (Exception e) { LOG.error("Can't handle message", e); @@ -146,17 +143,9 @@ public void onMessage(WebSocket conn, String msg) { } @Override - public void onClose(WebSocket conn, int code, String reason, boolean remote) { - LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); - removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } - } - - @Override - public void onError(WebSocket conn, Exception message) { + public void onClose(NotebookSocket conn, int code, String reason) { + LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { connectedSockets.remove(conn); @@ -172,12 +161,13 @@ private String serializeMessage(Message m) { return gson.toJson(m); } - private void addConnectionToNote(String noteId, WebSocket socket) { + private void addConnectionToNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a single note. - List socketList = noteSocketMap.get(noteId); + removeConnectionFromAllNote(socket); // make sure a socket relates only a + // single note. + List socketList = noteSocketMap.get(noteId); if (socketList == null) { - socketList = new LinkedList(); + socketList = new LinkedList(); noteSocketMap.put(noteId, socketList); } @@ -187,9 +177,9 @@ private void addConnectionToNote(String noteId, WebSocket socket) { } } - private void removeConnectionFromNote(String noteId, WebSocket socket) { + private void removeConnectionFromNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - List socketList = noteSocketMap.get(noteId); + List socketList = noteSocketMap.get(noteId); if (socketList != null) { socketList.remove(socket); } @@ -198,11 +188,11 @@ private void removeConnectionFromNote(String noteId, WebSocket socket) { private void removeNote(String noteId) { synchronized (noteSocketMap) { - List socketList = noteSocketMap.remove(noteId); + List socketList = noteSocketMap.remove(noteId); } } - private void removeConnectionFromAllNote(WebSocket socket) { + private void removeConnectionFromAllNote(NotebookSocket socket) { synchronized (noteSocketMap) { Set keys = noteSocketMap.keySet(); for (String noteId : keys) { @@ -211,12 +201,12 @@ private void removeConnectionFromAllNote(WebSocket socket) { } } - private String getOpenNoteId(WebSocket socket) { + private String getOpenNoteId(NotebookSocket socket) { String id = null; synchronized (noteSocketMap) { Set keys = noteSocketMap.keySet(); for (String noteId : keys) { - List sockets = noteSocketMap.get(noteId); + List sockets = noteSocketMap.get(noteId); if (sockets.contains(socket)) { id = noteId; } @@ -225,7 +215,8 @@ private String getOpenNoteId(WebSocket socket) { return id; } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, + Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -240,23 +231,31 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message private void broadcast(String noteId, Message m) { synchronized (noteSocketMap) { - List socketLists = noteSocketMap.get(noteId); + List socketLists = noteSocketMap.get(noteId); if (socketLists == null || socketLists.size() == 0) { return; } LOG.info("SEND >> " + m.op); - for (WebSocket conn : socketLists) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : socketLists) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } private void broadcastAll(Message m) { synchronized (connectedSockets) { - for (WebSocket conn : connectedSockets) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } @@ -278,7 +277,8 @@ private void broadcastNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) { + private void sendNote(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -296,7 +296,8 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); - Map config = (Map) fromMessage.get("config"); + Map config = (Map) fromMessage + .get("config"); if (noteId == null) { return; } @@ -319,7 +320,8 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) } } - private boolean isCronUpdated(Map configA, Map configB) { + private boolean isCronUpdated(Map configA, + Map configB) { boolean cronUpdated = false; if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron").equals(configB.get("cron"))) { @@ -352,14 +354,16 @@ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) broadcastNoteList(); } - private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void updateParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - Map params = (Map) fromMessage.get("params"); - Map config = (Map) fromMessage.get("config"); + Map params = (Map) fromMessage + .get("params"); + Map config = (Map) fromMessage + .get("config"); final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.settings.setParams(params); @@ -370,8 +374,8 @@ private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMess broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } - private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void removeParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -385,7 +389,8 @@ private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMess } } - private void completion(WebSocket conn, Notebook notebook, Message fromMessage) { + private void completion(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -404,6 +409,7 @@ private void completion(WebSocket conn, Notebook notebook, Message fromMessage) /** * When angular object updated from client + * * @param conn * @param notebook * @param fromMessage @@ -417,12 +423,12 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, AngularObject ao = null; boolean global = false; - - + // propagate change to (Remote) AngularObjectRegistry Note note = notebook.getNote(noteId); if (note != null) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; @@ -433,7 +439,7 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry - ao = angularObjectRegistry.get(varName, noteId); + ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { // then try global registry ao = angularObjectRegistry.get(varName, null); @@ -454,26 +460,29 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, } } } - - if (global) { // broadcast change to all web session that uses related interpreter. + + if (global) { // broadcast change to all web session that uses related + // interpreter. for (Note n : notebook.getAllNotes()) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } - + if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); - this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + this.broadcast( + n.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); } } } - } else { // broadcast to all web session for the note + } else { // broadcast to all web session for the note this.broadcast( note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) @@ -482,24 +491,25 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, } } - - private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void moveParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); + final int newIndex = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.moveParagraph(paragraphId, newIndex); note.persist(); broadcastNote(note); } - private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); + private void insertParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { + final int index = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); @@ -507,9 +517,8 @@ private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMess broadcastNote(note); } - - private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void cancelParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -520,8 +529,8 @@ private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMess p.abort(); } - private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void runParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -531,13 +540,16 @@ private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage String text = (String) fromMessage.get("paragraph"); p.setText(text); p.setTitle((String) fromMessage.get("title")); - Map params = (Map) fromMessage.get("params"); + Map params = (Map) fromMessage + .get("params"); p.settings.setParams(params); - Map config = (Map) fromMessage.get("config"); + Map config = (Map) fromMessage + .get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one - boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId()); + boolean isTheLastParagraph = note.getLastParagraph().getId() + .equals(p.getId()); if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { note.addParagraph(); } @@ -546,12 +558,12 @@ private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage try { note.run(paragraphId); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Exception from run", ex); if (p != null) { - p.setReturn(new InterpreterResult( - InterpreterResult.Code.ERROR, ex.getMessage()), ex); + p.setReturn( + new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), + ex); p.setStatus(Status.ERROR); } } @@ -572,12 +584,15 @@ public ParagraphJobListener(NotebookServer notebookServer, Note note) { @Override public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast(note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); + notebookServer.broadcast( + note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", + job.progress())); } @Override - public void beforeStatusChange(Job job, Status before, Status after) {} + public void beforeStatusChange(Job job, Status before, Status after) { + } @Override public void afterStatusChange(Job job, Status before, Status after) { @@ -606,19 +621,22 @@ public JobListener getParagraphJobListener(Note note) { private void pong() { } - private void sendAllAngularObjects(Note note, WebSocket conn) { - List settings = note.getNoteReplLoader().getInterpreterSettings(); + private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { + List settings = note.getNoteReplLoader() + .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } for (InterpreterSetting intpSetting : settings) { - AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry(); + AngularObjectRegistry registry = intpSetting.getInterpreterGroup() + .getAngularObjectRegistry(); List objects = registry.getAllWithGlobal(note.id()); for (AngularObject object : objects) { conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId()) + .put("angularObject", object) + .put("interpreterGroupId", + intpSetting.getInterpreterGroup().getId()) .put("noteId", note.id()))); } } @@ -641,23 +659,25 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { if (object.getNoteId() != null && !note.id().equals(object.getNoteId())) { continue; } - + List intpSettings = note.getNoteReplLoader() .getInterpreterSettings(); - if (intpSettings.isEmpty()) continue; + if (intpSettings.isEmpty()) + continue; for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { - broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + broadcast( + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } - } + } } - @Override public void onRemove(String interpreterGroupId, String name, String noteId) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java new file mode 100644 index 00000000000..bc07d21cf62 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -0,0 +1,55 @@ +package org.apache.zeppelin.socket; + +import java.io.IOException; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.websocket.WebSocket; + +public class NotebookSocket implements WebSocket.OnTextMessage{ + + private Connection connection; + private NotebookSocketListener listener; + private HttpServletRequest request; + private String protocol; + + + public NotebookSocket(HttpServletRequest req, String protocol, + NotebookSocketListener listener) { + this.listener = listener; + this.request = req; + this.protocol = protocol; + } + + @Override + public void onClose(int closeCode, String message) { + listener.onClose(this, closeCode, message); + } + + @Override + public void onOpen(Connection connection) { + this.connection = connection; + listener.onOpen(this); + } + + @Override + public void onMessage(String message) { + listener.onMessage(this, message); + } + + + public HttpServletRequest getRequest() { + return request; + } + + public String getProtocol() { + return protocol; + } + + public void send(String serializeMessage) throws IOException { + connection.sendMessage(serializeMessage); + } + + +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java new file mode 100644 index 00000000000..fa0dc8c4c37 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java @@ -0,0 +1,10 @@ +package org.apache.zeppelin.socket; + +/** + * NoteboookSocket listener + */ +public interface NotebookSocketListener { + public void onClose(NotebookSocket socket, int code, String message); + public void onOpen(NotebookSocket socket); + public void onMessage(NotebookSocket socket, String message); +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java deleted file mode 100644 index f44dc1f2ce8..00000000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.socket; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.concurrent.ExecutorService; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.java_websocket.SSLSocketChannel2; -import org.java_websocket.server.DefaultSSLWebSocketServerFactory; - -/** - * Extension of the java_websocket library's DefaultSslWebSocketServerFactory - * to require client side authentication during the SSL handshake - */ -public class SslWebSocketServerFactory - extends DefaultSSLWebSocketServerFactory { - - protected boolean needClientAuth; - - public SslWebSocketServerFactory(SSLContext sslcontext) { - super(sslcontext); - initAttributes(); - } - - public SslWebSocketServerFactory( - SSLContext sslcontext, - ExecutorService exec) { - - super(sslcontext, exec); - initAttributes(); - } - - protected void initAttributes() { - this.needClientAuth = false; - } - - @Override - public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key) - throws IOException { - - SSLEngine sslEngine = sslcontext.createSSLEngine(); - sslEngine.setUseClientMode(false); - sslEngine.setNeedClientAuth(needClientAuth); - return new SSLSocketChannel2( channel, sslEngine, exec, key ); - } - - public boolean getNeedClientAuth() { - return needClientAuth; - } - - public void setNeedClientAuth(boolean needClientAuth) { - this.needClientAuth = needClientAuth; - } -} - diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 744c1e0ce33..393dc7bcabc 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -187,7 +187,6 @@ private static boolean isActiveSparkHome(File dir) { protected static void shutDown() throws Exception { if (!wasRunning) { LOG.info("Terminating test Zeppelin..."); - ZeppelinServer.notebookServer.stop(); ZeppelinServer.jettyServer.stop(); executor.shutdown(); diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js index 662d88fa000..b5afa208f7f 100644 --- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js +++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js @@ -36,7 +36,7 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { } else if (port === 3333 || port === 9000) { port = 8080; } - return port + 1; + return port; }; /* @preserve AppScriptServlet - close */ @@ -63,4 +63,4 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { return path.replace(/\/$/, ''); }; -}); \ No newline at end of file +}); diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 731266f8071..04c7380ea77 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -16,7 +16,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) { var websocketCalls = {}; - websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort()); + websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort() + '/ws'); websocketCalls.ws.onOpen(function() { console.log('Websocket created'); From 85d14a0d85955771d42bd675b0ba350cff22e20a Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 28 Jul 2015 14:58:02 +0900 Subject: [PATCH 2/8] Create notebookserver instance manually --- zeppelin-server/pom.xml | 22 +---- .../zeppelin/server/AppScriptServlet.java | 4 +- .../zeppelin/server/ZeppelinServer.java | 6 +- .../zeppelin/socket/NotebookServer.java | 90 +++++++++---------- .../zeppelin/socket/NotebookSocket.java | 4 +- 5 files changed, 57 insertions(+), 69 deletions(-) diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index e85a3ae21da..2b43e1b0260 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -95,12 +95,6 @@ ${cxf.version} - - org.java-websocket - Java-WebSocket - 1.3.0 - - com.wordnik @@ -297,19 +291,9 @@ - org.atmosphere - atmosphere-jersey - 2.2.0 - - - com.sun.jersey - jersey-server - - - javax.ws.rs - javax.ws.rs-api - - + com.sun.jersey + jersey-servlet + 1.13 diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java index d126ccd6142..f1ccba25c4f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java @@ -83,8 +83,8 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) int endIndex = script.indexOf(endReplaceString, startIndex); if (startIndex >= 0 && endIndex >= 0) { - Server server = ZeppelinServer.jettyServer; - String replaceString = "this.getPort=function(){return " + ZeppelinServer.jettyServer.getConnectors()[0].getLocalPort() + "};"; + String replaceString = "this.getPort=function(){return " + + ZeppelinServer.jettyServer.getConnectors()[0].getLocalPort() + "};"; script.replace(startIndex, endIndex + endReplaceString.length(), replaceString); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 756d9a8b32f..d3cdbec7057 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -26,6 +26,7 @@ import javax.net.ssl.SSLContext; import javax.servlet.DispatcherType; +import javax.servlet.Servlet; import javax.ws.rs.core.Application; import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; @@ -162,13 +163,14 @@ private static Server setupJettyServer(ZeppelinConfiguration conf) private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) throws Exception { - final ServletHolder notebookServer = new ServletHolder("ws", NotebookServer.class); + notebookServer = new NotebookServer(); + final ServletHolder servletHolder = new ServletHolder(notebookServer); final ServletContextHandler cxfContext = new ServletContextHandler( ServletContextHandler.SESSIONS); cxfContext.setSessionHandler(new SessionHandler()); cxfContext.setContextPath("/"); - cxfContext.addServlet(notebookServer, "/ws/*"); + cxfContext.addServlet(servletHolder, "/ws/*"); cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", EnumSet.allOf(DispatcherType.class)); return cxfContext; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index d0c672e3d85..ed35ea1b0ca 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -91,51 +91,51 @@ public void onMessage(NotebookSocket conn, String msg) { LOG.info("RECEIVE << " + messagereceived.op); /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - broadcastNoteList(); - break; - case GET_NOTE: - sendNote(conn, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, notebook); - break; - case DEL_NOTE: - removeNote(conn, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, notebook, messagereceived); - break; - case PING: - pong(); - break; - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, notebook, messagereceived); - break; - default: - broadcastNoteList(); - break; + case LIST_NOTES: + broadcastNoteList(); + break; + case GET_NOTE: + sendNote(conn, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, notebook); + break; + case DEL_NOTE: + removeNote(conn, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, notebook, messagereceived); + break; + case PING: + pong(); + break; + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, notebook, messagereceived); + break; + default: + broadcastNoteList(); + break; } } catch (Exception e) { LOG.error("Can't handle message", e); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index bc07d21cf62..cb91d655b38 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -1,12 +1,14 @@ package org.apache.zeppelin.socket; import java.io.IOException; -import java.util.List; import javax.servlet.http.HttpServletRequest; import org.eclipse.jetty.websocket.WebSocket; +/** + * Notebook websocket + */ public class NotebookSocket implements WebSocket.OnTextMessage{ private Connection connection; From 6180ed304872fd20a150abe57729b5e28ca1a63f Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 28 Jul 2015 14:59:02 +0900 Subject: [PATCH 3/8] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6aac5715ba9..454bf309f5d 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Yarn ### Run ./bin/zeppelin-daemon.sh start - browse localhost:8080 in your browser. 8081 port should be accessible for websocket connection. + browse localhost:8080 in your browser. For configuration details check __./conf__ subdirectory. From 806db9b7735cda90e3eddc6a5214628f2ddca84a Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 28 Jul 2015 15:03:03 +0900 Subject: [PATCH 4/8] Remove websocket addr/port configuration --- conf/zeppelin-site.xml.template | 17 +------------- .../zeppelin/conf/ZeppelinConfiguration.java | 22 ------------------- 2 files changed, 1 insertion(+), 38 deletions(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index c2294cbd445..ac43216f8bd 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -28,22 +28,7 @@ zeppelin.server.port 8080 - Server port. port+1 is used for web socket. - - - - zeppelin.websocket.addr - 0.0.0.0 - Testing websocket address - - - - - zeppelin.websocket.port - -1 - Testing websocket port + Server port. diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index d5c81550ec2..2be744bb366 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -268,25 +268,6 @@ public int getServerPort() { return getInt(ConfVars.ZEPPELIN_PORT); } - public String getWebSocketAddress() { - return getString(ConfVars.ZEPPELIN_WEBSOCKET_ADDR); - } - - public int getWebSocketPort() { - int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT); - int serverPort = getServerPort(); - - if (port < 0) { - if (serverPort <= 0) { - return 0; - } else { - return serverPort + 1; - } - } else { - return port; - } - } - public String getKeyStorePath() { return getRelativeDir( String.format("%s/%s", @@ -389,9 +370,6 @@ public static enum ConfVars { ZEPPELIN_HOME("zeppelin.home", "../"), ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"), ZEPPELIN_PORT("zeppelin.server.port", 8080), - // negative websocket port denotes that server port + 1 should be used - ZEPPELIN_WEBSOCKET_ADDR("zeppelin.websocket.addr", "0.0.0.0"), - ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1), ZEPPELIN_SSL("zeppelin.ssl", false), ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"), From f56e417ce04158becbc32b8e32b4eaa0b4c0a11b Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 28 Jul 2015 16:03:01 +0900 Subject: [PATCH 5/8] Add license header --- .../apache/zeppelin/socket/NotebookSocket.java | 16 ++++++++++++++++ .../zeppelin/socket/NotebookSocketListener.java | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index cb91d655b38..aceea456b28 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.socket; import java.io.IOException; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java index fa0dc8c4c37..77fed6ed7b1 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.socket; /** From 412927fe0a252b9f82c0b885af82da92f2d21b95 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Wed, 29 Jul 2015 00:21:42 +0900 Subject: [PATCH 6/8] Handle large message --- .../main/java/org/apache/zeppelin/server/ZeppelinServer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index d3cdbec7057..1c96680e8f9 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -165,9 +165,11 @@ private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration c notebookServer = new NotebookServer(); final ServletHolder servletHolder = new ServletHolder(notebookServer); + servletHolder.setInitParameter("maxTextMessageSize", "1024000"); final ServletContextHandler cxfContext = new ServletContextHandler( ServletContextHandler.SESSIONS); + cxfContext.setSessionHandler(new SessionHandler()); cxfContext.setContextPath("/"); cxfContext.addServlet(servletHolder, "/ws/*"); From 7f8bc47686e733cc05a9c4cfe5eb32e8ec7e078a Mon Sep 17 00:00:00 2001 From: Sjoerd Mulder Date: Thu, 30 Jul 2015 12:24:30 +0200 Subject: [PATCH 7/8] Cleanup of Javascript logic and Server code detecting the correct port --- .../zeppelin/server/AppScriptServlet.java | 94 ------------------- .../zeppelin/server/ZeppelinServer.java | 5 +- .../src/components/baseUrl/baseUrl.service.js | 44 +++------ .../websocketEvents.factory.js | 2 +- 4 files changed, 16 insertions(+), 129 deletions(-) delete mode 100644 zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java deleted file mode 100644 index f1ccba25c4f..00000000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.server; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.util.resource.Resource; - -/** - * Simple servlet to dynamically set the Websocket port - * in the JavaScript sent to the client - */ -public class AppScriptServlet extends DefaultServlet { - - // Hash containing the possible scripts that contain the getPort() - // function originally defined in app.js - private static Set scriptPaths = new HashSet( - Arrays.asList( - "/scripts/scripts.js", - "/components/baseUrl/baseUrl.js" - ) - ); - - public AppScriptServlet() { - } - - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException, - IOException { - - // Process all requests not for the app script to the parent - // class - String uri = request.getRequestURI(); - if (!scriptPaths.contains(uri)) { - super.doGet(request, response); - return; - } - - // Read the script file chunk by chunk - Resource scriptFile = getResource(uri); - InputStream is = scriptFile.getInputStream(); - StringBuffer script = new StringBuffer(); - byte[] buffer = new byte[1024]; - while (is.available() > 0) { - int numRead = is.read(buffer); - if (numRead <= 0) { - break; - } - script.append(new String(buffer, 0, numRead, "UTF-8")); - } - - // Replace the getPort function to return the proper value - String startReplaceString = "/* @preserve AppScriptServlet - getPort */"; - String endReplaceString = "/* @preserve AppScriptServlet - close */"; - - int startIndex = script.indexOf(startReplaceString); - int endIndex = script.indexOf(endReplaceString, startIndex); - - if (startIndex >= 0 && endIndex >= 0) { - String replaceString = "this.getPort=function(){return " - + ZeppelinServer.jettyServer.getConnectors()[0].getLocalPort() + "};"; - script.replace(startIndex, endIndex + endReplaceString.length(), replaceString); - } - - response.getWriter().println(script.toString()); - } -} - diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 1c96680e8f9..ad1d9078952 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; +import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -91,7 +92,7 @@ public static void main(String[] args) throws Exception { * But the rest of swagger is configured here */ final ServletContextHandler swagger = setupSwaggerContextHandler(conf); - + // Notebook server final ServletContextHandler notebook = setupNotebookServer(conf); @@ -272,7 +273,7 @@ private static WebAppContext setupWebAppContext( } // Explicit bind to root webApp.addServlet( - new ServletHolder(new AppScriptServlet()), + new ServletHolder(new DefaultServlet()), "/*" ); return webApp; diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js index b5afa208f7f..f5eb2df5970 100644 --- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js +++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js @@ -15,36 +15,7 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { - /** Get the current port of the websocket - * - * When running Zeppelin, the body of this function will be dynamically - * overridden with the AppScriptServlet from zeppelin-site.xml config value. - * - * If the config value is not defined, it defaults to the HTTP port + 1 - * - * In the case of running "grunt serve", this function will appear - * as is. - */ - - /* @preserve AppScriptServlet - getPort */ this.getPort = function() { - var port = Number(location.port); - if (location.protocol !== 'https:' && !port) { - port = 80; - } else if (location.protocol === 'https:' && !port) { - port = 443; - } else if (port === 3333 || port === 9000) { - port = 8080; - } - return port; - }; - /* @preserve AppScriptServlet - close */ - - this.getWebsocketProtocol = function() { - return location.protocol === 'https:' ? 'wss' : 'ws'; - }; - - this.getRestApiBase = function() { var port = Number(location.port); if (!port) { port = 80; @@ -52,13 +23,22 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { port = 443; } } - + //Exception for when running locally via grunt if (port === 3333 || port === 9000) { port = 8080; } - return location.protocol + '//' + location.hostname + ':' + port + skipTrailingSlash(location.pathname) + '/api'; + return port; + }; + + this.getWebsocketUrl = function() { + var wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + return wsProtocol + '//' + location.hostname + ':' + this.getPort() + '/ws'; }; - + + this.getRestApiBase = function() { + return location.protocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/api'; + }; + var skipTrailingSlash = function(path) { return path.replace(/\/$/, ''); }; diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 04c7380ea77..6d9f17742f4 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -16,7 +16,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) { var websocketCalls = {}; - websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort() + '/ws'); + websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl()); websocketCalls.ws.onOpen(function() { console.log('Websocket created'); From 11a302adc2969725f724db767bfe02da40173cf2 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 3 Aug 2015 05:01:00 +0900 Subject: [PATCH 8/8] Check text in more safe way --- .../java/org/apache/zeppelin/ZeppelinIT.java | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java index 779396cb917..b170a951efa 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java @@ -150,6 +150,20 @@ boolean endToEndTestEnabled() { return null != System.getenv("CI"); } + boolean waitForText(final String txt, final By by) { + try { + new WebDriverWait(driver, 5).until(new ExpectedCondition() { + @Override + public Boolean apply(WebDriver d) { + return txt.equals(driver.findElement(by).getText()); + } + }); + return true; + } catch (TimeoutException e) { + return false; + } + } + @Test public void testAngularDisplay() throws InterruptedException{ if (!endToEndTestEnabled()) { @@ -176,8 +190,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(1, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind variable @@ -190,8 +204,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* @@ -206,8 +220,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(3, "FINISHED"); // check expected text - assertEquals("myVar=1", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=1", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Click element @@ -216,8 +230,8 @@ public void testAngularDisplay() throws InterruptedException{ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_2_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_2_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Register watcher @@ -242,13 +256,13 @@ public void testAngularDisplay() throws InterruptedException{ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_3_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_3_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); waitForParagraph(3, "FINISHED"); // check expected text by watcher - assertEquals("myVar=3", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=3", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Unbind @@ -261,8 +275,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(5, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind again and see rebind works. @@ -272,8 +286,8 @@ public void testAngularDisplay() throws InterruptedException{ waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); System.out.println("testCreateNotebook Test executed"); }