From 0240cca2cbbcc6073dddab01ec7e6109cc6ead11 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Sun, 9 Feb 2025 20:06:25 +0000 Subject: [PATCH] [SPARK-51136][HISTORYSERVER] Set CallerContext for History Server ### What changes were proposed in this pull request? Initialize the Hadoop RPC `CallerContext` during History Server startup, before `FileSystem` access. Calls to HDFS will get tagged in the audit log as originating from the History Server. ### Why are the changes needed? Other YARN-based Spark processes set the `CallerContext`, so that additional auditing context propagates in Hadoop RPC calls. This PR provides auditing context for calls from the History Server. Other callers provide additional information like app ID, attempt ID, etc. We don't provide that here through History Server, which serves multiple apps/attempts. ### Does this PR introduce _any_ user-facing change? Yes. In environments that configure `hadoop.caller.context.enabled=true`, users will now see additional information in the HDFS audit logs explicitly stating that calls originated from the History Server. ### How was this patch tested? A new unit test has been added. All tests pass in the history package. ``` build/mvn -pl core test -Dtest=none -DmembersOnlySuites=org.apache.spark.deploy.history ``` When the changes are deployed to a running cluster, the new caller context is visible in the HDFS audit logs. ``` 2025-02-07 23:00:54,657 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0012 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,683 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0011 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,699 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0011 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,715 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0010 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,729 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0010 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,743 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0009 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,755 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0009 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,767 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0008 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:00:54,779 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=open src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0008 dst=null perm=null proto=rpc callerContext=SPARK_HISTORY 2025-02-07 23:01:04,160 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=spark (auth:SIMPLE) ip=/10.240.5.205 cmd=listStatus src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history dst=null perm=null proto=rpc callerContext=SPARK_HISTORY ``` ### Was this patch authored or co-authored using generative AI tooling? No. --- .../spark/deploy/history/FsHistoryProvider.scala | 3 ++- .../deploy/history/FsHistoryProviderSuite.scala | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index ec227d40f21ac..a5e78ffc6d0ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -52,7 +52,7 @@ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{CallerContext, Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.kvstore._ @@ -396,6 +396,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def start(): Unit = { + new CallerContext("HISTORY").setCurrentContext() initThread = initialize() } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 852f94bda870d..ff1d319c5703c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} +import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext} import org.apache.hadoop.security.AccessControlException import org.mockito.ArgumentMatchers.{any, argThat} import org.mockito.Mockito.{doThrow, mock, spy, verify, when} @@ -1776,6 +1777,18 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P assert(log2.exists()) } + test("SPARK-51136: FsHistoryProvider start should set Hadoop CallerContext") { + val provider = new FsHistoryProvider(createTestConf()) + provider.start() + + try { + val hadoopCallerContext = HadoopCallerContext.getCurrent() + assert(hadoopCallerContext.getContext() === "SPARK_HISTORY") + } finally { + provider.stop() + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: