Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void run(List<Object> paragraphIdOrIdx) {

/**
* Run paragraphs
* @param paragraphIdOrIdxs list of paragraph id or idx
* @param paragraphIdOrIdx list of paragraph id or idx
*/
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
for (Object idOrIdx : paragraphIdOrIdx) {
Expand Down Expand Up @@ -475,17 +475,17 @@ private AngularObject getAngularObject(String name, InterpreterContext interpret
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
AngularObject ao = registry.get(name, interpreterContext.getNoteId());
AngularObject ao = registry.get(name, interpreterContext.getNoteId(), null);
if (ao == null) {
// then global object
ao = registry.get(name, null);
ao = registry.get(name, null, null);
}
return ao;
}


/**
* Get angular object. Look up local registry first and then global registry
* Get angular object. Look up notebook scope first and then global scope
* @param name variable name
* @return value
*/
Expand All @@ -499,13 +499,13 @@ public Object angular(String name) {
}

/**
* Get angular object. Look up global registry
* Get angular object. Look up global scope
* @param name variable name
* @return value
*/
public Object angularGlobal(String name) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
AngularObject ao = registry.get(name, null);
AngularObject ao = registry.get(name, null, null);
if (ao == null) {
return null;
} else {
Expand All @@ -514,7 +514,7 @@ public Object angularGlobal(String name) {
}

/**
* Create angular variable in local registry and bind with front end Angular display system.
* Create angular variable in notebook scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
* @param name name of the variable
* @param o value
Expand All @@ -524,7 +524,7 @@ public void angularBind(String name, Object o) {
}

/**
* Create angular variable in global registry and bind with front end Angular display system.
* Create angular variable in global scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
* @param name name of the variable
* @param o value
Expand All @@ -534,7 +534,7 @@ public void angularBindGlobal(String name, Object o) {
}

/**
* Create angular variable in local registry and bind with front end Angular display system.
* Create angular variable in local scope and bind with front end Angular display system.
* If variable exists, value will be overwritten and watcher will be added.
* @param name name of variable
* @param o value
Expand All @@ -545,7 +545,7 @@ public void angularBind(String name, Object o, AngularObjectWatcher watcher) {
}

/**
* Create angular variable in global registry and bind with front end Angular display system.
* Create angular variable in global scope and bind with front end Angular display system.
* If variable exists, value will be overwritten and watcher will be added.
* @param name name of variable
* @param o value
Expand All @@ -556,7 +556,7 @@ public void angularBindGlobal(String name, Object o, AngularObjectWatcher watche
}

/**
* Add watcher into angular variable (local registry)
* Add watcher into angular variable (local scope)
* @param name name of the variable
* @param watcher watcher
*/
Expand All @@ -565,7 +565,7 @@ public void angularWatch(String name, AngularObjectWatcher watcher) {
}

/**
* Add watcher into angular variable (global registry)
* Add watcher into angular variable (global scope)
* @param name name of the variable
* @param watcher watcher
*/
Expand Down Expand Up @@ -649,23 +649,24 @@ public void angularUnbindGlobal(String name) {
}

/**
* Create angular variable in local registry and bind with front end Angular display system.
* Create angular variable in notebook scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
* @param name name of the variable
* @param o value
*/
private void angularBind(String name, Object o, String noteId) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();

if (registry.get(name, noteId) == null) {
registry.add(name, o, noteId);
if (registry.get(name, noteId, null) == null) {
registry.add(name, o, noteId, null);
} else {
registry.get(name, noteId).set(o);
registry.get(name, noteId, null).set(o);
}
}

/**
* Create angular variable in local registry and bind with front end Angular display system.
* Create angular variable in notebook scope and bind with front end Angular display
* system.
* If variable exists, value will be overwritten and watcher will be added.
* @param name name of variable
* @param o value
Expand All @@ -674,10 +675,10 @@ private void angularBind(String name, Object o, String noteId) {
private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();

if (registry.get(name, noteId) == null) {
registry.add(name, o, noteId);
if (registry.get(name, noteId, null) == null) {
registry.add(name, o, noteId, null);
} else {
registry.get(name, noteId).set(o);
registry.get(name, noteId, null).set(o);
}
angularWatch(name, watcher);
}
Expand All @@ -690,8 +691,8 @@ private void angularBind(String name, Object o, String noteId, AngularObjectWatc
private void angularWatch(String name, String noteId, AngularObjectWatcher watcher) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();

if (registry.get(name, noteId) != null) {
registry.get(name, noteId).addWatcher(watcher);
if (registry.get(name, noteId, null) != null) {
registry.get(name, noteId, null).addWatcher(watcher);
}
}

Expand Down Expand Up @@ -729,8 +730,8 @@ public void watch(Object oldObject, Object newObject,
*/
private void angularUnwatch(String name, String noteId, AngularObjectWatcher watcher) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name, noteId) != null) {
registry.get(name, noteId).removeWatcher(watcher);
if (registry.get(name, noteId, null) != null) {
registry.get(name, noteId, null).removeWatcher(watcher);
}
}

Expand All @@ -740,8 +741,8 @@ private void angularUnwatch(String name, String noteId, AngularObjectWatcher wat
*/
private void angularUnwatch(String name, String noteId) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name, noteId) != null) {
registry.get(name, noteId).clearAllWatchers();
if (registry.get(name, noteId, null) != null) {
registry.get(name, noteId, null).clearAllWatchers();
}
}

Expand All @@ -751,6 +752,6 @@ private void angularUnwatch(String name, String noteId) {
*/
private void angularUnbind(String name, String noteId) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
registry.remove(name, noteId);
registry.remove(name, noteId, null);
}
}
4 changes: 2 additions & 2 deletions testing/startSparkCluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ if [ ! -d "${SPARK_HOME}" ]; then
echo "${SPARK_VERSION}" | grep "^1.[12].[0-9]" > /dev/null
if [ $? -eq 0 ]; then
# spark 1.1.x and spark 1.2.x can be downloaded from archive
wget http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
wget -q http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will help reduce travis log length quite a bit!

else
# spark 1.3.x and later can be downloaded from mirror
# get download address from mirror
MIRROR_INFO=$(curl -s "http://www.apache.org/dyn/closer.cgi/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz?asjson=1")

PREFFERED=$(echo "${MIRROR_INFO}" | grep preferred | sed 's/[^"]*.preferred.: .\([^"]*\).*/\1/g')
PATHINFO=$(echo "${MIRROR_INFO}" | grep path_info | sed 's/[^"]*.path_info.: .\([^"]*\).*/\1/g')
wget "${PREFFERED}${PATHINFO}"
wget -q "${PREFFERED}${PATHINFO}"
fi
tar zxf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.slf4j.LoggerFactory;

/**
*
* AngularObject provides binding between back-end (interpreter) and front-end
* User provided object will automatically synchronized with front-end side.
* i.e. update from back-end will be sent to front-end, update from front-end will sent-to backend
*
* @param <T>
*/
Expand All @@ -39,27 +41,70 @@ public class AngularObject<T> {
= new LinkedList<AngularObjectWatcher>();

private String noteId; // noteId belonging to. null for global scope

protected AngularObject(String name, T o, String noteId,
private String paragraphId; // paragraphId belongs to. null for notebook scope

/**
* To create new AngularObject, use AngularObjectRegistry.add()
*
* @param name name of object
* @param o reference to user provided object to sent to front-end
* @param noteId noteId belongs to. can be null
* @param paragraphId paragraphId belongs to. can be null
* @param listener event listener
*/
protected AngularObject(String name, T o, String noteId, String paragraphId,
AngularObjectListener listener) {
this.name = name;
this.noteId = noteId;
this.paragraphId = paragraphId;
this.listener = listener;
object = o;
}

/**
* Get name of this object
* @return name
*/
public String getName() {
return name;
}


/**
* Set noteId
* @param noteId noteId belongs to. can be null
*/
public void setNoteId(String noteId) {
this.noteId = noteId;
}


/**
* Get noteId
* @return noteId
*/
public String getNoteId() {
return noteId;
}


/**
* get ParagraphId
* @return paragraphId
*/
public String getParagraphId() {
return paragraphId;
}

/**
* Set paragraphId
* @param paragraphId paragraphId. can be null
*/
public void setParagraphId(String paragraphId) {
this.paragraphId = paragraphId;
}

/**
* Check if it is global scope object
* @return true it is global scope
*/
public boolean isGlobal() {
return noteId == null;
}
Expand All @@ -70,26 +115,47 @@ public boolean equals(Object o) {
AngularObject ao = (AngularObject) o;
if (noteId == null && ao.noteId == null ||
(noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
return name.equals(ao.name);
if (paragraphId == null && ao.paragraphId == null ||
(paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) {
return name.equals(ao.name);
}
}
}
return false;
}

/**
* Get value
* @return
*/
public Object get() {
return object;
}

/**
* fire updated() event for listener
* Note that it does not invoke watcher.watch()
*/
public void emit(){
if (listener != null) {
listener.updated(this);
}
}


/**
* Set value
* @param o reference to new user provided object
*/
public void set(T o) {
set(o, true);
}

/**
* Set value
* @param o reference to new user provided object
* @param emit false on skip firing event for listener. note that it does not skip invoke
* watcher.watch() in any case
*/
public void set(T o, boolean emit) {
final T before = object;
final T after = o;
Expand Down Expand Up @@ -119,26 +185,47 @@ public void run() {
}
}

/**
* Set event listener for this object
* @param listener
*/
public void setListener(AngularObjectListener listener) {
this.listener = listener;
}

/**
* Get event listener of this object
* @return event listener
*/
public AngularObjectListener getListener() {
return listener;
}

/**
* Add a watcher for this object.
* Multiple watcher can be registered.
*
* @param watcher watcher to add
*/
public void addWatcher(AngularObjectWatcher watcher) {
synchronized (watchers) {
watchers.add(watcher);
}
}

/**
* Remove a watcher from this object
* @param watcher watcher to remove
*/
public void removeWatcher(AngularObjectWatcher watcher) {
synchronized (watchers) {
watchers.remove(watcher);
}
}

/**
* Remove all watchers from this object
*/
public void clearAllWatchers() {
synchronized (watchers) {
watchers.clear();
Expand Down
Loading