From cbdb22d309948eaa2d7562c66c3b8780a6581647 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:11:20 -0400 Subject: [PATCH 01/37] Added serialization to resourcepoolutils. --- .../org/apache/zeppelin/resource/ResourcePoolUtils.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index ecd84d4c01f..9768bc11b25 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -54,8 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resoucePoolGetAll(); - Gson gson = new Gson(); + List resourceList = client.resourcePoolGetAll(); + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); + Gson gson = gsonBuilder.create(); + for (String res : resourceList) { resourceSet.add(gson.fromJson(res, Resource.class)); } From ffd092b6e7df495ecd1f4e753e3e88678bae2ead Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:06:35 -0400 Subject: [PATCH 02/37] Added pool persistance. --- .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourceSerializer.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 6988b3ea762..9e38d402302 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final transient Object r; + private final Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..a0b35fa63cc --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,46 @@ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. + */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + return context.serialize(src); + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} \ No newline at end of file From 84cabe2d02a7cf49a82f56d8ae8cbbecb6699aaf Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 16 Mar 2016 10:59:29 -0400 Subject: [PATCH 03/37] Added test. --- .../zeppelin/resource/ResourceSerializer.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index a0b35fa63cc..4db1f8742c0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import java.lang.reflect.Type; @@ -21,7 +37,8 @@ public ResourceSerializer() { @Override public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. - return context.serialize(src); + Gson gson = new Gson(); + return gson.toJsonTree(src); } @Override @@ -43,4 +60,4 @@ public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationC return new Resource(id, r); } -} \ No newline at end of file +} From f1f0d91dbce47a50d5f9737b052ee1f73cc04590 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 12:14:36 -0400 Subject: [PATCH 04/37] Changed resource pool utils so that it will work anywhere. --- ...terpreterProcessResourcePoolConnector.java | 65 +++++++++++++++++++ .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourcePoolUtils.java | 12 ++-- 3 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java new file mode 100644 index 00000000000..7315af7c266 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. + */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + ResourceSet resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + // TODO(Object): Deserialize object + return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + } catch (TException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 9e38d402302..6988b3ea762 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final Object r; + private final transient Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 678b0cc7033..3aadffb8524 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -56,13 +56,13 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - Gson gson = gsonBuilder.create(); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + //List resourceList = client.resourcePoolGetAll(); + //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, Resource.class)); + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); From a19c48e124cf45b6287656133bd4dca26dcce002 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 13:42:00 -0400 Subject: [PATCH 05/37] Removed unnecessary serialization. --- .../java/org/apache/zeppelin/resource/ResourcePoolUtils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 3aadffb8524..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -58,8 +58,6 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio client = remoteInterpreterProcess.getClient(); RemoteInterpreterProcessResourcePoolConnector remoteConnector = new RemoteInterpreterProcessResourcePoolConnector(client); - //List resourceList = client.resourcePoolGetAll(); - //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); for (Resource r: remoteConnector.getAllResources()) { resourceSet.add(r); From 8fa84a832200351465793f83aafe8d1c0c3c3573 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 17:02:18 -0400 Subject: [PATCH 06/37] Added explicit save to the serializer. --- .../org/apache/zeppelin/resource/ResourceSerializer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index 4db1f8742c0..887f3078525 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -38,7 +38,12 @@ public ResourceSerializer() { public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. Gson gson = new Gson(); - return gson.toJsonTree(src); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; } @Override From 1f2d2ec3306fa0b6dd2ffe907b7e5566cd0fc391 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 21 Mar 2016 15:46:03 -0400 Subject: [PATCH 07/37] Added vfs resource pool. --- .../zeppelin/resource/VFSResourcePool.java | 218 ++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..ce0a01a5504 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,218 @@ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; +import org.apache.zeppelin.conf.ZeppelinConfiguration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + return new Resource(new ResourceId(this.id(), name), o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return super.getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + return super.getAll(remote); + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___name", object); + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); + file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} From 5ab296206c79136a06267b90e5d0758de3eacb95 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 5 Apr 2016 16:37:40 -0400 Subject: [PATCH 08/37] Added pluggable resource pool. --- zeppelin-interpreter/pom.xml | 12 ++++- .../remote/RemoteInterpreterServer.java | 51 +++++++++++++++---- .../resource/DistributedResourcePool.java | 6 ++- .../zeppelin/resource/VFSResourcePool.java | 33 +++++++++--- 4 files changed, 85 insertions(+), 17 deletions(-) diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 84477eae8da..4dd55943e22 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,46 @@ public static void main(String[] args) System.exit(0); } + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + String resourcePoolClassName = (String) interpreterGroup.getProperty() + .getOrDefault("ResourcePoolClass", + "org.apache.zeppelin.resource.DistributedResourcePool"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + interpreterGroup.getProperty()); + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } catch (SecurityException | NoSuchMethodException | + InstantiationException | IllegalAccessException | + IllegalArgumentException | InvocationTargetException | + ClassNotFoundException e) { + logger.error(e.toString(), e); + throw new TException(e); + } + } @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -170,7 +199,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -178,7 +212,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( @@ -388,7 +421,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -411,7 +444,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -572,7 +605,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type @@ -608,7 +641,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -624,7 +657,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..57cef050f5f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,20 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; + protected Properties property; - public DistributedResourcePool(String id, ResourcePoolConnector connector) { + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index ce0a01a5504..045c8261820 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,7 +1,7 @@ + package org.apache.zeppelin.resource; import org.slf4j.Logger; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import java.io.File; import java.io.IOException; @@ -89,7 +89,16 @@ public Resource get(String name, boolean remote) { try { Object o = ois.readObject(); - return new Resource(new ResourceId(this.id(), name), o); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } @@ -106,12 +115,26 @@ public Resource get(String noteId, String paragraphId, String name, boolean remo @Override public ResourceSet getAll() { - return super.getAll(true); + return getAll(true); } @Override public ResourceSet getAll(boolean remote) { - return super.getAll(remote); + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; } @Override @@ -187,8 +210,6 @@ private FileObject getRootDir() throws IOException { FileObject rootDir = fsManager.resolveFile(getPath("/")); // Does nothing if the folder already exists. rootDir.createFolder(); - - if (!isDirectory(rootDir)) { throw new IOException("Root path is not a directory"); } From 47e0ee5978d897234173711cf54b119eb32d0140 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 09:00:53 -0400 Subject: [PATCH 09/37] Added missing rename. --- .../main/java/org/apache/zeppelin/resource/VFSResourcePool.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {zeppelin-zengine => zeppelin-interpreter}/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java (100%) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java similarity index 100% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java From 3c0e0c2736bf91a2f99b38a3c343046e676e0e75 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 10:44:43 -0400 Subject: [PATCH 10/37] Added an API to access resource pools. --- .../zeppelin/rest/NoteRestSerializer.java | 84 +++++++++++++++++++ .../apache/zeppelin/rest/NotebookRestApi.java | 58 +++++++++++++ .../apache/zeppelin/server/JsonResponse.java | 19 ++++- .../apache/zeppelin/notebook/Paragraph.java | 21 +++++ 4 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java new file mode 100644 index 00000000000..d1dc37fb2aa --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.rest; + +import java.lang.reflect.Type; + +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.scheduler.Job; + +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Responsible for passing along minimal information for a REST API to consume notes. + * + */ +public class NoteRestSerializer implements JsonSerializer { + + ResourceSet resources; + + @Override + public JsonElement serialize(Note src, Type typeOfSrc, JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + obj.addProperty("id", src.getId()); + obj.addProperty("name", src.getName()); + GsonBuilder builder = new GsonBuilder(); + // We don't serialize paragraph results in this case. + builder.addSerializationExclusionStrategy(new ExclusionStrategy() { + @Override + public boolean shouldSkipField(FieldAttributes f) { + return (Job.class.isAssignableFrom(f.getDeclaringClass())) && + (f.getName().equals("config") || + f.getName().equals("settings") || + f.getName().equals("result") || + f.getName().equals("jobName")); + } + + @Override + public boolean shouldSkipClass(Class clazz) { + return false; + } + }); + Gson gson = builder.create(); + JsonArray paragraphs = new JsonArray(); + for (Resource r: resources) { + for (Paragraph p: src.getParagraphs()) { + if (r.getResourceId().getParagraphId().equals(p.getId())) { + paragraphs.add(gson.toJsonTree(p).getAsJsonObject()); + + } + } + } + obj.add("paragraphs", paragraphs); + return obj; + } + + public NoteRestSerializer(ResourceSet resources) { + this.resources = resources; + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 2796500ac86..962a54575ba 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -29,6 +29,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; @@ -37,6 +38,9 @@ import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.NotebookAuthorization; import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePoolUtils; +import org.apache.zeppelin.resource.ResourceSet; import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind; import org.apache.zeppelin.rest.message.NewNotebookRequest; @@ -654,5 +658,59 @@ public Response search(@QueryParam("q") String queryTerm) { LOG.info("{} notbooks found", notebooksFound.size()); return new JsonResponse<>(Status.OK, notebooksFound).build(); } + + /** + * Get paragraphs that have a resource pool and what note they came from. + * @param + * @return JSON with status.OK + */ + @GET + @Path("/results") + public Response getParagraphsWithResults() + throws IOException { + LOG.info("Getting paragraphs from all notes"); + ResourceSet resources = ResourcePoolUtils.getAllResources(); + GsonBuilder builder = new GsonBuilder(); + builder.registerTypeAdapter(Note.class, new NoteRestSerializer(resources)); + List notes = new ArrayList(); + for (Resource r: resources) { + notes.add(notebook.getNote(r.getResourceId().getNoteId())); + } + + // Want to control how the element is built, but not return it as a + // singleton of type element. + // If we add this as a JSON Array rather than including it with the builder, + // then it adds a pesky "element" field to the middle of the result. + return new JsonResponse<>(Status.OK, "", notes, builder).build(); + } + + /** + * Get paragraph result REST API + * @param + * @return JSON with status.OK + * @throws IOException + */ + @GET + @Path("{notebookId}/paragraph/{paragraphId}/result") + public Response getParagraphResult(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId) + throws IOException { + LOG.info("Downloading paragraph {} {}", notebookId, paragraphId); + + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + + ResponseBuilder builder = Response.ok(p.getResultFromPool().message()) + .header("Content-Disposition", "attachemnt; filename=" + paragraphId + ".txt"); + + return builder.build(); + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java index 887d42ac631..8cbeb6735c1 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -38,6 +38,7 @@ public class JsonResponse { private javax.ws.rs.core.Response.Status status; private String message; private T body; + private transient GsonBuilder builder; transient ArrayList cookies; transient boolean pretty = false; @@ -58,18 +59,28 @@ public JsonResponse(javax.ws.rs.core.Response.Status status, T body) { this.status = status; this.message = null; this.body = body; + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body) { this.status = status; this.message = message; this.body = body; + this.builder = new GsonBuilder(); } public JsonResponse setPretty(boolean pretty) { this.pretty = pretty; return this; } + + public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body, + GsonBuilder builder) { + this.status = status; + this.message = message; + this.body = body; + this.builder = builder; + } /** * Add cookie for building. @@ -99,14 +110,14 @@ public JsonResponse addCookie(String name, String value) { @Override public String toString() { - GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter( + builder.registerTypeAdapter( InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer()); if (pretty) { - gsonBuilder.setPrettyPrinting(); + builder.setPrettyPrinting(); } - gsonBuilder.setExclusionStrategies(new JsonExclusionStrategy()); - Gson gson = gsonBuilder.create(); + builder.setExclusionStrategies(new JsonExclusionStrategy()); + Gson gson = builder.create(); return gson.toJson(this); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 09c9026c905..0a3a80793a8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -24,7 +24,10 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourcePoolUtils; +import org.apache.zeppelin.resource.ResourceSet; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.Scheduler; @@ -181,6 +184,24 @@ public void setNoteReplLoader(NoteInterpreterLoader repls) { this.replLoader = repls; } + /** + * Gets the first resource for this paragraph that responds to the interpreter result class. + */ + public InterpreterResult getResultFromPool() { + ResourceSet resources = ResourcePoolUtils.getAllResources() + .filterByParagraphId(this.getId()).filterByNoteId(this.getNote().getId()); + if (resources.size() > 0) + { + for (Resource r: resources) + { + if (InterpreterResult.class.isAssignableFrom(r.get().getClass())) + return (InterpreterResult) r.get(); + } + } + return null; + } + + public InterpreterResult getResult() { return (InterpreterResult) getReturn(); } From cb4c167caddf1c36bef629706796f3a407cfe7f0 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 11:11:49 -0400 Subject: [PATCH 11/37] Added a constructor to DistributedResourcePools to fix tests. --- .../apache/zeppelin/resource/DistributedResourcePool.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index 57cef050f5f..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -25,6 +25,13 @@ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; protected Properties property; + + public DistributedResourcePool(String id, ResourcePoolConnector connector) { + super(id); + this.connector = connector; + this.property = new Properties(); + } + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); From cf658e36372c9bffbf5bcdf749fbec713f85183f Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 12:09:08 -0400 Subject: [PATCH 12/37] Switched to java 7 method for java.util.properties. --- .../zeppelin/interpreter/remote/RemoteInterpreterServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 4dd55943e22..ea644ead605 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -143,7 +143,7 @@ private DistributedResourcePool getResourcePool() return resourcePool; try { String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getOrDefault("ResourcePoolClass", + .getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); From cb5519a0a2f582db6885b312b0fce009e0e27b62 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 13:47:02 -0400 Subject: [PATCH 13/37] Removed the need for resource pools during tests. --- .../remote/RemoteInterpreterServer.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index ea644ead605..dc722bac998 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -157,12 +157,10 @@ private DistributedResourcePool getResourcePool() interpreterGroup.getProperty()); interpreterGroup.setResourcePool(resourcePool); return resourcePool; - } catch (SecurityException | NoSuchMethodException | - InstantiationException | IllegalAccessException | - IllegalArgumentException | InvocationTargetException | - ClassNotFoundException e) { + } catch (Exception e) { logger.error(e.toString(), e); - throw new TException(e); + return null; + // throw new TException(e); } } @@ -386,11 +384,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); From c0cd60de396a82e9daec50633c045a775312438e Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 14:37:12 -0400 Subject: [PATCH 14/37] Adding missing license file. --- .../zeppelin/resource/VFSResourcePool.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index 045c8261820..856b03a15b7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import org.slf4j.Logger; From f3fb7197ec621d41fe902ba015c85f76992fcf86 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 11 Apr 2016 11:03:14 -0400 Subject: [PATCH 15/37] Set mock interpreter to register properly. --- .../interpreter/remote/RemoteInterpreterServer.java | 13 ++++++++----- .../remote/mock/MockInterpreterResourcePool.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index dc722bac998..38af5b6e3b0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -142,24 +142,27 @@ private DistributedResourcePool getResourcePool() if (resourcePool != null) return resourcePool; try { - String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getProperty("ResourcePoolClass", + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); - + Constructor constructor = resourcePoolClass .getConstructor(new Class[] {String.class, ResourcePoolConnector.class, Properties.class }); resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), this.eventClient, - interpreterGroup.getProperty()); + prop); interpreterGroup.setResourcePool(resourcePool); return resourcePool; } catch (Exception e) { logger.error(e.toString(), e); - return null; + return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); // throw new TException(e); } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); From 8e2fe6af6586311d1a00db318cc9cb6db46b0ce8 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 11 Apr 2016 15:43:44 -0400 Subject: [PATCH 16/37] Fixed typo and addressed issues with resource pool connector. --- ...teInterpreterProcessResourcePoolConnector.java | 15 +++++++++++---- .../org/apache/zeppelin/rest/NotebookRestApi.java | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java index 7315af7c266..a093ab77d45 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -16,6 +16,8 @@ */ package org.apache.zeppelin.resource; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import org.apache.thrift.TException; @@ -28,6 +30,7 @@ public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { private Client client; + private ResourceSet resources; public RemoteInterpreterProcessResourcePoolConnector(Client client) { this.client = client; @@ -37,7 +40,7 @@ public RemoteInterpreterProcessResourcePoolConnector(Client client) { public ResourceSet getAllResources() { try { List resourceList = client.resourcePoolGetAll(); - ResourceSet resources = new ResourceSet(); + resources = new ResourceSet(); Gson gson = new Gson(); for (String res : resourceList) { @@ -55,9 +58,13 @@ public ResourceSet getAllResources() { @Override public Object readResource(ResourceId id) { try { - // TODO(Object): Deserialize object - return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); - } catch (TException e) { + ByteBuffer buffer = client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + for (Resource r: resources) { + if (r.getResourceId().equals(id)) + return r.deserializeObject(buffer); + } + return null; + } catch (TException | ClassNotFoundException | IOException e) { throw new RuntimeException(e); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 962a54575ba..9ac3c3b7d51 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -708,7 +708,7 @@ public Response getParagraphResult(@PathParam("notebookId") String notebookId, } ResponseBuilder builder = Response.ok(p.getResultFromPool().message()) - .header("Content-Disposition", "attachemnt; filename=" + paragraphId + ".txt"); + .header("Content-Disposition", "attachment; filename=" + paragraphId + ".txt"); return builder.build(); } From e422d07e2e9d5d859f2dcf0466b9d4b75150d9ad Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 12 Apr 2016 15:44:37 -0400 Subject: [PATCH 17/37] Removed requirement for spark dependency from abstract test rest api. --- .../org/apache/zeppelin/rest/AbstractTestRestApi.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 81f161af4f6..7d48c5e56ec 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -99,10 +99,10 @@ protected static void startUp() throws Exception { if (!wasRunning) { LOG.info("Staring test Zeppelin up..."); executor = Executors.newSingleThreadExecutor(); - executor.submit(server); + executor.execute(server); long s = System.currentTimeMillis(); boolean started = false; - while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes + while (System.currentTimeMillis() - s < 1000 * 60 * 5) { // 3 minutes Thread.sleep(2000); started = checkIfServerIsRunning(); if (started == true) { @@ -149,8 +149,11 @@ protected static void startUp() throws Exception { sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome); pySpark = true; } - - ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id()); + // Only do this if we find a spark interpreter. + // Not all tests that use this code depend on spark. + if(sparkIntpSetting != null) { + ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id()); + } } } } From 1c8c53ccd17d5871e8bf168b0f5c11aec45283cc Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 12 Apr 2016 16:07:50 -0400 Subject: [PATCH 18/37] Fixed error with the JSON response. --- .../src/main/java/org/apache/zeppelin/server/JsonResponse.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java index 8cbeb6735c1..88ed0b9f993 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -46,13 +46,14 @@ public JsonResponse(javax.ws.rs.core.Response.Status status) { this.status = status; this.message = null; this.body = null; - + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, String message) { this.status = status; this.message = message; this.body = null; + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, T body) { From 43fa7b9f277c81eeb55843b6ceea25788073fddc Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:11:20 -0400 Subject: [PATCH 19/37] Added serialization to resourcepoolutils. --- .../org/apache/zeppelin/resource/ResourcePoolUtils.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1825bfed217..678b0cc7033 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -55,7 +57,10 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio try { client = remoteInterpreterProcess.getClient(); List resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); + Gson gson = gsonBuilder.create(); + for (String res : resourceList) { resourceSet.add(gson.fromJson(res, Resource.class)); } From 5de57565fac57465618995dc656a1d0b8802b002 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:06:35 -0400 Subject: [PATCH 20/37] Added pool persistance. --- .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourceSerializer.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 6988b3ea762..9e38d402302 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final transient Object r; + private final Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..a0b35fa63cc --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,46 @@ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. + */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + return context.serialize(src); + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} \ No newline at end of file From e633c444cee7cb3938dc591f03d0250063cc6b73 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 16 Mar 2016 10:59:29 -0400 Subject: [PATCH 21/37] Added test. --- .../zeppelin/resource/ResourceSerializer.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index a0b35fa63cc..4db1f8742c0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import java.lang.reflect.Type; @@ -21,7 +37,8 @@ public ResourceSerializer() { @Override public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. - return context.serialize(src); + Gson gson = new Gson(); + return gson.toJsonTree(src); } @Override @@ -43,4 +60,4 @@ public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationC return new Resource(id, r); } -} \ No newline at end of file +} From 71609fae4d3476c19d23596ccd044c826fa3c7a9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 12:14:36 -0400 Subject: [PATCH 22/37] Changed resource pool utils so that it will work anywhere. --- ...terpreterProcessResourcePoolConnector.java | 65 +++++++++++++++++++ .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourcePoolUtils.java | 12 ++-- 3 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java new file mode 100644 index 00000000000..7315af7c266 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. + */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + ResourceSet resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + // TODO(Object): Deserialize object + return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + } catch (TException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 9e38d402302..6988b3ea762 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final Object r; + private final transient Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 678b0cc7033..3aadffb8524 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -56,13 +56,13 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - Gson gson = gsonBuilder.create(); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + //List resourceList = client.resourcePoolGetAll(); + //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, Resource.class)); + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); From 6a22e3bbf819a9102292f2bd7630e468cd9b2ab9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 13:42:00 -0400 Subject: [PATCH 23/37] Removed unnecessary serialization. --- .../java/org/apache/zeppelin/resource/ResourcePoolUtils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 3aadffb8524..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -58,8 +58,6 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio client = remoteInterpreterProcess.getClient(); RemoteInterpreterProcessResourcePoolConnector remoteConnector = new RemoteInterpreterProcessResourcePoolConnector(client); - //List resourceList = client.resourcePoolGetAll(); - //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); for (Resource r: remoteConnector.getAllResources()) { resourceSet.add(r); From 17fed3d52f1cf74a56913ff661b3884e1d30296b Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 17:02:18 -0400 Subject: [PATCH 24/37] Added explicit save to the serializer. --- .../org/apache/zeppelin/resource/ResourceSerializer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index 4db1f8742c0..887f3078525 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -38,7 +38,12 @@ public ResourceSerializer() { public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. Gson gson = new Gson(); - return gson.toJsonTree(src); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; } @Override From 067da3703af14d614b835f5117bb04fbbe9bde7c Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 21 Mar 2016 15:46:03 -0400 Subject: [PATCH 25/37] Added vfs resource pool. --- .../zeppelin/resource/VFSResourcePool.java | 218 ++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..ce0a01a5504 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,218 @@ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; +import org.apache.zeppelin.conf.ZeppelinConfiguration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + return new Resource(new ResourceId(this.id(), name), o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return super.getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + return super.getAll(remote); + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___name", object); + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); + file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} From 0cfeb33364d1f8779f616373eaf780d3ac62f6af Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 5 Apr 2016 16:37:40 -0400 Subject: [PATCH 26/37] Added pluggable resource pool. --- zeppelin-interpreter/pom.xml | 12 ++++- .../remote/RemoteInterpreterServer.java | 51 +++++++++++++++---- .../resource/DistributedResourcePool.java | 6 ++- .../zeppelin/resource/VFSResourcePool.java | 33 +++++++++--- 4 files changed, 85 insertions(+), 17 deletions(-) diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6e369c0694a..db0cb366b1b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,46 @@ public static void main(String[] args) System.exit(0); } + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + String resourcePoolClassName = (String) interpreterGroup.getProperty() + .getOrDefault("ResourcePoolClass", + "org.apache.zeppelin.resource.DistributedResourcePool"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + interpreterGroup.getProperty()); + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } catch (SecurityException | NoSuchMethodException | + InstantiationException | IllegalAccessException | + IllegalArgumentException | InvocationTargetException | + ClassNotFoundException e) { + logger.error(e.toString(), e); + throw new TException(e); + } + } @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -170,7 +199,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -178,7 +212,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( @@ -388,7 +421,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -411,7 +444,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -572,7 +605,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type @@ -608,7 +641,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -624,7 +657,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..57cef050f5f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,20 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; + protected Properties property; - public DistributedResourcePool(String id, ResourcePoolConnector connector) { + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index ce0a01a5504..045c8261820 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,7 +1,7 @@ + package org.apache.zeppelin.resource; import org.slf4j.Logger; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import java.io.File; import java.io.IOException; @@ -89,7 +89,16 @@ public Resource get(String name, boolean remote) { try { Object o = ois.readObject(); - return new Resource(new ResourceId(this.id(), name), o); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } @@ -106,12 +115,26 @@ public Resource get(String noteId, String paragraphId, String name, boolean remo @Override public ResourceSet getAll() { - return super.getAll(true); + return getAll(true); } @Override public ResourceSet getAll(boolean remote) { - return super.getAll(remote); + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; } @Override @@ -187,8 +210,6 @@ private FileObject getRootDir() throws IOException { FileObject rootDir = fsManager.resolveFile(getPath("/")); // Does nothing if the folder already exists. rootDir.createFolder(); - - if (!isDirectory(rootDir)) { throw new IOException("Root path is not a directory"); } From 52cf1be713c1462d768f8df2ad804e5a551d9298 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 09:00:53 -0400 Subject: [PATCH 27/37] Added missing rename. --- .../main/java/org/apache/zeppelin/resource/VFSResourcePool.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {zeppelin-zengine => zeppelin-interpreter}/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java (100%) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java similarity index 100% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java From a1b0bbda1105453656bdaf1977761272aba0c59e Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 11:11:49 -0400 Subject: [PATCH 28/37] Added a constructor to DistributedResourcePools to fix tests. --- .../apache/zeppelin/resource/DistributedResourcePool.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index 57cef050f5f..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -25,6 +25,13 @@ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; protected Properties property; + + public DistributedResourcePool(String id, ResourcePoolConnector connector) { + super(id); + this.connector = connector; + this.property = new Properties(); + } + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); From f24127299ee9890b303268cca10c07fdfd60caf9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 12:09:08 -0400 Subject: [PATCH 29/37] Switched to java 7 method for java.util.properties. --- .../zeppelin/interpreter/remote/RemoteInterpreterServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index db0cb366b1b..4505bffa331 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -143,7 +143,7 @@ private DistributedResourcePool getResourcePool() return resourcePool; try { String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getOrDefault("ResourcePoolClass", + .getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); From 3ed89b083f2aa6f1183c65c8618695bba01e248d Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 13:47:02 -0400 Subject: [PATCH 30/37] Removed the need for resource pools during tests. --- .../remote/RemoteInterpreterServer.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 4505bffa331..89066671c4d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -157,12 +157,10 @@ private DistributedResourcePool getResourcePool() interpreterGroup.getProperty()); interpreterGroup.setResourcePool(resourcePool); return resourcePool; - } catch (SecurityException | NoSuchMethodException | - InstantiationException | IllegalAccessException | - IllegalArgumentException | InvocationTargetException | - ClassNotFoundException e) { + } catch (Exception e) { logger.error(e.toString(), e); - throw new TException(e); + return null; + // throw new TException(e); } } @@ -386,11 +384,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); From 0fa4b3288ba95b8ab8f8d69d0239a72c8b707429 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 14:37:12 -0400 Subject: [PATCH 31/37] Adding missing license file. --- .../zeppelin/resource/VFSResourcePool.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index 045c8261820..856b03a15b7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import org.slf4j.Logger; From 69d11d30f7b7f110cbe81a5b66d24dd5a30eb8b7 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 11 Apr 2016 11:03:14 -0400 Subject: [PATCH 32/37] Set mock interpreter to register properly. --- .../interpreter/remote/RemoteInterpreterServer.java | 13 ++++++++----- .../remote/mock/MockInterpreterResourcePool.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 89066671c4d..2ec532b648a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -142,24 +142,27 @@ private DistributedResourcePool getResourcePool() if (resourcePool != null) return resourcePool; try { - String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getProperty("ResourcePoolClass", + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); - + Constructor constructor = resourcePoolClass .getConstructor(new Class[] {String.class, ResourcePoolConnector.class, Properties.class }); resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), this.eventClient, - interpreterGroup.getProperty()); + prop); interpreterGroup.setResourcePool(resourcePool); return resourcePool; } catch (Exception e) { logger.error(e.toString(), e); - return null; + return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); // throw new TException(e); } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); From df46376213a3f9c9d766f0f108fdac72125cdc93 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Fri, 15 Apr 2016 13:38:21 -0400 Subject: [PATCH 33/37] Switched interpreter to use default configuration. --- .../remote/RemoteInterpreterServer.java | 4 ++-- .../zeppelin/conf/ZeppelinConfiguration.java | 2 ++ .../interpreter/InterpreterFactory.java | 21 ++++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 2ec532b648a..b19fd9cd09e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -146,8 +146,8 @@ private DistributedResourcePool getResourcePool() //Happens during tests. if (prop == null) prop = new Properties(); - String resourcePoolClassName = (String) prop.getProperty("ResourcePoolClass", - "org.apache.zeppelin.resource.DistributedResourcePool"); + String resourcePoolClassName = (String) prop.getProperty( + "zeppelin.interpreter.resourcePoolClass"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index eafbbbe224d..9fc32cd5a13 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -504,6 +504,8 @@ public static enum ConfVars { // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", + "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"); ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 269b54a1fce..e060a45ab80 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -198,6 +198,8 @@ private void init() throws InterpreterException, IOException, RepositoryExceptio logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); } + + } private void loadFromFile() throws IOException { @@ -234,16 +236,19 @@ private void loadFromFile() throws IOException { // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - + + Properties mergedProperties = + this.getInterpreterPropertiesFromZeppelinConf(); + mergedProperties.putAll(setting.getProperties());; + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), setting.getInterpreterInfos(), - setting.getProperties(), + mergedProperties, setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); @@ -260,6 +265,16 @@ private void loadFromFile() throws IOException { } } } + + private Properties getInterpreterPropertiesFromZeppelinConf() { + Iterator keySet = this.conf.getKeys("zeppelin.interpreter"); + Properties p = new Properties(); + while (keySet.hasNext()) { + String key = keySet.next(); + p.setProperty(key, this.conf.getProperty(key).toString()); + } + return p; + } private void loadInterpreterDependencies(InterpreterSetting intSetting) throws IOException, RepositoryException { From bbeacaca610fde806c90156b1dcfa73554903c29 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 18 Apr 2016 11:45:40 -0400 Subject: [PATCH 34/37] Fixed missing comma. --- .../java/org/apache/zeppelin/conf/ZeppelinConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 81e3ffad8aa..17e40769b40 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -509,7 +509,7 @@ public static enum ConfVars { ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", - "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"); + "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"), ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; From 8fef1ca6325fa4457b025a191a9360ed9bb85cc8 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 18 Apr 2016 14:20:42 -0400 Subject: [PATCH 35/37] Added if-modified-by header support to notebookrestapi. --- .../org/apache/zeppelin/rest/NotebookRestApi.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 9ac3c3b7d51..da3617eee13 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -22,6 +22,7 @@ import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -32,6 +33,7 @@ import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; +import org.apache.commons.httpclient.util.DateUtil; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; @@ -693,10 +695,11 @@ public Response getParagraphsWithResults() @GET @Path("{notebookId}/paragraph/{paragraphId}/result") public Response getParagraphResult(@PathParam("notebookId") String notebookId, - @PathParam("paragraphId") String paragraphId) + @PathParam("paragraphId") String paragraphId, + @HeaderParam("If-Modified-Since") String modificationDateString) throws IOException { LOG.info("Downloading paragraph {} {}", notebookId, paragraphId); - + Note note = notebook.getNote(notebookId); if (note == null) { return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); @@ -706,11 +709,17 @@ public Response getParagraphResult(@PathParam("notebookId") String notebookId, if (p == null) { return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); } + try { + Date modificationDate = DateUtil.parseDate(modificationDateString); + if(!p.getDateFinished().after(modificationDate)) + return Response.ok() + .status(Response.Status.NOT_MODIFIED) + .build(); + } catch (Exception e) { } ResponseBuilder builder = Response.ok(p.getResultFromPool().message()) .header("Content-Disposition", "attachment; filename=" + paragraphId + ".txt"); return builder.build(); } - } From 3993c0d5814f24b1c15f8197f9bd35bbfd024b3b Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 19 Apr 2016 13:51:00 -0400 Subject: [PATCH 36/37] Added fixes to tests --- .../interpreter/remote/RemoteInterpreterServer.java | 11 ++++++----- zeppelin-zengine/pom.xml | 6 ++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 0347e5993db..3de4bb47d3c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -158,15 +158,16 @@ private DistributedResourcePool getResourcePool() resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), this.eventClient, prop); - interpreterGroup.setResourcePool(resourcePool); - return resourcePool; } catch (Exception e) { - logger.error(e.toString(), e); - return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); + logger.error("Did not find resource pool. Using DistributedResourcePool"); + resourcePool = new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); // throw new TException(e); + } finally { + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; } } - + @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 6c0275b344a..23fea0b09ec 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -107,6 +107,12 @@ org.apache.commons commons-vfs2 2.0 + + + plexus-utils + org.codehaus.plexus + + From 462cde8b657eea4f9806e1379422e5f13f5835fe Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 20 Apr 2016 13:13:51 -0400 Subject: [PATCH 37/37] Fixes a style error. --- .../main/java/org/apache/zeppelin/rest/NotebookRestApi.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index da3617eee13..1ae38541288 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -695,8 +695,8 @@ public Response getParagraphsWithResults() @GET @Path("{notebookId}/paragraph/{paragraphId}/result") public Response getParagraphResult(@PathParam("notebookId") String notebookId, - @PathParam("paragraphId") String paragraphId, - @HeaderParam("If-Modified-Since") String modificationDateString) + @PathParam("paragraphId") String paragraphId, + @HeaderParam("If-Modified-Since") String modificationDateString) throws IOException { LOG.info("Downloading paragraph {} {}", notebookId, paragraphId); @@ -711,7 +711,7 @@ public Response getParagraphResult(@PathParam("notebookId") String notebookId, } try { Date modificationDate = DateUtil.parseDate(modificationDateString); - if(!p.getDateFinished().after(modificationDate)) + if (!p.getDateFinished().after(modificationDate)) return Response.ok() .status(Response.Status.NOT_MODIFIED) .build();