Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -123,8 +122,6 @@ public class InterpreterSettingManager {
private RecoveryStorage recoveryStorage;
private ConfigStorage configStorage;



public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener
Expand Down Expand Up @@ -213,7 +210,7 @@ private void loadFromFile() throws IOException {
for (String oldId : oldSettingIdList) {
if (infoSaving.interpreterSettings.containsKey(oldId)) {
newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName());
};
}
}
newBindingMap.put(noteId, newSettingIdList);
}
Expand Down Expand Up @@ -298,13 +295,11 @@ private void loadFromFile() throws IOException {
}

public void saveToFile() throws IOException {
synchronized (interpreterSettings) {
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = interpreterSettings;
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = Maps.newHashMap(interpreterSettings);
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}

private void init() throws IOException {
Expand Down Expand Up @@ -439,7 +434,6 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) {

public List<InterpreterSetting> getInterpreterSettings(String noteId) {
List<InterpreterSetting> settings = new ArrayList<>();
synchronized (interpreterSettings) {
List<String> interpreterSettingIds = interpreterBindings.get(noteId);
if (interpreterSettingIds != null) {
for (String settingId : interpreterSettingIds) {
Expand All @@ -451,19 +445,19 @@ public List<InterpreterSetting> getInterpreterSettings(String noteId) {
}
}
}
}
return settings;
}

public InterpreterSetting getInterpreterSettingByName(String name) {
synchronized (interpreterSettings) {
try {
for (InterpreterSetting setting : interpreterSettings.values()) {
if (setting.getName().equals(name)) {
return setting;
}
}
throw new RuntimeException("No such interpreter setting: " + name);
} finally {
}
throw new RuntimeException("No such interpreter setting: " + name);
}

public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
Expand Down Expand Up @@ -617,12 +611,11 @@ public void removeResourcesBelongsToNote(String noteId) {
}

/**
* Overwrite dependency jar under local-repo/{interpreterId}
* if jar file in original path is changed
* Overwrite dependency jar under local-repo/{interpreterId} if jar file in original path is
* changed
*/
private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
synchronized (interpreterSettings) {
final Thread t = new Thread() {
public void run() {
try {
Expand Down Expand Up @@ -653,7 +646,6 @@ public void run() {
}
};
t.start();
}
}

/**
Expand Down Expand Up @@ -713,26 +705,24 @@ public void setInterpreterBinding(String user, String noteId, List<String> setti
throws IOException {
List<String> unBindedSettingIdList = new LinkedList<>();

synchronized (interpreterSettings) {
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();

for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
}
}
Expand Down Expand Up @@ -794,49 +784,45 @@ public void removeNoteInterpreterSettingBinding(String user, String noteId) thro
interpreterBindings.remove(noteId);
}

/**
* Change interpreter properties and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
/** Change interpreter properties and restart */
public void setPropertyAndRestart(
String id,
InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
}
}

// restart in note page
public void restart(String settingId, String noteId, String user) throws InterpreterException {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
synchronized (interpreterSettings) {
intpSetting = interpreterSettings.get(settingId);
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
//clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zjffdu It was the original problem.

} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
intpSetting = interpreterSettings.get(settingId);
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
// clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
}

Expand All @@ -845,9 +831,7 @@ public void restart(String id) throws InterpreterException {
}

public InterpreterSetting get(String id) {
synchronized (interpreterSettings) {
return interpreterSettings.get(id);
}
return interpreterSettings.get(id);
}

@VisibleForTesting
Expand All @@ -866,23 +850,20 @@ public void remove(String id) throws IOException {
// 3. remove this interpreter setting from note binding
// 4. clean local repo directory
LOGGER.info("Remove interpreter setting: " + id);
synchronized (interpreterSettings) {
if (interpreterSettings.containsKey(id)) {

InterpreterSetting intp = interpreterSettings.get(id);
intp.close();
interpreterSettings.remove(id);
for (List<String> settings : interpreterBindings.values()) {
Iterator<String> it = settings.iterator();
while (it.hasNext()) {
String settingId = it.next();
if (settingId.equals(id)) {
it.remove();
}
if (interpreterSettings.containsKey(id)) {
InterpreterSetting intp = interpreterSettings.get(id);
intp.close();
interpreterSettings.remove(id);
for (List<String> settings : interpreterBindings.values()) {
Iterator<String> it = settings.iterator();
while (it.hasNext()) {
String settingId = it.next();
if (settingId.equals(id)) {
it.remove();
}
}
saveToFile();
}
saveToFile();
}

File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id);
Expand All @@ -893,36 +874,34 @@ public void remove(String id) throws IOException {
* Get interpreter settings
*/
public List<InterpreterSetting> get() {
synchronized (interpreterSettings) {
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
});
return orderedSettings;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
}
});
return orderedSettings;
}

@VisibleForTesting
Expand All @@ -940,17 +919,15 @@ public void close(String settingId) {

public void close() {
List<Thread> closeThreads = new LinkedList<>();
synchronized (interpreterSettings) {
Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
for (final InterpreterSetting intpSetting : intpSettings) {
Thread t = new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}
for (final InterpreterSetting intpSetting : interpreterSettings.values()) {
Thread t =
new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}

for (Thread t : closeThreads) {
Expand All @@ -961,5 +938,4 @@ public void run() {
}
}
}

}