Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 127 additions & 109 deletions crates/relayburn-sdk/src/analyze/hotspots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,7 @@ pub fn attribute_hotspots(turns: &[TurnRecord], opts: &HotspotsOptions<'_>) -> H
user_turns,
);

let mut session_grand = 0.0_f64;
for t in &session_turns {
// Use the canonical `cost_for_turn` so hotspots totals stay in
// lock-step with `cost.rs` for sources whose reasoning billing
// semantics differ (Codex `included_in_output`, models with a
// separate reasoning tariff, etc.).
if let Some(b) = cost_for_turn(t, opts.pricing) {
session_grand += b.total;
}
}
let session_grand = session_result.grand_total;
let session_attributed: f64 = session_result
.attributions
.iter()
Expand Down Expand Up @@ -287,6 +278,9 @@ struct PerTurnContent {
struct SessionAttribution {
attributions: Vec<ToolAttribution>,
method: AttributionMethod,
/// Sum of `cost_for_turn` over the session's turns. Computed inside
/// `attribute_session`'s turn loop so callers don't need a second pass.
grand_total: f64,
}

fn attribute_session(
Expand All @@ -299,6 +293,7 @@ fn attribute_session(
return SessionAttribution {
attributions: Vec::new(),
method: AttributionMethod::EvenSplit,
grand_total: 0.0,
};
}

Expand Down Expand Up @@ -344,10 +339,20 @@ fn attribute_session(
// to ride along (persistence) on subsequent turns until the cacheRead
// eviction signal drops them.
let mut riding_active: Vec<usize> = Vec::new();
let mut grand_total = 0.0_f64;

for turn in turns {
let turn_rate = lookup_model_rate(&turn.model, pricing);

// Accumulate the per-turn grand total in this same pass. Routes
// through the canonical `cost_for_turn` so hotspots stays in
// lock-step with `cost.rs` for source-specific reasoning billing
// (Codex `included_in_output`, models with a separate reasoning
// tariff, etc.).
if let Some(b) = cost_for_turn(turn, pricing) {
grand_total += b.total;
}

// 1) Initial cost: this turn pays for tool_results emitted on the
// previous turn. Use THIS turn's rate and (input/cacheCreate) mix
// — not the emit turn's.
Expand Down Expand Up @@ -479,6 +484,7 @@ fn attribute_session(
SessionAttribution {
attributions,
method,
grand_total,
}
}

Expand Down Expand Up @@ -559,82 +565,99 @@ fn estimate_tokens(text: &str) -> u64 {
utf16_len.div_ceil(CHARS_PER_TOKEN)
}

/// Shared shape for the simple aggregations: filter attributions by a key
/// extractor, accumulate into a per-key row, and sort by `total_cost` desc.
/// `aggregate_by_bash_verb` does not use this because it tracks distinct
/// hashes and per-verb examples on top of the basic shape.
fn aggregate<K, R, KeyFn, InitFn, AccFn, CostFn>(
attributions: &[ToolAttribution],
key: KeyFn,
init: InitFn,
accumulate: AccFn,
cost: CostFn,
) -> Vec<R>
where
K: Eq + std::hash::Hash + Clone,
KeyFn: Fn(&ToolAttribution) -> Option<K>,
InitFn: Fn(&K, &ToolAttribution) -> R,
AccFn: Fn(&mut R, &ToolAttribution),
CostFn: Fn(&R) -> f64,
{
let mut by_key: IndexMap<K, R> = IndexMap::new();
for a in attributions {
let Some(k) = key(a) else { continue };
let row = by_key.entry(k.clone()).or_insert_with(|| init(&k, a));
accumulate(row, a);
}
let mut out: Vec<R> = by_key.into_values().collect();
out.sort_by(|a, b| cost(b).total_cmp(&cost(a)));
out
}

/// Roll up file-touching tool attributions (`Read | Edit | Write |
/// NotebookEdit`) by their target path. Rows missing or with an empty target
/// are skipped. Output is sorted by `total_cost` descending.
pub fn aggregate_by_file(attributions: &[ToolAttribution]) -> Vec<FileAggregation> {
let mut by_path: IndexMap<String, FileAggregation> = IndexMap::new();
for a in attributions {
if !FILE_TOOLS.contains(a.tool_name.as_str()) {
continue;
}
let target = match a.target.as_ref() {
Some(t) if !t.is_empty() => t,
_ => continue,
};
let row = by_path
.entry(target.clone())
.or_insert_with(|| FileAggregation {
path: target.clone(),
tool_call_count: 0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
riding_turns: 0,
total_cost: 0.0,
first_emit_ts: a.emit_ts.clone(),
first_emit_turn_index: a.emit_turn_index,
});
row.tool_call_count += 1;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
row.riding_turns += a.riding_turns;
row.total_cost += a.total_cost;
if a.emit_ts < row.first_emit_ts {
row.first_emit_ts = a.emit_ts.clone();
row.first_emit_turn_index = a.emit_turn_index;
}
}
let mut out: Vec<FileAggregation> = by_path.into_values().collect();
out.sort_by(|a, b| {
b.total_cost
.partial_cmp(&a.total_cost)
.unwrap_or(std::cmp::Ordering::Equal)
});
out
aggregate(
attributions,
|a| {
if !FILE_TOOLS.contains(a.tool_name.as_str()) {
return None;
}
match a.target.as_ref() {
Some(t) if !t.is_empty() => Some(t.clone()),
_ => None,
}
},
|path, a| FileAggregation {
path: path.clone(),
tool_call_count: 0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
riding_turns: 0,
total_cost: 0.0,
first_emit_ts: a.emit_ts.clone(),
first_emit_turn_index: a.emit_turn_index,
},
|row, a| {
row.tool_call_count += 1;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
row.riding_turns += a.riding_turns;
row.total_cost += a.total_cost;
if a.emit_ts < row.first_emit_ts {
row.first_emit_ts = a.emit_ts.clone();
row.first_emit_turn_index = a.emit_turn_index;
}
},
|row| row.total_cost,
)
}

/// Roll up `Bash` tool attributions by `args_hash`, collapsing repeated
/// invocations of the same canonicalized command into a single row. The
/// representative `command` is the first-seen literal target. Output is
/// sorted by `total_cost` descending.
pub fn aggregate_by_bash(attributions: &[ToolAttribution]) -> Vec<BashAggregation> {
let mut by_hash: IndexMap<String, BashAggregation> = IndexMap::new();
for a in attributions {
if a.tool_name != "Bash" {
continue;
}
let row = by_hash
.entry(a.args_hash.clone())
.or_insert_with(|| BashAggregation {
args_hash: a.args_hash.clone(),
command: a.target.clone(),
call_count: 0,
total_cost: 0.0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
});
row.call_count += 1;
row.total_cost += a.total_cost;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
}
let mut out: Vec<BashAggregation> = by_hash.into_values().collect();
out.sort_by(|a, b| {
b.total_cost
.partial_cmp(&a.total_cost)
.unwrap_or(std::cmp::Ordering::Equal)
});
out
aggregate(
attributions,
|a| (a.tool_name == "Bash").then(|| a.args_hash.clone()),
|_, a| BashAggregation {
args_hash: a.args_hash.clone(),
command: a.target.clone(),
call_count: 0,
total_cost: 0.0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
},
|row, a| {
row.call_count += 1;
row.total_cost += a.total_cost;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
},
|row| row.total_cost,
)
}

struct BashVerbAccumulator {
Expand Down Expand Up @@ -723,8 +746,7 @@ where
let mut examples: Vec<BashVerbExample> = row.examples.into_values().collect();
examples.sort_by(|a, b| {
b.total_cost
.partial_cmp(&a.total_cost)
.unwrap_or(std::cmp::Ordering::Equal)
.total_cmp(&a.total_cost)
.then_with(|| a.command.cmp(&b.command))
});
let top_examples: Vec<String> = examples
Expand All @@ -750,8 +772,7 @@ where
.collect();
out.sort_by(|a, b| {
b.total_cost
.partial_cmp(&a.total_cost)
.unwrap_or(std::cmp::Ordering::Equal)
.total_cmp(&a.total_cost)
.then_with(|| a.verb.cmp(&b.verb))
});
out
Expand All @@ -761,36 +782,33 @@ where
/// without a resolved type bucket under `"(unknown)"`. Output is sorted by
/// `total_cost` descending.
pub fn aggregate_by_subagent(attributions: &[ToolAttribution]) -> Vec<SubagentAggregation> {
let mut by_type: IndexMap<String, SubagentAggregation> = IndexMap::new();
for a in attributions {
if a.tool_name != "Agent" && a.tool_name != "Task" {
continue;
}
let key = a
.subagent_type
.clone()
.unwrap_or_else(|| "(unknown)".to_string());
let row = by_type
.entry(key.clone())
.or_insert_with(|| SubagentAggregation {
subagent_type: key,
call_count: 0,
total_cost: 0.0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
});
row.call_count += 1;
row.total_cost += a.total_cost;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
}
let mut out: Vec<SubagentAggregation> = by_type.into_values().collect();
out.sort_by(|a, b| {
b.total_cost
.partial_cmp(&a.total_cost)
.unwrap_or(std::cmp::Ordering::Equal)
});
out
aggregate(
attributions,
|a| {
if a.tool_name != "Agent" && a.tool_name != "Task" {
return None;
}
Some(
a.subagent_type
.clone()
.unwrap_or_else(|| "(unknown)".to_string()),
)
},
|key, _| SubagentAggregation {
subagent_type: key.clone(),
call_count: 0,
total_cost: 0.0,
initial_tokens: 0.0,
persistence_tokens: 0.0,
},
|row, a| {
row.call_count += 1;
row.total_cost += a.total_cost;
row.initial_tokens += a.initial_tokens;
row.persistence_tokens += a.persistence_tokens;
},
|row| row.total_cost,
)
}

#[cfg(test)]
Expand Down
Loading