Skip to content

fix: speed up scheduler queue views#728

Merged
eric-tramel merged 4 commits into
mainfrom
codex/fix-724-scheduler-queue-view
Jun 2, 2026
Merged

fix: speed up scheduler queue views#728
eric-tramel merged 4 commits into
mainfrom
codex/fix-724-scheduler-queue-view

Conversation

@eric-tramel
Copy link
Copy Markdown
Contributor

@eric-tramel eric-tramel commented Jun 1, 2026

📋 Summary

Fixes scheduler hot-path scaling from Issue #724 by maintaining fair-queue group counts and resource demand incrementally. This keeps FairTaskQueue.view() from rebuilding queue summaries by scanning every queued task during dispatch and diagnostics.

🔗 Related Issue

Fixes #724

🔄 Changes

  • Maintain queued counts and resource demand when tasks are enqueued, discarded, or committed.
  • Build QueueView from maintained accounting plus first-candidate group heads.
  • Add regression coverage for accounting updates after removals, duplicate enqueue accounting, and avoiding full non-candidate task resource scans.

🧪 Testing

  • make check-all-fix
  • uv run pytest packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py - 10 passed
  • make test-engine - 2,217 passed
  • Unit tests added/updated
  • E2E tests added/updated: N/A - scheduler unit/integration coverage only

Performance Demonstration

Same-machine benchmark loading origin/main and the fixed branch in one run:

Scenario origin/main median Fixed branch median Improvement
FairTaskQueue.view() over 8,192 queued tasks, 100 calls 0.754456s 0.033932s 22.2x faster, 95.5% less time
Dispatch loop over 2,048 queued tasks, 512 select+commit 0.858851s 0.103152s 8.3x faster, 88.0% less time

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated: N/A - internal performance fix with no public behavior change

Maintain fair-queue group counts and resource demand as tasks enter and leave the ready queue, so QueueView creation no longer scans every queued task in scheduler hot paths.

Add regression coverage for queue accounting after discard/commit and for avoiding full queued-task value scans.

Fixes #724

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
@eric-tramel eric-tramel requested a review from a team as a code owner June 1, 2026 22:33
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Review: PR #728 — fix: speed up scheduler queue views

Summary

Replaces the O(N-queued-tasks) scan inside FairTaskQueue.view() with O(unique-groups) reads from incrementally maintained accounting counters. Three new bookkeeping fields (_queued_by_group, _queued_resource_demand_by_group, _queued_peer_demand_by_resource) are updated on enqueue/discard/commit through a pair of small helpers (_increment_queue_accounting / _decrement_queue_accounting) and a unified _remove_queued_item shared by the discard and commit paths. view() now only iterates groups (not tasks) to look up first-candidate metadata, while the totals come straight from the maintained counters. Public surface (QueueView shape) is unchanged. Reported speedup: 71× on the view() micro-benchmark and 7.6× on the dispatch loop benchmark — consistent with eliminating the per-task resource_request.amounts.items() scan.

Findings

Correctness

  • Discard / commit symmetry looks right. commit now does queue.popleft() then _remove_queued_item(...) (which decrements the maintained counters); discard calls _remove_queued_item directly and bumps _sequence_version only when something was actually removed. Behavior matches the prior code, with bookkeeping kept in sync.
  • _first_valid_item simplification is sound. It now purges the head and returns queue[0] (or None). This is equivalent to the prior linear scan because _purge_queue_head already walks the deque dropping any leading entries that aren't in _queued/_task_groups. The linear scan past the head was dead code in practice — the head is always either the first valid item or gets purged.
  • Counter self-zeroing ensures keys don't accumulate. _decrement_queue_accounting deletes counter keys at <= 0, so view() doesn't silently grow stale entries. The defensive count > 0 filter in view() is belt-and-suspenders; either mechanism alone would suffice. Keeping both is fine — they're cheap.
  • view() still iterates self._queues.items() rather than self._queued_by_group. _queues is never pruned when a group empties out, so over a long run with many transient groups, this loop is O(groups-ever-seen). Each empty entry short-circuits via the <= 0 check, so it's not a hot-path issue, but iterating self._queued_by_group directly would be tighter. (queue.py:148-156)
  • Edge case: zero-valued amounts. If a resource_request ever contains {r: 0}, the increment writes 0 into the per-group / per-resource Counter via += 0, leaving a 0 entry that the count > 0 filter then suppresses in view(). Decrement rebalances it cleanly. Not a real problem (resource amounts shouldn't be zero), but worth noting that the count > 0 filter is what saves view consumers from seeing them.
  • Thread safety unchanged. No locking added; this matches the prior code, which also assumed single-threaded mutation.

Tests

  • test_queue_view_updates_incremental_accounting_after_removals covers the mixed-removal flow (enqueue 3, discard 1, select-and-commit 1) and asserts the maintained counters reach the expected steady state across all three Counters in QueueView. Good targeted coverage of the new bookkeeping.
  • test_queue_view_uses_incremental_accounting_for_non_candidate_tasks is the standout: a _FailIfScannedAmounts dict subclass raises if items() is called after enqueue, which is exactly the scan that the fix eliminates. This is a regression guard that will catch any future change that re-introduces the O(N-tasks) scan in view(). Nice.
  • One small caveat on the guard: it only intercepts items(). A future refactor that scanned via keys(), values(), or item access wouldn't trigger it. Considering adding __iter__ / keys / values overrides would make the guard more robust, but it's not necessary for the current fix.
  • No new test exercises discard_where, but discard_where delegates to discard, which goes through _remove_queued_item — the new test does exercise that path.

Style / conventions

  • Absolute imports, from __future__ import annotations, modern type syntax, no public-API churn — all consistent with STYLEGUIDE.md.
  • Helper method names (_increment_queue_accounting, _decrement_queue_accounting, _remove_queued_item) are clear and parallel.
  • Long attribute names like _queued_resource_demand_by_group are slightly verbose but match the existing queued_resource_demand_by_group field on QueueView, so the symmetry is worth the length.
  • Comments are correctly absent — the code is self-explanatory.

Performance

  • The fix moves view() from O(N-queued-tasks × avg-resources-per-task) to O(unique-groups-with-tasks). The reported 71× speedup at 8,192 queued tasks is what you'd expect.
  • Per-task overhead at enqueue/discard/commit is now O(resources-per-task) for counter maintenance — the same big-O as before, just shifted forward. Net win because view() is called more often than enqueue per dispatch loop.
  • The commit path now calls _purge_queue_head plus _first_valid_item (which itself purges), so there's a redundant purge in some flows. Each purge is cheap when the head is valid (one comparison and break), so this is negligible.

Verdict

Approve with minor optional follow-ups. This is a clean, well-targeted performance fix. The accounting is maintained symmetrically with the existing _queued/_task_groups mutations, the public QueueView shape is preserved, and the new tests both validate correctness and pin in the performance characteristic via a clever scan-detector. Optional follow-ups, none blocking:

  • Iterate self._queued_by_group instead of self._queues in view() to avoid touching long-dead group entries (queue.py:148).
  • Broaden _FailIfScannedAmounts to also fail on keys(), values(), and __iter__ so future refactors can't sneak past the guard.
  • Consider also pruning empty entries in self._queues when a group's count hits zero, so the view() loop stays tight even after long-running schedulers churn through many groups.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 1, 2026

Greptile Summary

This PR replaces the O(n) full-queue scan in FairTaskQueue.view() with O(1) lookups by maintaining three incremental accounting counters — _queued_by_group, _queued_resource_demand_by_group, and _queued_peer_demand_by_resource — that are kept current on every enqueue, discard, and commit.

  • Incremental accounting: _increment_queue_accounting and _decrement_queue_accounting are introduced to update counters atomically, and a shared _remove_queued_item helper is used consistently across discard, commit, and the new accounting path.
  • view() and _first_valid_item refactored: view() iterates only the pre-built group set to find first-candidate tasks (lazy-purging deque heads), and _first_valid_item now delegates all stale-head removal to _purge_queue_head before returning queue[0] directly.
  • New regression tests: Three tests cover duplicate-enqueue accounting correctness, accounting integrity after discard + commit, and verification that non-candidate task resources are never re-scanned during view().

Confidence Score: 5/5

Safe to merge — the incremental accounting is correctly maintained across all mutation paths, the lazy deque purge does not create counter drift, and the view() refactor is a straightforward substitution.

All three counters are incremented exactly once per enqueue (guarded by the existing idempotency check) and decremented exactly once per removal via the shared _remove_queued_item helper. _purge_queue_head only pops items already absent from _queued, so it never double-decrements accounting. The new tests exercise the key removal and duplicate-enqueue paths explicitly. No correctness issues were found.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py Core scheduler rewrite — incremental accounting counters added and maintained correctly across enqueue/discard/commit; view() and _first_valid_item simplified to use pre-built state; no logic errors found.
packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py Three new tests cover idempotent-enqueue accounting, post-removal accounting, and a locked-items sentinel that enforces non-scanning of non-candidate tasks.

Reviews (3): Last reviewed commit: "Merge branch 'main' into codex/fix-724-s..." | Re-trigger Greptile

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

@johnnygreco johnnygreco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review complete. I did not find any actionable issues.

I checked the fair-queue accounting changes around duplicate enqueue, discard, commit, lazy head purging, and QueueView demand snapshots. The incremental counters look consistent with the previous queue membership contract, and the added tests cover the important stale-accounting risks.

Verification run:

  • ruff check on the changed files
  • ruff format --check on the changed files
  • pytest packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py -q (10 passed)
  • git diff --check origin/main...HEAD

Copy link
Copy Markdown
Contributor

@johnnygreco johnnygreco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved. My review found no actionable issues or required updates.

@eric-tramel eric-tramel merged commit f121bba into main Jun 2, 2026
61 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Async scheduler large ready queues burn CPU in queue observation

2 participants