fix: producer.close() orphans drain task on error #17

Merged
quettabit merged 1 commit into main from qb/iss-9
Apr 8, 2026

Conversation

@quettabit
Member

No description provided.

@quettabit quettabit requested a review from a team as a code owner April 8, 2026 07:30
@greptile-apps

greptile-apps Bot commented Apr 8, 2026

Greptile Summary

This PR fixes a resource leak in Producer.close() where a background drain task (_drain_task) could be orphaned if _flush() or _session.close() raised an exception. The fix wraps those two awaits in a try/finally block so that _batch_ready.set() and await self._drain_task are always executed, regardless of whether an error occurred.
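The shape of the fix described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's actual code: `FakeSession`, `SketchProducer`, and `demo` are hypothetical names, and `_flush()` and `_drain_acks()` are stubbed because their real bodies are not shown on this page. Only the `try`/`finally` structure in `close()` follows the summary.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for the append session; not the real SDK class."""

    def __init__(self, fail_on_close: bool) -> None:
        self._fail_on_close = fail_on_close

    async def close(self) -> None:
        if self._fail_on_close:
            raise RuntimeError("session close failed")


class SketchProducer:
    """Illustrative producer; only the close() shape mirrors the summary."""

    def __init__(self, session: FakeSession) -> None:
        # Must be constructed inside a running event loop (create_task needs one).
        self._session = session
        self._closed = False
        self._batch_ready = asyncio.Event()
        self._drain_task = asyncio.create_task(self._drain_acks())

    async def _flush(self) -> None:
        # Stub (assumption): the real method submits any pending batch.
        return None

    async def _drain_acks(self) -> None:
        # Stub (assumption): wake on each signal, exit once the producer is closed.
        while True:
            await self._batch_ready.wait()
            self._batch_ready.clear()
            if self._closed:
                return

    async def close(self) -> None:
        self._closed = True
        try:
            await self._flush()
            await self._session.close()
        finally:
            # Runs on success and on error, so the drain task is never orphaned.
            self._batch_ready.set()
            await self._drain_task


async def demo(fail_on_close: bool) -> tuple[bool, bool]:
    producer = SketchProducer(FakeSession(fail_on_close))
    raised = False
    try:
        await producer.close()
    except RuntimeError:
        raised = True
    # (did close() raise?, did the drain task finish?)
    return raised, producer._drain_task.done()
```

Under these assumptions, `asyncio.run(demo(True))` reports that the error from `session.close()` still reaches the caller and that the drain task has finished anyway, which is the behavior the PR is after.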

Confidence Score: 5/5

Safe to merge — targeted one-file fix with no new logic, correctly prevents drain task orphaning on error.

The change is minimal: one try/finally wrapping two awaits. The finally block correctly signals _batch_ready and awaits the drain task in all exit paths. _drain_acks catches BaseException internally so it won't raise from the finally block and mask the original exception. No P0/P1 findings.
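The masking concern mentioned above comes from a general Python rule: an exception raised inside a `finally` block replaces the one that was already propagating. The sketch below illustrates that rule with two hypothetical drain coroutines (`drain_swallowing` and `drain_raising`, neither taken from the SDK). It only assumes, as the review states, that the real `_drain_acks` catches `BaseException` internally.

```python
import asyncio


async def drain_swallowing(errors: list) -> None:
    # Mirrors the described behavior: record the failure instead of raising it.
    try:
        raise ValueError("ack failed")
    except BaseException as exc:
        errors.append(exc)


async def drain_raising() -> None:
    # Counterexample: lets the failure escape to whoever awaits the task.
    raise ValueError("ack failed")


async def close_with(drain) -> str:
    """Return the name of the exception type the caller of close() would see."""
    task = asyncio.create_task(drain)
    try:
        try:
            raise RuntimeError("flush failed")  # the original error
        finally:
            await task  # if this raises, it replaces the RuntimeError
    except Exception as exc:
        return type(exc).__name__
```

With the swallowing variant the caller sees `RuntimeError`, the original failure. With the raising variant the caller sees `ValueError` instead, and the original error survives only as its `__context__`.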

No files require special attention.

Vulnerabilities

No security concerns identified.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/s2_sdk/_producer.py | Wraps _flush() + _session.close() in try/finally so the drain task is always signalled and awaited; fix is correct and minimal. |

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Producer
    participant Session as AppendSession
    participant DrainTask as _drain_task (_drain_acks)

    Note over Caller,DrainTask: Happy path (no error)
    Caller->>Producer: close()
    Producer->>Producer: _closed = True
    Producer->>Session: _flush() → submit batch
    Session-->>Producer: ok
    Producer->>Session: session.close()
    Session-->>Producer: ok
    Note over Producer: finally block always runs
    Producer->>DrainTask: _batch_ready.set()
    Producer->>DrainTask: await _drain_task
    DrainTask-->>Producer: resolved (self._closed=True → return)
    Producer-->>Caller: (no error raised)

    Note over Caller,DrainTask: Error path (e.g. _flush raises)
    Caller->>Producer: close()
    Producer->>Producer: _closed = True
    Producer->>Session: _flush()
    Session-->>Producer: raises exception (sets self._error)
    Note over Producer: finally block still runs
    Producer->>DrainTask: _batch_ready.set()
    Producer->>DrainTask: await _drain_task
    DrainTask-->>Producer: sees _closed=True, returns
    Producer-->>Caller: exception propagated

Reviews (1): Last reviewed commit: "initial commit"

@quettabit quettabit merged commit e448d1d into main Apr 8, 2026
5 checks passed

Development

Successfully merging this pull request may close these issues.

[Detail Bug] Producer close() can orphan background drain task when session.close() raises