From 95ac5ad8b2b1f38554f54a62b195f8d87bcd8c17 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 28 Feb 2024 01:04:03 -0800 Subject: [PATCH 1/2] [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin` --- .../spark/deploy/DriverTimeoutPlugin.scala | 62 +++++++++++++++++++ .../spark/internal/config/package.scala | 9 +++ 2 files changed, 71 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala new file mode 100644 index 0000000000000..542d4463807c0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy + +import java.util.{Map => JMap} +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DRIVER_TIMEOUT +import org.apache.spark.util.ThreadUtils + +/** + * A built-in plugin to provide Driver timeout feature. + */ +class DriverTimeoutPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new DriverTimeoutDriverPlugin() + + // No-op + override def executorPlugin(): ExecutorPlugin = null +} + +class DriverTimeoutDriverPlugin extends DriverPlugin with Logging { + + private val timeoutService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-timeout") + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + val timeout = sc.conf.get(DRIVER_TIMEOUT) + if (timeout == 0) { + logWarning("Disabled with the timeout value 0.") + } else { + val task: Runnable = () => { + logWarning(s"Terminate Driver JVM because it runs after $timeout minute" + + (if (timeout == 1) "" else "s")) + // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation. + System.exit(124) + } + timeoutService.schedule(task, timeout, TimeUnit.MINUTES) + } + Map.empty[String, String].asJava + } + + override def shutdown(): Unit = timeoutService.shutdown() +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7caac5884c745..1fcf75b025033 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1099,6 +1099,15 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val DRIVER_TIMEOUT = ConfigBuilder("spark.driver.timeout") + .doc("A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, " + + "terminate the driver with the exit code 124 if it runs after timeout duration. To use, " + + "it's required to set `spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin`.") + .version("4.0.0") + .timeConf(TimeUnit.MINUTES) + .checkValue(v => v >= 0, "The value should be a non-negative time value.") + .createWithDefaultString("0min") + private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress") .doc("Address where to bind network listen sockets on the driver.") .version("2.1.0") From 40099c32af9f6e36f045291e94052337bb24a4d1 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 28 Feb 2024 09:23:42 -0800 Subject: [PATCH 2/2] Address comments --- .../org/apache/spark/deploy/DriverTimeoutPlugin.scala | 4 ++-- .../scala/org/apache/spark/util/SparkExitCode.scala | 3 +++ docs/configuration.md | 11 +++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala index 542d4463807c0..9b141d6075721 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.DRIVER_TIMEOUT -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{SparkExitCode, ThreadUtils} /** * A built-in plugin to provide Driver timeout feature. @@ -51,7 +51,7 @@ class DriverTimeoutDriverPlugin extends DriverPlugin with Logging { logWarning(s"Terminate Driver JVM because it runs after $timeout minute" + (if (timeout == 1) "" else "s")) // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation. - System.exit(124) + System.exit(SparkExitCode.DRIVER_TIMEOUT) } timeoutService.schedule(task, timeout, TimeUnit.MINUTES) } diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala index 75b3d134b94d7..e8f8788243cd9 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala @@ -45,6 +45,9 @@ private[spark] object SparkExitCode { OutOfMemoryError. */ val OOM = 52 + /** Exit because the driver is running over the given threshold. */ + val DRIVER_TIMEOUT = 124 + /** Exception indicate command not found. */ val ERROR_COMMAND_NOT_FOUND = 127 } diff --git a/docs/configuration.md b/docs/configuration.md index f6e1e449e2dcc..f0d68c55e7b39 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -434,6 +434,17 @@ of the most common options to set are: 1.3.0 + + spark.driver.timeout + 0min + + A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, + terminate the driver with the exit code 124 if it runs after timeout duration. To use, + it's required to set spark.plugins with + org.apache.spark.deploy.DriverTimeoutPlugin. + + 4.0.0 + spark.driver.log.localDir (none)