diff --git a/bin/common.sh b/bin/common.sh index 4d575e891db..a70be7c6b85 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -105,6 +105,7 @@ if [[ -z "$ZEPPELIN_MEM" ]]; then fi JAVA_OPTS+=" ${ZEPPELIN_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING} ${ZEPPELIN_MEM}" +JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" export JAVA_OPTS # jvm options for interpreter process @@ -117,6 +118,7 @@ if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then fi JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}" +JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" export JAVA_INTP_OPTS diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 3fb55c21d7f..f756dca1f8e 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -144,7 +144,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppeli
n.hbase.HbaseInterpreter,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.rinterpreter.RRepl + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter Comma separated interpreter configurations. 
First interpreter become a default diff --git a/file/pom.xml b/file/pom.xml index 586f41add2b..e416358a6a8 100644 --- a/file/pom.xml +++ b/file/pom.xml @@ -29,7 +29,7 @@ zeppelin-file jar 0.6.0-incubating-SNAPSHOT - Zeppelin File System Interpreters + Zeppelin: File System Interpreters http://www.apache.org diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 152f70cf6e9..ea0054188ac 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -91,7 +91,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand public PySparkInterpreter(Properties property) { super(property); - scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_pyspark.py"; + try { + File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py"); + scriptPath = scriptFile.getAbsolutePath(); + } catch (IOException e) { + throw new InterpreterException(e); + } } private void createPythonScript() { @@ -235,6 +240,7 @@ private int findRandomOpenPortOnAllLocalInterfaces() { @Override public void close() { executor.getWatchdog().destroyProcess(); + new File(scriptPath).delete(); gatewayServer.shutdown(); } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index cd4d36bc3b7..b53e80f9d59 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -254,7 +254,7 @@ private DepInterpreter getDepInterpreter() { } public SparkContext createSparkContext() { - System.err.println("------ Create new SparkContext " + getProperty("master") + " -------"); + logger.info("------ Create new SparkContext {} -------", getProperty("master")); String execUri = 
System.getenv("SPARK_EXECUTOR_URI"); String[] jars = SparkILoop.getAddedJars(); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 8d92c968f50..93a3bcb5899 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -111,8 +111,12 @@ public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) { this.rCmdPath = rCmdPath; this.libPath = libPath; this.port = sparkRBackendPort; - scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_sparkr.R"; - + try { + File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); + scriptPath = scriptFile.getAbsolutePath(); + } catch (IOException e) { + throw new InterpreterException(e); + } } /** diff --git a/testing/downloadSpark.sh b/testing/downloadSpark.sh index d12580f0f3f..004e1dd483a 100755 --- a/testing/downloadSpark.sh +++ b/testing/downloadSpark.sh @@ -40,10 +40,28 @@ fi set -xe -TIMEOUT_SEC=590 +MAX_DOWNLOAD_TIME_SEC=590 FWDIR="$(dirname "${BASH_SOURCE-$0}")" ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)" +####################################### +# Downloads file from the given URL. +# Tries up to 3 times with 1s delay, 20s read and 15s connection timeouts. +# Globals: +# None +# Arguments: +# url - source URL +# Returns: +# 0 on success, 1 if all attempts failed (wget is tested in the if so set -e cannot abort before the message) +####################################### +download_with_retry() { + local url="$1" + if ! wget --retry-connrefused --waitretry=1 --read-timeout=20 --timeout=15 -t 3 "${url}"; then + echo "3 download attempts for ${url} failed" >&2 + return 1 + fi +} + SPARK_CACHE=".spark-dist" SPARK_ARCHIVE="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" export SPARK_HOME="${ZEPPELIN_HOME}/${SPARK_ARCHIVE}" @@ -56,11 +74,13 @@ if [[ ! -d "${SPARK_HOME}" ]]; then pwd ls -la . echo "${SPARK_CACHE} does not have ${SPARK_ARCHIVE} downloading ..." 
+ # download archive if not cached if [[ "${SPARK_VER_RANGE}" == "<=1.2" ]]; then # spark 1.1.x and spark 1.2.x can be downloaded from archive STARTTIME=`date +%s` - timeout -s KILL "${TIMEOUT_SEC}" wget -q "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz" + #timeout -s KILL "${MAX_DOWNLOAD_TIME_SEC}" wget "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz" + download_with_retry "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz" ENDTIME=`date +%s` DOWNLOADTIME="$((ENDTIME-STARTTIME))" else @@ -72,7 +92,8 @@ if [[ ! -d "${SPARK_HOME}" ]]; then PATHINFO=$(echo "${MIRROR_INFO}" | grep path_info | sed 's/[^"]*.path_info.: .\([^"]*\).*/\1/g') STARTTIME=`date +%s` - timeout -s KILL "${TIMEOUT_SEC}" wget -q "${PREFFERED}${PATHINFO}" + #timeout -s KILL "${MAX_DOWNLOAD_TIME_SEC}" wget -q "${PREFFERED}${PATHINFO}" + download_with_retry "${PREFFERED}${PATHINFO}" ENDTIME=`date +%s` DOWNLOADTIME="$((ENDTIME-STARTTIME))" fi diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6e369c0694a..b19fd9cd09e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,47 @@ public static void main(String[] args) System.exit(0); } + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + 
RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty( + "zeppelin.interpreter.resourcePoolClass"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + prop); + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } catch (Exception e) { + logger.error(e.toString(), e); + return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); + // throw new TException(e); + } + } @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -170,7 +200,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | 
IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -178,7 +213,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( @@ -353,11 +387,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); @@ -388,7 +424,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -411,7 +447,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -572,7 +608,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. 
okay, proceed to threat as a string type @@ -608,7 +644,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -624,7 +660,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,27 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; - + protected Properties property; + public DistributedResourcePool(String id, ResourcePoolConnector connector) { super(id); this.connector = connector; + this.property = new Properties(); + } + + + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id); + this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java 
new file mode 100644 index 00000000000..7315af7c266 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. 
+ */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + ResourceSet resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + // TODO(Object): Deserialize object + return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + } catch (TException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1825bfed217..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -54,10 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, 
Resource.class)); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..887f3078525 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. 
+ */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + Gson gson = new Gson(); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..856b03a15b7 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. 
creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == 
FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___" + name, object); // same key as get()/remove() + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); 
file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index f2c9ced7d63..3344580eef8 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -243,7 +243,6 @@ org.apache.commons commons-exec 1.3 - test diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 9fe8dab9dc4..d57aa710470 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -17,16 +17,6 @@ package org.apache.zeppelin.server; -import java.io.File; -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Set; - -import javax.net.ssl.SSLContext; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.Application; - import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; @@ -38,8 +28,8 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.rest.*; import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.search.LuceneSearch; +import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.socket.NotebookServer; import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.Handler; @@ -57,6 +47,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.Application; +import java.io.File; +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + /** * Main class of Zeppelin. 
* @@ -86,7 +85,7 @@ public ZeppelinServer() throws Exception { this.notebookRepo = new NotebookRepoSync(conf); this.notebookIndex = new LuceneSearch(); this.notebookAuthorization = new NotebookAuthorization(conf); - notebook = new Notebook(conf, + notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex, notebookAuthorization); } @@ -235,6 +234,9 @@ private static ServletContextHandler setupRestApiContextHandler(ZeppelinConfigur cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", EnumSet.allOf(DispatcherType.class)); + cxfContext.setInitParameter("shiroConfigLocations", + new File(conf.getShiroPath()).toURI().toString()); + cxfContext.addFilter(org.apache.shiro.web.servlet.ShiroFilter.class, "/*", EnumSet.allOf(DispatcherType.class)); diff --git a/zeppelin-web/bower.json b/zeppelin-web/bower.json index c5fc7ab9332..b5ec64ee442 100644 --- a/zeppelin-web/bower.json +++ b/zeppelin-web/bower.json @@ -44,6 +44,7 @@ "main": [ "src-noconflict/ace.js", "src-noconflict/mode-scala.js", + "src-noconflict/mode-python.js", "src-noconflict/mode-sql.js", "src-noconflict/mode-markdown.js", "src-noconflict/mode-sh.js", diff --git a/zeppelin-web/pom.xml b/zeppelin-web/pom.xml index 6fa62ae2d9b..2c46c76ed4d 100644 --- a/zeppelin-web/pom.xml +++ b/zeppelin-web/pom.xml @@ -95,8 +95,8 @@ install-node-and-npm - v0.10.22 - 1.3.8 + v0.12.13 + 2.15.0 diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 599ed901fa6..c71e096bb48 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -79,7 +79,8 @@ angular.module('zeppelinWebApp') var angularObjectRegistry = {}; var editorModes = { - 'ace/mode/scala': /^%spark/, + 'ace/mode/python': /^%(\w*\.)?pyspark\s*$/, + 'ace/mode/scala': /^%(\w*\.)?spark\s*$/, 'ace/mode/sql': /^%(\w*\.)?\wql/, 
'ace/mode/markdown': /^%md/, 'ace/mode/sh': /^%sh/ @@ -167,7 +168,7 @@ angular.module('zeppelinWebApp') angular.element('#p' + $scope.paragraph.id + '_text').bind('mousewheel', function(e) { $scope.keepScrollDown = false; }); - + $scope.flushStreamingOutput = true; } else { $timeout(retryRenderer, 10); } @@ -445,13 +446,17 @@ angular.module('zeppelinWebApp') $scope.$on('appendParagraphOutput', function(event, data) { if ($scope.paragraph.id === data.paragraphId) { + if ($scope.flushStreamingOutput) { + $scope.clearTextOutput(); + $scope.flushStreamingOutput = false; + } $scope.appendTextOutput(data.data); } }); $scope.$on('updateParagraphOutput', function(event, data) { if ($scope.paragraph.id === data.paragraphId) { - $scope.clearTextOutput(data.data); + $scope.clearTextOutput(); $scope.appendTextOutput(data.data); } }); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java old mode 100755 new mode 100644 index eafbbbe224d..81e3ffad8aa --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -350,6 +350,10 @@ public String getNotebookAuthorizationPath() { return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir())); } + public String getShiroPath() { + return getRelativeDir(String.format("%s/shiro.ini", getConfDir())); + } + public String getInterpreterRemoteRunnerPath() { return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER); } @@ -451,6 +455,8 @@ public static enum ConfVars { ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"), ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter," + "org.apache.zeppelin.spark.PySparkInterpreter," + + "org.apache.zeppelin.rinterpreter.RRepl," + + "org.apache.zeppelin.rinterpreter.KnitR,"
"org.apache.zeppelin.spark.SparkRInterpreter," + "org.apache.zeppelin.spark.SparkSqlInterpreter," + "org.apache.zeppelin.spark.DepInterpreter," @@ -473,9 +479,7 @@ public static enum ConfVars { + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter," + "org.apache.zeppelin.scalding.ScaldingInterpreter," + "org.apache.zeppelin.jdbc.JDBCInterpreter," - + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.rinterpreter.RRepl," - + "org.apache.zeppelin.rinterpreter.KnitR"), + + "org.apache.zeppelin.hbase.HbaseInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), @@ -504,6 +508,8 @@ public static enum ConfVars { // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", + "org.apache.zeppelin.resource.DistributedResourcePool"), ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 269b54a1fce..e060a45ab80 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -198,6 +198,8 @@ private void init() throws InterpreterException, IOException, RepositoryExceptio logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); } + + } private void loadFromFile() throws IOException { @@ -234,16 +236,19 @@ private void loadFromFile() throws
IOException { // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - + + Properties mergedProperties = + this.getInterpreterPropertiesFromZeppelinConf(); + mergedProperties.putAll(setting.getProperties()); + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), setting.getInterpreterInfos(), - setting.getProperties(), + mergedProperties, setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); @@ -260,6 +265,16 @@ private void loadFromFile() throws IOException { } } } + + private Properties getInterpreterPropertiesFromZeppelinConf() { + Iterator<String> keySet = this.conf.getKeys("zeppelin.interpreter"); + Properties p = new Properties(); + while (keySet.hasNext()) { + String key = keySet.next(); + p.setProperty(key, this.conf.getProperty(key).toString()); + } + return p; + } private void loadInterpreterDependencies(InterpreterSetting intSetting) throws IOException, RepositoryException { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index bb4d69b717f..5778d0da4ab 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook; +import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; @@ -232,6 +233,12 @@ protected Object jobRun() throws Throwable { String scriptBody = getScriptBody(); Map inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built // from script body +
+ + final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() + .getAngularObjectRegistry(); + + scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); + + settings.setForms(inputs); script = Input.getSimpleQuery(settings.getParams(), scriptBody); } @@ -390,4 +397,26 @@ public Object clone() throws CloneNotSupportedException { Paragraph paraClone = (Paragraph) this.clone(); return paraClone; } + + String extractVariablesFromAngularRegistry(String scriptBody, Map<String, Input> inputs, + AngularObjectRegistry angularRegistry) { + + final String noteId = this.getNote().getId(); + final String paragraphId = this.getId(); + + final Set<String> keys = new HashSet<>(inputs.keySet()); + + for (String varName : keys) { + final AngularObject paragraphScoped = angularRegistry.get(varName, noteId, paragraphId); + final AngularObject noteScoped = angularRegistry.get(varName, noteId, null); + final AngularObject angularObject = paragraphScoped != null ? paragraphScoped : noteScoped; + if (angularObject != null) { + inputs.remove(varName); + final String pattern = "[$][{]\\s*" + varName + "\\s*(?:=[^}]+)?[}]"; + scriptBody = scriptBody.replaceAll(pattern, + java.util.regex.Matcher.quoteReplacement(angularObject.get().toString())); + } + } + return scriptBody; + } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java new file mode 100644 index 00000000000..c2018588cf3 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +public class AngularObjectBuilder { + + public static <T> AngularObject<T> build(String varName, T value, String noteId, + String paragraphId) { + return new AngularObject<>(varName, value, noteId, paragraphId, null); + } +} \ No newline at end of file diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java index 87805cef133..a594873806d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java @@ -17,9 +17,20 @@ package org.apache.zeppelin.notebook; -import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectBuilder; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.Input; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; public class ParagraphTest { @Test @@ -35,4 +46,43 @@ public void scriptBodyWithoutReplName() { String text = "12345678"; assertEquals(text, Paragraph.getScriptBody(text)); } + + @Test + public void should_extract_variable_from_angular_object_registry() throws Exception { + //Given + final String noteId = "noteId"; + + final AngularObjectRegistry registry =
mock(AngularObjectRegistry.class); + final Note note = mock(Note.class); + final Map<String, Input> inputs = new HashMap<>(); + inputs.put("name", null); + inputs.put("age", null); + inputs.put("job", null); + + final String scriptBody = "My name is ${name} and I am ${age=20} years old. " + + "My occupation is ${ job = engineer | developer | artists}"; + + final Paragraph paragraph = new Paragraph(note, null, null); + final String paragraphId = paragraph.getId(); + + final AngularObject nameAO = AngularObjectBuilder.build("name", "DuyHai DOAN", noteId, + paragraphId); + + final AngularObject ageAO = AngularObjectBuilder.build("age", 34, noteId, null); + + when(note.getId()).thenReturn(noteId); + when(registry.get("name", noteId, paragraphId)).thenReturn(nameAO); + when(registry.get("age", noteId, null)).thenReturn(ageAO); + + final String expected = "My name is DuyHai DOAN and I am 34 years old. " + + "My occupation is ${ job = engineer | developer | artists}"; + //When + final String actual = paragraph.extractVariablesFromAngularRegistry(scriptBody, inputs, + registry); + + //Then + verify(registry).get("name", noteId, paragraphId); + verify(registry).get("age", noteId, null); + assertEquals(expected, actual); + } }