From 4210aa6dc4d837988f101689bab6117404c00eb2 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Fri, 23 Oct 2015 22:30:18 -0400 Subject: [PATCH 01/26] reworked FutureAction and JobWaiter to avoid blocking or consuming threads while waiting for jobs --- .../scala/org/apache/spark/FutureAction.scala | 43 +++-------------- .../apache/spark/scheduler/DAGScheduler.scala | 10 ++-- .../apache/spark/scheduler/JobWaiter.scala | 46 +++++++------------ 3 files changed, 29 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 48792a958130c..b63bdecb9e917 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -116,57 +116,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { - if (!atMost.isFinite()) { - awaitResult() - } else jobWaiter.synchronized { - val finishTime = System.currentTimeMillis() + atMost.toMillis - while (!isCompleted) { - val time = System.currentTimeMillis() - if (time >= finishTime) { - throw new TimeoutException - } else { - jobWaiter.wait(finishTime - time) - } - } - } + jobWaiter.completionFuture.ready(atMost) this } @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { - ready(atMost)(permit) - awaitResult() match { - case scala.util.Success(res) => res - case scala.util.Failure(e) => throw e - } + jobWaiter.completionFuture.ready(atMost) + value.get.get } override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) { - executor.execute(new Runnable { - override def run() { - func(awaitResult()) - } - }) + jobWaiter.completionFuture onComplete {_ => func(value.get)} } override def isCompleted: Boolean = jobWaiter.jobFinished override def isCancelled: Boolean = _cancelled - override def value: Option[Try[T]] = { - if (jobWaiter.jobFinished) { - Some(awaitResult()) - } else { - None - } - } - - private def awaitResult(): Try[T] = { - jobWaiter.awaitResult() match { - case JobSucceeded => scala.util.Success(resultFunc) - case JobFailed(e: Exception) => scala.util.Failure(e) - } - } + override def value: Option[Try[T]] = + jobWaiter.completionFuture.value map {res => res map {_ => resultFunc}} def jobIds: Seq[Int] = Seq(jobWaiter.jobId) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 995862ece5944..59dcef4fcd07f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -22,8 +22,11 @@ import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.Success + import scala.collection.Map import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack} +import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -605,11 +608,12 @@ class DAGScheduler( properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) - waiter.awaitResult() match { - case JobSucceeded => + Await.ready(waiter.completionFuture, atMost=Duration.Inf) + waiter.completionFuture.value.get match { + case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) - case JobFailed(exception: Exception) => + case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 382b09422a4a0..6efdae152b1ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,6 +17,10 @@ package org.apache.spark.scheduler +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.{Future, Promise} + /** * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their * results to the given handler function. @@ -28,17 +32,15 @@ private[spark] class JobWaiter[T]( resultHandler: (Int, T) => Unit) extends JobListener { - private var finishedTasks = 0 - - // Is the job as a whole finished (succeeded or failed)? - @volatile - private var _jobFinished = totalTasks == 0 - - def jobFinished: Boolean = _jobFinished - + private val finishedTasks = new AtomicInteger(0) // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero // partition RDDs), we set the jobResult directly to JobSucceeded. - private var jobResult: JobResult = if (jobFinished) JobSucceeded else null + private val jobPromise : Promise[Unit] = + if (totalTasks == 0) Promise.successful(()) else Promise() + + def jobFinished: Boolean = jobPromise.isCompleted + + def completionFuture : Future[Unit] = jobPromise.future /** * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled @@ -49,29 +51,13 @@ private[spark] class JobWaiter[T]( dagScheduler.cancelJob(jobId) } - override def taskSucceeded(index: Int, result: Any): Unit = synchronized { - if (_jobFinished) { - throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter") - } + override def taskSucceeded(index: Int, result: Any): Unit = { resultHandler(index, result.asInstanceOf[T]) - finishedTasks += 1 - if (finishedTasks == totalTasks) { - _jobFinished = true - jobResult = JobSucceeded - this.notifyAll() - } + if (finishedTasks.incrementAndGet() == totalTasks) + jobPromise.success(()) } - override def jobFailed(exception: Exception): Unit = synchronized { - _jobFinished = true - jobResult = JobFailed(exception) - this.notifyAll() - } + override def jobFailed(exception: Exception): Unit = + jobPromise.failure(exception) - def awaitResult(): JobResult = synchronized { - while (!_jobFinished) { - this.wait() - } - return jobResult - } } From 1ad1abdd995a6347b2a18a31260168a987fee06b Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 24 Oct 2015 12:17:09 -0400 Subject: [PATCH 02/26] reworked ComplexFutureAction and AsyncRDDActions.takeAsync to be non-blocking --- .../scala/org/apache/spark/FutureAction.scala | 70 +++++-------------- .../apache/spark/rdd/AsyncRDDActions.scala | 41 ++++++----- 2 files changed, 43 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index b63bdecb9e917..edbefd6e9cba9 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit import org.apache.spark.api.java.JavaFutureAction import org.apache.spark.rdd.RDD -import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter} +import org.apache.spark.scheduler.JobWaiter import scala.concurrent._ import scala.concurrent.duration.Duration -import scala.util.{Failure, Try} +import scala.util.Try /** * A future for the result of an action to support cancellation. This is an extension of the @@ -148,44 +148,25 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: */ class ComplexFutureAction[T] extends FutureAction[T] { - // Pointer to the thread that is executing the action. It is set when the action is run. - @volatile private var thread: Thread = _ + @volatile private var _cancelled = false - // A flag indicating whether the future has been cancelled. This is used in case the future - // is cancelled before the action was even run (and thus we have no thread to interrupt). - @volatile private var _cancelled: Boolean = false - - @volatile private var jobs: Seq[Int] = Nil + @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = promise[T]() + private val p = Promise[T]() - override def cancel(): Unit = this.synchronized { + override def cancel(): Unit = synchronized { _cancelled = true - if (thread != null) { - thread.interrupt() - } + p.tryFailure(new SparkException("Action has been cancelled")) + subActions foreach {_.cancel()} } /** * Executes some action enclosed in the closure. To properly enable cancellation, the closure * should use runJob implementation in this promise. See takeAsync for example. */ - def run(func: => T)(implicit executor: ExecutionContext): this.type = { - scala.concurrent.future { - thread = Thread.currentThread - try { - p.success(func) - } catch { - case e: Exception => p.failure(e) - } finally { - // This lock guarantees when calling `thread.interrupt()` in `cancel`, - // thread won't be set to null. - ComplexFutureAction.this.synchronized { - thread = null - } - } - } + def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { + p tryCompleteWith func this } @@ -198,28 +179,15 @@ class ComplexFutureAction[T] extends FutureAction[T] { processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R) { + resultFunc: => R)(implicit executor: ExecutionContext) = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. - val job = this.synchronized { - if (!isCancelled) { - rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - } else { - throw new SparkException("Action has been cancelled") - } - } - - this.jobs = jobs ++ job.jobIds - - // Wait for the job to complete. If the action is cancelled (with an interrupt), - // cancel the job and stop the execution. This is not in a synchronized block because - // Await.ready eventually waits on the monitor in FutureJob.jobWaiter. - try { - Await.ready(job, Duration.Inf) - } catch { - case e: InterruptedException => - job.cancel() - throw new SparkException("Action has been cancelled") + if (!isCancelled) { + val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) + subActions = job::subActions + job + } else { + throw new SparkException("Action has been cancelled") } } @@ -245,7 +213,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { override def value: Option[Try[T]] = p.future.value - def jobIds: Seq[Int] = jobs + def jobIds: Seq[Int] = subActions flatMap {_.jobIds} } @@ -272,7 +240,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S Await.ready(futureAction, timeout) futureAction.value.get match { case scala.util.Success(value) => converter(value) - case Failure(exception) => + case scala.util.Failure(exception) => if (isCancelled) { throw new CancellationException("Job cancelled").initCause(exception) } else { diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index ca1eb1f4e4a9a..41a4b70b6b43d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.util.ThreadUtils import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext +import scala.concurrent.{Future, ExecutionContext} import scala.reflect.ClassTag -import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} +import org.apache.spark.{SimpleFutureAction, ComplexFutureAction, FutureAction, Logging} /** * A set of asynchronous RDD actions available through an implicit conversion. @@ -66,14 +66,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] - - f.run { - // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which - // is a cached thread pool. - val results = new ArrayBuffer[T](num) - val totalParts = self.partitions.length - var partsScanned = 0 - while (results.size < num && partsScanned < totalParts) { + // Cached thread pool to handle aggregation of subtasks. + implicit val executionContext = AsyncRDDActions.futureExecutionContext + val results = new ArrayBuffer[T](num) + val totalParts = self.partitions.length + + /* + Recursively triggers jobs to scan partitions until either the requested + number of elements are retrieved, or the partitions to scan are exhausted. + This implementation is non-blocking, asynchronously handling the + results of each job and triggering the next job using callbacks on futures. + */ + def continue(partsScanned : Int) : Future[Seq[T]] = + if (results.size >= num || partsScanned >= totalParts) { + Future.successful(results.toSeq) + } + else { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1 @@ -95,19 +103,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val buf = new Array[Array[T]](p.size) - f.runJob(self, + val job = f.runJob(self, (it: Iterator[T]) => it.take(left).toArray, p, (index: Int, data: Array[T]) => buf(index) = data, Unit) - - buf.foreach(results ++= _.take(num - results.size)) - partsScanned += numPartsToTry + job flatMap {case _ => + buf.foreach(results ++= _.take(num - results.size)) + continue(partsScanned + numPartsToTry) + } } - results.toSeq - }(AsyncRDDActions.futureExecutionContext) - f + f.run {continue(0)} } /** From de008ceda1427bc547fdb6cb7206144001db402b Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 24 Oct 2015 15:59:03 -0400 Subject: [PATCH 03/26] corrected issues flagged by scalastyle --- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index edbefd6e9cba9..7cfa4e452895e 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -179,7 +179,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R)(implicit executor: ExecutionContext) = synchronized { + resultFunc: => R)(implicit executor: ExecutionContext) : FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. if (!isCancelled) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 59dcef4fcd07f..92bae5fa8675e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -608,7 +608,7 @@ class DAGScheduler( properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) - Await.ready(waiter.completionFuture, atMost=Duration.Inf) + Await.ready(waiter.completionFuture, atMost = Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 6efdae152b1ed..f3f2793af0868 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -53,8 +53,9 @@ private[spark] class JobWaiter[T]( override def taskSucceeded(index: Int, result: Any): Unit = { resultHandler(index, result.asInstanceOf[T]) - if (finishedTasks.incrementAndGet() == totalTasks) + if (finishedTasks.incrementAndGet() == totalTasks) { jobPromise.success(()) + } } override def jobFailed(exception: Exception): Unit = From 17f69885303fa9ee9ac4bb6c487e489d1bba9873 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 7 Nov 2015 15:45:24 -0500 Subject: [PATCH 04/26] added test to verify that FutureAction callback doesn't invoke ExecutionContext too early --- .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index ec99f2a1bad66..55ea4806352b6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.rdd import java.util.concurrent.Semaphore -import scala.concurrent.{Await, TimeoutException} +import scala.concurrent._ import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global @@ -197,4 +197,17 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + test("FutureAction callback must not consume a thread while waiting") { + val executorInvoked = Promise[Unit] + val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { + executorInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = ??? + } + val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).countAsync() + f.onComplete(_ => ())(fakeExecutionContext) + assert(!executorInvoked.isCompleted) + } } From 4def9894a7b08b2443804107808698c5f3f6588d Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 7 Nov 2015 15:49:22 -0500 Subject: [PATCH 05/26] split out tests for SimpleFutureAction and ComplexFutureAction callback handling --- .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 55ea4806352b6..fbcdb8132a606 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -198,7 +198,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } } - test("FutureAction callback must not consume a thread while waiting") { + test("SimpleFutureAction callback must not consume a thread while waiting") { val executorInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { @@ -210,4 +210,17 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim f.onComplete(_ => ())(fakeExecutionContext) assert(!executorInvoked.isCompleted) } + + test("ComplexFutureAction callback must not consume a thread while waiting") { + val executorInvoked = Promise[Unit] + val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { + executorInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = ??? + } + val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).takeAsync(100) + f.onComplete(_ => ())(fakeExecutionContext) + assert(!executorInvoked.isCompleted) + } } From 8624ef04419f63c988ef6b658b79b70beb65487f Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Tue, 10 Nov 2015 00:31:41 -0500 Subject: [PATCH 06/26] refactored duplicate code in SimpleFutureAction/ComplexFutureAction tests for non-blocking; cleaned up a few minor style issues in the process; added comment explaining why we're using Thread.sleep --- .../spark/rdd/AsyncRDDActionsSuite.scala | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index fbcdb8132a606..28bd7e6c0651a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts { @@ -198,29 +198,30 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } } - test("SimpleFutureAction callback must not consume a thread while waiting") { + private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) : Unit = { val executorInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { executorInvoked.success(()) } - override def reportFailure(t: Throwable): Unit = ??? - } - val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).countAsync() + override def reportFailure(t: Throwable): Unit = () + } + /* + We sleep here so that we get to the assertion before the job completes. + I wish there were a cleaner way to do this, but trying to use any sort of synchronization + with this fails due to task serialization. + */ + val rdd = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}) + val f = action(rdd) f.onComplete(_ => ())(fakeExecutionContext) assert(!executorInvoked.isCompleted) } + test("SimpleFutureAction callback must not consume a thread while waiting") { + testAsyncAction(_.countAsync()) + } + test("ComplexFutureAction callback must not consume a thread while waiting") { - val executorInvoked = Promise[Unit] - val fakeExecutionContext = new ExecutionContext { - override def execute(runnable: Runnable): Unit = { - executorInvoked.success(()) - } - override def reportFailure(t: Throwable): Unit = ??? - } - val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).takeAsync(100) - f.onComplete(_ => ())(fakeExecutionContext) - assert(!executorInvoked.isCompleted) + testAsyncAction((_.takeAsync(100))) } } From 31de51bd869b0761e26b07b48e92296f8a7b40fc Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Tue, 10 Nov 2015 00:37:03 -0500 Subject: [PATCH 07/26] removed unused/redundant imports and cleaned up order of import statements --- core/src/main/scala/org/apache/spark/FutureAction.scala | 7 ++++--- .../main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 5 ++--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 -- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 7cfa4e452895e..c0824fc654caf 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -20,13 +20,14 @@ package org.apache.spark import java.util.Collections import java.util.concurrent.TimeUnit +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.Try + import org.apache.spark.api.java.JavaFutureAction import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.JobWaiter -import scala.concurrent._ -import scala.concurrent.duration.Duration -import scala.util.Try /** * A future for the result of an action to support cancellation. This is an extension of the diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 41a4b70b6b43d..ae6b2a8da757e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -19,13 +19,12 @@ package org.apache.spark.rdd import java.util.concurrent.atomic.AtomicLong -import org.apache.spark.util.ThreadUtils - import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Future, ExecutionContext} import scala.reflect.ClassTag -import org.apache.spark.{SimpleFutureAction, ComplexFutureAction, FutureAction, Logging} +import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} +import org.apache.spark.util.ThreadUtils /** * A set of asynchronous RDD actions available through an implicit conversion. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7deb60461f7d3..6992adf72c0f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -22,8 +22,6 @@ import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import org.apache.spark.Success - import scala.collection.Map import scala.concurrent.Await import scala.collection.mutable.{HashMap, HashSet, Stack} From 0a6c614ffdc26256039dbe1b3965bfaad3c5e46c Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Tue, 10 Nov 2015 21:42:56 -0500 Subject: [PATCH 08/26] removed unused implicit ExecutionContext parameter from ComplexFutureAction.runJob --- core/src/main/scala/org/apache/spark/FutureAction.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index c0824fc654caf..e458bb69800a9 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -180,7 +180,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R)(implicit executor: ExecutionContext) : FutureAction[R] = synchronized { + resultFunc: => R) : FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. if (!isCancelled) { @@ -218,6 +218,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { } + private[spark] class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T) extends JavaFutureAction[T] { From 2c2da36f97810effa3beeba192a7a30797f65b36 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Tue, 10 Nov 2015 22:27:23 -0500 Subject: [PATCH 09/26] added @DeveloperApi annotation to SimpleFutureAction and ComplexFutureAction --- core/src/main/scala/org/apache/spark/FutureAction.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index e458bb69800a9..ee2c7aa101f8b 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -20,6 +20,8 @@ package org.apache.spark import java.util.Collections import java.util.concurrent.TimeUnit +import org.apache.spark.annotation.DeveloperApi + import scala.concurrent._ import scala.concurrent.duration.Duration import scala.util.Try @@ -106,6 +108,7 @@ trait FutureAction[T] extends Future[T] { * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ +@DeveloperApi class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { @@ -147,6 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@DeveloperApi class ComplexFutureAction[T] extends FutureAction[T] { @volatile private var _cancelled = false From cef76370814d123b659fb617a6db7dc6277a5ce2 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 13:07:09 -0500 Subject: [PATCH 10/26] corrected order of imports --- core/src/main/scala/org/apache/spark/FutureAction.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index ee2c7aa101f8b..3b55c956f617a 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -20,12 +20,11 @@ package org.apache.spark import java.util.Collections import java.util.concurrent.TimeUnit -import org.apache.spark.annotation.DeveloperApi - import scala.concurrent._ import scala.concurrent.duration.Duration import scala.util.Try +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaFutureAction import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.JobWaiter From e364f93dad8b7f314d3e5359c6b180a8913e1fbc Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 13:11:07 -0500 Subject: [PATCH 11/26] corrected minor style issue --- .../test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 28bd7e6c0651a..735b2582c05bf 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -211,7 +211,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim I wish there were a cleaner way to do this, but trying to use any sort of synchronization with this fails due to task serialization. */ - val rdd = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}) + val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => Thread.sleep(1000L); itr} val f = action(rdd) f.onComplete(_ => ())(fakeExecutionContext) assert(!executorInvoked.isCompleted) From 0dd1128ec3383b08570ff6f9918e33091c4f1f2c Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 13:35:01 -0500 Subject: [PATCH 12/26] reworked AsyncRDDActionsSuite.testAsyncAction to use semaphores instead of Thread.sleep in order to reduce the potential for flakiness. --- .../spark/rdd/AsyncRDDActionsSuite.scala | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 735b2582c05bf..ae9ca47d6b5e9 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -198,30 +198,49 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } } - private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) : Unit = { - val executorInvoked = Promise[Unit] + private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) + (starter : => Semaphore) : Unit = { + val executionContextInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { - executorInvoked.success(()) + executionContextInvoked.success(()) } override def reportFailure(t: Throwable): Unit = () } - /* - We sleep here so that we get to the assertion before the job completes. - I wish there were a cleaner way to do this, but trying to use any sort of synchronization - with this fails due to task serialization. - */ - val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => Thread.sleep(1000L); itr} + starter.drainPermits() + val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} val f = action(rdd) f.onComplete(_ => ())(fakeExecutionContext) - assert(!executorInvoked.isCompleted) + // Here we verify that registering the callback didn't cause a thread to be consumed. + assert(!executionContextInvoked.isCompleted) + // Now allow the executors to proceed with task processing. + starter.release(rdd.partitions.length) + // Waiting for the result verifies that the tasks were successfully processed. + // This mainly exists to verify that we didn't break task deserialization. + Await.result(executionContextInvoked.future, atMost = 15.seconds) } test("SimpleFutureAction callback must not consume a thread while waiting") { - testAsyncAction(_.countAsync()) + testAsyncAction(_.countAsync())(AsyncRDDActionsSuite.simpleAsyncActionStart) } test("ComplexFutureAction callback must not consume a thread while waiting") { - testAsyncAction((_.takeAsync(100))) + testAsyncAction((_.takeAsync(100)))(AsyncRDDActionsSuite.complexAsyncActionStart) } } + + +object AsyncRDDActionsSuite { + /* + These are used by the tests that verify that callbacks don't consume threads while waiting + to force the executors to wait for a "go" signal before processing, so that the job + doesn't complete before we've had a chance to verify that the ExecutionContext was not + invoked. + + They must be placed here, in the companion object, rather than in the tests themselves, + so that they don't get serialized along with the tasks. + Each test gets its own semaphore so that the tests can be safely run in parallel if desired. + */ + private val simpleAsyncActionStart = new Semaphore(0) + private val complexAsyncActionStart = new Semaphore(0) +} From 50cd13e1c78a020003617c83a158035acc896f93 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 13:55:47 -0500 Subject: [PATCH 13/26] cleaned up minor style issues --- core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 +- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index ae6b2a8da757e..5343e4a7145b3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -107,7 +107,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi p, (index: Int, data: Array[T]) => buf(index) = data, Unit) - job flatMap {case _ => + job.flatMap {case _ => buf.foreach(results ++= _.take(num - results.size)) continue(partsScanned + numPartsToTry) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6992adf72c0f1..05c4359a3932e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -23,8 +23,8 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map -import scala.concurrent.Await import scala.collection.mutable.{HashMap, HashSet, Stack} +import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps From 7f4244da42034e29d35919d911a9603c679657bb Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 14:21:51 -0500 Subject: [PATCH 14/26] synchronized resultHandler call in JobWaiter.taskSucceeded --- .../main/scala/org/apache/spark/scheduler/JobWaiter.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index f3f2793af0868..cae4abd9a0d2c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -52,7 +52,11 @@ private[spark] class JobWaiter[T]( } override def taskSucceeded(index: Int, result: Any): Unit = { - resultHandler(index, result.asInstanceOf[T]) + /* + The resultHandler call must be synchronized in case resultHandler itself + is not thread safe. + */ + synchronized(resultHandler(index, result.asInstanceOf[T])) if (finishedTasks.incrementAndGet() == totalTasks) { jobPromise.success(()) } From dddad7f841db6fc696b8cd7d9e3041d53b4233db Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 14:28:37 -0500 Subject: [PATCH 15/26] more style fixes; updated documentation for ComplexFutureAction.runJob, which has been renamed to submitJob --- .../main/scala/org/apache/spark/FutureAction.scala | 13 +++++++------ .../org/apache/spark/rdd/AsyncRDDActions.scala | 2 +- .../org/apache/spark/scheduler/JobWaiter.scala | 13 ++++++------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 3b55c956f617a..e332a0f53b592 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -138,7 +138,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: override def isCancelled: Boolean = _cancelled override def value: Option[Try[T]] = - jobWaiter.completionFuture.value map {res => res map {_ => resultFunc}} + jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}} def jobIds: Seq[Int] = Seq(jobWaiter.jobId) } @@ -162,7 +162,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { override def cancel(): Unit = synchronized { _cancelled = true p.tryFailure(new SparkException("Action has been cancelled")) - subActions foreach {_.cancel()} + subActions.foreach {_.cancel()} } /** @@ -170,20 +170,21 @@ class ComplexFutureAction[T] extends FutureAction[T] { * should use runJob implementation in this promise. See takeAsync for example. */ def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { - p tryCompleteWith func + p.tryCompleteWith(func) this } /** - * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext * to enable cancellation. */ - def runJob[T, U, R]( + def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R) : FutureAction[R] = synchronized { + resultFunc: => R): FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. if (!isCancelled) { diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 5343e4a7145b3..1fb2527f73e6b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -102,7 +102,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val buf = new Array[Array[T]](p.size) - val job = f.runJob(self, + val job = f.submitJob(self, (it: Iterator[T]) => it.take(left).toArray, p, (index: Int, data: Array[T]) => buf(index) = data, diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index cae4abd9a0d2c..4326135186a73 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -35,12 +35,12 @@ private[spark] class JobWaiter[T]( private val finishedTasks = new AtomicInteger(0) // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero // partition RDDs), we set the jobResult directly to JobSucceeded. - private val jobPromise : Promise[Unit] = + private val jobPromise: Promise[Unit] = if (totalTasks == 0) Promise.successful(()) else Promise() def jobFinished: Boolean = jobPromise.isCompleted - def completionFuture : Future[Unit] = jobPromise.future + def completionFuture: Future[Unit] = jobPromise.future /** * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled @@ -52,11 +52,10 @@ private[spark] class JobWaiter[T]( } override def taskSucceeded(index: Int, result: Any): Unit = { - /* - The resultHandler call must be synchronized in case resultHandler itself - is not thread safe. - */ - synchronized(resultHandler(index, result.asInstanceOf[T])) + // resultHandler call must be synchronized in case resultHandler itself is not thread safe. + synchronized { + resultHandler(index, result.asInstanceOf[T]) + } if (finishedTasks.incrementAndGet() == totalTasks) { jobPromise.success(()) } From 19ef962e51186223196650a4d82a71fde402e911 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 15:00:15 -0500 Subject: [PATCH 16/26] fixed a couple of other style issues --- .../scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index ae9ca47d6b5e9..a8a436046d1e6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -198,8 +198,8 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } } - private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) - (starter : => Semaphore) : Unit = { + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]) + (starter: =>Semaphore) : Unit = { val executionContextInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { From d1a905d9de5447b09d5c9f3e47ddf92a09399562 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 16:33:11 -0500 Subject: [PATCH 17/26] fixed issue flagged by scalastyle --- .../test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index a8a436046d1e6..8206c553cc6cd 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -199,7 +199,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]) - (starter: =>Semaphore) : Unit = { + (starter: => Semaphore) : Unit = { val executionContextInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { From c63ac17555595413022582c97a64fa2e998e9bba Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Wed, 11 Nov 2015 20:26:02 -0500 Subject: [PATCH 18/26] added an assertion to verify that future completed properly --- core/src/main/scala/org/apache/spark/FutureAction.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index e332a0f53b592..78369ce0c3213 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -126,6 +126,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { jobWaiter.completionFuture.ready(atMost) + assert(value.isDefined, "Future has not completed properly") value.get.get } From 489aabc649104cddc8c3c41fbafe4e82e249f427 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 14 Nov 2015 17:30:32 -0500 Subject: [PATCH 19/26] picked some nits --- core/src/main/scala/org/apache/spark/FutureAction.scala | 6 +++--- .../main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 78369ce0c3213..ba195eea297b0 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -139,7 +139,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: override def isCancelled: Boolean = _cancelled override def value: Option[Try[T]] = - jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}} + jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)} def jobIds: Seq[Int] = Seq(jobWaiter.jobId) } @@ -163,7 +163,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { override def cancel(): Unit = synchronized { _cancelled = true p.tryFailure(new SparkException("Action has been cancelled")) - subActions.foreach {_.cancel()} + subActions.foreach(_.cancel()) } /** @@ -219,7 +219,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { override def value: Option[Try[T]] = p.future.value - def jobIds: Seq[Int] = subActions flatMap {_.jobIds} + def jobIds: Seq[Int] = subActions.flatMap(_.jobIds) } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 1fb2527f73e6b..7134bed010b34 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -79,8 +79,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi def continue(partsScanned : Int) : Future[Seq[T]] = if (results.size >= num || partsScanned >= totalParts) { Future.successful(results.toSeq) - } - else { + } else { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1 @@ -107,7 +106,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi p, (index: Int, data: Array[T]) => buf(index) = data, Unit) - job.flatMap {case _ => + job.flatMap {_ => buf.foreach(results ++= _.take(num - results.size)) continue(partsScanned + numPartsToTry) } From c19b3c084d0c870a422df6e32f8efbe7620d335c Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 14 Nov 2015 18:02:39 -0500 Subject: [PATCH 20/26] picked another nit --- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index ba195eea297b0..171e17c6ad188 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -190,7 +190,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { // command need to be in an atomic block. if (!isCancelled) { val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - subActions = job::subActions + subActions = job :: subActions job } else { throw new SparkException("Action has been cancelled") From 601bb95fa902d23785c6144346c096c92d4b0396 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Mon, 16 Nov 2015 22:08:27 -0500 Subject: [PATCH 21/26] Refactored ComplexFutureAction to take a 'run' function as a constructor argument accepting a JobSubmitter, rather than having 'run' and 'submitJob' methods on ComplexFutureAction itself. --- .../scala/org/apache/spark/FutureAction.scala | 70 +++++++++++-------- .../apache/spark/rdd/AsyncRDDActions.scala | 9 ++- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 171e17c6ad188..2a8220ff40090 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -145,20 +145,41 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } +/** + * Handle via which a "run" function passed to a [[ComplexFutureAction]] + * can submit jobs for execution. + */ +@DeveloperApi +trait JobSubmitter { + /** + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext + * to enable cancellation. + */ + def submitJob[T, U, R]( + rdd: RDD[T], + processPartition: Iterator[T] => U, + partitions: Seq[Int], + resultHandler: (Int, U) => Unit, + resultFunc: => R): FutureAction[R] +} + + /** * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take, - * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the - * action thread if it is being blocked by a job. + * takeSample. Cancellation works by setting the cancelled flag to true and cancelling any pending + * jobs. */ @DeveloperApi -class ComplexFutureAction[T] extends FutureAction[T] { +class ComplexFutureAction[T](run : JobSubmitter => Future[T]) + extends FutureAction[T] { self => @volatile private var _cancelled = false @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = Promise[T]() + private val p = Promise[T]().tryCompleteWith(run(jobSubmitter)) override def cancel(): Unit = synchronized { _cancelled = true @@ -166,34 +187,27 @@ class ComplexFutureAction[T] extends FutureAction[T] { subActions.foreach(_.cancel()) } - /** - * Executes some action enclosed in the closure. To properly enable cancellation, the closure - * should use runJob implementation in this promise. See takeAsync for example. - */ - def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { - p.tryCompleteWith(func) - this - } - - /** - * Submit a job for execution and return a FutureAction holding the result. - * This is a wrapper around the same functionality provided by SparkContext - * to enable cancellation. - */ - def submitJob[T, U, R]( + private def jobSubmitter = new JobSubmitter { + def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R): FutureAction[R] = synchronized { - // If the action hasn't been cancelled yet, submit the job. The check and the submitJob - // command need to be in an atomic block. - if (!isCancelled) { - val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - subActions = job :: subActions - job - } else { - throw new SparkException("Action has been cancelled") + resultFunc: => R): FutureAction[R] = self.synchronized { + // If the action hasn't been cancelled yet, submit the job. The check and the submitJob + // command need to be in an atomic block. + if (!isCancelled) { + val job = rdd.context.submitJob( + rdd, + processPartition, + partitions, + resultHandler, + resultFunc) + subActions = job :: subActions + job + } else { + throw new SparkException("Action has been cancelled") + } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 7134bed010b34..fbbfdbb67d703 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Future, ExecutionContext} import scala.reflect.ClassTag -import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} +import org.apache.spark.{JobSubmitter, ComplexFutureAction, FutureAction, Logging} import org.apache.spark.util.ThreadUtils /** @@ -64,7 +64,6 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Returns a future for retrieving the first num elements of the RDD. */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { - val f = new ComplexFutureAction[Seq[T]] // Cached thread pool to handle aggregation of subtasks. implicit val executionContext = AsyncRDDActions.futureExecutionContext val results = new ArrayBuffer[T](num) @@ -76,7 +75,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi This implementation is non-blocking, asynchronously handling the results of each job and triggering the next job using callbacks on futures. */ - def continue(partsScanned : Int) : Future[Seq[T]] = + def continue(partsScanned: Int)(implicit jobSubmitter: JobSubmitter) : Future[Seq[T]] = if (results.size >= num || partsScanned >= totalParts) { Future.successful(results.toSeq) } else { @@ -101,7 +100,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val buf = new Array[Array[T]](p.size) - val job = f.submitJob(self, + val job = jobSubmitter.submitJob(self, (it: Iterator[T]) => it.take(left).toArray, p, (index: Int, data: Array[T]) => buf(index) = data, @@ -112,7 +111,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi } } - f.run {continue(0)} + new ComplexFutureAction[Seq[T]](continue(0)(_)) } /** From a8ba89977e7d749ada73228adb8744098309017f Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sat, 21 Nov 2015 16:00:30 -0500 Subject: [PATCH 22/26] created reusable Smuggle construct to allow unserializable objects to be 'smuggled' into tasks for testing purposes, and refactored AsyncRDDActionsSuite to make use of it --- .../test/scala/org/apache/spark/Smuggle.scala | 82 +++++++++++++++++++ .../spark/rdd/AsyncRDDActionsSuite.scala | 24 +----- 2 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/Smuggle.scala diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala new file mode 100644 index 0000000000000..01694a6e6f741 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/Smuggle.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.UUID +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.collection.mutable + +/** + * Utility wrapper to "smuggle" objects into tasks while bypassing serialization. + * This is intended for testing purposes, primarily to make locks, semaphores, and + * other constructs that would not survive serialization available from within tasks. + * A Smuggle reference is itself serializable, but after being serialized and + * deserialized, it still refers to the same underlying "smuggled" object, as long + * as it was deserialized within the same JVM. This can be useful for tests that + * depend on the timing of task completion to be deterministic, since one can "smuggle" + * a lock or semaphore into the task, and then the task can block until the test gives + * the go-ahead to proceed via the lock. + */ +class Smuggle[T] private(val key: Symbol) extends Serializable { + def smuggledObject: T = Smuggle.get(key) +} + + +object Smuggle { + /** + * Wraps the specified object to be smuggled into a serialized task without + * being serialized itself. + * + * @param smuggledObject + * @tparam T + * @return Smuggle wrapper around smuggledObject. + */ + def apply[T](smuggledObject: T): Smuggle[T] = { + val key = Symbol(UUID.randomUUID().toString) + lock.writeLock().lock() + try { + smuggledObjects += key -> smuggledObject + } finally { + lock.writeLock().unlock() + } + new Smuggle(key) + } + + private val lock = new ReentrantReadWriteLock + private val smuggledObjects = mutable.WeakHashMap.empty[Symbol, Any] + + private def get[T](key: Symbol) : T = { + lock.readLock().lock() + try { + smuggledObjects(key).asInstanceOf[T] + } finally { + lock.readLock().unlock() + } + } + + /** + * Implicit conversion of a Smuggle wrapper to the object being smuggled. + * + * @param smuggle the wrapper to unpack. + * @tparam T + * @return the smuggled object represented by the wrapper. + */ + implicit def unpackSmuggledObject[T](smuggle : Smuggle[T]): T = smuggle.smuggledObject + +} diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 8206c553cc6cd..ca29defd81489 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -198,8 +198,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } } - private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]) - (starter: => Semaphore) : Unit = { + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { val executionContextInvoked = Promise[Unit] val fakeExecutionContext = new ExecutionContext { override def execute(runnable: Runnable): Unit = { @@ -207,6 +206,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } override def reportFailure(t: Throwable): Unit = () } + val starter = Smuggle(new Semaphore(0)) starter.drainPermits() val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} val f = action(rdd) @@ -221,26 +221,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim } test("SimpleFutureAction callback must not consume a thread while waiting") { - testAsyncAction(_.countAsync())(AsyncRDDActionsSuite.simpleAsyncActionStart) + testAsyncAction(_.countAsync()) } test("ComplexFutureAction callback must not consume a thread while waiting") { - testAsyncAction((_.takeAsync(100)))(AsyncRDDActionsSuite.complexAsyncActionStart) + testAsyncAction((_.takeAsync(100))) } } - - -object AsyncRDDActionsSuite { - /* - These are used by the tests that verify that callbacks don't consume threads while waiting - to force the executors to wait for a "go" signal before processing, so that the job - doesn't complete before we've had a chance to verify that the ExecutionContext was not - invoked. - - They must be placed here, in the companion object, rather than in the tests themselves, - so that they don't get serialized along with the tasks. - Each test gets its own semaphore so that the tests can be safely run in parallel if desired. - */ - private val simpleAsyncActionStart = new Semaphore(0) - private val complexAsyncActionStart = new Semaphore(0) -} From 38b144226f6a3cb33558dc9100babd4d77534007 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sun, 22 Nov 2015 15:52:43 -0500 Subject: [PATCH 23/26] added unit test for SPARK-4514 --- .../spark/rdd/AsyncRDDActionsSuite.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index ca29defd81489..68c438d692630 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -23,13 +23,14 @@ import scala.concurrent._ import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global -import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Timeouts +import org.scalatest.{Matchers, BeforeAndAfterAll} +import org.scalatest.concurrent.{Eventually, Timeouts} import org.scalatest.time.SpanSugar._ import org.apache.spark._ -class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts { +class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts + with Eventually with Matchers { @transient private var sc: SparkContext = _ @@ -227,4 +228,16 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim test("ComplexFutureAction callback must not consume a thread while waiting") { testAsyncAction((_.takeAsync(100))) } + + test("getJobIdsForGroup() with takeAsync()") { + sc.setJobGroup("my-job-group2", "description") + sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) + val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) + val firstJobId = eventually(timeout(10 seconds)) { + firstJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) + } + } } From 58164891be00627845da1fd8bce8906c20678d12 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sun, 22 Nov 2015 15:57:03 -0500 Subject: [PATCH 24/26] Moved unit test for SPARK-4514 from AsyncRDDActionsSuite into StatusTrackerSuite This reverts commit 38b144226f6a3cb33558dc9100babd4d77534007. --- .../org/apache/spark/StatusTrackerSuite.scala | 13 +++++++++++++ .../spark/rdd/AsyncRDDActionsSuite.scala | 19 +++---------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index 46516e8d25298..61e030fc26a82 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -86,4 +86,17 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont Set(firstJobId, secondJobId)) } } + + test("getJobIdsForGroup() with takeAsync()") { + sc = new SparkContext("local", "test", new SparkConf(false)) + sc.setJobGroup("my-job-group2", "description") + sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) + val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) + val firstJobId = eventually(timeout(10 seconds)) { + firstJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) + } + } } diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 68c438d692630..ca29defd81489 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -23,14 +23,13 @@ import scala.concurrent._ import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global -import org.scalatest.{Matchers, BeforeAndAfterAll} -import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ -class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts - with Eventually with Matchers { +class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts { @transient private var sc: SparkContext = _ @@ -228,16 +227,4 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim test("ComplexFutureAction callback must not consume a thread while waiting") { testAsyncAction((_.takeAsync(100))) } - - test("getJobIdsForGroup() with takeAsync()") { - sc.setJobGroup("my-job-group2", "description") - sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) - val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) - val firstJobId = eventually(timeout(10 seconds)) { - firstJobFuture.jobIds.head - } - eventually(timeout(10 seconds)) { - sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) - } - } } From 8fe8000ee5ada0f82ab171a27cab75e8142debcb Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sun, 22 Nov 2015 16:17:00 -0500 Subject: [PATCH 25/26] fixed SPARK-4514 by calling setLocalProperties before submitting jobs within takeAsync --- .../org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++ .../org/apache/spark/StatusTrackerSuite.scala | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 303a152565348..14f541f937b4c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -65,6 +65,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val callSite = self.context.getCallSite + val localProperties = self.context.getLocalProperties // Cached thread pool to handle aggregation of subtasks. implicit val executionContext = AsyncRDDActions.futureExecutionContext val results = new ArrayBuffer[T](num) @@ -102,6 +103,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val buf = new Array[Array[T]](p.size) self.context.setCallSite(callSite) + self.context.setLocalProperties(localProperties) val job = jobSubmitter.submitJob(self, (it: Iterator[T]) => it.take(left).toArray, p, diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index 61e030fc26a82..5483f2b8434aa 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -90,7 +90,7 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont test("getJobIdsForGroup() with takeAsync()") { sc = new SparkContext("local", "test", new SparkConf(false)) sc.setJobGroup("my-job-group2", "description") - sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) + sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) val firstJobId = eventually(timeout(10 seconds)) { firstJobFuture.jobIds.head @@ -99,4 +99,17 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) } } + + test("getJobIdsForGroup() with takeAsync() across multiple partitions") { + sc = new SparkContext("local", "test", new SparkConf(false)) + sc.setJobGroup("my-job-group2", "description") + sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty + val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999) + val firstJobId = eventually(timeout(10 seconds)) { + firstJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2 + } + } } From 539ac43c3be54abb61b5a44450cc13cc196113b2 Mon Sep 17 00:00:00 2001 From: "Richard W. Eggert II" Date: Sun, 13 Dec 2015 11:33:12 -0500 Subject: [PATCH 26/26] removed inaccurate comment line from AsyncRDDActionsSuite --- .../test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index ca29defd81489..de015ebd5d237 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -216,7 +216,6 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim // Now allow the executors to proceed with task processing. starter.release(rdd.partitions.length) // Waiting for the result verifies that the tasks were successfully processed. - // This mainly exists to verify that we didn't break task deserialization. Await.result(executionContextInvoked.future, atMost = 15.seconds) }