From 740dc1923d75f3980ad3e815d36af0685fe2aaef Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Tue, 23 May 2017 16:16:57 -0700 Subject: [PATCH 1/3] bugfix --- .../spark/storage/BlockInfoManager.scala | 15 ++++++---- .../apache/spark/storage/BlockManager.scala | 29 ++++++++++++++----- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 +++++++++++- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 3db59837fbebd..7064872ec1c77 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging { /** * Release a lock on the given block. + * In case a TaskContext is not propagated properly to all child threads for the task, we fail to + * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock. + * + * See SPARK-18406 for more discussion of this issue. */ - def unlock(blockId: BlockId): Unit = synchronized { - logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId") + def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized { + val taskId = taskAttemptId.getOrElse(currentTaskAttemptId) + logTrace(s"Task $taskId releasing lock for $blockId") val info = get(blockId).getOrElse { throw new IllegalStateException(s"Block $blockId not found") } if (info.writerTask != BlockInfo.NO_WRITER) { info.writerTask = BlockInfo.NO_WRITER - writeLocksByTask.removeBinding(currentTaskAttemptId, blockId) + writeLocksByTask.removeBinding(taskId, blockId) } else { assert(info.readerCount > 0, s"Block $blockId is not locked for reading") info.readerCount -= 1 - val countsForTask = readLocksByTask(currentTaskAttemptId) + val countsForTask = readLocksByTask(taskId) val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1 assert(newPinCountForTask >= 0, - s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it") + s"Task $taskId release lock on block $blockId more times than it acquired it") } notifyAll() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 137d24b525155..aefa3add31de9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -501,6 +501,8 @@ private[spark] class BlockManager( case Some(info) => val level = info.level logDebug(s"Level for block $blockId is $level") + val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId()) + .getOrElse(BlockInfo.NON_TASK_WRITER) if (level.useMemory && memoryStore.contains(blockId)) { val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get @@ -508,7 +510,9 @@ private[spark] class BlockManager( serializerManager.dataDeserializeStream( blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag) } - val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) + val ci = CompletionIterator[Any, Iterator[Any]](iter, { + releaseLock(blockId, Some(taskAttemptId)) + }) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) } else if (level.useDisk && diskStore.contains(blockId)) { val diskData = diskStore.getBytes(blockId) @@ -525,8 +529,9 @@ private[spark] class BlockManager( serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) } } - val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, - releaseLockAndDispose(blockId, diskData)) + val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, { + releaseLockAndDispose(blockId, diskData, Some(taskAttemptId)) + }) Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } else { handleLocalReadFailure(blockId) @@ -713,8 +718,15 @@ private[spark] class BlockManager( /** * Release a lock on the given block. */ - def releaseLock(blockId: BlockId): Unit = { - blockInfoManager.unlock(blockId) + def releaseLock(blockId: BlockId): Unit = releaseLock(blockId, taskAttemptId = None) + + /** + * Release a lock on the given block with explicit TID. + * This method should be used in case we can't get the correct TID from TaskContext, for example, + * the input iterator of a cached RDD iterates to the end in a child thread. + */ + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit = { + blockInfoManager.unlock(blockId, taskAttemptId) } /** @@ -1467,8 +1479,11 @@ private[spark] class BlockManager( } } - def releaseLockAndDispose(blockId: BlockId, data: BlockData): Unit = { - blockInfoManager.unlock(blockId) + def releaseLockAndDispose( + blockId: BlockId, + data: BlockData, + taskAttemptId: Option[Long] = None): Unit = { + releaseLock(blockId, taskAttemptId) data.dispose() } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index ad56715656c85..8d06f5468f4f1 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.{FileSplit, TextInputFormat} import org.apache.spark._ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDDSuiteUtils._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} class RDDSuite extends SparkFunSuite with SharedSparkContext { var tempDir: File = _ @@ -1082,6 +1082,22 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(totalPartitionCount == 10) } + test("SPARK-18406: race between end-of-task and completion iterator read lock release") { + val rdd = sc.parallelize(1 to 1000, 10) + rdd.cache() + + rdd.mapPartitions { iter => + ThreadUtils.runInNewThread("TestThread") { + // Iterate to the end of the input iterator, to cause the CompletionIterator completion to + // fire outside of the task's main thread. + while (iter.hasNext) { + iter.next() + } + iter + } + }.collect() + } + // NOTE // Below tests calling sc.stop() have to be the last tests in this suite. If there are tests // running after them and if they access sc those tests will fail as sc is already closed, because From 72cee6eee9fd771e0aebc3cfb6fc6e906b67b351 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Tue, 23 May 2017 18:04:32 -0700 Subject: [PATCH 2/3] update comments and minor refactor. --- .../scala/org/apache/spark/storage/BlockManager.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aefa3add31de9..85b8314f3444f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -502,7 +502,6 @@ private[spark] class BlockManager( val level = info.level logDebug(s"Level for block $blockId is $level") val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId()) - .getOrElse(BlockInfo.NON_TASK_WRITER) if (level.useMemory && memoryStore.contains(blockId)) { val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get @@ -510,8 +509,11 @@ private[spark] class BlockManager( serializerManager.dataDeserializeStream( blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag) } + // We need to capture the current taskId in case the iterator completion is triggered + // from a different thread which does not have TaskContext set; see SPARK-18406 for + // discussion. val ci = CompletionIterator[Any, Iterator[Any]](iter, { - releaseLock(blockId, Some(taskAttemptId)) + releaseLock(blockId, taskAttemptId) }) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) } else if (level.useDisk && diskStore.contains(blockId)) { @@ -530,7 +532,7 @@ private[spark] class BlockManager( } } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, { - releaseLockAndDispose(blockId, diskData, Some(taskAttemptId)) + releaseLockAndDispose(blockId, diskData, taskAttemptId) }) Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } else { From bc66ec52adf0f741a0c533b28ca64e3fef9e848e Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Tue, 23 May 2017 19:36:40 -0700 Subject: [PATCH 3/3] modify the trait BlockDataManager. --- .../org/apache/spark/network/BlockDataManager.scala | 2 +- .../org/apache/spark/storage/BlockManager.scala | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 8f83668d79029..b3f8bfe8b1d48 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -46,5 +46,5 @@ trait BlockDataManager { /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ - def releaseLock(blockId: BlockId): Unit + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 85b8314f3444f..1689baa832d52 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -717,17 +717,13 @@ private[spark] class BlockManager( blockInfoManager.downgradeLock(blockId) } - /** - * Release a lock on the given block. - */ - def releaseLock(blockId: BlockId): Unit = releaseLock(blockId, taskAttemptId = None) - /** * Release a lock on the given block with explicit TID. - * This method should be used in case we can't get the correct TID from TaskContext, for example, - * the input iterator of a cached RDD iterates to the end in a child thread. + * The param `taskAttemptId` should be passed in case we can't get the correct TID from + * TaskContext, for example, the input iterator of a cached RDD iterates to the end in a child + * thread. */ - def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit = { + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long] = None): Unit = { blockInfoManager.unlock(blockId, taskAttemptId) }