Skip to content

ENG-9398: fix upload handler race condition#6368

Merged
adhami3310 merged 2 commits into
mainfrom
fix-enqueue-stream-delta-race
Apr 23, 2026
Merged

ENG-9398: fix upload handler race condition#6368
adhami3310 merged 2 commits into
mainfrom
fix-enqueue-stream-delta-race

Conversation

@adhami3310
Copy link
Copy Markdown
Member

No description provided.

@linear
Copy link
Copy Markdown

linear Bot commented Apr 22, 2026

@codspeed-hq
Copy link
Copy Markdown

codspeed-hq Bot commented Apr 22, 2026

Merging this PR will not alter performance

✅ 9 untouched benchmarks


Comparing fix-enqueue-stream-delta-race (40da3d9) with main (bdc2ee7)

Open in CodSpeed

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 22, 2026

Greptile Summary

This PR fixes a race condition in enqueue_stream_delta where back-to-back deltas emitted by upload handlers could be silently dropped. The old as_completed + waiting_for-set pattern allowed a pending deltas.get() task to consume a queued delta at the same tick that all_task_futures resolved, causing the outer loop to exit before yielding it. The fix extracts a _stream_queue_until_done helper that uses a sentinel value pushed through the same queue by a done-callback, guaranteeing no items enqueued before the sentinel are ever lost.

Confidence Score: 5/5

Safe to merge — the fix is correct and well-tested; the only remaining finding is a minor asyncio warning in error paths.

All remaining findings are P2 (style/cosmetic). The core race-condition fix is sound: the sentinel-in-queue pattern is a well-established approach, and the six new unit tests plus the regression test give strong confidence in edge-case coverage. The uv.lock additions are unrelated housekeeping.

No files require special attention; the minor unhandled-exception warning in _stream_queue_until_done is cosmetic.

Important Files Changed

Filename Overview
packages/reflex-base/src/reflex_base/event/processor/event_processor.py Replaces racy as_completed + waiting_for-set pattern with a cleaner sentinel-in-queue helper (_stream_queue_until_done) to fix delta loss during upload handler completion; one minor unhandled-exception warning in error paths.
tests/units/reflex_base/event/processor/test_event_processor.py Adds regression test for rapid back-to-back deltas and six unit tests covering _stream_queue_until_done edge cases (ordering, draining, empty stream, watcher exception, early cancellation, concurrent put+completion).
uv.lock Adds email-validator, httpx, pyyaml, and ruff-format as dependencies of reflex-site-shared; unrelated to the upload handler race fix.

Sequence Diagram

sequenceDiagram
    participant C as Caller
    participant EP as enqueue_stream_delta
    participant H as Handler Task (task_future)
    participant Q as deltas Queue
    participant S as _stream_queue_until_done

    C->>EP: enqueue_stream_delta(token, event)
    EP->>H: enqueue(event) → task_future
    EP->>S: _stream_queue_until_done(queue=deltas, done_when=task_future.wait_all())
    S->>S: create_task(done_when) → watcher
    S->>S: watcher.add_done_callback(→ queue.put_nowait(sentinel))

    H->>Q: emit_delta({i: 0}) → put(delta0)
    S->>Q: await queue.get() → delta0
    S-->>EP: yield delta0
    EP-->>C: yield delta0

    H->>Q: emit_delta({i: 1}) → put(delta1)
    H->>H: complete (task_future done)
    H-->>S: done_callback fires → queue.put_nowait(sentinel)

    S->>Q: await queue.get() → delta1
    S-->>EP: yield delta1
    EP-->>C: yield delta1

    S->>Q: await queue.get() → sentinel
    S->>S: return (end of stream)
    EP->>H: await task_future.wait_all() (re-raise any exception)
    EP-->>C: stream complete
Loading

Reviews (2): Last reviewed commit: "make helper and write tests" | Re-trigger Greptile

Comment thread packages/reflex-base/src/reflex_base/event/processor/event_processor.py Outdated
masenf
masenf previously approved these changes Apr 22, 2026
@adhami3310
Copy link
Copy Markdown
Member Author

@greptile

@adhami3310 adhami3310 merged commit 9ee70de into main Apr 23, 2026
68 checks passed
@adhami3310 adhami3310 deleted the fix-enqueue-stream-delta-race branch April 23, 2026 00:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants