From d5ef65ad88aa0129803926215df1daf4ae3d59ee Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 11 Feb 2026 19:05:31 -0800 Subject: [PATCH 1/5] [SPARK-55495][CORE] Fix `EventLogFileWriters.closeWriter` to handle `checkError` --- .../spark/deploy/history/EventLogFileWriters.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 4e3bee1015ff3..7ae37c2ba2b69 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -131,7 +131,20 @@ abstract class EventLogFileWriter( } protected def closeWriter(): Unit = { + // 1. Flush first to check the errors + writer.foreach(_.flush()) + if (writer.exists(_.checkError())) { + logWarning("Spark detects errors while flushing event logs.") + } + hadoopDataStream.foreach(_.hflush()) + + // 2. Try to close and check the errors writer.foreach(_.close()) + if (writer.exists(_.checkError())) { + logWarning("Spark detects errors while closing event logs.") + } + // 3. Ensuring the underlying stream is closed at least (best-effort). + hadoopDataStream.foreach(_.close()) } protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = { From 5e524c49ac3c11933985bf9cfb72de2f4bc7482e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 11 Feb 2026 20:47:54 -0800 Subject: [PATCH 2/5] Add test cases --- .../history/EventLogFileWritersSuite.scala | 142 +++++++++++++++++- 1 file changed, 141 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index d9d6a4f8d35df..eefa5c61ca464 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{File, FileOutputStream, IOException} +import java.io.{File, FileOutputStream, IOException, OutputStream, PrintWriter} import java.net.URI import scala.collection.mutable @@ -160,8 +160,148 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon expectedLines: Seq[String] = Seq.empty): Unit } +/** + * A test OutputStream that simulates IO errors. + */ +class ErrorThrowingOutputStream extends OutputStream { + var throwOnWrite: Boolean = false + var throwOnFlush: Boolean = false + var throwOnClose: Boolean = false + + override def write(b: Int): Unit = { + if (throwOnWrite) { + throw new IOException("Simulated write error") + } + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + if (throwOnWrite) { + throw new IOException("Simulated write error") + } + } + + override def flush(): Unit = { + if (throwOnFlush) { + throw new IOException("Simulated flush error") + } + } + + override def close(): Unit = { + if (throwOnClose) { + throw new IOException("Simulated close error") + } + } +} + +/** + * A testable subclass of SingleEventLogFileWriter that exposes the writer field + * and closeWriter method for testing. + */ +class TestableSingleEventLogFileWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + def setWriterForTest(pw: PrintWriter): Unit = { + writer = Some(pw) + } + + def callCloseWriter(): Unit = { + closeWriter() + } +} + class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { + test("SPARK-55495: closeWriter should log warning when flush error occurs") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a PrintWriter with an ErrorThrowingOutputStream + val errorStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(errorStream) + + // Simulate an error by writing to a closed stream that causes checkError to return true + errorStream.throwOnWrite = true + printWriter.println("test") // This will set the error flag + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter flush error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(warningMessages.exists(_.contains("Spark detects errors while flushing")), + s"Expected warning message not found. Messages: $warningMessages") + } + + test("SPARK-55495: closeWriter should log warning when close error occurs") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a PrintWriter with an ErrorThrowingOutputStream that errors on close + val errorStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(errorStream) + + // First write something successfully + printWriter.println("test") + printWriter.flush() + + // Now set up to error on close + errorStream.throwOnClose = true + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter close error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(warningMessages.exists(_.contains("Spark detects errors while closing")), + s"Expected warning message not found. Messages: $warningMessages") + } + + test("SPARK-55495: closeWriter should complete without warnings when no errors") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a normal PrintWriter with no errors + val normalStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(normalStream) + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter no error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(!warningMessages.exists(_.contains("Spark detects errors")), + s"Unexpected warning message found. Messages: $warningMessages") + } + test("Log overwriting") { val appId = "test" val appAttemptId = None From fa1757ca0e96a8866cf50548bb13528e528bb92a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 11 Feb 2026 22:20:48 -0800 Subject: [PATCH 3/5] Address comments --- .../org/apache/spark/deploy/history/EventLogFileWriters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 7ae37c2ba2b69..218f7dd5b840b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -142,9 +142,9 @@ abstract class EventLogFileWriter( writer.foreach(_.close()) if (writer.exists(_.checkError())) { logWarning("Spark detects errors while closing event logs.") + // 3. Ensuring the underlying stream is closed at least (best-effort). + hadoopDataStream.foreach(_.close()) } - // 3. Ensuring the underlying stream is closed at least (best-effort). - hadoopDataStream.foreach(_.close()) } protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = { From d090af9c9da8bf2c7fb5e67d4e5b7d085b137c53 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 12 Feb 2026 08:24:24 -0800 Subject: [PATCH 4/5] fix linter --- .../spark/deploy/history/EventLogFileWritersSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index eefa5c61ca464..00a92c503be4e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -231,7 +231,9 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { // Simulate an error by writing to a closed stream that causes checkError to return true errorStream.throwOnWrite = true + // scalastyle:off println printWriter.println("test") // This will set the error flag + // scalastyle:on println writer.setWriterForTest(printWriter) @@ -259,7 +261,9 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { val printWriter = new PrintWriter(errorStream) // First write something successfully + // scalastyle:off println printWriter.println("test") + // scalastyle:on println printWriter.flush() // Now set up to error on close From 5cc81fc1c6cabd045a3ae94d5295daa3bcf9dc09 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 12 Feb 2026 09:31:33 -0800 Subject: [PATCH 5/5] Address comments --- .../org/apache/spark/deploy/history/EventLogFileWriters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 218f7dd5b840b..7c022c283db41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -134,14 +134,14 @@ abstract class EventLogFileWriter( // 1. Flush first to check the errors writer.foreach(_.flush()) if (writer.exists(_.checkError())) { - logWarning("Spark detects errors while flushing event logs.") + logError("Spark detects errors while flushing event logs.") } hadoopDataStream.foreach(_.hflush()) // 2. Try to close and check the errors writer.foreach(_.close()) if (writer.exists(_.checkError())) { - logWarning("Spark detects errors while closing event logs.") + logError("Spark detects errors while closing event logs.") // 3. Ensuring the underlying stream is closed at least (best-effort). hadoopDataStream.foreach(_.close()) }