Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/manual/interpreters.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,47 @@ interpreter.start()
The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter porocess is listening as shown in the image below:

<img src="../assets/themes/zeppelin/img/screenshots/existing_interpreter.png" width="450px">

## (Experimental) Interpreter Execution Callbacks

Zeppelin allows for users to specify additional code to be executed by an interpreter at pre and post-paragraph code execution. This is primarily useful if you need to run the same set of code for all of the paragraphs within your notebook at specific times. Currently, this feature is only available for the spark and pyspark interpreters. To specify your "callback" code, you may use '`z.registerCallback()`. For example, enter the following into one paragraph:

```python
%pyspark
z.registerCallback("post_exec", "print 'This code should be executed before the parapgraph code!'")
z.registerCallback("pre_exec", "print 'This code should be executed after the paragraph code!'")
```

These calls will not take into effect until the next time you run a paragraph. In another paragraph, enter
```python
%pyspark
print "This code should be entered into the paragraph by the user!"
```

The output should be:
```
This code should be executed before the paragraph code!
This code should be entered into the paragraph by the user!
This code should be executed after the paragraph code!
```

If you ever need to know the callback code, use `z.getCallback()`:
```python
%pyspark
print z.getCallback("post_exec")
```
```
print 'This code should be executed after the paragraph code!'
```
Any call to `z.registerCallback()` will automatically overwrite what was previously registered. To completely unregister a callback event, use `z.unregisterCallback(eventCode)`. Currently only `"post_exec"` and `"pre_exec"` are valid event codes for the Zeppelin Callback Registry system.

Finally, the callback registry is internally shared by other interpreters in the same group. This would allow for callback code for one interpreter REPL to be set by another as follows:

```scala
%spark
z.unregisterCallback("post_exec", "pyspark")
```
The API is identical for both the spark (scala) and pyspark (python) implementations.

### Caveats
Calls to `z.registerCallback("pre_exec", ...)` should be made with care. If there are errors in your specified callback code, this will cause the interpreter REPL to become unable to execute any code pass the pre-execute stage making it impossible for direct calls to `z.unregisterCallback()` to take into effect. Current workarounds include calling `z.unregisterCallback()` from a different interpreter REPL in the same interpreter group (see above) or manually restarting the interpreter group in the UI.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void setUp() {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);

private IgniteInterpreter intp;
private Ignite ignite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);

private Ignite ignite;
private IgniteSqlInterpreter intp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setUp() throws Exception {
insertStatement.setString(1, null);
insertStatement.execute();
interpreterContext = new InterpreterContext("", "1", "", "", new AuthenticationInfo(), null, null, null, null,
null, null);
null, null, null);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void setUp() throws Exception {

context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new AngularObjectRegistry(intpGroup.getId(), null), null, null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
new InterpreterOutputListener() {
@Override public void onAppend(InterpreterOutput out, byte[] line) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setUp() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null), null,
intpGroup.getId(), null), null, null,
new LinkedList<InterpreterContextRunner>(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void tearDown() throws Exception {
@Test
public void test() {
shell.open();
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null);
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null, null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("dir", context);
Expand All @@ -64,7 +64,7 @@ public void test() {
@Test
public void testInvalidCommand(){
shell.open();
InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null);
InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null,null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("invalid_command\ndir",context);
Expand Down
6 changes: 6 additions & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-zengine</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-display_${scala.binary.version}</artifactId>
Expand Down
71 changes: 71 additions & 0 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterCallbackRegistry;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
Expand Down Expand Up @@ -695,6 +698,74 @@ private void angularUnbind(String name, String noteId) {
registry.remove(name, noteId, null);
}

/**
* General function to register callback event
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
* @param replName Name of the interpreter
*/
@Experimental
public void registerCallback(String event, String cmd, String replName) {
InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry();
String noteId = interpreterContext.getNoteId();
callbacks.register(noteId, replName, event, cmd);
}

/**
* registerCallback() wrapper for the spark (scala) interpreter
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerCallback(String event, String cmd) {
String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText());
registerCallback(event, cmd, replName);
}

/**
* Get the callback code
* @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
public String getCallback(String event, String replName) {
InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry();
String noteId = interpreterContext.getNoteId();
return callbacks.get(noteId, replName, event);
}

/**
* getCallback() wrapper for the spark (scala) interpreter
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public String getCallback(String event) {
String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText());
return getCallback(event, replName);
}

/**
* Unbind code from given callback event
* @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
public void unregisterCallback(String event, String replName) {
InterpreterCallbackRegistry callbacks = interpreterContext.getInterpreterCallbackRegistry();
String noteId = interpreterContext.getNoteId();
callbacks.unregister(noteId, replName, event);
}

/**
* Unbind code from given callback event
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterCallback(String event) {
String replName = Paragraph.getRequiredReplName(interpreterContext.getParagraphText());
unregisterCallback(event, replName);
}

/**
* Add object into resource pool
Expand Down
23 changes: 20 additions & 3 deletions spark/src/main/resources/python/zeppelin_pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ def put(self, key, value):
def get(self, key):
return self.__getitem__(key)

def input(self, name, defaultValue = ""):
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)

def select(self, name, options, defaultValue = ""):
def select(self, name, options, defaultValue=""):
# auto_convert to ArrayList doesn't match the method signature on JVM side
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
return self.z.select(name, defaultValue, iterables)

def checkbox(self, name, options, defaultChecked = None):
def checkbox(self, name, options, defaultChecked=None):
if defaultChecked is None:
defaultChecked = list(map(lambda items: items[0], options))
optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
Expand All @@ -99,6 +99,23 @@ def checkbox(self, name, options, defaultChecked = None):
checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables)
return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables)

def registerCallback(self, event, cmd, replName=None):
if replName is None:
self.z.registerCallback(event, cmd)
else:
self.z.registerCallback(event, cmd, replName)

def unregisterCallback(self, event, replName=None):
if replName is None:
self.z.unregisterCallback(event)
else:
self.z.unregisterCallback(event, replName)

def getCallback(self, event, replName=None):
if replName is None:
return self.z.getCallback(event)
return self.z.getCallback(event, replName)

def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setUp() throws Exception {
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,
null, null,
new LinkedList<InterpreterContextRunner>(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void setUp() throws Exception {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new InterpreterCallbackRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void setUp() throws Exception {
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new InterpreterCallbackRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ trait AbstractAngularElemTest
val context = new InterpreterContext("note", "paragraph", "title", "text",
new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,
null, null,
new util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
val context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
null,
null, null,
new java.util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void destroy() {
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
private InterpreterGroup interpreterGroup;
private URL [] classloaderUrls;
protected Map<String, String> callbackRegistry = new HashMap<String, String>();
protected Properties property;

@ZeppelinApi
Expand Down Expand Up @@ -370,4 +371,5 @@ public static RegisteredInterpreter findRegisteredInterpreterByClassName(String
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter;

/**
* An interface for processing custom callback code into the interpreter.
*/
public interface InterpreterCallbackListener {
public String onPreExecute(String script);
public String onPostExecute(String script);
}
Loading