Refactor: drop slot ring, make DistTaskSlotState storage dynamic #563
Conversation
Removes the fixed DIST_TASK_WINDOW_SIZE slot pool and the per-slot array
DistWorker used to carry. At L3 the slot state lives entirely in the
parent process's heap -- never crossed into child workers -- so the ring
index L2 uses to address shmem descriptors buys us nothing here. Only
the heap needs a pre-sized region for MAP_SHARED fork inheritance.
- DistRing:
- init() drops window_size; takes only (heap_bytes, timeout_ms).
- alloc() returns a monotonic task id; no back-pressure on slot count,
only on heap space.
- Owns the slot state pool as std::deque<std::unique_ptr<SlotState>>.
push_back never invalidates existing pointers, so slot_state(id)
returns a pointer that stays valid for the slot's lifetime without
holding the mutex past the lookup.
- released_ and slot_heap_end_ become std::vector<>, grown via
push_back on alloc, indexed directly by task id.
- advance_last_alive_locked no longer needs to undo the released bit
(entries aren't recycled within a run; reset_to_empty clears them
all at drain).
- New reset_to_empty(): drops all slot state and zeroes counters.
DistOrchestrator::drain() calls it right after active_tasks_ hits 0
so each Worker.run() starts from task id 0 with bounded memory.
- DistOrchestrator::init drops slots/num_slots params. slot_state(id)
delegates to ring.slot_state(id) with a nullptr->throw guard.
- DistScheduler::Config drops slots/num_slots; takes DistRing* and reads
slot state via ring->slot_state(id) at every access site.
- DistWorker drops the std::unique_ptr<SlotState[]> member; slot state
is now entirely in allocator_. DistWorker::init() is a straight
passthrough to allocator_/orchestrator_/scheduler_.
- dist_types.h: remove DIST_TASK_WINDOW_SIZE constant.
Tests:
- test_dist_ring rewritten: drop window_size tests, add
SlotAllocGrowsPastLegacyWindow (2048 allocs past the old 128 cap),
SlotStateIsPointerStable (push_back doesn't invalidate refs),
ResetToEmptyRequiresAllReleased, ResetToEmptyResetsCounters.
- test_dist_orchestrator / test_dist_scheduler fixtures drop the
std::unique_ptr<SlotState[]> member and access via a local S(id)
helper that calls ring.slot_state(id).
Docs:
- orchestrator.md section 5 rewritten to describe the three resources
DistRing now owns (task id, heap, slot state) and the end-of-run
reset contract.
- roadmap.md Dispatch internals bullet updated.
Plan (local, gitignored): PR-I moved to "in review"; Allowed Exception
hw-native-sys#6 kept (explains why L3 doesn't need a shmem slot ring).
No user-visible behaviour change: heap_ring_size still configurable via
Worker ctor, OUTPUT auto-alloc / WaW tag semantics unchanged, back-
pressure timeout still throws std::runtime_error on heap exhaustion.
/gemini review
Code Review
This pull request refactors the task management system to use monotonic task IDs and dynamic slot state storage within DistRing, removing the fixed-size window limitation and allowing memory reclamation via a new reset_to_empty method. Feedback identifies a critical exception safety issue in DistRing::alloc where partial state updates could leave the allocator inconsistent, and suggests documentation improvements for type clarity in std::deque declarations.
```diff
   int32_t task_id = next_task_id_++;
-  DistTaskSlot slot = task_id & window_mask_;
-  released_[static_cast<size_t>(slot)] = 0;
-  slot_heap_end_[static_cast<size_t>(slot)] = heap_end;
-  return DistAllocResult{slot, heap_ptr, heap_end};
+  released_.push_back(0);
+  slot_heap_end_.push_back(heap_end);
+  slot_states_.emplace_back(std::make_unique<DistTaskSlotState>());
+  return DistAllocResult{task_id, heap_ptr, heap_end};
```
The sequence of state updates to released_, slot_heap_end_, and slot_states_ is not exception-safe. If a push_back or emplace_back call throws an exception (e.g., std::bad_alloc), the DistRing will be left in an inconsistent state. To ensure strong exception safety, the state modifications should be transactional, rolling back partial updates if a subsequent update fails. Additionally, ensure consistent type casting when assigning to int32_t to avoid narrowing conversion warnings.
```cpp
int32_t task_id = static_cast<int32_t>(next_task_id_);
released_.push_back(0);
try {
  slot_heap_end_.push_back(heap_end);
  try {
    slot_states_.emplace_back(std::make_unique<DistTaskSlotState>());
  } catch (...) {
    slot_heap_end_.pop_back();
    throw;
  }
} catch (...) {
  released_.pop_back();
  throw;
}
next_task_id_++;
return DistAllocResult{task_id, heap_ptr, heap_end};
```

References
- Always check the return values of resource allocation functions and handle potential failures gracefully, for example, by rolling back partial allocations.
- Be consistent in type casting when assigning size_t values to int32_t variables across similar methods to avoid compiler warnings about narrowing conversions.
```
so it has to be pre-sized. `heap_ring_size` on the Worker ctor
controls the total bytes (default 1 GiB).
3. The **per-task slot state** (`DistTaskSlotState`) — stored in a
`std::deque<std::unique_ptr<...>>`. `std::deque::push_back` never
```
```cpp
 * the heap must be `mmap(MAP_SHARED)` and forked into child workers,
 * which forces a pre-sized region.
 * 3. The per-task scheduling state (`DistTaskSlotState`). Stored in a
 * `std::deque<std::unique_ptr<...>>` so push_back never invalidates
```
For improved clarity in the documentation, it would be better to use the full type name instead of ... in the std::deque declaration.
```diff
- * `std::deque<std::unique_ptr<...>>` so push_back never invalidates
+ * 3. The per-task scheduling state (DistTaskSlotState). Stored in a
+ *    std::deque<std::unique_ptr<DistTaskSlotState>> so push_back never invalidates
```
Code Review
This pull request refactors the distributed task management system by replacing the fixed-size task slot ring with a monotonic task ID system and dynamic state storage in DistRing. This change removes the legacy window size constraint and provides stable pointers to task states. However, a critical race condition was identified in DistOrchestrator::drain(), where reset_to_empty() is called before the scheduler thread has fully finished accessing task state, potentially leading to a use-after-free.
```cpp
// Every slot is CONSUMED (active_tasks_ == 0 ⇒ allocator last_alive_ ==
// next_task_id_). Drop all per-slot state so the next Worker.run()
// starts from task_id = 0 with no accumulated memory.
allocator_->reset_to_empty();
```
There appears to be a critical race condition here. The drain() function, running on the user's thread, calls allocator_->reset_to_empty() as soon as active_tasks_ becomes zero. However, the scheduler thread might still be processing tasks and accessing slot state data.
Specifically, active_tasks_ is decremented in on_consumed, which is called from the scheduler thread. The scheduler thread might continue executing code within on_task_complete (e.g., accessing consumers of a completed task) after active_tasks_ has reached zero and unblocked drain().
When reset_to_empty() is called, it deallocates all DistTaskSlotState objects. If the scheduler thread is concurrently trying to access one of these objects via a pointer obtained from ring->slot_state(), it will result in a use-after-free.
To fix this, you need to ensure the scheduler is quiescent (i.e., not in the middle of processing any task logic) before reset_to_empty() is called. This might involve more sophisticated synchronization between the orchestrator's drain() and the scheduler thread.
References
- If a component is accessed by multiple threads (e.g., Orchestrator and Scheduler), it requires a mutex to protect its operations to prevent data races.
- A producer should only be skipped or resources released if it is in the CONSUMED state, not just COMPLETED, to prevent use-after-free issues.
Summary
Internal cleanup PR, follow-up to #560. Replaces the fixed-size slot pool (`DIST_TASK_WINDOW_SIZE = 128` + `DistTaskSlotState[]` inside `DistWorker`) with dynamic storage owned by `DistRing`. Slot state lives entirely in the parent process's heap at L3 — Orchestrator and Scheduler read it directly; child workers only receive payloads through a mailbox, never the slot state — so the shmem-backed ring index L2 uses is unnecessary at this level.

Only the heap still needs a pre-sized region (`MAP_SHARED | MAP_ANONYMOUS`, inherited across fork). Heap back-pressure behaviour is unchanged.

No user-visible change: `heap_ring_size=…` still works, `OUTPUT` auto-alloc / `INOUT` WaW semantics unchanged, back-pressure timeout still throws `std::runtime_error`.

What changed

- `DistRing` owns three correlated per-task resources now:
  - the `int32_t` task id (no window, no modulo wrap),
  - the pre-sized `MAP_SHARED` heap,
  - the slot state, as `std::deque<std::unique_ptr<DistTaskSlotState>>`. `std::deque::push_back` never invalidates existing pointers, so `ring.slot_state(id)` hands out a pointer that stays valid for the slot's lifetime without keeping the mutex held past the lookup.
- `init(heap_bytes, timeout_ms)` drops the `window_size` parameter; the slot ring is gone entirely.
- `reset_to_empty()`: new method called by `DistOrchestrator::drain()` right after `active_tasks_` hits 0. Drops all slot states and zeroes task-id / heap cursors so each `Worker.run()` starts from `task_id = 0` with bounded memory (per-run peak, not cumulative).
- `DistOrchestrator::init` drops `slots`/`num_slots`. `slot_state(id)` delegates to `ring.slot_state(id)` with a nullptr check.
- `DistScheduler::Config` drops `slots`/`num_slots`, takes `DistRing *ring` instead. Every `cfg_.slots[id]` access becomes `*cfg_.ring->slot_state(id)`.
- `DistWorker` drops the `std::unique_ptr<DistTaskSlotState[]> slots_` member; slot state lives inside the allocator now. `DistWorker::init()` is a straight pass-through.
- `DIST_TASK_WINDOW_SIZE` constant removed from `dist_types.h`.

Tests

- `test_dist_ring` rewritten:
  - `SlotAllocGrowsPastLegacyWindow` — 2048 allocs past the old 128 cap, verifies no "window full" error
  - `SlotStateIsPointerStable` — grabs a ptr to slot 0, allocs 1000 more slots, verifies ptr identity
  - `ResetToEmptyRequiresAllReleased` + `ResetToEmptyResetsCounters`
  - `BackPressureThenReleaseUnblocks` / `TimeoutThrowsRuntimeError` / `ShutdownUnblocksAlloc` now exercise heap back-pressure directly (the only source of back-pressure remaining)
- `test_dist_orchestrator` / `test_dist_scheduler` fixtures drop the `std::unique_ptr<DistTaskSlotState[]> slots` member and access slot state via a short `S(id)` fixture helper that wraps `ring.slot_state(id)`.

Test plan

- Full unit-test suite passes (`cmake --build /tmp/dist_ut_build && ctest`).
- `test_alloc_dep_wires_via_tensormap` and friends unchanged — same `INOUT`-driven WaW path.

Plan reference
Follow-up of #560 (PR-H). Tracked as PR-I in the hierarchical-runtime plan under "Follow-up PR chain" / Allowed Exception #6 in the L2 Consistency Audit.