fix(gossipsub): queue early publishes until identify completes#1201
fix(gossipsub): queue early publishes until identify completes#1201imApoorva36 wants to merge 10 commits intolibp2p:mainfrom
Conversation
e21772b to
69c5cbf
Compare
|
@seetadev @sumanjeet0012 this PR is now ready for review, would love your thoughts |
libp2p/pubsub/gossipsub.py
Outdated
| topic not in pubsub.peer_topics | ||
| or peer_id not in pubsub.peer_topics[topic] | ||
| ): | ||
| self._pending_messages[peer_id].append(rpc_msg) |
There was a problem hiding this comment.
I'm concerned that there isn't a limit or TTL for queued messages. If a peer never subscribes to a topic, they could accumulate unbounded pending messages and cause memory issues.
There was a problem hiding this comment.
Good point I think I missed out on that, I’ll add a per-peer cap and TTL so pending messages don't grow without limit
| # point. Flushing happens in ``flush_pending_messages`` which is | ||
| # called by Pubsub._handle_new_peer after the stream is registered. | ||
|
|
||
| async def flush_pending_messages(self, peer_id: ID) -> None: |
There was a problem hiding this comment.
It appears that the pending message queue accumulates messages on any topic, then flushes them all to the new peer regardless of which topic they subscribed to. Is that the desired behavior? It would seem we would want to filter and flush only relevant messages to the peer.
There was a problem hiding this comment.
ah right, I'll make it per topic instead of per peer, good point
| ) | ||
| for rpc_msg in queued: | ||
| try: | ||
| await pubsub.write_msg(stream, rpc_msg) |
There was a problem hiding this comment.
In the normal publish path (GossipSub.publish) we use a Publish gate:
if self.scorer is not None and not self.scorer.allow_publish:But here we do not. Should we be making the same checks?
There was a problem hiding this comment.
right I missed out on that I believe, will add scorer.allow_publish gate to queued publishes
| rpc_msg.senderRecord = envelope_bytes | ||
|
|
||
| try: | ||
| await pubsub.write_msg(stream, rpc_msg) |
There was a problem hiding this comment.
Same as above: In the normal publish path (GossipSub.publish) we use a Publish gate:
if self.scorer is not None and not self.scorer.allow_publish:But here we do not. Should we be making the same checks?
|
Thank you so much @pacrob for the thorough and thoughtful review: appreciate you digging into the edge cases here. Your points around queue bounds/TTL, topic filtering during flush, and maintaining consistency with the allow_publish gate are especially important for correctness and long-term stability. @imApoorva36 this is shaping up to be an important fix for the early-publish timing issue: could you please address the feedback raised above by @pacrob? Specifically: Introduce a bounded queue or TTL to avoid unbounded memory growth. Ensure we flush only messages relevant to the peer’s subscribed topics. Apply the same publish gate (scorer.allow_publish) logic during flush to maintain behavioral parity with the normal publish path. Please reply below the feedback as you resolve each of the points. CCing @Winter-Soren, @yashksaini-coder and @sumanjeet0012, who will help you arrive at a good conclusion on the issue. Once these are incorporated, this will significantly strengthen the robustness of GossipSub around identify timing and avoid subtle production issues. Great progress overall — looking forward to the next iteration 🚀 |
GossipSub could drop messages published immediately after connecting to a
peer, before the identify handshake completed. While #893 prevented a
crash, the behavior was still timing-dependent.
This PR makes publishing identify-aware by queueing messages for peers
whose protocol negotiation is still in progress and flushing them once
the peer stream is registered.
Includes regression tests for early publish scenarios.
Closes #887