Refactor topic reader/writer onto a shared stream stack#842
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #842 +/- ##
==========================================
+ Coverage 81.01% 81.03% +0.02%
==========================================
Files 94 96 +2
Lines 12091 12099 +8
Branches 1182 1185 +3
==========================================
+ Hits 9795 9804 +9
- Misses 1837 1840 +3
+ Partials 459 455 -4
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
@vgvoleg Please set up your personal token, see instructions: https://nda.ya.ru/t/dur-L1y77biDC2 |
b6ab1c8 to
f2eaa09
Compare
There was a problem hiding this comment.
Pull request overview
Refactors the topic reader/writer asyncio implementations to share a common bidi-stream lifecycle + reconnect loop via new base classes in ydb/_topic_common (StreamReconnector, StreamConnection), reducing duplication and addressing reconnect/close-ordering issues.
Changes:
- Introduces
StreamReconnector(shared reconnect loop/backoff/close ordering) andStreamConnection(shared stream ownership, connect handshake, update-token loop). - Migrates topic reader and writer asyncio reconnectors/streams to thin subclasses using the shared stack.
- Updates and extends unit tests to validate the new structural “no zombie stream on handshake cancel” guarantee.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| ydb/_topic_writer/topic_writer_asyncio.py | Writer migrated to StreamReconnector/StreamConnection; reconnect hooks and close/flush ordering refactored. |
| ydb/_topic_writer/topic_writer_asyncio_test.py | Tests updated for _new_connection-based reconnect; adds handshake-cancel stream-close test. |
| ydb/_topic_reader/topic_reader_asyncio.py | Reader migrated to StreamReconnector/StreamConnection; reconnect loop replaced by shared hooks. |
| ydb/_topic_reader/topic_reader_asyncio_test.py | Tests updated for _new_connection-based reconnect; adds handshake-cancel stream-close test and close cleanup. |
| ydb/_topic_common/STREAM_DESIGN.md | Documents the shared stream stack architecture, hooks, signals, and invariants. |
| ydb/_topic_common/common_test.py | Adds unit tests covering StreamReconnector/StreamConnection base behavior (reconnect, cancellation, close swallowing). |
| ydb/_topic_common/_stream_reconnector.py | New shared reconnect loop implementation with hooks for concrete reader/writer logic. |
| ydb/_topic_common/_stream_connection.py | New shared connection lifecycle (sync wrapper ownership, connect handshake, update-token loop, per-conn death signal). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
AI Review Summary
Verdict: ✅ No critical issues found
Critical issues
No critical issues found.
Other findings
- Minor | Medium:
asyncio.sleep(self._update_token_interval or 0)creates a hot loop when_update_token_intervalisNone—ydb/_topic_common/_stream_connection.py:74 - Minor | Medium:
self._conn = connbefore handshake exposes half-initialized connection to concurrent consumers during failed handshakes —ydb/_topic_common/_stream_reconnector.py:85 - Nit | High: Legacy
create()+_start()paths allocate a duplicateGrpcWrapperAsyncIOfromStreamConnection.__init__that is immediately overridden —ydb/_topic_reader/topic_reader_asyncio.py:492/ydb/_topic_writer/topic_writer_asyncio.py:787 - Nit | Medium:
_stream_readerbackward-compatibility property onReaderReconnectorappears unused — can be removed unless external callers depend on it —ydb/_topic_reader/topic_reader_asyncio.py:249
This review was generated automatically. Critical issues require attention; other findings are advisory.
If this comment was useful, please give it a 👍 — it helps us improve the review bot.
|
Analysis performed by claude, claude-opus-4-6. |
f2eaa09 to
73c2519
Compare
| def _set_first_error(self, err: BaseException) -> None: | ||
| """Record the first error that ended this connection; later errors are ignored.""" | ||
| try: | ||
| self._first_error.set_result(err) | ||
| except asyncio.InvalidStateError: | ||
| pass | ||
| self._on_first_error() | ||
|
|
| Subclasses must set up their own attributes BEFORE calling ``super().__init__()``, | ||
| because that call schedules the connection loop which immediately uses ``_connect``. | ||
| """ |
| self._loop = asyncio.get_running_loop() | ||
| self._stub = stub | ||
| self._method = method | ||
| # Built (not started) here so the connection owns its transport before connect()'s first | ||
| # network await — that is what makes the no-zombie guarantee structural. The legacy | ||
| # _start(stream, ...) injection path overrides this with an externally provided stream. | ||
| self._stream: IGrpcWrapperAsyncIO = GrpcWrapperAsyncIO(from_proto) | ||
| self._background_tasks: Set[asyncio.Task] = set() |
| def receive_batch_with_tx_nowait(self, tx: "BaseQueryTxContext", max_messages: Optional[int] = None): | ||
| if self._stream_reader is None: | ||
| if self._conn is None: | ||
| return None | ||
| batch = self._stream_reader.receive_batch_nowait( | ||
| batch = self._conn.receive_batch_nowait( | ||
| max_messages=max_messages, | ||
| ) |
Extract the reconnect and bidi-stream lifecycle shared by the topic reader and writer into two base classes in ydb/_topic_common: - StreamReconnector: the single reconnect loop, backoff, fatal signal and close ordering, via _new_connection()/_handshake()/_run() hooks. Because _new_connection() is synchronous and the connection owns its gRPC stream before the first network await, "one live stream, no zombie" becomes structural — a cancel mid-handshake always closes the stream (this now covers the writer too, which had no such guard). - StreamConnection: the bidi-stream lifecycle (gRPC wrapper, connect = start + init handshake, update-token loop, per-connection wait_error). The reader/writer reconnectors and stream objects become thin subclasses; _run(conn) = conn.wait_error() for both. Also fixes the writer reconnect backoff (attempt was reset every loop iteration, a retry storm at the backoff floor) and a flush-on-close deadlock.
73c2519 to
5bac08f
Compare
| attempt = 0 | ||
| while not closed: | ||
| conn = self._new_connection() # SYNC: builds the connection, which owns its gRPC stream | ||
| self._conn = conn # owned BEFORE the first network await |
There was a problem hiding this comment.
Severity: Minor
Confidence: High
The pseudocode shows self._conn = conn before await self._handshake(conn), but the actual implementation in _stream_reconnector.py:84-89 assigns _conn after the handshake succeeds. The structural no-zombie guarantee relies on the local conn variable being available in the finally block, not on early _conn assignment. As written, this pseudocode could mislead maintainers into thinking consumers can observe the connection before the handshake completes (they cannot — _conn is only published after a full handshake).
Suggested fix: swap the two lines in the pseudocode to match the actual code:
conn = self._new_connection() # SYNC
try:
await self._handshake(conn)
self._conn = conn # published only AFTER successful handshake| # ------------------------------------------------------------------ the one reconnect loop | ||
|
|
||
| async def _connection_loop(self) -> None: | ||
| attempt = 0 |
There was a problem hiding this comment.
Severity: Minor
Confidence: Medium
The PR description mentions two important bug fixes: (1) writer reconnect backoff storm (attempt was reset every iteration in the old code, now fixed by declaring attempt = 0 outside the while loop at line 74), and (2) flush-on-close deadlock (writer now flushes before _closed=True at lines 334-336).
Both fixes look correct, but neither has a regression test that would catch a reintroduction. Consider adding:
- A test that verifies
attemptgrows across consecutive retriable connect failures (e.g., assert the backoff sleep increases on the 2nd and 3rd failures rather than staying at the floor). - A test that reproduces the deadlock scenario: enqueue a message, drop the connection during
flush(), and verify the reconnect loop brings up a new connection to complete the flush (rather than deadlocking because_closed=Trueprevents reconnection).
| "writer reconnector %s retry in %s seconds", | ||
| self._id, | ||
| err_info.sleep_timeout_seconds, | ||
| ) |
There was a problem hiding this comment.
Severity: Nit
Confidence: High
This self._state_changed.set() is redundant: the base class _connection_loop already calls self._state_changed.set() on _stream_reconnector.py:92, immediately after await self._on_connected(conn) returns. Since there is no await between the two .set() calls, they fire in the same event-loop turn and the duplicate is a no-op.
Harmless, but removing it avoids the false impression that signaling here is load-bearing (it isn't — the base class handles it).
AI Review SummaryVerdict: ✅ No critical issues found Critical issuesNo critical issues found. Other findings
Detailed notes: The refactoring is well-designed. The two base classes (
This review was generated automatically. Critical issues require attention; other findings are advisory. |
|
Analysis performed by claude, claude-opus-4-6. |
Extracts the reconnect + bidi-stream lifecycle shared by the topic reader and writer into two base classes in
ydb/_topic_common—StreamReconnectorandStreamConnection. The reader/writer reconnectors and stream objects become thin subclasses.Why
_run(conn) = conn.wait_error()symmetric for reader and writerFixes
attemptwas reset every loop iteration → retry storm at the backoff floor; now grows correctlyNotes
ydb/_topic_common/STREAM_DESIGN.mdDraft — preserving the work, not for merge yet.