From eb26886986ec067a6104fa5de6f8bd99deb2ee1b Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 13 Sep 2016 13:32:36 -0700 Subject: [PATCH 1/9] First attempt at adding pre/post-execute callbacks Added support for pre/post-execute callback hooks to interpreters --- .../zeppelin/flink/FlinkInterpreterTest.java | 2 +- .../ignite/IgniteInterpreterTest.java | 2 +- .../ignite/IgniteSqlInterpreterTest.java | 2 +- .../zeppelin/jdbc/JDBCInterpreterTest.java | 2 +- python/src/main/resources/bootstrap.pyc | Bin 0 -> 5788 bytes .../PythonInterpreterPandasSqlTest.java | 2 +- .../scalding/ScaldingInterpreterTest.java | 2 +- .../zeppelin/shell/ShellInterpreterTest.java | 4 +- .../zeppelin/spark/DepInterpreterTest.java | 2 +- .../zeppelin/spark/SparkInterpreterTest.java | 1 + .../spark/SparkSqlInterpreterTest.java | 1 + .../angular/AbstractAngularElemTest.scala | 2 +- .../angular/AbstractAngularModelTest.scala | 2 +- .../zeppelin/interpreter/Interpreter.java | 2 + .../InterpreterCallbackListener.java | 26 ++++ .../InterpreterCallbackRegistry.java | 120 ++++++++++++++++++ .../interpreter/InterpreterContext.java | 7 + .../interpreter/InterpreterGroup.java | 6 + .../interpreter/dev/InterpreterCallback.java | 33 +++++ .../remote/RemoteInterpreterServer.java | 1 + .../interpreter/InterpreterContextTest.java | 2 +- .../interpreter/LazyOpenInterpreterTest.java | 4 +- .../remote/RemoteAngularObjectTest.java | 3 +- .../RemoteInterpreterOutputTestStream.java | 2 +- .../remote/RemoteInterpreterTest.java | 10 ++ .../resource/DistributedResourcePoolTest.java | 3 +- .../scheduler/RemoteSchedulerTest.java | 4 + .../apache/zeppelin/notebook/Paragraph.java | 83 +++++++++--- .../interpreter/InterpreterFactoryTest.java | 2 +- 29 files changed, 296 insertions(+), 36 deletions(-) create mode 100644 python/src/main/resources/bootstrap.pyc create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/InterpreterCallback.java diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 1d8f437980a..6657a1ebdab 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -40,7 +40,7 @@ public static void setUp() { Properties p = new Properties(); flink = new FlinkInterpreter(p); flink.open(); - context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); } @AfterClass diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java index f1517631c0b..78f86aeffb6 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -40,7 +40,7 @@ public class IgniteInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); private IgniteInterpreter intp; private Ignite ignite; diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index 9076c360dd6..07f11f883be 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null); private Ignite ignite; private IgniteSqlInterpreter intp; diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index bd5bae686f6..145432bcb0c 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -87,7 +87,7 @@ public void setUp() throws Exception { insertStatement.setString(1, null); insertStatement.execute(); interpreterContext = new InterpreterContext("", "1", "", "", new AuthenticationInfo(), null, null, null, null, - null, null); + null, null, null); } diff --git a/python/src/main/resources/bootstrap.pyc b/python/src/main/resources/bootstrap.pyc new file mode 100644 index 0000000000000000000000000000000000000000..13c93160034c3996dc3202c1a5a0f2a031620730 GIT binary patch literal 5788 zcmb_gTW=f36`m!j3u!4v>Nu$rB$*JkMHPZ%U7QxSB((8Wjjg&Xp@9_`#Bz5?uC&}; z?F?nnmQbM1OCRzt`r5ZX73gdJNrAi;1^U{4=gcmti>;<;DRDSEbLN~g=YF>QkLg6O@G#qB$Xg3Ik1w0B}{&2I_HIc`d}VotjWJn(bU7F18vI;3)hTKAV0C zQ~Emi-o;n{2m^o~J~Djx_sru~gJ3rGRg6!)%)$2 zOg~ey0Wzn~%#sFI);Hp`ZzO@@IxDQPOs%IGtVuRr&WE(!sJ9pEbM-*Ay?$hV>qUK4 zpCtwXX|ZgnNck-DH(~-J%@Dw}RG*VyH|hytT&&N|NzAcUU#ibOpIbVO_HI}Cd#&W~ z9DkA-Y}D)XyGa-Wy`MvZ!16@|z6Qr(}|nJ$R8!5dnJ?Jv8(01zu{blp)5Ripi-p`9gq5 zfB=Rd6Lz|0mgi(n9xO49JFsx64?3sixY}ATJa~1XwSJo-4z51B!W`pWo4Iu}Jv@1} zIZI`2W|4Mg=488P)@FczW>(s3=K3N7N1y{^5ZgV^cn|10Ck^D3E?I{;^natj)BefS zX3r}1ZPs`Y>OQRCB|_yHHfuJ%QXnkMrBE;Vzjff;PEvc8j02VywB;Hm^Axn2zCLKw zTVA|3j~U!ct8}e?r#`ZkpiSsFAe<80)^8rm1hMuUV?RWGH|%ZpM=|t7?0mZ?yg`z% zK~mp-bVj7$nPi%$AA~%WK_ryuI$6@Ec z_B8H!T(QO%t;wzXkMG~zmOqv4Ea{OcJl=S+u`O>EI;LL|TutILs!K+*^A!q}BUC-q z@vEsk5)ZimdO9dXa5sT`56!Stp%Y&aG)guq+id+ul^<$B)O@ziL&1kR$~i2K@qCQu z=*R9i-K#oVKhA|nIfZ_LL)RXV9E8B7DtPkvhzQZzgbo8MZ)+AvibcHy*$;UkMHLGAlGaI9 zXdjNy)Hu{XWs5XNQw87KvyV_+i58Da;#)^J@h^ob)AMT!S7x|ptm4W^xN8OC3M0<= z3^Wnv)=2u=G{kAMNy>&TQM)v%xAI!XP#-hQ%J~5w^>wq^ARMfsuBSa?GMAb)a~T5_ zWX%ySHYqbm6$_F#10;Ieb&U*<%p~R3Z107ab@W9tCPN`azoTC=4dHy^Tyd_JuQ-<- zZX?N_h(nh29&lSpLvZ8|j@YY+tQYi{sgWT8?8L5ewQCBc9fiDs6((7cNY)A3alU10IAiBt;`Q4A}t;2`9&Cghl-f+mJJiIe16xjfw0#g5HHm0m7lf``?N9 zF;kvk%70{3u3(C&z7}O`s?)@r z6HLQEIZAzot3Mfam=sTjW&E(-a2Ei@)?s@kHfw&1BUi&-N9xR9dxgm{N)~ijYp?j{ zcrI_V)?V30=y+Yf4@4CAZihXuqZXizJ1tME&sXNcGSMK_c-jv`Ru)<%eFrN!DU^d! zn?a=3W|+Y3oX#DK{&qoTW}TVgx?7WNqVKXVG*C>GC0{kb1wV9)ttV3dYmkp*dV)aY>eyuAx2jMnEXD)tjpb`%IY%CKWGJ@{Xk+6$rv zr^ubZDnd!qP8d<3=&1V%5hQox7b5qmP;qbK(Y=A8K{jkFg=;ihrhx(uqsE=tFWpYRzVl_{}C4H!ibGg`rE<=Tg3PKcazL+a;(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), null, + new AngularObjectRegistry(intpGroup.getId(), null), null, null, new LinkedList(), new InterpreterOutput( new InterpreterOutputListener() { @Override public void onAppend(InterpreterOutput out, byte[] line) {} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 7ffbd975b3e..2c0651e176b 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -66,7 +66,7 @@ public void setUp() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry( - intpGroup.getId(), null), null, + intpGroup.getId(), null), null, null, new LinkedList(), null); } diff --git a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java index acdb65cc5ce..6c6978c7894 100644 --- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java +++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java @@ -47,7 +47,7 @@ public void tearDown() throws Exception { @Test public void test() { shell.open(); - InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null); + InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null, null); InterpreterResult result = new InterpreterResult(Code.ERROR); if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("dir", context); @@ -64,7 +64,7 @@ public void test() { @Test public void testInvalidCommand(){ shell.open(); - InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null); + InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null,null); InterpreterResult result = new InterpreterResult(Code.ERROR); if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("invalid_command\ndir",context); diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index 03ecb9efedd..b73c87d514e 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -67,7 +67,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new LinkedList(), null); } diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 1c7979fc428..c7d6838051b 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -97,6 +97,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index badd0404bc5..b7039ed318c 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -78,6 +78,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { @Override diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala index 9b5cd6269c8..65c3a276d79 100644 --- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala +++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala @@ -36,7 +36,7 @@ trait AbstractAngularElemTest val context = new InterpreterContext("note", "paragraph", "title", "text", new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new util.LinkedList[InterpreterContextRunner](), new InterpreterOutput(new InterpreterOutputListener() { override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = { diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala index 33ca5086427..8f046c7eb4e 100644 --- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala +++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala @@ -32,7 +32,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers { val context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry( intpGroup.getId(), null), - null, + null, null, new java.util.LinkedList[InterpreterContextRunner](), new InterpreterOutput(new InterpreterOutputListener() { override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 9678b4691df..4c12f7cbf6b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -140,6 +140,7 @@ public void destroy() { public static Logger logger = LoggerFactory.getLogger(Interpreter.class); private InterpreterGroup interpreterGroup; private URL [] classloaderUrls; + protected Map callbackRegistry = new HashMap(); protected Properties property; @ZeppelinApi @@ -370,4 +371,5 @@ public static RegisteredInterpreter findRegisteredInterpreterByClassName(String } return null; } + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java new file mode 100644 index 00000000000..5f404ff308f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * An interface for processing custom callback code into the interpreter. + */ +public interface InterpreterCallbackListener { + public String onPreExecute(String script); + public String onPostExecute(String script); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java new file mode 100644 index 00000000000..fdfed5ef959 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.HashMap; +import java.util.Map; + +/** + * The InterpreterinterpreterCallbackRegistry specifies code to be conditionally executed by an + * interpreter. The constants defined in this class denote currently + * supported events. Each instance is bound to a single InterpreterGroup + * and has Note-wide scope. + */ +public class InterpreterCallbackRegistry { + // Execute the callback code PRIOR to main paragraph code execution + public static final String PRE_EXEC = "pre_exec"; + + // Execute the callback code AFTER main paragraph code execution + public static final String POST_EXEC = "post_exec"; + + private String interpreterId; + private Map>> registry = + new HashMap>>(); + + /** + * CallbackRegistry constructor. + * + * @param interpreterId The Id of the InterpreterGroup instance to bind to + */ + public InterpreterCallbackRegistry(final String interpreterId) { + this.interpreterId = interpreterId; + } + + /** + * Adds a note to the registry + * + * @param noteId The Id of the Note instance to add + */ + public void addNote(String noteId) { + synchronized (registry) { + if (registry.get(noteId) == null) { + registry.put(noteId, new HashMap>()); + } + } + } + + /** + * Adds a replName to the registry + * + * @param noteId The note id + * @param replName The name of the interpreter repl to map the callbacks to + */ + public void addRepl(String noteId, String replName) { + synchronized (registry) { + addNote(noteId); + if (registry.get(noteId).get(replName) == null) { + registry.get(noteId).put(replName, new HashMap()); + } + } + } + + /** + * Register a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + * @param cmd Code to be executed by the interpreter + */ + public void register(String noteId, String replName, String event, String cmd) { + synchronized (registry) { + addRepl(noteId, replName); + registry.get(noteId).get(replName).put(event, cmd); + } + } + + /** + * Unregister a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + */ + public void unregister(String noteId, String replName, String event) { + synchronized (registry) { + addRepl(noteId, replName); + registry.get(noteId).get(replName).remove(event); + } + } + + /** + * Get a callback for a specific event. + * + * @param noteId Denotes the note this instance belongs to + * @param replName The name of the interpreter repl to map the callbacks to + * @param event Callback event (see constants defined in this class) + */ + public String get(String noteId, String replName, String event) { + synchronized (registry) { + addRepl(noteId, replName); + return registry.get(noteId).get(replName).get(event); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 21ca2e67b72..3e74377ae4c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -55,6 +55,7 @@ public static void remove() { private final Map config; private GUI gui; private AngularObjectRegistry angularObjectRegistry; + private InterpreterCallbackRegistry callbackRegistry; private ResourcePool resourcePool; private List runners; @@ -66,6 +67,7 @@ public InterpreterContext(String noteId, Map config, GUI gui, AngularObjectRegistry angularObjectRegistry, + InterpreterCallbackRegistry callbackRegistry, ResourcePool resourcePool, List runners, InterpreterOutput out @@ -78,6 +80,7 @@ public InterpreterContext(String noteId, this.config = config; this.gui = gui; this.angularObjectRegistry = angularObjectRegistry; + this.callbackRegistry = callbackRegistry; this.resourcePool = resourcePool; this.runners = runners; this.out = out; @@ -116,6 +119,10 @@ public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } + public InterpreterCallbackRegistry getInterpreterCallbackRegistry() { + return callbackRegistry; + } + public ResourcePool getResourcePool() { return resourcePool; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index bc56784b15b..931db034404 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap getAll() { public InterpreterGroup(String id) { this.id = id; allInterpreterGroups.put(id, this); + callbackRegistry = new InterpreterCallbackRegistry(id); } /** @@ -118,6 +120,10 @@ public Properties getProperty() { public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } + + public InterpreterCallbackRegistry getInterpreterCallbackRegistry() { + return callbackRegistry; + } public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { this.angularObjectRegistry = angularObjectRegistry; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/InterpreterCallback.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/InterpreterCallback.java new file mode 100644 index 00000000000..6df428268bb --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/InterpreterCallback.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * The InterpreterCallback interface makes it possible to bind the execution of + * code into the interpreter repl conditionally to events. + */ +public interface InterpreterCallback { + //Binds the callback cmd to the given event + public void registerCallback(String event, String cmd); + + // Unbinds the callback cmd from the given event + public void unregisterCallback(String event); + + // Retrieves the callback code (cmd) for the given event + public String getCallback(String event); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 7ddb92838f4..de0ed1484fe 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -494,6 +494,7 @@ private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutp new TypeToken>() {}.getType()), gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), + interpreterGroup.getInterpreterCallbackRegistry(), interpreterGroup.getResourcePool(), contextRunners, output); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 764c8b3823b..ecdf1089efe 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java index bc34539c6aa..af57ba7a487 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java @@ -36,8 +36,8 @@ public void isOpenTest() { assertFalse("Interpreter is not open", lazyOpenInterpreter.isOpen()); InterpreterContext interpreterContext = - new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null); + new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null, null); lazyOpenInterpreter.interpret("intp 1", interpreterContext); assertTrue("Interpeter is open", lazyOpenInterpreter.isOpen()); } -} \ No newline at end of file +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 5def888b637..4e6796d21a7 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -90,6 +90,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null); @@ -174,7 +175,7 @@ public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedExcepti // create object localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); - // get from remote registry + // get from remote registry ret = intp.interpret("get", context); Thread.sleep(500); // waitFor eventpoller pool event result = ret.message().split(" "); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index 74649b1e750..2d86ee3ed90 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -88,7 +88,7 @@ private InterpreterContext createInterpreterContext() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - null, + null, null, new LinkedList(), null); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index af1c4471a05..8c9decfd263 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -150,6 +150,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -186,6 +187,7 @@ public void testRemoteInterperterErrorStatus() throws TTransportException, IOExc new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -242,6 +244,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("500", ret.message()); @@ -256,6 +259,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("1000", ret.message()); @@ -310,6 +314,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -346,6 +351,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -412,6 +418,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -492,6 +499,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -594,6 +602,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -750,6 +759,7 @@ public void testEnvronmentAndPropertySet() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 02dba20c05d..347c7de863f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -84,7 +84,7 @@ public void setUp() throws Exception { MockInterpreterResourcePool.class.getName(), new File(INTERPRETER_SCRIPT).getAbsolutePath(), "fake", - "fakeRepo", + "fakeRepo", env, 10 * 1000, null, @@ -106,6 +106,7 @@ public void setUp() throws Exception { new GUI(), null, null, + null, new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index f17d88d50af..518f12f1b8d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -116,6 +117,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null)); return "1000"; @@ -192,6 +194,7 @@ public void testAbortOnPending() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null); @@ -230,6 +233,7 @@ protected boolean jobAbort() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new InterpreterCallbackRegistry(intpGroup.getId()), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 7807abba416..3ef97236f83 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -301,27 +301,38 @@ protected Object jobRun() throws Throwable { } } - String script = getScriptBody(); - // inject form - if (repl.getFormType() == FormType.NATIVE) { - settings.clear(); - } else if (repl.getFormType() == FormType.SIMPLE) { - String scriptBody = getScriptBody(); - Map inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built - // from script body - - final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() - .getAngularObjectRegistry(); - - scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); - - settings.setForms(inputs); - script = Input.getSimpleQuery(settings.getParams(), scriptBody); - } - logger.debug("RUN : " + script); try { InterpreterContext context = getInterpreterContext(); InterpreterContext.set(context); + + // Get text input from paragraph + String script = getScriptBody(); + + // Process callbacks + InterpreterCallbackRegistry callbacks = context.getInterpreterCallbackRegistry(); + script = processInterpreterCallbacks(callbacks, replName, script); + + // inject form + if (repl.getFormType() == FormType.NATIVE) { + settings.clear(); + } else if (repl.getFormType() == FormType.SIMPLE) { + String scriptBody = getScriptBody(); + scriptBody = processInterpreterCallbacks(callbacks, replName, scriptBody); + + // inputs will be built from script body + Map inputs = Input.extractSimpleQueryParam(scriptBody); + + final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() + .getAngularObjectRegistry(); + + scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); + + settings.setForms(inputs); + script = Input.getSimpleQuery(settings.getParams(), scriptBody); + } + + // Run the script code through interpreter REPL + logger.debug("RUN : " + script); InterpreterResult ret = repl.interpret(script, context); if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { @@ -403,6 +414,39 @@ protected boolean jobAbort() { } return true; } + + private String processInterpreterCallbacks(final InterpreterCallbackRegistry callbacks, + final String replName, + String script) { + if (callbacks == null) { + return script; + } + + InterpreterCallbackListener callbackListener = new InterpreterCallbackListener() { + @Override + public String onPreExecute(String script) { + String cmd = callbacks.get(note.getId(), replName, InterpreterCallbackRegistry.PRE_EXEC); + if (cmd != null) { + script = cmd + '\n' + script; + } + + return script; + } + + @Override + public String onPostExecute(String script) { + String cmd = callbacks.get(note.getId(), replName, InterpreterCallbackRegistry.POST_EXEC); + if (cmd != null) { + script += '\n' + cmd; + } + + return script; + } + }; + script = callbackListener.onPreExecute(script); + script = callbackListener.onPostExecute(script); + return script; + } private InterpreterContext getInterpreterContext() { final Paragraph self = this; @@ -438,11 +482,13 @@ private void updateParagraphResult(InterpreterOutput out) { private InterpreterContext getInterpreterContext(InterpreterOutput output) { AngularObjectRegistry registry = null; + InterpreterCallbackRegistry callbacks = null; ResourcePool resourcePool = null; if (!factory.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = factory.getInterpreterSettings(note.getId()).get(0); registry = intpGroup.getInterpreterGroup(note.getId()).getAngularObjectRegistry(); + callbacks = intpGroup.getInterpreterGroup(note.getId()).getInterpreterCallbackRegistry(); resourcePool = intpGroup.getInterpreterGroup(note.getId()).getResourcePool(); } @@ -469,6 +515,7 @@ private InterpreterContext getInterpreterContext(InterpreterOutput output) { this.getConfig(), this.settings, registry, + callbacks, resourcePool, runners, output); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 09031a59351..e326d3f9617 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { schedulerFactory = new SchedulerFactory(); depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); From f50eccaa56889f0feda4a48a2222226df6596d53 Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Mon, 19 Sep 2016 15:48:34 -0700 Subject: [PATCH 2/9] Added support for user defined callbacks in spark/pyspark interpreters --- .../zeppelin/spark/ZeppelinContext.java | 66 +++++++++++++++++++ .../main/resources/python/zeppelin_pyspark.py | 18 +++++ .../zeppelin/spark/SparkInterpreterTest.java | 2 +- .../spark/SparkSqlInterpreterTest.java | 2 +- .../InterpreterCallbackRegistry.java | 26 +++++++- .../InterpreterCallbackRegistryListener.java | 29 ++++++++ .../interpreter/InterpreterGroup.java | 11 ++-- .../remote/RemoteInterpreterEventClient.java | 30 +++++++++ .../remote/RemoteInterpreterEventPoller.java | 16 +++++ .../remote/RemoteInterpreterServer.java | 17 ++++- .../thrift/InterpreterCompletion.java | 4 +- .../thrift/RemoteApplicationResult.java | 6 +- .../thrift/RemoteInterpreterContext.java | 4 +- .../thrift/RemoteInterpreterEvent.java | 4 +- .../thrift/RemoteInterpreterEventType.java | 10 ++- .../thrift/RemoteInterpreterResult.java | 4 +- .../thrift/RemoteInterpreterService.java | 10 +-- .../thrift/RemoteInterpreterService.thrift | 2 + .../remote/RemoteAngularObjectTest.java | 2 +- .../remote/RemoteInterpreterTest.java | 20 +++--- .../scheduler/RemoteSchedulerTest.java | 6 +- .../interpreter/InterpreterFactory.java | 2 + .../apache/zeppelin/notebook/Paragraph.java | 55 ++++++++-------- 23 files changed, 276 insertions(+), 70 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 7bccbac7d52..6c9a0903a06 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -37,6 +37,7 @@ import org.apache.zeppelin.display.AngularObjectWatcher; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input.ParamOption; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; @@ -695,6 +696,71 @@ private void angularUnbind(String name, String noteId) { registry.remove(name, noteId, null); } + /** + * General function to register callback event + * @param replName Name of the interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + */ + @ZeppelinApi + public void registerCallback(String replName, String event, String cmd) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + callbacks.register(noteId, replName, event, cmd); + } + + /** + * registerCallback() wrapper for the spark (scala) interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + */ + @ZeppelinApi + public void registerCallback(String event, String cmd) { + registerCallback("spark", event, cmd); + } + + /** + * Get the callback code + * @param replName Name of the interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + */ + @ZeppelinApi + public String getCallback(String replName, String event) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + return callbacks.get(noteId, replName, event); + } + + /** + * getCallback() wrapper for the spark (scala) interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + * @param cmd The code to be executed by the interpreter on given event + */ + @ZeppelinApi + public String getCallback(String event) { + return getCallback("spark", event); + } + + /** + * Unbind code from given callback event + * @param replName Name of the interpreter + * @param event The type of event to hook to (pre_exec, post_exec) + */ + @ZeppelinApi + public void unregisterCallback(String replName, String event) { + InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); + String noteId = interpreterContext.getNoteId(); + callbacks.unregister(noteId, replName, event); + } + + /** + * Unbind code from given callback event + * @param event The type of event to hook to (pre_exec, post_exec) + */ + @ZeppelinApi + public void unregisterCallback(String event) { + unregisterCallback("spark", event); + } /** * Add object into resource pool diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 53465c2cd80..c0e4e35ef02 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -98,6 +98,24 @@ def checkbox(self, name, options, defaultChecked = None): checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables) return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables) + + def registerCallback(self, event, cmd): + self.z.registerCallback("pyspark", event, cmd) + + def registerCallbackToRepl(self, replName, event, cmd): + self.z.registerCallback(replName, event, cmd) + + def unregisterCallback(self, event): + self.z.unregisterCallback("pyspark", event) + + def unregisterCallbackFromRepl(self, replName, event): + self.z.unregisterCallback(replName, event) + + def getCallback(self, event): + return self.z.getCallback("pyspark", event) + + def getCallbackForRepl(self, replName, event): + return self.z.getCallback(replName, event) def __tupleToScalaTuple2(self, tuple): if (len(tuple) == 2): diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index c7d6838051b..d2f481bed39 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -97,7 +97,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index b7039ed318c..e33b7259756 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -78,7 +78,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(), new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java index fdfed5ef959..88b1de3d5b2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistry.java @@ -33,6 +33,7 @@ public class InterpreterCallbackRegistry { // Execute the callback code AFTER main paragraph code execution public static final String POST_EXEC = "post_exec"; + private InterpreterCallbackRegistryListener listener; private String interpreterId; private Map>> registry = new HashMap>>(); @@ -41,9 +42,19 @@ public class InterpreterCallbackRegistry { * CallbackRegistry constructor. * * @param interpreterId The Id of the InterpreterGroup instance to bind to + * @param listener InterpreterCallbackRegistryListener instance to use for broadcasting */ - public InterpreterCallbackRegistry(final String interpreterId) { + public InterpreterCallbackRegistry(final String interpreterId, + final InterpreterCallbackRegistryListener listener) { this.interpreterId = interpreterId; + this.listener = listener; + } + + /** + * Get the interpreterGroup id this instance is bound to + */ + public String getInterpreterId() { + return interpreterId; } /** @@ -82,10 +93,18 @@ public void addRepl(String noteId, String replName) { * @param event Callback event (see constants defined in this class) * @param cmd Code to be executed by the interpreter */ - public void register(String noteId, String replName, String event, String cmd) { + public void register(String noteId, String replName, + String event, String cmd) throws IllegalArgumentException { synchronized (registry) { addRepl(noteId, replName); + if (!event.equals(POST_EXEC) && !event.equals(PRE_EXEC)) { + throw new IllegalArgumentException("Must be " + POST_EXEC + + " or " + PRE_EXEC); + } registry.get(noteId).get(replName).put(event, cmd); + if (listener != null) { + listener.onRegister(interpreterId, noteId, replName, event, cmd); + } } } @@ -100,6 +119,9 @@ public void unregister(String noteId, String replName, String event) { synchronized (registry) { addRepl(noteId, replName); registry.get(noteId).get(replName).remove(event); + if (listener != null) { + listener.onUnregister(interpreterId, noteId, replName, event); + } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java new file mode 100644 index 00000000000..cf063428e6f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * This interface is used to broadcast changes in the callback registry + * from a remote interpreter process to the Zeppelin Engine. + */ +public interface InterpreterCallbackRegistryListener { + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd); + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 931db034404..4778be3b1f1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -72,7 +72,6 @@ public static Collection getAll() { public InterpreterGroup(String id) { this.id = id; allInterpreterGroups.put(id, this); - callbackRegistry = new InterpreterCallbackRegistry(id); } /** @@ -121,12 +120,16 @@ public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } + public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { + this.angularObjectRegistry = angularObjectRegistry; + } + public InterpreterCallbackRegistry getInterpreterCallbackRegistry() { return callbackRegistry; } - - public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { - this.angularObjectRegistry = angularObjectRegistry; + + public void setInterpreterCallbackRegistry(InterpreterCallbackRegistry callbackRegistry) { + this.callbackRegistry = callbackRegistry; } public RemoteInterpreterProcess getRemoteInterpreterProcess() { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 6f26ffd959b..03d4d03bea2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; @@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; @@ -88,6 +90,34 @@ public void angularObjectRemove(String name, String noteId, String paragraphId) RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject))); } + /** + * notify callback registration + */ + public void registerCallback(String noteId, String replName, String eventName, String cmd) { + Map callbackInfo = new HashMap(); + callbackInfo.put("noteId", noteId); + callbackInfo.put("replName", replName); + callbackInfo.put("eventName", eventName); + callbackInfo.put("cmd", cmd); + Type mapType = new TypeToken>() {}.getType(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.REGISTER_CALLBACK, gson.toJson(callbackInfo, mapType))); + } + + /** + * notify callback unregistration + */ + public void unregisterCallback(String noteId, String replName, String eventName) { + Map callbackInfo = new HashMap(); + callbackInfo.put("noteId", noteId); + callbackInfo.put("replName", replName); + callbackInfo.put("eventName", eventName); + Type mapType = new TypeToken>() {}.getType(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.UNREGISTER_CALLBACK, gson.toJson(callbackInfo, mapType))); + } /** * Get all resources except for specific resourcePool diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 090aeeaaee3..5f8b1c760d7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; @@ -115,8 +117,11 @@ public void run() { } Gson gson = new Gson(); + Type mapType = new TypeToken>() {}.getType(); AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); + InterpreterCallbackRegistry callbackRegistry = + interpreterGroup.getInterpreterCallbackRegistry(); try { if (event.getType() == RemoteInterpreterEventType.NO_OP) { @@ -141,6 +146,17 @@ public void run() { AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class); angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); + } else if (event.getType() == RemoteInterpreterEventType.REGISTER_CALLBACK) { + Map callbackInfo = gson.fromJson(event.getData(), mapType); + callbackRegistry.register(callbackInfo.get("noteId"), + callbackInfo.get("replName"), + callbackInfo.get("eventName"), + callbackInfo.get("cmd")); + } else if (event.getType() == RemoteInterpreterEventType.UNREGISTER_CALLBACK) { + Map callbackInfo = gson.fromJson(event.getData(), mapType); + callbackRegistry.unregister(callbackInfo.get("noteId"), + callbackInfo.get("replName"), + callbackInfo.get("eventName")); } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) { InterpreterContextRunner runnerFromRemote = gson.fromJson( event.getData(), RemoteInterpreterContextRunner.class); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index de0ed1484fe..fdf554b6570 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -55,11 +55,13 @@ */ public class RemoteInterpreterServer extends Thread - implements RemoteInterpreterService.Iface, AngularObjectRegistryListener { + implements RemoteInterpreterService.Iface, AngularObjectRegistryListener, + InterpreterCallbackRegistryListener { Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); InterpreterGroup interpreterGroup; AngularObjectRegistry angularObjectRegistry; + InterpreterCallbackRegistry callbackRegistry; DistributedResourcePool resourcePool; private ApplicationLoader appLoader; @@ -152,7 +154,9 @@ public void createInterpreter(String interpreterGroupId, String noteId, String if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); + callbackRegistry = new InterpreterCallbackRegistry(interpreterGroup.getId(), this); resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); + interpreterGroup.setInterpreterCallbackRegistry(callbackRegistry); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); interpreterGroup.setResourcePool(resourcePool); @@ -588,6 +592,17 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri eventClient.angularObjectRemove(name, noteId, paragraphId); } + @Override + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd) { + eventClient.registerCallback(noteId, replName, event, cmd); + } + + @Override + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event) { + eventClient.unregisterCallback(noteId, replName, event); + } /** * Poll event from RemoteInterpreterEventPoller diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 9ceba88826d..b4987339437 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index a192899daa1..2193ac431b4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); @@ -248,7 +248,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return Boolean.valueOf(isSuccess()); + return isSuccess(); case MSG: return getMsg(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index f8b63ff3422..4dea873bbd8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 0db5697bbf5..2c6e565b27a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 955461951f3..3f5e17d8fa1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -39,7 +39,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { OUTPUT_APPEND(8), OUTPUT_UPDATE(9), ANGULAR_REGISTRY_PUSH(10), - APP_STATUS_UPDATE(11); + APP_STATUS_UPDATE(11), + REGISTER_CALLBACK(12), + UNREGISTER_CALLBACK(13); private final int value; @@ -82,6 +84,10 @@ public static RemoteInterpreterEventType findByValue(int value) { return ANGULAR_REGISTRY_PUSH; case 11: return APP_STATUS_UPDATE; + case 12: + return REGISTER_CALLBACK; + case 13: + return UNREGISTER_CALLBACK; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 1af0ce0b2b5..566d2c0d61d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index e3ddeebc664..e0c5b78f0ce 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.2) + * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-6-8") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") public class RemoteInterpreterService { public interface Iface { @@ -8252,7 +8252,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return Integer.valueOf(getSuccess()); + return getSuccess(); } throw new IllegalStateException(); @@ -9584,7 +9584,7 @@ public Object getFieldValue(_Fields field) { return getBuf(); case CURSOR: - return Integer.valueOf(getCursor()); + return getCursor(); } throw new IllegalStateException(); @@ -16019,7 +16019,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return Boolean.valueOf(isSuccess()); + return isSuccess(); } throw new IllegalStateException(); diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 32be4a4a0a8..f82847aa577 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -50,6 +50,8 @@ enum RemoteInterpreterEventType { OUTPUT_UPDATE = 9, ANGULAR_REGISTRY_PUSH = 10, APP_STATUS_UPDATE = 11, + REGISTER_CALLBACK = 12, + UNREGISTER_CALLBACK = 13, } struct RemoteInterpreterEvent { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 4e6796d21a7..16159116b02 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -90,7 +90,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 8c9decfd263..0f89e082b33 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -150,7 +150,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -187,7 +187,7 @@ public void testRemoteInterperterErrorStatus() throws TTransportException, IOExc new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -244,7 +244,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("500", ret.message()); @@ -259,7 +259,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); assertEquals("1000", ret.message()); @@ -314,7 +314,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -351,7 +351,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -418,7 +418,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -499,7 +499,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); @@ -602,7 +602,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); } @@ -759,7 +759,7 @@ public void testEnvronmentAndPropertySet() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 518f12f1b8d..c54926add94 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -117,7 +117,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null)); return "1000"; @@ -194,7 +194,7 @@ public void testAbortOnPending() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); @@ -233,7 +233,7 @@ protected boolean jobAbort() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new InterpreterCallbackRegistry(intpGroup.getId()), + new InterpreterCallbackRegistry(intpGroup.getId(), null), new LocalResourcePool("pool1"), new LinkedList(), null); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 40dcc30b36c..85a1c20a1f7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -639,6 +639,7 @@ public InterpreterGroup createInterpreterGroup(String id, InterpreterOption opti if (option == null) throw new NullArgumentException("option"); + InterpreterCallbackRegistry callbackRegistry = new InterpreterCallbackRegistry(id, null); AngularObjectRegistry angularObjectRegistry; InterpreterGroup interpreterGroup = new InterpreterGroup(id); @@ -652,6 +653,7 @@ public InterpreterGroup createInterpreterGroup(String id, InterpreterOption opti } interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + interpreterGroup.setInterpreterCallbackRegistry(callbackRegistry); return interpreterGroup; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 3ef97236f83..31c6f57a1dd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -301,38 +301,35 @@ protected Object jobRun() throws Throwable { } } - try { - InterpreterContext context = getInterpreterContext(); - InterpreterContext.set(context); + // Get text input from paragraph + String script = getScriptBody(); + + // Process pre/post-execute callback hooks + final InterpreterCallbackRegistry callbacks = repl.getInterpreterGroup() + .getInterpreterCallbackRegistry(); + script = processInterpreterCallbacks(callbacks, replName, script); + + // inject form + if (repl.getFormType() == FormType.NATIVE) { + settings.clear(); + } else if (repl.getFormType() == FormType.SIMPLE) { + String scriptBody = getScriptBody(); + scriptBody = processInterpreterCallbacks(callbacks, replName, scriptBody); - // Get text input from paragraph - String script = getScriptBody(); - - // Process callbacks - InterpreterCallbackRegistry callbacks = context.getInterpreterCallbackRegistry(); - script = processInterpreterCallbacks(callbacks, replName, script); - - // inject form - if (repl.getFormType() == FormType.NATIVE) { - settings.clear(); - } else if (repl.getFormType() == FormType.SIMPLE) { - String scriptBody = getScriptBody(); - scriptBody = processInterpreterCallbacks(callbacks, replName, scriptBody); - - // inputs will be built from script body - Map inputs = Input.extractSimpleQueryParam(scriptBody); - - final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() - .getAngularObjectRegistry(); + // inputs will be built from script body + Map inputs = Input.extractSimpleQueryParam(scriptBody); + final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() + .getAngularObjectRegistry(); - scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); + scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); - settings.setForms(inputs); - script = Input.getSimpleQuery(settings.getParams(), scriptBody); - } - - // Run the script code through interpreter REPL - logger.debug("RUN : " + script); + settings.setForms(inputs); + script = Input.getSimpleQuery(settings.getParams(), scriptBody); + } + logger.debug("RUN : " + script); + try { + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); InterpreterResult ret = repl.interpret(script, context); if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { From 1594dfbcb2df805705143df913e9a5eb63c6be18 Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 20 Sep 2016 11:14:47 -0700 Subject: [PATCH 3/9] Deleted unused files --- python/src/main/resources/bootstrap.pyc | Bin 5788 -> 0 bytes .../interpreter/dev/InterpreterCallback.java | 33 ------------------ 2 files changed, 33 deletions(-) delete mode 100644 python/src/main/resources/bootstrap.pyc delete mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/InterpreterCallback.java diff --git a/python/src/main/resources/bootstrap.pyc b/python/src/main/resources/bootstrap.pyc deleted file mode 100644 index 13c93160034c3996dc3202c1a5a0f2a031620730..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 5788 zcmb_gTW=f36`m!j3u!4v>Nu$rB$*JkMHPZ%U7QxSB((8Wjjg&Xp@9_`#Bz5?uC&}; z?F?nnmQbM1OCRzt`r5ZX73gdJNrAi;1^U{4=gcmti>;<;DRDSEbLN~g=YF>QkLg6O@G#qB$Xg3Ik1w0B}{&2I_HIc`d}VotjWJn(bU7F18vI;3)hTKAV0C zQ~Emi-o;n{2m^o~J~Djx_sru~gJ3rGRg6!)%)$2 zOg~ey0Wzn~%#sFI);Hp`ZzO@@IxDQPOs%IGtVuRr&WE(!sJ9pEbM-*Ay?$hV>qUK4 zpCtwXX|ZgnNck-DH(~-J%@Dw}RG*VyH|hytT&&N|NzAcUU#ibOpIbVO_HI}Cd#&W~ z9DkA-Y}D)XyGa-Wy`MvZ!16@|z6Qr(}|nJ$R8!5dnJ?Jv8(01zu{blp)5Ripi-p`9gq5 zfB=Rd6Lz|0mgi(n9xO49JFsx64?3sixY}ATJa~1XwSJo-4z51B!W`pWo4Iu}Jv@1} zIZI`2W|4Mg=488P)@FczW>(s3=K3N7N1y{^5ZgV^cn|10Ck^D3E?I{;^natj)BefS zX3r}1ZPs`Y>OQRCB|_yHHfuJ%QXnkMrBE;Vzjff;PEvc8j02VywB;Hm^Axn2zCLKw zTVA|3j~U!ct8}e?r#`ZkpiSsFAe<80)^8rm1hMuUV?RWGH|%ZpM=|t7?0mZ?yg`z% zK~mp-bVj7$nPi%$AA~%WK_ryuI$6@Ec z_B8H!T(QO%t;wzXkMG~zmOqv4Ea{OcJl=S+u`O>EI;LL|TutILs!K+*^A!q}BUC-q z@vEsk5)ZimdO9dXa5sT`56!Stp%Y&aG)guq+id+ul^<$B)O@ziL&1kR$~i2K@qCQu z=*R9i-K#oVKhA|nIfZ_LL)RXV9E8B7DtPkvhzQZzgbo8MZ)+AvibcHy*$;UkMHLGAlGaI9 zXdjNy)Hu{XWs5XNQw87KvyV_+i58Da;#)^J@h^ob)AMT!S7x|ptm4W^xN8OC3M0<= z3^Wnv)=2u=G{kAMNy>&TQM)v%xAI!XP#-hQ%J~5w^>wq^ARMfsuBSa?GMAb)a~T5_ zWX%ySHYqbm6$_F#10;Ieb&U*<%p~R3Z107ab@W9tCPN`azoTC=4dHy^Tyd_JuQ-<- zZX?N_h(nh29&lSpLvZ8|j@YY+tQYi{sgWT8?8L5ewQCBc9fiDs6((7cNY)A3alU10IAiBt;`Q4A}t;2`9&Cghl-f+mJJiIe16xjfw0#g5HHm0m7lf``?N9 zF;kvk%70{3u3(C&z7}O`s?)@r z6HLQEIZAzot3Mfam=sTjW&E(-a2Ei@)?s@kHfw&1BUi&-N9xR9dxgm{N)~ijYp?j{ zcrI_V)?V30=y+Yf4@4CAZihXuqZXizJ1tME&sXNcGSMK_c-jv`Ru)<%eFrN!DU^d! zn?a=3W|+Y3oX#DK{&qoTW}TVgx?7WNqVKXVG*C>GC0{kb1wV9)ttV3dYmkp*dV)aY>eyuAx2jMnEXD)tjpb`%IY%CKWGJ@{Xk+6$rv zr^ubZDnd!qP8d<3=&1V%5hQox7b5qmP;qbK(Y=A8K{jkFg=;ihrhx(uqsE=tFWpYRzVl_{}C4H!ibGg`rE<=Tg3PKcazL+a; Date: Tue, 20 Sep 2016 11:16:28 -0700 Subject: [PATCH 4/9] Regenerated thrift files using v0.9.2 --- .../interpreter/thrift/InterpreterCompletion.java | 4 ++-- .../interpreter/thrift/RemoteApplicationResult.java | 6 +++--- .../interpreter/thrift/RemoteInterpreterContext.java | 4 ++-- .../interpreter/thrift/RemoteInterpreterEvent.java | 4 ++-- .../interpreter/thrift/RemoteInterpreterEventType.java | 2 +- .../interpreter/thrift/RemoteInterpreterResult.java | 4 ++-- .../interpreter/thrift/RemoteInterpreterService.java | 10 +++++----- 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index b4987339437..f724b0de916 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index 2193ac431b4..4bc434f2e6a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); @@ -248,7 +248,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return isSuccess(); + return Boolean.valueOf(isSuccess()); case MSG: return getMsg(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index 4dea873bbd8..fc0803b7a44 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 2c6e565b27a..c4d36a906bc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 3f5e17d8fa1..31475d7a9df 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 566d2c0d61d..dad455ff296 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index e0c5b78f0ce..8b86e11381c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -16,7 +16,7 @@ * limitations under the License. */ /** - * Autogenerated by Thrift Compiler (0.9.3) + * Autogenerated by Thrift Compiler (0.9.2) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-09-19") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-9-20") public class RemoteInterpreterService { public interface Iface { @@ -8252,7 +8252,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return getSuccess(); + return Integer.valueOf(getSuccess()); } throw new IllegalStateException(); @@ -9584,7 +9584,7 @@ public Object getFieldValue(_Fields field) { return getBuf(); case CURSOR: - return getCursor(); + return Integer.valueOf(getCursor()); } throw new IllegalStateException(); @@ -16019,7 +16019,7 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return isSuccess(); + return Boolean.valueOf(isSuccess()); } throw new IllegalStateException(); From b8265fea798c92a411324a2eb3a73f245b8e50ee Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 20 Sep 2016 13:13:33 -0700 Subject: [PATCH 5/9] Added unit tests for InterpreterCallbackRegistry --- .../InterpreterCallbackRegistryTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java new file mode 100644 index 00000000000..ef36145d973 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterCallbackRegistryTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class InterpreterCallbackRegistryTest { + + @Test + public void testBasic() { + final AtomicInteger onRegister = new AtomicInteger(0); + final AtomicInteger onUnregister = new AtomicInteger(0); + final String PRE_EXEC = InterpreterCallbackRegistry.PRE_EXEC; + final String POST_EXEC = InterpreterCallbackRegistry.POST_EXEC; + final String noteId = "note"; + final String replName = "repl"; + final String preExecCallback = "pre"; + final String postExecCallback = "post"; + InterpreterCallbackRegistry registry = new InterpreterCallbackRegistry("intpId", + new InterpreterCallbackRegistryListener() { + + @Override + public void onRegister(String interpreterGroupId, String noteId, String replName, + String event, String cmd) { + onRegister.incrementAndGet(); + } + + @Override + public void onUnregister(String interpreterGroupId, String noteId, String replName, + String event) { + onUnregister.incrementAndGet(); + } + }); + + // Test register() + registry.register(noteId, replName, PRE_EXEC, preExecCallback); + registry.register(noteId, replName, POST_EXEC, postExecCallback); + assertEquals(2, onRegister.get()); + assertEquals(0, onUnregister.get()); + + // Test get() + assertEquals(registry.get(noteId, replName, PRE_EXEC), preExecCallback); + assertEquals(registry.get(noteId, replName, POST_EXEC), postExecCallback); + + // Test Unregister + registry.unregister(noteId, replName, PRE_EXEC); + assertNull(registry.get(noteId, replName, PRE_EXEC)); + assertEquals(1, onUnregister.get()); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidEventCode() { + InterpreterCallbackRegistry registry = new InterpreterCallbackRegistry("intpId", null); + + // Test that only valid event codes ("pre_exec", "post_exec") are accepted + registry.register("foo", "bar", "baz", "whatever"); + } + +} From 3e7c8520786190891e70762828406f5ff7b431b0 Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 20 Sep 2016 14:02:19 -0700 Subject: [PATCH 6/9] Cleaned up ZeppelinContext callback registry API --- .../zeppelin/spark/ZeppelinContext.java | 18 ++++++++-------- .../main/resources/python/zeppelin_pyspark.py | 21 ++++++------------- 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 6c9a0903a06..84995a73bff 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -698,12 +698,12 @@ private void angularUnbind(String name, String noteId) { /** * General function to register callback event - * @param replName Name of the interpreter * @param event The type of event to hook to (pre_exec, post_exec) * @param cmd The code to be executed by the interpreter on given event + * @param replName Name of the interpreter */ @ZeppelinApi - public void registerCallback(String replName, String event, String cmd) { + public void registerCallback(String event, String cmd, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); callbacks.register(noteId, replName, event, cmd); @@ -716,16 +716,16 @@ public void registerCallback(String replName, String event, String cmd) { */ @ZeppelinApi public void registerCallback(String event, String cmd) { - registerCallback("spark", event, cmd); + registerCallback(event, cmd, "spark"); } /** * Get the callback code - * @param replName Name of the interpreter * @param event The type of event to hook to (pre_exec, post_exec) + * @param replName Name of the interpreter */ @ZeppelinApi - public String getCallback(String replName, String event) { + public String getCallback(String event, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); return callbacks.get(noteId, replName, event); @@ -738,16 +738,16 @@ public String getCallback(String replName, String event) { */ @ZeppelinApi public String getCallback(String event) { - return getCallback("spark", event); + return getCallback(event, "spark"); } /** * Unbind code from given callback event - * @param replName Name of the interpreter * @param event The type of event to hook to (pre_exec, post_exec) + * @param replName Name of the interpreter */ @ZeppelinApi - public void unregisterCallback(String replName, String event) { + public void unregisterCallback(String event, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); callbacks.unregister(noteId, replName, event); @@ -759,7 +759,7 @@ public void unregisterCallback(String replName, String event) { */ @ZeppelinApi public void unregisterCallback(String event) { - unregisterCallback("spark", event); + unregisterCallback(event, "spark"); } /** diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index c0e4e35ef02..6c2aa0c54de 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -99,23 +99,14 @@ def checkbox(self, name, options, defaultChecked = None): checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables) return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables) - def registerCallback(self, event, cmd): - self.z.registerCallback("pyspark", event, cmd) + def registerCallback(self, event, cmd, replName="pyspark"): + self.z.registerCallback(event, cmd, replName) - def registerCallbackToRepl(self, replName, event, cmd): - self.z.registerCallback(replName, event, cmd) - - def unregisterCallback(self, event): - self.z.unregisterCallback("pyspark", event) - - def unregisterCallbackFromRepl(self, replName, event): - self.z.unregisterCallback(replName, event) + def unregisterCallback(self, event, replName="pyspark"): + self.z.unregisterCallback(event, cmd, replName) - def getCallback(self, event): - return self.z.getCallback("pyspark", event) - - def getCallbackForRepl(self, replName, event): - return self.z.getCallback(replName, event) + def getCallback(self, event, replName="pyspark"): + return self.z.getCallback(event, replName) def __tupleToScalaTuple2(self, tuple): if (len(tuple) == 2): From 5822ecd85ec19c156ee7722fcab90dac675e94dd Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 20 Sep 2016 14:11:25 -0700 Subject: [PATCH 7/9] Added documentation for callback registry system --- docs/manual/interpreters.md | 44 +++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/docs/manual/interpreters.md b/docs/manual/interpreters.md index c47d5369d8a..6255f983e03 100644 --- a/docs/manual/interpreters.md +++ b/docs/manual/interpreters.md @@ -82,3 +82,47 @@ interpreter.start() The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter porocess is listening as shown in the image below: + +## (Experimental) Interpreter Execution Callbacks + +Zeppelin allows for users to specify additional code to be executed by an interpreter at pre and post-paragraph code execution. This is primarily useful if you need to run the same set of code for all of the paragraphs within your notebook at specific times. Currently, this feature is only available for the spark and pyspark interpreters. To specify your "callback" code, you may use '`z.registerCallback()`. For example, enter the following into one paragraph: + +```python +%pyspark +z.registerCallback("post_exec", "print 'This code should be executed before the parapgraph code!'") +z.registerCallback("pre_exec", "print 'This code should be executed after the paragraph code!'") +``` + +These calls will not take into effect until the next time you run a paragraph. In another paragraph, enter +```python +%pyspark +print "This code should be entered into the paragraph by the user!" +``` + +The output should be: +``` +This code should be executed before the paragraph code! +This code should be entered into the paragraph by the user! +This code should be executed after the paragraph code! +``` + +If you ever need to know the callback code, use `z.getCallback()`: +```python +%pyspark +print z.getCallback("post_exec") +``` +``` +print 'This code should be executed after the paragraph code!' +``` +Any call to `z.registerCallback()` will automatically overwrite what was previously registered. To completely unregister a callback event, use `z.unregisterCallback(eventCode)`. Currently only `"post_exec"` and `"pre_exec"` are valid event codes for the Zeppelin Callback Registry system. + +Finally, the callback registry is internally shared by other interpreters in the same group. This would allow for callback code for one interpreter REPL to be set by another as follows: + +```scala +%spark +z.unregisterCallback("post_exec", "pyspark") +``` +The API is identical for both the spark (scala) and pyspark (python) implementations. + +### Caveats +Calls to `z.registerCallback("pre_exec", ...)` should be made with care. If there are errors in your specified callback code, this will cause the interpreter REPL to become unable to execute any code pass the pre-execute stage making it impossible for direct calls to `z.unregisterCallback()` to take into effect. Current workarounds include calling `z.unregisterCallback()` from a different interpreter REPL in the same interpreter group (see above) or manually restarting the interpreter group in the UI. From ffc02f7f1e29d7e635ee7562d493fd305a45479a Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Fri, 23 Sep 2016 13:49:02 -0700 Subject: [PATCH 8/9] Get default replName for callback registry from paragraph --- spark/pom.xml | 6 ++++ .../zeppelin/spark/ZeppelinContext.java | 20 ++++++++----- .../main/resources/python/zeppelin_pyspark.py | 30 ++++++++++++------- 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/spark/pom.xml b/spark/pom.xml index 66d93c42ee6..b42a405b1d7 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -51,6 +51,12 @@ slf4j-log4j12 + + ${project.groupId} + zeppelin-zengine + ${project.version} + + ${project.groupId} zeppelin-display_${scala.binary.version} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 84995a73bff..ff33a4652b8 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -708,7 +709,7 @@ public void registerCallback(String event, String cmd, String replName) { String noteId = interpreterContext.getNoteId(); callbacks.register(noteId, replName, event, cmd); } - + /** * registerCallback() wrapper for the spark (scala) interpreter * @param event The type of event to hook to (pre_exec, post_exec) @@ -716,9 +717,10 @@ public void registerCallback(String event, String cmd, String replName) { */ @ZeppelinApi public void registerCallback(String event, String cmd) { - registerCallback(event, cmd, "spark"); + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + registerCallback(event, cmd, replName); } - + /** * Get the callback code * @param event The type of event to hook to (pre_exec, post_exec) @@ -730,7 +732,7 @@ public String getCallback(String event, String replName) { String noteId = interpreterContext.getNoteId(); return callbacks.get(noteId, replName, event); } - + /** * getCallback() wrapper for the spark (scala) interpreter * @param event The type of event to hook to (pre_exec, post_exec) @@ -738,9 +740,10 @@ public String getCallback(String event, String replName) { */ @ZeppelinApi public String getCallback(String event) { - return getCallback(event, "spark"); + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + return getCallback(event, replName); } - + /** * Unbind code from given callback event * @param event The type of event to hook to (pre_exec, post_exec) @@ -752,14 +755,15 @@ public void unregisterCallback(String event, String replName) { String noteId = interpreterContext.getNoteId(); callbacks.unregister(noteId, replName, event); } - + /** * Unbind code from given callback event * @param event The type of event to hook to (pre_exec, post_exec) */ @ZeppelinApi public void unregisterCallback(String event) { - unregisterCallback(event, "spark"); + String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); + unregisterCallback(event, replName); } /** diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 6c2aa0c54de..58e5fbaf313 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -80,16 +80,16 @@ def put(self, key, value): def get(self, key): return self.__getitem__(key) - def input(self, name, defaultValue = ""): + def input(self, name, defaultValue=""): return self.z.input(name, defaultValue) - def select(self, name, options, defaultValue = ""): + def select(self, name, options, defaultValue=""): # auto_convert to ArrayList doesn't match the method signature on JVM side tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples) return self.z.select(name, defaultValue, iterables) - def checkbox(self, name, options, defaultChecked = None): + def checkbox(self, name, options, defaultChecked=None): if defaultChecked is None: defaultChecked = list(map(lambda items: items[0], options)) optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) @@ -98,14 +98,22 @@ def checkbox(self, name, options, defaultChecked = None): checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables) return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables) - - def registerCallback(self, event, cmd, replName="pyspark"): - self.z.registerCallback(event, cmd, replName) - - def unregisterCallback(self, event, replName="pyspark"): - self.z.unregisterCallback(event, cmd, replName) - - def getCallback(self, event, replName="pyspark"): + + def registerCallback(self, event, cmd, replName=None): + if replName is None: + self.z.registerCallback(event, cmd) + else: + self.z.registerCallback(event, cmd, replName) + + def unregisterCallback(self, event, replName=None): + if replName is None: + self.z.unregisterCallback(event) + else: + self.z.unregisterCallback(event, replName) + + def getCallback(self, event, replName=None): + if replName is None: + return self.z.getCallback(event) return self.z.getCallback(event, replName) def __tupleToScalaTuple2(self, tuple): From 7b41a5e7c4c2e5bee29eca646275f3e8b3732514 Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Sun, 25 Sep 2016 12:58:13 -0700 Subject: [PATCH 9/9] Use @Experimental annotation for callback registry interface Use Experimental annotation for callback registry interface --- .../org/apache/zeppelin/spark/ZeppelinContext.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index ff33a4652b8..06f1dc98ad2 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -32,6 +32,7 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.annotation.Experimental; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectWatcher; @@ -703,7 +704,7 @@ private void angularUnbind(String name, String noteId) { * @param cmd The code to be executed by the interpreter on given event * @param replName Name of the interpreter */ - @ZeppelinApi + @Experimental public void registerCallback(String event, String cmd, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); @@ -715,7 +716,7 @@ public void registerCallback(String event, String cmd, String replName) { * @param event The type of event to hook to (pre_exec, post_exec) * @param cmd The code to be executed by the interpreter on given event */ - @ZeppelinApi + @Experimental public void registerCallback(String event, String cmd) { String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); registerCallback(event, cmd, replName); @@ -726,7 +727,7 @@ public void registerCallback(String event, String cmd) { * @param event The type of event to hook to (pre_exec, post_exec) * @param replName Name of the interpreter */ - @ZeppelinApi + @Experimental public String getCallback(String event, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); @@ -738,7 +739,7 @@ public String getCallback(String event, String replName) { * @param event The type of event to hook to (pre_exec, post_exec) * @param cmd The code to be executed by the interpreter on given event */ - @ZeppelinApi + @Experimental public String getCallback(String event) { String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); return getCallback(event, replName); @@ -749,7 +750,7 @@ public String getCallback(String event) { * @param event The type of event to hook to (pre_exec, post_exec) * @param replName Name of the interpreter */ - @ZeppelinApi + @Experimental public void unregisterCallback(String event, String replName) { InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry(); String noteId = interpreterContext.getNoteId(); @@ -760,7 +761,7 @@ public void unregisterCallback(String event, String replName) { * Unbind code from given callback event * @param event The type of event to hook to (pre_exec, post_exec) */ - @ZeppelinApi + @Experimental public void unregisterCallback(String event) { String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText()); unregisterCallback(event, replName);