From 9b54da67483f885bba14596c53e23105391f88f7 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 24 Nov 2020 17:15:40 +0800 Subject: [PATCH 01/12] init --- .../apache/spark/sql/internal/SQLConf.scala | 10 +++++ .../SparkExecuteStatementOperation.scala | 6 ++- .../ThriftServerWithSparkContextSuite.scala | 44 +++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef974dc176e5..2cea233232c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -937,6 +937,16 @@ object SQLConf { .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) + val THRIFTSERVER_FORCE_CANCEL = + buildConf("spark.sql.thriftServer.forceCancel") + .doc("When true, all the job of query will be cancelled and running tasks will be" + + "interrupted. When false, all the job of query will be cancelled but running task" + + "will be remained until finished. 
Note that, this config must be set before query" + + "otherwise it doesn't help.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + val THRIFTSERVER_UI_STATEMENT_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f7a4be959181..0ced58b1b6d1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -63,6 +63,8 @@ private[hive] class SparkExecuteStatementOperation( } } + private val forceCancel = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL) + private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) { new VariableSubstitution().substitute(statement) } @@ -131,7 +133,7 @@ private[hive] class SparkExecuteStatementOperation( def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties { try { - sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement) + sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel) getNextRowSetInternal(order, maxRowsL) } finally { sqlContext.sparkContext.clearJobGroup() @@ -321,7 +323,7 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader) } - sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement) + sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel) result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) 
HiveThriftServer2.eventManager.onStatementParsed(statementId, diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index fd3a638c4fa4..6ccd5695d4df 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -18,9 +18,14 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.SQLException +import java.util.concurrent.atomic.AtomicBoolean import org.apache.hive.service.cli.HiveSQLException +import org.apache.spark.TaskKilled +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.sql.internal.SQLConf + trait ThriftServerWithSparkContextSuite extends SharedThriftServer { test("the scratch dir will be deleted during server start but recreated with new operation") { @@ -79,6 +84,45 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2")) } } + + test("SPARK-33526: Add config to control if interrupt task on thriftserver") { + withJdbcStatement { statement => + val forceCancel = new AtomicBoolean(false) + val listener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskEnd.reason match { + case _: TaskKilled => + if (forceCancel.get()) { + assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000) + } else { + assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2900) + } + case _ => + } + } + } + + spark.sparkContext.addSparkListener(listener) + try { + statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1") + statement.execute(s"SET 
${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=false") + forceCancel.set(false) + val e1 = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e1.contains("Query timed out")) + + statement.execute(s"SET ${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=true") + forceCancel.set(true) + val e2 = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e2.contains("Query timed out")) + } finally { + spark.sparkContext.removeSparkListener(listener) + } + } + } } From ba382a350751291042b177abce67a50e5aa20540 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 24 Nov 2020 17:17:46 +0800 Subject: [PATCH 02/12] fix --- .../hive/thriftserver/ThriftServerWithSparkContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 6ccd5695d4df..866e62cf0986 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -85,7 +85,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { } } - test("SPARK-33526: Add config to control if interrupt task on thriftserver") { + test("SPARK-33526: Add config to control if cancel invoke interrupt task on thriftserver") { withJdbcStatement { statement => val forceCancel = new AtomicBoolean(false) val listener = new SparkListener { From 911da7054abfabb6b3707175483be4a5eebe8729 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 7 Dec 2020 13:07:44 +0800 Subject: [PATCH 03/12] version --- 
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2cea233232c2..7b86ba5e0d84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -943,7 +943,7 @@ object SQLConf { "interrupted. When false, all the job of query will be cancelled but running task" + "will be remained until finished. Note that, this config must be set before query" + "otherwise it doesn't help.") - .version("3.1.0") + .version("3.2.0") .booleanConf .createWithDefault(false) From d1ebbba35946262cff8787ec95190bc5f6db318f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 7 Dec 2020 14:21:08 +0800 Subject: [PATCH 04/12] check time --- .../hive/thriftserver/ThriftServerWithSparkContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 866e62cf0986..64627ba382e8 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -95,7 +95,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { if (forceCancel.get()) { assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000) } else { - assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2900) + assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2000) } case _ => } From de892325ef1e44e5fcdb4ffecbb46b15994474c3 
Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 7 Dec 2020 17:15:10 +0800 Subject: [PATCH 05/12] assert task killed --- .../ThriftServerWithSparkContextSuite.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 64627ba382e8..3c30b79625d5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -90,14 +90,12 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val forceCancel = new AtomicBoolean(false) val listener = new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - taskEnd.reason match { - case _: TaskKilled => - if (forceCancel.get()) { - assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000) - } else { - assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2000) - } - case _ => + assert(taskEnd.reason.isInstanceOf[TaskKilled]) + if (forceCancel.get()) { + assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000) + } else { + // to tolerate timing inaccuracy, we check 2s instead of 3s. 
+ assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2000) } } } From 9a05f0b70d727d3b3639c16773dbdba40cdd46a7 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 7 Dec 2020 22:09:20 +0800 Subject: [PATCH 06/12] refactor --- .../ThriftServerWithSparkContextSuite.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 3c30b79625d5..036eb5850695 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -103,19 +103,14 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { spark.sparkContext.addSparkListener(listener) try { statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1") - statement.execute(s"SET ${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=false") - forceCancel.set(false) - val e1 = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") - }.getMessage - assert(e1.contains("Query timed out")) - - statement.execute(s"SET ${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=true") - forceCancel.set(true) - val e2 = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") - }.getMessage - assert(e2.contains("Query timed out")) + Seq(true, false).foreach { force => + statement.execute(s"SET ${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=$force") + forceCancel.set(force) + val e1 = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e1.contains("Query timed out")) + } } finally { 
spark.sparkContext.removeSparkListener(listener) } From f8d63843dd75231aebe9740e7df1aa62fc04575f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 7 Dec 2020 22:10:41 +0800 Subject: [PATCH 07/12] doc --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7b86ba5e0d84..3e4216d2a19a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -941,8 +941,7 @@ object SQLConf { buildConf("spark.sql.thriftServer.forceCancel") .doc("When true, all the job of query will be cancelled and running tasks will be" + "interrupted. When false, all the job of query will be cancelled but running task" + - "will be remained until finished. Note that, this config must be set before query" + - "otherwise it doesn't help.") + "will be remained until finished.") .version("3.2.0") .booleanConf .createWithDefault(false) From cd54e4843d95d0a594d77a651d316f504903f24f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 8 Dec 2020 09:46:45 +0800 Subject: [PATCH 08/12] doc --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3e4216d2a19a..e5c1a43a78ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -938,9 +938,9 @@ object SQLConf { .createWithDefault(0L) val THRIFTSERVER_FORCE_CANCEL = - buildConf("spark.sql.thriftServer.forceCancel") - .doc("When true, all the job of query will be cancelled and running tasks 
will be" + - "interrupted. When false, all the job of query will be cancelled but running task" + + buildConf("spark.sql.thriftServer.interruptOnCancel") + .doc("When true, if a running query has been cancelled then all running tasks will be" + + "interrupted. When false, if a running query has been cancelled then all running task " + "will be remained until finished.") .version("3.2.0") .booleanConf From c9c5cd3ab38aefa4d157a316b0038d1ec0a81e0f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 8 Dec 2020 15:34:40 +0800 Subject: [PATCH 09/12] doc --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e5c1a43a78ed..1de8880e977c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -939,9 +939,8 @@ object SQLConf { val THRIFTSERVER_FORCE_CANCEL = buildConf("spark.sql.thriftServer.interruptOnCancel") - .doc("When true, if a running query has been cancelled then all running tasks will be" + - "interrupted. When false, if a running query has been cancelled then all running task " + - "will be remained until finished.") + .doc("When true, all running tasks will be interrupted if one cancels a query. 
" + + "When false, all running tasks will be remained until finished.") .version("3.2.0") .booleanConf .createWithDefault(false) From 921f9cfc6191da49a8adff55fa9750a3874284a1 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 9 Dec 2020 20:48:01 +0800 Subject: [PATCH 10/12] doc --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1de8880e977c..289f67bb81cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -932,7 +932,8 @@ object SQLConf { "a positive value, a running query will be cancelled automatically when the timeout is " + "exceeded, otherwise the query continues to run till completion. If timeout values are " + "set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " + - "than this configuration value, they take precedence.") + "than this configuration value, they take precedence. 
After set this config, you may be " + + "interesting in spark.sql.thriftServer.interruptOnCancel which can help interrupt task.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) From 99ec8640178c4097c4e53e76d7ffa07d821edef9 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 11 Dec 2020 13:54:34 +0800 Subject: [PATCH 11/12] doc --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 289f67bb81cb..1b8e096fecc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -932,8 +932,9 @@ object SQLConf { "a positive value, a running query will be cancelled automatically when the timeout is " + "exceeded, otherwise the query continues to run till completion. If timeout values are " + "set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " + - "than this configuration value, they take precedence. After set this config, you may be " + - "interesting in spark.sql.thriftServer.interruptOnCancel which can help interrupt task.") + "than this configuration value, they take precedence. If you set this timeout and prefer " + + "to cancel the queries right away without waiting task to finish, consider enabling " + + s"${THRIFTSERVER_FORCE_CANCEL.key} together.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) @@ -941,7 +942,7 @@ object SQLConf { val THRIFTSERVER_FORCE_CANCEL = buildConf("spark.sql.thriftServer.interruptOnCancel") .doc("When true, all running tasks will be interrupted if one cancels a query. 
" + - "When false, all running tasks will be remained until finished.") + "When false, all running tasks will remain until finished.") .version("3.2.0") .booleanConf .createWithDefault(false) From 0466c0cc3bb75a0315c168e4612ea044e83ce50b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 11 Dec 2020 14:17:24 +0800 Subject: [PATCH 12/12] place --- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1b8e096fecc7..75a510d389cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -926,6 +926,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val THRIFTSERVER_FORCE_CANCEL = + buildConf("spark.sql.thriftServer.interruptOnCancel") + .doc("When true, all running tasks will be interrupted if one cancels a query. " + + "When false, all running tasks will remain until finished.") + .version("3.2.0") + .booleanConf + .createWithDefault(false) + val THRIFTSERVER_QUERY_TIMEOUT = buildConf("spark.sql.thriftServer.queryTimeout") .doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " + @@ -939,14 +947,6 @@ object SQLConf { .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) - val THRIFTSERVER_FORCE_CANCEL = - buildConf("spark.sql.thriftServer.interruptOnCancel") - .doc("When true, all running tasks will be interrupted if one cancels a query. " + - "When false, all running tasks will remain until finished.") - .version("3.2.0") - .booleanConf - .createWithDefault(false) - val THRIFTSERVER_UI_STATEMENT_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")