diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 4e3bee1015ff3..7c022c283db41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -131,7 +131,20 @@ abstract class EventLogFileWriter( } protected def closeWriter(): Unit = { + // 1. Flush first to check the errors + writer.foreach(_.flush()) + if (writer.exists(_.checkError())) { + logError("Spark detects errors while flushing event logs.") + } + hadoopDataStream.foreach(_.hflush()) + + // 2. Try to close and check the errors writer.foreach(_.close()) + if (writer.exists(_.checkError())) { + logError("Spark detects errors while closing event logs.") + // 3. Ensuring the underlying stream is closed at least (best-effort). + hadoopDataStream.foreach(_.close()) + } } protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index d9d6a4f8d35df..00a92c503be4e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{File, FileOutputStream, IOException} +import java.io.{File, FileOutputStream, IOException, OutputStream, PrintWriter} import java.net.URI import scala.collection.mutable @@ -160,8 +160,152 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon expectedLines: Seq[String] = Seq.empty): Unit } +/** + * A test OutputStream that simulates IO errors. + */ +class ErrorThrowingOutputStream extends OutputStream { + var throwOnWrite: Boolean = false + var throwOnFlush: Boolean = false + var throwOnClose: Boolean = false + + override def write(b: Int): Unit = { + if (throwOnWrite) { + throw new IOException("Simulated write error") + } + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + if (throwOnWrite) { + throw new IOException("Simulated write error") + } + } + + override def flush(): Unit = { + if (throwOnFlush) { + throw new IOException("Simulated flush error") + } + } + + override def close(): Unit = { + if (throwOnClose) { + throw new IOException("Simulated close error") + } + } +} + +/** + * A testable subclass of SingleEventLogFileWriter that exposes the writer field + * and closeWriter method for testing. + */ +class TestableSingleEventLogFileWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + def setWriterForTest(pw: PrintWriter): Unit = { + writer = Some(pw) + } + + def callCloseWriter(): Unit = { + closeWriter() + } +} + class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { + test("SPARK-55495: closeWriter should log warning when flush error occurs") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a PrintWriter with an ErrorThrowingOutputStream + val errorStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(errorStream) + + // Simulate an error by writing to a closed stream that causes checkError to return true + errorStream.throwOnWrite = true + // scalastyle:off println + printWriter.println("test") // This will set the error flag + // scalastyle:on println + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter flush error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(warningMessages.exists(_.contains("Spark detects errors while flushing")), + s"Expected warning message not found. Messages: $warningMessages") + } + + test("SPARK-55495: closeWriter should log warning when close error occurs") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a PrintWriter with an ErrorThrowingOutputStream that errors on close + val errorStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(errorStream) + + // First write something successfully + // scalastyle:off println + printWriter.println("test") + // scalastyle:on println + printWriter.flush() + + // Now set up to error on close + errorStream.throwOnClose = true + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter close error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(warningMessages.exists(_.contains("Spark detects errors while closing")), + s"Expected warning message not found. Messages: $warningMessages") + } + + test("SPARK-55495: closeWriter should complete without warnings when no errors") { + val appId = getUniqueApplicationId + val attemptId = None + val conf = getLoggingConf(testDirPath, None) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = new TestableSingleEventLogFileWriter( + appId, attemptId, testDirPath.toUri, conf, hadoopConf) + + // Create a normal PrintWriter with no errors + val normalStream = new ErrorThrowingOutputStream() + val printWriter = new PrintWriter(normalStream) + + writer.setWriterForTest(printWriter) + + val logAppender = new LogAppender("closeWriter no error test") + withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) { + writer.callCloseWriter() + } + + val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + assert(!warningMessages.exists(_.contains("Spark detects errors")), + s"Unexpected warning message found. Messages: $warningMessages") + } + test("Log overwriting") { val appId = "test" val appAttemptId = None