Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ struct AicpuExecutor {
uint64_t dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Per-core AICPU dispatch timestamp
uint32_t core_dispatch_counts_[RUNTIME_MAX_WORKER]; // Per-core total dispatched task counter (for buffer management)

uint64_t* func_id_to_addr_;
// Look up the binary address stored for `func_id` in the cached
// func_id_to_addr_ table. Ids outside [0, RUNTIME_MAX_FUNC_ID) yield 0.
uint64_t get_function_bin_addr(int func_id) const {
    const bool id_in_range = (func_id >= 0) && (func_id < RUNTIME_MAX_FUNC_ID);
    if (id_in_range) {
        return func_id_to_addr_[func_id];
    }
    return 0;
}

// ===== Methods =====
int32_t init(Runtime* runtime);
int32_t handshake_all_cores(Runtime* runtime);
Expand All @@ -235,10 +241,9 @@ struct AicpuExecutor {
// Build slim PTO2DispatchPayload: only function_bin_addr + args.
// Metadata (mixed_task_id, subslot, kernel_id, core_type) stays in TaskDescriptor.
void build_pto2_payload(PTO2DispatchPayload* out,
Runtime* runtime,
int32_t kernel_id,
PTO2TaskPayload* task_pl) {
out->function_bin_addr = runtime->get_function_bin_addr(kernel_id);
out->function_bin_addr = get_function_bin_addr(kernel_id);
int32_t n = 0;
for (int32_t i = 0; i < task_pl->param_count; i++) {
if (!task_pl->is_tensor[i]) {
Expand Down Expand Up @@ -480,7 +485,7 @@ struct AicpuExecutor {
) {
PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id];
int32_t slot_idx = static_cast<int32_t>(subslot);
build_pto2_payload(payload, runtime, task->kernel_id[slot_idx], task_pl);
build_pto2_payload(payload, task->kernel_id[slot_idx], task_pl);
s_executing_subslot[core_id] = subslot;
#if PTO2_PROFILING
if (profiling_enabled) {
Expand Down Expand Up @@ -765,6 +770,8 @@ int32_t AicpuExecutor::init(Runtime* runtime) {
return -1;
}

func_id_to_addr_ = runtime->func_id_to_addr_;

// Read execution parameters from runtime
thread_num_ = runtime->sche_cpu_num;
orch_thread_num_ = runtime->orch_thread_num;
Expand Down Expand Up @@ -947,13 +954,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa

// Local-first dispatch buffers (stack-allocated, one per CoreType per scheduling thread).
// Initialized once; must be empty at the start of each iteration.
constexpr int LOCAL_READY_CAP_PER_TYPE = 64;
constexpr int LOCAL_READY_CAP_PER_TYPE = 256;
int32_t local_aic_ids[LOCAL_READY_CAP_PER_TYPE];
int32_t local_aiv_ids[LOCAL_READY_CAP_PER_TYPE];
PTO2LocalReadyBuffer local_bufs[PTO2_LOCAL_DISPATCH_TYPE_NUM]; // [0]=AIC, [1]=AIV
local_bufs[0].reset(local_aic_ids, LOCAL_READY_CAP_PER_TYPE);
local_bufs[1].reset(local_aiv_ids, LOCAL_READY_CAP_PER_TYPE);
int32_t deferred_release_ids[128];
int32_t deferred_release_ids[256];
int32_t deferred_release_count = 0;

bool cores_released = false;
Expand Down Expand Up @@ -1046,6 +1053,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
);
}
if (completed_this_turn > 0) {
#if PTO2_SCHED_PROFILING
rt->scheduler.tasks_completed.fetch_add(completed_this_turn, std::memory_order_relaxed);
#endif
int32_t prev = completed_tasks_.fetch_add(completed_this_turn, std::memory_order_relaxed);
int32_t new_total = prev + completed_this_turn;
last_progress_count = new_total;
Expand All @@ -1063,13 +1073,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
if (!try_completed) {
CYCLE_COUNT_LAP(sched_idle_cycle);
} else {
CYCLE_COUNT_LAP(sched_complete_cycle);
if (profiling_enabled && phase_complete_count > 0) {
perf_aicpu_record_phase(
thread_idx, AicpuPhaseId::SCHED_COMPLETE, _t0_phase, _t1, sched_loop_count, phase_complete_count);
_t0_phase = _t1;
phase_complete_count = 0;
}
CYCLE_COUNT_LAP(sched_complete_cycle);
}
#endif

Expand Down Expand Up @@ -1226,13 +1236,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
if (!try_pushed) {
CYCLE_COUNT_LAP(sched_idle_cycle);
} else {
CYCLE_COUNT_LAP(sched_dispatch_cycle);
if (profiling_enabled && phase_dispatch_count > 0) {
perf_aicpu_record_phase(
thread_idx, AicpuPhaseId::SCHED_DISPATCH, _t0_phase, _t1, sched_loop_count, phase_dispatch_count);
_t0_phase = _t1;
phase_dispatch_count = 0;
}
CYCLE_COUNT_LAP(sched_dispatch_cycle);
#endif
}

Expand Down Expand Up @@ -1264,9 +1274,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
int32_t cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0;
for (int32_t si = 0; si < task_count; si++) {
int32_t slot = si & window_mask;
PTO2TaskState st = sched->task_state[slot].load(std::memory_order_relaxed);
int32_t rc = sched->fanin_refcount[slot].load(std::memory_order_relaxed);
int32_t fi = task_descriptors[slot].fanin_count;
PTO2TaskState st = sched->slot_states[slot].task_state.load(std::memory_order_relaxed);
int32_t rc = sched->slot_states[slot].fanin_refcount.load(std::memory_order_relaxed);
int32_t fi = sched->slot_states[slot].fanin_count;
int32_t kid = task_descriptors[slot].kernel_id[0];
if (st >= PTO2_TASK_COMPLETED) continue; // Already done
if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; }
Expand Down Expand Up @@ -1340,12 +1350,12 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
SPIN_WAIT_HINT();
}
#if PTO2_PROFILING
CYCLE_COUNT_LAP(sched_idle_cycle);
if (profiling_enabled) {
perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_IDLE_WAIT,
_t0_phase, _t1, sched_loop_count, 0);
_t0_phase = _t1;
}
CYCLE_COUNT_LAP(sched_idle_cycle);
#endif
}
}
Expand Down Expand Up @@ -1634,6 +1644,9 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
return -1;
}

// Wire up slot_states pointer for profiling (complete_perf_records)
runtime->set_pto2_slot_states_ptr(rt->scheduler.slot_states);

// Store shared state for other orchestrator threads
orch_func_ = orch_func;
orch_args_cached_ = args;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,22 @@ void pto2_submit_mixed_task(
task.kernel_id[static_cast<int>(PTO2SubtaskSlot::AIV1)] = normalized.aiv1_kernel_id;
task.active_mask = active_mask;
task.subtask_done_mask.store(0, std::memory_order_relaxed);
task.fanin_count = 0;
task.fanout_head = nullptr;
task.fanout_lock.store(0, std::memory_order_relaxed);
// Initial fanout_count = 1 (the owning scope holds one reference)
task.fanout_count = 1;
task.packed_buffer_base = NULL;
task.packed_buffer_end = NULL;

// Initialize slot state (scheduler-private)
PTO2SchedulerState* sched = orch->scheduler;
if (sched) {
PTO2TaskSlotState& slot_state = sched->slot_states[slot];
slot_state.fanin_count = 0;
slot_state.fanout_head = nullptr;
slot_state.fanout_lock.store(0, std::memory_order_relaxed);
// Initial fanout_count = 1 (the owning scope holds one reference)
slot_state.fanout_count = 1;
slot_state.fanout_refcount.store(0, std::memory_order_release);
slot_state.fanin_refcount.store(0, std::memory_order_release);
}

// Register this task in its owning scope
scope_tasks_push(orch, task_id);

Expand Down Expand Up @@ -408,21 +416,20 @@ void pto2_submit_mixed_task(

// === STEP 5: Finalize fanin list ===
// First build the fanin list
if (orch->scheduler) {
PTO2SchedulerState* sched = orch->scheduler;

if (sched) {
PTO2TaskSlotState& cur_slot_state = sched->slot_states[slot];
// Initialize scheduler state BEFORE adding to producer fanout lists,
// so concurrent on_mixed_task_complete can safely access task_state/fanout_refcount.
sched->task_state[slot].store(PTO2_TASK_PENDING, std::memory_order_relaxed);
sched->fanout_refcount[slot].store(0, std::memory_order_relaxed);
cur_slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed);
cur_slot_state.fanout_refcount.store(0, std::memory_order_relaxed);

auto& dep_pool = orch->dep_pool;
if (orch->dep_pool_cur_entry == nullptr) {
orch->dep_pool_cur_entry = &dep_pool.alloc();
}

int32_t early_finished = 0;
task.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early
cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early
payload->fanin_actual_count = fanin_count;
for (int i = 0; i < fanin_count; i++) {
payload->fanin_tasks[i] = fanin_temp[i];
Expand All @@ -431,33 +438,33 @@ void pto2_submit_mixed_task(
int32_t producer_task_id = fanin_temp[i];
// Add this task to producer's fanout list (with spinlock)
int32_t prod_slot = task_ring.get_task_slot(producer_task_id);
PTO2TaskDescriptor& producer = task_ring.get_task_by_slot(prod_slot);
PTO2TaskSlotState& producer_slot_state = sched->slot_states[prod_slot];
orch->dep_pool_cur_entry->task_id = task_id;
orch->dep_pool_cur_entry->next = producer_slot_state.fanout_head;
#if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING
pto2_fanout_lock(producer, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle);
pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle);
#else
pto2_fanout_lock(producer);
pto2_fanout_lock(producer_slot_state);
#endif
// Normal path: prepend consumer to producer's fanout list
producer.fanout_count += 1;
int32_t prod_state = sched->task_state[prod_slot].load(std::memory_order_acquire);
producer_slot_state.fanout_count += 1;
int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire);
if (prod_state >= PTO2_TASK_COMPLETED) {
// Early return optimization: if producer already completed, we can skip adding dependency and directly
// decrement fanin_count
early_finished++;
} else {
orch->dep_pool_cur_entry->task_id = task_id;
orch->dep_pool_cur_entry->next = producer.fanout_head;
producer.fanout_head = orch->dep_pool_cur_entry;
producer_slot_state.fanout_head = orch->dep_pool_cur_entry;
}
pto2_fanout_unlock(producer);
if (producer.fanout_head == orch->dep_pool_cur_entry) {
pto2_fanout_unlock(producer_slot_state);
if (producer_slot_state.fanout_head == orch->dep_pool_cur_entry) {
orch->dep_pool_cur_entry = &dep_pool.alloc();
}
}
// Combined release: merge early_finished batch + init_task's +1 release
// into a single atomic fetch_add (saves one acq_rel cache-line bounce per task).
int32_t initial_refcount = early_finished + 1; // +1 for the init release
int32_t new_rc = sched->fanin_refcount[slot].fetch_add(initial_refcount, std::memory_order_acq_rel)
int32_t new_rc = cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel)
+ initial_refcount;
if (new_rc >= fanin_count + 1) {
PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,13 @@ struct PTO2DepListEntry {
// =============================================================================

/**
* Task descriptor structure
* Task descriptor structure (shared memory)
*
* Stored in the TaskDescriptor ring buffer in shared memory.
* Contains both static info (set at submission) and dynamic state.
* Contains static identification and buffer pointers only.
* Dynamic scheduling state (fanin/fanout/task_state) is in PTO2TaskSlotState.
*
* Concurrency notes:
* - fanout_head, fanout_count protected by fanout_lock (per-task spinlock)
* - fanin_count set once at submission, read-only after (hot path for ready check)
* - fanin_tasks stored in TaskPayload (cold path for release)
* - Other fields set by Orchestrator, read by Scheduler
* Fields set by Orchestrator at submission, read by Scheduler for dispatch.
*/
struct PTO2TaskDescriptor {
// Mixed-task identification
Expand All @@ -296,21 +293,45 @@ struct PTO2TaskDescriptor {
// Completion aggregation: each subtask sets its done bit atomically
std::atomic<uint8_t> subtask_done_mask;

// Dependency lists (linked list heads - offsets into DepListPool)
// Fanin: producers this task depends on (set once at submission)
int32_t fanin_count; // Number of producer dependencies

// Fanout: consumers that depend on this task (grows as consumers submit)
// PROTECTED BY fanout_lock
std::atomic<int32_t> fanout_lock; // Per-task spinlock (0=unlocked, 1=locked)
PTO2DepListEntry* fanout_head; // Pointer to first fanout entry (nullptr = empty), PROTECTED BY fanout_lock
int32_t fanout_count; // 1 (owning scope) + number of consumers

// Packed output buffer (all outputs packed into single contiguous buffer)
void* packed_buffer_base; // Start of packed buffer in GM Heap
void* packed_buffer_end; // End of packed buffer (for heap reclamation)
};

// =============================================================================
// Per-Slot Scheduling State
// =============================================================================

/**
* Per-task slot scheduling state (scheduler-private, NOT in shared memory)
*
* Consolidates all hot-path scheduling fields of one task slot into a single
* structure, so that touching any field of a slot brings the related fields
* in with it.
*
* Size / layout:
* - The members add up to roughly 32 bytes (assuming a 4-byte PTO2TaskState
*   and 8-byte pointers — TODO confirm on the target ABI).
* - alignas(64) forces every object to start on a 64-byte boundary, so
*   sizeof(PTO2TaskSlotState) is a multiple of 64 and each array element
*   occupies its own 64-byte-aligned storage (presumably one cache line on
*   the target — confirm), not half of one.
*
* Concurrency notes:
* - fanout_head, fanout_count protected by fanout_lock (per-task spinlock,
*   taken with pto2_fanout_lock / released with pto2_fanout_unlock)
* - fanin_count written during submission, read-only after (hot path for ready check)
* - task_state, fanin_refcount, fanout_refcount updated atomically
*/
struct alignas(64) PTO2TaskSlotState {
// Fanout lock + list (accessed together under lock in on_task_complete)
std::atomic<int32_t> fanout_lock; // Per-task spinlock (0=unlocked, 1=locked)
int32_t fanout_count; // 1 (owning scope) + number of consumers

PTO2DepListEntry* fanout_head; // Pointer to first fanout entry (nullptr = empty)

// Task state (completion, consumed check, ready check)
std::atomic<PTO2TaskState> task_state; // PENDING/READY/RUNNING/COMPLETED/CONSUMED

// Fanin (accessed together in release_fanin_and_check_ready)
std::atomic<int32_t> fanin_refcount; // Dynamic: counts completed producers
// Readiness threshold for fanin_refcount. pto2_submit_mixed_task stores the
// number of producers + 1; the extra count is released by the submitter itself
// so the task cannot become ready before its fanin list is fully built.
int32_t fanin_count; // Number of producer dependencies + 1 (written at submission)

// Fanout refcount (accessed with fanout_count in check_and_handle_consumed)
std::atomic<int32_t> fanout_refcount; // Dynamic: counts released references
};

/**
* Task payload data (cold path - only accessed during orchestration and dispatch)
*
Expand Down Expand Up @@ -391,20 +412,20 @@ typedef void (*PTO2InCoreFunc)(void** args, int32_t num_args);
#endif

#if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING
static inline void pto2_fanout_lock(PTO2TaskDescriptor& task,
static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state,
uint64_t& atomic_count, uint64_t& wait_cycle) {
uint64_t t0 = get_sys_cnt_aicpu();
bool contended = false;
uint32_t atomic_ops = 0;

for (;;) {
while (task.fanout_lock.load(std::memory_order_acquire) != 0) {
while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) {
contended = true;
atomic_ops++; // each load = 1 atomic
SPIN_WAIT_HINT();
}
int32_t expected = 0;
if (task.fanout_lock.compare_exchange_weak(expected, 1,
if (slot_state.fanout_lock.compare_exchange_weak(expected, 1,
std::memory_order_acquire, std::memory_order_relaxed)) {
atomic_ops++; // successful CAS = 1 atomic
atomic_count += atomic_ops;
Expand All @@ -419,21 +440,21 @@ static inline void pto2_fanout_lock(PTO2TaskDescriptor& task,
}
#endif

static inline void pto2_fanout_lock(PTO2TaskDescriptor& task) {
static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state) {
for (;;) {
while (task.fanout_lock.load(std::memory_order_acquire) != 0) {
while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) {
SPIN_WAIT_HINT();
}
int32_t expected = 0;
if (task.fanout_lock.compare_exchange_weak(expected, 1,
if (slot_state.fanout_lock.compare_exchange_weak(expected, 1,
std::memory_order_acquire, std::memory_order_relaxed)) {
return;
}
}
}

static inline void pto2_fanout_unlock(PTO2TaskDescriptor& task) {
task.fanout_lock.store(0, std::memory_order_release);
static inline void pto2_fanout_unlock(PTO2TaskSlotState& slot_state) {
slot_state.fanout_lock.store(0, std::memory_order_release);
}

#endif // PTO_RUNTIME2_TYPES_H
Loading
Loading