Skip to content

Flink: FlinkSink data loss with unaligned checkpoints#15913

Closed
UrsSchoenenbergerNu wants to merge 2 commits into
apache:mainfrom
nubank:flinksink_data_loss_with_unaligned_checkpoints
Closed

Flink: FlinkSink data loss with unaligned checkpoints#15913
UrsSchoenenbergerNu wants to merge 2 commits into
apache:mainfrom
nubank:flinksink_data_loss_with_unaligned_checkpoints

Conversation

@UrsSchoenenbergerNu
Copy link
Copy Markdown

@UrsSchoenenbergerNu UrsSchoenenbergerNu commented Apr 8, 2026

Closes #15846

Quick Summary

When running with unaligned checkpoints, data files may reach IcebergFilesCommitter after the associated checkpoint's first barrier.
We then need to account for these files in the state for the 'next' checkpoint to avoid discarding them during recovery or failed checkpoints.

Details

We have recently encountered a case of data loss in an application using FlinkSink (v1), running on Flink 2.x with unaligned checkpoints.

What we saw:

One commit to the Iceberg table was missing approximately half of the data files that were written by the writer during this checkpoint.
Around this time, one checkpoint was started and Flink snapshots were triggered, but this checkpoint timed out before completing.
Based on our research, we believe that the culprit might be the internal state tracking in IcebergFilesCommitter. Here's our current theory:

For unaligned checkpoints, the order of operations happening on IcebergFilesCommitter can change subtly since processElement can be called for a FlinkWriteResult that is part of checkpoint N even after snapshotState was called for checkpoint N:

  • processElement(FlinkWriteResult1[part of checkpoint N]), recorded in writeResultsSinceLastSnapshot
  • snapshotState(checkpoint N)
  • writeToManifestUptoLatestCheckpoint(N) is called, putting the contents of writeResultsSinceLastSnapshot to dataFilesPerCheckpoint(N) and clearing writeResultsSinceLastSnapshot
  • processElement(FlinkWriteResult2[part of checkpoint N]), recorded in writeResultsSinceLastSnapshot (now the single element there)
  • checkpoint N times out, so notifyCheckpointComplete(N) is never called
  • processElement(FlinkWriteResult3[part of checkpoint N+1])
  • snapshotState(checkpoint N+1)
  • writeToManifestUptoLatestCheckpoint(N+1) is called, putting the contents of writeResultsSinceLastSnapshot to dataFilesPerCheckpoint(N) and dataFilesPerCheckpoint(N+1) and clearing writeResultsSinceLastSnapshot. dataFilesPerCheckpoint(N) now loses FlinkWriteResult1 even though it was never committed and contains only FlinkWriteResult2
  • notifyCheckpointComplete(checkpoint N+1)
  • Iceberg commit is started, but contains only FlinkWriteResult2 and FlinkWriteResult3, not FlinkWriteResult1.
    This sequence, or equivalently one where checkpoint N does not time out, but completes after checkpoint N+1, therefore leads to data loss.

This explanation aligns with the effects that we're seeing. The root cause seems to be that snapshotState->writeToManifestUpToLatestCheckpoint->writeResultsSinceLastSnapshot seems to implicitly assume that all records for the checkpoints have already been processed when snapshotState() is called - i.e. it assumes aligned checkpoints. If this assumption breaks, AND in addition a later checkpoint is snapshotted before an earlier one was notified complete, the issue described above is observed.

Additionally, we suspect there's a second failure mode with unaligned checkpoints that loses data on job recovery: If our above theory is correct, then IcebergFilesCommitter has an issue with elements for checkpoint N being processed after snapshotState(N). But the iceberg commit triggered during notifyComplete(N) only commits the records from the snapshot. On recovery, this means one of two things: Either initializeState()->.tailMap(N) loses information about not-yet-committed records; or even if it remembered to do so, it would think that these records should be committed as part of Flink checkpoint N. It feels like there's a problem when a checkpoint ID can appear in dataFilesPerCheckpoint after it has already been used as maxCommittedCheckpointId, and the strict tailMap exclusion has no way to distinguish "already committed" from "deferred post-barrier" data.

Contents of this PR

This PR contains three test cases that trigger the test harness in ways that are only possible when using unaligned checkpoints. Without the associated code change in prod code, these test cases fail.

Disclaimer / disclosure as per contributing guidelines

AI tools were used to initially pinpoint the possible issue and draft test cases. The code was subsequently rewritten condensed and clarified by hand. To the best of my knowledge, I believe that the way in which the reproduction test cases trigger the harness are all cases that DO happen with unaligned checkpoints, and the data loss that we had encountered at runtime aligns with the one that happens in these reproduction test cases.

Caveats

The logic that I'm changing here was last changed by #10526 while fixing a data duplication bug. The test cases that were added for that bug are still green with the changes that I'm proposing here. I would still like to tag @pvary and @zhongqishang and kindly ask them for advice on this issue here and this associated PR, as I'm sure they have a much deeper understanding of the inner workings of IcebergFileCommitter than I do.

(issue apache#15846)

When running with unaligned checkpoints, data files
may reach IcebergFilesCommitter after the associated
checkpoint's first barrier.

We then need to account for these files in the state
for the 'next' checkpoint to avoid discarding them during
recovery or failed checkpoints.
@github-actions github-actions Bot added the flink label Apr 8, 2026
Copy link
Copy Markdown
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough analysis! I'll have a look. Could this also affect the V2 IcebergSink?

Comment on lines +468 to +470
return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot
? checkpointId
: sourceCheckpointId;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct under all scenarios? This will mean that we include files of an older snapshot into the current checkpoint, but IMHO they need to be in the following snapshot (sourceCheckpointId + 1), not in the latest. There can be outstanding WriteResults for multiple snapshots, which have not been committed to Iceberg.

Otherwise, we violate the order in case of deletions.

Copy link
Copy Markdown
Author

@UrsSchoenenbergerNu UrsSchoenenbergerNu Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me clarify please if I understand your point correctly: by just using checkpointId we might run into a case where for the next commit, the deletes might have been pre-barrier and the data post-barrier or the other way round, creating an inconsistent commit?

That seems like a variation of the problem that #10526 tried to fix. The tests introduced in that PR pass with my modifications, so the scenario must be slightly more complicated.

Regarding your proposed fix of sourceCheckpointId + 1 works, I am worried that this might produce new data loss victims due to the tailMap call on operator recovery. At least in my head, I had convinced myself that only checkpointId (current barrier) is safe enough to not lose any data on recovery.

On a higher level, I am wondering whether there even is an implementation that can maintain consistent commits when faced with both equality delete files and unaligned checkpoints. Isn't there something inherently at odds with these two concepts as we can always have deletions pre-barrier and data post-barrier or the other way round?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. To be honest, I don't have a ton of experience with unaligned checkpoints, but it looks like it violates the fundamental assumption that we can flush any processed records before the checkpoint completes. If we cannot guarantee that, deletes can be applied too late, e.g. in the next snapshot, which is not correct. We can only guarantee this if we wait on some kind of barrier, which is exactly what the regular synchronous checkpointing does.

@pvary
Copy link
Copy Markdown
Contributor

pvary commented Apr 15, 2026

We have discussed unaligned checkpoints with @gyfora previously and if I remember correctly we decided that the unaligned checkpoint concept does not really allow committing a consistent state to the Iceberg table. We might be able to work around this by adding some extra checks/barriers but the end result will be something like an aligned checkpoint implemented by hand, so we decided that it doesn't seem to worth the effort.

If we all agree on this, we might update the documentation to explicitly state that aligned checkpoints are a requirement for a Flink Sink. Maybe even add a check which would prevent adding a Flink Sink when unaligned checkpoint is used?

@github-actions
Copy link
Copy Markdown

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label May 16, 2026
@github-actions
Copy link
Copy Markdown

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this May 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Data loss with FlinkSink (v1) - due to unaligned checkpoints?

3 participants