From 24be69298c8a3ca514b478a0ca95e9ac0f151178 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Tue, 17 Apr 2018 18:38:56 +0900 Subject: [PATCH 1/3] Removed all synchronized blocks and replace them to read/write lock --- .../InterpreterSettingManager.java | 234 ++++++++++-------- 1 file changed, 124 insertions(+), 110 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index a5184c2c3ae..8a30e7f8dd9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -24,6 +24,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; @@ -103,6 +104,7 @@ public class InterpreterSettingManager { */ private final Map interpreterSettings = Maps.newConcurrentMap(); + private final ReentrantReadWriteLock interpreterSettingsLock = new ReentrantReadWriteLock(); /** * noteId --> list of InterpreterSettingId @@ -199,7 +201,9 @@ private void loadFromFile() throws IOException { for (InterpreterSetting interpreterSettingTemplate : interpreterSettingTemplates.values()) { InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate); initInterpreterSetting(interpreterSetting); + interpreterSettingsLock.writeLock().lock(); interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); + interpreterSettingsLock.writeLock().unlock(); } return; } @@ -211,9 +215,11 @@ private void loadFromFile() throws IOException { List oldSettingIdList = entry.getValue(); List newSettingIdList = new ArrayList<>(); for (String oldId : oldSettingIdList) { + interpreterSettingsLock.readLock().lock(); if (infoSaving.interpreterSettings.containsKey(oldId)) { newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName()); - }; + } + interpreterSettingsLock.readLock().unlock(); } newBindingMap.put(noteId, newSettingIdList); } @@ -271,15 +277,19 @@ private void loadFromFile() throws IOException { // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates // remove it first + interpreterSettingsLock.writeLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { if (setting.getName().equals(savedInterpreterSetting.getName())) { interpreterSettings.remove(setting.getId()); } } + interpreterSettingsLock.writeLock().unlock(); savedInterpreterSetting.postProcessing(); LOGGER.info("Create Interpreter Setting {} from interpreter.json", savedInterpreterSetting.getName()); + interpreterSettingsLock.writeLock().lock(); interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting); + interpreterSettingsLock.writeLock().unlock(); } if (infoSaving.interpreterRepositories != null) { @@ -291,20 +301,20 @@ private void loadFromFile() throws IOException { // force interpreter dependencies loading once the // repositories have been loaded. + interpreterSettingsLock.readLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { setting.setDependencies(setting.getDependencies()); } + interpreterSettingsLock.readLock().unlock(); } } public void saveToFile() throws IOException { - synchronized (interpreterSettings) { - InterpreterInfoSaving info = new InterpreterInfoSaving(); - info.interpreterBindings = interpreterBindings; - info.interpreterSettings = interpreterSettings; - info.interpreterRepositories = interpreterRepositories; - configStorage.save(info); - } + InterpreterInfoSaving info = new InterpreterInfoSaving(); + info.interpreterBindings = interpreterBindings; + info.interpreterSettings = Maps.newHashMap(interpreterSettings); + info.interpreterRepositories = interpreterRepositories; + configStorage.save(info); } private void init() throws IOException { @@ -439,7 +449,7 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) { public List getInterpreterSettings(String noteId) { List settings = new ArrayList<>(); - synchronized (interpreterSettings) { + interpreterSettingsLock.readLock().lock(); List interpreterSettingIds = interpreterBindings.get(noteId); if (interpreterSettingIds != null) { for (String settingId : interpreterSettingIds) { @@ -451,19 +461,22 @@ public List getInterpreterSettings(String noteId) { } } } - } + interpreterSettingsLock.readLock().unlock(); return settings; } public InterpreterSetting getInterpreterSettingByName(String name) { - synchronized (interpreterSettings) { + try { + interpreterSettingsLock.readLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { if (setting.getName().equals(name)) { return setting; } } + throw new RuntimeException("No such interpreter setting: " + name); + } finally { + interpreterSettingsLock.readLock().unlock(); } - throw new RuntimeException("No such interpreter setting: " + name); } public ManagedInterpreterGroup getInterpreterGroupById(String groupId) { @@ -617,12 +630,11 @@ public void removeResourcesBelongsToNote(String noteId) { } /** - * Overwrite dependency jar under local-repo/{interpreterId} - * if jar file in original path is changed + * Overwrite dependency jar under local-repo/{interpreterId} if jar file in original path is + * changed */ private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - synchronized (interpreterSettings) { final Thread t = new Thread() { public void run() { try { @@ -653,7 +665,6 @@ public void run() { } }; t.start(); - } } /** @@ -713,28 +724,28 @@ public void setInterpreterBinding(String user, String noteId, List setti throws IOException { List unBindedSettingIdList = new LinkedList<>(); - synchronized (interpreterSettings) { - List oldSettingIdList = interpreterBindings.get(noteId); - if (oldSettingIdList != null) { - for (String oldSettingId : oldSettingIdList) { - if (!settingIdList.contains(oldSettingId)) { - unBindedSettingIdList.add(oldSettingId); - } + interpreterSettingsLock.readLock().lock(); + List oldSettingIdList = interpreterBindings.get(noteId); + if (oldSettingIdList != null) { + for (String oldSettingId : oldSettingIdList) { + if (!settingIdList.contains(oldSettingId)) { + unBindedSettingIdList.add(oldSettingId); } } - interpreterBindings.put(noteId, settingIdList); - saveToFile(); - - for (String settingId : unBindedSettingIdList) { - InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); - //TODO(zjffdu) Add test for this scenario - //only close Interpreters when it is note scoped - if (interpreterSetting.getOption().perNoteIsolated() || - interpreterSetting.getOption().perNoteScoped()) { - interpreterSetting.closeInterpreters(user, noteId); - } + } + interpreterBindings.put(noteId, settingIdList); + saveToFile(); + + for (String settingId : unBindedSettingIdList) { + InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); + //TODO(zjffdu) Add test for this scenario + //only close Interpreters when it is note scoped + if (interpreterSetting.getOption().perNoteIsolated() || + interpreterSetting.getOption().perNoteScoped()) { + interpreterSetting.closeInterpreters(user, noteId); } } + interpreterSettingsLock.readLock().unlock(); } public List getInterpreterBinding(String noteId) { @@ -794,30 +805,30 @@ public void removeNoteInterpreterSettingBinding(String user, String noteId) thro interpreterBindings.remove(noteId); } - /** - * Change interpreter properties and restart - */ - public void setPropertyAndRestart(String id, InterpreterOption option, - Map properties, - List dependencies) + /** Change interpreter properties and restart */ + public void setPropertyAndRestart( + String id, + InterpreterOption option, + Map properties, + List dependencies) throws InterpreterException, IOException { - synchronized (interpreterSettings) { - InterpreterSetting intpSetting = interpreterSettings.get(id); - if (intpSetting != null) { - try { - intpSetting.close(); - intpSetting.setOption(option); - intpSetting.setProperties(properties); - intpSetting.setDependencies(dependencies); - intpSetting.postProcessing(); - saveToFile(); - } catch (Exception e) { - loadFromFile(); - throw new IOException(e); - } - } else { - throw new InterpreterException("Interpreter setting id " + id + " not found"); + interpreterSettingsLock.readLock().lock(); + InterpreterSetting intpSetting = interpreterSettings.get(id); + interpreterSettingsLock.readLock().unlock(); + if (intpSetting != null) { + try { + intpSetting.close(); + intpSetting.setOption(option); + intpSetting.setProperties(properties); + intpSetting.setDependencies(dependencies); + intpSetting.postProcessing(); + saveToFile(); + } catch (Exception e) { + loadFromFile(); + throw new IOException(e); } + } else { + throw new InterpreterException("Interpreter setting id " + id + " not found"); } } @@ -825,18 +836,18 @@ public void setPropertyAndRestart(String id, InterpreterOption option, public void restart(String settingId, String noteId, String user) throws InterpreterException { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); - synchronized (interpreterSettings) { - intpSetting = interpreterSettings.get(settingId); - // Check if dependency in specified path is changed - // If it did, overwrite old dependency jar with new one - if (intpSetting != null) { - //clean up metaInfos - intpSetting.setInfos(null); - copyDependenciesFromLocalPath(intpSetting); - intpSetting.closeInterpreters(user, noteId); - } else { - throw new InterpreterException("Interpreter setting id " + settingId + " not found"); - } + interpreterSettingsLock.readLock().lock(); + intpSetting = interpreterSettings.get(settingId); + interpreterSettingsLock.readLock().unlock(); + // Check if dependency in specified path is changed + // If it did, overwrite old dependency jar with new one + if (intpSetting != null) { + // clean up metaInfos + intpSetting.setInfos(null); + copyDependenciesFromLocalPath(intpSetting); + intpSetting.closeInterpreters(user, noteId); + } else { + throw new InterpreterException("Interpreter setting id " + settingId + " not found"); } } @@ -845,8 +856,11 @@ public void restart(String id) throws InterpreterException { } public InterpreterSetting get(String id) { - synchronized (interpreterSettings) { + try { + interpreterSettingsLock.readLock().lock(); return interpreterSettings.get(id); + } finally { + interpreterSettingsLock.readLock().unlock(); } } @@ -893,36 +907,36 @@ public void remove(String id) throws IOException { * Get interpreter settings */ public List get() { - synchronized (interpreterSettings) { - List orderedSettings = new ArrayList<>(interpreterSettings.values()); - Collections.sort(orderedSettings, new Comparator() { - @Override - public int compare(InterpreterSetting o1, InterpreterSetting o2) { - int i = interpreterGroupOrderList.indexOf(o1.getGroup()); - int j = interpreterGroupOrderList.indexOf(o2.getGroup()); - if (i < 0) { - LOGGER.warn("InterpreterGroup " + o1.getGroup() - + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); - // move the unknown interpreter to last - i = Integer.MAX_VALUE; - } - if (j < 0) { - LOGGER.warn("InterpreterGroup " + o2.getGroup() - + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); - // move the unknown interpreter to last - j = Integer.MAX_VALUE; - } - if (i < j) { - return -1; - } else if (i > j) { - return 1; - } else { - return 0; - } + interpreterSettingsLock.readLock().lock(); + List orderedSettings = new ArrayList<>(interpreterSettings.values()); + interpreterSettingsLock.readLock().unlock(); + Collections.sort(orderedSettings, new Comparator() { + @Override + public int compare(InterpreterSetting o1, InterpreterSetting o2) { + int i = interpreterGroupOrderList.indexOf(o1.getGroup()); + int j = interpreterGroupOrderList.indexOf(o2.getGroup()); + if (i < 0) { + LOGGER.warn("InterpreterGroup " + o1.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + i = Integer.MAX_VALUE; } - }); - return orderedSettings; - } + if (j < 0) { + LOGGER.warn("InterpreterGroup " + o2.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + j = Integer.MAX_VALUE; + } + if (i < j) { + return -1; + } else if (i > j) { + return 1; + } else { + return 0; + } + } + }); + return orderedSettings; } @VisibleForTesting @@ -940,17 +954,18 @@ public void close(String settingId) { public void close() { List closeThreads = new LinkedList<>(); - synchronized (interpreterSettings) { - Collection intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { - Thread t = new Thread() { - public void run() { - intpSetting.close(); - } - }; - t.start(); - closeThreads.add(t); - } + interpreterSettingsLock.readLock().lock(); + Collection intpSettings = interpreterSettings.values(); + interpreterSettingsLock.readLock().unlock(); + for (final InterpreterSetting intpSetting : intpSettings) { + Thread t = + new Thread() { + public void run() { + intpSetting.close(); + } + }; + t.start(); + closeThreads.add(t); } for (Thread t : closeThreads) { @@ -961,5 +976,4 @@ public void run() { } } } - } From 4691301af23526d8f8ef63e39f93541bea4848e4 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Mon, 23 Apr 2018 13:14:14 +0900 Subject: [PATCH 2/3] Removed lock/synchronized codes because interpreterSettings already is implemented by currentHashMap --- .../InterpreterSettingManager.java | 59 ++++--------------- 1 file changed, 12 insertions(+), 47 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 8a30e7f8dd9..c4f6e9a0141 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -104,7 +104,6 @@ public class InterpreterSettingManager { */ private final Map interpreterSettings = Maps.newConcurrentMap(); - private final ReentrantReadWriteLock interpreterSettingsLock = new ReentrantReadWriteLock(); /** * noteId --> list of InterpreterSettingId @@ -125,8 +124,6 @@ public class InterpreterSettingManager { private RecoveryStorage recoveryStorage; private ConfigStorage configStorage; - - public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, AngularObjectRegistryListener angularObjectRegistryListener, RemoteInterpreterProcessListener @@ -201,9 +198,7 @@ private void loadFromFile() throws IOException { for (InterpreterSetting interpreterSettingTemplate : interpreterSettingTemplates.values()) { InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate); initInterpreterSetting(interpreterSetting); - interpreterSettingsLock.writeLock().lock(); interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); - interpreterSettingsLock.writeLock().unlock(); } return; } @@ -215,11 +210,9 @@ private void loadFromFile() throws IOException { List oldSettingIdList = entry.getValue(); List newSettingIdList = new ArrayList<>(); for (String oldId : oldSettingIdList) { - interpreterSettingsLock.readLock().lock(); if (infoSaving.interpreterSettings.containsKey(oldId)) { newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName()); } - interpreterSettingsLock.readLock().unlock(); } newBindingMap.put(noteId, newSettingIdList); } @@ -277,19 +270,15 @@ private void loadFromFile() throws IOException { // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates // remove it first - interpreterSettingsLock.writeLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { if (setting.getName().equals(savedInterpreterSetting.getName())) { interpreterSettings.remove(setting.getId()); } } - interpreterSettingsLock.writeLock().unlock(); savedInterpreterSetting.postProcessing(); LOGGER.info("Create Interpreter Setting {} from interpreter.json", savedInterpreterSetting.getName()); - interpreterSettingsLock.writeLock().lock(); interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting); - interpreterSettingsLock.writeLock().unlock(); } if (infoSaving.interpreterRepositories != null) { @@ -301,11 +290,9 @@ private void loadFromFile() throws IOException { // force interpreter dependencies loading once the // repositories have been loaded. - interpreterSettingsLock.readLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { setting.setDependencies(setting.getDependencies()); } - interpreterSettingsLock.readLock().unlock(); } } @@ -449,7 +436,6 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) { public List getInterpreterSettings(String noteId) { List settings = new ArrayList<>(); - interpreterSettingsLock.readLock().lock(); List interpreterSettingIds = interpreterBindings.get(noteId); if (interpreterSettingIds != null) { for (String settingId : interpreterSettingIds) { @@ -461,13 +447,11 @@ public List getInterpreterSettings(String noteId) { } } } - interpreterSettingsLock.readLock().unlock(); return settings; } public InterpreterSetting getInterpreterSettingByName(String name) { try { - interpreterSettingsLock.readLock().lock(); for (InterpreterSetting setting : interpreterSettings.values()) { if (setting.getName().equals(name)) { return setting; @@ -475,7 +459,6 @@ public InterpreterSetting getInterpreterSettingByName(String name) { } throw new RuntimeException("No such interpreter setting: " + name); } finally { - interpreterSettingsLock.readLock().unlock(); } } @@ -724,7 +707,6 @@ public void setInterpreterBinding(String user, String noteId, List setti throws IOException { List unBindedSettingIdList = new LinkedList<>(); - interpreterSettingsLock.readLock().lock(); List oldSettingIdList = interpreterBindings.get(noteId); if (oldSettingIdList != null) { for (String oldSettingId : oldSettingIdList) { @@ -745,7 +727,6 @@ public void setInterpreterBinding(String user, String noteId, List setti interpreterSetting.closeInterpreters(user, noteId); } } - interpreterSettingsLock.readLock().unlock(); } public List getInterpreterBinding(String noteId) { @@ -812,9 +793,7 @@ public void setPropertyAndRestart( Map properties, List dependencies) throws InterpreterException, IOException { - interpreterSettingsLock.readLock().lock(); InterpreterSetting intpSetting = interpreterSettings.get(id); - interpreterSettingsLock.readLock().unlock(); if (intpSetting != null) { try { intpSetting.close(); @@ -836,9 +815,7 @@ public void setPropertyAndRestart( public void restart(String settingId, String noteId, String user) throws InterpreterException { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); - interpreterSettingsLock.readLock().lock(); intpSetting = interpreterSettings.get(settingId); - interpreterSettingsLock.readLock().unlock(); // Check if dependency in specified path is changed // If it did, overwrite old dependency jar with new one if (intpSetting != null) { @@ -856,12 +833,7 @@ public void restart(String id) throws InterpreterException { } public InterpreterSetting get(String id) { - try { - interpreterSettingsLock.readLock().lock(); - return interpreterSettings.get(id); - } finally { - interpreterSettingsLock.readLock().unlock(); - } + return interpreterSettings.get(id); } @VisibleForTesting @@ -880,23 +852,20 @@ public void remove(String id) throws IOException { // 3. remove this interpreter setting from note binding // 4. clean local repo directory LOGGER.info("Remove interpreter setting: " + id); - synchronized (interpreterSettings) { - if (interpreterSettings.containsKey(id)) { - - InterpreterSetting intp = interpreterSettings.get(id); - intp.close(); - interpreterSettings.remove(id); - for (List settings : interpreterBindings.values()) { - Iterator it = settings.iterator(); - while (it.hasNext()) { - String settingId = it.next(); - if (settingId.equals(id)) { - it.remove(); - } + if (interpreterSettings.containsKey(id)) { + InterpreterSetting intp = interpreterSettings.get(id); + intp.close(); + interpreterSettings.remove(id); + for (List settings : interpreterBindings.values()) { + Iterator it = settings.iterator(); + while (it.hasNext()) { + String settingId = it.next(); + if (settingId.equals(id)) { + it.remove(); } } - saveToFile(); } + saveToFile(); } File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id); @@ -907,9 +876,7 @@ public void remove(String id) throws IOException { * Get interpreter settings */ public List get() { - interpreterSettingsLock.readLock().lock(); List orderedSettings = new ArrayList<>(interpreterSettings.values()); - interpreterSettingsLock.readLock().unlock(); Collections.sort(orderedSettings, new Comparator() { @Override public int compare(InterpreterSetting o1, InterpreterSetting o2) { @@ -954,9 +921,7 @@ public void close(String settingId) { public void close() { List closeThreads = new LinkedList<>(); - interpreterSettingsLock.readLock().lock(); Collection intpSettings = interpreterSettings.values(); - interpreterSettingsLock.readLock().unlock(); for (final InterpreterSetting intpSetting : intpSettings) { Thread t = new Thread() { From 3b90155b93b91b2a5a11e072883d6051b915ee53 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Mon, 23 Apr 2018 15:43:45 +0900 Subject: [PATCH 3/3] Removed unused `import` statements Simplified `for` loop --- .../zeppelin/interpreter/InterpreterSettingManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index c4f6e9a0141..e725675c868 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -24,7 +24,6 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; @@ -65,7 +64,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -921,8 +919,7 @@ public void close(String settingId) { public void close() { List closeThreads = new LinkedList<>(); - Collection intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { + for (final InterpreterSetting intpSetting : interpreterSettings.values()) { Thread t = new Thread() { public void run() {