diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala new file mode 100644 index 0000000000000..9b141d6075721 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy + +import java.util.{Map => JMap} +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DRIVER_TIMEOUT +import org.apache.spark.util.{SparkExitCode, ThreadUtils} + +/** + * A built-in plugin to provide Driver timeout feature. 
+ */
+class DriverTimeoutPlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new DriverTimeoutDriverPlugin()
+
+  // No-op: this plugin has no executor-side component, so null is returned.
+  override def executorPlugin(): ExecutorPlugin = null
+}
+/** Driver-side component that exits the JVM once a non-zero `spark.driver.timeout` elapses. */
+class DriverTimeoutDriverPlugin extends DriverPlugin with Logging {
+  // Scheduler on which init() places the delayed JVM-exit task.
+  private val timeoutService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-timeout")
+  /** Reads the timeout and, unless it is 0, schedules the exit task; returns an empty map. */
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    val timeout = sc.conf.get(DRIVER_TIMEOUT) // minutes; the config check rejects negatives
+    if (timeout == 0) {
+      logWarning("Disabled with the timeout value 0.")
+    } else {
+      val task: Runnable = () => {
+        logWarning(s"Terminate Driver JVM because it runs after $timeout minute" +
+          (if (timeout == 1) "" else "s")) // singular "minute" only when the value is 1
+        // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation.
+        System.exit(SparkExitCode.DRIVER_TIMEOUT)
+      }
+      timeoutService.schedule(task, timeout, TimeUnit.MINUTES) // one-shot, not periodic
+    }
+    Map.empty[String, String].asJava
+  }
+  // NOTE(review): shutdown() may leave the already scheduled exit task pending - confirm.
+  override def shutdown(): Unit = timeoutService.shutdown()
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7caac5884c745..1fcf75b025033 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1099,6 +1099,15 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val DRIVER_TIMEOUT = ConfigBuilder("spark.driver.timeout")
+    .doc("A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, " +
+      "terminate the driver with the exit code 124 if it runs after timeout duration. To use, " +
+      "it's required to set `spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin`.")
+    .version("4.0.0")
+    .timeConf(TimeUnit.MINUTES) // DriverTimeoutDriverPlugin consumes the value as minutes
+    .checkValue(v => v >= 0, "The value should be a non-negative time value.")
+    .createWithDefaultString("0min") // default 0: no timeout is scheduled
+
   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
     .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index 75b3d134b94d7..e8f8788243cd9 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -45,6 +45,9 @@ private[spark] object SparkExitCode {
       OutOfMemoryError. */
   val OOM = 52
 
+  /** Exit because the driver ran longer than the configured `spark.driver.timeout`. */
+  val DRIVER_TIMEOUT = 124
+
   /** Exception indicate command not found. */
   val ERROR_COMMAND_NOT_FOUND = 127
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index f6e1e449e2dcc..f0d68c55e7b39 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -434,6 +434,17 @@ of the most common options to set are:
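+<tr>
+  <td><code>spark.driver.timeout</code></td>
+  <td>0min</td>
+  <td>
+    A timeout for Spark driver in minutes. 0 means infinite. For the positive time value,
+    terminate the driver with the exit code 124 if it runs after timeout duration. To use,
+    it's required to set <code>spark.plugins</code> with
+    <code>org.apache.spark.deploy.DriverTimeoutPlugin</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.driver.log.localDir</code></td>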