[Detail Bug] Streaming append retry can hang indefinitely after all inputs are acked #10

@detail-app

Description

Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_b70abee0-7b6c-4eb3-9e0d-a6c610dea224

Introduced in #1 by @quettabit on Apr 7, 2026

Summary

  • Context: The run_append_session function in _append_session.py manages a streaming append session with retry logic, coordinating between input production (pipe_inputs), request body generation (_body_gen), and ack handling.
  • Bug: When a retry is triggered after all inputs have been sent and acknowledged, _body_gen deadlocks waiting for input_queue.get() because the sentinel None (marking end of input stream) was already consumed in the first attempt.
  • Expected vs. actual: The session should either complete successfully after all inputs are acked, or cleanly handle a retry in which there is nothing to resend. Instead, it hangs indefinitely.
  • Impact: Client applications hang with no error message, requiring forced termination. Data loss is possible if the application is killed before graceful shutdown.

Code with Bug

# src/s2_sdk/_s2s/_append_session.py

async def pipe_inputs():
    try:
        async for inp in inputs:
            await input_queue.put(inp)
    finally:
        await input_queue.put(None)  # <-- Sentinel enqueued exactly once; consumed by the first attempt

async def _body_gen(...):
    # ... resend logic ...

    while True:
        inp = await input_queue.get()  # <-- BUG 🔴 deadlocks on retry when sentinel was consumed in prior attempt
        if inp is None:
            return
        # ...

Explanation

On the first attempt, pipe_inputs enqueues all inputs followed by a single None sentinel. _body_gen consumes that sentinel and returns when the input stream ends.

If the server then returns a retriable error after acknowledging all sent batches (so inflight_inputs is empty), the retry logic starts a second attempt under the default AppendRetryPolicy.ALL. On that retry attempt:

  • pending_resend is empty (nothing to resend)
  • _body_gen falls through into its while True loop
  • input_queue is empty and the None sentinel will never be re-enqueued (producer already finished)

Result: _body_gen blocks forever on await input_queue.get(), so the retry attempt never sends a body and the overall session hangs.
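Stripped of the SDK specifics, the hang reduces to a generic single-use-sentinel pattern: a producer enqueues one None terminator, the first consumer pass swallows it, and any later pass blocks forever. A minimal standalone sketch (independent of s2_sdk; names here are illustrative):

```python
import asyncio


async def main() -> bool:
    q: asyncio.Queue = asyncio.Queue()

    async def producer() -> None:
        for item in (b"batch-1", b"batch-2"):
            await q.put(item)
        await q.put(None)  # sentinel is enqueued exactly once

    async def consume_attempt() -> None:
        # Drains items until the sentinel, mirroring _body_gen's loop.
        while True:
            item = await q.get()
            if item is None:
                return

    await producer()
    await consume_attempt()  # first attempt: consumes the sentinel
    try:
        # "Retry" attempt: queue is empty and no sentinel will ever arrive,
        # so q.get() blocks until the timeout fires.
        await asyncio.wait_for(consume_attempt(), timeout=0.5)
        return False
    except asyncio.TimeoutError:
        return True  # the deadlock the bug report describes


print(asyncio.run(main()))
```

Running this prints True: the second consumer pass never returns on its own, exactly as _body_gen behaves on the retry attempt.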

Failing Test

import asyncio
from typing import cast

from s2_sdk._s2s._append_session import run_append_session
from s2_sdk._s2s._protocol import Message, frame_message
from s2_sdk._types import AppendInput, Record, Retry, Compression
from s2_sdk._exceptions import S2ServerError
import s2_sdk._generated.s2.v1.s2_pb2 as pb


class _TestClient:
    def __init__(self):
        self.attempt = 0
        self.body_chunks: list[list[bytes]] = []

    def streaming_request(self, method: str, path: str, **kwargs):
        self.attempt += 1
        self.body_chunks.append([])
        content = kwargs.get("content")
        return _TestResponse(self.attempt, content, self.body_chunks[-1])


class _TestResponse:
    def __init__(self, attempt: int, content, body_chunks: list[bytes]):
        self.status_code = 200
        self._attempt = attempt
        self._content = content
        self._body_chunks = body_chunks
        self._body_task = None

    async def __aenter__(self):
        if self._content:
            self._body_task = asyncio.create_task(self._consume_body())
        return self

    async def _consume_body(self):
        try:
            async for chunk in self._content:
                self._body_chunks.append(chunk)
        except asyncio.CancelledError:
            pass

    async def __aexit__(self, *args):
        if self._body_task:
            try:
                await asyncio.wait_for(self._body_task, timeout=1.0)
            except asyncio.TimeoutError:
                self._body_task.cancel()
                try:
                    await self._body_task
                except asyncio.CancelledError:
                    pass

    async def aread(self):
        return b""

    def aiter_bytes(self):
        return self._aiter_bytes()

    async def _aiter_bytes(self):
        if self._attempt == 1:
            await asyncio.sleep(0.1)
            seq_num = 0
            # Wait for all body chunks to be sent
            while len(self._body_chunks) < 2:
                await asyncio.sleep(0.01)
            # Ack all batches
            for _ in self._body_chunks:
                ack = pb.AppendAck(
                    start=pb.StreamPosition(seq_num=seq_num, timestamp=10),
                    end=pb.StreamPosition(seq_num=seq_num + 1, timestamp=10),
                    tail=pb.StreamPosition(seq_num=seq_num + 1, timestamp=10),
                )
                seq_num += 1
                yield frame_message(Message(ack.SerializeToString(), terminal=False, compression=Compression.NONE))
            # After all acks, raise retriable error
            raise S2ServerError("internal", "Internal server error", 500)
        else:
            # Second attempt - simulate server that would respond
            # but client deadlocks before sending any data
            await asyncio.sleep(100)


async def test_deadlock_with_timeout():
    client = _TestClient()

    async def _inputs():
        yield AppendInput(records=[Record(body=b"batch-1")])
        yield AppendInput(records=[Record(body=b"batch-2")])

    async def run_test():
        outputs = []
        try:
            async for output in run_append_session(
                cast("HttpClient", client),
                "test-stream",
                _inputs(),
                retry=Retry(max_attempts=2),
                compression=Compression.NONE,
            ):
                outputs.append(output)
        except Exception as e:
            print(f"Exception: {type(e).__name__}: {e}")
        return outputs

    try:
        # 3 second timeout should be plenty for a normal flow
        outputs = await asyncio.wait_for(run_test(), timeout=3.0)
        print(f"Completed normally with {len(outputs)} outputs")
    except asyncio.TimeoutError:
        print("DEADLOCK CONFIRMED: Test timed out after 3 seconds")
        print(f"First attempt body chunks: {len(client.body_chunks[0])}")
        if len(client.body_chunks) > 1:
            print(f"Second attempt body chunks: {len(client.body_chunks[1])}")


asyncio.run(test_deadlock_with_timeout())

Test output:

DEADLOCK CONFIRMED: Test timed out after 3 seconds
First attempt body chunks: 2
Second attempt body chunks: 0

Recommended Fix

Track whether the input stream has been exhausted, and have _body_gen exit without blocking when retrying after exhaustion.

Proposed approach from exploration:

  • Set an input_exhausted flag in pipe_inputs before enqueuing the sentinel.
  • Pass input_exhausted through _run_attempt into _body_gen.
  • In _body_gen, check the flag before awaiting input_queue.get() (so retry attempts with no more inputs don’t hang).
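The bullets above can be sketched as follows. This is a minimal illustration, not the actual SDK code: the shared-state dict, parameter shapes, and function signatures are assumptions, and only the flag-check logic reflects the proposed fix.

```python
import asyncio
from typing import Any, AsyncIterator


async def pipe_inputs(
    inputs: AsyncIterator[Any],
    input_queue: asyncio.Queue,
    state: dict,
) -> None:
    """Producer: forwards inputs, then records exhaustion before the sentinel."""
    try:
        async for inp in inputs:
            await input_queue.put(inp)
    finally:
        state["input_exhausted"] = True  # set BEFORE enqueuing the sentinel
        await input_queue.put(None)


async def body_gen(input_queue: asyncio.Queue, state: dict) -> AsyncIterator[Any]:
    """Consumer: one attempt's body. Safe to call again on retry."""
    while True:
        # On a retry attempt the sentinel was already consumed; if the
        # producer has finished and nothing is queued, exit instead of
        # blocking forever on input_queue.get().
        if state["input_exhausted"] and input_queue.empty():
            return
        inp = await input_queue.get()
        if inp is None:
            return
        yield inp


async def demo() -> tuple[list, list]:
    q: asyncio.Queue = asyncio.Queue()
    state = {"input_exhausted": False}

    async def inputs():
        yield b"batch-1"
        yield b"batch-2"

    await pipe_inputs(inputs(), q, state)
    first = [x async for x in body_gen(q, state)]   # first attempt
    second = [x async for x in body_gen(q, state)]  # retry: returns immediately
    return first, second


first, second = asyncio.run(demo())
print(first, second)  # [b'batch-1', b'batch-2'] []
```

The key ordering detail is setting the flag before enqueuing the sentinel: that way a retry attempt can never observe "queue empty, flag unset" after the producer has finished, so the early-return check is race-free with respect to the end-of-stream transition.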

History

This bug was introduced in commit 3dc9795. The initial retry implementation read from input_queue in an unbounded loop and relied on a single-consumption None sentinel, with no persistent indication (across attempts) that input had already been exhausted.

Labels

detail-bug (flagged by https://detail.dev/)