From bcdd070029cb667fe177bea088a3d50363948ba5 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Wed, 5 Jun 2019 11:58:09 -0400 Subject: [PATCH 01/18] add counter to monitor number of active SparkSessions --- .../org/apache/spark/sql/SparkSession.scala | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0b5bf3f48b593..6f077c6b80f2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.io.Closeable import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -711,12 +711,16 @@ class SparkSession private( // scalastyle:on /** - * Stop the underlying `SparkContext`. + * Stop the underlying `SparkContext` if there are are no active sessions remaining. * * @since 2.0.0 */ def stop(): Unit = { - sparkContext.stop() + if (SparkSession.numActiveSessions.get() == 0) { + sparkContext.stop() + } else { + SparkSession.clearActiveSession() + } } /** @@ -776,6 +780,8 @@ class SparkSession private( @Stable object SparkSession extends Logging { + private[spark] val numActiveSessions: AtomicInteger = new AtomicInteger(0) + /** * Builder for [[SparkSession]]. */ @@ -958,6 +964,8 @@ object SparkSession extends Logging { sparkContext.addSparkListener(new SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { defaultSession.set(null) + // Should remove listener after this event fires + sparkContext.removeSparkListener(this) } }) } @@ -981,17 +989,30 @@ object SparkSession extends Logging { * @since 2.0.0 */ def setActiveSession(session: SparkSession): Unit = { - activeThreadSession.set(session) + if (getActiveSession.isEmpty + || (session != getActiveSession.get && getActiveSession.isDefined)) { + numActiveSessions.getAndIncrement + activeThreadSession.set(session) + } else if (session == null) { + this.clearActiveSession() + } } /** - * Clears the active SparkSession for current thread. Subsequent calls to getOrCreate will - * return the first created context instead of a thread-local override. + * Clears the active SparkSession for current thread assuming it is defined. + * Subsequent calls to getOrCreate will return the first created context + * instead of a thread-local override. * * @since 2.0.0 */ def clearActiveSession(): Unit = { - activeThreadSession.remove() + if (getActiveSession.isDefined) { + activeThreadSession.remove() + numActiveSessions.decrementAndGet() + } else { + logWarning("Calling clearActiveSession() on a SparkSession " + + "without an active session is a noop.") + } } /** @@ -1004,12 +1025,17 @@ object SparkSession extends Logging { } /** - * Clears the default SparkSession that is returned by the builder. - * + * Clears the default SparkSession that is returned by the builder + * if it is not null. * @since 2.0.0 */ def clearDefaultSession(): Unit = { - defaultSession.set(null) + if (getDefaultSession.isDefined) { + defaultSession.set(null) + } else { + logWarning("Calling clearDefaultSession() on a SparkSession " + + "without an default session is a noop.") + } } /** From f333a30fdf30f8556e7c4d8bf20e1e4d27592068 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Wed, 5 Jun 2019 18:20:08 -0400 Subject: [PATCH 02/18] clear session always in stop() --- .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 6f077c6b80f2f..fd4189c51a519 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -716,11 +716,10 @@ class SparkSession private( * @since 2.0.0 */ def stop(): Unit = { + SparkSession.clearActiveSession() if (SparkSession.numActiveSessions.get() == 0) { sparkContext.stop() - } else { - SparkSession.clearActiveSession() - } + } } /** From 8fc95e95af7ba699cf919f085a4a99aaff182d7a Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Thu, 6 Jun 2019 10:27:21 -0400 Subject: [PATCH 03/18] added tests --- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../sql/SparkSessionLifecycleSuite.scala | 66 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index fd4189c51a519..c903aa59b2495 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -719,7 +719,7 @@ class SparkSession private( SparkSession.clearActiveSession() if (SparkSession.numActiveSessions.get() == 0) { sparkContext.stop() - } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala new file mode 100644 index 0000000000000..951daa70457b9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.config.UI.UI_ENABLED + + +/** + * Test cases for the lifecycle of a [[SparkSession]]. + */ +class SparkSessionLifecycleSuite extends SparkFunSuite { + test("test SparkContext stopped when last SparkSession is stopped ") { + val session1 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "a") + .getOrCreate() + + assert(!session1.sparkContext.isStopped) + + val session2 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "b") + .getOrCreate() + + session1.stop() + session2.stop() + assert(session1.sparkContext.isStopped) + } + + test("test SparkContext is not stopped when other sessions exist") { + val session1 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "a") + .getOrCreate() + + assert(!session1.sparkContext.isStopped) + + val session2 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "b") + .getOrCreate() + + session1.stop() + assert(!session1.sparkContext.isStopped) + } +} From f69cabb02b65cbcaa4a873a851243dfa902a69d6 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Mon, 10 Jun 2019 10:57:31 -0400 Subject: [PATCH 04/18] responding to PR comments --- .../org/apache/spark/sql/SparkSession.scala | 8 +-- .../spark/sql/SparkSessionBuilderSuite.scala | 23 ++++++- .../sql/SparkSessionLifecycleSuite.scala | 66 ------------------- 3 files changed, 23 insertions(+), 74 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index c903aa59b2495..064a2272fc7be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -711,7 +711,7 @@ class SparkSession private( // scalastyle:on /** - * Stop the underlying `SparkContext` if there are are no active sessions remaining. + * Stop the underlying `SparkContext` if there are no active sessions remaining. * * @since 2.0.0 */ @@ -1008,9 +1008,6 @@ object SparkSession extends Logging { if (getActiveSession.isDefined) { activeThreadSession.remove() numActiveSessions.decrementAndGet() - } else { - logWarning("Calling clearActiveSession() on a SparkSession " + - "without an active session is a noop.") } } @@ -1031,9 +1028,6 @@ object SparkSession extends Logging { def clearDefaultSession(): Unit = { if (getDefaultSession.isDefined) { defaultSession.set(null) - } else { - logWarning("Calling clearDefaultSession() on a SparkSession " + - "without an default session is a noop.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 10b17571d2aaa..83484983bb5e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -152,4 +152,25 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { session.sparkContext.hadoopConfiguration.unset(mySpecialKey) } } -} + + test("test SparkContext stopped when last SparkSession is stopped ") { + val session1 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "a") + .getOrCreate() + + assert(!session1.sparkContext.isStopped) + + val session2 = SparkSession.builder() + .master("local") + .config(UI_ENABLED.key, value = false) + .config("some-config", "b") + .getOrCreate() + + session1.stop() + assert(!session1.sparkContext.isStopped) + session2.stop() + assert(session1.sparkContext.isStopped) + } +} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala deleted file mode 100644 index 951daa70457b9..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionLifecycleSuite.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.config.UI.UI_ENABLED - - -/** - * Test cases for the lifecycle of a [[SparkSession]]. - */ -class SparkSessionLifecycleSuite extends SparkFunSuite { - test("test SparkContext stopped when last SparkSession is stopped ") { - val session1 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "a") - .getOrCreate() - - assert(!session1.sparkContext.isStopped) - - val session2 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "b") - .getOrCreate() - - session1.stop() - session2.stop() - assert(session1.sparkContext.isStopped) - } - - test("test SparkContext is not stopped when other sessions exist") { - val session1 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "a") - .getOrCreate() - - assert(!session1.sparkContext.isStopped) - - val session2 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "b") - .getOrCreate() - - session1.stop() - assert(!session1.sparkContext.isStopped) - } -} From 3fafd2a22873ed4cc688d6ea663ef86f1919115d Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Mon, 10 Jun 2019 10:59:16 -0400 Subject: [PATCH 05/18] cleanup --- .../spark/sql/SparkSessionBuilderSuite.scala | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 83484983bb5e3..4a64e109116d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -154,20 +154,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } test("test SparkContext stopped when last SparkSession is stopped ") { - val session1 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "a") - .getOrCreate() - + val session1 = SparkSession.builder().master("local").getOrCreate() assert(!session1.sparkContext.isStopped) - - val session2 = SparkSession.builder() - .master("local") - .config(UI_ENABLED.key, value = false) - .config("some-config", "b") - .getOrCreate() - + val session2 = SparkSession.builder().master("local").getOrCreate() session1.stop() assert(!session1.sparkContext.isStopped) session2.stop() From 0c9c4269fd2703c160a9b47354fd1bb379f2070e Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Mon, 10 Jun 2019 11:00:52 -0400 Subject: [PATCH 06/18] adding ticket number to test --- .../scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 4a64e109116d5..a06e783b1bc5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -153,7 +153,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } } - test("test SparkContext stopped when last SparkSession is stopped ") { + test("SPARK-27958: SparkContext stopped when last SparkSession is stopped ") { val session1 = SparkSession.builder().master("local").getOrCreate() assert(!session1.sparkContext.isStopped) val session2 = SparkSession.builder().master("local").getOrCreate() From 843491fbd21f79614e8eff9e1af49ba263898451 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Mon, 10 Jun 2019 12:45:54 -0400 Subject: [PATCH 07/18] style --- .../scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index a06e783b1bc5c..4b59d70a044a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -162,4 +162,4 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { session2.stop() assert(session1.sparkContext.isStopped) } -} \ No newline at end of file +} From 92c7b22ab5a1db0f25b4b418277b793b0e851491 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Fri, 14 Jun 2019 13:41:26 -0400 Subject: [PATCH 08/18] addressing sean's PR and updating test --- .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 3 +-- .../org/apache/spark/sql/SparkSessionBuilderSuite.scala | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 064a2272fc7be..d8b88d64273be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -988,8 +988,7 @@ object SparkSession extends Logging { * @since 2.0.0 */ def setActiveSession(session: SparkSession): Unit = { - if (getActiveSession.isEmpty - || (session != getActiveSession.get && getActiveSession.isDefined)) { + if (session != getActiveSession.get && getActiveSession.isDefined) { numActiveSessions.getAndIncrement activeThreadSession.set(session) } else if (session == null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 4b59d70a044a2..615d32226db13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -154,9 +154,11 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } test("SPARK-27958: SparkContext stopped when last SparkSession is stopped ") { - val session1 = SparkSession.builder().master("local").getOrCreate() + val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") + val newSC = new SparkContext(conf) + val session1 = SparkSession.builder().sparkContext(newSC).master("local").getOrCreate() assert(!session1.sparkContext.isStopped) - val session2 = SparkSession.builder().master("local").getOrCreate() + val session2 = SparkSession.builder().sparkContext(newSC).master("local").getOrCreate() session1.stop() assert(!session1.sparkContext.isStopped) session2.stop() From 61c6fed06b4b2c5826c386354ce52020b18102a6 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Fri, 27 Mar 2020 15:51:48 -0400 Subject: [PATCH 09/18] first iteration of new proposal --- .../org/apache/spark/sql/SparkSession.scala | 170 +++++++++++++----- 1 file changed, 127 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 62fd0569c4882..737b1faf8e4d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -18,19 +18,20 @@ package org.apache.spark.sql import java.io.Closeable +import java.util.UUID import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal - import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.SparkSession.defaultSession import org.apache.spark.sql.catalog.Catalog import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -87,6 +88,28 @@ class SparkSession private( // The call site where this SparkSession was constructed. private val creationSite: CallSite = Utils.getCallSite() + private val sessionListener: SparkListener = new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + defaultSession.set(null) + } + } + // Used to manage state in the the `SparkSession` singleton + private[spark] val sessionId: UUID = UUID.randomUUID + private[spark] val terminated: AtomicBoolean = new AtomicBoolean(false) + sparkContext.addSparkListener(sessionListener) + + private[spark] def assertNotTerminated(): Unit = { + if (terminated.get()) { + throw new IllegalStateException( + s"""Cannot call methods on a terminated SparkSession. + |This terminated SparkSession was created at: + | + |${creationSite.longForm} + """.stripMargin) + } + } + + /** * Constructor used in Pyspark. Contains explicit application of Spark Session Extensions * which otherwise only occurs during getOrCreate. We cannot add this to the default constructor @@ -108,6 +131,8 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + def removeListener(): Unit = sparkContext.removeSparkListener(sessionListener) + /** * The version of Spark on which this application is running. * @@ -284,6 +309,7 @@ class SparkSession private( * @return 2.0.0 */ def emptyDataset[T: Encoder]: Dataset[T] = { + assertNotTerminated() val encoder = implicitly[Encoder[T]] new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder) } @@ -294,6 +320,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive { + assertNotTerminated() val encoder = Encoders.product[A] Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder)) } @@ -304,6 +331,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive { + assertNotTerminated() val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data)) @@ -342,6 +370,7 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive { + assertNotTerminated() // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. val encoder = RowEncoder(schema) @@ -359,6 +388,7 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { + assertNotTerminated() createDataFrame(rowRDD.rdd, schema) } @@ -372,6 +402,7 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive { + assertNotTerminated() Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala)) } @@ -384,6 +415,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive { + assertNotTerminated() val attributeSeq: Seq[AttributeReference] = getSchema(beanClass) val className = beanClass.getName val rowRdd = rdd.mapPartitions { iter => @@ -402,6 +434,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { + assertNotTerminated() createDataFrame(rdd.rdd, beanClass) } @@ -413,6 +446,7 @@ class SparkSession private( * @since 1.6.0 */ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = withActive { + assertNotTerminated() val attrSeq = getSchema(beanClass) val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq) Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq)) @@ -424,6 +458,7 @@ class SparkSession private( * @since 2.0.0 */ def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { + assertNotTerminated() Dataset.ofRows(self, LogicalRelation(baseRelation)) } @@ -459,6 +494,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = { + assertNotTerminated() // `ExpressionEncoder` is not thread-safe, here we create a new encoder. val enc = encoderFor[T].copy() val attributes = enc.schema.toAttributes @@ -476,6 +512,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { + assertNotTerminated() Dataset[T](self, ExternalRDD(data, self)) } @@ -495,6 +532,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = { + assertNotTerminated() createDataset(data.asScala) } @@ -504,7 +542,10 @@ class SparkSession private( * * @since 2.0.0 */ - def range(end: Long): Dataset[java.lang.Long] = range(0, end) + def range(end: Long): Dataset[java.lang.Long] = { + assertNotTerminated() + range(0, end) + } /** * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements @@ -513,6 +554,7 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long): Dataset[java.lang.Long] = { + assertNotTerminated() range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism) } @@ -523,6 +565,7 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { + assertNotTerminated() range(start, end, step, numPartitions = sparkContext.defaultParallelism) } @@ -534,6 +577,7 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = { + assertNotTerminated() new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG) } @@ -544,6 +588,7 @@ class SparkSession private( catalystRows: RDD[InternalRow], schema: StructType, isStreaming: Boolean = false): DataFrame = { + assertNotTerminated() // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. val logicalPlan = LogicalRDD( @@ -577,14 +622,17 @@ class SparkSession private( * @since 2.0.0 */ def table(tableName: String): DataFrame = { + assertNotTerminated() table(sessionState.sqlParser.parseMultipartIdentifier(tableName)) } private[sql] def table(multipartIdentifier: Seq[String]): DataFrame = { + assertNotTerminated() Dataset.ofRows(self, UnresolvedRelation(multipartIdentifier)) } private[sql] def table(tableIdent: TableIdentifier): DataFrame = { + assertNotTerminated() Dataset.ofRows(self, UnresolvedRelation(tableIdent)) } @@ -599,6 +647,7 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = withActive { + assertNotTerminated() val tracker = new QueryPlanningTracker val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) { sessionState.sqlParser.parsePlan(sqlText) @@ -623,6 +672,7 @@ class SparkSession private( */ @Unstable def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = { + assertNotTerminated() DataSource.lookupDataSource(runner, sessionState.conf) match { case source if classOf[ExternalCommandRunner].isAssignableFrom(source) => Dataset.ofRows(self, ExternalCommandExecutor( @@ -643,7 +693,10 @@ class SparkSession private( * * @since 2.0.0 */ - def read: DataFrameReader = new DataFrameReader(self) + def read: DataFrameReader = { + assertNotTerminated() + new DataFrameReader(self) + } /** * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. @@ -654,7 +707,10 @@ class SparkSession private( * * @since 2.0.0 */ - def readStream: DataStreamReader = new DataStreamReader(self) + def readStream: DataStreamReader = { + assertNotTerminated() + new DataStreamReader(self) + } /** * Executes some code block and prints to stdout the time taken to execute the block. This is @@ -691,23 +747,55 @@ class SparkSession private( // scalastyle:on /** - * Stop the underlying `SparkContext` if there are no active sessions remaining. + * Lifecycle method that cleans up state of spark session, and mark session + * "ended" forever. This differs from `stop()` or `stopContext()` as it keeps + * the underlying `SparkContext` alive, while only getting + */ + + def terminate(): Unit = { + // Session is still active + if (!terminated.get()) { + sparkContext.removeSparkListener(this.sessionListener) + SparkSession.removeTerminatedSession(sessionId) + terminated.set(true) + } + else { + throw new IllegalStateException( + s"""Cannot call methods on a terminated SparkSession. Call + |getOrCreate() to create a new session. + |""".stripMargin) + } + } + + /** + * Stop the underlying `SparkContext`. * * @since 2.0.0 */ - def stop(): Unit = { - SparkSession.clearActiveSession() - if (SparkSession.numActiveSessions.get() == 0) { - sparkContext.stop() + def stopContext(): Unit = { + if (!terminated.get()) { + // stopping the context should also terminate this session + terminate() + SparkSession.removeTerminatedSession(sessionId) + SparkSession.clearDefaultSession() + SparkSession.clearActiveSession() } + sparkContext.stop() } + /** + * Synonym for `stopContext()`. + * + * @since 2.0.0 + */ + def stop(): Unit = stopContext() + /** * Synonym for `stop()`. * * @since 2.1.0 */ - override def close(): Unit = stop() + override def close(): Unit = stopContext() /** * Parses the data type in our internal string representation. The data type string should @@ -773,8 +861,6 @@ class SparkSession private( @Stable object SparkSession extends Logging { - private[spark] val numActiveSessions: AtomicInteger = new AtomicInteger(0) - /** * Builder for [[SparkSession]]. */ @@ -918,7 +1004,7 @@ object SparkSession extends Logging { // Global synchronization so we will only set the default session once. SparkSession.synchronized { - // If the current thread does not have an active session, get it from the global session. + // If the current thread does not have an active session, get it from the default session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } @@ -950,17 +1036,6 @@ object SparkSession extends Logging { options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } setDefaultSession(session) setActiveSession(session) - - // Register a successfully instantiated context to the singleton. This should be at the - // end of the class definition so that the singleton is updated only if there is no - // exception in the construction of the instance. - sparkContext.addSparkListener(new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - defaultSession.set(null) - // Should remove listener after this event fires - sparkContext.removeSparkListener(this) - } - }) } return session @@ -974,6 +1049,23 @@ object SparkSession extends Logging { */ def builder(): Builder = new Builder + def removeTerminatedSession(sessionId: UUID): Unit = { + // clean up active session + if (getActiveSession.isDefined) { + val activeSession = getActiveSession.get + if(sessionId.equals(activeSession.sessionId)) { + clearActiveSession() + } + } + // clean up default session + if (getDefaultSession.isDefined) { + val defaultSession = getDefaultSession.get + if(sessionId.equals(defaultSession.sessionId)) { + clearDefaultSession() + } + } + } + /** * Changes the SparkSession that will be returned in this thread and its children when * SparkSession.getOrCreate() is called. This can be used to ensure that a given thread receives @@ -982,26 +1074,18 @@ object SparkSession extends Logging { * @since 2.0.0 */ def setActiveSession(session: SparkSession): Unit = { - if (session != getActiveSession.get && getActiveSession.isDefined) { - numActiveSessions.getAndIncrement - activeThreadSession.set(session) - } else if (session == null) { - this.clearActiveSession() - } + activeThreadSession.set(session) } /** - * Clears the active SparkSession for current thread assuming it is defined. - * Subsequent calls to getOrCreate will return the first created context - * instead of a thread-local override. + * Clears the active SparkSession for current thread. Subsequent calls to getOrCreate will + * return the first created context instead of a thread-local override. * * @since 2.0.0 */ def clearActiveSession(): Unit = { - if (getActiveSession.isDefined) { - activeThreadSession.remove() - numActiveSessions.decrementAndGet() - } + activeThreadSession.remove() + } /** @@ -1014,14 +1098,12 @@ object SparkSession extends Logging { } /** - * Clears the default SparkSession that is returned by the builder - * if it is not null. + * Clears the default SparkSession that is returned by the builder. + * * @since 2.0.0 */ def clearDefaultSession(): Unit = { - if (getDefaultSession.isDefined) { - defaultSession.set(null) - } + defaultSession.set(null) } /** @@ -1082,6 +1164,7 @@ object SparkSession extends Logging { private def sessionStateClassName(conf: SparkConf): String = { conf.get(CATALOG_IMPLEMENTATION) match { + case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME case "in-memory" => classOf[SessionStateBuilder].getCanonicalName } @@ -1089,6 +1172,7 @@ object SparkSession extends Logging { private def assertOnDriver(): Unit = { if (Utils.isTesting && TaskContext.get != null) { + defaultSession.get().sessionListener // we're accessing it during task execution, fail. throw new IllegalStateException( "SparkSession should only be created and accessed on the driver.") From 387acb14a44d3ab2c12515e674a19eb924b20019 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Sun, 5 Apr 2020 10:31:32 -0400 Subject: [PATCH 10/18] testing and style --- .../org/apache/spark/sql/SparkSession.scala | 35 ++++++++++--------- .../spark/sql/SparkSessionBuilderSuite.scala | 14 ++++---- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 45fa3ab9419bd..7531b83fe97c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -88,18 +88,18 @@ class SparkSession private( // The call site where this SparkSession was constructed. private val creationSite: CallSite = Utils.getCallSite() - private val sessionListener: SparkListener = new SparkListener { + private val _sessionListener: SparkListener = new SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { defaultSession.set(null) } } // Used to manage state in the the `SparkSession` singleton - private[spark] val sessionId: UUID = UUID.randomUUID - private[spark] val terminated: AtomicBoolean = new AtomicBoolean(false) - sparkContext.addSparkListener(sessionListener) + private[spark] val _sessionId: UUID = UUID.randomUUID + private[spark] val _terminated: AtomicBoolean = new AtomicBoolean(false) + sparkContext.addSparkListener(_sessionListener) private[spark] def assertNotTerminated(): Unit = { - if (terminated.get()) { + if (_terminated.get()) { throw new IllegalStateException( s"""Cannot call methods on a terminated SparkSession. |This terminated SparkSession was created at: @@ -109,7 +109,6 @@ class SparkSession private( } } - /** * Constructor used in Pyspark. Contains explicit application of Spark Session Extensions * which otherwise only occurs during getOrCreate. We cannot add this to the default constructor @@ -131,7 +130,7 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) - def removeListener(): Unit = sparkContext.removeSparkListener(sessionListener) + def removeListener(): Unit = sparkContext.removeSparkListener(_sessionListener) /** * The version of Spark on which this application is running. @@ -301,7 +300,10 @@ class SparkSession private( * @since 2.0.0 */ @transient - lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation()) + lazy val emptyDataFrame: DataFrame = { + assertNotTerminated() + Dataset.ofRows(self, LocalRelation()) + } /** * Creates a new [[Dataset]] of type T containing zero elements. @@ -754,10 +756,10 @@ class SparkSession private( def terminate(): Unit = { // Session is still active - if (!terminated.get()) { - sparkContext.removeSparkListener(this.sessionListener) - SparkSession.removeTerminatedSession(sessionId) - terminated.set(true) + if (!_terminated.get()) { + sparkContext.removeSparkListener(this._sessionListener) + SparkSession.removeTerminatedSession(_sessionId) + _terminated.set(true) } else { throw new IllegalStateException( @@ -773,10 +775,10 @@ class SparkSession private( * @since 2.0.0 */ def stopContext(): Unit = { - if (!terminated.get()) { + if (!_terminated.get()) { // stopping the context should also terminate this session terminate() - SparkSession.removeTerminatedSession(sessionId) + SparkSession.removeTerminatedSession(_sessionId) SparkSession.clearDefaultSession() SparkSession.clearActiveSession() } @@ -1053,14 +1055,14 @@ object SparkSession extends Logging { // clean up active session if (getActiveSession.isDefined) { val activeSession = getActiveSession.get - if(sessionId.equals(activeSession.sessionId)) { + if(sessionId.equals(activeSession._sessionId)) { clearActiveSession() } } // clean up default session if (getDefaultSession.isDefined) { val defaultSession = getDefaultSession.get - if(sessionId.equals(defaultSession.sessionId)) { + if(sessionId.equals(defaultSession._sessionId)) { clearDefaultSession() } } @@ -1085,7 +1087,6 @@ object SparkSession extends Logging { */ def clearActiveSession(): Unit = { activeThreadSession.remove() - } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 615d32226db13..249c900c17543 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -153,15 +153,17 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } } - test("SPARK-27958: SparkContext stopped when last SparkSession is stopped ") { + test("SPARK-27958: Terminating a SparkSession") { val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") val newSC = new SparkContext(conf) val session1 = SparkSession.builder().sparkContext(newSC).master("local").getOrCreate() assert(!session1.sparkContext.isStopped) - val session2 = SparkSession.builder().sparkContext(newSC).master("local").getOrCreate() - session1.stop() - assert(!session1.sparkContext.isStopped) - session2.stop() - assert(session1.sparkContext.isStopped) + assert(SparkSession.getActiveSession.isDefined) + session1.emptyDataFrame + session1.terminate() + assertThrows[IllegalStateException] { + session1.emptyDataFrame + } + assert(SparkSession.getActiveSession.isEmpty) } } From 99c5f6428b0322271bc0155ef1c71f37a3ba7ddc Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Sun, 5 Apr 2020 10:49:25 -0400 Subject: [PATCH 11/18] style fix --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 7531b83fe97c8..ed789db500bee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal + import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable} import org.apache.spark.api.java.JavaRDD From 5a1e0ab95bc247a07a699dfefa94776d2a865e54 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Wed, 6 May 2020 15:44:32 -0400 Subject: [PATCH 12/18] remove unnecessary s --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index ceefee98719bc..029f7e3ec3597 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -767,7 +767,7 @@ class SparkSession private( } else { throw new IllegalStateException( - s"""Cannot call methods on a terminated SparkSession. Call + """Cannot call methods on a terminated SparkSession. Call |getOrCreate() to create a new session. |""".stripMargin) } From 0c7e9df39991e141500daf5c4028addd68f44231 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Tue, 12 May 2020 17:10:39 -0400 Subject: [PATCH 13/18] remove lifecycle methods - context tracks listener --- .../scala/org/apache/spark/SparkContext.scala | 10 ++ .../org/apache/spark/sql/SparkSession.scala | 139 +++--------------- .../spark/sql/SparkSessionBuilderSuite.scala | 25 ++-- 3 files changed, 41 insertions(+), 133 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5c92527b7b80e..f176a9fb6004e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -91,6 +91,7 @@ class SparkContext(config: SparkConf) extends Logging { val startTime = System.currentTimeMillis() private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false) + private[spark] val sessionListenerRegistered: AtomicBoolean = new AtomicBoolean(false) private[spark] def assertNotStopped(): Unit = { if (stopped.get()) { @@ -256,6 +257,15 @@ class SparkContext(config: SparkConf) extends Logging { */ def isStopped: Boolean = stopped.get() + def isSessionListenerRegistered: Boolean = sessionListenerRegistered.get() + + def registerSessionListener(listener: SparkListenerInterface) { + if (!isSessionListenerRegistered) { + addSparkListener(listener) + sessionListenerRegistered.set(true) + } + } + private[spark] def statusStore: AppStatusStore = _statusStore // An asynchronous listener bus for Spark events diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 029f7e3ec3597..b353b6d4a0b0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql import java.io.Closeable -import java.util.UUID import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} @@ -32,7 +31,6 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} -import org.apache.spark.sql.SparkSession.defaultSession import org.apache.spark.sql.catalog.Catalog import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -51,7 +49,6 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.{CallSite, Utils} - /** * The entry point to programming Spark with the Dataset and DataFrame API. * @@ -89,27 +86,6 @@ class SparkSession private( // The call site where this SparkSession was constructed. private val creationSite: CallSite = Utils.getCallSite() - private val _sessionListener: SparkListener = new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - defaultSession.set(null) - } - } - // Used to manage state in the the `SparkSession` singleton - private[spark] val _sessionId: UUID = UUID.randomUUID - private[spark] val _terminated: AtomicBoolean = new AtomicBoolean(false) - sparkContext.addSparkListener(_sessionListener) - - private[spark] def assertNotTerminated(): Unit = { - if (_terminated.get()) { - throw new IllegalStateException( - s"""Cannot call methods on a terminated SparkSession. - |This terminated SparkSession was created at: - | - |${creationSite.longForm} - """.stripMargin) - } - } - /** * Constructor used in Pyspark. Contains explicit application of Spark Session Extensions * which otherwise only occurs during getOrCreate. We cannot add this to the default constructor @@ -131,8 +107,6 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) - def removeListener(): Unit = sparkContext.removeSparkListener(_sessionListener) - /** * The version of Spark on which this application is running. * @@ -301,10 +275,7 @@ class SparkSession private( * @since 2.0.0 */ @transient - lazy val emptyDataFrame: DataFrame = { - assertNotTerminated() - Dataset.ofRows(self, LocalRelation()) - } + lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation()) /** * Creates a new [[Dataset]] of type T containing zero elements. @@ -312,7 +283,6 @@ class SparkSession private( * @return 2.0.0 */ def emptyDataset[T: Encoder]: Dataset[T] = { - assertNotTerminated() val encoder = implicitly[Encoder[T]] new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder) } @@ -323,7 +293,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive { - assertNotTerminated() val encoder = Encoders.product[A] Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder)) } @@ -334,7 +303,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive { - assertNotTerminated() val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data)) @@ -373,7 +341,6 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive { - assertNotTerminated() // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. val encoder = RowEncoder(schema) @@ -392,7 +359,6 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { - assertNotTerminated() createDataFrame(rowRDD.rdd, schema) } @@ -406,7 +372,6 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive { - assertNotTerminated() Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala)) } @@ -419,7 +384,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive { - assertNotTerminated() val attributeSeq: Seq[AttributeReference] = getSchema(beanClass) val className = beanClass.getName val rowRdd = rdd.mapPartitions { iter => @@ -438,7 +402,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { - assertNotTerminated() createDataFrame(rdd.rdd, beanClass) } @@ -450,7 +413,6 @@ class SparkSession private( * @since 1.6.0 */ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = withActive { - assertNotTerminated() val attrSeq = getSchema(beanClass) val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq) Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq)) @@ -462,7 +424,6 @@ class SparkSession private( * @since 2.0.0 */ def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { - assertNotTerminated() Dataset.ofRows(self, LogicalRelation(baseRelation)) } @@ -497,9 +458,7 @@ class SparkSession private( * * @since 2.0.0 */ - def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = { - assertNotTerminated() val enc = encoderFor[T] val toRow = enc.createSerializer() val attributes = enc.schema.toAttributes @@ -508,7 +467,6 @@ class SparkSession private( Dataset[T](self, plan) } - /** * Creates a [[Dataset]] from an RDD of a given type. This method requires an * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) @@ -518,7 +476,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { - assertNotTerminated() Dataset[T](self, ExternalRDD(data, self)) } @@ -538,7 +495,6 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = { - assertNotTerminated() createDataset(data.asScala) } @@ -548,10 +504,7 @@ class SparkSession private( * * @since 2.0.0 */ - def range(end: Long): Dataset[java.lang.Long] = { - assertNotTerminated() - range(0, end) - } + def range(end: Long): Dataset[java.lang.Long] = range(0, end) /** * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements @@ -560,7 +513,6 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long): Dataset[java.lang.Long] = { - assertNotTerminated() range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism) } @@ -571,7 +523,6 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { - assertNotTerminated() range(start, end, step, numPartitions = sparkContext.defaultParallelism) } @@ -583,7 +534,6 @@ class SparkSession private( * @since 2.0.0 */ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = { - assertNotTerminated() new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG) } @@ -594,7 +544,6 @@ class SparkSession private( catalystRows: RDD[InternalRow], schema: StructType, isStreaming: Boolean = false): DataFrame = { - assertNotTerminated() // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. val logicalPlan = LogicalRDD( @@ -628,17 +577,14 @@ class SparkSession private( * @since 2.0.0 */ def table(tableName: String): DataFrame = { - assertNotTerminated() table(sessionState.sqlParser.parseMultipartIdentifier(tableName)) } private[sql] def table(multipartIdentifier: Seq[String]): DataFrame = { - assertNotTerminated() Dataset.ofRows(self, UnresolvedRelation(multipartIdentifier)) } private[sql] def table(tableIdent: TableIdentifier): DataFrame = { - assertNotTerminated() Dataset.ofRows(self, UnresolvedRelation(tableIdent)) } @@ -653,7 +599,6 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = withActive { - assertNotTerminated() val tracker = new QueryPlanningTracker val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) { sessionState.sqlParser.parsePlan(sqlText) @@ -678,7 +623,6 @@ class SparkSession private( */ @Unstable def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = { - assertNotTerminated() DataSource.lookupDataSource(runner, sessionState.conf) match { case source if classOf[ExternalCommandRunner].isAssignableFrom(source) => Dataset.ofRows(self, ExternalCommandExecutor( @@ -699,10 +643,7 @@ class SparkSession private( * * @since 2.0.0 */ - def read: DataFrameReader = { - assertNotTerminated() - new DataFrameReader(self) - } + def read: DataFrameReader = new DataFrameReader(self) /** * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. @@ -713,10 +654,7 @@ class SparkSession private( * * @since 2.0.0 */ - def readStream: DataStreamReader = { - assertNotTerminated() - new DataStreamReader(self) - } + def readStream: DataStreamReader = new DataStreamReader(self) /** * Executes some code block and prints to stdout the time taken to execute the block. This is @@ -752,56 +690,21 @@ class SparkSession private( } // scalastyle:on - /** - * Lifecycle method that cleans up state of spark session, and mark session - * "ended" forever. This differs from `stop()` or `stopContext()` as it keeps - * the underlying `SparkContext` alive, while only getting - */ - - def terminate(): Unit = { - // Session is still active - if (!_terminated.get()) { - sparkContext.removeSparkListener(this._sessionListener) - SparkSession.removeTerminatedSession(_sessionId) - _terminated.set(true) - } - else { - throw new IllegalStateException( - """Cannot call methods on a terminated SparkSession. Call - |getOrCreate() to create a new session. - |""".stripMargin) - } - } - /** * Stop the underlying `SparkContext`. * * @since 2.0.0 */ - def stopContext(): Unit = { - if (!_terminated.get()) { - // stopping the context should also terminate this session - terminate() - SparkSession.removeTerminatedSession(_sessionId) - SparkSession.clearDefaultSession() - SparkSession.clearActiveSession() - } + def stop(): Unit = { sparkContext.stop() } - /** - * Synonym for `stopContext()`. - * - * @since 2.0.0 - */ - def stop(): Unit = stopContext() - /** * Synonym for `stop()`. * * @since 2.1.0 */ - override def close(): Unit = stopContext() + override def close(): Unit = stop() /** * Parses the data type in our internal string representation. The data type string should @@ -981,6 +884,10 @@ object SparkSession extends Logging { this } + def registerListenerIfUnregistered(session: SparkSession): Unit = { + session.sparkContext.registerSessionListener(_sessionListener) + } + /** * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new * one based on the options set in this builder. @@ -1007,7 +914,7 @@ object SparkSession extends Logging { // Global synchronization so we will only set the default session once. SparkSession.synchronized { - // If the current thread does not have an active session, get it from the default session. + // If the current thread does not have an active session, get it from the global session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { applyModifiableSettings(session) @@ -1036,6 +943,7 @@ object SparkSession extends Logging { options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } setDefaultSession(session) setActiveSession(session) + registerListenerIfUnregistered(session) } return session @@ -1065,23 +973,6 @@ object SparkSession extends Logging { */ def builder(): Builder = new Builder - def removeTerminatedSession(sessionId: UUID): Unit = { - // clean up active session - if (getActiveSession.isDefined) { - val activeSession = getActiveSession.get - if(sessionId.equals(activeSession._sessionId)) { - clearActiveSession() - } - } - // clean up default session - if (getDefaultSession.isDefined) { - val defaultSession = getDefaultSession.get - if(sessionId.equals(defaultSession._sessionId)) { - clearDefaultSession() - } - } - } - /** * Changes the SparkSession that will be returned in this thread and its children when * SparkSession.getOrCreate() is called. This can be used to ensure that a given thread receives @@ -1167,6 +1058,12 @@ object SparkSession extends Logging { //////////////////////////////////////////////////////////////////////////////////////// // Private methods from now on //////////////////////////////////////////////////////////////////////////////////////// + /** Default listener on SparkContext */ + private val _sessionListener: SparkListener = new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + defaultSession.set(null) + } + } /** The active SparkSession for the current thread. */ private val activeThreadSession = new InheritableThreadLocal[SparkSession] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 0520b0c06013d..218a59b5f2f22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -154,18 +154,19 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } } - test("SPARK-27958: Terminating a SparkSession") { - val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") - val newSC = new SparkContext(conf) - val session1 = SparkSession.builder().sparkContext(newSC).master("local").getOrCreate() - assert(!session1.sparkContext.isStopped) - assert(SparkSession.getActiveSession.isDefined) - session1.emptyDataFrame - session1.terminate() - assertThrows[IllegalStateException] { - session1.emptyDataFrame - } - assert(SparkSession.getActiveSession.isEmpty) + test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-XXXXX-1") + val context = new SparkContext(conf) + assert(!context.isSessionListenerRegistered) + val preSessionCreation = context.listenerBus.listeners.size() + val session1 = SparkSession.builder() + .master("local") + .sparkContext(context) + .getOrCreate() + assert(session1.sparkContext.listenerBus.listeners.size() == preSessionCreation + 1) + assert(session1.sparkContext.isSessionListenerRegistered) } test("SPARK-31234: RESET command will not change static sql configs and " + From 0284c794b9a10113a9cb3e775d0096b7bbcd25ba Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Tue, 12 May 2020 17:12:31 -0400 Subject: [PATCH 14/18] remove unnecessary import --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index b353b6d4a0b0b..0e10eccf5a67c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.io.Closeable import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag From 9573805aec6772cedaeb321b27d444745d0bdc38 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Wed, 13 May 2020 08:29:35 -0400 Subject: [PATCH 15/18] add ticket number to test --- .../scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 218a59b5f2f22..5a41de1c14865 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -157,7 +157,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { val conf = new SparkConf() .setMaster("local") - .setAppName("test-app-SPARK-XXXXX-1") + .setAppName("test-app-SPARK-31354-1") val context = new SparkContext(conf) assert(!context.isSessionListenerRegistered) val preSessionCreation = context.listenerBus.listeners.size() From e5563a71b93f9f5295fff57d5a0f29a234b5f281 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Tue, 19 May 2020 12:04:43 -0400 Subject: [PATCH 16/18] move logic to listener --- .../scala/org/apache/spark/SparkContext.scala | 10 ---------- .../org/apache/spark/sql/SparkSession.scala | 20 +++++++++++++------ .../spark/sql/SparkSessionBuilderSuite.scala | 15 -------------- 3 files changed, 14 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f176a9fb6004e..5c92527b7b80e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -91,7 +91,6 @@ class SparkContext(config: SparkConf) extends Logging { val startTime = System.currentTimeMillis() private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false) - private[spark] val sessionListenerRegistered: AtomicBoolean = new AtomicBoolean(false) private[spark] def assertNotStopped(): Unit = { if (stopped.get()) { @@ -257,15 +256,6 @@ class SparkContext(config: SparkConf) extends Logging { */ def isStopped: Boolean = stopped.get() - def isSessionListenerRegistered: Boolean = sessionListenerRegistered.get() - - def registerSessionListener(listener: SparkListenerInterface) { - if (!isSessionListenerRegistered) { - addSparkListener(listener) - sessionListenerRegistered.set(true) - } - } - private[spark] def statusStore: AppStatusStore = _statusStore // An asynchronous listener bus for Spark events diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0e10eccf5a67c..b616e959a4c0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.io.Closeable import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag @@ -884,10 +884,6 @@ object SparkSession extends Logging { this } - def registerListenerIfUnregistered(session: SparkSession): Unit = { - session.sparkContext.registerSessionListener(_sessionListener) - } - /** * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new * one based on the options set in this builder. @@ -943,12 +939,19 @@ object SparkSession extends Logging { options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } setDefaultSession(session) setActiveSession(session) - registerListenerIfUnregistered(session) + registerSessionListenerOnContext(sparkContext) } return session } + private def registerSessionListenerOnContext(sparkContext: SparkContext): Unit = { + if (!SparkSession.sessionListenerRegistered.get()) { + sparkContext.addSparkListener(_sessionListener) + SparkSession.sessionListenerRegistered.set(true) + } + } + private def applyModifiableSettings(session: SparkSession): Unit = { val (staticConfs, otherConfs) = options.partition(kv => SQLConf.staticConfKeys.contains(kv._1)) @@ -1065,6 +1068,9 @@ object SparkSession extends Logging { } } + /** Whether the app end listener has been registered on the context */ + private val sessionListenerRegistered: AtomicBoolean = new AtomicBoolean(false) + /** The active SparkSession for the current thread. */ private val activeThreadSession = new InheritableThreadLocal[SparkSession] @@ -1081,6 +1087,8 @@ object SparkSession extends Logging { } } + private[spark] def isSessionListenerRegistered: Boolean = sessionListenerRegistered.get + private def assertOnDriver(): Unit = { if (Utils.isTesting && TaskContext.get != null) { // we're accessing it during task execution, fail. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 5a41de1c14865..7b76d0702d835 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -154,21 +154,6 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { } } - test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test-app-SPARK-31354-1") - val context = new SparkContext(conf) - assert(!context.isSessionListenerRegistered) - val preSessionCreation = context.listenerBus.listeners.size() - val session1 = SparkSession.builder() - .master("local") - .sparkContext(context) - .getOrCreate() - assert(session1.sparkContext.listenerBus.listeners.size() == preSessionCreation + 1) - assert(session1.sparkContext.isSessionListenerRegistered) - } - test("SPARK-31234: RESET command will not change static sql configs and " + "spark context conf values in SessionState") { val session = SparkSession.builder() From e5ef33a2096cb17129f7797cb6c740fd5619bd77 Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Wed, 20 May 2020 14:20:24 -0400 Subject: [PATCH 17/18] adding test and cleanup --- .../org/apache/spark/sql/SparkSession.scala | 30 ++++++++----------- .../spark/sql/SparkSessionBuilderSuite.scala | 25 ++++++++++++++++ 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index b616e959a4c0c..e12805ffc4640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -939,19 +939,12 @@ object SparkSession extends Logging { options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } setDefaultSession(session) setActiveSession(session) - registerSessionListenerOnContext(sparkContext) + registerContextListener(sparkContext) } return session } - private def registerSessionListenerOnContext(sparkContext: SparkContext): Unit = { - if (!SparkSession.sessionListenerRegistered.get()) { - sparkContext.addSparkListener(_sessionListener) - SparkSession.sessionListenerRegistered.set(true) - } - } - private def applyModifiableSettings(session: SparkSession): Unit = { val (staticConfs, otherConfs) = options.partition(kv => SQLConf.staticConfKeys.contains(kv._1)) @@ -1061,16 +1054,21 @@ object SparkSession extends Logging { //////////////////////////////////////////////////////////////////////////////////////// // Private methods from now on //////////////////////////////////////////////////////////////////////////////////////// - /** Default listener on SparkContext */ - private val _sessionListener: SparkListener = new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - defaultSession.set(null) + + private val listenerRegistered: AtomicBoolean = new AtomicBoolean(false) + + /** Register the AppEnd listener onto the Context */ + private def registerContextListener(sparkContext: SparkContext): Unit = { + if (!SparkSession.listenerRegistered.get()) { + sparkContext.addSparkListener(new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + defaultSession.set(null) + } + }) + SparkSession.listenerRegistered.set(true) } } - /** Whether the app end listener has been registered on the context */ - private val sessionListenerRegistered: AtomicBoolean = new AtomicBoolean(false) - /** The active SparkSession for the current thread. */ private val activeThreadSession = new InheritableThreadLocal[SparkSession] @@ -1087,8 +1085,6 @@ object SparkSession extends Logging { } } - private[spark] def isSessionListenerRegistered: Boolean = sessionListenerRegistered.get - private def assertOnDriver(): Unit = { if (Utils.isTesting && TaskContext.get != null) { // we're accessing it during task execution, fail. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 7b76d0702d835..0a522fdbdeed8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -169,6 +169,31 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234") } + test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-31354-1") + val context = new SparkContext(conf) + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + val postFirstCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + val postSecondCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + assert(postFirstCreation == postSecondCreation) + } + test("SPARK-31532: should not propagate static sql configs to the existing" + " active/default SparkSession") { val session = SparkSession.builder() From cfa14626fccd4ff2ab0e7175b8457b582c32017f Mon Sep 17 00:00:00 2001 From: Vinoo Ganesh Date: Thu, 21 May 2020 07:36:04 -0400 Subject: [PATCH 18/18] same class, don't need SparkSession --- .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index e12805ffc4640..60a60377d8a3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1059,13 +1059,13 @@ object SparkSession extends Logging { /** Register the AppEnd listener onto the Context */ private def registerContextListener(sparkContext: SparkContext): Unit = { - if (!SparkSession.listenerRegistered.get()) { + if (!listenerRegistered.get()) { sparkContext.addSparkListener(new SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { defaultSession.set(null) } }) - SparkSession.listenerRegistered.set(true) + listenerRegistered.set(true) } }