Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
18ce1b8
Add capability to inherit SessionState (SQL conf, temp tables, regist…
kunalkhamar Feb 3, 2017
9beb78d
Add tests for forking new session with inherit config enabled. Update…
kunalkhamar Feb 6, 2017
a343d8a
Fix constructor default args for bytecode compatibility.
kunalkhamar Feb 6, 2017
4210079
Incorporate feedback. Fix association of incorrect SparkSession while…
kunalkhamar Feb 10, 2017
6da6bda
Update spark version. Rename clone to copy, in order to avoid Java Ob…
kunalkhamar Feb 10, 2017
579d0b7
Make lazy vals strict.
kunalkhamar Feb 14, 2017
2837e73
Refactor SessionState to remove passing of base SessionState, and ini…
kunalkhamar Feb 16, 2017
8c00344
Remove unused import.
kunalkhamar Feb 16, 2017
f423f74
Remove SparkSession reference from SessionState.
kunalkhamar Feb 16, 2017
b1371d8
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 16, 2017
2cee190
Fix initialization loop.
kunalkhamar Feb 17, 2017
e2bbfa8
Fix var name error.
kunalkhamar Feb 17, 2017
8ac778a
Add tests. Refactor. Temporarily disable subtest SPARK-18360: default…
kunalkhamar Feb 18, 2017
0c732ce
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 21, 2017
3c995e1
Fix copy of SessionCatalog. Changes from review.
kunalkhamar Feb 21, 2017
292011a
Merge branch 'fork-sparksession' of github.com:kunalkhamar/spark into…
kunalkhamar Feb 21, 2017
b027412
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 21, 2017
295ee41
Add synchronized blocks. Ignore hive metastore tests for now.
kunalkhamar Feb 21, 2017
847b484
Merge branch 'fork-sparksession' of github.com:kunalkhamar/spark into…
kunalkhamar Feb 21, 2017
9beba84
Add tests. Force copy of session state on cloneSession.
kunalkhamar Feb 22, 2017
3d2e4a6
Rename copy to clone() to work around copy method of case classes. Mo…
kunalkhamar Feb 22, 2017
4f70d12
Fix HiveSessionState clone.
kunalkhamar Feb 22, 2017
dd2dedd
Add tests for HiveSessionState. Review feedback.
kunalkhamar Feb 23, 2017
8a8d47b
Simplify TestSQLContext. Review feedback.
kunalkhamar Feb 23, 2017
ffc2058
(attempt to) Fix tests.
kunalkhamar Feb 24, 2017
16824f9
Review.
kunalkhamar Feb 24, 2017
fd11ee2
Update test case.
kunalkhamar Feb 24, 2017
437b0bc
Add throwing exception if wrong SessionState clone is called. Update …
kunalkhamar Feb 25, 2017
300d3a0
Most of the changes from review.
kunalkhamar Feb 28, 2017
3ee271f
All but one review feedback.
kunalkhamar Mar 6, 2017
c3f052f
Merge remote-tracking branch 'origin/master' into pr16826
zsxwing Mar 6, 2017
0bdc81c
Merge remote-tracking branch 'origin/master' into pr16826
zsxwing Mar 6, 2017
2740c63
Fix tests
zsxwing Mar 6, 2017
0f167db
Clean up tests
zsxwing Mar 6, 2017
2f0b1ad
fix SessionCatalogSuite
zsxwing Mar 7, 2017
c41e7bc
More cleanup
zsxwing Mar 7, 2017
5eb6733
More tests
zsxwing Mar 7, 2017
05abcf8
Update tests and a param comment.
kunalkhamar Mar 7, 2017
4c23e7a
Merge branch 'master' into fork-sparksession
kunalkhamar Mar 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make lazy vals strict.
  • Loading branch information
kunalkhamar committed Feb 14, 2017
commit 579d0b77738e2de53c06725e55f6a0de905325a5
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ private[sql] class SessionState(
this(sparkSession, None)
}

// Note: These are all lazy vals because they depend on each other (e.g. conf) and we
// want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
// Note: Many of these vals depend on each other (e.g. conf) and should be initialized
// with an early initializer if we want subclasses to override some of the fields.
// Otherwise, we would get a lot of NPEs.

/**
* SQL-specific key-value configurations.
*/
lazy val conf: SQLConf = {
val conf: SQLConf = {
parentSessionState.map(_.conf.copy).getOrElse(new SQLConf)
}

Expand All @@ -74,7 +75,7 @@ private[sql] class SessionState(
hadoopConf
}

lazy val experimentalMethods: ExperimentalMethods = {
val experimentalMethods: ExperimentalMethods = {
parentSessionState
.map(_.experimentalMethods.copy)
.getOrElse(new ExperimentalMethods)
Expand All @@ -83,14 +84,14 @@ private[sql] class SessionState(
/**
* Internal catalog for managing functions registered by the user.
*/
lazy val functionRegistry: FunctionRegistry = {
val functionRegistry: FunctionRegistry = {
parentSessionState.map(_.functionRegistry.copy).getOrElse(FunctionRegistry.builtin.copy)
}

/**
* A class for loading resources specified by a function.
*/
lazy val functionResourceLoader: FunctionResourceLoader = {
val functionResourceLoader: FunctionResourceLoader = {
new FunctionResourceLoader {
override def loadResource(resource: FunctionResource): Unit = {
resource.resourceType match {
Expand All @@ -108,7 +109,7 @@ private[sql] class SessionState(
/**
* Internal catalog for managing table and database states.
*/
lazy val catalog: SessionCatalog = {
val catalog: SessionCatalog = {
parentSessionState
.map(_.catalog.copy)
.getOrElse(new SessionCatalog(
Expand All @@ -125,12 +126,12 @@ private[sql] class SessionState(
* Interface exposed to the user for registering user-defined functions.
* Note that the user-defined functions must be deterministic.
*/
lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
val udf: UDFRegistration = new UDFRegistration(functionRegistry)

/**
* Logical query plan analyzer for resolving unresolved attributes and relations.
*/
lazy val analyzer: Analyzer = {
val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
new FindDataSourceTable(sparkSession) ::
Expand All @@ -149,12 +150,12 @@ private[sql] class SessionState(
/**
* Logical query plan optimizer.
*/
lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
*/
lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
val sqlParser: ParserInterface = new SparkSqlParser(conf)

/**
* Planner that converts optimized logical plans to physical plans.
Expand All @@ -166,12 +167,12 @@ private[sql] class SessionState(
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
* that listen for execution metrics.
*/
lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
val listenerManager: ExecutionListenerManager = new ExecutionListenerManager

/**
* Interface to start and stop [[StreamingQuery]]s.
*/
lazy val streamingQueryManager: StreamingQueryManager = {
val streamingQueryManager: StreamingQueryManager = {
new StreamingQueryManager(sparkSession)
}

Expand All @@ -187,8 +188,8 @@ private[sql] class SessionState(
/**
* Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
*/
def copy(sc: SparkSession): SessionState = {
new SessionState(sc, Some(this))
def copy(associatedSparkSession: SparkSession): SessionState = {
new SessionState(associatedSparkSession, Some(this))
}

// ------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) {
this(new SparkConf)
}

@transient
protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
override lazy val conf: SQLConf = {
class TestSessionState(sparkSession: SparkSession) extends {
override val conf: SQLConf = {
new SQLConf {
clear()
override def clear(): Unit = {
Expand All @@ -46,7 +45,10 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) {
}
}
}
}
} with SessionState(sparkSession)

@transient
protected[sql] override lazy val sessionState: SessionState = new TestSessionState(this)

// Needed for Java tests
def loadTestData(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,29 @@ import org.apache.spark.sql.internal.SessionState
/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
*/
private[hive] class HiveSessionState(sparkSession: SparkSession)
extends SessionState(sparkSession) {
private[hive] class HiveSessionState(
sparkSession: SparkSession,
parentHiveSessionState: Option[HiveSessionState])
extends SessionState(sparkSession, parentHiveSessionState) {

self =>

private[hive] def this(associatedSparkSession: SparkSession) = {
this(associatedSparkSession, None)
}

/**
* A Hive client used for interacting with the metastore.
*/
lazy val metadataHive: HiveClient =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
val metadataHive: HiveClient =
parentHiveSessionState.map(_.metadataHive).getOrElse(
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
.newSession())

/**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
override val catalog = {
new HiveSessionCatalog(
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
sparkSession.sharedState.globalTempViewManager,
Expand All @@ -57,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
/**
* An analyzer that uses the Hive metastore.
*/
override lazy val analyzer: Analyzer = {
override val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
Expand Down Expand Up @@ -147,4 +155,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}

override def copy(associatedSparkSession: SparkSession): HiveSessionState = {
new HiveSessionState(associatedSparkSession, Some(this.asInstanceOf[HiveSessionState]))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to style guide, these are neither encouraged nor discouraged.
To be consistent, will delete all extra lines after last member of class/object.

}