diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index f0913648251..913e184619e 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -105,6 +105,8 @@ public static enum OP { ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object + ANGULAR_OBJECT_CLIENT_UNBIND, // [c-s] angular object unbind from AngularJS z object + LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations // @param settings serialized Map object diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 98a1aaaec13..9412d7198d7 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -190,6 +190,9 @@ public void onMessage(NotebookSocket conn, String msg) { case ANGULAR_OBJECT_CLIENT_BIND: angularObjectClientBind(conn, userAndRoles, notebook, messagereceived); break; + case ANGULAR_OBJECT_CLIENT_UNBIND: + angularObjectClientUnbind(conn, userAndRoles, notebook, messagereceived); + break; case LIST_CONFIGURATIONS: sendAllConfigurations(conn, userAndRoles, notebook); break; @@ -769,6 +772,45 @@ protected void angularObjectClientBind(NotebookSocket conn, HashSet user } } + /** + * Remove the given Angular variable to the target + * interpreter(s) angular registry given a noteId + * and an optional list of paragraph id(s) + * @param conn + * @param notebook + * @param fromMessage + * @throws Exception + */ + protected void angularObjectClientUnbind(NotebookSocket conn, HashSet userAndRoles, + Notebook notebook, Message fromMessage) + throws Exception{ + String noteId = fromMessage.getType("noteId"); + String varName = fromMessage.getType("name"); + String paragraphId = fromMessage.getType("paragraphId"); + Note note = notebook.getNote(noteId); + + if (paragraphId == null) { + throw new IllegalArgumentException("target paragraph not specified for " + + "angular value unBind"); + } + + if (note != null) { + final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note, + paragraphId); + + final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); + + if (registry instanceof RemoteAngularObjectRegistry) { + RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; + removeAngularFromRemoteRegistry(noteId, paragraphId, varName, remoteRegistry, + interpreterGroup.getId(), conn); + } else { + removeAngularObjectFromLocalRepo(noteId, paragraphId, varName, registry, + interpreterGroup.getId(), conn); + } + } + } + private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId) throws Exception { final Paragraph paragraph = note.getParagraph(paragraphId); @@ -794,6 +836,20 @@ private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId conn); } + private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, + String varName, RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, NotebookSocket conn) { + final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, + paragraphId); + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName, Object varValue, AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { @@ -813,6 +869,20 @@ private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, Str conn); } + private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName, + AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { + final AngularObject removed = registry.remove(varName, noteId, paragraphId); + if (removed != null) { + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + } + private void moveParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 6989c1620ba..98895deca63 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -262,6 +262,96 @@ public void should_bind_angular_object_to_local_for_paragraphs() throws Exceptio verify(otherConn).send(mdMsg1); } + @Test + public void should_unbind_angular_object_from_remote_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND) + .put("noteId", "noteId") + .put("name", varName) + .put("paragraphId", "paragraphId"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); + when(note.getParagraph("paragraphId")).thenReturn(paragraph); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + + when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId"); + when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1); + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraphId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUnbind(conn, new HashSet(), notebook, messageReceived); + + // Then + verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); + + verify(otherConn).send(mdMsg1); + } + + @Test + public void should_unbind_angular_object_from_local_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND) + .put("noteId", "noteId") + .put("name", varName) + .put("paragraphId", "paragraphId"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); + when(note.getParagraph("paragraphId")).thenReturn(paragraph); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + + when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId"); + + + when(mdRegistry.remove(varName, "noteId", "paragraphId")).thenReturn(ao1); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraphId")); + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUnbind(conn, new HashSet(), notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + } + private NotebookSocket createWebSocket() { NotebookSocket sock = mock(NotebookSocket.class); when(sock.getRequest()).thenReturn(createHttpServletRequest()); diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 097ee84fbb6..a6489fd6209 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -35,6 +35,13 @@ angular.module('zeppelinWebApp') if (paragraphId) { websocketMsgSrv.clientBindAngularObject($routeParams.noteId, varName, value, paragraphId); } + }, + // Example: z.angularUnBind('my_var', '20150213-231621_168813393') + angularUnbind: function(varName, paragraphId) { + // Only push to server if paragraphId is defined + if (paragraphId) { + websocketMsgSrv.clientUnbindAngularObject($routeParams.noteId, varName, paragraphId); + } } }; diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 3fba0f5e70e..3b4df03796c 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -82,6 +82,17 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, + clientUnbindAngularObject: function(noteId, name, paragraphId) { + websocketEvents.sendNewEvent({ + op: 'ANGULAR_OBJECT_CLIENT_UNBIND', + data: { + noteId: noteId, + name: name, + paragraphId: paragraphId + } + }); + }, + cancelParagraphRun: function(paragraphId) { websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}); },