Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ class SparkEnv (
// The get-or-else lookup is synchronized to protect
// against concurrent creation requests.
udfDispatcherManager.getOrElse {
createUDFDispatcherManager()
val created = createUDFDispatcherManager()
udfDispatcherManager = Some(created)
created
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.annotation.Experimental
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.catalyst.expressions.{Attribute,
ExternalUserDefinedFunction}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
Expand Down Expand Up @@ -46,13 +47,15 @@ trait ExternalUDF extends UnaryNode {
* @param function The UDF to invoke. Output attributes are
* derived from `function.dataType`.
* @param isBarrier Whether to use barrier execution.
* @param profile Optional resource profile for the UDF execution.
* @param child Input relation whose partitions are processed.
*/
@Experimental
case class MapPartitionsExternalUDF(
workerSpec: UDFWorkerSpecification,
function: ExternalUserDefinedFunction,
isBarrier: Boolean,
profile: Option[ResourceProfile],
child: LogicalPlan)
extends ExternalUDF {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,10 +973,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.MapInArrow(func, output, child, isBarrier, profile) =>
execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil
case logical.MapPartitionsExternalUDF(
workerSpec, functionExpr, isBarrier, child) =>
workerSpec, functionExpr, isBarrier, profile, child) =>
execution.externalUDF.MapPartitionsExternalUDFExec(
workerSpec, functionExpr,
isBarrier, planLater(child)) :: Nil
isBarrier, profile, planLater(child)) :: Nil
case logical.AttachDistributedSequence(attr, child, cache) =>
execution.python.AttachDistributedSequenceExec(attr, planLater(child), cache) :: Nil
case logical.PythonWorkerLogs(jsonAttr) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ trait ExternalUDFExec extends UnaryExecNode {
* Creates a [[WorkerSession]] via [[SparkEnv#getExternalUDFDispatcher]].
* Registers session cancellation on task failure and session termination
* on task completion. The provided function receives the session
* and must return the result iterator. The function CAN but MUST NOT
* cancel or close the session.
* and must return the result iterator. The function may use the
* session but MUST NOT cancel or close it.
*/
protected def withUDFWorkerSession(
taskContext: TaskContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class UnifiedExternalUDFPlanner(
children = Seq.empty,
udfDeterministic = pythonUdf.udfDeterministic,
udfNullable = true)
MapPartitionsExternalUDF(workerSpec, udf, isBarrier, child)
MapPartitionsExternalUDF(workerSpec, udf, isBarrier, profile, child)
}

override def planPythonMapInArrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.externalUDF
import org.apache.spark.TaskContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{
Attribute,
Expand All @@ -36,16 +37,17 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification
* external worker process.
*
* @param workerSpec Specification describing the UDF worker.
* @param functionExpr The UDF to invoke.
* @param function The UDF to invoke.
* @param isBarrier Whether the UDF should be invoked using barrier execution.
* @param resultAttributes Output attributes produced by the UDF.
* @param profile Optional resource profile for the UDF execution.
* @param child Child plan providing input partitions.
*/
@Experimental
case class MapPartitionsExternalUDFExec(
workerSpec: UDFWorkerSpecification,
function: ExternalUserDefinedFunction,
isBarrier: Boolean,
profile: Option[ResourceProfile],
child: SparkPlan)
extends ExternalUDFExec {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification
* [[UDFWorkerSpecification]] (protobuf value equality).
*
* Callers obtain a dispatcher via [[getDispatcher]] and create
* sessions on it directly. On [[stop]], all cached dispatchers
* sessions on it directly. On [[close]], all cached dispatchers
* are closed -- dispatchers are responsible for cleaning up
* their own sessions.
*
* Thread safety: a [[ReentrantReadWriteLock]] allows concurrent
* [[getDispatcher]] calls (read lock) while [[stop]] has
* [[getDispatcher]] calls (read lock) while [[close]] has
* exclusive access (write lock).
*/
@Experimental
Expand All @@ -46,7 +46,7 @@ class UDFDispatcherManager(

// Guarded by `rwLock`. The read lock is used by getDispatcher
// (with upgrade when a new dispatcher must be added) and the
// write lock is used by stop.
// write lock is used by close.
private val rwLock = new ReentrantReadWriteLock()
private val dispatchers =
new HashMap[UDFWorkerSpecification, WorkerDispatcher]()
Expand All @@ -63,7 +63,6 @@ class UDFDispatcherManager(
try {
if (closed) throwClosed()

// Reading existing dispatcher = quick path
val dispatcher = dispatchers.get(workerSpec)
if (dispatcher != null) {
return dispatcher
Expand Down Expand Up @@ -93,7 +92,7 @@ class UDFDispatcherManager(
}

private def throwClosed(): Nothing =
throw new IllegalStateException("UDFDispatcherManager is stopped")
throw new IllegalStateException("UDFDispatcherManager is closed")

/**
* Closes all cached dispatchers and resets internal state.
Expand Down