From c5be045986adb54aa5a4a64e303b22d9e9c2ec5c Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Sun, 24 Jan 2016 19:55:30 +0100 Subject: [PATCH 01/10] Add Thrift RPC method angularRegistryPush() --- .../interpreter/remote/RemoteInterpreter.java | 34 +- .../remote/RemoteInterpreterServer.java | 12 + .../thrift/RemoteInterpreterContext.java | 19 +- .../thrift/RemoteInterpreterEvent.java | 19 +- .../thrift/RemoteInterpreterEventType.java | 22 +- .../thrift/RemoteInterpreterResult.java | 19 +- .../thrift/RemoteInterpreterService.java | 756 +++++++++++++++++- .../thrift/RemoteInterpreterService.thrift | 4 +- .../remote/RemoteInterpreterTest.java | 31 + 9 files changed, 860 insertions(+), 56 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index d2a24e85a39..b3ebf06db09 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -23,6 +23,7 @@ import java.util.Properties; import org.apache.thrift.TException; +import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -30,9 +31,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; -import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; @@ -44,6 +43,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + /** * */ @@ -141,8 +141,10 @@ private synchronized void init() { for (Interpreter intp : this.getInterpreterGroup()) { logger.info("Create remote interpreter {}", intp.getClassName()); client.createInterpreter(intp.getClassName(), (Map) property); - } + + pushAngularObjectRegistryToRemote(client); + } catch (TException e) { broken = true; throw new InterpreterException(e); @@ -154,8 +156,6 @@ private synchronized void init() { initialized = true; } - - @Override public void open() { init(); @@ -362,4 +362,28 @@ private InterpreterResult convert(RemoteInterpreterResult result) { Type.valueOf(result.getType()), result.getMsg()); } + + /** + * Push local angular object registry to + * remote interpreter. This method should be + * call ONLY inside the init() method + * @param client + * @throws TException + */ + void pushAngularObjectRegistryToRemote(Client client) throws TException { + final Map> registry = this.getInterpreterGroup() + .getAngularObjectRegistry().getRegistry(); + + if (registry != null) { + logger.info("Push local angular object registry from ZeppelinServer to" + + " remote interpreter group {}", + this.getInterpreterGroup().getId()); + + final java.lang.reflect.Type registryType = new TypeToken>>() {}.getType(); + + Gson gson = new Gson(); + client.angularRegistryPush(gson.toJson(registry, registryType)); + } + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index a59293b483b..69489c3073b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -612,4 +612,16 @@ public void angularObjectRemove(String name, String noteId, String paragraphId) AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } + + @Override + public void angularRegistryPush(String registryAsString) throws TException { + try { + Map> deserializedRegistry = gson + .fromJson(registryAsString, + new TypeToken>>() { }.getType()); + interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry); + } catch (Exception e) { + logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e); + } + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index 175f482cadb..feb3235081c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-28") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 79203fbec29..842ef5aab25 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-28") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index d6503183d06..665f882f730 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,7 +34,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { ANGULAR_OBJECT_REMOVE(4), RUN_INTERPRETER_CONTEXT_RUNNER(5), OUTPUT_APPEND(6), - OUTPUT_UPDATE(7); + OUTPUT_UPDATE(7), + ANGULAR_REGISTRY_PUSH(8); private final int value; @@ -70,6 +70,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return OUTPUT_APPEND; case 7: return OUTPUT_UPDATE; + case 8: + return ANGULAR_REGISTRY_PUSH; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index cc50f9c89fe..61052a54599 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-28") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index fbcc5140dbd..aec93b3e5f5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-28") public class RemoteInterpreterService { public interface Iface { @@ -84,6 +83,8 @@ public interface Iface { public void angularObjectRemove(String name, String noteId, String paragraphId) throws org.apache.thrift.TException; + public void angularRegistryPush(String registry) throws org.apache.thrift.TException; + } public interface AsyncIface { @@ -116,6 +117,8 @@ public interface AsyncIface { public void angularObjectRemove(String name, String noteId, String paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void angularRegistryPush(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + } public static class Client extends org.apache.thrift.TServiceClient implements Iface { @@ -449,6 +452,26 @@ public void recv_angularObjectRemove() throws org.apache.thrift.TException return; } + public void angularRegistryPush(String registry) throws org.apache.thrift.TException + { + send_angularRegistryPush(registry); + recv_angularRegistryPush(); + } + + public void send_angularRegistryPush(String registry) throws org.apache.thrift.TException + { + angularRegistryPush_args args = new angularRegistryPush_args(); + args.setRegistry(registry); + sendBase("angularRegistryPush", args); + } + + public void recv_angularRegistryPush() throws org.apache.thrift.TException + { + angularRegistryPush_result result = new angularRegistryPush_result(); + receiveBase(result, "angularRegistryPush"); + return; + } + } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory { @@ -954,6 +977,38 @@ public void getResult() throws org.apache.thrift.TException { } } + public void angularRegistryPush(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + angularRegistryPush_call method_call = new angularRegistryPush_call(registry, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class angularRegistryPush_call extends org.apache.thrift.async.TAsyncMethodCall { + private String registry; + public angularRegistryPush_call(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.registry = registry; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("angularRegistryPush", org.apache.thrift.protocol.TMessageType.CALL, 0)); + angularRegistryPush_args args = new angularRegistryPush_args(); + args.setRegistry(registry); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_angularRegistryPush(); + } + } + } public static class Processor extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor { @@ -981,6 +1036,7 @@ protected Processor(I iface, Map extends org.apache.thrift.ProcessFunction { + public angularRegistryPush() { + super("angularRegistryPush"); + } + + public angularRegistryPush_args getEmptyArgsInstance() { + return new angularRegistryPush_args(); + } + + protected boolean isOneway() { + return false; + } + + public angularRegistryPush_result getResult(I iface, angularRegistryPush_args args) throws org.apache.thrift.TException { + angularRegistryPush_result result = new angularRegistryPush_result(); + iface.angularRegistryPush(args.registry); + return result; + } + } + } public static class AsyncProcessor extends org.apache.thrift.TBaseAsyncProcessor { @@ -1292,6 +1368,7 @@ protected AsyncProcessor(I iface, Map extends org.apache.thrift.AsyncProcessFunction { + public angularRegistryPush() { + super("angularRegistryPush"); + } + + public angularRegistryPush_args getEmptyArgsInstance() { + return new angularRegistryPush_args(); + } + + public AsyncMethodCallback getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new AsyncMethodCallback() { + public void onComplete(Void o) { + angularRegistryPush_result result = new angularRegistryPush_result(); + try { + fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + return; + } catch (Exception e) { + LOGGER.error("Exception writing to internal frame buffer", e); + } + fb.close(); + } + public void onError(Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TBase msg; + angularRegistryPush_result result = new angularRegistryPush_result(); + { + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + return; + } catch (Exception ex) { + LOGGER.error("Exception writing to internal frame buffer", ex); + } + fb.close(); + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, angularRegistryPush_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws TException { + iface.angularRegistryPush(args.registry,resultHandler); + } + } + } public static class createInterpreter_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { @@ -12681,4 +12808,613 @@ public void read(org.apache.thrift.protocol.TProtocol prot, angularObjectRemove_ } + public static class angularRegistryPush_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularRegistryPush_args"); + + private static final org.apache.thrift.protocol.TField REGISTRY_FIELD_DESC = new org.apache.thrift.protocol.TField("registry", org.apache.thrift.protocol.TType.STRING, (short)1); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new angularRegistryPush_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new angularRegistryPush_argsTupleSchemeFactory()); + } + + public String registry; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + REGISTRY((short)1, "registry"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // REGISTRY + return REGISTRY; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.REGISTRY, new org.apache.thrift.meta_data.FieldMetaData("registry", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_args.class, metaDataMap); + } + + public angularRegistryPush_args() { + } + + public angularRegistryPush_args( + String registry) + { + this(); + this.registry = registry; + } + + /** + * Performs a deep copy on other. + */ + public angularRegistryPush_args(angularRegistryPush_args other) { + if (other.isSetRegistry()) { + this.registry = other.registry; + } + } + + public angularRegistryPush_args deepCopy() { + return new angularRegistryPush_args(this); + } + + @Override + public void clear() { + this.registry = null; + } + + public String getRegistry() { + return this.registry; + } + + public angularRegistryPush_args setRegistry(String registry) { + this.registry = registry; + return this; + } + + public void unsetRegistry() { + this.registry = null; + } + + /** Returns true if field registry is set (has been assigned a value) and false otherwise */ + public boolean isSetRegistry() { + return this.registry != null; + } + + public void setRegistryIsSet(boolean value) { + if (!value) { + this.registry = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case REGISTRY: + if (value == null) { + unsetRegistry(); + } else { + setRegistry((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case REGISTRY: + return getRegistry(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case REGISTRY: + return isSetRegistry(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof angularRegistryPush_args) + return this.equals((angularRegistryPush_args)that); + return false; + } + + public boolean equals(angularRegistryPush_args that) { + if (that == null) + return false; + + boolean this_present_registry = true && this.isSetRegistry(); + boolean that_present_registry = true && that.isSetRegistry(); + if (this_present_registry || that_present_registry) { + if (!(this_present_registry && that_present_registry)) + return false; + if (!this.registry.equals(that.registry)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + boolean present_registry = true && (isSetRegistry()); + list.add(present_registry); + if (present_registry) + list.add(registry); + + return list.hashCode(); + } + + @Override + public int compareTo(angularRegistryPush_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetRegistry()).compareTo(other.isSetRegistry()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRegistry()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.registry, other.registry); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("angularRegistryPush_args("); + boolean first = true; + + sb.append("registry:"); + if (this.registry == null) { + sb.append("null"); + } else { + sb.append(this.registry); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class angularRegistryPush_argsStandardSchemeFactory implements SchemeFactory { + public angularRegistryPush_argsStandardScheme getScheme() { + return new angularRegistryPush_argsStandardScheme(); + } + } + + private static class angularRegistryPush_argsStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, angularRegistryPush_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // REGISTRY + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.registry = iprot.readString(); + struct.setRegistryIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, angularRegistryPush_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.registry != null) { + oprot.writeFieldBegin(REGISTRY_FIELD_DESC); + oprot.writeString(struct.registry); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class angularRegistryPush_argsTupleSchemeFactory implements SchemeFactory { + public angularRegistryPush_argsTupleScheme getScheme() { + return new angularRegistryPush_argsTupleScheme(); + } + } + + private static class angularRegistryPush_argsTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetRegistry()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetRegistry()) { + oprot.writeString(struct.registry); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.registry = iprot.readString(); + struct.setRegistryIsSet(true); + } + } + } + + } + + public static class angularRegistryPush_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularRegistryPush_result"); + + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new angularRegistryPush_resultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new angularRegistryPush_resultTupleSchemeFactory()); + } + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_result.class, metaDataMap); + } + + public angularRegistryPush_result() { + } + + /** + * Performs a deep copy on other. + */ + public angularRegistryPush_result(angularRegistryPush_result other) { + } + + public angularRegistryPush_result deepCopy() { + return new angularRegistryPush_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof angularRegistryPush_result) + return this.equals((angularRegistryPush_result)that); + return false; + } + + public boolean equals(angularRegistryPush_result that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + return list.hashCode(); + } + + @Override + public int compareTo(angularRegistryPush_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("angularRegistryPush_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class angularRegistryPush_resultStandardSchemeFactory implements SchemeFactory { + public angularRegistryPush_resultStandardScheme getScheme() { + return new angularRegistryPush_resultStandardScheme(); + } + } + + private static class angularRegistryPush_resultStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, angularRegistryPush_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, angularRegistryPush_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class angularRegistryPush_resultTupleSchemeFactory implements SchemeFactory { + public angularRegistryPush_resultTupleScheme getScheme() { + return new angularRegistryPush_resultTupleScheme(); + } + } + + private static class angularRegistryPush_resultTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_result struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_result struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + } + } + + } + } diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 5cd14a2b765..52736e8e483 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -44,7 +44,8 @@ enum RemoteInterpreterEventType { ANGULAR_OBJECT_REMOVE = 4, RUN_INTERPRETER_CONTEXT_RUNNER = 5, OUTPUT_APPEND = 6, - OUTPUT_UPDATE = 7 + OUTPUT_UPDATE = 7, + ANGULAR_REGISTRY_PUSH = 8 } struct RemoteInterpreterEvent { @@ -71,4 +72,5 @@ service RemoteInterpreterService { object); void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object); void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId); + void angularRegistryPush(1: string registry); } \ No newline at end of file diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 034a676c2ed..8ca94b6de74 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -28,6 +28,7 @@ import java.util.Properties; import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -37,12 +38,18 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.Scheduler; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; + + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; public class RemoteInterpreterTest { @@ -585,4 +592,28 @@ public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() assertEquals(intpA.getScheduler(), intpB.getScheduler()); } + + @Test + public void should_push_local_angular_repo_to_remote() throws Exception { + //Given + final Client client = Mockito.mock(Client.class); + final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), + MockInterpreterA.class.getName(), "runner", "path", env, 10 * 1000, null); + final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); + registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); + final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); + interpreterGroup.setAngularObjectRegistry(registry); + intr.setInterpreterGroup(interpreterGroup); + + final java.lang.reflect.Type registryType = new TypeToken>>() {}.getType(); + final Gson gson = new Gson(); + final String expected = gson.toJson(registry.getRegistry(), registryType); + + //When + intr.pushAngularObjectRegistryToRemote(client); + + //Then + Mockito.verify(client).angularRegistryPush(expected); + } } From c6f2a278e4cb2768845db64b6ca71e6bbe13461b Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 26 Jan 2016 16:50:10 +0100 Subject: [PATCH 02/10] Make AngularObject constructor public because of serialization issue --- .../zeppelin/display/AngularObject.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java index 4b0c3e93d75..72f57148160 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java @@ -19,6 +19,7 @@ import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import org.apache.zeppelin.scheduler.ExecutorFactory; @@ -35,25 +36,33 @@ public class AngularObject { private String name; private T object; - + private transient AngularObjectListener listener; - private transient List watchers - = new LinkedList(); - + private transient List watchers = new LinkedList(); + private String noteId; // noteId belonging to. null for global scope private String paragraphId; // paragraphId belongs to. null for notebook scope + /** + * Public constructor, neccessary for the deserialization when using Thrift angularRegistryPush() + * Without public constructor, GSON library will instantiate the AngularObject using + * serialization so the watchers list won't be initialized and will throw + * NullPointerException the first time it is accessed + */ + public AngularObject() { + } + /** * To create new AngularObject, use AngularObjectRegistry.add() * - * @param name name of object - * @param o reference to user provided object to sent to front-end - * @param noteId noteId belongs to. can be null + * @param name name of object + * @param o reference to user provided object to sent to front-end + * @param noteId noteId belongs to. can be null * @param paragraphId paragraphId belongs to. can be null - * @param listener event listener + * @param listener event listener */ protected AngularObject(String name, T o, String noteId, String paragraphId, - AngularObjectListener listener) { + AngularObjectListener listener) { this.name = name; this.noteId = noteId; this.paragraphId = paragraphId; @@ -63,6 +72,7 @@ protected AngularObject(String name, T o, String noteId, String paragraphId, /** * Get name of this object + * * @return name */ public String getName() { @@ -71,6 +81,7 @@ public String getName() { /** * Set noteId + * * @param noteId noteId belongs to. can be null */ public void setNoteId(String noteId) { @@ -79,6 +90,7 @@ public void setNoteId(String noteId) { /** * Get noteId + * * @return noteId */ public String getNoteId() { @@ -87,6 +99,7 @@ public String getNoteId() { /** * get ParagraphId + * * @return paragraphId */ public String getParagraphId() { @@ -95,6 +108,7 @@ public String getParagraphId() { /** * Set paragraphId + * * @param paragraphId paragraphId. can be null */ public void setParagraphId(String paragraphId) { @@ -103,29 +117,31 @@ public void setParagraphId(String paragraphId) { /** * Check if it is global scope object + * * @return true it is global scope */ public boolean isGlobal() { return noteId == null; } + @Override + public int hashCode() { + return Objects.hash(name, noteId, paragraphId); + } + @Override public boolean equals(Object o) { - if (o instanceof AngularObject) { - AngularObject ao = (AngularObject) o; - if (noteId == null && ao.noteId == null || - (noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) { - if (paragraphId == null && ao.paragraphId == null || - (paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) { - return name.equals(ao.name); - } - } - } - return false; + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AngularObject that = (AngularObject) o; + return Objects.equals(name, that.name) && + Objects.equals(noteId, that.noteId) && + Objects.equals(paragraphId, that.paragraphId); } /** * Get value + * * @return */ public Object get() { @@ -136,7 +152,7 @@ public Object get() { * fire updated() event for listener * Note that it does not invoke watcher.watch() */ - public void emit(){ + public void emit() { if (listener != null) { listener.updated(this); } @@ -144,6 +160,7 @@ public void emit(){ /** * Set value + * * @param o reference to new user provided object */ public void set(T o) { @@ -152,7 +169,8 @@ public void set(T o) { /** * Set value - * @param o reference to new user provided object + * + * @param o reference to new user provided object * @param emit false on skip firing event for listener. note that it does not skip invoke * watcher.watch() in any case */ @@ -187,6 +205,7 @@ public void run() { /** * Set event listener for this object + * * @param listener */ public void setListener(AngularObjectListener listener) { @@ -195,6 +214,7 @@ public void setListener(AngularObjectListener listener) { /** * Get event listener of this object + * * @return event listener */ public AngularObjectListener getListener() { @@ -215,6 +235,7 @@ public void addWatcher(AngularObjectWatcher watcher) { /** * Remove a watcher from this object + * * @param watcher watcher to remove */ public void removeWatcher(AngularObjectWatcher watcher) { From 1facc110984fa0101ed9f9221a38fe77dd6c2976 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 26 Jan 2016 20:30:36 +0100 Subject: [PATCH 03/10] Implement runParagraph() and runParagraphs() Angular functions --- .../notebook/paragraph/paragraph.controller.js | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index a985f64b89a..44eb9aac5c6 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -22,6 +22,24 @@ angular.module('zeppelinWebApp') $scope.originalText = ''; $scope.editor = null; var paragraphScope = $rootScope.$new(true, $rootScope); + + paragraphScope.runParagraph = function(paragraphId) { + if (paragraphId) { + var paragraphDiv = angular + .element('#' + paragraphId + '_paragraphColumn_main[ng-controller="ParagraphCtrl"]'); + if (paragraphDiv) { + var paragraph = paragraphDiv.scope().paragraph; + websocketMsgSrv.runParagraph(paragraph.id, paragraph.title, paragraph.text, + paragraph.config, paragraph.settings.params); + } + } + }; + + paragraphScope.runParagraphs = function(paragraphsId) { + var paragraphs = paragraphsId || []; + paragraphs.forEach(paragraphScope.runParagraph); + }; + var angularObjectRegistry = {}; var editorModes = { From a235a851b6e3383ca1fa1601c5df2bb878885286 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 26 Jan 2016 13:41:12 +0100 Subject: [PATCH 04/10] Implement pushToServer() Angular function --- pom.xml | 6 + .../zeppelin/display/AngularObject.java | 10 + .../display/AngularObjectRegistry.java | 16 + .../remote/RemoteAngularObjectRegistry.java | 4 +- zeppelin-server/pom.xml | 5 + .../org/apache/zeppelin/socket/Message.java | 14 + .../zeppelin/socket/NotebookServer.java | 260 +++++++- .../display/AngularObjectBuilder.java | 10 + .../zeppelin/socket/NotebookServerTest.java | 619 ++++++++++++++++++ .../paragraph/paragraph.controller.js | 27 + .../websocketEvents/websocketMsg.service.js | 14 + 11 files changed, 980 insertions(+), 5 deletions(-) create mode 100644 zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java diff --git a/pom.xml b/pom.xml index 2e75d2ad47a..590791841b6 100755 --- a/pom.xml +++ b/pom.xml @@ -196,6 +196,12 @@ 1.1.1 + + commons-collections + commons-collections + 3.2.1 + + com.google.guava guava diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java index 72f57148160..884edf3dae5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java @@ -253,4 +253,14 @@ public void clearAllWatchers() { } } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("AngularObject{"); + sb.append("noteId='").append(noteId).append('\''); + sb.append(", paragraphId='").append(paragraphId).append('\''); + sb.append(", object=").append(object); + sb.append(", name='").append(name).append('\''); + sb.append('}'); + return sb.toString(); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java index cf360af124b..b421bc8607b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java @@ -243,6 +243,22 @@ public List getAllWithGlobal(String noteId) { return all; } + /** + * Return the complete registry to be pushed to remote interpreter process + * @return registry map + */ + public Map> getRegistry() { + return this.registry; + } + + /** + * Set the complete registry send from ZeppelinServer to remote interpreter + * @param registry + */ + public void setRegistry(Map> registry) { + this.registry = registry; + } + public String getInterpreterGroupId() { return interpreterId; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index 790ed950bb8..24e397036f6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -75,7 +75,7 @@ public AngularObject addAndNotifyRemoteProcess(String name, Object o, String not Gson gson = new Gson(); RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (!remoteInterpreterProcess.isRunning()) { - return null; + return super.add(name, o, noteId, paragraphId, true); } Client client = null; @@ -109,7 +109,7 @@ public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, St paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (!remoteInterpreterProcess.isRunning()) { - return null; + return super.remove(name, noteId, paragraphId); } Client client = null; diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 73e878a58f5..ee03c334d71 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -191,6 +191,11 @@ + + commons-collections + commons-collections + + org.quartz-scheduler quartz diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index 2a8e06d7687..b18019a6d7d 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -101,6 +101,7 @@ public static enum OP { ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated, + ANGULAR_OBJECT_CLIENT_UPDATE, // [c-s] angular object pushed from client LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations CONFIGURATIONS_INFO // [s-c] all key/value pairs of configurations @@ -124,4 +125,17 @@ public Message put(String k, Object v) { public Object get(String k) { return data.get(k); } + + public T getType(String key) { + return (T) data.get(key); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Message{"); + sb.append("data=").append(data); + sb.append(", op=").append(op); + sb.append('}'); + return sb.toString(); + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 9a4a378aa99..5bf50ede9bc 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -16,6 +16,10 @@ */ package org.apache.zeppelin.socket; +import static java.lang.String.format; +import static org.apache.commons.collections.CollectionUtils.isEmpty; +import static org.apache.commons.lang.StringUtils.equalsIgnoreCase; + import java.io.IOException; import java.net.URISyntaxException; import java.net.UnknownHostException; @@ -24,20 +28,22 @@ import javax.servlet.http.HttpServletRequest; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; -import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; import org.apache.zeppelin.ticket.TicketContainer; @@ -99,6 +105,11 @@ public void onMessage(NotebookSocket conn, String msg) { LOG.debug("RECEIVE << " + messagereceived.op); LOG.debug("RECEIVE PRINCIPAL << " + messagereceived.principal); LOG.debug("RECEIVE TICKET << " + messagereceived.ticket); + + if (LOG.isTraceEnabled()) { + LOG.trace("RECEIVE MSG = " + messagereceived); + } + String ticket = TicketContainer.instance.getTicket(messagereceived.principal); if (ticket != null && !ticket.equals(messagereceived.ticket)) throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket); @@ -168,6 +179,9 @@ public void onMessage(NotebookSocket conn, String msg) { case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; + case ANGULAR_OBJECT_CLIENT_UPDATE: + angularObjectClientUpdate(conn, notebook, messagereceived); + break; case LIST_CONFIGURATIONS: sendAllConfigurations(conn, notebook); break; @@ -192,7 +206,7 @@ protected Message deserializeMessage(String msg) { return gson.fromJson(msg, Message.class); } - private String serializeMessage(Message m) { + protected String serializeMessage(Message m) { return gson.toJson(m); } @@ -644,6 +658,233 @@ private void angularObjectUpdated(NotebookSocket conn, Notebook notebook, } } + /** + * Push the given Angular variable to the target + * interpreter(s) angular registry given a noteId + * and an optional list of paragraph id(s) + * @param conn + * @param notebook + * @param fromMessage + * @throws Exception + */ + protected void angularObjectClientUpdate(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws Exception{ + String noteId = fromMessage.getType("noteId"); + String varName = fromMessage.getType("name"); + Object varValue = fromMessage.get("value"); + List interpreters = fromMessage.getType("interpreters"); + List targetParagraphs = fromMessage.getType("paragraphs"); + targetParagraphs = CollectionUtils.isNotEmpty(targetParagraphs) + ? targetParagraphs : new ArrayList(); + + String scope = fromMessage.getType("scope"); + Note note = notebook.getNote(noteId); + + if (isEmpty(interpreters)) { + throw new Exception("interpreter name not specified for angular value push"); + } + + if (note != null) { + + for (String interpreterName : interpreters) { + + final InterpreterGroup interpreterGroup = findGroupForInterpreter(noteId, + note.getNoteReplLoader().getInterpreterSettings(), interpreterName); + + final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); + final List matchedParagraphs = findMatchingParagraphs(targetParagraphs, + note, interpreterName); + + if (registry instanceof RemoteAngularObjectRegistry) { + + RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; + pushAngularObjectToRemoteRegistry(noteId, varName, varValue, matchedParagraphs, scope, + remoteRegistry, interpreterGroup.getId(), conn); + + } else { + pushAngularObjectToLocalRepo(noteId, varName, varValue, matchedParagraphs, scope, + registry, interpreterGroup.getId(), conn); + } + + } + } + } + + /** + * Remove the given Angular variable to the target + * interpreter(s) angular registry given a noteId + * and an optional list of paragraph id(s) + * @param conn + * @param notebook + * @param fromMessage + * @throws Exception + */ + protected void angularObjectClientDelete(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws Exception{ + String noteId = fromMessage.getType("noteId"); + String varName = fromMessage.getType("name"); + List interpreters = fromMessage.getType("interpreters"); + List targetParagraphs = fromMessage.getType("paragraphs"); + targetParagraphs = CollectionUtils.isNotEmpty(targetParagraphs) + ? targetParagraphs : new ArrayList(); + + Note note = notebook.getNote(noteId); + + if (isEmpty(interpreters)) { + throw new Exception("interpreter name not specified for angular value deletion"); + } + + if (note != null) { + + for (String interpreterName : interpreters) { + + final InterpreterGroup interpreterGroup = findGroupForInterpreter(noteId, + note.getNoteReplLoader().getInterpreterSettings(), interpreterName); + + final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); + final List matchedParagraphs = findMatchingParagraphs(targetParagraphs, note, + interpreterName); + + if (registry instanceof RemoteAngularObjectRegistry) { + + RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; + + for (String paragraphId : matchedParagraphs) { + final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, + paragraphId); + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroup.getId()) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + + if (isEmpty(matchedParagraphs)) { + final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, + null); + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroup.getId()) + .put("noteId", noteId), + conn); + } + } else { + for (String paragraphId : matchedParagraphs) { + final AngularObject removed = registry.remove(varName, noteId, paragraphId); + if (removed != null) { + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) + .put("interpreterGroupId", interpreterGroup.getId()) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + } + + // Note scope removal + if (isEmpty(matchedParagraphs)) { + final AngularObject removed = registry.remove(varName, noteId, null); + if (removed != null) { + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) + .put("interpreterGroupId", interpreterGroup.getId()) + .put("noteId", noteId), + conn); + } + } + } + } + } + } + + private List findMatchingParagraphs(List targetParagraphs, Note note, String interpreterName) { + final List matchedParagraph = new ArrayList<>(); + for (String paragraph : targetParagraphs) { + final String replName = note.getParagraph(paragraph).getRequiredReplName(); + if (StringUtils.equalsIgnoreCase(replName, interpreterName)) { + matchedParagraph.add(paragraph); + } + } + return matchedParagraph; + } + + private void pushAngularObjectToRemoteRegistry(String noteId, String varName, Object varValue, + List matchedParagraphs, String scope, RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, NotebookSocket conn) { + + for (String paragraphId : matchedParagraphs) { + final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, + noteId, paragraphId); + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + + // If scope forced to note or empty target paragraphIs, push also variable to note level + if (equalsIgnoreCase(scope, "note") || isEmpty(matchedParagraphs)) { + final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, + noteId, null); + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId), + conn); + } + } + + private void pushAngularObjectToLocalRepo(String noteId, String varName, Object varValue, + List targetParagraphs, String scope, AngularObjectRegistry registry, + String interpreterGroupId, NotebookSocket conn) { + + for (String paragraphId : targetParagraphs) { + AngularObject angularObject = registry.get(varName, noteId, paragraphId); + if (angularObject == null) { + angularObject = registry.add(varName, varValue, noteId, paragraphId); + } else { + angularObject.set(varValue, true); + } + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId) + .put("paragraphId", paragraphId), + conn); + } + + // If scope forced to note or empty target paragraphIs, push also variable to note level + if (equalsIgnoreCase(scope, "note") || isEmpty(targetParagraphs)) { + AngularObject angularObject = registry.get(varName, noteId, null); + if (angularObject == null) { + angularObject = registry.add(varName, varValue, noteId, null); + } else { + angularObject.set(varValue, true); + } + + this.broadcastExcept( + noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", noteId), + conn); + } + } + private void moveParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); @@ -929,5 +1170,18 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri } } } + + private InterpreterGroup findGroupForInterpreter(String noteId, + List interpreterSettings, String interpreterName) throws Exception { + for (InterpreterSetting setting : interpreterSettings) { + if (setting.getName().toLowerCase().equals(interpreterName.toLowerCase())) { + LOG.debug("Found interpreter setting {} for interpreter name {}", + setting, interpreterName); + return setting.getInterpreterGroup(); + } + } + throw new Exception(format("Cannot find interpreter %s in the note with id %s", + interpreterName, noteId)); + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java new file mode 100644 index 00000000000..f4d1120bc46 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java @@ -0,0 +1,10 @@ +package org.apache.zeppelin.display; + +public class AngularObjectBuilder { + + public static AngularObject build(String varName, T value, String noteId, + String paragraphId) { + + return new AngularObject<>(varName, value, noteId, paragraphId, null); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 7d8f3cf710f..c30a1c7e06f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -20,8 +20,13 @@ package org.apache.zeppelin.socket; import com.google.gson.Gson; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectBuilder; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.rest.AbstractTestRestApi; @@ -37,6 +42,7 @@ import java.net.UnknownHostException; import java.util.List; +import static java.util.Arrays.asList; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -152,6 +158,619 @@ public void testImportNotebook() throws IOException { assertEquals("Test paragraphs import", notebook.getNote(note.getId()).getParagraphs().get(0).getText()); notebook.removeNote(note.getId()); } + + @Test + public void should_push_angular_object_to_remote_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md", "spark")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final RemoteAngularObjectRegistry sparkRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterGroup sparkGroup = new InterpreterGroup("sparkGroup"); + sparkGroup.setAngularObjectRegistry(sparkRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + final InterpreterSetting sparkSetting = new InterpreterSetting("id2", "spark", "spark", null); + mdSetting.setInterpreterGroup(mdGroup); + sparkSetting.setInterpreterGroup(sparkGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting, sparkSetting)); + + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + when(note.getParagraph("paragraph2").getRequiredReplName()).thenReturn("spark"); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); + + when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraph1")) + .thenReturn(ao1); + + when(sparkRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraph2")) + .thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao2) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null); + verify(sparkRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null); + + verify(otherConn).send(mdMsg1); + verify(otherConn).send(sparkMsg2); + } + + @Test + public void should_push_angular_object_to_remote_for_paragraph_and_note() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "note"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraph1")) + .thenReturn(ao1); + when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", null)) + .thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao2) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + verify(otherConn).send(mdMsg2); + } + + @Test + public void should_push_angular_object_to_remote_for_note_only() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", null)) + .thenReturn(ao1); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + } + + @Test + public void should_push_angular_object_to_local_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md", "spark")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final AngularObjectRegistry sparkRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterGroup sparkGroup = new InterpreterGroup("sparkGroup"); + sparkGroup.setAngularObjectRegistry(sparkRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + final InterpreterSetting sparkSetting = new InterpreterSetting("id2", "spark", "spark", null); + mdSetting.setInterpreterGroup(mdGroup); + sparkSetting.setInterpreterGroup(sparkGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting, sparkSetting)); + + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + when(note.getParagraph("paragraph2").getRequiredReplName()).thenReturn("spark"); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); + + when(mdRegistry.add(varName, value, "noteId", "paragraph1")).thenReturn(ao1); + when(sparkRegistry.add(varName, value, "noteId", "paragraph2")).thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao2) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + verify(otherConn).send(sparkMsg2); + } + + @Test + public void should_push_angular_object_to_local_for_paragraphs_and_note() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "note"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.add(varName, value, "noteId", "paragraph1")).thenReturn(ao1); + when(mdRegistry.add(varName, value, "noteId", null)).thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao2) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + verify(otherConn).send(mdMsg2); + } + + @Test + public void should_push_angular_object_to_local_for_note_only() throws Exception { + //Given + final String varName = "name"; + final String value = "DuyHai DOAN"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_UPDATE) + .put("noteId", "noteId") + .put("name", varName) + .put("value", value) + .put("interpreters", asList("md")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.add(varName, value, "noteId", null)).thenReturn(ao1); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientUpdate(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + } + + @Test + public void should_delete_angular_object_from_remote_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_REMOVE) + .put("noteId", "noteId") + .put("name", varName) + .put("interpreters", asList("md", "spark")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final RemoteAngularObjectRegistry sparkRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterGroup sparkGroup = new InterpreterGroup("sparkGroup"); + sparkGroup.setAngularObjectRegistry(sparkRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + final InterpreterSetting sparkSetting = new InterpreterSetting("id2", "spark", "spark", null); + mdSetting.setInterpreterGroup(mdGroup); + sparkSetting.setInterpreterGroup(sparkGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting, sparkSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); + + when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph1")) + .thenReturn(ao1); + when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph2")) + .thenReturn(ao2); + + when(sparkRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph1")) + .thenReturn(ao1); + when(sparkRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph2")) + .thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao2) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + final String sparkMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao2) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientDelete(conn, notebook, messageReceived); + + // Then + verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); + verify(sparkRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); + + verify(otherConn).send(mdMsg1); + verify(otherConn).send(mdMsg2); + verify(otherConn).send(sparkMsg1); + verify(otherConn).send(sparkMsg2); + } + + @Test + public void should_delete_angular_object_from_remote_for_notes() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_REMOVE) + .put("noteId", "noteId") + .put("name", varName) + .put("interpreters", asList("md")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", null)) + .thenReturn(ao1); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientDelete(conn, notebook, messageReceived); + + // Then + verify(mdRegistry).removeAndNotifyRemoteProcess(varName, "noteId", null); + + verify(otherConn).send(mdMsg1); + } + + @Test + public void should_delete_angular_object_from_local_for_paragraphs() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_REMOVE) + .put("noteId", "noteId") + .put("name", varName) + .put("interpreters", asList("md", "spark")) + .put("paragraphs", asList("paragraph1", "paragraph2")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final AngularObjectRegistry sparkRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterGroup sparkGroup = new InterpreterGroup("sparkGroup"); + sparkGroup.setAngularObjectRegistry(sparkRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + final InterpreterSetting sparkSetting = new InterpreterSetting("id2", "spark", "spark", null); + mdSetting.setInterpreterGroup(mdGroup); + sparkSetting.setInterpreterGroup(sparkGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting, sparkSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); + final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); + + when(mdRegistry.remove(varName, "noteId", "paragraph1")).thenReturn(ao1); + when(mdRegistry.remove(varName, "noteId", "paragraph2")).thenReturn(ao2); + + when(sparkRegistry.remove(varName, "noteId", "paragraph1")).thenReturn(ao1); + when(sparkRegistry.remove(varName, "noteId", "paragraph2")).thenReturn(ao2); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao2) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + final String sparkMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph1")); + + final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao2) + .put("interpreterGroupId", "sparkGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraph2")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientDelete(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + verify(otherConn).send(mdMsg2); + verify(otherConn).send(sparkMsg1); + verify(otherConn).send(sparkMsg2); + } + + @Test + public void should_delete_angular_object_from_local_for_notes() throws Exception { + //Given + final String varName = "name"; + final String value = "val"; + final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_REMOVE) + .put("noteId", "noteId") + .put("name", varName) + .put("interpreters", asList("md")) + .put("scope", "paragraph"); + + final NotebookServer server = new NotebookServer(); + final Notebook notebook = mock(Notebook.class); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + + final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); + final InterpreterSetting mdSetting = new InterpreterSetting("id1", "md", "md", null); + mdSetting.setInterpreterGroup(mdGroup); + + when(note.getNoteReplLoader().getInterpreterSettings()) + .thenReturn(asList(mdSetting)); + + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", null); + + when(mdRegistry.remove(varName, "noteId", null)).thenReturn(ao1); + + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); + + final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId")); + + server.noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + server.angularObjectClientDelete(conn, notebook, messageReceived); + + // Then + verify(otherConn).send(mdMsg1); + } private NotebookSocket createWebSocket() { NotebookSocket sock = mock(NotebookSocket.class); diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 44eb9aac5c6..da14882476d 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -40,6 +40,33 @@ angular.module('zeppelinWebApp') paragraphs.forEach(paragraphScope.runParagraph); }; + paragraphScope.pushToServer = function(varName, value, pushParams) { + var defaultParams = {interpreters: [], paragraphs: [], scope: 'paragraph', + runParagraphs: true}; + var params = jQuery.extend(defaultParams, angular.copy(pushParams)); + + if (params.interpreter) { + params.interpreters.push(params.interpreter); + delete params.interpreter; + } + + if (params.paragraph) { + params.paragraphs.push(params.paragraph); + delete params.paragraph; + } + + // Only push to server if there is at least 1 interpreter + if (params.interpreters.length > 0) { + websocketMsgSrv.clientUpdateAngularObject($routeParams.noteId, varName, value, params); + + // Only run target paragraphs if there are at least 1 interpreter updated + // and runParagraphs set to true + if (params.paragraphs.length > 0 && params.runParagraphs) { + params.paragraphs.forEach(paragraphScope.runParagraph); + } + } + }; + var angularObjectRegistry = {}; var editorModes = { diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index df440100942..ebb49476a52 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -70,6 +70,20 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, + clientUpdateAngularObject: function(noteId, name, value, params) { + websocketEvents.sendNewEvent({ + op: 'ANGULAR_OBJECT_CLIENT_UPDATE', + data: { + noteId: noteId, + name: name, + value: value, + interpreters: params.interpreters, + paragraphs: params.paragraphs, + scope: params.scope + } + }); + }, + cancelParagraphRun: function(paragraphId) { websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}); }, From 6c516d11e57b647a239383e9014b53aff9259452 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 26 Jan 2016 16:51:11 +0100 Subject: [PATCH 05/10] ZeppelinContext angular() method should look for variable using the paragraph scope, then note scope and then global scope, in this order --- .../java/org/apache/zeppelin/spark/ZeppelinContext.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 926f3e7546b..19fc1a1bcad 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -475,7 +475,11 @@ private AngularObject getAngularObject(String name, InterpreterContext interpret AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); String noteId = interpreterContext.getNoteId(); // try get local object - AngularObject ao = registry.get(name, interpreterContext.getNoteId(), null); + AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId()); + AngularObject noteAo = registry.get(name, noteId, null); + + AngularObject ao = paragraphAo != null ? paragraphAo : noteAo; + if (ao == null) { // then global object ao = registry.get(name, null, null); From bac769571adbfde246f4150d34fbdb1d11f96501 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 26 Jan 2016 20:29:20 +0100 Subject: [PATCH 06/10] Implement removeFromServer() Angular function --- .../org/apache/zeppelin/socket/Message.java | 1 + .../zeppelin/socket/NotebookServer.java | 7 +++- .../zeppelin/socket/NotebookServerTest.java | 42 +++---------------- .../paragraph/paragraph.controller.js | 20 +++++++++ .../websocketEvents/websocketMsg.service.js | 12 ++++++ 5 files changed, 45 insertions(+), 37 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index b18019a6d7d..e5c3f195463 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -102,6 +102,7 @@ public static enum OP { ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated, ANGULAR_OBJECT_CLIENT_UPDATE, // [c-s] angular object pushed from client + ANGULAR_OBJECT_CLIENT_REMOVE, // [c-s] angular object removed from client LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations CONFIGURATIONS_INFO // [s-c] all key/value pairs of configurations diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 5bf50ede9bc..70f19d6f7b9 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -179,6 +179,9 @@ public void onMessage(NotebookSocket conn, String msg) { case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; + case ANGULAR_OBJECT_CLIENT_REMOVE: + angularObjectClientDelete(conn, notebook, messagereceived); + break; case ANGULAR_OBJECT_CLIENT_UPDATE: angularObjectClientUpdate(conn, notebook, messagereceived); break; @@ -804,7 +807,9 @@ protected void angularObjectClientDelete(NotebookSocket conn, Notebook notebook, } } - private List findMatchingParagraphs(List targetParagraphs, Note note, String interpreterName) { + private List findMatchingParagraphs(List targetParagraphs, Note note, + String interpreterName) { + final List matchedParagraph = new ArrayList<>(); for (String paragraph : targetParagraphs) { final String replName = note.getParagraph(paragraph).getRequiredReplName(); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index c30a1c7e06f..d2730f7ca3a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -543,16 +543,14 @@ public void should_delete_angular_object_from_remote_for_paragraphs() throws Exc when(note.getNoteReplLoader().getInterpreterSettings()) .thenReturn(asList(mdSetting, sparkSetting)); + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + when(note.getParagraph("paragraph2").getRequiredReplName()).thenReturn("spark"); + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph1")) .thenReturn(ao1); - when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph2")) - .thenReturn(ao2); - - when(sparkRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph1")) - .thenReturn(ao1); when(sparkRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraph2")) .thenReturn(ao2); @@ -565,18 +563,6 @@ public void should_delete_angular_object_from_remote_for_paragraphs() throws Exc .put("noteId", "noteId") .put("paragraphId", "paragraph1")); - final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao2) - .put("interpreterGroupId", "mdGroup") - .put("noteId", "noteId") - .put("paragraphId", "paragraph2")); - - final String sparkMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao1) - .put("interpreterGroupId", "sparkGroup") - .put("noteId", "noteId") - .put("paragraphId", "paragraph1")); - final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) .put("angularObject", ao2) .put("interpreterGroupId", "sparkGroup") @@ -593,8 +579,6 @@ public void should_delete_angular_object_from_remote_for_paragraphs() throws Exc verify(sparkRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); verify(otherConn).send(mdMsg1); - verify(otherConn).send(mdMsg2); - verify(otherConn).send(sparkMsg1); verify(otherConn).send(sparkMsg2); } @@ -678,13 +662,13 @@ public void should_delete_angular_object_from_local_for_paragraphs() throws Exce when(note.getNoteReplLoader().getInterpreterSettings()) .thenReturn(asList(mdSetting, sparkSetting)); + when(note.getParagraph("paragraph1").getRequiredReplName()).thenReturn("md"); + when(note.getParagraph("paragraph2").getRequiredReplName()).thenReturn("spark"); + final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph1"); final AngularObject ao2 = AngularObjectBuilder.build(varName, value, "noteId", "paragraph2"); when(mdRegistry.remove(varName, "noteId", "paragraph1")).thenReturn(ao1); - when(mdRegistry.remove(varName, "noteId", "paragraph2")).thenReturn(ao2); - - when(sparkRegistry.remove(varName, "noteId", "paragraph1")).thenReturn(ao1); when(sparkRegistry.remove(varName, "noteId", "paragraph2")).thenReturn(ao2); NotebookSocket conn = mock(NotebookSocket.class); @@ -696,18 +680,6 @@ public void should_delete_angular_object_from_local_for_paragraphs() throws Exce .put("noteId", "noteId") .put("paragraphId", "paragraph1")); - final String mdMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao2) - .put("interpreterGroupId", "mdGroup") - .put("noteId", "noteId") - .put("paragraphId", "paragraph2")); - - final String sparkMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao1) - .put("interpreterGroupId", "sparkGroup") - .put("noteId", "noteId") - .put("paragraphId", "paragraph1")); - final String sparkMsg2 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) .put("angularObject", ao2) .put("interpreterGroupId", "sparkGroup") @@ -721,8 +693,6 @@ public void should_delete_angular_object_from_local_for_paragraphs() throws Exce // Then verify(otherConn).send(mdMsg1); - verify(otherConn).send(mdMsg2); - verify(otherConn).send(sparkMsg1); verify(otherConn).send(sparkMsg2); } diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index da14882476d..8b1cf6c6687 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -67,6 +67,26 @@ angular.module('zeppelinWebApp') } }; + paragraphScope.removeFromServer = function(varName, removeParams) { + var defaultParams = {interpreters: [], paragraphs: [], scope: 'paragraph'}; + var params = jQuery.extend(defaultParams, angular.copy(removeParams)); + + if (params.interpreter) { + params.interpreters.push(params.interpreter); + delete params.interpreter; + } + + if (params.paragraph) { + params.paragraphs.push(params.paragraph); + delete params.paragraph; + } + + // Only push to server if there is at least 1 interpreter + if (params.interpreters.length > 0) { + websocketMsgSrv.clientRemoveAngularObject($routeParams.noteId, varName, params); + } + }; + var angularObjectRegistry = {}; var editorModes = { diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index ebb49476a52..b88b510f18e 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -84,6 +84,18 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, + clientRemoveAngularObject: function(noteId, name, params) { + websocketEvents.sendNewEvent({ + op: 'ANGULAR_OBJECT_CLIENT_REMOVE', + data: { + noteId: noteId, + name: name, + interpreters: params.interpreters, + paragraphs: params.paragraphs + } + }); + }, + cancelParagraphRun: function(paragraphId) { websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}); }, From 75f9cddfe9949ce90124753879855321f0655697 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Sun, 24 Jan 2016 21:31:53 +0100 Subject: [PATCH 07/10] Add Hide/Show paragraph id feature --- .../notebook/paragraph/paragraph-control.html | 6 ++++++ .../notebook/paragraph/paragraph.controller.js | 18 +++++++++++++++++- .../src/app/notebook/paragraph/paragraph.html | 14 +++++++++++--- .../src/assets/styles/looknfeel/default.css | 7 +++++++ .../src/assets/styles/looknfeel/report.css | 1 + .../src/assets/styles/looknfeel/simple.css | 4 ++++ 6 files changed, 46 insertions(+), 4 deletions(-) diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html index 2ab26b74d59..376d0081fef 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html @@ -71,6 +71,12 @@ Show line numbers +
  • + Hide paragraph id + Show paragraph id +
  • {{paragraph.config.enabled ? "Disable" : "Enable"}} run diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 8b1cf6c6687..f1ac27aa284 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -620,7 +620,23 @@ angular.module('zeppelinWebApp') commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams); }; - $scope.columnWidthClass = function(n) { + $scope.showParagraphId = function() { + var newParams = angular.copy($scope.paragraph.settings.params); + var newConfig = angular.copy($scope.paragraph.config); + newConfig.paragraphId = true; + + commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams); + }; + + $scope.hideParagraphId = function() { + var newParams = angular.copy($scope.paragraph.settings.params); + var newConfig = angular.copy($scope.paragraph.config); + newConfig.paragraphId = false; + + commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams); + }; + + $scope.columnWidthClass = function(n) { if ($scope.asIframe) { return 'col-md-12'; } else { diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.html b/zeppelin-web/src/app/notebook/paragraph/paragraph.html index 043b6e81739..2ca86a742dc 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.html @@ -15,6 +15,12 @@
    +
    +
    + Paragraph ID: {{paragraph.id}} +
    +
    +
    @@ -64,9 +70,11 @@
    -
    +
    +
    +
    diff --git a/zeppelin-web/src/assets/styles/looknfeel/default.css b/zeppelin-web/src/assets/styles/looknfeel/default.css index 78a3bb339a8..bc6da00a429 100644 --- a/zeppelin-web/src/assets/styles/looknfeel/default.css +++ b/zeppelin-web/src/assets/styles/looknfeel/default.css @@ -28,6 +28,13 @@ body { display: block; } +.paragraphId { + display: block; + font-size: 10px; + text-align: left; + margin-left: 5px; +} + .paragraph-col .focused { box-shadow: 0px 2px 7px rgba(0, 0, 0, 0.3); border-color: white; diff --git a/zeppelin-web/src/assets/styles/looknfeel/report.css b/zeppelin-web/src/assets/styles/looknfeel/report.css index 0a15e8a59a6..580e42e89d1 100644 --- a/zeppelin-web/src/assets/styles/looknfeel/report.css +++ b/zeppelin-web/src/assets/styles/looknfeel/report.css @@ -65,6 +65,7 @@ body { } .executionTime, +.paragraphId, .nv-controlsWrap, .lastEmptyParagraph { display: none; diff --git a/zeppelin-web/src/assets/styles/looknfeel/simple.css b/zeppelin-web/src/assets/styles/looknfeel/simple.css index 7817da28ec8..4eba33570ef 100644 --- a/zeppelin-web/src/assets/styles/looknfeel/simple.css +++ b/zeppelin-web/src/assets/styles/looknfeel/simple.css @@ -67,6 +67,10 @@ body { margin-right: 5px; } +.paragraphId { + display: none; +} + .paragraph:hover .paragraphFooter { visibility: visible; } From e151b8271752afdbe469b11878c58aef15a81d6e Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Mon, 25 Jan 2016 21:13:53 +0100 Subject: [PATCH 08/10] Replace dynamic form with angular object from registry if exists --- zeppelin-zengine/pom.xml | 7 +++ .../apache/zeppelin/notebook/Paragraph.java | 29 ++++++++++ .../display/AngularObjectBuilder.java | 10 ++++ .../zeppelin/notebook/ParagraphTest.java | 54 +++++++++++++++++++ 4 files changed, 100 insertions(+) create mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 0ef60bf3a69..d13750c0764 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -194,6 +194,13 @@ test + + org.assertj + assertj-core + 2.0.0 + test + + com.google.truth truth diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 65210f52bdb..881765793c2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook; +import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input; @@ -202,6 +203,12 @@ protected Object jobRun() throws Throwable { String scriptBody = getScriptBody(); Map inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built // from script body + + final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() + .getAngularObjectRegistry(); + + scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); + settings.setForms(inputs); script = Input.getSimpleQuery(settings.getParams(), scriptBody); } @@ -346,4 +353,26 @@ public Object clone() throws CloneNotSupportedException { Paragraph paraClone = (Paragraph) this.clone(); return paraClone; } + + + String extractVariablesFromAngularRegistry(String scriptBody, Map inputs, + AngularObjectRegistry angularRegistry) { + + final String noteId = this.getNote().getId(); + final String paragraphId = this.getId(); + + final Set keys = new HashSet<>(inputs.keySet()); + + for (String varName : keys) { + final AngularObject paragraphScoped = angularRegistry.get(varName, noteId, paragraphId); + final AngularObject noteScoped = angularRegistry.get(varName, noteId, null); + final AngularObject angularObject = paragraphScoped != null ? paragraphScoped : noteScoped; + if (angularObject != null) { + inputs.remove(varName); + final String pattern = "[$][{]\\s*" + varName + "\\s*(?:=[^}]+)?[}]"; + scriptBody = scriptBody.replaceAll(pattern, angularObject.get().toString()); + } + } + return scriptBody; + } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java new file mode 100644 index 00000000000..f4d1120bc46 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java @@ -0,0 +1,10 @@ +package org.apache.zeppelin.display; + +public class AngularObjectBuilder { + + public static AngularObject build(String varName, T value, String noteId, + String paragraphId) { + + return new AngularObject<>(varName, value, noteId, paragraphId, null); + } +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java index 87805cef133..b918f72e20a 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java @@ -17,9 +17,24 @@ package org.apache.zeppelin.notebook; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectBuilder; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.Input; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; public class ParagraphTest { @Test @@ -30,9 +45,48 @@ public void scriptBodyWithReplName() { text = "%table 1234567"; assertEquals("1234567", Paragraph.getScriptBody(text)); } + @Test public void scriptBodyWithoutReplName() { String text = "12345678"; assertEquals(text, Paragraph.getScriptBody(text)); } + + @Test + public void should_extract_variable_from_angular_object_registry() throws Exception { + //Given + final String noteId = "noteId"; + + final AngularObjectRegistry registry = mock(AngularObjectRegistry.class); + final Note note = mock(Note.class); + final Map inputs = new HashMap<>(); + inputs.put("name", null); + inputs.put("age", null); + inputs.put("job", null); + + final String scriptBody = "My name is ${name} and I am ${age=20} years old. " + + "My occupation is ${ job = engineer | developer | artists}"; + + final Paragraph paragraph = new Paragraph(note, null, null); + final String paragraphId = paragraph.getId(); + + final AngularObject nameAO = AngularObjectBuilder.build("name", "DuyHai DOAN", noteId, + paragraphId); + + final AngularObject ageAO = AngularObjectBuilder.build("age", 34, noteId, null); + + when(note.getId()).thenReturn(noteId); + when(registry.get("name", noteId, paragraphId)).thenReturn(nameAO); + when(registry.get("age", noteId, null)).thenReturn(ageAO); + + final String expected = "My name is DuyHai DOAN and I am 34 years old. " + + "My occupation is ${ job = engineer | developer | artists}"; + //When + final String actual = paragraph.extractVariablesFromAngularRegistry(scriptBody, inputs, registry); + + //Then + verify(registry).get("name", noteId, paragraphId); + verify(registry).get("age", noteId, null); + assertThat(actual).isEqualTo(expected); + } } From 0114836d22d6fedf7a61d72b167cb7bfb547c3f8 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 27 Jan 2016 13:38:05 +0100 Subject: [PATCH 09/10] Fix NPE in RemoteInterpreter.pushAngularRegistry() --- .../interpreter/remote/RemoteInterpreter.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index b3ebf06db09..24fff1470c2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -24,6 +24,7 @@ import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -371,13 +372,15 @@ private InterpreterResult convert(RemoteInterpreterResult result) { * @throws TException */ void pushAngularObjectRegistryToRemote(Client client) throws TException { - final Map> registry = this.getInterpreterGroup() - .getAngularObjectRegistry().getRegistry(); + final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() + .getAngularObjectRegistry(); + + if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { + final Map> registry = angularObjectRegistry + .getRegistry(); - if (registry != null) { logger.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", - this.getInterpreterGroup().getId()); + " remote interpreter group {}", this.getInterpreterGroup().getId()); final java.lang.reflect.Type registryType = new TypeToken>>() {}.getType(); From 80e1c273b610be7e8781c3f96d860d5517583899 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 27 Jan 2016 13:38:25 +0100 Subject: [PATCH 10/10] Add licence header on test classes --- pom.xml | 1 - .../display/AngularObjectBuilder.java | 20 +++++++++++++++++++ .../display/AngularObjectBuilder.java | 20 +++++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 590791841b6..67963a575be 100755 --- a/pom.xml +++ b/pom.xml @@ -728,5 +728,4 @@ - diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java index f4d1120bc46..f64a9d8c286 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java @@ -1,5 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.display; +/** + * Utility class to build an angular object for tests + */ public class AngularObjectBuilder { public static AngularObject build(String varName, T value, String noteId, diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java index f4d1120bc46..f64a9d8c286 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java @@ -1,5 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.display; +/** + * Utility class to build an angular object for tests + */ public class AngularObjectBuilder { public static AngularObject build(String varName, T value, String noteId,