From a9195c37c8c500562fc1117899bf71e30af138ce Mon Sep 17 00:00:00 2001 From: poursoul Date: Thu, 12 Mar 2026 11:10:49 +0800 Subject: [PATCH] Refactor: consolidate per-task scheduling state into cache-aligned PTO2TaskSlotState - Extract fanin/fanout fields from PTO2TaskDescriptor (shared memory) and separate scheduler arrays into a unified PTO2TaskSlotState struct (64-byte aligned, scheduler-private) for better cache locality on hot paths - Slim down PTO2TaskDescriptor to contain only static identification and buffer pointers; dynamic scheduling state now lives in slot_states - Cache func_id_to_addr in AicpuExecutor to remove Runtime* dependency from build_pto2_payload; use positional arg indexing instead of compacted - Move tensor update_start_offset() to orchestrator submission time - Add multi-threaded orchestration support in alternating_orch test (round-robin group distribution across orch threads) - Increase local dispatch and deferred release buffer capacities to 256 - Fix CYCLE_COUNT_LAP placement to measure correct profiling phases --- .../aicpu/aicpu_executor.cpp | 35 ++++-- .../runtime/pto_orchestrator.cpp | 51 ++++---- .../runtime/pto_runtime2_types.h | 71 +++++++---- .../runtime/pto_scheduler.cpp | 67 ++++------- .../runtime/pto_scheduler.h | 110 ++++++++---------- .../runtime/runtime.cpp | 18 ++- .../runtime/runtime.h | 2 + .../orchestration/alternating_orch.cpp | 7 +- 8 files changed, 186 insertions(+), 175 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index cd7f5e29b..53becd28b 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -219,6 +219,12 @@ struct AicpuExecutor { uint64_t dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Per-core AICPU dispatch timestamp uint32_t core_dispatch_counts_[RUNTIME_MAX_WORKER]; // Per-core total dispatched 
task counter (for buffer management) + uint64_t* func_id_to_addr_; + uint64_t get_function_bin_addr(int func_id) const { + if (func_id < 0 || func_id >= RUNTIME_MAX_FUNC_ID) return 0; + return func_id_to_addr_[func_id]; + } + // ===== Methods ===== int32_t init(Runtime* runtime); int32_t handshake_all_cores(Runtime* runtime); @@ -235,10 +241,9 @@ struct AicpuExecutor { // Build slim PTO2DispatchPayload: only function_bin_addr + args. // Metadata (mixed_task_id, subslot, kernel_id, core_type) stays in TaskDescriptor. void build_pto2_payload(PTO2DispatchPayload* out, - Runtime* runtime, int32_t kernel_id, PTO2TaskPayload* task_pl) { - out->function_bin_addr = runtime->get_function_bin_addr(kernel_id); + out->function_bin_addr = get_function_bin_addr(kernel_id); int32_t n = 0; for (int32_t i = 0; i < task_pl->param_count; i++) { if (!task_pl->is_tensor[i]) { @@ -480,7 +485,7 @@ struct AicpuExecutor { ) { PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; int32_t slot_idx = static_cast(subslot); - build_pto2_payload(payload, runtime, task->kernel_id[slot_idx], task_pl); + build_pto2_payload(payload, task->kernel_id[slot_idx], task_pl); s_executing_subslot[core_id] = subslot; #if PTO2_PROFILING if (profiling_enabled) { @@ -765,6 +770,8 @@ int32_t AicpuExecutor::init(Runtime* runtime) { return -1; } + func_id_to_addr_ = runtime->func_id_to_addr_; + // Read execution parameters from runtime thread_num_ = runtime->sche_cpu_num; orch_thread_num_ = runtime->orch_thread_num; @@ -947,13 +954,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Local-first dispatch buffers (stack-allocated, one per CoreType per scheduling thread). // Initialized once; must be empty at the start of each iteration. 
- constexpr int LOCAL_READY_CAP_PER_TYPE = 64; + constexpr int LOCAL_READY_CAP_PER_TYPE = 256; int32_t local_aic_ids[LOCAL_READY_CAP_PER_TYPE]; int32_t local_aiv_ids[LOCAL_READY_CAP_PER_TYPE]; PTO2LocalReadyBuffer local_bufs[PTO2_LOCAL_DISPATCH_TYPE_NUM]; // [0]=AIC, [1]=AIV local_bufs[0].reset(local_aic_ids, LOCAL_READY_CAP_PER_TYPE); local_bufs[1].reset(local_aiv_ids, LOCAL_READY_CAP_PER_TYPE); - int32_t deferred_release_ids[128]; + int32_t deferred_release_ids[256]; int32_t deferred_release_count = 0; bool cores_released = false; @@ -1046,6 +1053,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (completed_this_turn > 0) { +#if PTO2_SCHED_PROFILING + rt->scheduler.tasks_completed.fetch_add(completed_this_turn, std::memory_order_relaxed); +#endif int32_t prev = completed_tasks_.fetch_add(completed_this_turn, std::memory_order_relaxed); int32_t new_total = prev + completed_this_turn; last_progress_count = new_total; @@ -1063,13 +1073,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (!try_completed) { CYCLE_COUNT_LAP(sched_idle_cycle); } else { + CYCLE_COUNT_LAP(sched_complete_cycle); if (profiling_enabled && phase_complete_count > 0) { perf_aicpu_record_phase( thread_idx, AicpuPhaseId::SCHED_COMPLETE, _t0_phase, _t1, sched_loop_count, phase_complete_count); _t0_phase = _t1; phase_complete_count = 0; } - CYCLE_COUNT_LAP(sched_complete_cycle); } #endif @@ -1226,13 +1236,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (!try_pushed) { CYCLE_COUNT_LAP(sched_idle_cycle); } else { + CYCLE_COUNT_LAP(sched_dispatch_cycle); if (profiling_enabled && phase_dispatch_count > 0) { perf_aicpu_record_phase( thread_idx, AicpuPhaseId::SCHED_DISPATCH, _t0_phase, _t1, sched_loop_count, phase_dispatch_count); _t0_phase = _t1; phase_dispatch_count = 0; } - CYCLE_COUNT_LAP(sched_dispatch_cycle); #endif } @@ -1264,9 +1274,9 @@ int32_t 
AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa int32_t cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0; for (int32_t si = 0; si < task_count; si++) { int32_t slot = si & window_mask; - PTO2TaskState st = sched->task_state[slot].load(std::memory_order_relaxed); - int32_t rc = sched->fanin_refcount[slot].load(std::memory_order_relaxed); - int32_t fi = task_descriptors[slot].fanin_count; + PTO2TaskState st = sched->slot_states[slot].task_state.load(std::memory_order_relaxed); + int32_t rc = sched->slot_states[slot].fanin_refcount.load(std::memory_order_relaxed); + int32_t fi = sched->slot_states[slot].fanin_count; int32_t kid = task_descriptors[slot].kernel_id[0]; if (st >= PTO2_TASK_COMPLETED) continue; // Already done if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; } @@ -1340,12 +1350,12 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa SPIN_WAIT_HINT(); } #if PTO2_PROFILING + CYCLE_COUNT_LAP(sched_idle_cycle); if (profiling_enabled) { perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_IDLE_WAIT, _t0_phase, _t1, sched_loop_count, 0); _t0_phase = _t1; } - CYCLE_COUNT_LAP(sched_idle_cycle); #endif } } @@ -1634,6 +1644,9 @@ int32_t AicpuExecutor::run(Runtime* runtime) { return -1; } + // Wire up slot_states pointer for profiling (complete_perf_records) + runtime->set_pto2_slot_states_ptr(rt->scheduler.slot_states); + // Store shared state for other orchestrator threads orch_func_ = orch_func; orch_args_cached_ = args; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 597ef47d4..b8cafd900 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -277,14 +277,22 @@ void pto2_submit_mixed_task( task.kernel_id[static_cast(PTO2SubtaskSlot::AIV1)] = 
normalized.aiv1_kernel_id; task.active_mask = active_mask; task.subtask_done_mask.store(0, std::memory_order_relaxed); - task.fanin_count = 0; - task.fanout_head = nullptr; - task.fanout_lock.store(0, std::memory_order_relaxed); - // Initial fanout_count = 1 (the owning scope holds one reference) - task.fanout_count = 1; task.packed_buffer_base = NULL; task.packed_buffer_end = NULL; + // Initialize slot state (scheduler-private) + PTO2SchedulerState* sched = orch->scheduler; + if (sched) { + PTO2TaskSlotState& slot_state = sched->slot_states[slot]; + slot_state.fanin_count = 0; + slot_state.fanout_head = nullptr; + slot_state.fanout_lock.store(0, std::memory_order_relaxed); + // Initial fanout_count = 1 (the owning scope holds one reference) + slot_state.fanout_count = 1; + slot_state.fanout_refcount.store(0, std::memory_order_release); + slot_state.fanin_refcount.store(0, std::memory_order_release); + } + // Register this task in its owning scope scope_tasks_push(orch, task_id); @@ -408,13 +416,12 @@ void pto2_submit_mixed_task( // === STEP 5: Finalize fanin list === // First build the fanin list - if (orch->scheduler) { - PTO2SchedulerState* sched = orch->scheduler; - + if (sched) { + PTO2TaskSlotState& cur_slot_state = sched->slot_states[slot]; // Initialize scheduler state BEFORE adding to producer fanout lists, // so concurrent on_mixed_task_complete can safely access task_state/fanout_refcount. 
- sched->task_state[slot].store(PTO2_TASK_PENDING, std::memory_order_relaxed); - sched->fanout_refcount[slot].store(0, std::memory_order_relaxed); + cur_slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); + cur_slot_state.fanout_refcount.store(0, std::memory_order_relaxed); auto& dep_pool = orch->dep_pool; if (orch->dep_pool_cur_entry == nullptr) { @@ -422,7 +429,7 @@ void pto2_submit_mixed_task( } int32_t early_finished = 0; - task.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early payload->fanin_actual_count = fanin_count; for (int i = 0; i < fanin_count; i++) { payload->fanin_tasks[i] = fanin_temp[i]; @@ -431,33 +438,33 @@ void pto2_submit_mixed_task( int32_t producer_task_id = fanin_temp[i]; // Add this task to producer's fanout list (with spinlock) int32_t prod_slot = task_ring.get_task_slot(producer_task_id); - PTO2TaskDescriptor& producer = task_ring.get_task_by_slot(prod_slot); + PTO2TaskSlotState& producer_slot_state = sched->slot_states[prod_slot]; + orch->dep_pool_cur_entry->task_id = task_id; + orch->dep_pool_cur_entry->next = producer_slot_state.fanout_head; #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - pto2_fanout_lock(producer, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); + pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); #else - pto2_fanout_lock(producer); + pto2_fanout_lock(producer_slot_state); #endif // Normal path: prepend consumer to producer's fanout list - producer.fanout_count += 1; - int32_t prod_state = sched->task_state[prod_slot].load(std::memory_order_acquire); + producer_slot_state.fanout_count += 1; + int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire); if (prod_state >= PTO2_TASK_COMPLETED) { // Early return optimization: if producer already completed, we can skip adding dependency and directly // decrement 
fanin_count early_finished++; } else { - orch->dep_pool_cur_entry->task_id = task_id; - orch->dep_pool_cur_entry->next = producer.fanout_head; - producer.fanout_head = orch->dep_pool_cur_entry; + producer_slot_state.fanout_head = orch->dep_pool_cur_entry; } - pto2_fanout_unlock(producer); - if (producer.fanout_head == orch->dep_pool_cur_entry) { + pto2_fanout_unlock(producer_slot_state); + if (producer_slot_state.fanout_head == orch->dep_pool_cur_entry) { orch->dep_pool_cur_entry = &dep_pool.alloc(); } } // Combined release: merge early_finished batch + init_task's +1 release // into a single atomic fetch_add (saves one acq_rel cache-line bounce per task). int32_t initial_refcount = early_finished + 1; // +1 for the init release - int32_t new_rc = sched->fanin_refcount[slot].fetch_add(initial_refcount, std::memory_order_acq_rel) + int32_t new_rc = cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + initial_refcount; if (new_rc >= fanin_count + 1) { PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index 41a5db45f..afbea42a1 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -272,16 +272,13 @@ struct PTO2DepListEntry { // ============================================================================= /** - * Task descriptor structure + * Task descriptor structure (shared memory) * * Stored in the TaskDescriptor ring buffer in shared memory. - * Contains both static info (set at submission) and dynamic state. + * Contains static identification and buffer pointers only. + * Dynamic scheduling state (fanin/fanout/task_state) is in PTO2TaskSlotState. 
* - * Concurrency notes: - * - fanout_head, fanout_count protected by fanout_lock (per-task spinlock) - * - fanin_count set once at submission, read-only after (hot path for ready check) - * - fanin_tasks stored in TaskPayload (cold path for release) - * - Other fields set by Orchestrator, read by Scheduler + * Fields set by Orchestrator at submission, read by Scheduler for dispatch. */ struct PTO2TaskDescriptor { // Mixed-task identification @@ -296,21 +293,45 @@ struct PTO2TaskDescriptor { // Completion aggregation: each subtask sets its done bit atomically std::atomic subtask_done_mask; - // Dependency lists (linked list heads - offsets into DepListPool) - // Fanin: producers this task depends on (set once at submission) - int32_t fanin_count; // Number of producer dependencies - - // Fanout: consumers that depend on this task (grows as consumers submit) - // PROTECTED BY fanout_lock - std::atomic fanout_lock; // Per-task spinlock (0=unlocked, 1=locked) - PTO2DepListEntry* fanout_head; // Pointer to first fanout entry (nullptr = empty), PROTECTED BY fanout_lock - int32_t fanout_count; // 1 (owning scope) + number of consumers - // Packed output buffer (all outputs packed into single contiguous buffer) void* packed_buffer_base; // Start of packed buffer in GM Heap void* packed_buffer_end; // End of packed buffer (for heap reclamation) }; +// ============================================================================= +// Per-Slot Scheduling State +// ============================================================================= + +/** + * Per-task slot scheduling state (scheduler-private, NOT in shared memory) + * + * Consolidates all hot-path scheduling fields into a single cache-friendly + * structure (32 bytes of fields; alignas(64) pads each slot to one full 64-byte cache line). + * Accessing any field of a task's slot state brings all related fields into the same cache line. 
+ * + * Concurrency notes: + * - fanout_head, fanout_count protected by fanout_lock (per-task spinlock) + * - fanin_count set once at submission, read-only after (hot path for ready check) + * - task_state, fanin_refcount, fanout_refcount updated atomically + */ +struct alignas(64) PTO2TaskSlotState { + // Fanout lock + list (accessed together under lock in on_task_complete) + std::atomic fanout_lock; // Per-task spinlock (0=unlocked, 1=locked) + int32_t fanout_count; // 1 (owning scope) + number of consumers + + PTO2DepListEntry* fanout_head; // Pointer to first fanout entry (nullptr = empty) + + // Task state (completion, consumed check, ready check) + std::atomic task_state; // PENDING/READY/RUNNING/COMPLETED/CONSUMED + + // Fanin (accessed together in release_fanin_and_check_ready) + std::atomic fanin_refcount; // Dynamic: counts completed producers + int32_t fanin_count; // Number of producer dependencies (set once) + + // Fanout refcount (accessed with fanout_count in check_and_handle_consumed) + std::atomic fanout_refcount; // Dynamic: counts released references +}; + /** * Task payload data (cold path - only accessed during orchestration and dispatch) * @@ -391,20 +412,20 @@ typedef void (*PTO2InCoreFunc)(void** args, int32_t num_args); #endif #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING -static inline void pto2_fanout_lock(PTO2TaskDescriptor& task, +static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state, uint64_t& atomic_count, uint64_t& wait_cycle) { uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; uint32_t atomic_ops = 0; for (;;) { - while (task.fanout_lock.load(std::memory_order_acquire) != 0) { + while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) { contended = true; atomic_ops++; // each load = 1 atomic SPIN_WAIT_HINT(); } int32_t expected = 0; - if (task.fanout_lock.compare_exchange_weak(expected, 1, + if (slot_state.fanout_lock.compare_exchange_weak(expected, 1, std::memory_order_acquire, 
std::memory_order_relaxed)) { atomic_ops++; // successful CAS = 1 atomic atomic_count += atomic_ops; @@ -419,21 +440,21 @@ static inline void pto2_fanout_lock(PTO2TaskDescriptor& task, } #endif -static inline void pto2_fanout_lock(PTO2TaskDescriptor& task) { +static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state) { for (;;) { - while (task.fanout_lock.load(std::memory_order_acquire) != 0) { + while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) { SPIN_WAIT_HINT(); } int32_t expected = 0; - if (task.fanout_lock.compare_exchange_weak(expected, 1, + if (slot_state.fanout_lock.compare_exchange_weak(expected, 1, std::memory_order_acquire, std::memory_order_relaxed)) { return; } } } -static inline void pto2_fanout_unlock(PTO2TaskDescriptor& task) { - task.fanout_lock.store(0, std::memory_order_release); +static inline void pto2_fanout_unlock(PTO2TaskSlotState& slot_state) { + slot_state.fanout_lock.store(0, std::memory_order_release); } #endif // PTO_RUNTIME2_TYPES_H diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 7cf8ab7f8..8b6081eb2 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -106,11 +106,10 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, PTO2SharedMemoryHandle* sm_handle, void* heap_base) { sched->sm_handle = sm_handle; + sched->task_descriptors = sm_handle->task_descriptors; sched->heap_base = heap_base; - sched->task_state = nullptr; - sched->fanin_refcount = nullptr; - sched->fanout_refcount = nullptr; -#if PTO2_PROFILING + sched->slot_states = nullptr; +#if PTO2_SCHED_PROFILING sched->tasks_completed.store(0, std::memory_order_relaxed); sched->tasks_consumed.store(0, std::memory_order_relaxed); #endif @@ -126,36 +125,24 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, sched->last_heap_consumed = 0; 
sched->heap_tail = 0; - // Allocate per-task state arrays (dynamically sized based on runtime window_size) - sched->task_state = new (std::nothrow) std::atomic[window_size]; - if (!sched->task_state) { - return false; - } - - sched->fanin_refcount = new (std::nothrow) std::atomic[window_size]; - if (!sched->fanin_refcount) { - delete[] sched->task_state; - sched->task_state = nullptr; - return false; - } - - sched->fanout_refcount = new (std::nothrow) std::atomic[window_size]; - if (!sched->fanout_refcount) { - delete[] sched->fanin_refcount; - delete[] sched->task_state; - sched->fanin_refcount = nullptr; - sched->task_state = nullptr; + // Allocate per-task slot state array (dynamically sized based on runtime window_size) + sched->slot_states = new (std::nothrow) PTO2TaskSlotState[window_size]; + if (!sched->slot_states) { return false; } - // Zero-initialize all per-task state arrays. + // Zero-initialize all per-task slot state fields. // new[] default-initializes std::atomic which leaves values indeterminate. // Scheduler logic (e.g. fanin_refcount fetch_add in release_fanin_and_check_ready) // assumes slots start at zero before init_task writes them. 
for (uint64_t i = 0; i < window_size; i++) { - sched->task_state[i].store(static_cast(0), std::memory_order_relaxed); - sched->fanin_refcount[i].store(0, std::memory_order_relaxed); - sched->fanout_refcount[i].store(0, std::memory_order_relaxed); + sched->slot_states[i].fanout_lock.store(0, std::memory_order_relaxed); + sched->slot_states[i].fanout_count = 0; + sched->slot_states[i].fanout_head = nullptr; + sched->slot_states[i].task_state.store(static_cast(0), std::memory_order_relaxed); + sched->slot_states[i].fanin_refcount.store(0, std::memory_order_relaxed); + sched->slot_states[i].fanin_count = 0; + sched->slot_states[i].fanout_refcount.store(0, std::memory_order_relaxed); } // Initialize ready queues (one per resource shape) @@ -165,12 +152,8 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, for (int j = 0; j < i; j++) { pto2_ready_queue_destroy(&sched->ready_queues[j]); } - delete[] sched->fanout_refcount; - delete[] sched->fanin_refcount; - delete[] sched->task_state; - sched->fanout_refcount = nullptr; - sched->fanin_refcount = nullptr; - sched->task_state = nullptr; + delete[] sched->slot_states; + sched->slot_states = nullptr; return false; } } @@ -179,19 +162,9 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, } void pto2_scheduler_destroy(PTO2SchedulerState* sched) { - if (sched->task_state) { - delete[] sched->task_state; - sched->task_state = nullptr; - } - - if (sched->fanin_refcount) { - delete[] sched->fanin_refcount; - sched->fanin_refcount = nullptr; - } - - if (sched->fanout_refcount) { - delete[] sched->fanout_refcount; - sched->fanout_refcount = nullptr; + if (sched->slot_states) { + delete[] sched->slot_states; + sched->slot_states = nullptr; } for (int i = 0; i < PTO2_NUM_RESOURCE_SHAPES; i++) { @@ -207,7 +180,7 @@ void pto2_scheduler_print_stats(PTO2SchedulerState* sched) { LOG_INFO("=== Scheduler Statistics ==="); LOG_INFO("last_task_alive: %d", sched->last_task_alive); LOG_INFO("heap_tail: %" PRIu64, sched->heap_tail); -#if 
PTO2_PROFILING +#if PTO2_SCHED_PROFILING LOG_INFO("tasks_completed: %lld", (long long)sched->tasks_completed.load(std::memory_order_relaxed)); LOG_INFO("tasks_consumed: %lld", (long long)sched->tasks_consumed.load(std::memory_order_relaxed)); #endif diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index 5d9dd77db..61a5c6a3e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -280,6 +280,7 @@ struct PTO2CompletionStats { struct PTO2SchedulerState { // Shared memory access PTO2SharedMemoryHandle* sm_handle; + PTO2TaskDescriptor* task_descriptors; // Local copies of ring pointers (written to shared memory after update) int32_t last_task_alive; // Task ring tail (advances on COMPLETED for slot reuse) @@ -295,16 +296,16 @@ struct PTO2SchedulerState { // === PRIVATE DATA (not in shared memory) === - // Per-task state arrays (dynamically allocated, indexed by task_id & task_window_mask) - std::atomic* task_state; // PENDING/READY/RUNNING/COMPLETED/CONSUMED - std::atomic* fanin_refcount; // Dynamic: counts completed producers - std::atomic* fanout_refcount; // Dynamic: counts released references + // Per-task slot state (dynamically allocated, indexed by task_id & task_window_mask) + // Consolidates task_state, fanin/fanout refcounts, and dependency metadata + // into a single cache-friendly structure (one 64-byte-aligned PTO2TaskSlotState per slot). 
+ PTO2TaskSlotState* slot_states; // Ready queues (one per resource shape) PTO2ReadyQueue ready_queues[PTO2_NUM_RESOURCE_SHAPES]; // Statistics -#if PTO2_PROFILING +#if PTO2_SCHED_PROFILING std::atomic tasks_completed; std::atomic tasks_consumed; #endif @@ -314,10 +315,13 @@ struct PTO2SchedulerState { // Inline hot-path methods // ========================================================================= - int32_t pto2_task_slot(int32_t task_id) { + int32_t get_task_slot(int32_t task_id) { return task_id & task_window_mask; } + PTO2TaskSlotState& get_slot_state_by_slot(int32_t slot) { return slot_states[slot]; } + PTO2TaskSlotState& get_slot_state_by_task_id(int32_t task_id) { return slot_states[task_id & task_window_mask]; } + void sync_to_sm() { PTO2SharedMemoryHeader* header = sm_handle->header; header->last_task_alive.store(last_task_alive, std::memory_order_release); @@ -330,8 +334,8 @@ struct PTO2SchedulerState { int32_t current_task_index = header->current_task_index.load(std::memory_order_acquire); while (last_task_alive < current_task_index) { - int32_t slot = pto2_task_slot(last_task_alive); - if (task_state[slot].load(std::memory_order_acquire) != PTO2_TASK_CONSUMED) { + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(last_task_alive); + if (slot_state.task_state.load(std::memory_order_acquire) != PTO2_TASK_CONSUMED) { break; } last_task_alive++; @@ -348,20 +352,18 @@ struct PTO2SchedulerState { sync_to_sm(); } - void check_and_handle_consumed(int32_t slot, PTO2TaskDescriptor& task) { - if (fanout_refcount[slot].load(std::memory_order_acquire) != task.fanout_count) return; + void check_and_handle_consumed(PTO2TaskSlotState& slot_state) { + if (slot_state.fanout_refcount.load(std::memory_order_acquire) != slot_state.fanout_count) return; PTO2TaskState expected = PTO2_TASK_COMPLETED; - if (!task_state[slot].compare_exchange_strong(expected, PTO2_TASK_CONSUMED, + if (!slot_state.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, 
std::memory_order_acq_rel, std::memory_order_acquire)) { return; } -#if PTO2_PROFILING +#if PTO2_SCHED_PROFILING tasks_consumed.fetch_add(1, std::memory_order_relaxed); #endif - fanout_refcount[slot].store(0, std::memory_order_release); - fanin_refcount[slot].store(0, std::memory_order_release); // Try-lock — if another thread is advancing, it will scan our CONSUMED task int32_t expected_lock = 0; @@ -373,19 +375,16 @@ struct PTO2SchedulerState { } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - void check_and_handle_consumed(int32_t task_id, PTO2TaskDescriptor& task, - uint64_t& atomic_count) { - int32_t slot = pto2_task_slot(task_id); - - int32_t fc = task.fanout_count; - int32_t rc = fanout_refcount[slot].load(std::memory_order_acquire); + void check_and_handle_consumed(PTO2TaskSlotState& slot_state, uint64_t& atomic_count) { + int32_t fc = slot_state.fanout_count; + int32_t rc = slot_state.fanout_refcount.load(std::memory_order_acquire); atomic_count += 2; // fanout_count.load + fanout_refcount.load if (rc != fc) return; PTO2TaskState expected = PTO2_TASK_COMPLETED; - if (!task_state[slot].compare_exchange_strong(expected, PTO2_TASK_CONSUMED, + if (!slot_state.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, std::memory_order_acq_rel, std::memory_order_acquire)) { atomic_count += 1; // failed CAS return; @@ -393,13 +392,9 @@ struct PTO2SchedulerState { atomic_count += 1; // successful CAS -#if PTO2_PROFILING +#if PTO2_SCHED_PROFILING tasks_consumed.fetch_add(1, std::memory_order_relaxed); #endif - fanout_refcount[slot].store(0, std::memory_order_release); - fanin_refcount[slot].store(0, std::memory_order_release); - - atomic_count += 2; // fanout_refcount.store + fanin_refcount.store // Try-lock — if another thread is advancing, it will scan our CONSUMED task int32_t expected_lock = 0; @@ -415,33 +410,31 @@ struct PTO2SchedulerState { #endif void release_producer(int32_t producer_id) { - int32_t slot = pto2_task_slot(producer_id); - 
PTO2TaskDescriptor& producer = pto2_sm_get_task_by_slot(sm_handle, slot); - fanout_refcount[slot].fetch_add(1, std::memory_order_acq_rel); - check_and_handle_consumed(slot, producer); + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(producer_id); + slot_state.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); + check_and_handle_consumed(slot_state); } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING void release_producer(int32_t producer_id, uint64_t& atomic_count) { - int32_t slot = pto2_task_slot(producer_id); - PTO2TaskDescriptor& producer = pto2_sm_get_task_by_slot(sm_handle, slot); - fanout_refcount[slot].fetch_add(1, std::memory_order_acq_rel); + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(producer_id); + slot_state.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); atomic_count += 1; // fanout_refcount.fetch_add - check_and_handle_consumed(producer_id, producer, atomic_count); + check_and_handle_consumed(slot_state, atomic_count); } #endif bool release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task, PTO2LocalReadyBuffer* local_bufs = nullptr) { - int32_t slot = pto2_task_slot(task_id); + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); // Atomically increment fanin_refcount and check if all producers are done // ACQ_REL on fanin_refcount already synchronizes with the orchestrator's // release in init_task, making fanin_count visible — plain load suffices. 
- int32_t new_refcount = fanin_refcount[slot].fetch_add(1, std::memory_order_acq_rel) + 1; + int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; - if (new_refcount == task->fanin_count) { + if (new_refcount == slot_state.fanin_count) { // Local-first: try per-CoreType thread-local buffer before global queue // Route by active_mask: AIC-containing tasks → buf[0], AIV-only → buf[1] PTO2ResourceShape shape = pto2_active_mask_to_shape(task->active_mask); @@ -462,14 +455,14 @@ struct PTO2SchedulerState { bool release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task, uint64_t& atomic_count, uint64_t& push_wait, PTO2LocalReadyBuffer* local_bufs = nullptr) { - int32_t slot = pto2_task_slot(task_id); + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); - int32_t new_refcount = fanin_refcount[slot].fetch_add(1, std::memory_order_acq_rel) + 1; + int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; atomic_count += 1; // fanin_refcount.fetch_add - if (new_refcount == task->fanin_count) { + if (new_refcount == slot_state.fanin_count) { PTO2TaskState expected = PTO2_TASK_PENDING; - if (task_state[slot].compare_exchange_strong( + if (slot_state.task_state.compare_exchange_strong( expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire)) { atomic_count += 1; // CAS(task_state PENDING→READY) // Local-first: try per-CoreType thread-local buffer before global queue @@ -490,14 +483,14 @@ struct PTO2SchedulerState { #endif void init_task(int32_t task_id, PTO2TaskDescriptor* task) { - int32_t slot = pto2_task_slot(task_id); + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); - task_state[slot].store(PTO2_TASK_PENDING, std::memory_order_relaxed); // Orchestrator is the unique owner + slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); // Orchestrator is the unique owner // Reset fanout_refcount for new task 
lifecycle. // Do NOT reset fanin_refcount — it may have been incremented by - // concurrent on_mixed_task_complete between Step 5 and Step 6. - fanout_refcount[slot].store(0, std::memory_order_relaxed); + // concurrent on_task_complete between Step 5 and Step 6. + slot_state.fanout_refcount.store(0, std::memory_order_relaxed); #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING extern uint64_t g_orch_finalize_atomic_count; @@ -543,7 +536,7 @@ struct PTO2SchedulerState { * Pushes the task back into its shape-based queue. */ void requeue_ready_task(int32_t task_id) { - int32_t slot = pto2_task_slot(task_id); + int32_t slot = get_task_slot(task_id); PTO2TaskDescriptor& task = pto2_sm_get_task_by_slot(sm_handle, slot); PTO2ResourceShape shape = pto2_active_mask_to_shape(task.active_mask); ready_queues[static_cast(shape)].push(task_id); @@ -570,7 +563,7 @@ struct PTO2SchedulerState { * @return true if this subtask was the last one, completing the mixed task. */ bool on_subtask_complete(int32_t mixed_task_id, PTO2SubtaskSlot subslot) { - int32_t slot = pto2_task_slot(mixed_task_id); + int32_t slot = get_task_slot(mixed_task_id); PTO2TaskDescriptor& task = pto2_sm_get_task_by_slot(sm_handle, slot); uint8_t done_bit = (1u << static_cast(subslot)); @@ -598,12 +591,7 @@ struct PTO2SchedulerState { void on_mixed_task_complete(int32_t mixed_task_id, PTO2LocalReadyBuffer* local_bufs = nullptr) { #endif - int32_t slot = pto2_task_slot(mixed_task_id); - PTO2TaskDescriptor& task = pto2_sm_get_task_by_slot(sm_handle, slot); - -#if PTO2_PROFILING - tasks_completed.fetch_add(1, std::memory_order_relaxed); -#endif + PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(mixed_task_id); #if PTO2_SCHED_PROFILING extern uint64_t g_sched_lock_cycle[], g_sched_fanout_cycle[]; @@ -614,13 +602,13 @@ struct PTO2SchedulerState { #endif #if PTO2_SCHED_PROFILING - pto2_fanout_lock(task, lock_atomics, lock_wait); + pto2_fanout_lock(slot_state, lock_atomics, lock_wait); #else - 
pto2_fanout_lock(task); + pto2_fanout_lock(slot_state); #endif - task_state[slot].store(PTO2_TASK_COMPLETED, std::memory_order_release); - PTO2DepListEntry* current = task.fanout_head; // Protected by fanout_lock - pto2_fanout_unlock(task); + slot_state.task_state.store(PTO2_TASK_COMPLETED, std::memory_order_release); + PTO2DepListEntry* current = slot_state.fanout_head; // Protected by fanout_lock + pto2_fanout_unlock(slot_state); #if PTO2_SCHED_PROFILING lock_atomics += 2; // state.store + unlock.store @@ -683,7 +671,7 @@ struct PTO2SchedulerState { #else int32_t on_task_release(int32_t task_id) { #endif - int32_t slot = pto2_task_slot(task_id); + int32_t slot = get_task_slot(task_id); PTO2TaskPayload* payload = &sm_handle->task_payloads[slot]; int32_t fanin_edges = payload->fanin_actual_count; for (int32_t i = 0; i < fanin_edges; i++) { @@ -701,12 +689,12 @@ struct PTO2SchedulerState { // Self consumed check #if PTO2_SCHED_PROFILING uint64_t self_atomics = 0; - check_and_handle_consumed(slot, pto2_sm_get_task_by_slot(sm_handle, slot), self_atomics); + check_and_handle_consumed(get_slot_state_by_slot(slot), self_atomics); g_sched_self_atomic_count[thread_idx] += self_atomics; PTO2_SCHED_CYCLE_LAP(g_sched_self_consumed_cycle[thread_idx]); g_sched_complete_count[thread_idx]++; #else - check_and_handle_consumed(slot, pto2_sm_get_task_by_slot(sm_handle, slot)); + check_and_handle_consumed(get_slot_state_by_slot(slot)); #endif return fanin_edges; } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp index e9c1a52d7..7175e5f7f 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp @@ -34,6 +34,7 @@ Runtime::Runtime() { orch_built_on_host_ = true; pto2_gm_sm_ptr_ = nullptr; pto2_gm_heap_ptr_ = nullptr; + pto2_slot_states_ptr_ = nullptr; orch_args_ = nullptr; orch_arg_count_ = 0; @@ -93,6 +94,7 @@ 
int Runtime::get_orch_arg_count() const { return orch_arg_count_; } void Runtime::set_orch_built_on_host(bool v) { orch_built_on_host_ = v; } void Runtime::set_pto2_gm_sm_ptr(void* p) { pto2_gm_sm_ptr_ = p; } void Runtime::set_pto2_gm_heap(void* p) { pto2_gm_heap_ptr_ = p; } +void Runtime::set_pto2_slot_states_ptr(void* p) { pto2_slot_states_ptr_ = p; } void Runtime::set_orch_args(uint64_t* args, int count) { orch_arg_count_ = count <= RUNTIME_MAX_ARGS ? count : RUNTIME_MAX_ARGS; if (args && orch_arg_count_ > 0) { @@ -165,10 +167,14 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) { return; } - // Get PTO2 data structures + // Get slot states for fanout traversal + PTO2TaskSlotState* slot_states = static_cast<PTO2TaskSlotState*>(pto2_slot_states_ptr_); + if (slot_states == nullptr) { + return; + } + + // Get window mask from shared memory header PTO2SharedMemoryHeader* header = static_cast<PTO2SharedMemoryHeader*>(sm_base); - PTO2TaskDescriptor* task_descriptors = reinterpret_cast<PTO2TaskDescriptor*>( - static_cast<char*>(sm_base) + header->task_descriptors_offset); int32_t window_mask = header->task_window_size - 1; uint32_t count = perf_buf->count; @@ -177,13 +183,13 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) { PerfRecord* record = &perf_buf->records[i]; int32_t task_id = record->task_id; - // Get TaskDescriptor from PTO2 shared memory + // Get slot state for fanout traversal int32_t slot = task_id & window_mask; - PTO2TaskDescriptor* task = &task_descriptors[slot]; + PTO2TaskSlotState& ss = slot_states[slot]; // Fill fanout information by traversing the linked list record->fanout_count = 0; - PTO2DepListEntry* cur = task->fanout_head; + PTO2DepListEntry* cur = ss.fanout_head; while (cur != nullptr && record->fanout_count < RUNTIME_MAX_FANOUT) { record->fanout[record->fanout_count++] = cur->task_id; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h index a135675d6..7690cd90f 100644 ---
a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h @@ -170,6 +170,7 @@ class Runtime { bool orch_built_on_host_; void* pto2_gm_sm_ptr_; // GM pointer to PTO2 shared memory (device) void* pto2_gm_heap_ptr_; // GM heap for orchestrator output buffers (device) + void* pto2_slot_states_ptr_; // Pointer to PTO2TaskSlotState array (scheduler-private, for profiling) uint64_t* orch_args_; // Arguments for device orchestration int orch_arg_count_; uint64_t orch_args_storage_[RUNTIME_MAX_ARGS]; // Copy of args for device @@ -235,6 +236,7 @@ class Runtime { void set_orch_built_on_host(bool v); void set_pto2_gm_sm_ptr(void* p); void set_pto2_gm_heap(void* p); + void set_pto2_slot_states_ptr(void* p); void set_orch_args(uint64_t* args, int count); // Device orchestration SO binary (for dlopen on AICPU thread 3) diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp index a522d153b..9a7f4e221 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp @@ -52,7 +52,7 @@ PTO2OrchestrationConfig aicpu_orchestration_config(uint64_t* args, int arg_count } __attribute__((visibility("default"))) -void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count) { +void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, int orch_thread_num, int orch_thread_index) { (void)arg_count; void* dev_A = (void*)(uintptr_t)args[ARG_PTR_A]; @@ -75,6 +75,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count) { int matmul_batch = (int)config[3]; int add_batch = (int)config[4]; + 
LOG_ALWAYS(rt, "[alternating_orch] thread num: %d, idx: %d", orch_thread_num, orch_thread_index); LOG_INFO(rt, "[alternating_orch] Batch: %d, M: %d, N: %d, matmul_batch: %d, add_batch: %d", batch, M, N, matmul_batch, add_batch); @@ -103,7 +104,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count) { int max_groups = num_matmul_groups > num_add_groups ? num_matmul_groups : num_add_groups; // Interleaved submit: matmul and add groups alternate - for (int group_idx = 0; group_idx < max_groups; group_idx++) { + for (int group_idx = orch_thread_index; group_idx < max_groups; group_idx += orch_thread_num) { if (group_idx < num_matmul_groups) { int start_task_idx = group_idx * matmul_batch; uint64_t offset = (uint64_t)start_task_idx * MATMUL_ELEMS; @@ -147,7 +148,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count) { } } - LOG_INFO(rt, "[alternating_orch] Submitted %d matmul groups and %d add groups", + LOG_ALWAYS(rt, "[alternating_orch] Submitted %d matmul groups and %d add groups", total_matmul, total_add); }