CAMEL-23391: Fix race condition in channel subscription and improve reconnect advice handling#22874
Conversation
…ove reconnect advice handling
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
|
🧪 CI tested the following changed modules:
All tested modules (10 modules)
|
gnodet
left a comment
There was a problem hiding this comment.
Thanks for the follow-up to #22840 — the race condition fix and reconnect advice improvements are directionally good. A few items to address before this can be merged:
1. Missing JIRA ticket
Per project conventions, PRs should reference a JIRA ticket with the format CAMEL-XXXX: Brief description in the title and branch name. Please create a JIRA ticket and update the PR title/branch accordingly.
2. No tests for changed behavior
The PR changes concurrency semantics (lock + snapshot) and reconnect advice handling (multiple new branches), but adds no tests. The existing SubscriptionHelperTest only covers replay ID logic.
At minimum, the reconnect advice logic could be unit-tested by invoking the connection listener with mocked messages carrying different advice values ("none", "handshake", "retry", null).
3. Incomplete lock coverage — potential lost update on channelsToSubscribe
The new channelsLock protects the compound operations in the handshake and connection listeners, but subscribe() at line 451 calls channelsToSubscribe.add(channelName) under a different lock (lock), not channelsLock. This creates a window where subscribe() adds a channel after the snapshot is taken but before clear() in the connection listener — the channel is added, then immediately cleared, and never subscribed.
Similarly, channelsToSubscribe.remove() at lines 204 and 266 is unprotected, though those are benign (removing from a cleared set is a no-op).
Consider either:
- Using
channelsLockconsistently for all mutations ofchannelsToSubscribe, or - Documenting why the partial coverage is sufficient.
4. Behavioral change when advice is null on failed connect
The old code handshakes when advice == null OR reconnect == "none":
if (message.getAdvice() == null || "none".equals(message.getAdvice().get("reconnect")))The new code only handshakes when reconnect is non-null and not "retry":
if (reconnectAdvice != null && !"retry".equals(reconnectAdvice))When the server sends no advice at all and the error is not a temporary error, the old code would handshake but the new code does nothing. Could this regress certain failure scenarios? If this change is intentional per the Bayeux spec, it would be helpful to add a comment explaining the rationale.
5. Minor question — reconnect advice on successful connect
The new code handles non-retry reconnect advice on successful connect messages. Has this scenario been observed in practice with Salesforce's CometD server, or is it a defensive addition per the spec? Either way is fine, just curious.
Overall this is a solid improvement to the streaming subscription resilience. Addressing the above (especially items 2–4) would make it ready to merge. Thank you for the contribution!
Claude Code on behalf of Guillaume Nodet
#1 (Missing JIRA ticket): We can use this one https://issues.apache.org/jira/browse/CAMEL-23391 #2 (No tests): #3 (Incomplete lock coverage): #4 (Null advice regression): #5 (Successful connect advice): |
…ubscribe() to prevent re-subscription on every connect cycle
|
@gnodet During testing we discovered that subscribe() was adding the channel back to channelsToSubscribe via channelsToSubscribe.add(channelName). Since the connection listener takes a snapshot of channelsToSubscribe, clears it, and calls subscribe() for each consumer — subscribe() was re-adding the channel, causing the next connect cycle to find it non-empty and call subscribe() again. This stacked duplicate CometD listeners on the same channel, resulting in each event being delivered N times (once per stacked listener, where N = number of connect cycles since handshake). The fix removes channelsToSubscribe.add(channelName) from subscribe(). This is safe because channelsToSubscribe is only meant to track channels pending subscription after a handshake — the handshake listener populates it from channelToConsumers.keySet(), which already includes the channel after channelToConsumers.computeIfAbsent(...).add(consumer). |
…in handshake listener
gnodet
left a comment
There was a problem hiding this comment.
Thanks for the thorough responses and the fixes — all the review concerns are addressed:
- Item #3 (lock coverage): The deadlock risk explanation with
lock→channelsLockordering inversion is valid. And more importantly, removingchannelsToSubscribe.add(channelName)fromsubscribe()eliminates the race entirely —channelsToSubscribeis now only mutated underchannelsLock(handshake and connection listeners) plus benignremove()calls. The duplicate event delivery bug you caught is a great find. - Item #4 (null advice): Fixed correctly with
reconnectAdvice == null || !"retry".equals(reconnectAdvice), restoring the original behavior while also covering all non-retry advice per Bayeux spec. The comment explaining the rationale is appreciated. - Item #5 (successful connect): Production log confirms this is a real scenario, not speculative.
- Bonus: The
AUTHENTICATION_INVALID.equals(failureReason)null-safety fix is a nice touch.
One remaining minor item: the PR title should be prefixed with the JIRA ticket per project conventions — e.g., CAMEL-23391: Fix race condition in channel subscription and improve reconnect advice handling.
Claude Code on behalf of Guillaume Nodet
|
@shaipan Thank you for this contribution and for the quick follow-up fixes! The race condition fix with the snapshot approach, the duplicate event delivery bug catch, and the Bayeux-spec-compliant reconnect handling are all solid improvements. Great work! Claude Code on behalf of Guillaume Nodet |
| LOG.debug("Reconnect advice [{}] on failed connect, initiating handshake", reconnectAdvice); | ||
| client.handshake(); | ||
| } else if (isTemporaryError(message)) { | ||
| LOG.debug("Initiating handshake after temporary error: {}", message); |
There was a problem hiding this comment.
In this case, we might need some backoff, try to reuse the existing handshakeBackoff + backoffIncrement pattern already in subscriptionFailed()
There was a problem hiding this comment.
Okay. I will try to test and add in a seprate PR
apupier
left a comment
There was a problem hiding this comment.
Why there are locks on some places around addition/removal of elements for channelsToSubscribe but not for all?
like https://github.com/shaipan/camel/blob/ed6412bdba7da6d3e0281a948d90df28ae9b8f43/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java#L236
https://github.com/shaipan/camel/blob/ed6412bdba7da6d3e0281a948d90df28ae9b8f43/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java#L298
This remove is no-op. I didnt touch this code. |
|
@davsclaus @gnodet @Croway are we commit this change? Will try to add additional suggestion in seprate PR. |
|
LGTM a new PR is likely easier for more work than having forever PRs |
Fix a race condition and improve reconnect behavior in the Streaming API subscription helper.
the connection listener from reading a partially updated set
concurrent modification
on successful connect messages