Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/a2a3/platform/include/aicpu/l2_swimlane_collector_aicpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,22 @@ void l2_swimlane_aicpu_init_core_assignments(int total_cores);
void l2_swimlane_aicpu_write_core_assignments_for_thread(int thread_idx, const int *core_ids, int core_num);

/**
* Flush remaining phase records for a thread
* Flush the remaining scheduler-phase records for a scheduler thread.
*
* Marks the current WRITING phase buffer as READY and enqueues it
* for host collection. Called at thread exit (analogous to l2_swimlane_aicpu_flush).
* Marks the thread's current WRITING sched-phase buffer as READY and enqueues
* it for host collection. Called at scheduler-thread exit.
*
* @param thread_idx Thread index (scheduler thread or orchestrator)
* @param thread_idx Scheduler thread index (= sched pool index = ready queue)
*/
void l2_swimlane_aicpu_flush_phase_buffers(int thread_idx);
void l2_swimlane_aicpu_flush_sched_phase_buffer(int thread_idx);

/**
* Flush the remaining orchestrator-phase records (single orch instance, pool
* ordinal 0). Called once by the orchestrator thread at orchestration end.
*
* @param thread_idx Calling (orchestrator) AICPU thread index — selects the
* ready queue to enqueue into. The pool/lane tag is ordinal 0.
*/
void l2_swimlane_aicpu_flush_orch_phase_buffer(int thread_idx);

#endif // PLATFORM_AICPU_L2_SWIMLANE_COLLECTOR_AICPU_H_
12 changes: 7 additions & 5 deletions src/a2a3/platform/include/common/l2_swimlane_profiling.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,13 @@ struct L2SwimlaneDataHeader {
// at init; AICPU reads in l2_swimlane_aicpu_init.

// Phase profiling metadata (AICPU writes in l2_swimlane_aicpu_init_phase;
// Host reads at drain time). Both thread counts == 0 means phase
// profiling was not initialized. Gated by l2_swimlane_level >=
// SCHED_PHASES at write time. Sched and orch pools are sized
// independently — typically num_orch_phase_threads == 1, but in
// orch_to_sched mode both equal num_aicpu_threads.
// Host reads at drain time). Both counts == 0 means phase profiling was not
// initialized. Gated by l2_swimlane_level >= SCHED_PHASES at write time.
// num_sched_phase_threads counts the active scheduler threads (sched-phase
// pools are per scheduler thread, indexed by thread id). Orchestration is
// single-threaded, so orch-phase is a single instance: num_orch_phase_threads
// == 1 and records land in orch pool ordinal 0 (dep_gen / scope_stats style),
// regardless of which AICPU thread the orchestrator runs on.
uint32_t num_sched_phase_threads; // Number of sched-phase pools the AICPU initialized
uint32_t num_orch_phase_threads; // Number of orch-phase pools the AICPU initialized
uint32_t num_phase_cores; // Number of valid entries in core_to_thread (0 = unset)
Expand Down
10 changes: 8 additions & 2 deletions src/a2a3/platform/include/host/l2_swimlane_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,9 @@ class L2SwimlaneCollector : public profiling_common::ProfilerBase<L2SwimlaneColl
* @return 0 on success, error code on failure
*/
int initialize(
int num_aicore, int device_id, L2SwimlaneLevel l2_swimlane_level, const L2SwimlaneAllocCallback &alloc_cb,
L2SwimlaneRegisterCallback register_cb, const L2SwimlaneFreeCallback &free_cb, const std::string &output_prefix
int num_aicore, int aicpu_thread_num, int device_id, L2SwimlaneLevel l2_swimlane_level,
const L2SwimlaneAllocCallback &alloc_cb, L2SwimlaneRegisterCallback register_cb,
const L2SwimlaneFreeCallback &free_cb, const std::string &output_prefix
);

/**
Expand Down Expand Up @@ -413,6 +414,11 @@ class L2SwimlaneCollector : public profiling_common::ProfilerBase<L2SwimlaneColl
void *aicore_ring_addr_table_dev_{nullptr};

int num_aicore_{0};
// Total AICPU threads launched this run. The dedicated orchestrator runs on
// the last one (aicpu_thread_num_ - 1); used to report its thread number in
// the phase-metadata log (orch-phase is a single pool, so its index alone
// does not encode the AICPU thread).
int aicpu_thread_num_{0};
L2SwimlaneLevel l2_swimlane_level_{L2SwimlaneLevel::DISABLED};

// Per-task output directory captured at initialize() time. Consumed by
Expand Down
6 changes: 3 additions & 3 deletions src/a2a3/platform/onboard/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {

// Initialize per-subsystem shared memory.
if (enable_l2_swimlane_) {
rc = init_l2_swimlane(num_aicore, device_id_);
rc = init_l2_swimlane(num_aicore, runtime.aicpu_thread_num, device_id_);
if (rc != 0) {
LOG_ERROR("init_l2_swimlane failed: %d", rc);
return rc;
Expand Down Expand Up @@ -457,7 +457,7 @@ int DeviceRunner::finalize() {

// `launch_aicpu_kernel` and `launch_aicore_kernel` live on `DeviceRunnerBase`.

int DeviceRunner::init_l2_swimlane(int num_aicore, int device_id) {
int DeviceRunner::init_l2_swimlane(int num_aicore, int aicpu_thread_num, int device_id) {
auto alloc_cb = [this](size_t size) -> void * {
return mem_alloc_.alloc(size);
};
Expand All @@ -480,7 +480,7 @@ int DeviceRunner::init_l2_swimlane(int num_aicore, int device_id) {
};

int rc = l2_swimlane_collector_.initialize(
num_aicore, device_id, l2_swimlane_level_, alloc_cb, register_cb, free_cb, output_prefix_
num_aicore, aicpu_thread_num, device_id, l2_swimlane_level_, alloc_cb, register_cb, free_cb, output_prefix_
);
if (rc != 0) {
return rc;
Expand Down
2 changes: 1 addition & 1 deletion src/a2a3/platform/onboard/host/device_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class DeviceRunner : public DeviceRunnerBase {
* @param device_id Device ID for host registration
* @return 0 on success, error code on failure
*/
int init_l2_swimlane(int num_aicore, int device_id);
int init_l2_swimlane(int num_aicore, int aicpu_thread_num, int device_id);

/**
* Initialize tensor dump shared memory and collector.
Expand Down
113 changes: 71 additions & 42 deletions src/a2a3/platform/shared/aicpu/l2_swimlane_collector_aicpu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ L2SwimlaneLevel get_l2_swimlane_level() { return g_l2_swimlane_level; }
* Enqueue ready buffer to per-thread queue
*
* @param header L2SwimlaneDataHeader pointer
* @param thread_idx Thread index
* @param core_index Core index (or thread_idx for phase entries)
* @param thread_idx AICPU thread index (selects the per-thread ready queue)
* @param core_index Core index for task entries, or pool ordinal for phase entries
* @param buffer_ptr Device pointer to the full buffer
* @param buffer_seq Sequence number for ordering
* @param kind Buffer kind discriminator (see L2SwimlaneBufferKind)
Expand Down Expand Up @@ -684,18 +684,24 @@ void l2_swimlane_aicpu_init_phase(int worker_count, int num_sched_phase_threads,
// ready queue under `kind`, then pop a fresh buffer from free_queue. Sets
// `*current_buf_out` to nullptr if no free buffer is available — subsequent
// records on that thread will drop until the host catches up.
// `thread_idx` is the AICPU thread doing the enqueue (always the caller); it
// selects that thread's own SPSC ready queue, which it must own exclusively.
// `pool_idx` is the pool ordinal the host uses to file records and recycle the
// buffer to that pool (the same ordinal indexes the output lane). For sched
// pools the two coincide (thread t → queue t, pool t); for the single orch
// instance they differ (orchestrator's thread, but pool ordinal 0).
template <typename Buffer>
static void switch_phase_buffer_kind(
int thread_idx, L2SwimlaneAicpuTaskPool *state, Buffer **current_buf_out, L2SwimlaneBufferKind kind,
const char *kind_label
int thread_idx, uint32_t pool_idx, L2SwimlaneAicpuTaskPool *state, Buffer **current_buf_out,
L2SwimlaneBufferKind kind, const char *kind_label
) {
Buffer *full_buf = *current_buf_out;
if (state == nullptr || full_buf == nullptr) return;

LOG_INFO_V0("Thread %d: %s phase buffer is full (count=%u)", thread_idx, kind_label, full_buf->count);

uint32_t seq = state->head.current_buf_seq;
int rc = enqueue_ready_buffer(s_l2_swimlane_header, thread_idx, thread_idx, state->head.current_buf_ptr, seq, kind);
int rc = enqueue_ready_buffer(s_l2_swimlane_header, thread_idx, pool_idx, state->head.current_buf_ptr, seq, kind);
if (rc != 0) {
LOG_ERROR(
"Thread %d: failed to enqueue %s phase buffer (queue full), %u records lost!", thread_idx, kind_label,
Expand Down Expand Up @@ -741,8 +747,8 @@ static void switch_phase_buffer_kind(
// callers should bump `dropped_record_count` and return when nullptr.
template <typename Buffer, typename Record>
static Record *acquire_phase_slot(
int thread_idx, L2SwimlaneAicpuTaskPool *state, Buffer **current_buf_out, L2SwimlaneBufferKind kind,
const char *kind_label
int thread_idx, uint32_t pool_idx, L2SwimlaneAicpuTaskPool *state, Buffer **current_buf_out,
L2SwimlaneBufferKind kind, const char *kind_label
) {
Buffer *buf = *current_buf_out;
if (buf == nullptr) {
Expand All @@ -766,7 +772,7 @@ static Record *acquire_phase_slot(

uint32_t idx = buf->count;
if (idx >= PLATFORM_PHASE_RECORDS_PER_THREAD) {
switch_phase_buffer_kind(thread_idx, state, current_buf_out, kind, kind_label);
switch_phase_buffer_kind(thread_idx, pool_idx, state, current_buf_out, kind, kind_label);
buf = *current_buf_out;
if (buf == nullptr) return nullptr;
idx = buf->count;
Expand All @@ -788,7 +794,8 @@ void l2_swimlane_aicpu_record_sched_phase(
state->head.total_record_count += 1;

auto *record = acquire_phase_slot<L2SwimlaneAicpuSchedPhaseBuffer, L2SwimlaneAicpuSchedPhaseRecord>(
thread_idx, state, &s_current_sched_phase_buffers[thread_idx], L2SwimlaneBufferKind::AicpuSchedPhase, "sched"
/*thread_idx=*/thread_idx, /*pool_idx=*/static_cast<uint32_t>(thread_idx), state,
&s_current_sched_phase_buffers[thread_idx], L2SwimlaneBufferKind::AicpuSchedPhase, "sched"
);
if (record == nullptr) {
state->head.dropped_record_count += 1;
Expand All @@ -809,13 +816,18 @@ void l2_swimlane_aicpu_record_orch_phase(
uint64_t start_time, uint64_t end_time, uint64_t task_id, uint32_t submit_idx
) {
if (s_orch_thread_idx < 0 || !s_phase_initialized) return;
auto *state = s_orch_phase_pools[s_orch_thread_idx];
// Single orch instance (dep_gen / scope_stats style): all orch records
// funnel into pool ordinal 0, regardless of which AICPU thread the
// orchestrator runs on. s_orch_thread_idx is the orchestrator's AICPU
// thread index — used only to pick its own ready queue (SPSC owner); the
// entry is tagged with pool ordinal 0 so the host files it into orch lane 0.
auto *state = s_orch_phase_pools[0];
if (state == nullptr) return;

state->head.total_record_count += 1;

auto *record = acquire_phase_slot<L2SwimlaneAicpuOrchPhaseBuffer, L2SwimlaneAicpuOrchPhaseRecord>(
s_orch_thread_idx, state, &s_current_orch_phase_buffers[s_orch_thread_idx],
/*thread_idx=*/s_orch_thread_idx, /*pool_idx=*/0, state, &s_current_orch_phase_buffers[0],
L2SwimlaneBufferKind::AicpuOrchPhase, "orch"
);
if (record == nullptr) {
Expand All @@ -828,39 +840,56 @@ void l2_swimlane_aicpu_record_orch_phase(
record->submit_idx = submit_idx;
}

// Final-drain flush for both phase pools owned by this thread (sched + orch).
// Called once per AICPU thread at end-of-run.
void l2_swimlane_aicpu_flush_phase_buffers(int thread_idx) {
if (!s_phase_initialized || s_l2_swimlane_header == nullptr) return;

auto flush_one = [&](L2SwimlaneAicpuTaskPool *state, L2SwimlaneBufferKind kind, const char *kind_label) {
if (state == nullptr) return;
rmb();
uint64_t buf_ptr = state->head.current_buf_ptr;
if (buf_ptr == 0) return;
// Reuse TypedBuffer's count layout — same offset regardless of payload type.
auto *buf = reinterpret_cast<L2SwimlaneAicpuSchedPhaseBuffer *>(buf_ptr);
if (buf->count == 0) return;
uint32_t seq = state->head.current_buf_seq;
int rc = enqueue_ready_buffer(s_l2_swimlane_header, thread_idx, thread_idx, buf_ptr, seq, kind);
if (rc == 0) {
LOG_INFO_V0("Thread %d: flushed %s phase buffer with %u records", thread_idx, kind_label, buf->count);
} else {
LOG_ERROR(
"Thread %d: failed to enqueue %s phase buffer (queue full), %u records lost!", thread_idx, kind_label,
buf->count
);
state->head.dropped_record_count += buf->count;
buf->count = 0;
}
state->head.current_buf_ptr = 0;
wmb();
};
// Final-drain flush of one phase pool's active buffer. `thread_idx` / `pool_idx`
// as in switch_phase_buffer_kind.
static void flush_phase_pool(
int thread_idx, uint32_t pool_idx, L2SwimlaneAicpuTaskPool *state, L2SwimlaneBufferKind kind, const char *kind_label
) {
if (state == nullptr) return;
rmb();
uint64_t buf_ptr = state->head.current_buf_ptr;
if (buf_ptr == 0) return;
// `count` sits AFTER the records[] array in TypedBuffer, so its byte offset
// is N * sizeof(Record) — different for sched (40B) vs orch (32B) records.
// Read/write it through the matching buffer type; a single fixed cast reads
// past the orch buffer, sees 0, and silently skips the orch flush.
volatile uint32_t *count_ptr = (kind == L2SwimlaneBufferKind::AicpuOrchPhase) ?
&reinterpret_cast<L2SwimlaneAicpuOrchPhaseBuffer *>(buf_ptr)->count :
&reinterpret_cast<L2SwimlaneAicpuSchedPhaseBuffer *>(buf_ptr)->count;
if (*count_ptr == 0) return;
uint32_t seq = state->head.current_buf_seq;
int rc = enqueue_ready_buffer(s_l2_swimlane_header, thread_idx, pool_idx, buf_ptr, seq, kind);
if (rc == 0) {
LOG_INFO_V0("Thread %d: flushed %s phase buffer with %u records", thread_idx, kind_label, *count_ptr);
} else {
LOG_ERROR(
"Thread %d: failed to enqueue %s phase buffer (queue full), %u records lost!", thread_idx, kind_label,
*count_ptr
);
state->head.dropped_record_count += *count_ptr;
*count_ptr = 0;
}
state->head.current_buf_ptr = 0;
wmb();
}

flush_one(s_sched_phase_pools[thread_idx], L2SwimlaneBufferKind::AicpuSchedPhase, "sched");
// Final-drain flush of the scheduler-phase pool owned by this scheduler thread.
void l2_swimlane_aicpu_flush_sched_phase_buffer(int thread_idx) {
if (!s_phase_initialized || s_l2_swimlane_header == nullptr) return;
flush_phase_pool(
thread_idx, static_cast<uint32_t>(thread_idx), s_sched_phase_pools[thread_idx],
L2SwimlaneBufferKind::AicpuSchedPhase, "sched"
);
s_current_sched_phase_buffers[thread_idx] = nullptr;
flush_one(s_orch_phase_pools[thread_idx], L2SwimlaneBufferKind::AicpuOrchPhase, "orch");
s_current_orch_phase_buffers[thread_idx] = nullptr;
}

// Final-drain flush of the single orchestrator's orch-phase pool (ordinal 0).
// Called once by the orchestrator thread at orchestration end; see
// record_orch_phase for the pool-0 / own-ready-queue tagging.
void l2_swimlane_aicpu_flush_orch_phase_buffer(int thread_idx) {
if (!s_phase_initialized || s_l2_swimlane_header == nullptr) return;
flush_phase_pool(thread_idx, /*pool_idx=*/0, s_orch_phase_pools[0], L2SwimlaneBufferKind::AicpuOrchPhase, "orch");
s_current_orch_phase_buffers[0] = nullptr;
}

void l2_swimlane_aicpu_init_core_assignments(int total_cores) {
Expand Down
Loading