diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 9c4abdf665796..39f12e03a9336 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -159,7 +159,9 @@ class SparkEnv ( // Get or Else synchronized to protect // against concurrent creation requests. udfDispatcherManager.getOrElse { - createUDFDispatcherManager() + val created = createUDFDispatcherManager() + udfDispatcherManager = Some(created) + created } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalExternalUDFOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalExternalUDFOperators.scala index 84163e7350f3a..f29c90772d4f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalExternalUDFOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalExternalUDFOperators.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.annotation.Experimental +import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.catalyst.expressions.{Attribute, ExternalUserDefinedFunction} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes @@ -46,6 +47,7 @@ trait ExternalUDF extends UnaryNode { * @param function The UDF to invoke. Output attributes are * derived from `function.dataType`. * @param isBarrier Whether to use barrier execution. + * @param profile Optional resource profile for the UDF execution. * @param child Input relation whose partitions are processed. */ @Experimental @@ -53,6 +55,7 @@ case class MapPartitionsExternalUDF( workerSpec: UDFWorkerSpecification, function: ExternalUserDefinedFunction, isBarrier: Boolean, + profile: Option[ResourceProfile], child: LogicalPlan) extends ExternalUDF { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 455933d8e085e..1729e2dd0dfc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -973,10 +973,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.MapInArrow(func, output, child, isBarrier, profile) => execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil case logical.MapPartitionsExternalUDF( - workerSpec, functionExpr, isBarrier, child) => + workerSpec, functionExpr, isBarrier, profile, child) => execution.externalUDF.MapPartitionsExternalUDFExec( workerSpec, functionExpr, - isBarrier, planLater(child)) :: Nil + isBarrier, profile, planLater(child)) :: Nil case logical.AttachDistributedSequence(attr, child, cache) => execution.python.AttachDistributedSequenceExec(attr, planLater(child), cache) :: Nil case logical.PythonWorkerLogs(jsonAttr) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFExec.scala index 541b03c52a0b1..883e7dce88b10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFExec.scala @@ -62,8 +62,8 @@ trait ExternalUDFExec extends UnaryExecNode { * Creates a [[WorkerSession]] via [[SparkEnv#getExternalUDFDispatcher]]. * Registers session cancellation on task failure and session termination * on task completion. The provided function receives the session - * and must return the result iterator. The function CAN but MUST NOT - * cancel or close the session. + * and must return the result iterator. The function may use the + * session but MUST NOT cancel or close it. */ protected def withUDFWorkerSession( taskContext: TaskContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFPlanner.scala index 589fa3e1696b1..c90a8af89783f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFPlanner.scala @@ -110,7 +110,7 @@ class UnifiedExternalUDFPlanner( children = Seq.empty, udfDeterministic = pythonUdf.udfDeterministic, udfNullable = true) - MapPartitionsExternalUDF(workerSpec, udf, isBarrier, child) + MapPartitionsExternalUDF(workerSpec, udf, isBarrier, profile, child) } override def planPythonMapInArrow( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/MapPartitionsExternalUDFExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/MapPartitionsExternalUDFExec.scala index 01d27ba115db6..dbb5bfef007e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/MapPartitionsExternalUDFExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/MapPartitionsExternalUDFExec.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.externalUDF import org.apache.spark.TaskContext import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ Attribute, @@ -36,9 +37,9 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification * external worker process. * * @param workerSpec Specification describing the UDF worker. - * @param functionExpr The UDF to invoke. + * @param function The UDF to invoke. * @param isBarrier Whether the UDF should be invoked using barrier execution. - * @param resultAttributes Output attributes produced by the UDF. + * @param profile Optional resource profile for the UDF execution. * @param child Child plan providing input partitions. */ @Experimental @@ -46,6 +47,7 @@ case class MapPartitionsExternalUDFExec( workerSpec: UDFWorkerSpecification, function: ExternalUserDefinedFunction, isBarrier: Boolean, + profile: Option[ResourceProfile], child: SparkPlan) extends ExternalUDFExec { diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala index cf3d36a7c5d7b..7fd624b9d0d8e 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala @@ -30,12 +30,12 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification * [[UDFWorkerSpecification]] (protobuf value equality). * * Callers obtain a dispatcher via [[getDispatcher]] and create - * sessions on it directly. On [[stop]], all cached dispatchers + * sessions on it directly. On [[close]], all cached dispatchers * are closed -- dispatchers are responsible for cleaning up * their own sessions. * * Thread safety: a [[ReentrantReadWriteLock]] allows concurrent - * [[getDispatcher]] calls (read lock) while [[stop]] has + * [[getDispatcher]] calls (read lock) while [[close]] has * exclusive access (write lock). */ @Experimental @@ -46,7 +46,7 @@ class UDFDispatcherManager( // Guarded by `rwLock`. The read lock is used by getDispatcher // (with upgrade when a new dispatcher must be added) and the - // write lock is used by stop. + // write lock is used by close. private val rwLock = new ReentrantReadWriteLock() private val dispatchers = new HashMap[UDFWorkerSpecification, WorkerDispatcher]() @@ -63,7 +63,6 @@ class UDFDispatcherManager( try { if (closed) throwClosed() - // Reading existing dispatcher = quick path val dispatcher = dispatchers.get(workerSpec) if (dispatcher != null) { return dispatcher @@ -93,7 +92,7 @@ class UDFDispatcherManager( } private def throwClosed(): Nothing = - throw new IllegalStateException("UDFDispatcherManager is stopped") + throw new IllegalStateException("UDFDispatcherManager is closed") /** * Closes all cached dispatchers and resets internal state.