
[Detail Bug] HTTP/2 flow-control bytes can be lost during request cleanup, causing stalled connections after repeated timeouts #25

@detail-app

Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_43848909-62a0-44b2-b816-eefb18cc0b70

Summary

  • Context: The HttpClient class in _client.py implements HTTP/2 flow control to manage data transmission between client and server, using unacked_flow_bytes on each stream's state to count bytes received but not yet acknowledged.
  • Bug: A race around _take_all_unacked_flow_bytes can leave received bytes unacknowledged when data arrives during cleanup after a timeout or early exit.
  • Actual vs. expected: Every received flow-controlled byte should eventually be acknowledged to the server. Instead, bytes that arrive during await conn.ack_data(...) in the finally block are counted by _handle_event but never acknowledged.
  • Impact: Each lost acknowledgment leaves the connection's flow-control window smaller than it should be, so repeated timeouts can eventually stall the connection.
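To make the impact concrete, here is a deliberately simplified model of the window arithmetic. It is a sketch, not the client's real accounting: it assumes the window starts at the HTTP/2 default of 65,535 bytes, is never enlarged, and that leaked bytes are never credited back by anything else. The 500-byte leak per timeout is an illustrative figure borrowed from the failing test in this report, and both function names are hypothetical.

```python
import math

DEFAULT_WINDOW = 65_535  # HTTP/2 default initial flow-control window, in bytes


def window_after(timeouts: int, leaked_per_timeout: int,
                 window: int = DEFAULT_WINDOW) -> int:
    """Bytes the peer may still send after `timeouts` cleanups that each leak bytes."""
    # Each leaked byte was received but never acknowledged, so it is never
    # returned to the window (assumption of this simplified model).
    return max(0, window - timeouts * leaked_per_timeout)


def timeouts_until_stall(leaked_per_timeout: int,
                         window: int = DEFAULT_WINDOW) -> int:
    """Smallest number of leaking cleanups that drives the window to zero."""
    return math.ceil(window / leaked_per_timeout)
```

Under these assumptions, window_after(1, 500) is 65035 and timeouts_until_stall(500) is 132. A real connection with a larger configured window would take proportionally longer to stall.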

Code with Bug

# In unary_request (lines 172-183) and streaming_request (lines 265-275)
finally:
    if stream_id is not None:
        nbytes = _take_all_unacked_flow_bytes(state)  # <-- BUG: Captures current bytes, resets to 0
        if nbytes > 0:
            try:
                await conn.ack_data(stream_id, nbytes)  # <-- Yields control; _recv_loop can run here
            except Exception:
                pass
        if not state.ended.is_set():
            await conn.reset_stream(stream_id)
    conn.release_stream(stream_id, state)
    pc.touch_idle()

# _take_all_unacked_flow_bytes (lines 992-995)
def _take_all_unacked_flow_bytes(state: _StreamState) -> int:
    nbytes = state.unacked_flow_bytes
    state.unacked_flow_bytes = 0  # <-- BUG 🔴 Resets before the ack is actually sent
    return nbytes

# _handle_event in _recv_loop (lines 901-905)
elif isinstance(event, h2.events.DataReceived):
    state = self._streams.get(event.stream_id)
    if state:
        state.data_queue.put_nowait((event.data, event.flow_controlled_length))
        state.unacked_flow_bytes += event.flow_controlled_length  # <-- Can increment during cleanup

Explanation

During request cleanup, the code reads and resets state.unacked_flow_bytes and only then awaits conn.ack_data(...). Under asyncio the read-and-reset itself cannot be interleaved with other tasks, because it contains no await; the gap opens at the await that follows. While that await is pending (e.g., waiting on _write_lock), _recv_loop can process DataReceived events and increment unacked_flow_bytes. Those newly received bytes are not part of the already-captured nbytes, and release_stream() then removes the stream state, so they are never acknowledged to the server. Over repeated timeouts or early exits, this desynchronizes the HTTP/2 flow-control window and can eventually stall the connection.

Failing Test

import asyncio
from s2_sdk._client import _StreamState, _take_all_unacked_flow_bytes

async def test_race():
    state = _StreamState()
    state.unacked_flow_bytes = 1000

    async def simulate_recv_loop():
        await asyncio.sleep(0.001)  # Let cleanup take its snapshot first
        state.unacked_flow_bytes += 500  # Simulates _handle_event

    async def simulate_cleanup():
        nbytes = _take_all_unacked_flow_bytes(state)  # Reads 1000, resets to 0
        await asyncio.sleep(0.002)  # Simulates await conn.ack_data(...)
        return nbytes

    recv_task = asyncio.create_task(simulate_recv_loop())
    nbytes = await simulate_cleanup()
    await recv_task

    print(f"Acked: {nbytes}")  # 1000
    print(f"Lost: {state.unacked_flow_bytes}")  # 500 (never acked!)

asyncio.run(test_race())

Output:

Acked: 1000
Lost: 500

Recommended Fix

Move the capture-and-reset into the lock-protected section:

finally:
    if stream_id is not None:
        async with conn._write_lock:  # <-- FIX: Acquire lock before capturing
            nbytes = state.unacked_flow_bytes
            state.unacked_flow_bytes = 0
        if nbytes > 0:
            try:
                await conn.ack_data(stream_id, nbytes)
            except Exception:
                pass
        if not state.ended.is_set():
            await conn.reset_stream(stream_id)
    conn.release_stream(stream_id, state)
    pc.touch_idle()
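One thing worth checking in the fix above: conn.ack_data(...) is still awaited after the lock is released, so bytes may still be able to arrive at that point. A complementary option, sketched here under assumptions, is to re-check the counter after every await and keep acknowledging until it reads zero. FakeStreamState, drain_unacked, fake_ack, and demo are hypothetical names for illustration and are not part of s2_sdk. The ack callback stands in for conn.ack_data(stream_id, ...).

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeStreamState:
    """Hypothetical stand-in for _StreamState; only the counter is modelled."""
    unacked_flow_bytes: int = 0


async def drain_unacked(state, ack):
    """Acknowledge bytes until the counter is zero after the last await."""
    total = 0
    while state.unacked_flow_bytes > 0:
        nbytes = state.unacked_flow_bytes
        state.unacked_flow_bytes = 0  # no await between the read and the reset
        await ack(nbytes)             # more bytes may be counted while this is pending
        total += nbytes
    return total


async def demo():
    state = FakeStreamState(unacked_flow_bytes=1000)
    acked = []

    async def fake_ack(nbytes):
        await asyncio.sleep(0)  # yield control, as a real ack would while waiting
        if not acked:
            # Simulates _handle_event counting data during the first ack.
            state.unacked_flow_bytes += 500
        acked.append(nbytes)

    total = await drain_unacked(state, fake_ack)
    return total, acked, state.unacked_flow_bytes
```

Running asyncio.run(demo()) returns (1500, [1000, 500], 0): the 500 bytes that arrived during the first acknowledgment are picked up by the second loop iteration. In the real finally block, any later await before release_stream() (such as reset_stream) would reopen the same gap, so the final drain probably needs to be the last awaited step before the stream state is released.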

Labels

detail-bug: bug flagged by https://detail.dev/
