KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline #20289
Conversation
A label of 'needs-attention' was automatically added to this PR.
junrao
left a comment
@itoumlilt : Thanks for identifying the issue and submitting a PR. Left a comment.
    }
  });
} catch (ClosedChannelException e) {
  if (!log.file().exists()) {
In replaceCurrentWithFutureLog(), we rename the log dir to deleted, close the channel and schedule it to be deleted asynchronously. If we get here, it's possible that the renamed file still exists.
Also, while this fixes the issue with flush, the closed-channel issue could be exposed through reads too, causing the same problem of forcing the log directory offline.
An alternative is to avoid closing srcLog in replaceCurrentWithFutureLog(). The segments will be closed when the log is deleted after a delay. By that time, the expectation is that there won't be any pending flushes, reads, etc on the log. This approach is also consistent with the approach in LogManager.asyncDelete().
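Both the failure mode and the proposed alternative can be illustrated with plain `java.nio`; this is an illustrative sketch under assumed semantics, not Kafka's actual `LogSegment` code, and all class and method names here are hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class ChannelRaceDemo {

    // The race: the swap closes the segment's channel, then a scheduled flush runs.
    static boolean flushAfterCloseThrows() {
        try {
            Path f = Files.createTempFile("segment", ".log");
            FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE);
            ch.write(ByteBuffer.wrap("record".getBytes()));
            ch.close();              // what the explicit close() in the swap did
            try {
                ch.force(true);      // the pending flush arrives too late...
                return false;
            } catch (ClosedChannelException e) {
                return true;         // ...and this is what took the log dir offline
            } finally {
                Files.deleteIfExists(f);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The alternative: rename the file (like renameDir to the delete suffix)
    // but keep the channel open; the pending flush then completes normally.
    static boolean flushAfterRenameSucceeds() {
        try {
            Path f = Files.createTempFile("segment", ".log");
            FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE);
            ch.write(ByteBuffer.wrap("record".getBytes()));
            Path renamed = f.resolveSibling(f.getFileName() + "-delete");
            Files.move(f, renamed, StandardCopyOption.ATOMIC_MOVE);
            ch.force(true);          // still valid: the handle outlives the rename
            ch.close();
            Files.deleteIfExists(renamed);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("flush after close throws: " + flushAfterCloseThrows());
        System.out.println("flush after rename ok:    " + flushAfterRenameSucceeds());
    }
}
```

The second method only behaves this way on POSIX filesystems, where a rename does not invalidate already-open file handles.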
@junrao Thanks for the detailed review! You were right: handling this at the flush() level was incomplete as it didn't cover read operations.
I've updated the PR to implement your suggestion:
- Reverted the changes to LogSegment.java.
- Modified LogManager.replaceCurrentWithFutureLog to not close the source log immediately. This keeps the channel open for any pending operations on the renamed (.delete) directory.
- Added a new unit test testReplaceCurrentWithFutureLogDoesNotCloseSourceLog in LogManagerTest to verify that the source log remains open during the swap.
Please let me know if this looks good!
This PR is being marked as stale since it has not had any activity in 90 days. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Force-pushed b3c93af to 7455a35
Proposed a new fix implementation, amended and force-pushed to the previous commit.
mimaison
left a comment
Thanks for the PR, I left a couple of suggestions
@@ -1249,7 +1247,6 @@ class LogManager(logDirs: Seq[File],
  srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
  // Now that replica in source log directory has been successfully renamed for deletion.
If we decide to not close the log here, can we update the comment too, including an explanation of why?
Done in 19057e4, updated the comment to explain the reasoning.
@@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
  import org.apache.kafka.server.metrics.KafkaYammerMetrics
  import org.apache.kafka.server.storage.log.FetchIsolation
  import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
- import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+ import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog, LogManager => JLogManager}
Updated the import reordering in 19057e4
I've seen a similar failure.
Thanks @mimaison for your review and for sharing this stacktrace! We've encountered the same issue in production, and this patch has resolved it for us.
Force-pushed 19057e4 to 7d46e72
showuon
left a comment
Overall LGTM. Left a comment to improve the test, to make sure that if the race condition happens again, logSegment.flush() won't throw an exception.
verify(spyCurrentLog, never()).close()

// Verify the source log was renamed to .delete
assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX))
Could we also verify that in this situation (i.e. after replaceCurrentWithFutureLog is invoked without channel closed), the logSegment.flush() can be invoked without error?
// Verify that flush() can be called without error (no ClosedChannelException)
val flushLog: Executable = () => spyCurrentLog.flush(false)
assertDoesNotThrow(flushLog)
To trigger the flush, we have to set flushOffset > localLog.recoveryPoint() (here). Because there isn't any data, the flushOffset is 0 and the recoveryPoint is 0, too. I just tested it, and this flush will not throw an exception even if we close the srcLog as before. We have to make the flushOffset > 0 to trigger the exception, something like this:
// Verify that flush() can be called without error (no ClosedChannelException)
when(spyCurrentLog.logEndOffset()).thenReturn(100L)
val flushLog: Executable = () => spyCurrentLog.flush(false)
assertDoesNotThrow(flushLog)
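The condition described above — flushing is a no-op unless the target offset is past the recovery point — can be reduced to a tiny guard. This is a loose, hypothetical paraphrase of the check in UnifiedLog.flush, not the actual Kafka code:

```java
public class FlushGuardDemo {
    // Loosely mirrors the flush check: only data between the recovery point
    // and the requested flush offset needs an fsync, so equal offsets skip it.
    static boolean shouldFlush(long flushOffset, long recoveryPoint) {
        return flushOffset > recoveryPoint;
    }

    public static void main(String[] args) {
        // Empty log: offset 0, recovery point 0 -> the flush is skipped entirely,
        // so the test would pass even against the buggy close() behavior.
        System.out.println(shouldFlush(0L, 0L));
        // Stubbing logEndOffset to 100 forces a real flush through the channel.
        System.out.println(shouldFlush(100L, 0L));
    }
}
```

This is why the review suggests stubbing `logEndOffset()` to 100: without it the test never reaches the file channel at all.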
A race condition can occur during replica rebalancing where a log segment's file is closed and deleted/renamed while an asynchronous flush or read operation is still pending. This would previously cause an unhandled ClosedChannelException, leading the ReplicaManager to mark the entire log directory as offline.

The fix removes the explicit close() of the source log in replaceCurrentWithFutureLog(). By leaving the channel open, concurrent operations can complete successfully on the renamed files (which are moved to the .delete directory). The log is already scheduled for asynchronous deletion (via addLogToBeDeleted), which ensures that the log and its resources will be properly closed and deleted by the background deletion thread after the configured file delete delay.

A new unit test `testReplaceCurrentWithFutureLogDoesNotCloseSourceLog` in `LogManagerTest` has been added to verify that the source log is not closed during the swap operation.
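The Unix semantics the fix relies on — an open handle stays valid even after its file is removed — can be demonstrated with a small NIO sketch. This is an illustrative demo of POSIX behavior (it will not behave this way on Windows), not Kafka code:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DeferredCloseDemo {
    // On POSIX filesystems an unlinked file's inode survives until the last
    // open handle is closed, so in-flight writes and flushes still succeed.
    static boolean writeSurvivesUnlink() {
        try {
            Path f = Files.createTempFile("segment", ".log");
            FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE);
            Files.delete(f);                                    // the background deleter's job
            ch.write(ByteBuffer.wrap("late write".getBytes())); // pending write still works
            ch.force(true);                                     // pending flush works too
            ch.close();                                         // storage reclaimed only here
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("write after unlink ok: " + writeSurvivesUnlink());
    }
}
```

This is why deferring close() to the background deletion thread is safe: by the time the handles are finally closed, no pending flushes or reads are expected.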
Verify that flush() can be called without ClosedChannelException after replaceCurrentWithFutureLog, confirming the race condition is resolved.
The flush only occurs when flushOffset > localLog.recoveryPoint()
Force-pushed 10e33f4 to da387d0
mimaison
left a comment
Thanks for the PR, LGTM
I was wondering if there was a way to verify that UnifiedLog.flush() was actually calling the underlying flush() methods on LocalLog/LogSegment after the assertDoesNotThrow() assertion. But I couldn't find a nice way to do so.
@junrao Do you want to take another look?
Ok, I'll go ahead and merge.
KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289) Reviewers: Jun Rao <jun@confluent.io>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
junrao
left a comment
@itoumlilt : Thanks for the updated PR. Sorry for the late review. LGTM. Just a minor comment.
// operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException.
// The log will be deleted asynchronously by the background delete-logs thread.
// File handles are intentionally left open; Unix semantics allow the renamed files
// to remain accessible until all handles are closed.
How about the following?
File handles are intentionally left open; Unix semantics allow the renamed files
to remain accessible until all handles are closed.
The log will be deleted asynchronously by the background delete-logs thread.
File handles are closed and files are deleted after a configured delay (log.segment.delete.delay.ms).
At that time, the expectation is that no other concurrent operations need to access
the deleted file handles any more.
@itoumlilt Could you update the Proposed Fix section in the jira since it's outdated? @mimaison: Since this is cherry-picked to 4.1, should we cherry-pick this to 4.2 if there is another RC?
Yes, we should backport to 4.2. I asked Christo on the dev list to see if we can sneak it in for 4.2.0. Otherwise I'll backport to 4.2 once 4.2.0 is out. I did the backport to 4.1 yesterday, and I'll do it for 4.0 next week (or feel free to do it if you have time).
Cherry-picked to 4.0, 4.1 and 4.2. The fix will be in 4.0.2, 4.1.2 and 4.2.0.
JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-19571
A race condition can occur during replica rebalancing where a log segment's file is deleted after an asynchronous flush has been scheduled but before it executes. This would previously cause an unhandled ClosedChannelException, leading the ReplicaManager to mark the entire log directory as offline.

The fix involves catching the ClosedChannelException within the LogSegment.flush() method and suppressing it only if the underlying log file no longer exists, which is the specific symptom of this race condition. Legitimate I/O errors on existing files will still be thrown.

A unit test has been added to LogSegmentTest to verify both the fix and the case where the exception should still be thrown.
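The catch-and-suppress idea from this original (later superseded) revision can be sketched as follows. All class and helper names here are hypothetical, standing in for the actual LogSegment.flush() change:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SuppressingFlushDemo {
    // The superseded idea: treat ClosedChannelException as benign only when the
    // backing file is already gone, which is the signature of the race.
    static void flush(FileChannel channel, File logFile) throws IOException {
        try {
            channel.force(true);
        } catch (ClosedChannelException e) {
            if (logFile.exists()) {
                throw e; // channel closed but file still present: a real error
            }
            // file deleted by the log swap: suppress, the segment is gone anyway
        }
    }

    // Race signature: channel closed and file deleted -> flush() stays silent.
    static boolean suppressesWhenFileGone() {
        try {
            Path f = Files.createTempFile("segment", ".log");
            FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE);
            ch.close();
            Files.delete(f);
            flush(ch, f.toFile());
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Channel closed while the file still exists -> the exception propagates.
    static boolean rethrowsWhenFilePresent() {
        Path f = null;
        try {
            f = Files.createTempFile("segment", ".log");
            FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE);
            ch.close();
            flush(ch, f.toFile());
            return false;
        } catch (ClosedChannelException e) {
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            if (f != null) try { Files.deleteIfExists(f); } catch (IOException ignored) { }
        }
    }

    public static void main(String[] args) {
        System.out.println("suppresses when file gone:  " + suppressesWhenFileGone());
        System.out.println("rethrows when file present: " + rethrowsWhenFilePresent());
    }
}
```

As the review discussion above notes, this approach was dropped because it only covered flush, not reads; it is kept here to document the two behaviors the original LogSegmentTest verified.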