You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This pull request addresses a potential thread synchronization issue identified in FutureCompletingBlockingQueue (see FLINK-37663). The concern is that the wakeUpPuttingThread method signals a putting thread's condition variable but might not remove it from the internal notFull condition queue.
This could lead to an inconsistent state where subsequent signals (triggered when dequeue finds space) are sent to the condition of an already woken or potentially closed thread, effectively losing the signal for a genuinely waiting thread. This scenario could potentially cause source fetchers to stall indefinitely. This change aims to prevent this potential lost signal by ensuring the condition queue remains consistent.
Brief change log
Modified FutureCompletingBlockingQueue#wakeUpPuttingThread(int fetcherIndex) to explicitly remove the corresponding condition variable from the notFull queue after retrieving it and before signaling it.
Added FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition (or a similar name) to specifically verify that a manually woken thread's condition is removed from the notFull queue and does not interfere with subsequent signals intended for other waiting threads.
Verifying this change
This change added tests and can be verified as follows:
Added FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition (or your actual test name) which simulates the scenario described in FLINK-37663:
A thread blocks on put().
It gets manually woken up via wakeUpPuttingThread().
Another thread blocks on put().
An element is polled from the queue using poll().
The test verifies that the second thread (which was genuinely waiting) is woken up by the signal triggered during poll(), confirming the first thread's condition was removed upon manual wakeup.
Does this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): no
The public API, i.e., is any changed class annotated with @Public(Evolving): no (FutureCompletingBlockingQueue is @Internal)
The serializers: no
The runtime per-record code paths (performance sensitive): don't know (This queue is used in source connectors' data path; the change adds a queue removal operation which has minor performance overhead?)
Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: don't know (If source connectors stall due to this bug, it can affect checkpointing progress and overall job health/recovery).
The S3 file system connector: don't know (It affects all connectors using the base source framework, potentially including S3 if it uses this mechanism, but the change is in connector-base).
Documentation
Does this pull request introduce a new feature? no
If yes, how is the feature documented? not applicable
This PR is being marked as stale since it has not had any activity in the last 90 days.
If you would like to keep this PR alive, please leave a comment asking for a review.
If the PR has merge conflicts, update it with the latest from the base branch.
This PR has been closed since it has not had any activity in 120 days.
If you feel like this was a mistake, or you would like to continue working on it,
please feel free to re-open the PR and ask for a review.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This pull request addresses a potential thread synchronization issue identified in
FutureCompletingBlockingQueue(see FLINK-37663). The concern is that thewakeUpPuttingThreadmethod signals a putting thread's condition variable but might not remove it from the internalnotFullcondition queue.This could lead to an inconsistent state where subsequent signals (triggered when
dequeuefinds space) are sent to the condition of an already woken or potentially closed thread, effectively losing the signal for a genuinely waiting thread. This scenario could potentially cause source fetchers to stall indefinitely. This change aims to prevent this potential lost signal by ensuring the condition queue remains consistent.Brief change log
FutureCompletingBlockingQueue#wakeUpPuttingThread(int fetcherIndex)to explicitly remove the corresponding condition variable from thenotFullqueue after retrieving it and before signaling it.FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition(or a similar name) to specifically verify that a manually woken thread's condition is removed from thenotFullqueue and does not interfere with subsequent signals intended for other waiting threads.Verifying this change
This change added tests and can be verified as follows:
FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition(or your actual test name) which simulates the scenario described in FLINK-37663:put().wakeUpPuttingThread().put().poll().poll(), confirming the first thread's condition was removed upon manual wakeup.Does this pull request potentially affect one of the following parts:
@Public(Evolving): no (FutureCompletingBlockingQueueis@Internal)Documentation