diff --git a/docs/manual/interpreters.md b/docs/manual/interpreters.md index c47d5369d8a..6255f983e03 100644 --- a/docs/manual/interpreters.md +++ b/docs/manual/interpreters.md @@ -82,3 +82,47 @@ interpreter.start() The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter porocess is listening as shown in the image below: + +## (Experimental) Interpreter Execution Callbacks + +Zeppelin allows for users to specify additional code to be executed by an interpreter at pre and post-paragraph code execution. This is primarily useful if you need to run the same set of code for all of the paragraphs within your notebook at specific times. Currently, this feature is only available for the spark and pyspark interpreters. To specify your "callback" code, you may use '`z.registerCallback()`. For example, enter the following into one paragraph: + +```python +%pyspark +z.registerCallback("post_exec", "print 'This code should be executed before the parapgraph code!'") +z.registerCallback("pre_exec", "print 'This code should be executed after the paragraph code!'") +``` + +These calls will not take into effect until the next time you run a paragraph. In another paragraph, enter +```python +%pyspark +print "This code should be entered into the paragraph by the user!" +``` + +The output should be: +``` +This code should be executed before the paragraph code! +This code should be entered into the paragraph by the user! +This code should be executed after the paragraph code! +``` + +If you ever need to know the callback code, use `z.getCallback()`: +```python +%pyspark +print z.getCallback("post_exec") +``` +``` +print 'This code should be executed after the paragraph code!' +``` +Any call to `z.registerCallback()` will automatically overwrite what was previously registered. To completely unregister a callback event, use `z.unregisterCallback(eventCode)`. Currently only `"post_exec"` and `"pre_exec"` are valid event codes for the Zeppelin Callback Registry system. + +Finally, the callback registry is internally shared by other interpreters in the same group. This would allow for callback code for one interpreter REPL to be set by another as follows: + +```scala +%spark +z.unregisterCallback("post_exec", "pyspark") +``` +The API is identical for both the spark (scala) and pyspark (python) implementations. + +### Caveats +Calls to `z.registerCallback("pre_exec", ...)` should be made with care. If there are errors in your specified callback code, this will cause the interpreter REPL to become unable to execute any code pass the pre-execute stage making it impossible for direct calls to `z.unregisterCallback()` to take into effect. Current workarounds include calling `z.unregisterCallback()` from a different interpreter REPL in the same interpreter group (see above) or manually restarting the interpreter group in the UI. diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 1d8f437980a..6657a1ebdab 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -40,7 +40,7 @@ public static void setUp() { Properties p = new Properties(); flink = new FlinkInterpreter(p); flink.open(); - context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); } @AfterClass diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java index f1517631c0b..78f86aeffb6 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -40,7 +40,7 @@ public class IgniteInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); private IgniteInterpreter intp; private Ignite ignite; diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index 9076c360dd6..07f11f883be 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); private Ignite ignite; private IgniteSqlInterpreter intp; diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index bd5bae686f6..145432bcb0c 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -87,7 +87,7 @@ public void setUp() throws Exception { insertStatement.setString(1, null); insertStatement.execute(); interpreterContext = new InterpreterContext("", "1", "", "", new AuthenticationInfo(), null, null, null, null, - null, null); + null, null, null); } diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java index f9538562c22..f81f47ffda3 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java @@ -80,7 +80,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), null, + new AngularObjectRegistry(intpGroup.getId(), null), null, null, new LinkedList(), new InterpreterOutput( new InterpreterOutputListener() { @Override public void onAppend(InterpreterOutput out, byte[] line) {} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 7ffbd975b3e..2c0651e176b 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -66,7 +66,7 @@ public void setUp() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry( - intpGroup.getId(), null), null, + intpGroup.getId(), null), null, null, new LinkedList(), null); } diff --git a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java index acdb65cc5ce..6c6978c7894 100644 --- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java +++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java @@ -47,7 +47,7 @@ public void tearDown() throws Exception { @Test public void test() { shell.open(); - InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null); + InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null, null); InterpreterResult result = new InterpreterResult(Code.ERROR); if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("dir", context); @@ -64,7 +64,7 @@ public void test() { @Test public void testInvalidCommand(){ shell.open(); - InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null); + InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null,null); InterpreterResult result = new InterpreterResult(Code.ERROR); if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("invalid_command\ndir",context); diff --git a/spark/pom.xml b/spark/pom.xml index 66d93c42ee6..b42a405b1d7 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -51,6 +51,12 @@ slf4j-log4j12 + + ${project.groupId} + zeppelin-zengine + ${project.version} + + ${project.groupId} zeppelin-display_${scala.binary.version} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 7bccbac7d52..06f1dc98ad2 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -32,14 +32,17 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.annotation.Experimental; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectWatcher; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input.ParamOption; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -695,6 +698,74 @@ private void angularUnbind(String name, String noteId) { registry.remove(name, noteId, null); } + /** + * General function to register callback event + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + * @param replName Name of the interpreter + */ + @Experimental + public void registerCallback(String event, String cmd, String replName) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + callbacks.register(noteId, replName, event, cmd); + } + + /** + * registerCallback() wrapper for the spark (scala) interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + */ + @Experimental + public void registerCallback(String event, String cmd) { + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + registerCallback(event, cmd, replName); + } + + /** + * Get the callback code + * @param event The type of event to hook to (pre_exec, post_exec) + * @param replName Name of the interpreter + */ + @Experimental + public String getCallback(String event, String replName) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + return callbacks.get(noteId, replName, event); + } + + /** + * getCallback() wrapper for the spark (scala) interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + */ + @Experimental + public String getCallback(String event) { + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + return getCallback(event, replName); + } + + /** + * Unbind code from given callback event + * @param event The type of event to hook to (pre_exec, post_exec) + * @param replName Name of the interpreter + */ + @Experimental + public void unregisterCallback(String event, String replName) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + callbacks.unregister(noteId, replName, event); + } + + /** + * Unbind code from given callback event + * @param event The type of event to hook to (pre_exec, post_exec) + */ + @Experimental + public void unregisterCallback(String event) { + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + unregisterCallback(event, replName); + } /** * Add object into resource pool diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 53465c2cd80..58e5fbaf313 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -80,16 +80,16 @@ def put(self, key, value): def get(self, key): return self.__getitem__(key) - def input(self, name, defaultValue = ""): + def input(self, name, defaultValue=""): return self.z.input(name, defaultValue) - def select(self, name, options, defaultValue = ""): + def select(self, name, options, defaultValue=""): # auto_convert to ArrayList doesn't match the method signature on JVM side tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples) return self.z.select(name, defaultValue, iterables) - def checkbox(self, name, options, defaultChecked = None): + def checkbox(self, name, options, defaultChecked=None): if defaultChecked is None: defaultChecked = list(map(lambda items: items[0], options)) optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) @@ -99,6 +99,23 @@ def checkbox(self, name, options, defaultChecked = None): checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables) return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables) + def registerCallback(self, event, cmd, replName=None): + if replName is None: + self.z.registerCallback(event, cmd) + else: + self.z.registerCallback(event, cmd, replName) + + def unregisterCallback(self, event, replName=None): + if replName is None: + self.z.unregisterCallback(event) + else: + self.z.unregisterCallback(event, replName) + + def getCallback(self, event, replName=None): + if replName is None: + return self.z.getCallback(event) + return self.z.getCallback(event, replName) + def __tupleToScalaTuple2(self, tuple): if (len(tuple) == 2): return gateway.jvm.scala.Tuple2(tuple[0], tuple[1]) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index 03ecb9efedd..b73c87d514e 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -67,7 +67,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new LinkedList(), null); } diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 1c7979fc428..d2f481bed39 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -97,6 +97,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index badd0404bc5..e33b7259756 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -78,6 +78,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { @Override diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala index 9b5cd6269c8..65c3a276d79 100644 --- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala +++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala @@ -36,7 +36,7 @@ trait AbstractAngularElemTest val context = new InterpreterContext("note", "paragraph", "title", "text", new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new util.LinkedList[InterpreterContextRunner](), new InterpreterOutput(new InterpreterOutputListener() { override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = { diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala index 33ca5086427..8f046c7eb4e 100644 --- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala +++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala @@ -32,7 +32,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers { val context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry( intpGroup.getId(), null), - null, + null, null, new java.util.LinkedList[InterpreterContextRunner](), new InterpreterOutput(new InterpreterOutputListener() { override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 9678b4691df..4c12f7cbf6b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -140,6 +140,7 @@ public void destroy() { public static Logger logger = LoggerFactory.getLogger(Interpreter.class); private InterpreterGroup interpreterGroup; private URL [] classloaderUrls; + protected Map callbackRegistry = new HashMap(); protected Properties property; @ZeppelinApi @@ -370,4 +371,5 @@ public static RegisteredInterpreter findRegisteredInterpreterByClassName(String } return null; } + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java new file mode 100644 index 00000000000..5f404ff308f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * An interface for processing custom callback code into the interpreter. + */ +public interface InterpreterCallbackListener { + public String onPreExecute(String script); + public String onPostExecute(String script); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java new file mode 100644 index 00000000000..88b1de3d5b2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.HashMap; +import java.util.Map; + +/** + * The InterpreterinterpreterCallbackRegistry specifies code to be conditionally executed by an + * interpreter. The constants defined in this class denote currently + * supported events. Each instance is bound to a single InterpreterGroup + * and has Note-wide scope. + */ +public class InterpreterCallbackRegistry { + // Execute the callback code PRIOR to main paragraph code execution + public static final String PRE_EXEC = "pre_exec"; + + // Execute the callback code AFTER main paragraph code execution + public static final String POST_EXEC = "post_exec"; + + private InterpreterCallbackRegistryListener listener; + private String interpreterId; + private Map>> registry = + new HashMap>>(); + + /** + * CallbackRegistry constructor. + * + * @param interpreterId The Id of the InterpreterGroup instance to bind to + * @param listener InterpreterCallbackRegistryListener instance to use for broadcasting + */ + public InterpreterCallbackRegistry(final String interpreterId, + final InterpreterCallbackRegistryListener listener) { + this.interpreterId = interpreterId; + this.listener = listener; + } + + /** + * Get the interpreterGroup id this instance is bound to + */ + public String getInterpreterId() { + return interpreterId; + } + + /** + * Adds a note to the registry + * + * @param noteId The Id of the Note instance to add + */ + public void addNote(String noteId) { + synchronized (registry) { + if (registry.get(noteId) == null) { + registry.put(noteId, new HashMap>()); + } + } + } + + /** + * Adds a replName to the registry + * + * @param noteId The note id + * @param replName The name of the interpreter repl to map the callbacks to + */ + public void addRepl(String noteId, String replName) { + synchronized (registry) { + addNote(noteId); + if (registry.get(noteId).get(replName) == null) { + registry.get(noteId).put(replName, new HashMap()); + } + } + } + + /** + * Register a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + * @param cmd Code to be executed by the interpreter + */ + public void register(String noteId, String replName, + String event, String cmd) throws IllegalArgumentException { + synchronized (registry) { + addRepl(noteId, replName); + if (!event.equals(POST_EXEC) && !event.equals(PRE_EXEC)) { + throw new IllegalArgumentException("Must be " + POST_EXEC + + " or " + PRE_EXEC); + } + registry.get(noteId).get(replName).put(event, cmd); + if (listener != null) { + listener.onRegister(interpreterId, noteId, replName, event, cmd); + } + } + } + + /** + * Unregister a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + */ + public void unregister(String noteId, String replName, String event) { + synchronized (registry) { + addRepl(noteId, replName); + registry.get(noteId).get(replName).remove(event); + if (listener != null) { + listener.onUnregister(interpreterId, noteId, replName, event); + } + } + } + + /** + * Get a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + */ + public String get(String noteId, String replName, String event) { + synchronized (registry) { + addRepl(noteId, replName); + return registry.get(noteId).get(replName).get(event); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java new file mode 100644 index 00000000000..cf063428e6f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * This interface is used to broadcast changes in the callback registry + * from a remote interpreter process to the Zeppelin Engine. + */ +public interface InterpreterCallbackRegistryListener { + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd); + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 21ca2e67b72..3e74377ae4c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -55,6 +55,7 @@ public static void remove() { private final Map config; private GUI gui; private AngularObjectRegistry angularObjectRegistry; + private InterpreterCallbackRegistry callbackRegistry; private ResourcePool resourcePool; private List runners; @@ -66,6 +67,7 @@ public InterpreterContext(String noteId, Map config, GUI gui, AngularObjectRegistry angularObjectRegistry, + InterpreterCallbackRegistry callbackRegistry, ResourcePool resourcePool, List runners, InterpreterOutput out @@ -78,6 +80,7 @@ public InterpreterContext(String noteId, this.config = config; this.gui = gui; this.angularObjectRegistry = angularObjectRegistry; + this.callbackRegistry = callbackRegistry; this.resourcePool = resourcePool; this.runners = runners; this.out = out; @@ -116,6 +119,10 @@ public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } + public InterpreterCallbackRegistry getInterpreterCallbackRegistry() { + return callbackRegistry; + } + public ResourcePool getResourcePool() { return resourcePool; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index bc56784b15b..4778be3b1f1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap callbackInfo = new HashMap(); + callbackInfo.put("noteId", noteId); + callbackInfo.put("replName", replName); + callbackInfo.put("eventName", eventName); + callbackInfo.put("cmd", cmd); + Type mapType = new TypeToken>() {}.getType(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.REGISTER_CALLBACK, gson.toJson(callbackInfo, mapType))); + } + + /** + * notify callback unregistration + */ + public void unregisterCallback(String noteId, String replName, String eventName) { + Map callbackInfo = new HashMap(); + callbackInfo.put("noteId", noteId); + callbackInfo.put("replName", replName); + callbackInfo.put("eventName", eventName); + Type mapType = new TypeToken>() {}.getType(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.UNREGISTER_CALLBACK, gson.toJson(callbackInfo, mapType))); + } /** * Get all resources except for specific resourcePool diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 090aeeaaee3..5f8b1c760d7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; @@ -115,8 +117,11 @@ public void run() { } Gson gson = new Gson(); + Type mapType = new TypeToken>() {}.getType(); AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); + InterpreterCallbackRegistry callbackRegistry = + interpreterGroup.getInterpreterCallbackRegistry(); try { if (event.getType() == RemoteInterpreterEventType.NO_OP) { @@ -141,6 +146,17 @@ public void run() { AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class); angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); + } else if (event.getType() == RemoteInterpreterEventType.REGISTER_CALLBACK) { + Map callbackInfo = gson.fromJson(event.getData(), mapType); + callbackRegistry.register(callbackInfo.get("noteId"), + callbackInfo.get("replName"), + callbackInfo.get("eventName"), + callbackInfo.get("cmd")); + } else if (event.getType() == RemoteInterpreterEventType.UNREGISTER_CALLBACK) { + Map callbackInfo = gson.fromJson(event.getData(), mapType); + callbackRegistry.unregister(callbackInfo.get("noteId"), + callbackInfo.get("replName"), + callbackInfo.get("eventName")); } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) { InterpreterContextRunner runnerFromRemote = gson.fromJson( event.getData(), RemoteInterpreterContextRunner.class); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 7ddb92838f4..fdf554b6570 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -55,11 +55,13 @@ */ public class RemoteInterpreterServer extends Thread - implements RemoteInterpreterService.Iface, AngularObjectRegistryListener { + implements RemoteInterpreterService.Iface, AngularObjectRegistryListener, + InterpreterCallbackRegistryListener { Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); InterpreterGroup interpreterGroup; AngularObjectRegistry angularObjectRegistry; + InterpreterCallbackRegistry callbackRegistry; DistributedResourcePool resourcePool; private ApplicationLoader appLoader; @@ -152,7 +154,9 @@ public void createInterpreter(String interpreterGroupId, String noteId, String if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); + callbackRegistry = new InterpreterCallbackRegistry(interpreterGroup.getId(), this); resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); + interpreterGroup.setInterpreterCallbackRegistry(callbackRegistry); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); interpreterGroup.setResourcePool(resourcePool); @@ -494,6 +498,7 @@ private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutp new TypeToken>() {}.getType()), gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), + interpreterGroup.getInterpreterCallbackRegistry(), interpreterGroup.getResourcePool(), contextRunners, output); } @@ -587,6 +592,17 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri eventClient.angularObjectRemove(name, noteId, paragraphId); } + @Override + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd) { + eventClient.registerCallback(noteId, replName, event, cmd); + } + + @Override + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event) { + eventClient.unregisterCallback(noteId, replName, event); + } /** * Poll event from RemoteInterpreterEventPoller diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 9ceba88826d..f724b0de916 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index a192899daa1..4bc434f2e6a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index f8b63ff3422..fc0803b7a44 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 0db5697bbf5..c4d36a906bc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 955461951f3..31475d7a9df 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -39,7 +39,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { OUTPUT_APPEND(8), OUTPUT_UPDATE(9), ANGULAR_REGISTRY_PUSH(10), - APP_STATUS_UPDATE(11); + APP_STATUS_UPDATE(11), + REGISTER_CALLBACK(12), + UNREGISTER_CALLBACK(13); private final int value; @@ -82,6 +84,10 @@ public static RemoteInterpreterEventType findByValue(int value) { return ANGULAR_REGISTRY_PUSH; case 11: return APP_STATUS_UPDATE; + case 12: + return REGISTER_CALLBACK; + case 13: + return UNREGISTER_CALLBACK; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 1af0ce0b2b5..dad455ff296 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index e3ddeebc664..8b86e11381c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 32be4a4a0a8..f82847aa577 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -50,6 +50,8 @@ enum RemoteInterpreterEventType { OUTPUT_UPDATE = 9, ANGULAR_REGISTRY_PUSH = 10, APP_STATUS_UPDATE = 11, + REGISTER_CALLBACK = 12, + UNREGISTER_CALLBACK = 13, } struct RemoteInterpreterEvent { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java new file mode 100644 index 00000000000..ef36145d973 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class InterpreterCallbackRegistryTest { + + @Test + public void testBasic() { + final AtomicInteger onRegister = new AtomicInteger(0); + final AtomicInteger onUnregister = new AtomicInteger(0); + final String PRE_EXEC = InterpreterCallbackRegistry.PRE_EXEC; + final String POST_EXEC = InterpreterCallbackRegistry.POST_EXEC; + final String noteId = "note"; + final String replName = "repl"; + final String preExecCallback = "pre"; + final String postExecCallback = "post"; + InterpreterCallbackRegistry registry = new InterpreterCallbackRegistry("intpId", + new InterpreterCallbackRegistryListener() { + + @Override + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd) { + onRegister.incrementAndGet(); + } + + @Override + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event) { + onUnregister.incrementAndGet(); + } + }); + + // Test register() + registry.register(noteId, replName, PRE_EXEC, preExecCallback); + registry.register(noteId, replName, POST_EXEC, postExecCallback); + assertEquals(2, onRegister.get()); + assertEquals(0, onUnregister.get()); + + // Test get() + assertEquals(registry.get(noteId, replName, PRE_EXEC), preExecCallback); + assertEquals(registry.get(noteId, replName, POST_EXEC), postExecCallback); + + // Test Unregister + registry.unregister(noteId, replName, PRE_EXEC); + assertNull(registry.get(noteId, replName, PRE_EXEC)); + assertEquals(1, onUnregister.get()); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidEventCode() { + InterpreterCallbackRegistry registry = new InterpreterCallbackRegistry("intpId", null); + + // Test that only valid event codes ("pre_exec", "post_exec") are accepted + registry.register("foo", "bar", "baz", "whatever"); + } + +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 764c8b3823b..ecdf1089efe 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java index bc34539c6aa..af57ba7a487 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java @@ -36,8 +36,8 @@ public void isOpenTest() { assertFalse("Interpreter is not open", lazyOpenInterpreter.isOpen()); InterpreterContext interpreterContext = - new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null); + new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null, null); lazyOpenInterpreter.interpret("intp 1", interpreterContext); assertTrue("Interpeter is open", lazyOpenInterpreter.isOpen()); } -} \ No newline at end of file +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 5def888b637..16159116b02 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -90,6 +90,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); @@ -174,7 +175,7 @@ public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedExcepti // create object localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); - // get from remote registry + // get from remote registry ret = intp.interpret("get", context); Thread.sleep(500); // waitFor eventpoller pool event result = ret.message().split(" "); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index 74649b1e750..2d86ee3ed90 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -88,7 +88,7 @@ private InterpreterContext createInterpreterContext() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new LinkedList(), null); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index af1c4471a05..0f89e082b33 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -150,6 +150,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -186,6 +187,7 @@ public void testRemoteInterperterErrorStatus() throws TTransportException, IOExc new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -242,6 +244,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("500", ret.message()); @@ -256,6 +259,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("1000", ret.message()); @@ -310,6 +314,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -346,6 +351,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -412,6 +418,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -492,6 +499,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -594,6 +602,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -750,6 +759,7 @@ public void testEnvronmentAndPropertySet() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 02dba20c05d..347c7de863f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -84,7 +84,7 @@ public void setUp() throws Exception { MockInterpreterResourcePool.class.getName(), new File(INTERPRETER_SCRIPT).getAbsolutePath(), "fake", - "fakeRepo", + "fakeRepo", env, 10 * 1000, null, @@ -106,6 +106,7 @@ public void setUp() throws Exception { new GUI(), null, null, + null, new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index f17d88d50af..c54926add94 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -116,6 +117,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); return "1000"; @@ -192,6 +194,7 @@ public void testAbortOnPending() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); @@ -230,6 +233,7 @@ protected boolean jobAbort() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 40dcc30b36c..85a1c20a1f7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -639,6 +639,7 @@ public InterpreterGroup createInterpreterGroup(String id, InterpreterOption opti if (option == null) throw new NullArgumentException("option"); + InterpreterCallbackRegistry callbackRegistry = new InterpreterCallbackRegistry(id, null); AngularObjectRegistry angularObjectRegistry; InterpreterGroup interpreterGroup = new InterpreterGroup(id); @@ -652,6 +653,7 @@ public InterpreterGroup createInterpreterGroup(String id, InterpreterOption opti } interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + interpreterGroup.setInterpreterCallbackRegistry(callbackRegistry); return interpreterGroup; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 7807abba416..31c6f57a1dd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -301,15 +301,23 @@ protected Object jobRun() throws Throwable { } } + // Get text input from paragraph String script = getScriptBody(); + + // Process pre/post-execute callback hooks + final InterpreterCallbackRegistry callbacks = repl.getInterpreterGroup() + .getInterpreterCallbackRegistry(); + script = processInterpreterCallbacks(callbacks, replName, script); + // inject form if (repl.getFormType() == FormType.NATIVE) { settings.clear(); } else if (repl.getFormType() == FormType.SIMPLE) { String scriptBody = getScriptBody(); - Map inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built - // from script body - + scriptBody = processInterpreterCallbacks(callbacks, replName, scriptBody); + + // inputs will be built from script body + Map inputs = Input.extractSimpleQueryParam(scriptBody); final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() .getAngularObjectRegistry(); @@ -403,6 +411,39 @@ protected boolean jobAbort() { } return true; } + + private String processInterpreterCallbacks(final InterpreterCallbackRegistry callbacks, + final String replName, + String script) { + if (callbacks == null) { + return script; + } + + InterpreterCallbackListener callbackListener = new InterpreterCallbackListener() { + @Override + public String onPreExecute(String script) { + String cmd = callbacks.get(note.getId(), replName, InterpreterCallbackRegistry.PRE_EXEC); + if (cmd != null) { + script = cmd + '\n' + script; + } + + return script; + } + + @Override + public String onPostExecute(String script) { + String cmd = callbacks.get(note.getId(), replName, InterpreterCallbackRegistry.POST_EXEC); + if (cmd != null) { + script += '\n' + cmd; + } + + return script; + } + }; + script = callbackListener.onPreExecute(script); + script = callbackListener.onPostExecute(script); + return script; + } private InterpreterContext getInterpreterContext() { final Paragraph self = this; @@ -438,11 +479,13 @@ private void updateParagraphResult(InterpreterOutput out) { private InterpreterContext getInterpreterContext(InterpreterOutput output) { AngularObjectRegistry registry = null; + InterpreterCallbackRegistry callbacks = null; ResourcePool resourcePool = null; if (!factory.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = factory.getInterpreterSettings(note.getId()).get(0); registry = intpGroup.getInterpreterGroup(note.getId()).getAngularObjectRegistry(); + callbacks = intpGroup.getInterpreterGroup(note.getId()).getInterpreterCallbackRegistry(); resourcePool = intpGroup.getInterpreterGroup(note.getId()).getResourcePool(); } @@ -469,6 +512,7 @@ private InterpreterContext getInterpreterContext(InterpreterOutput output) { this.getConfig(), this.settings, registry, + callbacks, resourcePool, runners, output); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 09031a59351..e326d3f9617 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { schedulerFactory = new SchedulerFactory(); depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf);