fix: expose row-group admission controls #745
Review of PR #745: Expose row-group admission controls
Summary
This PR promotes the previously-internal row-group admission controls of
The core API is small, well validated, and correctly preserves defaults:
Findings
Behavior change worth a release-note callout (medium)
In practice this is unlikely to bite typical users (
Greptile Summary
This PR exposes
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Threads row-group admission config through constructor; _pending_row_group_admission_size set/cleared in try/finally cleanly prevents stale adaptive-target updates; in_flight_count==0 guard on checkpoint is correct; _dispatched cleanup for checkpointed groups fixes the old skip-flag leak; DatasetGenerationError now propagated on finalization failure. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py | Adds release_row_group with memory-bounded range-summary data structures; _batch_complete changed from set to dict (column→size) for correct size-aware completion checks; _remaining_cell_rows lazy-init cache is correctly updated on mark_cell_complete and drop_row with idempotency guards; range-merge algorithm and _trim_released_summary_ranges are correct. |
| packages/data-designer-config/src/data_designer/config/run_config.py | Adds RowGroupAdmissionMode, RowGroupAdmissionConfig, and related constants; adds RunConfig.validate_row_group_admission_budget cross-field validator; all validation logic is correct and well-tested. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Correctly translates RunConfig.row_group_admission into AsyncTaskScheduler kwargs; defensive ValueError for un-normalized adaptive_initial_target is safe since the Pydantic validator always normalizes it. |
| packages/data-designer-engine/src/data_designer/engine/capacity.py | Adds max_admitted_rows_source field to RowGroupAdmission dataclass for telemetry; minimal change, correct. |
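
The range-summary idea mentioned in the `completion.py` row can be illustrated with a small sketch. This is not the PR's implementation: `ReleasedRanges` and its `add` method are hypothetical names, and the sketch only shows how released row indices might be stored as merged half-open intervals, so that contiguous releases collapse into a single entry instead of one entry per row.

```python
from bisect import bisect_left


class ReleasedRanges:
    """Hypothetical summary of released row indices as merged half-open ranges."""

    def __init__(self) -> None:
        # Invariant: sorted, pairwise disjoint, and never touching each other.
        self._ranges: list[tuple[int, int]] = []

    def add(self, start: int, stop: int) -> None:
        """Record rows [start, stop) as released, merging with any neighbors."""
        if start >= stop:
            return
        # Index of the first stored range whose stop is >= start; everything
        # before it ends strictly before the new range begins.
        stops = [r[1] for r in self._ranges]
        lo = bisect_left(stops, start)
        hi = lo
        # Absorb every stored range that overlaps or touches [start, stop).
        while hi < len(self._ranges) and self._ranges[hi][0] <= stop:
            start = min(start, self._ranges[hi][0])
            stop = max(stop, self._ranges[hi][1])
            hi += 1
        self._ranges[lo:hi] = [(start, stop)]

    def __contains__(self, row: int) -> bool:
        return any(a <= row < b for a, b in self._ranges)

    def __len__(self) -> int:
        return len(self._ranges)
```

Under this scheme, releasing rows 0-3, then 8-11, then 4-7 leaves a single stored range covering rows 0-11. Any additional cap on the number of stored ranges, such as the trimming the PR describes, would sit on top of this and is not modeled here.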
Sequence Diagram
sequenceDiagram
participant User
participant RunConfig
participant DatasetBuilder
participant AsyncTaskScheduler
participant CompletionTracker
User->>RunConfig: "RunConfig(row_group_admission=RowGroupAdmissionConfig(...))"
RunConfig->>RunConfig: validate_adaptive_settings() / validate_row_group_admission_budget()
User->>DatasetBuilder: build() / run_async()
DatasetBuilder->>DatasetBuilder: _prepare_async_run() translate RowGroupAdmissionConfig to kwargs
DatasetBuilder->>AsyncTaskScheduler: __init__(max_concurrent_row_groups, adaptive_row_group_admission, max_admitted_rows, row_group_admission_source)
AsyncTaskScheduler->>AsyncTaskScheduler: _validate_row_group_row_budget()
AsyncTaskScheduler->>AsyncTaskScheduler: run() launch _admit_row_groups task
loop For each row group
AsyncTaskScheduler->>AsyncTaskScheduler: "_pending = rg_size"
AsyncTaskScheduler->>AsyncTaskScheduler: _wait_for_row_group_admission_capacity(rg_size)
AsyncTaskScheduler->>AsyncTaskScheduler: "_pending = None (finally)"
AsyncTaskScheduler->>AsyncTaskScheduler: acquire rg_semaphore, dispatch seeds
end
loop Main dispatch loop
AsyncTaskScheduler->>AsyncTaskScheduler: _maybe_update_adaptive_row_group_target() early-exit if _pending is None
AsyncTaskScheduler->>AsyncTaskScheduler: "_checkpoint_completed_row_groups() guard in_flight_count==0"
AsyncTaskScheduler->>CompletionTracker: release_row_group(rg_id, rg_size, all_columns)
CompletionTracker->>CompletionTracker: compress to _ReleasedRows range summary
AsyncTaskScheduler->>AsyncTaskScheduler: rg_semaphore.release() + row_group_admission_event.set()
end
AsyncTaskScheduler->>User: capacity_plan() with max_admitted_rows_source
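
The admission loop in the diagram can be approximated with a short asyncio sketch. Everything here is an assumption made for illustration: `MiniScheduler`, its attributes, and its methods are hypothetical stand-ins, not the engine's `AsyncTaskScheduler`. The sketch shows three ideas from the diagram: a semaphore bounding concurrent row groups, a row budget that blocks admission until earlier groups finish, and a `try`/`finally` that always clears the pending size.

```python
import asyncio


class MiniScheduler:
    """Hypothetical, heavily simplified stand-in for the scheduler in the diagram."""

    def __init__(self, row_groups, max_concurrent, max_admitted_rows):
        # Reject row groups that could never fit in the row budget; otherwise
        # the admission loop would wait forever.
        for rg_id, rg_size in row_groups:
            if rg_size > max_admitted_rows:
                raise ValueError(f"row group {rg_id} exceeds the row budget")
        self.row_groups = row_groups  # list of (rg_id, rg_size)
        self.max_admitted_rows = max_admitted_rows
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.capacity_event = asyncio.Event()
        self.admitted_rows = 0
        self.peak_admitted_rows = 0
        self.pending_size = None  # size of the group currently awaiting admission
        self.completed = []

    async def _wait_for_capacity(self, rg_size):
        # Block until the row budget can absorb the pending row group.
        while self.admitted_rows + rg_size > self.max_admitted_rows:
            self.capacity_event.clear()
            await self.capacity_event.wait()

    async def _work(self, rg_id, rg_size):
        await asyncio.sleep(0)  # placeholder for real generation work
        self.completed.append(rg_id)
        self.admitted_rows -= rg_size
        self.semaphore.release()
        self.capacity_event.set()  # wake the admission loop

    async def run(self):
        tasks = []
        try:
            for rg_id, rg_size in self.row_groups:
                self.pending_size = rg_size
                await self._wait_for_capacity(rg_size)
                await self.semaphore.acquire()
                self.admitted_rows += rg_size
                self.peak_admitted_rows = max(self.peak_admitted_rows, self.admitted_rows)
                tasks.append(asyncio.create_task(self._work(rg_id, rg_size)))
        finally:
            self.pending_size = None  # never leave a pending size behind
        await asyncio.gather(*tasks)


async def demo():
    scheduler = MiniScheduler(
        [(0, 4), (1, 4), (2, 4), (3, 2)], max_concurrent=2, max_admitted_rows=8
    )
    await scheduler.run()
    return scheduler
```

Running `asyncio.run(demo())` completes all four row groups while the number of admitted rows never exceeds the budget of 8. The scheduler is constructed inside the coroutine so that the semaphore and event are created under the running event loop.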
Reviews (2): Last reviewed commit: "Expose row-group admission controls"
         diagnostics = self._row_group_admission_diagnostics(reason=reason)
         if extra:
             diagnostics |= extra
         self._emit_scheduler_event(event_kind, diagnostics=diagnostics)
         self._emit_scheduler_health_snapshot(event_kind)
 
     async def _admit_row_groups(self) -> None:
         """Admit row groups as semaphore slots become available."""
         all_admitted = True
-        for rg_id, rg_size in self._row_groups:
-            await self._wait_for_row_group_admission_capacity(rg_size)
-            if self._early_shutdown or self._fatal_worker_error is not None:
-                all_admitted = False
-                break
-            await self._rg_semaphore.acquire()
-            if self._early_shutdown or self._fatal_worker_error is not None:
-                self._rg_semaphore.release()
-                all_admitted = False
-                break
-            if not self._row_group_row_guard_allows(rg_size):
-                self._rg_semaphore.release()
+        try:
+            for rg_id, rg_size in self._row_groups:
+                self._pending_row_group_admission_size = rg_size
+                await self._wait_for_row_group_admission_capacity(rg_size)
+                if self._early_shutdown or self._fatal_worker_error is not None:
+                    all_admitted = False
+                    break
+                await self._rg_semaphore.acquire()
+                if self._early_shutdown or self._fatal_worker_error is not None:
+                    self._rg_semaphore.release()
+                    all_admitted = False
+                    break
-            self._rg_states[rg_id] = _RowGroupState(size=rg_size)
+                self._rg_states[rg_id] = _RowGroupState(size=rg_size)
 
-            if self._buffer_manager is not None:
-                self._buffer_manager.init_row_group(rg_id, rg_size)
+                if self._buffer_manager is not None:
+                    self._buffer_manager.init_row_group(rg_id, rg_size)
 
-            await self._dispatch_seeds(rg_id, rg_size)
-            self._emit_scheduler_event(
-                "row_group_admitted",
-                diagnostics=self._row_group_admission_diagnostics(reason="admitted")
-                | {"row_group": rg_id, "row_group_size": rg_size},
-            )
-            self._emit_scheduler_health_snapshot("row_group_admitted")
-            self._wake_event.set()
+                await self._dispatch_seeds(rg_id, rg_size)
+                self._emit_row_group_admission_event(
+                    "row_group_admitted",
+                    reason="admitted",
+                    extra={"row_group": rg_id, "row_group_size": rg_size},
+                )
+                self._wake_event.set()
+        finally:
+            self._pending_row_group_admission_size = None
         self._all_rgs_admitted = all_admitted
         self._wake_event.set()
Stale pending size in the post-admit window for adaptive mode
Between `self._rg_states[rg_id] = _RowGroupState(size=rg_size)` and the start of the next loop iteration (which sets `_pending_row_group_admission_size = next_rg_size`), `_pending_row_group_admission_size` still holds the size of the *just-admitted* row group. During `await self._dispatch_seeds(...)`, worker coroutines from earlier row groups can call `_maybe_update_adaptive_row_group_target`, which forwards this value to `_adaptive_row_group_block_reason`. The row-guard check then evaluates `admitted_rows + just_admitted_size`, double-counting the admitted group's rows, and may return `"max_admitted_rows"` even when the actual next pending row group would fit.
The old `_next_unadmitted_row_group_size()` walk always returned the true next-unadmitted size. The new approach avoids the O(n) scan but introduces this brief stale window. For uniform `buffer_size` workloads (the common case) the effect is invisible, but for heterogeneous row-group sizes with a tight `max_admitted_rows` budget, the adaptive target can be held back for one update cycle longer than necessary.
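
The double-counting described in this comment can be reproduced with plain arithmetic. The sketch below is illustrative only: `block_reason` is a hypothetical stand-in for the row-guard check, and the sizes (a 100-row budget, a 60-row group followed by a 30-row group) are made-up numbers. Clearing the pending size right after admission is shown as one possible mitigation, not necessarily the fix adopted in the PR.

```python
def block_reason(admitted_rows, pending_size, max_admitted_rows):
    """Hypothetical row-guard check: would admitting `pending_size` rows break the budget?"""
    if pending_size is None:
        return None  # nothing is waiting, so nothing is blocked
    if admitted_rows + pending_size > max_admitted_rows:
        return "max_admitted_rows"
    return None


max_admitted_rows = 100
admitted_rows = 0

# Admit a 60-row group. The pending marker still holds 60 afterwards,
# which is the stale window described in the comment.
pending_size = 60
admitted_rows += pending_size

# Stale view: 60 admitted + 60 "pending" = 120 > 100, so the guard reports a block.
stale = block_reason(admitted_rows, pending_size, max_admitted_rows)

# True view: the next row group has 30 rows, and 60 + 30 = 90 fits the budget.
fresh = block_reason(admitted_rows, 30, max_admitted_rows)

# Possible mitigation: clear the marker as soon as the group is admitted.
cleared = block_reason(admitted_rows, None, max_admitted_rows)
```

Here `stale` is `"max_admitted_rows"` while `fresh` and `cleared` are both `None`, which matches the comment's point that the block is spurious when row-group sizes differ and the budget is tight.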
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 1001-1037
Fixes NVIDIA-NeMo#741
Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
1872d25 to abe044f
📋 Summary
Adds public `RunConfig.row_group_admission` controls so async runs can tune fixed/adaptive row-group admission and active-row guardrails without reaching into engine internals. The scheduler now reports the configured/adaptive row-group state in capacity plans, rejects unsafe oversized row groups when a row budget is active, and releases checkpointed completion state without retaining unbounded row/drop summaries.

🔗 Related Issue
Fixes #741
🔄 Changes
- `RowGroupAdmissionConfig`/`RowGroupAdmissionMode`, public caps, validation, and lazy config exports.
- `RunConfig` into the async scheduler.
- 3) as row-group-count-only, while widened fixed and adaptive modes derive a capped active-row guard when one is not explicit.

🧪 Testing
- `make test` passes

Additional validation:
- `make check-all-fix`
- `NODE_OPTIONS=--experimental-global-webcrypto make check-fern-docs DOCS_PYTHON_VERSION=3.12` (fern check: 0 errors, 1 warning)
- 634 passed; engine 2273 passed; interface 955 passed, 1 skipped
- 50 passed
- `peak_mb=0.386`), alternating released summaries stay capped at `4096`, reopened released ranges stay capped, 1M-row completion polling remains cache-backed, and 64x16k fixed/adaptive mock scheduler runs retain bounded summaries.

✅ Checklist