diff --git a/src/a5/platform/src/host/pmu_collector.cpp b/src/a5/platform/src/host/pmu_collector.cpp index caed545c8..72922032c 100644 --- a/src/a5/platform/src/host/pmu_collector.cpp +++ b/src/a5/platform/src/host/pmu_collector.cpp @@ -73,6 +73,14 @@ int PmuCollector::init( /*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id ); + // RAII rollback: any early return after this point releases the shm + // region + ring address table + per-core PmuAicoreRing + per-core + // PmuBuffers. Per-core rings are tracked separately via + // `guard.add_direct_ptr()` because they're plain alloc_cb allocations + // (no host shadow / not in dev_to_host_). `guard.commit()` runs on the + // success path before the trailing return 0. + profiling_common::InitRollbackGuard guard(manager_, free_cb); + // ---- Allocate shared header + buffer-state region ---- size_t shm_size = calc_pmu_data_size(num_cores); void *shm_host_local = nullptr; @@ -113,6 +121,7 @@ int PmuCollector::init( return -1; } aicore_rings_dev_[c] = ring_dev; + guard.add_direct_ptr(ring_dev); state->aicore_ring_ptr = reinterpret_cast(ring_dev); reinterpret_cast(aicore_ring_addrs_host_)[c] = reinterpret_cast(ring_dev); @@ -176,6 +185,7 @@ int PmuCollector::init( "PMU collector initialized: %d cores, %d threads, SHM=0x%lx, CSV=%s (opened on first record)", num_cores, num_threads, reinterpret_cast(shm_dev_), csv_path_.c_str() ); + guard.commit(); return 0; } diff --git a/src/common/platform/include/host/profiling_common/buffer_pool_manager.h b/src/common/platform/include/host/profiling_common/buffer_pool_manager.h index 2fa650465..f411bf2b7 100644 --- a/src/common/platform/include/host/profiling_common/buffer_pool_manager.h +++ b/src/common/platform/include/host/profiling_common/buffer_pool_manager.h @@ -254,6 +254,46 @@ class BufferPoolManager { malloc_shadows_.clear(); } + /** + * Abort-path cleanup: free EVERY framework-tracked device pointer (via + * `release_fn`) and every framework-malloc'd host 
shadow, then clear all + * containers. Distinct from `release_owned_buffers()` + `clear_mappings()` + * because this also catches buffers parked in callers' SPSC free_queues + * (which the framework tracked via `register_mapping` but does not own a + * queue for). Intended for `init()` error paths where `finalize()` has + * not run. + * + * Drains recycled/done/ready first (just discards — release goes via + * dev_to_host_ to avoid double-free) and then iterates the full + * dev→host map. Each unique dev_ptr is released exactly once. + */ + template + void release_all_owned(const ReleaseFn &release_fn) { + for (auto &pool : recycled_) + pool.clear(); + { + std::scoped_lock lock(done_mutex_); + std::queue().swap(done_queue_); + } + { + std::scoped_lock lock(ready_mutex_); + std::queue().swap(ready_queue_); + } + for (auto &kv : dev_to_host_) { + if (kv.first != nullptr) { + release_fn(kv.first); + } + // erase-based check (matches release_owned_buffers): atomic + // check-and-remove guards against a double-free if any duplicate + // mapping ever sneaks into dev_to_host_. 
+ if (kv.second != nullptr && malloc_shadows_.erase(kv.second) > 0) { + std::free(kv.second); + } + } + dev_to_host_.clear(); + malloc_shadows_.clear(); + } + // ------------------------------------------------------------------------- // Per-tick mirror of the shared-memory region // ------------------------------------------------------------------------- diff --git a/src/common/platform/include/host/profiling_common/profiler_base.h b/src/common/platform/include/host/profiling_common/profiler_base.h index c2a4570aa..1cea19c31 100644 --- a/src/common/platform/include/host/profiling_common/profiler_base.h +++ b/src/common/platform/include/host/profiling_common/profiler_base.h @@ -160,6 +160,7 @@ #include #include #include +#include #include "common/memory_barrier.h" #include "common/platform_config.h" @@ -189,6 +190,74 @@ using ProfFreeCallback = std::function; // safe teardown via `clear_mappings()` / `release_owned_buffers()`. See // `ProfilerBase::start()` for the inline definition. +/** + * RAII scope guard for collector `init()` rollback. On destruction (without + * `commit()`) it (1) releases any extra direct dev_ptrs the collector added + * via `add_direct_ptr()` (used for pointers the collector owns outside the + * framework — e.g. PMU per-core `PmuAicoreRing` allocations on a5), and then + * (2) calls `manager.release_all_owned(release_fn)` to free every + * framework-tracked dev_ptr + host shadow. + * + * Pattern: + * int Collector::init(...) { + * ... + * set_memory_context(...); + * InitRollbackGuard guard(manager_, free_cb); + * void *dev_ptr = alloc_paired_buffer(size, &host_ptr); + * if (dev_ptr == nullptr) return -1; // guard runs, frees nothing yet + * ... + * void *direct = alloc_cb(...); + * guard.add_direct_ptr(direct); // ensure it's freed on abort + * ... 
+ * guard.commit(); // success — disarm + * initialized_ = true; + * return 0; + * } + */ +template +class InitRollbackGuard { +public: + using ReleaseFn = std::function; + + InitRollbackGuard(Manager &manager, ReleaseFn release_fn) : + manager_(manager), + release_fn_(std::move(release_fn)), + committed_(false) {} + + ~InitRollbackGuard() { + if (committed_) return; + for (void *p : direct_ptrs_) { + if (p != nullptr && release_fn_) { + release_fn_(p); + } + } + // Call release_all_owned unconditionally: it also frees malloc'd + // host shadows (via std::free, no callback needed). Gating on + // release_fn_ here would leak shadows if a collector ever passed + // an empty free_cb. Device-pointer release is gated inside the + // lambda instead. + manager_.release_all_owned([this](void *p) { + if (p != nullptr && release_fn_) { + release_fn_(p); + } + }); + } + + InitRollbackGuard(const InitRollbackGuard &) = delete; + InitRollbackGuard &operator=(const InitRollbackGuard &) = delete; + + void add_direct_ptr(void *p) { + if (p != nullptr) direct_ptrs_.push_back(p); + } + void commit() { committed_ = true; } + +private: + Manager &manager_; + ReleaseFn release_fn_; + std::vector direct_ptrs_; + bool committed_; +}; + // Result of Module::resolve_entry. Carries everything the unified // process_entry algorithm needs to (a) refill the originating pool's free // queue and (b) hand the ready buffer off to the collector. 
diff --git a/src/common/platform/src/host/scope_stats_collector.cpp b/src/common/platform/src/host/scope_stats_collector.cpp index 585e19179..d6aea4c4d 100644 --- a/src/common/platform/src/host/scope_stats_collector.cpp +++ b/src/common/platform/src/host/scope_stats_collector.cpp @@ -75,6 +75,12 @@ int ScopeStatsCollector::init( /*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id ); + // RAII rollback: any early return after this point releases every + // framework-tracked buffer (shm region + per-buffer-state PmuBuffer-style + // entries) via free_cb. `guard.commit()` runs on the success path before + // the trailing return 0. + profiling_common::InitRollbackGuard guard(manager_, free_cb); + const int num_instances = 1; size_t shm_size = calc_scope_stats_shm_size(num_instances); void *shm_host_local = nullptr; @@ -115,6 +121,7 @@ int ScopeStatsCollector::init( initialized_ = true; shm_dev_ = shm_dev_local; + guard.commit(); // Re-set_memory_context now that the shm region is ready. start(tf) gates // on shm_host_ being non-null, so this is the moment the collector becomes diff --git a/src/common/platform/src/host/tensor_dump_collector.cpp b/src/common/platform/src/host/tensor_dump_collector.cpp index 8ffd875ea..ee3f3dff8 100644 --- a/src/common/platform/src/host/tensor_dump_collector.cpp +++ b/src/common/platform/src/host/tensor_dump_collector.cpp @@ -67,6 +67,12 @@ int TensorDumpCollector::initialize( /*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id ); + // RAII rollback: any early return after this point releases the shm + // region + per-thread arenas + DumpMetaBuffers through the framework's + // dev→host map. `guard.commit()` runs on the success path before the + // trailing return 0. 
+ profiling_common::InitRollbackGuard guard(manager_, free_cb); + // Allocate dump shared memory (header + buffer states) size_t shm_size = calc_dump_data_size(num_dump_threads); void *shm_host_local = nullptr; @@ -151,6 +157,7 @@ int TensorDumpCollector::initialize( arena_size / (1024 * 1024), PLATFORM_DUMP_BUFFERS_PER_THREAD ); + guard.commit(); return 0; }