From b489ef1b954d65f106f8629ca3adc15012500f13 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 9 May 2026 03:12:20 +0000 Subject: [PATCH] refactor(sdk): share aggregator shape across hotspots roll-ups - Extract a generic `aggregate` helper for `aggregate_by_file`, `aggregate_by_bash`, `aggregate_by_subagent`. Each caller passes its key extractor, row initializer, and accumulator; the shared helper owns iteration, deduping, and the cost-desc sort. - Switch cost comparators from `partial_cmp(...).unwrap_or(Ordering::Equal)` to `f64::total_cmp` so monotonic ordering is exact. Applies to file/bash/subagent aggregates plus the bash-verb path. - Fold the per-session grand-total accumulation into `attribute_session`'s existing turn loop, eliminating the second pass over `session_turns` in `attribute_hotspots`. Closes #340. --- crates/relayburn-sdk/src/analyze/hotspots.rs | 236 ++++++++++--------- 1 file changed, 127 insertions(+), 109 deletions(-) diff --git a/crates/relayburn-sdk/src/analyze/hotspots.rs b/crates/relayburn-sdk/src/analyze/hotspots.rs index 8324c562..6204ee0b 100644 --- a/crates/relayburn-sdk/src/analyze/hotspots.rs +++ b/crates/relayburn-sdk/src/analyze/hotspots.rs @@ -241,16 +241,7 @@ pub fn attribute_hotspots(turns: &[TurnRecord], opts: &HotspotsOptions<'_>) -> H user_turns, ); - let mut session_grand = 0.0_f64; - for t in &session_turns { - // Use the canonical `cost_for_turn` so hotspots totals stay in - // lock-step with `cost.rs` for sources whose reasoning billing - // semantics differ (Codex `included_in_output`, models with a - // separate reasoning tariff, etc.). - if let Some(b) = cost_for_turn(t, opts.pricing) { - session_grand += b.total; - } - } + let session_grand = session_result.grand_total; let session_attributed: f64 = session_result .attributions .iter() @@ -287,6 +278,9 @@ struct PerTurnContent { struct SessionAttribution { attributions: Vec, method: AttributionMethod, + /// Sum of `cost_for_turn` over the session's turns. Computed inside + /// `attribute_session`'s turn loop so callers don't need a second pass. + grand_total: f64, } fn attribute_session( @@ -299,6 +293,7 @@ fn attribute_session( return SessionAttribution { attributions: Vec::new(), method: AttributionMethod::EvenSplit, + grand_total: 0.0, }; } @@ -344,10 +339,20 @@ fn attribute_session( // to ride along (persistence) on subsequent turns until the cacheRead // eviction signal drops them. let mut riding_active: Vec = Vec::new(); + let mut grand_total = 0.0_f64; for turn in turns { let turn_rate = lookup_model_rate(&turn.model, pricing); + // Accumulate the per-turn grand total in this same pass. Routes + // through the canonical `cost_for_turn` so hotspots stays in + // lock-step with `cost.rs` for source-specific reasoning billing + // (Codex `included_in_output`, models with a separate reasoning + // tariff, etc.). + if let Some(b) = cost_for_turn(turn, pricing) { + grand_total += b.total; + } + // 1) Initial cost: this turn pays for tool_results emitted on the // previous turn. Use THIS turn's rate and (input/cacheCreate) mix // — not the emit turn's. @@ -479,6 +484,7 @@ fn attribute_session( SessionAttribution { attributions, method, + grand_total, } } @@ -559,48 +565,73 @@ fn estimate_tokens(text: &str) -> u64 { utf16_len.div_ceil(CHARS_PER_TOKEN) } +/// Shared shape for the simple aggregations: filter attributions by a key +/// extractor, accumulate into a per-key row, and sort by `total_cost` desc. +/// `aggregate_by_bash_verb` does not use this because it tracks distinct +/// hashes and per-verb examples on top of the basic shape. +fn aggregate( + attributions: &[ToolAttribution], + key: KeyFn, + init: InitFn, + accumulate: AccFn, + cost: CostFn, +) -> Vec +where + K: Eq + std::hash::Hash + Clone, + KeyFn: Fn(&ToolAttribution) -> Option, + InitFn: Fn(&K, &ToolAttribution) -> R, + AccFn: Fn(&mut R, &ToolAttribution), + CostFn: Fn(&R) -> f64, +{ + let mut by_key: IndexMap = IndexMap::new(); + for a in attributions { + let Some(k) = key(a) else { continue }; + let row = by_key.entry(k.clone()).or_insert_with(|| init(&k, a)); + accumulate(row, a); + } + let mut out: Vec = by_key.into_values().collect(); + out.sort_by(|a, b| cost(b).total_cmp(&cost(a))); + out +} + /// Roll up file-touching tool attributions (`Read | Edit | Write | /// NotebookEdit`) by their target path. Rows missing or with an empty target /// are skipped. Output is sorted by `total_cost` descending. pub fn aggregate_by_file(attributions: &[ToolAttribution]) -> Vec { - let mut by_path: IndexMap = IndexMap::new(); - for a in attributions { - if !FILE_TOOLS.contains(a.tool_name.as_str()) { - continue; - } - let target = match a.target.as_ref() { - Some(t) if !t.is_empty() => t, - _ => continue, - }; - let row = by_path - .entry(target.clone()) - .or_insert_with(|| FileAggregation { - path: target.clone(), - tool_call_count: 0, - initial_tokens: 0.0, - persistence_tokens: 0.0, - riding_turns: 0, - total_cost: 0.0, - first_emit_ts: a.emit_ts.clone(), - first_emit_turn_index: a.emit_turn_index, - }); - row.tool_call_count += 1; - row.initial_tokens += a.initial_tokens; - row.persistence_tokens += a.persistence_tokens; - row.riding_turns += a.riding_turns; - row.total_cost += a.total_cost; - if a.emit_ts < row.first_emit_ts { - row.first_emit_ts = a.emit_ts.clone(); - row.first_emit_turn_index = a.emit_turn_index; - } - } - let mut out: Vec = by_path.into_values().collect(); - out.sort_by(|a, b| { - b.total_cost - .partial_cmp(&a.total_cost) - .unwrap_or(std::cmp::Ordering::Equal) - }); - out + aggregate( + attributions, + |a| { + if !FILE_TOOLS.contains(a.tool_name.as_str()) { + return None; + } + match a.target.as_ref() { + Some(t) if !t.is_empty() => Some(t.clone()), + _ => None, + } + }, + |path, a| FileAggregation { + path: path.clone(), + tool_call_count: 0, + initial_tokens: 0.0, + persistence_tokens: 0.0, + riding_turns: 0, + total_cost: 0.0, + first_emit_ts: a.emit_ts.clone(), + first_emit_turn_index: a.emit_turn_index, + }, + |row, a| { + row.tool_call_count += 1; + row.initial_tokens += a.initial_tokens; + row.persistence_tokens += a.persistence_tokens; + row.riding_turns += a.riding_turns; + row.total_cost += a.total_cost; + if a.emit_ts < row.first_emit_ts { + row.first_emit_ts = a.emit_ts.clone(); + row.first_emit_turn_index = a.emit_turn_index; + } + }, + |row| row.total_cost, + ) } /// Roll up `Bash` tool attributions by `args_hash`, collapsing repeated @@ -608,33 +639,25 @@ pub fn aggregate_by_file(attributions: &[ToolAttribution]) -> Vec Vec { - let mut by_hash: IndexMap = IndexMap::new(); - for a in attributions { - if a.tool_name != "Bash" { - continue; - } - let row = by_hash - .entry(a.args_hash.clone()) - .or_insert_with(|| BashAggregation { - args_hash: a.args_hash.clone(), - command: a.target.clone(), - call_count: 0, - total_cost: 0.0, - initial_tokens: 0.0, - persistence_tokens: 0.0, - }); - row.call_count += 1; - row.total_cost += a.total_cost; - row.initial_tokens += a.initial_tokens; - row.persistence_tokens += a.persistence_tokens; - } - let mut out: Vec = by_hash.into_values().collect(); - out.sort_by(|a, b| { - b.total_cost - .partial_cmp(&a.total_cost) - .unwrap_or(std::cmp::Ordering::Equal) - }); - out + aggregate( + attributions, + |a| (a.tool_name == "Bash").then(|| a.args_hash.clone()), + |_, a| BashAggregation { + args_hash: a.args_hash.clone(), + command: a.target.clone(), + call_count: 0, + total_cost: 0.0, + initial_tokens: 0.0, + persistence_tokens: 0.0, + }, + |row, a| { + row.call_count += 1; + row.total_cost += a.total_cost; + row.initial_tokens += a.initial_tokens; + row.persistence_tokens += a.persistence_tokens; + }, + |row| row.total_cost, + ) } struct BashVerbAccumulator { @@ -723,8 +746,7 @@ where let mut examples: Vec = row.examples.into_values().collect(); examples.sort_by(|a, b| { b.total_cost - .partial_cmp(&a.total_cost) - .unwrap_or(std::cmp::Ordering::Equal) + .total_cmp(&a.total_cost) .then_with(|| a.command.cmp(&b.command)) }); let top_examples: Vec = examples @@ -750,8 +772,7 @@ where .collect(); out.sort_by(|a, b| { b.total_cost - .partial_cmp(&a.total_cost) - .unwrap_or(std::cmp::Ordering::Equal) + .total_cmp(&a.total_cost) .then_with(|| a.verb.cmp(&b.verb)) }); out @@ -761,36 +782,33 @@ where /// without a resolved type bucket under `"(unknown)"`. Output is sorted by /// `total_cost` descending. pub fn aggregate_by_subagent(attributions: &[ToolAttribution]) -> Vec { - let mut by_type: IndexMap = IndexMap::new(); - for a in attributions { - if a.tool_name != "Agent" && a.tool_name != "Task" { - continue; - } - let key = a - .subagent_type - .clone() - .unwrap_or_else(|| "(unknown)".to_string()); - let row = by_type - .entry(key.clone()) - .or_insert_with(|| SubagentAggregation { - subagent_type: key, - call_count: 0, - total_cost: 0.0, - initial_tokens: 0.0, - persistence_tokens: 0.0, - }); - row.call_count += 1; - row.total_cost += a.total_cost; - row.initial_tokens += a.initial_tokens; - row.persistence_tokens += a.persistence_tokens; - } - let mut out: Vec = by_type.into_values().collect(); - out.sort_by(|a, b| { - b.total_cost - .partial_cmp(&a.total_cost) - .unwrap_or(std::cmp::Ordering::Equal) - }); - out + aggregate( + attributions, + |a| { + if a.tool_name != "Agent" && a.tool_name != "Task" { + return None; + } + Some( + a.subagent_type + .clone() + .unwrap_or_else(|| "(unknown)".to_string()), + ) + }, + |key, _| SubagentAggregation { + subagent_type: key.clone(), + call_count: 0, + total_cost: 0.0, + initial_tokens: 0.0, + persistence_tokens: 0.0, + }, + |row, a| { + row.call_count += 1; + row.total_cost += a.total_cost; + row.initial_tokens += a.initial_tokens; + row.persistence_tokens += a.persistence_tokens; + }, + |row| row.total_cost, + ) } #[cfg(test)]