Fix gRPC thread leak on failed topic writer reconnect#845
Conversation
WriterAsyncIOStream.create() left the stream open when startup failed after stream.start(), leaking a gRPC consumer thread per reconnect attempt. Close it on failure (mirrors ReaderStream.create) and add a regression test.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #845 +/- ##
=======================================
Coverage 81.05% 81.05%
=======================================
Files 94 94
Lines 12101 12109 +8
Branches 1182 1184 +2
=======================================
+ Hits 9808 9815 +7
+ Misses 1836 1835 -1
- Partials 457 459 +2
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Fixes a gRPC consumer-thread leak in the topic writer reconnection path by ensuring WriterAsyncIOStream.create() closes the underlying GrpcWrapperAsyncIO stream when initialization fails after stream.start(). Adds a regression test that reproduces the leak using a real in-process gRPC streaming RPC (so the gRPC consumption thread is actually spawned).
Changes:
- Wrap
WriterAsyncIOStream.create()initialization intry/exceptand close the stream on failure (mirrorsReaderStream.create()behavior). - Add an in-process gRPC server–based regression test that repeatedly triggers create failures and asserts no consumer threads remain stranded.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| ydb/_topic_writer/topic_writer_asyncio.py | Ensures streams started during writer creation are closed on init failure to prevent stranded gRPC consumption threads. |
| ydb/_topic_writer/topic_writer_asyncio_test.py | Adds a regression test using an in-process gRPC StreamWrite endpoint to detect the thread leak. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
AI Review Summary
Verdict: ✅ No critical issues found
Critical issues
No critical issues found.
Other findings
- Minor | Medium: Cleanup logic in
create()diverges from theReaderStream.create()pattern it claims to mirror;close()is unsafe for partially-initialized writers —ydb/_topic_writer/topic_writer_asyncio.py:864 - Minor | Medium:
WriterAsyncIOStream.close()unconditionally accessesself._streamwithout a guard, unlikeReaderStream.close()—ydb/_topic_writer/topic_writer_asyncio.py:836 - Nit | Medium: Thread-leak detection in the test relies on stack-frame inspection of specific filenames and method names —
ydb/_topic_writer/topic_writer_asyncio_test.py:1053
This review was generated automatically. Critical issues require attention; other findings are advisory.
If this comment was useful, please give it a 👍 — it helps us improve the review bot.
|
Analysis performed by claude, claude-opus-4-6. |
Set _stream at the start of _start() and guard close() (mirrors ReaderStream); match the exact __next__ code object for leak detection and wait for the test gRPC server to stop.
There was a problem hiding this comment.
AI Review Summary
Verdict: ✅ No critical issues found
Critical issues
No critical issues found.
Other findings
-
Nit | Medium: Stale comment in existing test
test_init_timeout_behavior—ydb/_topic_writer/topic_writer_asyncio_test.py:288Line 288 says _"Don't close writer since _start failed and stream was never set". After this PR,
_start()assignsself._stream = streamas its first action (line 893), so_streamis now set even when_start()fails mid-handshake. The comment should be updated and ideally the test should callawait writer.close()for consistency with the new cleanup pattern. (The mock stream is still cleaned up by thestreamfixture teardown, so this is cosmetic.)
Notes on the review:
The fix is well-targeted and correctly mirrors the already-proven ReaderStream.create() pattern (line 586 in topic_reader_asyncio.py). The three changes form a coherent unit:
create()try/except — ensures the stream is closed on any failure afterstream.start(), whether the failure is in theWriterAsyncIOStreamconstructor or in_start()._streamassignment moved earlier in_start()— guaranteesclose()can reach the stream even if the init handshake fails (timeout, status error, connection loss).getattrguard inclose()— safely handles the case whereclose()is called before_start()runs (since_streamis not set in__init__).
The regression test is thorough: it uses a real in-process gRPC server (not mocks) to reproduce the actual thread leak, and detects stranded threads by matching the exact AsyncQueueToSyncIteratorAsyncIO.__next__ code object in sys._current_frames().
This review was generated automatically. Critical issues require attention; other findings are advisory.
If this comment was useful, please give it a 👍 — it helps us improve the review bot.
|
Analysis performed by claude, claude-opus-4-6. |
🌋 SLO Test Results🟢 2 workload(s) tested — All thresholds passed
Generated by ydb-slo-action |
Restrict the test gRPC handler to StreamWrite and assert against a baseline thread count.
Problem
On connection loss during reconnect,
WriterAsyncIOStream.create()raised afterstream.start()without closing the stream, stranding a gRPC consumption thread on every attempt → unbounded growth →RuntimeError: can't start new thread. Happens even without anywrite()call.Fix
Close the stream on failure, mirroring the already-fixed
ReaderStream.create().