Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void setUp() {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public void readTest() throws IOException {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();

assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
}

@Test
Expand All @@ -101,7 +101,7 @@ public void readTestWithConfiguration() throws IOException {
t.open();

assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
}

@Test
Expand All @@ -117,13 +117,13 @@ public void jdbcRestart() throws IOException, SQLException, ClassNotFoundExcepti
t.open();

InterpreterResult interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());

t.getConnection("default").close();

interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}

Expand All @@ -139,7 +139,7 @@ public void test() throws IOException {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();

InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null, null);

//simple select test
InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
Expand Down Expand Up @@ -193,4 +193,4 @@ public void getPropertyKey() {
assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
hi.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null);

private IgniteInterpreter intp;
private Ignite ignite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null);

private Ignite ignite;
private IgniteSqlInterpreter intp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testSelectQuery() throws SQLException, IOException {

String sqlQuery = "select * from test_table";

InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));

assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
Expand All @@ -116,7 +116,7 @@ public void testSelectQueryMaxResult() throws SQLException, IOException {

String sqlQuery = "select * from test_table";

InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));

assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void setUp() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), null);
}

Expand Down
64 changes: 61 additions & 3 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SQLContext.QueryExecution;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.zeppelin.display.AngularObject;
Expand All @@ -45,15 +43,19 @@
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;

import scala.Tuple2;
import scala.Unit;
import scala.collection.Iterable;
import scala.collection.JavaConversions;

/**
* Spark context for zeppelin.
*/
public class ZeppelinContext extends HashMap<String, Object> {
public class ZeppelinContext {
private SparkDependencyResolver dep;
private InterpreterContext interpreterContext;
private int maxResult;
Expand Down Expand Up @@ -754,4 +756,60 @@ private void angularUnbind(String name, String noteId) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
registry.remove(name, noteId, null);
}


/**
* Add object into resource pool
* @param name
* @param value
*/
public void put(String name, Object value) {
ResourcePool resourcePool = interpreterContext.getResourcePool();
resourcePool.put(name, value);
}

/**
* Get object from resource pool
* Search local process first and then the other processes
* @param name
* @return null if resource not found
*/
public Object get(String name) {
ResourcePool resourcePool = interpreterContext.getResourcePool();
Resource resource = resourcePool.get(name);
if (resource != null) {
return resource.get();
} else {
return null;
}
}

/**
* Remove object from resourcePool
* @param name
*/
public void remove(String name) {
ResourcePool resourcePool = interpreterContext.getResourcePool();
resourcePool.remove(name);
}

/**
* Check if resource pool has the object
* @param name
* @return
*/
public boolean containsKey(String name) {
ResourcePool resourcePool = interpreterContext.getResourcePool();
Resource resource = resourcePool.get(name);
return resource != null;
}

/**
* Get all resources
*/
public ResourceSet getAll() {
ResourcePool resourcePool = interpreterContext.getResourcePool();
return resourcePool.getAll();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void setUp() throws Exception {

context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,
new LinkedList<InterpreterContextRunner>(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void setUp() throws Exception {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -66,6 +67,7 @@ public void setUp() throws Exception {
}
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.resource.ResourcePool;

/**
* Interpreter context
Expand Down Expand Up @@ -50,6 +51,7 @@ public static void remove() {
private final Map<String, Object> config;
private GUI gui;
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;

public InterpreterContext(String noteId,
Expand All @@ -59,6 +61,7 @@ public InterpreterContext(String noteId,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> runners,
InterpreterOutput out
) {
Expand All @@ -69,6 +72,7 @@ public InterpreterContext(String noteId,
this.config = config;
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
this.resourcePool = resourcePool;
this.runners = runners;
this.out = out;
}
Expand Down Expand Up @@ -102,6 +106,10 @@ public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}

public ResourcePool getResourcePool() {
return resourcePool;
}

public List<InterpreterContextRunner> getRunners() {
return runners;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.zeppelin.interpreter;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.resource.ResourcePool;

/**
* InterpreterGroup is list of interpreters in the same group.
Expand All @@ -37,13 +36,27 @@ public class InterpreterGroup extends LinkedList<Interpreter>{

AngularObjectRegistry angularObjectRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;

private static final Map<String, InterpreterGroup> allInterpreterGroups =
new ConcurrentHashMap<String, InterpreterGroup>();

public static InterpreterGroup get(String id) {
return allInterpreterGroups.get(id);
}

public static Collection<InterpreterGroup> getAll() {
return new LinkedList(allInterpreterGroups.values());
}

public InterpreterGroup(String id) {
this.id = id;
allInterpreterGroups.put(id, this);
}

public InterpreterGroup() {
getId();
allInterpreterGroups.put(id, this);
}

private static String generateId() {
Expand Down Expand Up @@ -135,5 +148,15 @@ public void run() {
remoteInterpreterProcess.dereference();
}
}

allInterpreterGroups.remove(id);
}

public void setResourcePool(ResourcePool resourcePool) {
this.resourcePool = resourcePool;
}

public ResourcePool getResourcePool() {
return resourcePool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ private synchronized void init() {
try {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
client.createInterpreter(intp.getClassName(), (Map) property);
client.createInterpreter(getInterpreterGroup().getId(),
intp.getClassName(), (Map) property);

}
} catch (TException e) {
Expand Down Expand Up @@ -169,7 +170,9 @@ public void close() {
boolean broken = false;
try {
client = interpreterProcess.getClient();
client.close(className);
if (client != null) {
client.close(className);
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
Expand Down Expand Up @@ -288,6 +291,10 @@ public FormType getFormType() {
@Override
public int getProgress(InterpreterContext context) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (interpreterProcess == null || !interpreterProcess.isRunning()) {
return 0;
}

Client client = null;
try {
client = interpreterProcess.getClient();
Expand Down
Loading