Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,20 @@ abstract class EventLogFileWriter(
}

protected def closeWriter(): Unit = {
// 1. Flush first to check the errors
writer.foreach(_.flush())
if (writer.exists(_.checkError())) {
logError("Spark detects errors while flushing event logs.")
}
hadoopDataStream.foreach(_.hflush())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hadoopDataStream.foreach(_.hflush()) can throw an unhandled IOException; shall we wrap it in a try-catch here? Maybe we can leverage something like Utils.logIOError for all fallback paths.

@dongjoon-hyun dongjoon-hyun Feb 12, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR aims to avoid a silent failure inside checkError. Other unhandled IOExceptions that propagate are already logged by SparkContext.stop, as shown below. So I intentionally didn't use try ... catch ... or Utils.tryLog... here.

Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}

override def stop(): Unit = {
closeWriter()
val appStatusPathIncomplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
inProgress = true)
val appStatusPathComplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
inProgress = false)
renameFile(appStatusPathIncomplete, appStatusPathComplete, overwrite = true)
}

In my case, the silent failure happens before renameFile. So the last log file is not uploaded correctly and the in-progress file remains. As a result, SHS always shows the stages as running because the application never appears finished.


// 2. Try to close and check the errors
writer.foreach(_.close())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch exceptions from close() directly to ensure the fallback runs?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of HDFS, it should presumably not throw an exception; instead, a boolean status would be set to true. However, I'm not sure whether third-party libraries will adhere to this convention.

if (writer.exists(_.checkError())) {
logError("Spark detects errors while closing event logs.")
// 3. Ensuring the underlying stream is closed at least (best-effort).
hadoopDataStream.foreach(_.close())
}
}

protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.history

import java.io.{File, FileOutputStream, IOException}
import java.io.{File, FileOutputStream, IOException, OutputStream, PrintWriter}
import java.net.URI

import scala.collection.mutable
Expand Down Expand Up @@ -160,8 +160,152 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
expectedLines: Seq[String] = Seq.empty): Unit
}

/**
* A test OutputStream that simulates IO errors.
*/
class ErrorThrowingOutputStream extends OutputStream {
var throwOnWrite: Boolean = false
var throwOnFlush: Boolean = false
var throwOnClose: Boolean = false

/** Discards the byte, or fails with an IOException while `throwOnWrite` is set. */
override def write(b: Int): Unit =
  if (throwOnWrite) throw new IOException("Simulated write error")

/** Discards the byte range, or fails with an IOException while `throwOnWrite` is set. */
override def write(b: Array[Byte], off: Int, len: Int): Unit =
  if (throwOnWrite) throw new IOException("Simulated write error")

/** Does nothing, or fails with an IOException while `throwOnFlush` is set. */
override def flush(): Unit =
  if (throwOnFlush) throw new IOException("Simulated flush error")

override def close(): Unit = {
if (throwOnClose) {
throw new IOException("Simulated close error")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we match the synthetic error message in the tests to make sure that we capture the right one?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cases match the log messages emitted by Spark's own code, as shown below, rather than the strings defined in this test suite.

assert(warningMessages.exists(_.contains("Spark detects errors while flushing")),
assert(warningMessages.exists(_.contains("Spark detects errors while closing")),

}
}
}

/**
 * Test-only variant of SingleEventLogFileWriter.
 *
 * It lets a test substitute the `writer` field with its own PrintWriter and
 * trigger the protected `closeWriter` method directly.
 */
class TestableSingleEventLogFileWriter(
    appId: String,
    appAttemptId: Option[String],
    logBaseDir: URI,
    sparkConf: SparkConf,
    hadoopConf: Configuration)
  extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {

  /** Installs `pw` as the current writer, replacing whatever was set before. */
  def setWriterForTest(pw: PrintWriter): Unit = {
    writer = Some(pw)
  }

  /** Public entry point that forwards to the protected `closeWriter()`. */
  def callCloseWriter(): Unit = closeWriter()
}

class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {

test("SPARK-55495: closeWriter should log warning when flush error occurs") {
  val appId = getUniqueApplicationId
  val attemptId = None
  val conf = getLoggingConf(testDirPath, None)
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  val eventLogWriter = new TestableSingleEventLogFileWriter(
    appId, attemptId, testDirPath.toUri, conf, hadoopConf)

  // Back the PrintWriter with a stream whose writes always fail.
  val failingStream = new ErrorThrowingOutputStream()
  val brokenWriter = new PrintWriter(failingStream)
  failingStream.throwOnWrite = true

  // PrintWriter never propagates IOExceptions; it only records them for checkError().
  // The line below may sit in the writer's internal buffers, so the simulated failure
  // is recorded either here or when closeWriter() flushes. Either way checkError()
  // reports true by the time closeWriter() inspects it.
  // scalastyle:off println
  brokenWriter.println("test")
  // scalastyle:on println

  eventLogWriter.setWriterForTest(brokenWriter)

  val appender = new LogAppender("closeWriter flush error test")
  withLogAppender(appender, level = Some(org.apache.logging.log4j.Level.WARN)) {
    eventLogWriter.callCloseWriter()
  }

  // closeWriter() must have reported the flush failure in the captured log output.
  val loggedMessages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  assert(loggedMessages.exists(_.contains("Spark detects errors while flushing")),
    s"Expected warning message not found. Messages: $loggedMessages")
}

test("SPARK-55495: closeWriter should log warning when close error occurs") {
  val appId = getUniqueApplicationId
  val attemptId = None
  val conf = getLoggingConf(testDirPath, None)
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  val eventLogWriter = new TestableSingleEventLogFileWriter(
    appId, attemptId, testDirPath.toUri, conf, hadoopConf)

  // While the stream is still healthy, write and flush one line so that nothing
  // is left pending and no error has been recorded yet.
  val failingStream = new ErrorThrowingOutputStream()
  val healthyWriter = new PrintWriter(failingStream)
  // scalastyle:off println
  healthyWriter.println("test")
  // scalastyle:on println
  healthyWriter.flush()

  // From this point on, only closing the underlying stream fails.
  failingStream.throwOnClose = true

  eventLogWriter.setWriterForTest(healthyWriter)

  val appender = new LogAppender("closeWriter close error test")
  withLogAppender(appender, level = Some(org.apache.logging.log4j.Level.WARN)) {
    eventLogWriter.callCloseWriter()
  }

  // closeWriter() must have reported the close failure in the captured log output.
  val loggedMessages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  assert(loggedMessages.exists(_.contains("Spark detects errors while closing")),
    s"Expected warning message not found. Messages: $loggedMessages")
}

test("SPARK-55495: closeWriter should complete without warnings when no errors") {
  val appId = getUniqueApplicationId
  val attemptId = None
  val conf = getLoggingConf(testDirPath, None)
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  val eventLogWriter = new TestableSingleEventLogFileWriter(
    appId, attemptId, testDirPath.toUri, conf, hadoopConf)

  // All throwOn* switches keep their default (false), so this stream never fails.
  val quietStream = new ErrorThrowingOutputStream()
  val healthyWriter = new PrintWriter(quietStream)

  eventLogWriter.setWriterForTest(healthyWriter)

  val appender = new LogAppender("closeWriter no error test")
  withLogAppender(appender, level = Some(org.apache.logging.log4j.Level.WARN)) {
    eventLogWriter.callCloseWriter()
  }

  // None of the captured messages may mention an event-log error.
  val loggedMessages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  assert(loggedMessages.forall(!_.contains("Spark detects errors")),
    s"Unexpected warning message found. Messages: $loggedMessages")
}

test("Log overwriting") {
val appId = "test"
val appAttemptId = None
Expand Down