Skip to content

[DO_NOT_MERGE][FLINK-38544] Validation: Force enable checkpointing during recovery in tests#28558

Draft
rkhachatryan wants to merge 21 commits into
apache:masterfrom
rkhachatryan:cdr-ftw-ena
Draft

[DO_NOT_MERGE][FLINK-38544] Validation: Force enable checkpointing during recovery in tests#28558
rkhachatryan wants to merge 21 commits into
apache:masterfrom
rkhachatryan:cdr-ftw-ena

Conversation

@rkhachatryan

Copy link
Copy Markdown
Contributor

Testing #28554

1996fanrui and others added 18 commits June 23, 2026 15:33
…m toBeConsumedBuffers

Split the single toBeConsumedBuffers queue into two queues with disjoint
responsibilities:

  - recoveredBuffers (new): holds buffers migrated from RecoveredInputChannel
    during construction; consumed by getNextRecoveredBuffer() which retains
    the priority-event interleaving and last-buffer dynamic next-data-type
    detection introduced by FLINK-39018.
  - toBeConsumedBuffers (existing): reverted to its pre-FLINK-39018 role of
    holding FullyFilledBuffer partial-buffer splits only. The recovery-aware
    early branch in getNextBuffer() and the checkpointStarted inflight scan
    no longer touch this queue.

Restores the checkState(toBeConsumedBuffers.isEmpty()) guard in
requestSubpartitions() (removed by cebc174). hasPendingPriorityEvent,
notifyPriorityEvent, and the constructor signature are unchanged.

Pure refactor: no public API change, no new tests; verified by the 9 existing
LocalInputChannelTest regression cases.

(cherry picked from commit 292cc4b)
(cherry picked from commit 7fbfc78)
…lling v2

- Adds BufferRequester, RecoverableInputChannel, RecoveryCheckpointTrigger
  interfaces with their final signatures (including getChannelInfo on
  RecoverableInputChannel and NO_OP singleton on RecoveryCheckpointTrigger).
- Adds RecoveryCheckpointBarrier sentinel + DiskSnapshot data class with
  final 3-arg constructor signature and Chunk / StartPos / empty() helpers.
- ChannelStateWriter gains addInputDataFromSpill and peekWriteResult default
  methods so all callers can compile against the interface without the
  dispatcher implementation landing in this phase.
- RecoveredInputChannel#releaseAllResources visibility: package-private -> public

References to SpillFile in DiskSnapshot's constructor are forward references;
SpillFile itself lands in Phase 3. Each phase commit only needs to compile
as a whole tree at the final commit, not in isolation.

Design: requirements/38544/phase1_interfaces/design.md
(cherry picked from commit 98c7b42)
- Local/Remote InputChannel implement RecoverableInputChannel from Phase 1
- recoveredBuffers reshaped to Deque<Buffer>; allRecoveredBuffersDelivered flag
- getNextBuffer() unified under a single inRecovery predicate
- checkpointStarted split into mutually-exclusive in-recovery / not-in-recovery
- stateConsumedFuture triggered by (allRecoveredBuffersDelivered && queue empty)
- RecoveredInputChannel.toInputChannel migrates via the new push interface;
  the initialRecoveredBuffers constructor parameter is gone.
- LocalInputChannel.getNextRecoveredBuffer helper deleted

Design: requirements/38544/phase2_input_channel/design.md
(cherry picked from commit 8290409)
- New SpillFile: append-only segmented disk store with 64 MiB segments,
  reference counter + cleanedUp guard, and Snapshot view over segments and
  entries. All public signatures (append, snapshot, readBytesAt, acquire,
  release, isClosed) land in this commit; later phases only fill in bodies.
- New FilteredBufferWriter: prefilter + postfilter buffer accumulator,
  flushing the post-filter buffer to disk on rotation.
- New SpillFileWriter: thin facade exposing SpillFile lifecycle to filter
  callers.
- RecoveredChannelStateHandler.recover filter branch routes output to a
  SpillFile instead of channel.onRecoveredStateBuffer; the accumulator's
  prefilter and postfilter buffers are sourced from the source channel's
  exclusive pool (no heap fallback).
- InputChannelRecoveredStateHandler exposes getProducedSpillFile so Phase 4
  drain wiring can pick up the frozen file after filter completes;
  spill-tmp-directories argument is required (no backward-compat shim).

Design: requirements/38544/phase3_spill_writer/design.md
(cherry picked from commit 2cbbbd6)
… removal

- New SpillFileReader implements RecoveryCheckpointTrigger + Closeable.
  drain(): buffer alloc + disk read outside lock; deliver + offset advance
  inside lock. snapshotAndInsertBarriers(cpId): atomic startPos snapshot +
  per-channel barrier insert. Constructor derives the InputChannelInfo map
  internally; bodies pair acquire/release against SpillFile's ref counter.
- New RecoveredChannelBufferRequester delegates to RecoveredInputChannel pool.
- RecoveredInputChannel.requestBufferBlocking heap fallback removed
  (no more MemorySegmentFactory.allocateUnpooledSegment; OOM path eliminated).
- channelIOExecutor wired: filter-on submits drain after conversion completes;
  exceptions bubble via StreamTask.asyncExceptionHandler.

Design: requirements/38544/phase4_spill_reader/design.md
(cherry picked from commit 1315d38)
- ChannelState dispatcher onCheckpointStartedForAllInputs implements
  Step 1 (snapshotAndInsertBarriers) -> Step 2 (per-input checkpointStarted)
  -> Step 3 (addInputDataFromSpill) -> cpId-completion release callback.
- Hook AlternatingWaitingForFirstBarrierUnaligned.barrierReceived and
  AlternatingCollectingBarriers.alignedCheckpointTimeout into the dispatcher.
- ChannelStateWriterImpl.addInputDataFromSpill: async demux by Chunk.channelInfo,
  empty snapshot inline early return, failures propagate via ChannelStateWriteResult.
- Stream task pipelines (One/Two/Multiple) wire ChannelState through the
  InputProcessorUtil + SingleCheckpointBarrierHandler so the dispatcher hook
  reaches the right barrier-handler instance.
- ITCases (relocated under flink-runtime to share the package with SpillFile):
  rescale + filter + large record OOM regression, UC during recovery.

FLINK-38544 spilling v2 feature complete.

Design: requirements/38544/phase5_coordination/design.md
(cherry picked from commit 7badbd2)
Delete the verbose explanatory comments describing the old channel-state
recovery wiring (the recoverySetupCompleteFuture javadoc, the
allOf-vs-thenRun race essay, the setCheckpointingDuringRecoveryEnabled
note). These annotate code that the subsequent async-recovery rewrite
replaces; removing them up front keeps the rewrite diff free of
comment-deletion churn.
Rewrite StreamTask channel-state recovery to run asynchronously on the
channelIOExecutor: split restoreStateAndGates into
recoverChannelsWithCheckpointing / recoverChannelsWithoutCheckpointing,
threading the recovery checkpoint trigger and the fetched-state drainer
through the new SequentialChannelStateReader / FetchedChannelState(Drainer)
/ RecoveryCheckpointTrigger interfaces. Also release channel state before
returning in SequentialChannelStateReaderImpl.
…annel-state read

The async-recovery rewrite put the channel-state read future
(runAsync(readInputChannelState)) into the completeAll(...) set that
gates the recovery-completion suspend() poison mail. That future is
never already-complete, so suspend() was deferred past the start of the
restore mailbox loop. The loop then ran the default action (record
processing) before recovery finished:

 - records were processed before gate conversion/requestPartitions,
   losing them (MultipleInputStreamTaskTest: 10 -> 7);
 - processInput hitting END_OF_INPUT during restore called suspend()
   itself, exiting the loop before the recovery future was done, so
   restoreInternal's checkState(allGatesRecoveredFuture.isDone()) threw
   "Mailbox loop interrupted before recovery was finished"
   (StreamTaskTest.testProcessWith*); and
 - on downscale, recovery could stall entirely
   (UnalignedCheckpointRescaleITCase hang).

Run readInputChannelState as a fire-and-forget feeder instead, and gate
suspend()/recoveryCompletionFuture on the gates' stateConsumedFutures
only. A stateConsumedFuture completes only once the consumer drains the
end-of-state sentinel that readInputChannelState pushes, so gating on it
already implies the read finished, and requestPartitions still runs
after the read. For a task with no recovered state the futures are
already complete, so suspend() is enqueued before the loop runs the
default action -- restoring the pre-rewrite ordering. Unlike suspending
the default action, this does not block the consumer, so recovery that
must drain real channel state still completes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… record-filter context

When checkpointing-during-recovery is disabled, the RecordFilterContext
never spills, so it needs no spill directories. createRecordFilterContext
nevertheless dereferenced
getEnvironment().getIOManager().getSpillingDirectoriesPaths(), which NPEs
in minimal environments that return a null IOManager (e.g.
DummyEnvironment-based StreamTaskTest cases). On the channelIOExecutor
during recovery that NPE was routed to handleAsyncException ->
failExternally, which DummyEnvironment rejects with
UnsupportedOperationException, escalating to the fatal error handler and
crashing the surefire fork.

Pass an empty spill-directory array for the disabled context instead of
touching the IOManager. The async-recovery rewrite introduced this
IOManager access during recovery; these StreamTaskTest cases passed
before it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the needsRecovery and checkpointingDuringRecoveryEnabled fields and
their accessors from the input-gate API. needsRecovery is now passed as a
parameter (requestPartitions(boolean) / convertRecoveredInputChannels(boolean))
and checkpointing-during-recovery is read from the job config at the call
site instead of being pushed onto each gate. Updates the gate mocks and
test builders accordingly.
…ier missing

When collectPreRecoveryBarrier finds no during-recovery sentinel for a
checkpoint in a still-recovering channel, decline with
CHECKPOINT_DECLINED_TASK_NOT_READY (not an IOException). That reason is not
counted against the tolerable-failure threshold, so the checkpoint is
deferred and retried; the recovered buffers stay queued and are captured by
a later checkpoint, so no in-flight data is lost.
A disabled RecordFilterContext never spills, so it does not need spill
directories. Only require non-empty tmpDirectories when
checkpointing-during-recovery is enabled; otherwise tolerate a null/empty
value (e.g. an environment without IOManager spilling directories).
Inline // review / // review nit / // review todo annotations left for the
reviewer; drop this commit before merge.
…state tests

CI (build 76428) failed the spotless-check on flink-runtime for four
recovery test files. Applied spotless:apply (google-java-format);
formatting-only changes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…overy

recoverChannelsWithoutCheckpointing deferred requestPartitions/conversion
until completeAll(all gates' stateConsumedFutures), so no gate converted
until every gate's recovered state had been drained. A selective-reading
multi-input operator only drains the *selected* input's end-of-state
sentinel, so an unselected gate never drained (it is read only after
conversion) while conversion waited for it to drain first -- a circular
wait that deadlocked the restore mailbox loop (parked in
processMailsWhenDefaultActionUnavailable -> mailbox.take()). This hung
StreamTaskSelectiveReadingITCase and other multi-input recovery in CI
(build 76435: tests/table/python groups, watchdog-killed).

Trigger each gate's requestPartitions(false) off its own
getStateConsumedFuture() (restoring the pre-rewrite per-gate behavior
from d9fc48), so a drained gate converts immediately and the reader can
progress; suspend() remains gated on completeAll(futures). The flag-ON
path (recoverChannelsWithCheckpointing) is unaffected -- it is
drainer-driven, not gated on the consumer draining.

Validated: StreamTaskSelectiveReadingITCase clean over 75 runs under
JDK-17 load (was hanging on iteration 1); TaskCheckpointingBehaviourTest,
recovered-channel tests and MultipleInputStreamTaskTest green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@flinkbot

flinkbot commented Jun 27, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment on lines +150 to +152
conf, CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true, false);
randomize(
conf, CheckpointingOptions.CHECKPOINTING_DURING_RECOVERY_ENABLED, true, false);
conf, CheckpointingOptions.CHECKPOINTING_DURING_RECOVERY_ENABLED, true);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please enable UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM as well to force enable checkpointing during recovery

rkhachatryan and others added 2 commits June 27, 2026 16:15
…-loop race

With checkpointing-during-recovery enabled, recoverChannelsWithCheckpointing
completes the recovery future asynchronously via a chain of mailbox mails and
channelIOExecutor stages. For a task with no input channels (e.g. a legacy
SourceStreamTask) there is nothing to recover, yet the restore mailbox loop's
default action starts the source. A finite/fast source can finish and call
mailboxProcessor.suspend() from its completion callback before the asynchronous
recovery future completes, exiting the restore loop early and tripping
checkState(allGatesRecoveredFuture.isDone()) in restoreInternal with
"Mailbox loop interrupted before recovery was finished".

The checkpointing-disabled path (recoverChannelsWithoutCheckpointing) never hit
this because, with no input gates, it completes the recovery future
synchronously, so the recovery-completion suspend() is enqueued before the
restore loop runs the source's default action.

Short-circuit recoverChannelsWithCheckpointing for empty input gates to complete
synchronously as well (trigger straight to NO_OP), matching the disabled path.
The source then starts in the main mailbox loop, as it always did with
checkpointing-during-recovery disabled, and the race is gone.

Reproduced via the heavy-deployment e2e test (parallelism 100), which failed
within 1-3 iterations before the fix and ran 15/15 clean after.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@rkhachatryan rkhachatryan changed the title [DO_NOT_MERGE] Force CDR in tests [DO_NOT_MERGE][FLINK-38544] Validation: Force enable checkpointing during recovery in tests Jun 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants