From 9a1724de0287b5ca41e30f3d3401fd721a2e1520 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 15 Mar 2016 11:21:09 +0900 Subject: [PATCH 01/18] Add a test to check if the stage graph is properly built. --- .../spark/scheduler/DAGSchedulerSuite.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 55f4190680dd5..67377cb121296 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -321,6 +321,47 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) } + /* + * <-------------------- + * / \ + * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] + * \ / + * <-------------------- + */ + test("parent stages") { + val rddA = new MyRDD(sc, 1, Nil) + + val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1)) + val rddB = new MyRDD(sc, 1, List(shuffleDef1), tracker = mapOutputTracker) + + val shuffleDef2 = new ShuffleDependency(rddB, new HashPartitioner(1)) + val rddC = new MyRDD(sc, 1, List(shuffleDef2), tracker = mapOutputTracker) + + val shuffleDef3 = new ShuffleDependency(rddC, new HashPartitioner(1)) + val rddD = new MyRDD(sc, 1, List(shuffleDef3, new OneToOneDependency(rddB)), + tracker = mapOutputTracker) + + val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1)) + val rddE = new MyRDD(sc, 1, List(shuffleDef4, new OneToOneDependency(rddC)), + tracker = mapOutputTracker) + submit(rddE, Array(0)) + + assert(scheduler.shuffleToMapStage.size === 4) + assert(scheduler.activeJobs.size === 1) + + val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId) + val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId) + val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId) + val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId) + val finalStage = scheduler.activeJobs.head.finalStage + + assert(mapStage1.parents.isEmpty) + assert(mapStage2.parents === List(mapStage1)) + assert(mapStage3.parents === List(mapStage2)) + assert(mapStage4.parents === List(mapStage1, mapStage3)) + assert(finalStage.parents === List(mapStage2, mapStage4)) + } + test("zero split job") { var numResults = 0 var failureReason: Option[Exception] = None From f8b7910ecb52a5954de091ed79d5de9c19ba2744 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 15 Mar 2016 11:22:42 +0900 Subject: [PATCH 02/18] Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first. --- .../apache/spark/scheduler/DAGScheduler.scala | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8a36af27bdd27..bcdf9127e2c46 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -403,24 +403,34 @@ class DAGScheduler( parents.toList } - /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ - private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { + /** + * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet, + * in topological order to ensure building ancestor stages first. + */ + private def getAncestorShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = { val parents = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { - if (!visited(r)) { + val deps = r.dependencies.filter { + case shufDep: ShuffleDependency[_, _, _] => + !shuffleToMapStage.contains(shufDep.shuffleId) + case _ => true + } + if (deps.forall(dep => visited(dep.rdd))) { visited += r - for (dep <- r.dependencies) { + for (dep <- deps) { dep match { case shufDep: ShuffleDependency[_, _, _] => - if (!shuffleToMapStage.contains(shufDep.shuffleId)) { - parents.push(shufDep) - } + parents.push(shufDep) case _ => } + } + } else { + waitingForVisit.push(r) + for (dep <- deps) { waitingForVisit.push(dep.rdd) } } @@ -430,7 +440,7 @@ class DAGScheduler( while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } - parents + parents.toList.reverse } private def getMissingParentStages(stage: Stage): List[Stage] = { From 0ea3fc838f689729794b6ea3aaf0b88a339ec20c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 16 Mar 2016 11:04:45 +0900 Subject: [PATCH 03/18] Refactor getAncestorShuffleDependencies. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bcdf9127e2c46..803af76b90538 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.Map -import scala.collection.mutable.{HashMap, HashSet, Stack} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack} import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.existentials @@ -407,8 +407,8 @@ class DAGScheduler( * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet, * in topological order to ensure building ancestor stages first. */ - private def getAncestorShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = { - val parents = new Stack[ShuffleDependency[_, _, _]] + private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = { + val parents = new ArrayBuffer[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting @@ -424,13 +424,13 @@ class DAGScheduler( for (dep <- deps) { dep match { case shufDep: ShuffleDependency[_, _, _] => - parents.push(shufDep) + parents += shufDep case _ => } } } else { waitingForVisit.push(r) - for (dep <- deps) { + for (dep <- deps if !visited(dep.rdd)) { waitingForVisit.push(dep.rdd) } } @@ -440,7 +440,7 @@ class DAGScheduler( while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } - parents.toList.reverse + parents } private def getMissingParentStages(stage: Stage): List[Stage] = { From 697b32208262b3c1c10bc2cae43b891c7970811d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 16 Mar 2016 21:55:50 +0900 Subject: [PATCH 04/18] Fix topological sort. --- .../apache/spark/scheduler/DAGScheduler.scala | 36 ++++++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 16 ++++++--- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 803af76b90538..d75e6a46879cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -414,24 +414,26 @@ class DAGScheduler( // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { - val deps = r.dependencies.filter { - case shufDep: ShuffleDependency[_, _, _] => - !shuffleToMapStage.contains(shufDep.shuffleId) - case _ => true - } - if (deps.forall(dep => visited(dep.rdd))) { - visited += r - for (dep <- deps) { - dep match { - case shufDep: ShuffleDependency[_, _, _] => - parents += shufDep - case _ => - } + if (!visited(r)) { + val deps = r.dependencies.filter { + case shufDep: ShuffleDependency[_, _, _] => + !shuffleToMapStage.contains(shufDep.shuffleId) + case _ => true } - } else { - waitingForVisit.push(r) - for (dep <- deps if !visited(dep.rdd)) { - waitingForVisit.push(dep.rdd) + if (deps.forall(dep => visited(dep.rdd))) { + visited += r + for (dep <- deps) { + dep match { + case shufDep: ShuffleDependency[_, _, _] => + parents += shufDep + case _ => + } + } + } else { + waitingForVisit.push(r) + for (dep <- deps if !visited(dep.rdd)) { + waitingForVisit.push(dep.rdd) + } } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 67377cb121296..02fe5b0652344 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -324,7 +324,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou /* * <-------------------- * / \ - * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] + * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F] * \ / * <-------------------- */ @@ -342,24 +342,30 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou tracker = mapOutputTracker) val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1)) - val rddE = new MyRDD(sc, 1, List(shuffleDef4, new OneToOneDependency(rddC)), + val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), shuffleDef4), tracker = mapOutputTracker) - submit(rddE, Array(0)) - assert(scheduler.shuffleToMapStage.size === 4) + val shuffleDef5 = new ShuffleDependency(rddE, new HashPartitioner(1)) + val rddF = new MyRDD(sc, 1, List(shuffleDef5), + tracker = mapOutputTracker) + submit(rddF, Array(0)) + + assert(scheduler.shuffleToMapStage.size === 5) assert(scheduler.activeJobs.size === 1) val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId) val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId) val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId) val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId) + val mapStage5 = scheduler.shuffleToMapStage(shuffleDef5.shuffleId) val finalStage = scheduler.activeJobs.head.finalStage assert(mapStage1.parents.isEmpty) assert(mapStage2.parents === List(mapStage1)) assert(mapStage3.parents === List(mapStage2)) assert(mapStage4.parents === List(mapStage1, mapStage3)) - assert(finalStage.parents === List(mapStage2, mapStage4)) + assert(mapStage5.parents === List(mapStage2, mapStage4)) + assert(finalStage.parents === List(mapStage5)) } test("zero split job") { From 1636531c65912bbfb68e4c669690a9f9107d9cd1 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 28 Mar 2016 16:01:27 +0900 Subject: [PATCH 05/18] Add assertion to check not to overwrite illegally. --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3b730804fe289..b04da419ac722 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -286,6 +286,7 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => + assert(!shuffleToMapStage.get(dep.shuffleId).isDefined) shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) } // Then register current shuffleDep From 92e9f4484b09f65829f6e9300042cc2b57979278 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 28 Mar 2016 16:19:09 +0900 Subject: [PATCH 06/18] Modify to mitigate adds extra push&pop. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b04da419ac722..e1a5c8eb0a254 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -422,6 +422,7 @@ class DAGScheduler( case _ => true } if (deps.forall(dep => visited(dep.rdd))) { + waitingForVisit.pop() visited += r for (dep <- deps) { dep match { @@ -431,17 +432,18 @@ class DAGScheduler( } } } else { - waitingForVisit.push(r) for (dep <- deps if !visited(dep.rdd)) { waitingForVisit.push(dep.rdd) } } + } else { + waitingForVisit.pop() } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { - visit(waitingForVisit.pop()) + visit(waitingForVisit.top) } parents } From 4b412f5e73ca9cf5ab2de1a51f6c30f01286e89a Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 28 Mar 2016 16:48:42 +0900 Subject: [PATCH 07/18] Modify comment. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e1a5c8eb0a254..9959a1a699530 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -405,8 +405,9 @@ class DAGScheduler( } /** - * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet, - * in topological order to ensure building ancestor stages first. + * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet. + * This is done in topological order to create ancestor stages first to ensure that the result + * stage graph is correctly built. */ private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = { val parents = new ArrayBuffer[ShuffleDependency[_, _, _]] From 8fb9a149a03543a35c2a08c79edc53d49f66b5c2 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 28 Mar 2016 17:11:37 +0900 Subject: [PATCH 08/18] Add a comment to explain what the test is doing. --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index afbd00ff4bf00..037f1766f27fd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -322,12 +322,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) } - /* + /** + * This test ensures that DAGScheduler build stage graph correctly. + * Here, we submit an RDD[F] having a linage of RDDs as follows: + * * <-------------------- * / \ * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F] * \ / * <-------------------- + * + * then check if all stages have correct parent stages. + * Note: [] means an RDD, () means a shuffle dependency. */ test("parent stages") { val rddA = new MyRDD(sc, 1, Nil) From e2cfeaf3ef5a7291a235bbcbb968d88959e52e93 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 29 Mar 2016 12:22:36 +0900 Subject: [PATCH 09/18] Revert "Add assertion to check not to overwrite illegally." This reverts commit 1636531c65912bbfb68e4c669690a9f9107d9cd1. --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9959a1a699530..88911e120d6b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -286,7 +286,6 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - assert(!shuffleToMapStage.get(dep.shuffleId).isDefined) shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) } // Then register current shuffleDep From 3a8ff84622c3f136fa3511561a789163c94b2f2e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 5 Apr 2016 11:58:53 +0900 Subject: [PATCH 10/18] Modify to cut down on the repeated scanning of data structures. --- .../apache/spark/scheduler/DAGScheduler.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 88911e120d6b3..53372b1519421 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -415,29 +415,29 @@ class DAGScheduler( // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { - if (!visited(r)) { - val deps = r.dependencies.filter { - case shufDep: ShuffleDependency[_, _, _] => - !shuffleToMapStage.contains(shufDep.shuffleId) - case _ => true + if (visited(r)) { + waitingForVisit.pop() + } else { + val visitedShuffleDeps = new ArrayBuffer[ShuffleDependency[_, _, _]] + val unvisitedDeps = new ArrayBuffer[Dependency[_]] + + r.dependencies.foreach { + case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) => + if (visited(dep.rdd)) visitedShuffleDeps += dep + else unvisitedDeps += dep + case dep if !visited(dep.rdd) => unvisitedDeps += dep + case _ => } - if (deps.forall(dep => visited(dep.rdd))) { + + if (unvisitedDeps.isEmpty) { waitingForVisit.pop() visited += r - for (dep <- deps) { - dep match { - case shufDep: ShuffleDependency[_, _, _] => - parents += shufDep - case _ => - } - } + for (shufDep <- visitedShuffleDeps) { parents += shufDep } } else { - for (dep <- deps if !visited(dep.rdd)) { + for (dep <- unvisitedDeps) { waitingForVisit.push(dep.rdd) } } - } else { - waitingForVisit.pop() } } From b2bd75c81af0f5d377e2dfe2ed59460fced20da4 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 25 Apr 2016 11:59:19 +0900 Subject: [PATCH 11/18] Move a comment of the order of the returned dependencies into the method. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 53372b1519421..68354a0f5244b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -405,10 +405,10 @@ class DAGScheduler( /** * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet. - * This is done in topological order to create ancestor stages first to ensure that the result - * stage graph is correctly built. */ private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = { + // This is done in topological order to create ancestor stages first to ensure that the result + // stage graph is correctly built. val parents = new ArrayBuffer[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError From 3281f0eed1e76af658567a32e70a7ae2b79024e9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 25 Apr 2016 12:05:07 +0900 Subject: [PATCH 12/18] Add a JIRA name to the test name. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 037f1766f27fd..0a4e5c7a88629 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -335,7 +335,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou * then check if all stages have correct parent stages. * Note: [] means an RDD, () means a shuffle dependency. */ - test("parent stages") { + test("[SPARK-13902] parent stages") { val rddA = new MyRDD(sc, 1, Nil) val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1)) From cab52643eddfc7d08673f8e0437a0c71adeb93d5 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 25 Apr 2016 12:20:31 +0900 Subject: [PATCH 13/18] Fix style. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 68354a0f5244b..5ab75001301ea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -423,16 +423,22 @@ class DAGScheduler( r.dependencies.foreach { case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) => - if (visited(dep.rdd)) visitedShuffleDeps += dep - else unvisitedDeps += dep - case dep if !visited(dep.rdd) => unvisitedDeps += dep + if (visited(dep.rdd)) { + visitedShuffleDeps += dep + } else { + unvisitedDeps += dep + } + case dep if !visited(dep.rdd) => + unvisitedDeps += dep case _ => } if (unvisitedDeps.isEmpty) { waitingForVisit.pop() visited += r - for (shufDep <- visitedShuffleDeps) { parents += shufDep } + for (shufDep <- visitedShuffleDeps) { + parents += shufDep + } } else { for (dep <- unvisitedDeps) { waitingForVisit.push(dep.rdd) From 1b6872479dc998b65db4c20b1aaa0361ed80bd03 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 4 May 2016 12:19:33 +0900 Subject: [PATCH 14/18] Revert DAGScheduler. --- .../apache/spark/scheduler/DAGScheduler.scala | 51 ++++++------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5ab75001301ea..5cdc91316b696 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.Map -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack} +import scala.collection.mutable.{HashMap, HashSet, Stack} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -403,53 +403,32 @@ class DAGScheduler( parents.toList } - /** - * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet. - */ - private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = { - // This is done in topological order to create ancestor stages first to ensure that the result - // stage graph is correctly built. - val parents = new ArrayBuffer[ShuffleDependency[_, _, _]] + /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ + private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { + val parents = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { - if (visited(r)) { - waitingForVisit.pop() - } else { - val visitedShuffleDeps = new ArrayBuffer[ShuffleDependency[_, _, _]] - val unvisitedDeps = new ArrayBuffer[Dependency[_]] - - r.dependencies.foreach { - case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) => - if (visited(dep.rdd)) { - visitedShuffleDeps += dep - } else { - unvisitedDeps += dep - } - case dep if !visited(dep.rdd) => - unvisitedDeps += dep - case _ => - } - - if (unvisitedDeps.isEmpty) { - waitingForVisit.pop() - visited += r - for (shufDep <- visitedShuffleDeps) { - parents += shufDep - } - } else { - for (dep <- unvisitedDeps) { - waitingForVisit.push(dep.rdd) + if (!visited(r)) { + visited += r + for (dep <- r.dependencies) { + dep match { + case shufDep: ShuffleDependency[_, _, _] => + if (!shuffleToMapStage.contains(shufDep.shuffleId)) { + parents.push(shufDep) + } + case _ => } + waitingForVisit.push(dep.rdd) } } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { - visit(waitingForVisit.top) + visit(waitingForVisit.pop()) } parents } From ab92488a7c92d1e7824021dc2113df370bb2d79f Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 4 May 2016 12:19:52 +0900 Subject: [PATCH 15/18] Use #8923 change. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5cdc91316b696..3ca7bae563c87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -286,7 +286,9 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + if (!shuffleToMapStage.contains(dep.shuffleId)) { + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + } } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) From b4e2eb1557dedc34b9a57b371e11ade2693ff38a Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 9 May 2016 14:39:16 +0900 Subject: [PATCH 16/18] Use simpler example. --- .../spark/scheduler/DAGSchedulerSuite.scala | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0a4e5c7a88629..c6eeba5054d4e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -324,55 +324,49 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou /** * This test ensures that DAGScheduler build stage graph correctly. - * Here, we submit an RDD[F] having a linage of RDDs as follows: * - * <-------------------- - * / \ - * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F] - * \ / - * <-------------------- + * Suppose you have the following DAG: + * + * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D] + * \ / + * <------------- + * + * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both B and A. + * The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to + * understand, let's call the shuffled data from A shuffle dependency ID s_A and the shuffled data + * from B shuffle dependency ID s_B. * - * then check if all stages have correct parent stages. * Note: [] means an RDD, () means a shuffle dependency. */ - test("[SPARK-13902] parent stages") { + test("[SPARK-13902] not to create duplicate stage.") { val rddA = new MyRDD(sc, 1, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + val s_A = shuffleDepA.shuffleId - val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1)) - val rddB = new MyRDD(sc, 1, List(shuffleDef1), tracker = mapOutputTracker) + val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + val s_B = shuffleDepB.shuffleId - val shuffleDef2 = new ShuffleDependency(rddB, new HashPartitioner(1)) - val rddC = new MyRDD(sc, 1, List(shuffleDef2), tracker = mapOutputTracker) + val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker) + val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1)) + val s_C = shuffleDepC.shuffleId - val shuffleDef3 = new ShuffleDependency(rddC, new HashPartitioner(1)) - val rddD = new MyRDD(sc, 1, List(shuffleDef3, new OneToOneDependency(rddB)), - tracker = mapOutputTracker) + val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker) - val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1)) - val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), shuffleDef4), - tracker = mapOutputTracker) - - val shuffleDef5 = new ShuffleDependency(rddE, new HashPartitioner(1)) - val rddF = new MyRDD(sc, 1, List(shuffleDef5), - tracker = mapOutputTracker) - submit(rddF, Array(0)) + submit(rddD, Array(0)) - assert(scheduler.shuffleToMapStage.size === 5) + assert(scheduler.shuffleToMapStage.size === 3) assert(scheduler.activeJobs.size === 1) - val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId) - val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId) - val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId) - val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId) - val mapStage5 = scheduler.shuffleToMapStage(shuffleDef5.shuffleId) + val mapStageA = scheduler.shuffleToMapStage(s_A) + val mapStageB = scheduler.shuffleToMapStage(s_B) + val mapStageC = scheduler.shuffleToMapStage(s_C) val finalStage = scheduler.activeJobs.head.finalStage - assert(mapStage1.parents.isEmpty) - assert(mapStage2.parents === List(mapStage1)) - assert(mapStage3.parents === List(mapStage2)) - assert(mapStage4.parents === List(mapStage1, mapStage3)) - assert(mapStage5.parents === List(mapStage2, mapStage4)) - assert(finalStage.parents === List(mapStage5)) + assert(mapStageA.parents.isEmpty) + assert(mapStageB.parents === List(mapStageA)) + assert(mapStageC.parents === List(mapStageA, mapStageB)) + assert(finalStage.parents === List(mapStageC)) } test("zero split job") { From 55d6b6db26aba0054c0da87d66404a68385ad3ff Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 9 May 2016 14:46:09 +0900 Subject: [PATCH 17/18] Fix scalastyle. --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c6eeba5054d4e..cefbfde75198b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -331,10 +331,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou * \ / * <------------- * - * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both B and A. - * The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to - * understand, let's call the shuffled data from A shuffle dependency ID s_A and the shuffled data - * from B shuffle dependency ID s_B. + * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both + * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example + * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the + * shuffled data from B shuffle dependency ID s_B. * * Note: [] means an RDD, () means a shuffle dependency. */ From 3ceb4d52673077eb511bfbf282fdbbc3f005f638 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 12 May 2016 09:55:48 +0900 Subject: [PATCH 18/18] Change a test name. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index cefbfde75198b..21f929526debc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -338,7 +338,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou * * Note: [] means an RDD, () means a shuffle dependency. */ - test("[SPARK-13902] not to create duplicate stage.") { + test("[SPARK-13902] Ensure no duplicate stages are created") { val rddA = new MyRDD(sc, 1, Nil) val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) val s_A = shuffleDepA.shuffleId