From bdb4981b9dae07023ca044d72a80f04c8bcdb10a Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Thu, 24 Jan 2019 21:21:56 +0800 Subject: [PATCH 1/2] Support multi directories for executor shuffle info recovery --- .../shuffle/ExternalShuffleBlockHandler.java | 7 + .../shuffle/ExternalShuffleBlockResolver.java | 135 +++++++++++++++++- .../network/yarn/YarnShuffleService.java | 32 ++++- .../yarn/YarnShuffleServiceSuite.scala | 54 +++++++ 4 files changed, 220 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index b25e48a164e6b..5d3c358efd717 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import com.codahale.metrics.Gauge; @@ -66,6 +67,12 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi new ExternalShuffleBlockResolver(conf, registeredExecutorFile)); } + public ExternalShuffleBlockHandler( + TransportConf conf, List localDirs, String recoveryFileName, long diskCheckInterval) throws IOException { + this(new OneForOneStreamManager(), + new ExternalShuffleBlockResolver(conf, localDirs, recoveryFileName, diskCheckInterval)); + } + /** Enables mocking out the StreamManager and BlockManager. 
*/ @VisibleForTesting public ExternalShuffleBlockHandler( diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 0b7a27402369d..aca241f7d0d65 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -24,6 +24,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +39,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -86,10 +90,16 @@ public class ExternalShuffleBlockResolver { private final TransportConf conf; + private final List recoveryPaths; + + private final String recoveryFileName; + + private AtomicBoolean registerExecutorFailed = new AtomicBoolean(false); + @VisibleForTesting - final File registeredExecutorFile; + File registeredExecutorFile; @VisibleForTesting - final DB db; + DB db; private final List knownManagers = Arrays.asList( "org.apache.spark.shuffle.sort.SortShuffleManager", @@ -97,19 +107,56 @@ public class ExternalShuffleBlockResolver { public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException { - this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor( + this(conf, registeredExecutorFile, null, null, -1, + 
Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); } + public ExternalShuffleBlockResolver( + TransportConf conf, + File registeredExecutorFile, + Executor directoryCleaner) throws IOException { + this(conf, registeredExecutorFile, null, null, -1, directoryCleaner); + } + + ExternalShuffleBlockResolver( + TransportConf conf, + List recoveryPaths, + String registeredExecutorFileName, + long checkInterval) throws IOException { + this(conf, null, recoveryPaths, registeredExecutorFileName, checkInterval, + Executors.newSingleThreadExecutor( + NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); + } + + // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting ExternalShuffleBlockResolver( TransportConf conf, File registeredExecutorFile, + List recoveryPaths, + String recoveryFileName, + long checkInterval, Executor directoryCleaner) throws IOException { this.conf = conf; - this.registeredExecutorFile = registeredExecutorFile; + if (registeredExecutorFile != null) { + this.registeredExecutorFile = registeredExecutorFile; + this.recoveryPaths = null; + this.recoveryFileName = null; + } else { + this.recoveryPaths = recoveryPaths; + this.recoveryFileName = recoveryFileName; + this.registeredExecutorFile = findRegisteredExecutorFile(); + } + if (checkInterval > 0) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + NettyUtils.createThreadFactory("spark-shuffle-disk-checker")); + executor.scheduleAtFixedRate(() -> { + checkRegisteredExecutorFileAndRecover(); + }, checkInterval, checkInterval, TimeUnit.MILLISECONDS); + } String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); CacheLoader indexCacheLoader = new CacheLoader() { @@ -138,6 +185,69 @@ public int getRegisteredExecutorsSize() { return executors.size(); } + public File 
findRegisteredExecutorFile() { + File newFile = null; + if (recoveryPaths != null) { + File latestFile = null; + for (String path : recoveryPaths) { + File registeredExecutorFile = new File(path, recoveryFileName); + if (registeredExecutorFile.exists()) { + if (registeredExecutorFile.canRead() + && registeredExecutorFile.canWrite()) { + if (latestFile == null || registeredExecutorFile.lastModified() > latestFile.lastModified()) { + latestFile = registeredExecutorFile; + } + } + } else { + try { + JavaUtils.deleteRecursively(registeredExecutorFile); + newFile = registeredExecutorFile; + } catch (IOException e) { + logger.warn("Failed to delete old registered executor file " + registeredExecutorFile, e); + } + } + } + if (latestFile == null) { + return newFile; + } + return latestFile; + } + return null; + } + + public File saveNewRegisteredExecutorFile() { + if (recoveryPaths != null) { + for (String path : recoveryPaths) { + File newRegisteredExecutorFile = new File(path, recoveryFileName); + if (!newRegisteredExecutorFile.exists()) { + try { + DB newDb = LevelDBProvider.initLevelDB(newRegisteredExecutorFile, CURRENT_VERSION, mapper); + for (Map.Entry entry : executors.entrySet()) { + byte[] key = dbAppExecKey(entry.getKey()); + byte[] value = mapper.writeValueAsString(entry.getValue()).getBytes(StandardCharsets.UTF_8); + db.put(key, value); + } + if (this.db != null) { + try { + this.db.close(); + JavaUtils.deleteRecursively(registeredExecutorFile); + } catch (IOException e) { + logger.warn("Failed to clean up old registered executors file at " + registeredExecutorFile, e); + } + } + this.db = newDb; + this.registeredExecutorFile = newRegisteredExecutorFile; + return newRegisteredExecutorFile; + } catch (Exception e) { + logger.error("Exception occurred while saving registered executors info to new file " + newRegisteredExecutorFile, e); + // continue + } + } + } + } + return null; + } + /** Registers a new Executor with all the configuration we need to find its 
shuffle files. */ public void registerExecutor( String appId, @@ -157,6 +267,7 @@ public void registerExecutor( } } catch (Exception e) { logger.error("Error saving registered executors", e); + registerExecutorFailed.getAndSet(true); } executors.put(fullId, executorInfo); } @@ -274,6 +385,22 @@ public boolean accept(File dir, String name) { } } + /** + * Check if the registeredExecutorFile is unhealthy, and try to recover. In the recovery, + * It will create a new registeredExecutorFile and save the latest executors info. + */ + private void checkRegisteredExecutorFileAndRecover() { + if (this.registeredExecutorFile == null) { + return; + } else if (db != null && (!registeredExecutorFile.exists() || + !registeredExecutorFile.canRead() || + !registeredExecutorFile.canWrite())) { + saveNewRegisteredExecutorFile(); + } else if (registerExecutorFailed.getAndSet(false)) { + saveNewRegisteredExecutorFile(); + } + } + /** * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file * called "shuffle_ShuffleId_MapId_0.data". 
This logic is from IndexShuffleBlockResolver, diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 7e8d3b2bc3ba4..faf5b9d7ce02c 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -92,6 +93,17 @@ public class YarnShuffleService extends AuxiliaryService { static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure"; private static final boolean DEFAULT_STOP_ON_FAILURE = false; + // Whether enable multiple disk recovery on registered executors, we assume that + // yarn.nodemanager.local-dirs are mounted on multiple disks. 
+ @VisibleForTesting + static final String REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY = + "spark.yarn.shuffle.registeredExecutors.multipleDiskRecovery"; + private static final boolean DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY = false; + // Disk check interval in milliseconds + private static final String REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = + "spark.yarn.shuffle.registeredExecutorsFile.checkInterval"; + private static final long DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = 60 * 1000; + // just for testing when you want to find an open port @VisibleForTesting static int boundPort = -1; @@ -156,6 +168,8 @@ protected void serviceInit(Configuration conf) throws Exception { _conf = conf; boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + boolean multiDiskRecovery = conf.getBoolean(REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY, + DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -163,13 +177,23 @@ protected void serviceInit(Configuration conf) throws Exception { // If we don't find one, then we choose a file to use to save the state next time. 
Even if // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back + TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); if (_recoveryPath != null) { - registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); + if (multiDiskRecovery) { + List recoveryPaths = Lists.newArrayList(); + recoveryPaths.add(_recoveryPath.toUri().getPath()); + Collection localDirs = conf.getTrimmedStringCollection("yarn.nodemanager.local-dirs"); + recoveryPaths.addAll(localDirs); + blockHandler = new ExternalShuffleBlockHandler(transportConf, recoveryPaths, RECOVERY_FILE_NAME, + conf.getLong(REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL)); + } else { + registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); + blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + } + } else { + blockHandler = new ExternalShuffleBlockHandler(transportConf, null); } - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); - blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 268f4bd13f6c3..f6fb1fbc1223f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -381,4 +381,58 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s1.secretsFile should be (null) } + test("shuffle 
serivce should be robust if multiple disk recovery enabled") { + yarnConfig.set("spark.yarn.shuffle.registeredExecutors.multipleDiskRecovery", "true") + yarnConfig.set("spark.yarn.shuffle.registeredExecutorsFile.checkInterval", "500") + s1 = new YarnShuffleService + val recoveryPath = new Path(recoveryLocalDir.toURI.getPath) + s1.setRecoveryPath(recoveryPath) + s1.init(yarnConfig) + val appId1 = ApplicationId.newInstance(0, 1); + val app1 = makeAppInfo("user1", appId1) + s1.initializeApplication(app1) + + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/foo2", "/bar2"), 3, SORT_MANAGER) + val shuffleInfo3 = new ExecutorShuffleInfo(Array("/foo3", "/bar3"), 3, SORT_MANAGER) + + val blockResolver = ShuffleTestAccessor.getBlockResolver(s1.blockHandler) + val execStateFile = ShuffleTestAccessor.registeredExecutorFile(blockResolver) + + blockResolver.registerExecutor(appId1.toString, "exec1", shuffleInfo1) + val getExecInfo1 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver) + getExecInfo1 should be (Some(shuffleInfo1)) + + execStateFile.setWritable(false) + Thread.sleep(5000) + s1.stopApplication(new ApplicationTerminationContext(appId1)) + s1.stop() + execStateFile.listFiles().foreach(_.delete()) + execStateFile.delete() + + + s2 = new YarnShuffleService + s2.setRecoveryPath(recoveryPath) + s2.init(yarnConfig) + val blockResolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) + val execStateFile2 = ShuffleTestAccessor.registeredExecutorFile(blockResolver2) + + assert(execStateFile !== execStateFile2) + val getExecInfo2 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver2) + getExecInfo2 should be (Some(shuffleInfo1)) + blockResolver.registerExecutor(appId1.toString, "exec2", shuffleInfo2) + val getExecInfo3 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec2", blockResolver2) + getExecInfo3 should be (Some(shuffleInfo2)) + + val appId2 = 
ApplicationId.newInstance(0, 2) + val app2 = makeAppInfo("user1", appId2) + s2.initializeApplication(app2) + + blockResolver2.registerExecutor(appId2.toString, "exec1", shuffleInfo3) + val getExecInfo4 = ShuffleTestAccessor.getExecutorInfo(appId2, "exec1", blockResolver2) + getExecInfo4 should be (Some(shuffleInfo3)) + + s2.stopApplication(new ApplicationTerminationContext(appId2)) + s2.stop() + } } From ed1e043ff7bde8e3e2072e28ac86b01f42d85ccf Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Fri, 25 Jan 2019 14:44:55 +0800 Subject: [PATCH 2/2] Support multi directories recovery for executor shuffle info in YarnShuffleService --- .../shuffle/ExternalShuffleBlockHandler.java | 11 +- .../shuffle/ExternalShuffleBlockResolver.java | 144 ++++++------------ .../network/yarn/YarnShuffleService.java | 85 ++++++++--- .../network/shuffle/ShuffleTestAccessor.scala | 5 + .../yarn/YarnShuffleServiceSuite.scala | 88 ++++++----- 5 files changed, 177 insertions(+), 156 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 5d3c358efd717..3c61bb1af5f2f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -61,16 +61,17 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private final OneForOneStreamManager streamManager; private final ShuffleMetrics metrics; - public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) + public ExternalShuffleBlockHandler( + TransportConf conf, File registeredExecutorFile) throws IOException { - this(new OneForOneStreamManager(), - new ExternalShuffleBlockResolver(conf, registeredExecutorFile)); + this(conf, registeredExecutorFile, null, -1L); } public 
ExternalShuffleBlockHandler( - TransportConf conf, List localDirs, String recoveryFileName, long diskCheckInterval) throws IOException { + TransportConf conf, File registeredExecutorFile, String[] recoveryDirs, long checkInterval) + throws IOException { this(new OneForOneStreamManager(), - new ExternalShuffleBlockResolver(conf, localDirs, recoveryFileName, diskCheckInterval)); + new ExternalShuffleBlockResolver(conf, registeredExecutorFile, recoveryDirs, checkInterval)); } /** Enables mocking out the StreamManager and BlockManager. */ diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index aca241f7d0d65..2675270d5ff6a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -35,11 +35,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -90,12 +90,10 @@ public class ExternalShuffleBlockResolver { private final TransportConf conf; - private final List recoveryPaths; - - private final String recoveryFileName; - private AtomicBoolean registerExecutorFailed = new AtomicBoolean(false); + private String[] recoveryDirs; + @VisibleForTesting File registeredExecutorFile; @VisibleForTesting @@ -105,58 +103,40 @@ public class ExternalShuffleBlockResolver { 
"org.apache.spark.shuffle.sort.SortShuffleManager", "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager"); - public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) - throws IOException { - this(conf, registeredExecutorFile, null, null, -1, - Executors.newSingleThreadExecutor( - // Add `spark` prefix because it will run in NM in Yarn mode. - NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); - } - public ExternalShuffleBlockResolver( TransportConf conf, File registeredExecutorFile, - Executor directoryCleaner) throws IOException { - this(conf, registeredExecutorFile, null, null, -1, directoryCleaner); - } - - ExternalShuffleBlockResolver( - TransportConf conf, - List recoveryPaths, - String registeredExecutorFileName, + String[] recoveryDirs, long checkInterval) throws IOException { - this(conf, null, recoveryPaths, registeredExecutorFileName, checkInterval, + this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor( NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); + this.recoveryDirs = recoveryDirs; + if (this.recoveryDirs != null) { + Preconditions.checkArgument(checkInterval > 0, + "Check interval of multiple directories recovery should be a positive number"); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + NettyUtils.createThreadFactory("spark-shuffle-disk-checker")); + executor.scheduleAtFixedRate(this::checkRegisteredExecutorFileAndRecover, + checkInterval, checkInterval, TimeUnit.MILLISECONDS); + } } + public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) + throws IOException { + this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor( + // Add `spark` prefix because it will run in NM in Yarn mode. + NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); + } // Allows tests to have more control over when directories are cleaned up. 
@VisibleForTesting ExternalShuffleBlockResolver( TransportConf conf, File registeredExecutorFile, - List recoveryPaths, - String recoveryFileName, - long checkInterval, Executor directoryCleaner) throws IOException { this.conf = conf; - if (registeredExecutorFile != null) { - this.registeredExecutorFile = registeredExecutorFile; - this.recoveryPaths = null; - this.recoveryFileName = null; - } else { - this.recoveryPaths = recoveryPaths; - this.recoveryFileName = recoveryFileName; - this.registeredExecutorFile = findRegisteredExecutorFile(); - } - if (checkInterval > 0) { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( - NettyUtils.createThreadFactory("spark-shuffle-disk-checker")); - executor.scheduleAtFixedRate(() -> { - checkRegisteredExecutorFileAndRecover(); - }, checkInterval, checkInterval, TimeUnit.MILLISECONDS); - } + this.registeredExecutorFile = registeredExecutorFile; String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); CacheLoader indexCacheLoader = new CacheLoader() { @@ -185,63 +165,33 @@ public int getRegisteredExecutorsSize() { return executors.size(); } - public File findRegisteredExecutorFile() { - File newFile = null; - if (recoveryPaths != null) { - File latestFile = null; - for (String path : recoveryPaths) { - File registeredExecutorFile = new File(path, recoveryFileName); - if (registeredExecutorFile.exists()) { - if (registeredExecutorFile.canRead() - && registeredExecutorFile.canWrite()) { - if (latestFile == null || registeredExecutorFile.lastModified() > latestFile.lastModified()) { - latestFile = registeredExecutorFile; - } - } - } else { - try { - JavaUtils.deleteRecursively(registeredExecutorFile); - newFile = registeredExecutorFile; - } catch (IOException e) { - logger.warn("Failed to delete old registered executor file " + registeredExecutorFile, e); - } - } - } - if (latestFile == null) { - return newFile; - } - return latestFile; - } - return null; - } - public File 
saveNewRegisteredExecutorFile() { - if (recoveryPaths != null) { - for (String path : recoveryPaths) { - File newRegisteredExecutorFile = new File(path, recoveryFileName); - if (!newRegisteredExecutorFile.exists()) { - try { - DB newDb = LevelDBProvider.initLevelDB(newRegisteredExecutorFile, CURRENT_VERSION, mapper); - for (Map.Entry entry : executors.entrySet()) { - byte[] key = dbAppExecKey(entry.getKey()); - byte[] value = mapper.writeValueAsString(entry.getValue()).getBytes(StandardCharsets.UTF_8); - db.put(key, value); - } - if (this.db != null) { - try { - this.db.close(); - JavaUtils.deleteRecursively(registeredExecutorFile); - } catch (IOException e) { - logger.warn("Failed to clean up old registered executors file at " + registeredExecutorFile, e); - } + String recoveryFileName = registeredExecutorFile.getName(); + for (String path : recoveryDirs) { + File newRegisteredExecutorFile = new File(path, recoveryFileName); + if (!newRegisteredExecutorFile.exists()) { + try { + DB newDb = LevelDBProvider.initLevelDB(newRegisteredExecutorFile, CURRENT_VERSION, mapper); + for (Map.Entry entry : executors.entrySet()) { + byte[] key = dbAppExecKey(entry.getKey()); + byte[] value = mapper.writeValueAsString(entry.getValue()).getBytes(StandardCharsets.UTF_8); + newDb.put(key, value); + } + // closing old db and clean up old registeredExecutorFile + if (this.db != null) { + try { + this.db.close(); + JavaUtils.deleteRecursively(registeredExecutorFile); + } catch (IOException e) { + logger.warn("Failed to clean up old registered executors file at " + registeredExecutorFile, e); } - this.db = newDb; - this.registeredExecutorFile = newRegisteredExecutorFile; - return newRegisteredExecutorFile; - } catch (Exception e) { - logger.error("Exception occurred while saving registered executors info to new file " + newRegisteredExecutorFile, e); - // continue } + this.db = newDb; + this.registeredExecutorFile = newRegisteredExecutorFile; + return newRegisteredExecutorFile; + } 
catch (Exception e) { + logger.error("Exception occurred while saving registered executors info to new file " + newRegisteredExecutorFile, e); + // continue } } } @@ -392,9 +342,9 @@ public boolean accept(File dir, String name) { private void checkRegisteredExecutorFileAndRecover() { if (this.registeredExecutorFile == null) { return; - } else if (db != null && (!registeredExecutorFile.exists() || + } else if (!registeredExecutorFile.exists() || !registeredExecutorFile.canRead() || - !registeredExecutorFile.canWrite())) { + !registeredExecutorFile.canWrite()) { saveNewRegisteredExecutorFile(); } else if (registerExecutorFailed.getAndSet(false)) { saveNewRegisteredExecutorFile(); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index faf5b9d7ce02c..9167db313c2ab 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -39,7 +39,9 @@ import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.*; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -85,7 +87,8 @@ public class YarnShuffleService extends AuxiliaryService { private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; - private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb"; + @VisibleForTesting + static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb"; private static final String 
SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb"; // Whether failure during service initialization should stop the NM. @@ -93,14 +96,16 @@ public class YarnShuffleService extends AuxiliaryService { static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure"; private static final boolean DEFAULT_STOP_ON_FAILURE = false; - // Whether enable multiple disk recovery on registered executors, we assume that - // yarn.nodemanager.local-dirs are mounted on multiple disks. + // Whether enable multiple directories recovery on registered executors info, + // If enabled, both _recoveryPath and yarn.nodemanager.local-dirs are used for + // recovery, and they are supposed to be mounted on multiple disks. @VisibleForTesting - static final String REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY = - "spark.yarn.shuffle.registeredExecutors.multipleDiskRecovery"; - private static final boolean DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY = false; + static final String REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY = + "spark.yarn.shuffle.registeredExecutors.multipleDirsRecovery"; + private static final boolean DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY = false; // Disk check interval in milliseconds - private static final String REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = + @VisibleForTesting + static final String REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = "spark.yarn.shuffle.registeredExecutorsFile.checkInterval"; private static final long DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = 60 * 1000; @@ -168,8 +173,10 @@ protected void serviceInit(Configuration conf) throws Exception { _conf = conf; boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); - boolean multiDiskRecovery = conf.getBoolean(REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY, - DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DISK_RECOVERY); + + // Whether should check multiple directories for recovery + boolean multiDirsRecovery = 
conf.getBoolean(REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY, + DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -178,21 +185,27 @@ protected void serviceInit(Configuration conf) throws Exception { // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); + // Directories for recovery + String[] recoveryDirs = null; + // The interval for checking registered executor file and failing over if it's broken. + // Can only be used with multiple directories recovery + long checkInterval = -1; + if (_recoveryPath != null) { - if (multiDiskRecovery) { - List recoveryPaths = Lists.newArrayList(); - recoveryPaths.add(_recoveryPath.toUri().getPath()); - Collection localDirs = conf.getTrimmedStringCollection("yarn.nodemanager.local-dirs"); - recoveryPaths.addAll(localDirs); - blockHandler = new ExternalShuffleBlockHandler(transportConf, recoveryPaths, RECOVERY_FILE_NAME, - conf.getLong(REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL)); + if (multiDirsRecovery) { + List recoveryDirList = Lists.newArrayList(); + recoveryDirList.add(_recoveryPath.toUri().getPath()); + Collection localDirs = conf.getTrimmedStringCollection(YarnConfiguration.NM_LOCAL_DIRS); + recoveryDirList.addAll(localDirs); + recoveryDirs = recoveryDirList.toArray(new String[0]); + registeredExecutorFile = findRegisteredExecutorFile(recoveryDirs, RECOVERY_FILE_NAME); + checkInterval = conf.getLong(REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, + DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL); } else { registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); - blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); } - } else { - blockHandler = new 
ExternalShuffleBlockHandler(transportConf, null); } + blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile, recoveryDirs, checkInterval); // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests @@ -419,6 +432,40 @@ protected File initRecoveryDb(String dbName) { return new File(_recoveryPath.toUri().getPath(), dbName); } + public static File findRegisteredExecutorFile( + String[] recoveryPaths, String recoveryFileName) { + File newFile = null; + if (recoveryPaths != null) { + File latestFile = null; + for (String path : recoveryPaths) { + File registeredExecutorFile = new File(path, recoveryFileName); + if (registeredExecutorFile.exists()) { + if (registeredExecutorFile.canRead() + && registeredExecutorFile.canWrite()) { + if (latestFile == null || registeredExecutorFile.lastModified() > latestFile.lastModified()) { + latestFile = registeredExecutorFile; + } + } + } else { + if (newFile == null) { + try { + JavaUtils.deleteRecursively(registeredExecutorFile); + newFile = registeredExecutorFile; + } catch (IOException e) { + logger.warn("Failed to delete old registered executor file " + registeredExecutorFile, e); + } + } + } + } + if (latestFile == null) { + return newFile; + } + return latestFile; + } + return null; + } + + /** * Simply encodes an application ID. 
*/ diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala index 1fed2562fcadb..7882cdc189b1e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala @@ -44,6 +44,11 @@ object ShuffleTestAccessor { Option(resolver.executors.get(id)) } + def getAllExecutorInfos(resolver: ExternalShuffleBlockResolver): + ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = { + resolver.executors + } + def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = { resolver.registeredExecutorFile } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index f6fb1fbc1223f..584e3314055a9 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -381,58 +381,76 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s1.secretsFile should be (null) } - test("shuffle serivce should be robust if multiple disk recovery enabled") { - yarnConfig.set("spark.yarn.shuffle.registeredExecutors.multipleDiskRecovery", "true") - yarnConfig.set("spark.yarn.shuffle.registeredExecutorsFile.checkInterval", "500") + test("shuffle service should be robust when disk error if multiple disk recovery enabled") { + yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY, "true") + yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, "500") s1 = new YarnShuffleService val 
recoveryPath = new Path(recoveryLocalDir.toURI.getPath) s1.setRecoveryPath(recoveryPath) s1.init(yarnConfig) + val appId1 = ApplicationId.newInstance(0, 1); val app1 = makeAppInfo("user1", appId1) - s1.initializeApplication(app1) + val appId2 = ApplicationId.newInstance(0, 2) + val app2 = makeAppInfo("user1", appId2) val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) val shuffleInfo2 = new ExecutorShuffleInfo(Array("/foo2", "/bar2"), 3, SORT_MANAGER) val shuffleInfo3 = new ExecutorShuffleInfo(Array("/foo3", "/bar3"), 3, SORT_MANAGER) val blockResolver = ShuffleTestAccessor.getBlockResolver(s1.blockHandler) - val execStateFile = ShuffleTestAccessor.registeredExecutorFile(blockResolver) + val execStateFile1 = ShuffleTestAccessor.registeredExecutorFile(blockResolver) + assert(execStateFile1.getParentFile === recoveryLocalDir) + s1.initializeApplication(app1) blockResolver.registerExecutor(appId1.toString, "exec1", shuffleInfo1) val getExecInfo1 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver) getExecInfo1 should be (Some(shuffleInfo1)) - execStateFile.setWritable(false) - Thread.sleep(5000) - s1.stopApplication(new ApplicationTerminationContext(appId1)) - s1.stop() - execStateFile.listFiles().foreach(_.delete()) - execStateFile.delete() - - - s2 = new YarnShuffleService - s2.setRecoveryPath(recoveryPath) - s2.init(yarnConfig) - val blockResolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) - val execStateFile2 = ShuffleTestAccessor.registeredExecutorFile(blockResolver2) - - assert(execStateFile !== execStateFile2) - val getExecInfo2 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver2) - getExecInfo2 should be (Some(shuffleInfo1)) - blockResolver.registerExecutor(appId1.toString, "exec2", shuffleInfo2) - val getExecInfo3 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec2", blockResolver2) - getExecInfo3 should be (Some(shuffleInfo2)) - - val appId2 = ApplicationId.newInstance(0, 2) - 
val app2 = makeAppInfo("user1", appId2) - s2.initializeApplication(app2) - - blockResolver2.registerExecutor(appId2.toString, "exec1", shuffleInfo3) - val getExecInfo4 = ShuffleTestAccessor.getExecutorInfo(appId2, "exec1", blockResolver2) - getExecInfo4 should be (Some(shuffleInfo3)) + // Simulate disk error + recoveryLocalDir.setWritable(false) + execStateFile1.setWritable(false) + Thread.sleep(2000) + try { + // Test whether registeredExecutorFile fails over when a disk error is encountered + val execStateFile2 = ShuffleTestAccessor.registeredExecutorFile(blockResolver) + assert(execStateFile1 !== execStateFile2) + val getExecInfo2 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver) + getExecInfo2 should be (Some(shuffleInfo1)) + + // Assume restarts happen here. + // Test whether we can find the last available registeredExecutorFile after a restart + val recoveryDirs = Seq(recoveryPath.toUri.getPath).toArray ++ + yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS) + val findResult = YarnShuffleService.findRegisteredExecutorFile( + recoveryDirs, YarnShuffleService.RECOVERY_FILE_NAME) + assert(findResult === execStateFile2) + + // Test whether registeredExecutors reloaded from leveldb is the same as the one + // before restarts + val currentDB = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver) + val reloadExecutors1 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB) + reloadExecutors1 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver)) + + blockResolver.registerExecutor(appId1.toString, "exec2", shuffleInfo2) + val getExecInfo3 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec2", blockResolver) + getExecInfo3 should be(Some(shuffleInfo2)) + val reloadExecutors2 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB) + reloadExecutors2 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver)) + + s1.initializeApplication(app2) + blockResolver.registerExecutor(appId2.toString, "exec1", 
shuffleInfo3) + val getExecInfo4 = ShuffleTestAccessor.getExecutorInfo(appId2, "exec1", blockResolver) + getExecInfo4 should be(Some(shuffleInfo3)) + val reloadExecutors3 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB) + reloadExecutors3 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver)) - s2.stopApplication(new ApplicationTerminationContext(appId2)) - s2.stop() + } finally { + s1.stopApplication(new ApplicationTerminationContext(appId1)) + s1.stopApplication(new ApplicationTerminationContext(appId2)) + s1.stop() + recoveryLocalDir.setWritable(true) + execStateFile1.setWritable(true) + } } }