Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 237 additions & 23 deletions piano-runtime/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,105 @@ thread_local! {
static THREAD_COOKIE: u64 = NEXT_THREAD_COOKIE.fetch_add(1, Ordering::Relaxed);
}

/// Pack a thread cookie (48 bits) and stack depth (16 bits) into a single u64.
// -- Interned function name table ------------------------------------------
//
// Maps u16 IDs to `&'static str` function names so the Guard can carry a
// compact name reference without growing beyond 16 bytes.
//
// Layout: append-only Vec behind a Mutex. Reads during drop_cold take the
// lock briefly; writes happen once per unique name in enter_cold.
// A thread-local cache avoids the global lock on the hot path.

/// Global interned name table: index -> &'static str.
static NAME_TABLE: SyncOnceCell<Mutex<Vec<&'static str>>> = SyncOnceCell::new();

fn name_table() -> &'static Mutex<Vec<&'static str>> {
NAME_TABLE.get_or_init(|| Mutex::new(Vec::new()))
}

// Thread-local cache mapping name pointer -> interned ID.
// Uses pointer identity (`&'static str` addresses are stable).
thread_local! {
static NAME_CACHE: RefCell<HashMap<usize, u16>> = RefCell::new(HashMap::new());
}

/// Intern a function name, returning its u16 ID.
/// Fast path: thread-local cache hit (no global lock).
/// Slow path: global table lookup/insert under lock, then cache.
#[inline(always)]
fn intern_name(name: &'static str) -> u16 {
let ptr = name.as_ptr() as usize;
let cached = NAME_CACHE.with(|cache| cache.borrow().get(&ptr).copied());
if let Some(id) = cached {
return id;
}
intern_name_slow(name, ptr)
}

#[inline(never)]
fn intern_name_slow(name: &'static str, ptr: usize) -> u16 {
let mut table = name_table().lock().unwrap_or_else(|e| e.into_inner());
// Check if already in global table (another thread may have added it).
let id = if let Some(pos) = table.iter().position(|&n| n.as_ptr() as usize == ptr) {
pos as u16
} else {
let len = table.len();
debug_assert!(
len <= u16::MAX as usize,
"interned name table overflow: more than 65535 unique function names"
);
if len > u16::MAX as usize {
// Table full (65536 entries, indices 0..=u16::MAX). Saturate
// instead of wrapping. lookup_name handles out-of-bounds by
// returning "<unknown>", so this degrades gracefully.
return u16::MAX;
}
let id = len as u16;
table.push(name);
id
};
drop(table);
NAME_CACHE.with(|cache| {
cache.borrow_mut().insert(ptr, id);
});
id
}

/// Look up a function name by its interned ID.
/// Returns `"<unknown>"` if the ID is out of bounds (should never happen).
fn lookup_name(id: u16) -> &'static str {
let table = name_table().lock().unwrap_or_else(|e| e.into_inner());
table.get(id as usize).copied().unwrap_or("<unknown>")
}

/// Pack a thread cookie (32 bits), name ID (16 bits), and stack depth (16 bits)
/// into a single u64. Guard stays 16 bytes.
///
/// Layout: `[cookie:32][name_id:16][depth:16]`
///
/// Note: only the low 32 bits of the cookie are stored, limiting unique thread
/// identification to ~4 billion threads per process. This is a deliberate
/// tradeoff (from a previous 48-bit cookie) to make room for the 16-bit
/// name_id. If THREAD_COOKIE exceeds 2^32, `unpack_cookie` returns only the
/// low 32 bits, which could cause false migration detection in `drop_cold`.
#[inline(always)]
fn pack_cookie_depth(cookie: u64, depth: u16) -> u64 {
(cookie << 16) | depth as u64
fn pack_cookie_name_depth(cookie: u64, name_id: u16, depth: u16) -> u64 {
(cookie << 32) | ((name_id as u64) << 16) | (depth as u64)
}

/// Unpack the thread cookie from a packed u64.
/// Unpack the thread cookie (high 32 bits) from a packed u64.
#[inline(always)]
fn unpack_cookie(packed: u64) -> u64 {
packed >> 16
packed >> 32
}

/// Unpack the name ID (bits 16..31) from a packed u64.
#[inline(always)]
fn unpack_name_id(packed: u64) -> u16 {
(packed >> 16) as u16
}

/// Unpack the stack depth from a packed u64.
/// Unpack the stack depth (low 16 bits) from a packed u64.
#[inline(always)]
fn unpack_depth(packed: u64) -> u16 {
packed as u16
Expand All @@ -230,7 +316,10 @@ fn unpack_depth(packed: u64) -> u16 {
#[must_use = "dropping the guard immediately records ~0ms; bind it with `let _guard = ...`"]
pub struct Guard {
start_tsc: u64,
/// High 48 bits: thread cookie, low 16 bits: stack depth.
/// Bit layout: `[cookie:32][name_id:16][depth:16]`
/// - cookie: identifies the thread that called enter()
/// - name_id: index into the global interned name table
/// - depth: stack depth at the time of enter()
packed: u64,
}

Expand Down Expand Up @@ -300,6 +389,7 @@ fn drop_cold(guard: &Guard, end_tsc: u64, #[cfg(feature = "cpu-time")] cpu_end_n
let migrated = drop_cookie != enter_cookie;

if migrated {
let name = lookup_name(unpack_name_id(guard.packed));
let elapsed_ns = crate::tsc::elapsed_ns(guard.start_tsc, end_tsc);
let elapsed_ms = elapsed_ns as f64 / 1_000_000.0;
let start_ns = crate::tsc::ticks_to_epoch_ns(guard.start_tsc, crate::tsc::epoch_tsc());
Expand Down Expand Up @@ -328,7 +418,7 @@ fn drop_cold(guard: &Guard, end_tsc: u64, #[cfg(feature = "cpu-time")] cpu_end_n
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(RawRecord {
name: "<migrated>",
name,
elapsed_ms,
children_ms: phantom_children_ms,
#[cfg(feature = "cpu-time")]
Expand All @@ -337,7 +427,7 @@ fn drop_cold(guard: &Guard, end_tsc: u64, #[cfg(feature = "cpu-time")] cpu_end_n
});

let invocation = InvocationRecord {
name: "<migrated>",
name,
start_ns,
elapsed_ns,
self_ns,
Expand Down Expand Up @@ -483,13 +573,14 @@ impl Drop for Guard {
}
}

/// Bookkeeping half of enter(): epoch, alloc save, stack push.
/// Returns a packed u64 (thread cookie << 16 | depth).
/// Bookkeeping half of enter(): epoch, alloc save, stack push, name interning.
/// Returns a packed u64: `[cookie:32][name_id:16][depth:16]`.
#[inline(never)]
fn enter_cold(name: &'static str) -> u64 {
let _ = epoch();

let cookie = THREAD_COOKIE.with(|c| *c);
let name_id = intern_name(name);

let saved_alloc = crate::alloc::ALLOC_COUNTERS
.try_with(|cell| {
Expand Down Expand Up @@ -518,7 +609,7 @@ fn enter_cold(name: &'static str) -> u64 {
depth
});

pack_cookie_depth(cookie, depth)
pack_cookie_name_depth(cookie, name_id, depth)
}

/// Start timing a function. Returns a Guard that records the measurement on drop.
Expand Down Expand Up @@ -2000,10 +2091,11 @@ mod tests {
});

let records = collect_all();
let rec = records.iter().find(|r| r.name == "<migrated>");
let rec = records.iter().find(|r| r.name == "migrating_fn");
assert!(
rec.is_some(),
"migrated guard should still produce a record with <migrated> name"
"migrated guard should preserve function name 'migrating_fn'. Got: {:?}",
records.iter().map(|r| &r.name).collect::<Vec<_>>()
);
assert!(
rec.unwrap().total_ms > 0.5,
Expand Down Expand Up @@ -2225,7 +2317,10 @@ mod tests {
});

let records = collect_all();
let rec = records.iter().find(|r| r.name == "<migrated>").unwrap();
let rec = records
.iter()
.find(|r| r.name == "cpu_migrated")
.expect("migrated guard should preserve name 'cpu_migrated'");
assert!(rec.total_ms > 0.0, "wall time captured");
assert!(
rec.cpu_self_ms == 0.0,
Expand Down Expand Up @@ -2310,8 +2405,8 @@ mod tests {

let parent_inv = invocations
.iter()
.find(|r| r.name == "<migrated>")
.expect("migrated parent should produce an invocation");
.find(|r| r.name == "mig_parent")
.expect("migrated parent should preserve name 'mig_parent'");
let child_inv = invocations
.iter()
.find(|r| r.name == "mig_child")
Expand Down Expand Up @@ -2348,8 +2443,8 @@ mod tests {
let records = collect_all();
let parent_rec = records
.iter()
.find(|r| r.name == "<migrated>")
.expect("migrated parent in collect_all");
.find(|r| r.name == "rec_parent")
.expect("migrated parent should preserve name 'rec_parent'");

assert!(
parent_rec.self_ms < parent_rec.total_ms,
Expand All @@ -2374,8 +2469,8 @@ mod tests {
let invocations = collect_invocations();
let migrated = invocations
.iter()
.find(|r| r.name == "<migrated>")
.expect("should have a migrated record");
.find(|r| r.name == "other_thread")
.expect("migrated guard should preserve name 'other_thread'");

assert_eq!(
migrated.self_ns, migrated.elapsed_ns,
Expand Down Expand Up @@ -2421,8 +2516,8 @@ mod tests {

let migrated = c_invocations
.iter()
.find(|r| r.name == "<migrated>")
.expect("should have migrated record");
.find(|r| r.name == "bc_parent")
.expect("migrated guard should preserve name 'bc_parent'");

// C's phantom captures c_child's time. B's phantom is leaked
// (documented limitation: no cross-thread phantom forwarding yet).
Expand Down Expand Up @@ -2462,4 +2557,123 @@ mod tests {
});
});
}

#[test]
fn migrated_guard_preserves_function_name() {
// Migrated guards should report the actual function name,
// not a generic "<migrated>" placeholder.
reset();
let guard = enter("real_fn_name");
burn_cpu(10_000);

std::thread::scope(|s| {
s.spawn(move || {
burn_cpu(10_000);
drop(guard);
});
});

let records = collect_all();
let rec = records.iter().find(|r| r.name == "real_fn_name");
assert!(
rec.is_some(),
"migrated guard should preserve function name 'real_fn_name'. Got: {:?}",
records.iter().map(|r| &r.name).collect::<Vec<_>>()
);
assert!(
rec.unwrap().total_ms > 0.0,
"should have recorded wall time"
);
}

#[test]
fn migrated_guards_distinguish_multiple_functions() {
// When multiple functions migrate, each should retain its own name
// instead of collapsing into a single "<migrated>" bucket.
reset();
let guard_a = enter("fn_alpha");
burn_cpu(5_000);

let guard_b = std::thread::scope(|s| s.spawn(|| enter("fn_beta")).join().unwrap());
burn_cpu(5_000);

// Drop both guards on different threads than where they were created.
std::thread::scope(|s| {
s.spawn(move || {
drop(guard_a);
});
});
std::thread::scope(|s| {
s.spawn(move || {
drop(guard_b);
});
});

let records = collect_all();
let names: Vec<&str> = records.iter().map(|r| r.name.as_str()).collect();
assert!(
names.contains(&"fn_alpha"),
"should have fn_alpha in records. Got: {names:?}"
);
assert!(
names.contains(&"fn_beta"),
"should have fn_beta in records. Got: {names:?}"
);
assert!(
!names.contains(&"<migrated>"),
"should NOT have <migrated> placeholder. Got: {names:?}"
);
}

#[test]
fn migrated_invocation_has_real_name() {
// Verify InvocationRecord also carries the real function name.
reset();
let guard = enter("inv_migrated_fn");
burn_cpu(10_000);

let invocations = std::thread::scope(|s| {
s.spawn(move || {
burn_cpu(10_000);
drop(guard);
collect_invocations()
})
.join()
.unwrap()
});

let inv = invocations.iter().find(|r| r.name == "inv_migrated_fn");
assert!(
inv.is_some(),
"migrated invocation should have name 'inv_migrated_fn'. Got: {:?}",
invocations.iter().map(|r| r.name).collect::<Vec<_>>()
);
assert!(
inv.unwrap().elapsed_ns > 0,
"should have recorded elapsed time"
);
}

#[test]
fn pack_unpack_round_trip() {
let cookie = 42u64;
let name_id = 1234u16;
let depth = 567u16;
let packed = pack_cookie_name_depth(cookie, name_id, depth);
assert_eq!(unpack_cookie(packed), cookie);
assert_eq!(unpack_name_id(packed), name_id);
assert_eq!(unpack_depth(packed), depth);

// Max values: verifies the full bit range.
let packed_max = pack_cookie_name_depth(u32::MAX as u64, u16::MAX, u16::MAX);
assert_eq!(unpack_cookie(packed_max), u32::MAX as u64);
assert_eq!(unpack_name_id(packed_max), u16::MAX);
assert_eq!(unpack_depth(packed_max), u16::MAX);

// Zero values: verifies zero-packing.
let packed_zero = pack_cookie_name_depth(0, 0, 0);
assert_eq!(unpack_cookie(packed_zero), 0);
assert_eq!(unpack_name_id(packed_zero), 0);
assert_eq!(unpack_depth(packed_zero), 0);
}
}
Loading