Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -722,5 +728,4 @@
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,11 @@ private AngularObject getAngularObject(String name, InterpreterContext interpret
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
AngularObject ao = registry.get(name, interpreterContext.getNoteId(), null);
AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
AngularObject noteAo = registry.get(name, noteId, null);

AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;

if (ao == null) {
// then global object
ao = registry.get(name, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

import org.apache.zeppelin.scheduler.ExecutorFactory;
Expand All @@ -35,25 +36,33 @@
public class AngularObject<T> {
private String name;
private T object;

private transient AngularObjectListener listener;
private transient List<AngularObjectWatcher> watchers
= new LinkedList<AngularObjectWatcher>();

private transient List<AngularObjectWatcher> watchers = new LinkedList<AngularObjectWatcher>();

private String noteId; // noteId belonging to. null for global scope
private String paragraphId; // paragraphId belongs to. null for notebook scope

/**
* Public constructor, neccessary for the deserialization when using Thrift angularRegistryPush()
* Without public constructor, GSON library will instantiate the AngularObject using
* serialization so the <strong>watchers</strong> list won't be initialized and will throw
* NullPointerException the first time it is accessed
*/
public AngularObject() {
}

/**
* To create new AngularObject, use AngularObjectRegistry.add()
*
* @param name name of object
* @param o reference to user provided object to sent to front-end
* @param noteId noteId belongs to. can be null
* @param name name of object
* @param o reference to user provided object to sent to front-end
* @param noteId noteId belongs to. can be null
* @param paragraphId paragraphId belongs to. can be null
* @param listener event listener
* @param listener event listener
*/
protected AngularObject(String name, T o, String noteId, String paragraphId,
AngularObjectListener listener) {
AngularObjectListener listener) {
this.name = name;
this.noteId = noteId;
this.paragraphId = paragraphId;
Expand All @@ -63,6 +72,7 @@ protected AngularObject(String name, T o, String noteId, String paragraphId,

/**
* Get name of this object
*
* @return name
*/
public String getName() {
Expand All @@ -71,6 +81,7 @@ public String getName() {

/**
* Set noteId
*
* @param noteId noteId belongs to. can be null
*/
public void setNoteId(String noteId) {
Expand All @@ -79,6 +90,7 @@ public void setNoteId(String noteId) {

/**
* Get noteId
*
* @return noteId
*/
public String getNoteId() {
Expand All @@ -87,6 +99,7 @@ public String getNoteId() {

/**
* get ParagraphId
*
* @return paragraphId
*/
public String getParagraphId() {
Expand All @@ -95,6 +108,7 @@ public String getParagraphId() {

/**
* Set paragraphId
*
* @param paragraphId paragraphId. can be null
*/
public void setParagraphId(String paragraphId) {
Expand All @@ -103,29 +117,31 @@ public void setParagraphId(String paragraphId) {

/**
* Check if it is global scope object
*
* @return true it is global scope
*/
public boolean isGlobal() {
return noteId == null;
}

@Override
public int hashCode() {
return Objects.hash(name, noteId, paragraphId);
}

@Override
public boolean equals(Object o) {
if (o instanceof AngularObject) {
AngularObject ao = (AngularObject) o;
if (noteId == null && ao.noteId == null ||
(noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
if (paragraphId == null && ao.paragraphId == null ||
(paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) {
return name.equals(ao.name);
}
}
}
return false;
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AngularObject<?> that = (AngularObject<?>) o;
return Objects.equals(name, that.name) &&
Objects.equals(noteId, that.noteId) &&
Objects.equals(paragraphId, that.paragraphId);
}

/**
* Get value
*
* @return
*/
public Object get() {
Expand All @@ -136,14 +152,15 @@ public Object get() {
* fire updated() event for listener
* Note that it does not invoke watcher.watch()
*/
public void emit(){
public void emit() {
if (listener != null) {
listener.updated(this);
}
}

/**
* Set value
*
* @param o reference to new user provided object
*/
public void set(T o) {
Expand All @@ -152,7 +169,8 @@ public void set(T o) {

/**
* Set value
* @param o reference to new user provided object
*
* @param o reference to new user provided object
* @param emit false on skip firing event for listener. note that it does not skip invoke
* watcher.watch() in any case
*/
Expand Down Expand Up @@ -187,6 +205,7 @@ public void run() {

/**
* Set event listener for this object
*
* @param listener
*/
public void setListener(AngularObjectListener listener) {
Expand All @@ -195,6 +214,7 @@ public void setListener(AngularObjectListener listener) {

/**
* Get event listener of this object
*
* @return event listener
*/
public AngularObjectListener getListener() {
Expand All @@ -215,6 +235,7 @@ public void addWatcher(AngularObjectWatcher watcher) {

/**
* Remove a watcher from this object
*
* @param watcher watcher to remove
*/
public void removeWatcher(AngularObjectWatcher watcher) {
Expand All @@ -232,4 +253,14 @@ public void clearAllWatchers() {
}
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AngularObject{");
sb.append("noteId='").append(noteId).append('\'');
sb.append(", paragraphId='").append(paragraphId).append('\'');
sb.append(", object=").append(object);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,22 @@ public List<AngularObject> getAllWithGlobal(String noteId) {
return all;
}

/**
* Return the complete registry to be pushed to remote interpreter process
* @return registry map
*/
public Map<String, Map<String, AngularObject>> getRegistry() {
return this.registry;
}

/**
* Set the complete registry send from ZeppelinServer to remote interpreter
* @param registry
*/
public void setRegistry(Map<String, Map<String, AngularObject>> registry) {
this.registry = registry;
}

public String getInterpreterGroupId() {
return interpreterId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public AngularObject addAndNotifyRemoteProcess(String name, Object o, String not
Gson gson = new Gson();
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.add(name, o, noteId, paragraphId, true);
}

Client client = null;
Expand Down Expand Up @@ -109,7 +109,7 @@ public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, St
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.remove(name, noteId, paragraphId);
}

Client client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import java.util.Properties;

import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
Expand All @@ -44,6 +44,7 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;


/**
*
*/
Expand Down Expand Up @@ -141,8 +142,10 @@ private synchronized void init() {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
client.createInterpreter(intp.getClassName(), (Map) property);

}

pushAngularObjectRegistryToRemote(client);

} catch (TException e) {
broken = true;
throw new InterpreterException(e);
Expand All @@ -154,8 +157,6 @@ private synchronized void init() {
initialized = true;
}



@Override
public void open() {
init();
Expand Down Expand Up @@ -362,4 +363,30 @@ private InterpreterResult convert(RemoteInterpreterResult result) {
Type.valueOf(result.getType()),
result.getMsg());
}

/**
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY inside the init() method
* @param client
* @throws TException
*/
void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
.getAngularObjectRegistry();

if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();

logger.info("Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());

final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {}.getType();

Gson gson = new Gson();
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,4 +612,16 @@ public void angularObjectRemove(String name, String noteId, String paragraphId)
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
registry.remove(name, noteId, paragraphId, false);
}

@Override
public void angularRegistryPush(String registryAsString) throws TException {
try {
Map<String, Map<String, AngularObject>> deserializedRegistry = gson
.fromJson(registryAsString,
new TypeToken<Map<String, Map<String, AngularObject>>>() { }.getType());
interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
} catch (Exception e) {
logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -51,7 +50,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-28")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

Expand Down
Loading