From 00a7b12daf720b1f1af39c313ba24da542c08312 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 26 May 2026 11:37:55 +0000 Subject: [PATCH 1/2] sdk/cli: per-inference context-delta attribution (#432) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New `burn overhead deltas` verb answers "what blew up my context between inference N and inference N+1?" by walking each session's TurnSpanTree timeline, pairing same-rail Inference spans, and attributing the delta in `tokens.input + cache_read + cache_write` to the intervening ToolResult / UserPrompt / SystemReminder leaves. SDK surface: `LedgerHandle::context_delta(ContextDeltaOpts)` returns `Vec<ContextDelta>` with per-step intervening breakdown, attributed cost (charged at cache_read rate — what the *future* will pay for the persisted prefix), and compaction events surfaced as their own row rather than negative deltas. Main-rail deltas never see subagent tool_results and vice versa. Tool-result token estimates use `output_bytes / 4` as a first-cut fallback; documented as approximate in the output. CLI: `burn overhead deltas [--session ID] [--top N] [--min-delta TOK] [--owner main|subagent|all] [--explain] [--json]`. Default top is 20, default min_delta is 1000 tokens. Compaction rows ignore min_delta so they always surface. Tests: 9 unit tests covering the Bash blow-up driver path, compaction-replaces-negative-delta, subagent isolation, owner filter, top cap, min_delta filter, single-inference no-op, and JSON wire format. Two golden snapshots (`overhead-deltas`, `overhead-deltas-json`) anchor the CLI output against the fixture ledger. 
--- CHANGELOG.md | 1 + crates/relayburn-cli/src/cli.rs | 49 + crates/relayburn-cli/src/commands/overhead.rs | 264 +++- crates/relayburn-sdk/src/analyze.rs | 5 + .../src/analyze/context_delta.rs | 1115 +++++++++++++++++ crates/relayburn-sdk/src/lib.rs | 5 + crates/relayburn-sdk/src/query_verbs.rs | 91 ++ tests/fixtures/cli-golden/invocations.json | 12 + .../snapshots/overhead-deltas-json.stdout.txt | 59 + .../snapshots/overhead-deltas.stdout.txt | 8 + 10 files changed, 1605 insertions(+), 4 deletions(-) create mode 100644 crates/relayburn-sdk/src/analyze/context_delta.rs create mode 100644 tests/fixtures/cli-golden/snapshots/overhead-deltas-json.stdout.txt create mode 100644 tests/fixtures/cli-golden/snapshots/overhead-deltas.stdout.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index b1c32077..5d75d717 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ### Added +- `burn overhead deltas`: per-inference context-window attribution. New `--session`, `--top`, `--min-delta`, `--owner`, `--explain`, `--json` flags surface "what blew up my context between inference N and inference N+1?" — pairs same-rail `Inference` spans, attributes the delta in `input + cache_read + cache_write` to intervening `ToolResult` / `UserPrompt` / `SystemReminder` leaves, surfaces compaction events as their own row (never a negative delta), and isolates main-rail deltas from subagent rails. SDK entry point: `LedgerHandle::context_delta(opts)`. (#432) - `relayburn-sdk`: per-turn span tree as derived analytical primitive. New `LedgerHandle::turn_span_tree(session_id, turn_id)` and `session_span_trees(session_id)` verbs project `TurnRecord` + diff --git a/crates/relayburn-cli/src/cli.rs b/crates/relayburn-cli/src/cli.rs index 66264b29..9e6055a7 100644 --- a/crates/relayburn-cli/src/cli.rs +++ b/crates/relayburn-cli/src/cli.rs @@ -313,6 +313,9 @@ pub enum OverheadAction { /// each overhead file. 
Recommendations only — `burn` never /// modifies the source files. Trim(OverheadTrimArgs), + /// Per-inference context-window deltas: "what blew up my context + /// between inference N and inference N+1?" See AgentWorkforce/burn#432. + Deltas(OverheadDeltasArgs), } /// `burn overhead trim` flags layered on top of [`OverheadArgs`]. @@ -323,6 +326,52 @@ pub struct OverheadTrimArgs { pub top: Option, } +/// `burn overhead deltas` flags layered on top of [`OverheadArgs`]. +#[derive(Debug, ClapArgs)] +pub struct OverheadDeltasArgs { + /// Restrict to a single session id. When unset, every session in the + /// ledger window contributes. + #[arg(long, value_name = "ID")] + pub session: Option, + + /// Row cap. Defaults to 20. + #[arg(long, value_name = "N")] + pub top: Option, + + /// Hide deltas below this many tokens. Defaults to 1000 (the noise + /// floor). Compaction rows always show through regardless. + #[arg(long, value_name = "TOKENS")] + pub min_delta: Option, + + /// Rail filter: `main` (top-level conversation), `subagent`, or + /// `all` (default). + #[arg(long, value_enum, value_name = "RAIL", default_value = "all")] + pub owner: OverheadDeltasOwner, + + /// Expand intervening steps in the human table. Without this, only + /// the driver step is shown per row. + #[arg(long)] + pub explain: bool, +} + +/// CLI-facing mirror of [`relayburn_sdk::ContextDeltaOwnerFilter`]. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum OverheadDeltasOwner { + All, + Main, + Subagent, +} + +impl From for relayburn_sdk::ContextDeltaOwnerFilter { + fn from(o: OverheadDeltasOwner) -> Self { + match o { + OverheadDeltasOwner::All => relayburn_sdk::ContextDeltaOwnerFilter::All, + OverheadDeltasOwner::Main => relayburn_sdk::ContextDeltaOwnerFilter::Main, + OverheadDeltasOwner::Subagent => relayburn_sdk::ContextDeltaOwnerFilter::Subagent, + } + } +} + // --------------------------------------------------------------------------- // `burn state` — typed args + nested subcommand // --------------------------------------------------------------------------- diff --git a/crates/relayburn-cli/src/commands/overhead.rs b/crates/relayburn-cli/src/commands/overhead.rs index 998ee342..365fafde 100644 --- a/crates/relayburn-cli/src/commands/overhead.rs +++ b/crates/relayburn-cli/src/commands/overhead.rs @@ -9,12 +9,14 @@ use std::io::{self, Write}; use std::path::{Path, PathBuf}; use relayburn_sdk::{ - describe_applies_to, overhead as sdk_overhead, overhead_trim as sdk_overhead_trim, - OverheadFileSummary, OverheadOptions, OverheadPerFileEntry, OverheadResult, - OverheadSectionCost, OverheadTrimOptions, OverheadTrimResult, + context_delta as sdk_context_delta, describe_applies_to, overhead as sdk_overhead, + overhead_trim as sdk_overhead_trim, ContextDelta, ContextDeltaOpts, + ContextDeltaOwnerRail as OwnerRail, InterveningStep, OverheadFileSummary, OverheadOptions, + OverheadPerFileEntry, OverheadResult, OverheadSectionCost, OverheadTrimOptions, + OverheadTrimResult, }; -use crate::cli::{GlobalArgs, OverheadAction, OverheadArgs}; +use crate::cli::{GlobalArgs, OverheadAction, OverheadArgs, OverheadDeltasArgs}; use crate::render::error::report_error; use crate::render::format::{ coerce_whole_f64_to_int, format_tokens, format_uint, format_usd, render_table, @@ -27,6 +29,7 @@ pub fn run(globals: &GlobalArgs, args: OverheadArgs) -> i32 { 
Some(OverheadAction::Trim(trim)) => { run_trim(globals, args.project, args.since, args.kind, trim.top) } + Some(OverheadAction::Deltas(deltas)) => run_deltas(globals, deltas), None => run_report(globals, args.project, args.since, args.kind), } } @@ -361,6 +364,205 @@ fn format_line_range(start: u64, end: u64) -> String { format!("{s}-{e}") } +// --------------------------------------------------------------------------- +// `burn overhead deltas` (#432) +// --------------------------------------------------------------------------- + +fn run_deltas(globals: &GlobalArgs, args: OverheadDeltasArgs) -> i32 { + let opts = ContextDeltaOpts { + session: args.session.clone(), + since: None, + top: args.top, + min_delta: args.min_delta, + owner: args.owner.into(), + }; + let progress = TaskProgress::new(globals, "overhead deltas"); + progress.set_task("computing context deltas"); + let deltas = match sdk_context_delta(opts, globals.ledger_path.clone()) { + Ok(d) => d, + Err(err) => { + progress.finish_and_clear(); + return report_error(&err, globals); + } + }; + progress.finish_and_clear(); + + if globals.json { + let mut value = match serde_json::to_value(&deltas) { + Ok(v) => v, + Err(err) => return report_error(&io::Error::other(err), globals), + }; + coerce_whole_f64_to_int(&mut value); + if let Err(err) = render_json(&value) { + return report_error(&err, globals); + } + return 0; + } + + if let Err(err) = render_human_deltas(&deltas, args.explain) { + return report_error(&err, globals); + } + 0 +} + +fn render_human_deltas(deltas: &[ContextDelta], explain: bool) -> io::Result<()> { + let stdout = io::stdout(); + let mut handle = stdout.lock(); + + if deltas.is_empty() { + return handle.write_all(b"# no context deltas above threshold\n"); + } + + let mut table: Vec> = Vec::with_capacity(deltas.len() + 1); + table.push(vec![ + "Inference".to_string(), + "Owner".to_string(), + "Delta".to_string(), + "Cost".to_string(), + "Driver".to_string(), + ]); + for d in deltas { + 
let inf_label = format!("{}/inf{}", short_turn_label(&d.turn_id), d.inference_idx); + let owner_label = match &d.owner_rail { + OwnerRail::Main => "main".to_string(), + OwnerRail::Subagent { agent_id } => format!("sub:{}", short_agent_label(agent_id)), + }; + let delta_label = format_signed_tokens(d.delta_tokens); + let cost_label = format_usd(d.attributed_cost_usd); + let driver_label = driver_summary(&d.intervening); + table.push(vec![ + inf_label, + owner_label, + delta_label, + cost_label, + driver_label, + ]); + } + handle.write_all(render_table(&table).as_bytes())?; + handle.write_all(b"\n")?; + + if explain { + handle.write_all(b"\n")?; + for d in deltas { + let inf_label = format!("{}/inf{}", short_turn_label(&d.turn_id), d.inference_idx); + let header = format!( + "{inf_label} — {} steps, prior {} -> current {} tok\n", + d.intervening.len(), + format_tokens(d.prior_context_tokens), + format_tokens(d.current_context_tokens), + ); + handle.write_all(header.as_bytes())?; + for step in &d.intervening { + let line = format!(" - {}\n", explain_step(step)); + handle.write_all(line.as_bytes())?; + } + } + } + + handle.write_all( + b"\n# token / cost figures are approximate (bytes/4 for tool results,\n\ + # cache-read rate for cost). Compaction rows surface separately and\n\ + # never appear as negative deltas.\n", + )?; + handle.flush()?; + Ok(()) +} + +fn short_turn_label(turn_id: &str) -> String { + // Turn ids on Claude are `msg-...` UUIDs; trim to a short prefix + // for the table. Keep the original for JSON output. 
+ let trimmed = turn_id.trim_start_matches("msg_"); + let trimmed = trimmed.trim_start_matches("msg-"); + if trimmed.len() > 8 { + format!("T{}", &trimmed[..8]) + } else { + format!("T{trimmed}") + } +} + +fn short_agent_label(agent_id: &str) -> String { + let trimmed = agent_id.trim_start_matches("agent-"); + if trimmed.len() > 8 { + trimmed[..8].to_string() + } else { + trimmed.to_string() + } +} + +fn format_signed_tokens(n: i64) -> String { + let sign = if n > 0 { "+" } else { "" }; + format!("{sign}{}", format_tokens(n.unsigned_abs())) +} + +fn driver_summary(steps: &[InterveningStep]) -> String { + if steps.is_empty() { + return "(no intervening leaves)".to_string(); + } + // Largest step by approx_tokens, with a "N steps" suffix when more + // than one. Compaction rows always win their summary because + // freeing tokens is the most explanatory signal. + if let Some(comp) = steps + .iter() + .find(|s| matches!(s, InterveningStep::Compaction { .. })) + { + return comp.driver_label(); + } + let largest = steps + .iter() + .max_by_key(|s| s.approx_tokens()) + .expect("non-empty"); + let extra = steps.len().saturating_sub(1); + if extra == 0 { + largest.driver_label() + } else { + format!( + "{} (+{extra} more step{})", + largest.driver_label(), + if extra == 1 { "" } else { "s" } + ) + } +} + +fn explain_step(step: &InterveningStep) -> String { + match step { + InterveningStep::ToolResult { + tool_use_id, + tool_name, + approx_tokens, + approx_bytes, + truncated, + } => format!( + "tool_result {tool_name} (id={tool_use_id}): ~{} tok / {} bytes{}", + format_tokens(*approx_tokens), + format_uint(*approx_bytes), + if *truncated { " [truncated]" } else { "" }, + ), + InterveningStep::UserPrompt { + approx_tokens, + has_system_reminder, + } => format!( + "user prompt: ~{} tok{}", + format_tokens(*approx_tokens), + if *has_system_reminder { + " (with system-reminder)" + } else { + "" + }, + ), + InterveningStep::SystemReminder { + source, + approx_tokens, + } => 
format!( + "system-reminder ({source:?}): ~{} tok", + format_tokens(*approx_tokens), + ), + InterveningStep::Compaction { tokens_freed } => { + format!("compaction: -{} tok freed", format_tokens(*tokens_freed)) + } + InterveningStep::Other => "other".to_string(), + } +} + #[cfg(test)] mod tests { use super::*; @@ -370,4 +572,58 @@ mod tests { assert_eq!(format_line_range(7, 11), " 7- 11"); assert_eq!(format_line_range(100, 200), " 100- 200"); } + + #[test] + fn short_turn_label_trims_msg_prefix() { + assert_eq!(short_turn_label("msg_abcdef1234"), "Tabcdef12"); + assert_eq!(short_turn_label("msg-deadbeef"), "Tdeadbeef"); + assert_eq!(short_turn_label("xyz"), "Txyz"); + } + + #[test] + fn driver_summary_singles_out_compaction() { + let steps = vec![ + InterveningStep::ToolResult { + tool_use_id: "tu-1".into(), + tool_name: "Bash".into(), + approx_tokens: 100, + approx_bytes: 400, + truncated: false, + }, + InterveningStep::Compaction { + tokens_freed: 5000, + }, + ]; + let s = driver_summary(&steps); + assert!(s.contains("compaction")); + } + + #[test] + fn driver_summary_picks_largest_step() { + let steps = vec![ + InterveningStep::ToolResult { + tool_use_id: "tu-1".into(), + tool_name: "Bash".into(), + approx_tokens: 100, + approx_bytes: 400, + truncated: false, + }, + InterveningStep::ToolResult { + tool_use_id: "tu-2".into(), + tool_name: "Read".into(), + approx_tokens: 5000, + approx_bytes: 20_000, + truncated: false, + }, + ]; + let s = driver_summary(&steps); + assert!(s.contains("Read"), "got {s}"); + assert!(s.contains("more"), "got {s}"); + } + + #[test] + fn format_signed_tokens_handles_positive_and_zero() { + assert_eq!(format_signed_tokens(0), "0"); + assert!(format_signed_tokens(5_000).starts_with('+')); + } } diff --git a/crates/relayburn-sdk/src/analyze.rs b/crates/relayburn-sdk/src/analyze.rs index f6a57027..bc8eea99 100644 --- a/crates/relayburn-sdk/src/analyze.rs +++ b/crates/relayburn-sdk/src/analyze.rs @@ -17,6 +17,7 @@ pub mod claude_md; pub mod 
compare; +pub mod context_delta; pub mod cost; pub mod fidelity; pub mod findings; @@ -44,6 +45,10 @@ pub use compare::{ build_compare_table, compare_from_archive, CompareCategory, CompareCell, CompareFromArchiveResult, CompareOptions, CompareTable, CompareTotals, DEFAULT_MIN_SAMPLE, }; +pub use context_delta::{ + deltas_for_session, ContextDelta, ContextDeltaOpts, InterveningStep, OwnerFilter, OwnerRail, + ReminderSource, +}; pub use cost::{cost_for_turn, cost_for_usage, sum_costs, CostBreakdown}; pub use overhead::{ attribute_overhead, describe_applies_to, find_overhead_files, load_overhead_file, diff --git a/crates/relayburn-sdk/src/analyze/context_delta.rs b/crates/relayburn-sdk/src/analyze/context_delta.rs new file mode 100644 index 00000000..1c330173 --- /dev/null +++ b/crates/relayburn-sdk/src/analyze/context_delta.rs @@ -0,0 +1,1115 @@ +//! Per-inference context-window delta attribution. +//! +//! Answers "what blew up my context between inference N and inference +//! N+1?" by walking [`TurnSpanTree`]s in order, pairing same-rail +//! [`SpanKind::Inference`] nodes, and attributing the delta in +//! `context_tokens` to the [`InterveningStep`]s that landed in the +//! prompt between them. See AgentWorkforce/burn#432. +//! +//! # Algorithm +//! +//! 1. Flatten every span across every turn in DFS order into a single +//! timeline. Each leaf the consumer cares about (`Inference`, +//! `ToolResult`, `UserPrompt`, system-reminder `UserPrompt`) gets a +//! position equal to its DFS index across the session. +//! 2. Bucket inferences by **owner rail**: `Main` for spans whose path +//! from the root does not pass through a `Subagent` node; +//! `Subagent(agent_id)` for spans that do. Each leaf is attributed to +//! exactly one rail — no cross-contamination. +//! 3. Within each rail, walk pairwise `(prev, curr)` inferences and +//! collect the intervening leaves (events whose position falls +//! strictly between `prev` and `curr`). +//! 4. 
`context_tokens(inf) = tokens.input + tokens.cache_read + +//! tokens.cache_write` summed off the `Inference` span's attributes. +//! 5. `delta = curr.context_tokens - prev.context_tokens`. +//! 6. **Compaction handling**: if `delta < 0` AND a [`CompactionEvent`] +//! sits between `prev` and `curr` (by timestamp), surface the row as +//! [`InterveningStep::Compaction`] with `tokens_freed = +//! prev - curr`. The `delta_tokens` on the returned [`ContextDelta`] +//! stays `0` in that case so a negative number never lands in the +//! output. +//! 7. **Cost**: charge `max(delta_tokens, 0) * curr_inference_cache_read_rate`. +//! Cache-read is the rate the *future* will pay for the persisted +//! prefix, which is the right charge for a "this much got added to +//! your context window" question (vs. cache-write, which the next +//! inference pays once when first persisting). The decision is +//! documented in the issue's open-question #3. +//! +//! # Subagent isolation +//! +//! Main-rail inferences never see subagent tool_results, and vice +//! versa. A subagent's tool_use under its parent `Task` is attributed +//! to the subagent rail; its inferences never enter the main-rail +//! pairwise walk. +//! +//! # Token estimates for tool_results +//! +//! We use `output_bytes / 4` as the approximate token count. The +//! `output_bytes` field comes from the ingest-time byte measurement +//! recorded on [`crate::reader::ToolResultEventRecord::output_bytes`] +//! (issue #444). The 4-bytes-per-token ratio is a first-cut +//! approximation; downstream consumers should treat the number as +//! advisory. + +use std::collections::HashMap; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use crate::analyze::pricing::PricingTable; +use crate::analyze::span_tree::{AttrValue, SpanKind, SpanNode, TurnSpanTree}; +use crate::reader::CompactionEvent; + +/// Approximate bytes-per-token ratio used when no real tokenizer pass is +/// available. 
Mirrors the rule of thumb used elsewhere in burn for +/// content-size approximation. The output is marked approximate in +/// downstream JSON so consumers know not to bill on it. +const BYTES_PER_TOKEN: u64 = 4; + +/// Which "rail" an inference belongs to. The main conversation rail is +/// independent of every subagent rail; deltas are computed per-rail and +/// never cross. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum OwnerRail { + /// The top-level conversation between the user and the model. + Main, + /// A subagent dispatched by a `Task` tool_use. The `agentId` field + /// carries the `agent_id` attribute from the [`SpanKind::Subagent`] + /// span (or its parent in a nested case). + Subagent { + #[serde(rename = "agentId")] + agent_id: String, + }, +} + +/// Filter for [`ContextDeltaOpts::owner`]. Mirrors the CLI's +/// `--owner main|subagent|all` flag. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum OwnerFilter { + /// No filter: emit deltas for every rail. + #[default] + All, + /// Only main-rail deltas. + Main, + /// Only subagent-rail deltas. + Subagent, +} + +/// Source for a synthetic `<system-reminder>` step. First-cut implementation +/// classifies every reminder as [`ReminderSource::Other`]; downstream +/// issue #425 will split into `Relaycast` / `Harness` proper. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum ReminderSource { + /// Reminder originated from relaycast injection. + Relaycast, + /// Reminder originated from the harness (Claude Code, opencode, …). + Harness, + /// Unclassified — first-cut default. Refined by #425. + Other, +} + +/// One leaf that landed in the prompt between two consecutive inferences +/// on the same rail. 
The `approx_tokens` fields are best-effort estimates +/// derived from `output_bytes / 4` (tool_result) or text-byte / 4 +/// (prompts and reminders). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum InterveningStep { + /// A tool_result block paired to a tool_use the model issued before + /// `prev` inference. Carries the tool name and an approximate + /// token / byte count for the result payload. + ToolResult { + #[serde(rename = "toolUseId")] + tool_use_id: String, + #[serde(rename = "toolName")] + tool_name: String, + #[serde(rename = "approxTokens")] + approx_tokens: u64, + #[serde(rename = "approxBytes")] + approx_bytes: u64, + truncated: bool, + }, + /// A user prompt that landed between the two inferences. Rare on + /// single-turn flows but happens on multi-turn auto-responses + /// (e.g. plan-mode confirmations). + UserPrompt { + #[serde(rename = "approxTokens")] + approx_tokens: u64, + #[serde(rename = "hasSystemReminder")] + has_system_reminder: bool, + }, + /// A `<system-reminder>` content block. + SystemReminder { + source: ReminderSource, + #[serde(rename = "approxTokens")] + approx_tokens: u64, + }, + /// A compaction event sat between the two inferences and the delta + /// went negative as a result. The negative delta is replaced by a + /// `Compaction` row in the intervening list and the delta on the + /// containing [`ContextDelta`] is clamped to `0`. + Compaction { + #[serde(rename = "tokensFreed")] + tokens_freed: u64, + }, + /// Catch-all for spans that don't fall into the above categories + /// (kept for forward-compatibility — current builders never emit + /// this variant). + Other, +} + +impl InterveningStep { + /// Approximate token count attributed to this step. `Compaction` + /// counts as zero — it doesn't add tokens, it frees them. + pub fn approx_tokens(&self) -> u64 { + match self { + Self::ToolResult { approx_tokens, .. 
} => *approx_tokens, + Self::UserPrompt { approx_tokens, .. } => *approx_tokens, + Self::SystemReminder { approx_tokens, .. } => *approx_tokens, + Self::Compaction { .. } => 0, + Self::Other => 0, + } + } + + /// Short label for the "driver" column in human renderers. + pub fn driver_label(&self) -> String { + match self { + Self::ToolResult { tool_name, .. } => format!("{tool_name} result"), + Self::UserPrompt { .. } => "user prompt".to_string(), + Self::SystemReminder { .. } => "system-reminder".to_string(), + Self::Compaction { tokens_freed } => format!("compaction -{tokens_freed} tok"), + Self::Other => "other".to_string(), + } + } +} + +/// One per-rail (`prev`, `curr`) pair. The list returned by +/// [`LedgerHandle::context_delta`](crate::LedgerHandle::context_delta) +/// is sorted by `delta_tokens` descending (then `inference_idx` +/// ascending) and truncated to [`ContextDeltaOpts::top`]. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ContextDelta { + pub session_id: String, + pub turn_id: String, + /// Position of `curr` inference within its rail, 1-indexed. The + /// first inference of a rail has no `prev` so it never appears here. + pub inference_idx: u32, + pub owner_rail: OwnerRail, + pub prior_context_tokens: u64, + pub current_context_tokens: u64, + /// Always `>= 0` in the output. Negative raw deltas are surfaced as + /// [`InterveningStep::Compaction`] rows instead. `i64` is preserved + /// in the type so a future "raw delta" surface can use it without a + /// schema change. + pub delta_tokens: i64, + pub intervening: Vec, + #[serde(rename = "attributedCostUSD")] + pub attributed_cost_usd: f64, +} + +/// Options for the context-delta verb. Each field has a sensible +/// default; callers usually only need to set `session` and possibly +/// `top` or `min_delta`. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ContextDeltaOpts { + /// When set, narrow to a single session. When `None`, every session + /// in the ledger window contributes. + pub session: Option, + /// Time window (relative — `Duration::from_secs(24 * 3600)` by + /// default). Sessions whose latest activity falls before + /// `now - since` are skipped. + pub since: Option, + /// Output cap. Defaults to 20. + pub top: Option, + /// Hide deltas below this threshold. Defaults to 1000 tokens — the + /// "noise floor" the issue specifies. Compaction rows ignore this + /// (a compaction with `tokens_freed < min_delta` would otherwise + /// vanish, defeating the point). + pub min_delta: Option, + /// Rail filter. + #[serde(default)] + pub owner: OwnerFilter, +} + +impl ContextDeltaOpts { + pub fn effective_top(&self) -> u32 { + self.top.unwrap_or(20) + } + + pub fn effective_min_delta(&self) -> u64 { + self.min_delta.unwrap_or(1000) + } + + pub fn effective_since(&self) -> Duration { + self.since.unwrap_or(Duration::from_secs(24 * 3600)) + } +} + +// --------------------------------------------------------------------------- +// Pure algorithm: span trees + compactions + pricing -> Vec +// --------------------------------------------------------------------------- + +/// Compute per-rail context deltas across one session, given the +/// session's [`TurnSpanTree`]s in turn order plus its +/// [`CompactionEvent`]s (in any order — they're sorted internally). +/// +/// Pure derivation: no I/O, no DB writes, no caching. The +/// [`LedgerHandle`](crate::LedgerHandle) wrapper does the loading and +/// then calls into here. +/// +/// `pricing` is consulted for the per-million `cache_read` rate of the +/// `curr` inference's model. Models the pricing table doesn't recognize +/// charge `0.0` (matching the rest of the analyze surface, which never +/// surfaces costs it can't price). 
+pub fn deltas_for_session( + trees: &[TurnSpanTree], + compactions: &[CompactionEvent], + pricing: &PricingTable, + opts: &ContextDeltaOpts, +) -> Vec { + if trees.is_empty() { + return Vec::new(); + } + let timeline = build_timeline(trees); + let mut compactions_sorted: Vec<&CompactionEvent> = compactions.iter().collect(); + compactions_sorted.sort_by_key(|c| parse_iso_ms(&c.ts).unwrap_or(0)); + + let mut per_rail: HashMap> = HashMap::new(); + for (idx, item) in timeline.iter().enumerate() { + if matches!(item.kind, TimelineKind::Inference { .. }) { + per_rail.entry(item.owner.clone()).or_default().push(idx); + } + } + + let min_delta = opts.effective_min_delta() as i64; + let mut out: Vec = Vec::new(); + for (rail, inf_indices) in per_rail.iter() { + if !rail_passes_filter(rail, opts.owner) { + continue; + } + for (pair_idx, window) in inf_indices.windows(2).enumerate() { + let prev_pos = window[0]; + let curr_pos = window[1]; + let TimelineKind::Inference { + context_tokens: prev_ctx, + .. + } = timeline[prev_pos].kind + else { + continue; + }; + let TimelineKind::Inference { + context_tokens: curr_ctx, + model: ref curr_model, + } = timeline[curr_pos].kind + else { + continue; + }; + + let raw_delta = curr_ctx as i64 - prev_ctx as i64; + + // Collect intervening leaves between (prev_pos, curr_pos) on + // the same rail. Walk the flat timeline; ignore items on + // other rails so subagent leaves never enter a main-rail + // delta (and vice versa). + let mut intervening: Vec = Vec::new(); + for item in &timeline[prev_pos + 1..curr_pos] { + if item.owner != *rail { + continue; + } + if let Some(step) = item.to_intervening_step() { + intervening.push(step); + } + } + + // Compaction handling: if there's a compaction event between + // prev.end_ms and curr.start_ms AND the delta is negative, + // surface it as a Compaction row and clamp delta to 0. 
+ let prev_end = timeline[prev_pos].end_ms; + let curr_start = timeline[curr_pos].start_ms; + let compaction_between = compactions_sorted.iter().any(|c| { + let ms = parse_iso_ms(&c.ts).unwrap_or(0); + ms >= prev_end && ms <= curr_start + }); + let (delta_tokens, intervening) = if raw_delta < 0 && compaction_between { + let freed = (prev_ctx - curr_ctx) as u64; + let mut steps = intervening; + steps.push(InterveningStep::Compaction { + tokens_freed: freed, + }); + (0i64, steps) + } else { + (raw_delta, intervening) + }; + + if delta_tokens < min_delta + && !intervening + .iter() + .any(|s| matches!(s, InterveningStep::Compaction { .. })) + { + continue; + } + + let session_id = timeline[curr_pos].session_id.clone(); + let turn_id = timeline[curr_pos].turn_id.clone(); + let cost = attributed_cost(delta_tokens, curr_model, pricing); + + out.push(ContextDelta { + session_id, + turn_id, + // 1-indexed position within the rail. `windows(2)` + // gives us pair index 0 = first pair = curr is the + // second inference, so the curr inference index is + // `pair_idx + 2` in 1-indexed terms. + inference_idx: (pair_idx as u32) + 2, + owner_rail: rail.clone(), + prior_context_tokens: prev_ctx, + current_context_tokens: curr_ctx, + delta_tokens, + intervening, + attributed_cost_usd: cost, + }); + } + } + + // Sort by delta descending, ties broken by turn_id then inference_idx + // so the output is deterministic across HashMap iteration order. + out.sort_by(|a, b| { + b.delta_tokens + .cmp(&a.delta_tokens) + .then_with(|| a.turn_id.cmp(&b.turn_id)) + .then_with(|| a.inference_idx.cmp(&b.inference_idx)) + }); + + let top = opts.effective_top() as usize; + if out.len() > top { + out.truncate(top); + } + out +} + +fn rail_passes_filter(rail: &OwnerRail, filter: OwnerFilter) -> bool { + match (rail, filter) { + (_, OwnerFilter::All) => true, + (OwnerRail::Main, OwnerFilter::Main) => true, + (OwnerRail::Subagent { .. 
}, OwnerFilter::Subagent) => true, + _ => false, + } +} + +fn attributed_cost(delta_tokens: i64, model: &str, pricing: &PricingTable) -> f64 { + if delta_tokens <= 0 { + return 0.0; + } + let Some(rate) = crate::analyze::cost::lookup_model_rate(model, pricing) else { + return 0.0; + }; + // Charge at cache_read because cache_read is what every *future* + // inference pays for the persisted prefix this delta added. The + // model's first inference after the prompt grows pays cache_write + // once; every subsequent inference pays cache_read. We bill at + // cache_read here so the "what did this cost me" number reflects + // the steady-state, not the one-shot. + (delta_tokens as f64 / 1_000_000.0) * rate.cache_read +} + +// --------------------------------------------------------------------------- +// Timeline construction (DFS of spans across the session) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone)] +struct TimelineItem { + session_id: String, + turn_id: String, + owner: OwnerRail, + kind: TimelineKind, + start_ms: i64, + end_ms: i64, +} + +#[derive(Debug, Clone)] +enum TimelineKind { + Inference { + context_tokens: u64, + model: String, + }, + ToolResult { + tool_use_id: String, + tool_name: String, + approx_bytes: u64, + truncated: bool, + }, + UserPrompt { + approx_tokens: u64, + has_system_reminder: bool, + }, + // Reserved for the system-reminder detection follow-up (#425). The + // span-tree builders do not yet synthesize `SystemReminder` leaves, + // so this variant is unreachable from the live timeline today; it's + // kept in the shape so the day #425 lands no consumer surface + // changes. Suppress dead_code until the builder wires it up. 
+ #[allow(dead_code)] + SystemReminder { + source: ReminderSource, + approx_tokens: u64, + }, +} + +impl TimelineItem { + fn to_intervening_step(&self) -> Option { + match &self.kind { + TimelineKind::ToolResult { + tool_use_id, + tool_name, + approx_bytes, + truncated, + } => Some(InterveningStep::ToolResult { + tool_use_id: tool_use_id.clone(), + tool_name: tool_name.clone(), + approx_tokens: *approx_bytes / BYTES_PER_TOKEN, + approx_bytes: *approx_bytes, + truncated: *truncated, + }), + TimelineKind::UserPrompt { + approx_tokens, + has_system_reminder, + } => Some(InterveningStep::UserPrompt { + approx_tokens: *approx_tokens, + has_system_reminder: *has_system_reminder, + }), + TimelineKind::SystemReminder { + source, + approx_tokens, + } => Some(InterveningStep::SystemReminder { + source: *source, + approx_tokens: *approx_tokens, + }), + TimelineKind::Inference { .. } => None, + } + } +} + +fn build_timeline(trees: &[TurnSpanTree]) -> Vec { + let mut out: Vec = Vec::new(); + for tree in trees { + walk_node( + &tree.root, + &tree.session_id, + &tree.turn_id, + &OwnerRail::Main, + &mut out, + ); + } + out +} + +fn walk_node( + node: &SpanNode, + session_id: &str, + turn_id: &str, + parent_owner: &OwnerRail, + out: &mut Vec, +) { + // If this span is a Subagent root, switch the owner rail for the + // subtree to `Subagent(agent_id)`. The Subagent span itself does + // not emit a timeline item — it's a rail boundary, not a leaf the + // delta consumer cares about. 
+ let owner_for_subtree = if matches!(node.kind, SpanKind::Subagent) { + let agent_id = match node.attributes.get("agent_id") { + Some(AttrValue::String(s)) => s.clone(), + _ => String::new(), + }; + OwnerRail::Subagent { agent_id } + } else { + parent_owner.clone() + }; + + match node.kind { + SpanKind::Inference => { + let input = attr_int(node, "tokens.input").unwrap_or(0); + let cache_read = attr_int(node, "tokens.cache_read").unwrap_or(0); + let cache_write = attr_int(node, "tokens.cache_write").unwrap_or(0); + let context_tokens = (input + cache_read + cache_write).max(0) as u64; + let model = match node.attributes.get("model") { + Some(AttrValue::String(s)) => s.clone(), + _ => node.name.clone(), + }; + out.push(TimelineItem { + session_id: session_id.to_string(), + turn_id: turn_id.to_string(), + owner: parent_owner.clone(), + kind: TimelineKind::Inference { + context_tokens, + model, + }, + start_ms: node.start_ms, + end_ms: node.end_ms, + }); + } + SpanKind::ToolResult => { + let tool_use_id = match node.attributes.get("tool_use_id") { + Some(AttrValue::String(s)) => s.clone(), + _ => String::new(), + }; + let approx_bytes = attr_int(node, "output_bytes").unwrap_or(0).max(0) as u64; + let truncated = matches!( + node.attributes.get("output_truncated"), + Some(AttrValue::Bool(true)) + ); + // Tool name lives on the parent ToolUse, not the + // ToolResult — we don't have a back-pointer here, so we + // emit an empty string and let the parent-loop replace it + // before pushing. The caller (`walk_node` for ToolUse + // below) fills it in. 
+ out.push(TimelineItem { + session_id: session_id.to_string(), + turn_id: turn_id.to_string(), + owner: parent_owner.clone(), + kind: TimelineKind::ToolResult { + tool_use_id, + tool_name: String::new(), + approx_bytes, + truncated, + }, + start_ms: node.start_ms, + end_ms: node.end_ms, + }); + } + SpanKind::ToolUse => { + // Walk children; if a child ToolResult lands in `out`, + // backfill its tool_name from this ToolUse's name. + let tool_name = node.name.clone(); + let before = out.len(); + for child in &node.children { + walk_node(child, session_id, turn_id, &owner_for_subtree, out); + } + for item in out.iter_mut().skip(before) { + if let TimelineKind::ToolResult { + tool_name: ref mut tn, + .. + } = item.kind + { + if tn.is_empty() { + *tn = tool_name.clone(); + } + } + } + return; + } + SpanKind::UserPrompt => { + out.push(TimelineItem { + session_id: session_id.to_string(), + turn_id: turn_id.to_string(), + owner: parent_owner.clone(), + kind: TimelineKind::UserPrompt { + approx_tokens: 0, + has_system_reminder: false, + }, + start_ms: node.start_ms, + end_ms: node.end_ms, + }); + } + SpanKind::Subagent | SpanKind::Skill | SpanKind::Turn => { + // Pass-through containers — recurse with the (possibly + // adjusted) owner rail. + } + } + + for child in &node.children { + walk_node(child, session_id, turn_id, &owner_for_subtree, out); + } +} + +fn attr_int(node: &SpanNode, key: &str) -> Option { + match node.attributes.get(key) { + Some(AttrValue::Int(i)) => Some(*i), + _ => None, + } +} + +/// ISO-8601 -> Unix-ms. Mirror of the parsers elsewhere in the SDK; kept +/// here so this module has no dependency on `reader::claude::span_tree`'s +/// internals (which is a peer module, not an upstream). 
+fn parse_iso_ms(s: &str) -> Option { + let bytes = s.as_bytes(); + if bytes.len() < 19 { + return None; + } + if !(bytes[4] == b'-' + && bytes[7] == b'-' + && (bytes[10] == b'T' || bytes[10] == b' ') + && bytes[13] == b':' + && bytes[16] == b':') + { + return None; + } + let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; + let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; + let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; + let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; + let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; + let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; + let mut millis: i64 = 0; + let mut idx = 19; + if idx < bytes.len() && bytes[idx] == b'.' { + idx += 1; + let frac_start = idx; + while idx < bytes.len() && bytes[idx].is_ascii_digit() { + idx += 1; + } + let mut frac = std::str::from_utf8(&bytes[frac_start..idx]) + .ok()? + .to_string(); + if frac.len() > 3 { + frac.truncate(3); + } + while frac.len() < 3 { + frac.push('0'); + } + millis = frac.parse().ok()?; + } + let m = month as i64; + let d = day as i64; + let y = if m <= 2 { year - 1 } else { year }; + let era = if y >= 0 { y } else { y - 399 } / 400; + let yoe = (y - era * 400) as u64; + let mp = if m > 2 { m - 3 } else { m + 9 } as u64; + let doy = (153 * mp + 2) / 5 + (d as u64) - 1; + let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; + let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; + let secs = + days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); + Some(secs * 1_000 + millis) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::analyze::span_tree::{SpanKind, SpanNode, SpanStatus, TurnSpanTree}; + use 
crate::reader::{CompactionEvent, SourceKind}; + + fn make_inf(req_id: &str, model: &str, input: i64, cache_read: i64, cache_write: i64) -> SpanNode { + let mut n = SpanNode::new(SpanKind::Inference, model); + n.set_attr("model", AttrValue::str(model)); + n.set_attr("request_id", AttrValue::str(req_id)); + n.set_attr("tokens.input", AttrValue::Int(input)); + n.set_attr("tokens.output", AttrValue::Int(0)); + n.set_attr("tokens.cache_read", AttrValue::Int(cache_read)); + n.set_attr("tokens.cache_write", AttrValue::Int(cache_write)); + n.set_attr("tokens.reasoning", AttrValue::Int(0)); + n + } + + fn make_tool_use(name: &str, tool_use_id: &str) -> SpanNode { + let mut n = SpanNode::new(SpanKind::ToolUse, name); + n.set_attr("tool_use_id", AttrValue::str(tool_use_id)); + n + } + + fn make_tool_result(tool_use_id: &str, bytes: i64, truncated: bool) -> SpanNode { + let mut n = SpanNode::new(SpanKind::ToolResult, "tool-result"); + n.set_attr("tool_use_id", AttrValue::str(tool_use_id)); + n.set_attr("output_bytes", AttrValue::Int(bytes)); + if truncated { + n.set_attr("output_truncated", AttrValue::Bool(true)); + } + n + } + + fn make_user_prompt() -> SpanNode { + SpanNode::new(SpanKind::UserPrompt, "user-prompt") + } + + fn turn_tree(session: &str, turn: &str, root: SpanNode) -> TurnSpanTree { + TurnSpanTree { + session_id: session.to_string(), + turn_id: turn.to_string(), + turn_number: 0, + root, + } + } + + /// Build a single-turn fixture: two inferences on the main rail + /// with a Bash tool_result between them whose 40_000 bytes + /// translates to a ~10k token jump. The delta should surface as + /// the top row with Bash as the driver. 
+ #[test] + fn bash_blowup_surfaces_as_top_delta_with_bash_driver() { + // inference #1: context = 1000 + let inf1 = make_inf("req-1", "claude-sonnet-4-6", 1000, 0, 0); + let mut bash_use = make_tool_use("Bash", "tu-1"); + bash_use.children.push(make_tool_result("tu-1", 40_000, false)); + let mut inf1 = inf1; + inf1.children.push(bash_use); + + // inference #2: context jumped to 12_000 — delta of 11_000. + let inf2 = make_inf("req-2", "claude-sonnet-4-6", 12_000, 0, 0); + + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.status = SpanStatus::Ok; + root.children.push(make_user_prompt()); + root.children.push(inf1); + root.children.push(inf2); + + let tree = turn_tree("sess-1", "msg-1", root); + let pricing = crate::analyze::pricing::load_builtin_pricing(); + let opts = ContextDeltaOpts::default(); + let deltas = deltas_for_session(&[tree], &[], &pricing, &opts); + assert_eq!(deltas.len(), 1, "one pairwise delta expected"); + let d = &deltas[0]; + assert_eq!(d.session_id, "sess-1"); + assert_eq!(d.turn_id, "msg-1"); + assert_eq!(d.owner_rail, OwnerRail::Main); + assert_eq!(d.prior_context_tokens, 1000); + assert_eq!(d.current_context_tokens, 12_000); + assert_eq!(d.delta_tokens, 11_000); + assert_eq!(d.intervening.len(), 1); + match &d.intervening[0] { + InterveningStep::ToolResult { + tool_name, + approx_bytes, + approx_tokens, + truncated, + .. + } => { + assert_eq!(tool_name, "Bash"); + assert_eq!(*approx_bytes, 40_000); + assert_eq!(*approx_tokens, 10_000); + assert!(!*truncated); + } + other => panic!("expected ToolResult step, got {other:?}"), + } + // The driver_label helper should mention Bash. + assert!(d.intervening[0].driver_label().contains("Bash")); + // Cost is non-negative. + assert!(d.attributed_cost_usd >= 0.0); + } + + /// Compaction handling: a CompactionEvent between two inferences + /// where the second has *less* context than the first must surface + /// as `Compaction { tokens_freed }`, NOT a negative `delta_tokens`. 
+ #[test] + fn compaction_replaces_negative_delta() { + let inf1 = make_inf("req-1", "claude-sonnet-4-6", 50_000, 0, 0); + // After compaction, context drops to 8_000. + let inf2 = make_inf("req-2", "claude-sonnet-4-6", 8_000, 0, 0); + + // Stamp timestamps so the compaction event sits between them. + let mut inf1 = inf1; + inf1.start_ms = 1_776_643_201_000; + inf1.end_ms = 1_776_643_202_000; + let mut inf2 = inf2; + inf2.start_ms = 1_776_643_204_000; + inf2.end_ms = 1_776_643_205_000; + + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children.push(inf1); + root.children.push(inf2); + let tree = turn_tree("sess-1", "msg-1", root); + + let compaction = CompactionEvent { + v: 1, + source: SourceKind::ClaudeCode, + session_id: "sess-1".into(), + ts: "2026-04-20T00:00:03.000Z".into(), + preceding_message_id: Some("msg-1".into()), + tokens_before_compact: Some(50_000), + }; + + let pricing = crate::analyze::pricing::load_builtin_pricing(); + // Opts: min_delta 0 so the row isn't filtered out (delta_tokens is 0). + let opts = ContextDeltaOpts { + min_delta: Some(0), + ..ContextDeltaOpts::default() + }; + let deltas = deltas_for_session(&[tree], &[compaction], &pricing, &opts); + assert_eq!(deltas.len(), 1); + let d = &deltas[0]; + assert_eq!(d.delta_tokens, 0, "compaction clamps to 0"); + let has_compaction = d + .intervening + .iter() + .any(|s| matches!(s, InterveningStep::Compaction { tokens_freed } if *tokens_freed == 42_000)); + assert!( + has_compaction, + "expected Compaction step with tokens_freed=42000, got {:?}", + d.intervening + ); + } + + /// Subagent isolation: a main-rail Inference and a subagent + /// Inference both happen, with a subagent tool_result between + /// them. The main-rail delta must NOT include the subagent's + /// tool_result. + #[test] + fn subagent_isolation_main_rail_excludes_subagent_results() { + // Main rail: two inferences with a 10k context jump. 
+ let mut main_inf1 = make_inf("req-main-1", "claude-sonnet-4-6", 1000, 0, 0); + // Add a Task tool_use that fans out to a Subagent with its own + // inference + tool_result. The subagent's tool_result has + // 40k bytes (~10k tokens) — and the main rail's tool_use also + // gets a small result so the main delta is non-zero. + let mut task_use = make_tool_use("Task", "tu-task"); + + let mut sub_node = SpanNode::new(SpanKind::Subagent, "general-purpose"); + sub_node.set_attr("agent_id", AttrValue::str("agent-a")); + // Subagent inferences: + let sub_inf1 = make_inf("req-sub-1", "claude-sonnet-4-6", 2000, 0, 0); + let mut sub_bash = make_tool_use("Bash", "tu-sub-bash"); + sub_bash + .children + .push(make_tool_result("tu-sub-bash", 40_000, false)); + let mut sub_inf1 = sub_inf1; + sub_inf1.children.push(sub_bash); + let sub_inf2 = make_inf("req-sub-2", "claude-sonnet-4-6", 12_000, 0, 0); + sub_node.children.push(sub_inf1); + sub_node.children.push(sub_inf2); + task_use.children.push(sub_node); + main_inf1.children.push(task_use); + + // Main rail #2: small jump only (no big tool_result on its own). + let main_inf2 = make_inf("req-main-2", "claude-sonnet-4-6", 3000, 0, 0); + + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children.push(main_inf1); + root.children.push(main_inf2); + let tree = turn_tree("sess-1", "msg-1", root); + + let pricing = crate::analyze::pricing::load_builtin_pricing(); + let opts = ContextDeltaOpts { + min_delta: Some(0), + ..ContextDeltaOpts::default() + }; + let deltas = deltas_for_session(&[tree], &[], &pricing, &opts); + + // We expect one main-rail delta and one subagent-rail delta. 
+ let main_delta = deltas + .iter() + .find(|d| d.owner_rail == OwnerRail::Main) + .expect("main-rail delta missing"); + let sub_delta = deltas + .iter() + .find(|d| matches!(&d.owner_rail, OwnerRail::Subagent { agent_id } if agent_id == "agent-a")) + .expect("subagent-rail delta missing"); + + // Main delta intervening must NOT include the subagent's tool_result. + for step in &main_delta.intervening { + if let InterveningStep::ToolResult { tool_use_id, .. } = step { + assert_ne!( + tool_use_id, "tu-sub-bash", + "main rail must NOT see the subagent's tool_result" + ); + } + } + + // Subagent delta intervening SHOULD include its own Bash result. + let sub_has_bash = sub_delta.intervening.iter().any(|s| match s { + InterveningStep::ToolResult { tool_use_id, .. } => tool_use_id == "tu-sub-bash", + _ => false, + }); + assert!( + sub_has_bash, + "subagent rail must see its own Bash tool_result" + ); + } + + /// Empty rail (single inference, no prev) → no delta emitted, no + /// panic. + #[test] + fn single_inference_yields_no_delta() { + let inf1 = make_inf("req-1", "claude-sonnet-4-6", 1000, 0, 0); + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children.push(inf1); + let tree = turn_tree("sess-1", "msg-1", root); + let pricing = crate::analyze::pricing::load_builtin_pricing(); + let deltas = deltas_for_session(&[tree], &[], &pricing, &ContextDeltaOpts::default()); + assert!( + deltas.is_empty(), + "single inference must not emit a pairwise delta" + ); + } + + /// `min_delta` filters out small jumps. + #[test] + fn min_delta_filters_small_jumps() { + let inf1 = make_inf("req-1", "claude-sonnet-4-6", 1000, 0, 0); + // Small jump: +500 tokens. 
+ let inf2 = make_inf("req-2", "claude-sonnet-4-6", 1500, 0, 0); + + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children.push(inf1); + root.children.push(inf2); + let tree = turn_tree("sess-1", "msg-1", root); + let pricing = crate::analyze::pricing::load_builtin_pricing(); + // Default min_delta is 1000; 500 < 1000 → filtered out. + let deltas = deltas_for_session(&[tree], &[], &pricing, &ContextDeltaOpts::default()); + assert!(deltas.is_empty(), "500 token jump must be filtered"); + + // Lower the threshold to 100 → row appears. + let opts = ContextDeltaOpts { + min_delta: Some(100), + ..ContextDeltaOpts::default() + }; + let deltas = deltas_for_session( + &[turn_tree( + "sess-1", + "msg-1", + root_with_two_infs(1000, 1500), + )], + &[], + &pricing, + &opts, + ); + assert_eq!(deltas.len(), 1); + assert_eq!(deltas[0].delta_tokens, 500); + } + + fn root_with_two_infs(ctx1: i64, ctx2: i64) -> SpanNode { + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children + .push(make_inf("req-1", "claude-sonnet-4-6", ctx1, 0, 0)); + root.children + .push(make_inf("req-2", "claude-sonnet-4-6", ctx2, 0, 0)); + root + } + + /// `--top N` caps output. + #[test] + fn top_caps_output() { + // Build a tree with 5 inferences, each adding 5000 tokens. + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + let ctx_steps = [1000, 6000, 11_000, 16_000, 21_000]; + for (i, c) in ctx_steps.iter().enumerate() { + root.children.push(make_inf( + &format!("req-{i}"), + "claude-sonnet-4-6", + *c, + 0, + 0, + )); + } + let tree = turn_tree("sess-1", "msg-1", root); + let pricing = crate::analyze::pricing::load_builtin_pricing(); + + // No cap → 4 pairwise deltas (5 inferences = 4 windows). 
+ let opts = ContextDeltaOpts { + min_delta: Some(0), + ..ContextDeltaOpts::default() + }; + let all = deltas_for_session(&[tree.clone()], &[], &pricing, &opts); + assert_eq!(all.len(), 4); + + // Cap at 2 → only the top 2 deltas. + let opts = ContextDeltaOpts { + min_delta: Some(0), + top: Some(2), + ..ContextDeltaOpts::default() + }; + let top2 = deltas_for_session(&[tree], &[], &pricing, &opts); + assert_eq!(top2.len(), 2); + } + + /// `--owner main` filter excludes subagent rails. + #[test] + fn owner_filter_main_excludes_subagent_rail() { + // Reuse the subagent-isolation fixture shape. + let mut main_inf1 = make_inf("req-main-1", "claude-sonnet-4-6", 1000, 0, 0); + let mut task_use = make_tool_use("Task", "tu-task"); + let mut sub_node = SpanNode::new(SpanKind::Subagent, "general-purpose"); + sub_node.set_attr("agent_id", AttrValue::str("agent-a")); + sub_node + .children + .push(make_inf("req-sub-1", "claude-sonnet-4-6", 2000, 0, 0)); + sub_node + .children + .push(make_inf("req-sub-2", "claude-sonnet-4-6", 22_000, 0, 0)); + task_use.children.push(sub_node); + main_inf1.children.push(task_use); + let main_inf2 = make_inf("req-main-2", "claude-sonnet-4-6", 3000, 0, 0); + + let mut root = SpanNode::new(SpanKind::Turn, "turn"); + root.children.push(make_user_prompt()); + root.children.push(main_inf1); + root.children.push(main_inf2); + let tree = turn_tree("sess-1", "msg-1", root); + + let pricing = crate::analyze::pricing::load_builtin_pricing(); + let opts = ContextDeltaOpts { + min_delta: Some(0), + owner: OwnerFilter::Main, + ..ContextDeltaOpts::default() + }; + let deltas = deltas_for_session(&[tree], &[], &pricing, &opts); + for d in &deltas { + assert_eq!( + d.owner_rail, + OwnerRail::Main, + "owner filter Main must exclude subagent rails" + ); + } + assert!(!deltas.is_empty(), "expected at least one main-rail delta"); + } + + /// JSON shape: rail serializes with a `kind` discriminant, steps + /// keep their kebab-case kind tag. 
Catch wire-format drift early. + #[test] + fn json_shape_uses_kebab_case_discriminants() { + let d = ContextDelta { + session_id: "s".into(), + turn_id: "t".into(), + inference_idx: 2, + owner_rail: OwnerRail::Subagent { + agent_id: "agent-x".into(), + }, + prior_context_tokens: 10, + current_context_tokens: 20, + delta_tokens: 10, + intervening: vec![InterveningStep::ToolResult { + tool_use_id: "tu-1".into(), + tool_name: "Bash".into(), + approx_tokens: 5, + approx_bytes: 20, + truncated: false, + }], + attributed_cost_usd: 0.0, + }; + let s = serde_json::to_string(&d).unwrap(); + assert!(s.contains("\"kind\":\"subagent\""), "got {s}"); + assert!(s.contains("\"agentId\":\"agent-x\""), "got {s}"); + assert!(s.contains("\"kind\":\"tool-result\""), "got {s}"); + assert!(s.contains("\"toolUseId\":\"tu-1\""), "got {s}"); + let back: ContextDelta = serde_json::from_str(&s).unwrap(); + assert_eq!(back, d); + } + + /// System reminder step surfaces as its own intervening step row. + /// We construct one directly (the timeline builder doesn't yet + /// synthesize SystemReminder spans from content sidecars; first- + /// cut behavior per the issue). 
+ #[test] + fn system_reminder_step_round_trips_in_json() { + let step = InterveningStep::SystemReminder { + source: ReminderSource::Other, + approx_tokens: 250, + }; + let s = serde_json::to_string(&step).unwrap(); + assert!(s.contains("\"kind\":\"system-reminder\""), "got {s}"); + assert!(s.contains("\"source\":\"other\""), "got {s}"); + let back: InterveningStep = serde_json::from_str(&s).unwrap(); + assert_eq!(back, step); + } +} diff --git a/crates/relayburn-sdk/src/lib.rs b/crates/relayburn-sdk/src/lib.rs index d357d3e2..a741ae70 100644 --- a/crates/relayburn-sdk/src/lib.rs +++ b/crates/relayburn-sdk/src/lib.rs @@ -86,6 +86,11 @@ pub use crate::ledger::{ StampSelector, DEFAULT_RETENTION_DAYS, }; +pub use crate::analyze::{ + deltas_for_session, ContextDelta, ContextDeltaOpts, InterveningStep, + OwnerFilter as ContextDeltaOwnerFilter, OwnerRail as ContextDeltaOwnerRail, ReminderSource, +}; + pub use crate::analyze::{ aggregate_by_bash, aggregate_by_bash_verb, aggregate_by_file, aggregate_by_mcp_server, aggregate_by_provider, aggregate_by_subagent, aggregate_subagent_type_stats, diff --git a/crates/relayburn-sdk/src/query_verbs.rs b/crates/relayburn-sdk/src/query_verbs.rs index 2048e960..4976ec36 100644 --- a/crates/relayburn-sdk/src/query_verbs.rs +++ b/crates/relayburn-sdk/src/query_verbs.rs @@ -4548,6 +4548,97 @@ pub fn session_span_trees( handle.session_span_trees(session_id) } +// --------------------------------------------------------------------------- +// Context delta — per-inference attribution of context-window growth (#432) +// --------------------------------------------------------------------------- +// +// Pure derivation over `session_span_trees`. The `ledger_home` plumbing is +// the only I/O; the math lives in `analyze::context_delta::deltas_for_session`. + +impl LedgerHandle { + /// Per-inference context-window deltas. 
+ /// + /// Walks each session's [`TurnSpanTree`] timeline, pairs same-rail + /// `Inference` spans, and attributes the delta in `context_tokens = + /// input + cache_read + cache_write` to the intervening + /// [`InterveningStep`]s. See the module-level docs of + /// [`crate::analyze::context_delta`] for the algorithm and the + /// decision rationale (cost rate, compaction handling, rail + /// isolation). + /// + /// When [`ContextDeltaOpts::session`] is `Some`, only that session is + /// scanned. When `None`, every session in the ledger window + /// contributes; sessions are picked from + /// [`SessionsListOptions`]-style filters (no `since` is applied to + /// session enumeration today — the issue's window flag controls the + /// `Vec` cap, not the input set). + pub fn context_delta( + &self, + opts: crate::analyze::context_delta::ContextDeltaOpts, + ) -> Result> { + let pricing = load_pricing(None); + + let session_ids: Vec = match opts.session.clone() { + Some(id) => vec![id], + None => { + // Enumerate sessions present on the ledger (independent + // of `since` — that flag caps output rows, not the + // input scan). + let mut ids: BTreeSet = BTreeSet::new(); + let all = self.inner.query_turns(&Query::default())?; + for enriched in all { + ids.insert(enriched.turn.session_id); + } + ids.into_iter().collect() + } + }; + + let mut out: Vec = Vec::new(); + for session_id in session_ids { + let trees = self.session_span_trees(&session_id)?; + if trees.is_empty() { + continue; + } + let compactions = self.inner.query_compactions(&Query { + session_id: Some(session_id.clone()), + ..Default::default() + })?; + let per_session = crate::analyze::context_delta::deltas_for_session( + &trees, + &compactions, + &pricing, + &opts, + ); + out.extend(per_session); + } + + // Cross-session sort + top cap. `deltas_for_session` already + // sorted within a single session; re-sort here so multi-session + // calls return a single coherent top-N list. 
+ out.sort_by(|a, b| { + b.delta_tokens + .cmp(&a.delta_tokens) + .then_with(|| a.session_id.cmp(&b.session_id)) + .then_with(|| a.turn_id.cmp(&b.turn_id)) + .then_with(|| a.inference_idx.cmp(&b.inference_idx)) + }); + let top = opts.effective_top() as usize; + if out.len() > top { + out.truncate(top); + } + Ok(out) + } +} + +/// Free-function form of [`LedgerHandle::context_delta`]. +pub fn context_delta( + opts: crate::analyze::context_delta::ContextDeltaOpts, + ledger_home: Option, +) -> Result> { + let handle = open_with(ledger_home.as_deref())?; + handle.context_delta(opts) +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/tests/fixtures/cli-golden/invocations.json b/tests/fixtures/cli-golden/invocations.json index 89227783..a049abbc 100644 --- a/tests/fixtures/cli-golden/invocations.json +++ b/tests/fixtures/cli-golden/invocations.json @@ -59,6 +59,18 @@ "expectStatus": 0, "enabled": true }, + { + "name": "overhead-deltas", + "args": ["overhead", "deltas", "--min-delta", "0", "--top", "5"], + "expectStatus": 0, + "enabled": true + }, + { + "name": "overhead-deltas-json", + "args": ["overhead", "deltas", "--min-delta", "0", "--top", "5", "--json"], + "expectStatus": 0, + "enabled": true + }, { "name": "state-status", "args": ["state", "status"], diff --git a/tests/fixtures/cli-golden/snapshots/overhead-deltas-json.stdout.txt b/tests/fixtures/cli-golden/snapshots/overhead-deltas-json.stdout.txt new file mode 100644 index 00000000..fe9f76de --- /dev/null +++ b/tests/fixtures/cli-golden/snapshots/overhead-deltas-json.stdout.txt @@ -0,0 +1,59 @@ +[ + { + "sessionId": "11111111-1111-1111-1111-111111111111", + "turnId": "msg-c1-2", + "inferenceIdx": 2, + "ownerRail": { + "kind": "main" + }, + "priorContextTokens": 6500, + "currentContextTokens": 8000, + "deltaTokens": 1500, + "intervening": [ + { + "kind": "user-prompt", + 
"approxTokens": 0, + "hasSystemReminder": false + } + ], + "attributedCostUSD": 0.00045 + }, + { + "sessionId": "22222222-2222-2222-2222-222222222222", + "turnId": "msg-c2-2", + "inferenceIdx": 2, + "ownerRail": { + "kind": "main" + }, + "priorContextTokens": 2900, + "currentContextTokens": 3300, + "deltaTokens": 400, + "intervening": [ + { + "kind": "user-prompt", + "approxTokens": 0, + "hasSystemReminder": false + } + ], + "attributedCostUSD": 0.00004 + }, + { + "sessionId": "11111111-1111-1111-1111-111111111111", + "turnId": "msg-c1-3", + "inferenceIdx": 3, + "ownerRail": { + "kind": "main" + }, + "priorContextTokens": 8000, + "currentContextTokens": 8200, + "deltaTokens": 200, + "intervening": [ + { + "kind": "user-prompt", + "approxTokens": 0, + "hasSystemReminder": false + } + ], + "attributedCostUSD": 0.00006 + } +] diff --git a/tests/fixtures/cli-golden/snapshots/overhead-deltas.stdout.txt b/tests/fixtures/cli-golden/snapshots/overhead-deltas.stdout.txt new file mode 100644 index 00000000..e78aa4cd --- /dev/null +++ b/tests/fixtures/cli-golden/snapshots/overhead-deltas.stdout.txt @@ -0,0 +1,8 @@ +Inference Owner Delta Cost Driver +Tc1-2/inf2 main +1.5k $0.0004 user prompt +Tc2-2/inf2 main +400 $0.0000 user prompt +Tc1-3/inf3 main +200 $0.0001 user prompt + +# token / cost figures are approximate (bytes/4 for tool results, +# cache-read rate for cost). Compaction rows surface separately and +# never appear as negative deltas. 
From cf3c61ecc48ee898664ff64d1839bf23e4e20bc9 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 26 May 2026 12:49:46 +0000 Subject: [PATCH 2/2] fix(context-delta): address review feedback on PR #452 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-scope #452 fixes (A-G): - `burn overhead deltas` now honors `--since` (A): thread the parent overhead args' `since` into `ContextDeltaOpts`, parse relative ranges (`24h`/`7d`/`4w`/`2m`) into `Duration`, and use that to scope the session-enumeration `query_turns` seed inside the SDK so the `None`-session path no longer walks every historical session. - Make the per-rail and cross-session `ContextDelta` sort fully deterministic across HashMap iteration order (B): chain `owner_rail` then `session_id` as final tie-breakers. - UTF-8-safe `short_turn_label` / `short_agent_label` (C): switch from byte slicing to `chars().take(8)` so multi-byte ids never panic. - `format_signed_tokens` preserves the negative sign (D): emit `-` for negative deltas instead of dropping it. - Sort compactions with `sort_by_cached_key` (E) so `parse_iso_ms` runs once per element rather than once per comparison. - Dedup four copies of `parse_iso_ms` (F) into `crates/relayburn-sdk/src/util/time.rs`; the analyze/context_delta, query_verbs, reader/claude, and reader/codex copies now share one implementation. - `read_jsonl_values` streams via `BufReader::lines()` (G) rather than reading the entire file into memory. Foundation fixes (carried in #452's diff; will also land on #451): - Propagate `output_truncated` on `ToolResult` span nodes (H) in both the Claude and Codex builders so downstream consumers can flag truncated tool outputs. - Propagate `ToolResult` error status to the parent `ToolUse` (I, J) in both builders — the runtime tool_result is the ground truth, not the assistant row's `is_error` hint. 
- Don't drop subagents whose `paired_tool_use_id` doesn't match any ToolUse in the turn (K): drain leftover `paired_subagents` after the inference walk and surface them as `unattached` siblings under the turn root. - Stop swallowing real ledger-read failures (M): replace the blanket `unwrap_or_default()` on `query_inferences` / `query_tool_result_events` with a `match` that tolerates only the pre-schema "no such table/column" class of error and propagates every other failure. Tests: `cargo test --workspace` (zero failures, +2 new tests in the `query_verbs` mod for the since filter and helper). Zero warnings on `cargo build --workspace`. --- crates/relayburn-cli/src/commands/overhead.rs | 66 ++++-- .../src/analyze/context_delta.rs | 77 ++----- crates/relayburn-sdk/src/lib.rs | 1 + crates/relayburn-sdk/src/query_verbs.rs | 194 +++++++++++------- .../src/reader/claude/span_tree.rs | 88 +++----- .../src/reader/codex/span_tree.rs | 68 ++---- crates/relayburn-sdk/src/util.rs | 8 + crates/relayburn-sdk/src/util/time.rs | 87 ++++++++ 8 files changed, 337 insertions(+), 252 deletions(-) create mode 100644 crates/relayburn-sdk/src/util.rs create mode 100644 crates/relayburn-sdk/src/util/time.rs diff --git a/crates/relayburn-cli/src/commands/overhead.rs b/crates/relayburn-cli/src/commands/overhead.rs index 365fafde..e36e2369 100644 --- a/crates/relayburn-cli/src/commands/overhead.rs +++ b/crates/relayburn-cli/src/commands/overhead.rs @@ -29,7 +29,7 @@ pub fn run(globals: &GlobalArgs, args: OverheadArgs) -> i32 { Some(OverheadAction::Trim(trim)) => { run_trim(globals, args.project, args.since, args.kind, trim.top) } - Some(OverheadAction::Deltas(deltas)) => run_deltas(globals, deltas), + Some(OverheadAction::Deltas(deltas)) => run_deltas(globals, args.since, deltas), None => run_report(globals, args.project, args.since, args.kind), } } @@ -368,10 +368,14 @@ fn format_line_range(start: u64, end: u64) -> String { // `burn overhead deltas` (#432) // 
--------------------------------------------------------------------------- -fn run_deltas(globals: &GlobalArgs, args: OverheadDeltasArgs) -> i32 { +fn run_deltas( + globals: &GlobalArgs, + since: Option, + args: OverheadDeltasArgs, +) -> i32 { let opts = ContextDeltaOpts { session: args.session.clone(), - since: None, + since: since.as_deref().and_then(parse_since_duration), top: args.top, min_delta: args.min_delta, owner: args.owner.into(), @@ -468,29 +472,61 @@ fn render_human_deltas(deltas: &[ContextDelta], explain: bool) -> io::Result<()> Ok(()) } +/// Parse the CLI's relative-range `--since` form (`24h`, `7d`, `4w`, `2m`) +/// into a [`std::time::Duration`]. ISO-timestamp forms are accepted by the +/// SDK's `normalize_since` elsewhere, but the deltas verb only takes a +/// relative window today (`ContextDeltaOpts::since: Option`). +/// Unrecognized inputs fall through to `None` — the SDK then applies the +/// 24h default. +fn parse_since_duration(s: &str) -> Option { + if s.is_empty() { + return None; + } + let bytes = s.as_bytes(); + let unit = *bytes.last()? as char; + if !matches!(unit, 'h' | 'd' | 'w' | 'm') { + return None; + } + let num = &s[..s.len() - 1]; + if num.is_empty() || !num.bytes().all(|b| b.is_ascii_digit()) { + return None; + } + let n: u64 = num.parse().ok()?; + let secs = match unit { + 'h' => n.checked_mul(3_600)?, + 'd' => n.checked_mul(86_400)?, + 'w' => n.checked_mul(7 * 86_400)?, + 'm' => n.checked_mul(30 * 86_400)?, + _ => unreachable!(), + }; + Some(std::time::Duration::from_secs(secs)) +} + fn short_turn_label(turn_id: &str) -> String { // Turn ids on Claude are `msg-...` UUIDs; trim to a short prefix - // for the table. Keep the original for JSON output. + // for the table. Keep the original for JSON output. Use + // `chars().take(8)` rather than byte slicing so non-ASCII ids + // (defensive — Claude ids are ASCII, but the helper is generic) + // don't panic on a mid-byte cut. 
let trimmed = turn_id.trim_start_matches("msg_"); let trimmed = trimmed.trim_start_matches("msg-"); - if trimmed.len() > 8 { - format!("T{}", &trimmed[..8]) - } else { - format!("T{trimmed}") - } + let short: String = trimmed.chars().take(8).collect(); + format!("T{short}") } fn short_agent_label(agent_id: &str) -> String { let trimmed = agent_id.trim_start_matches("agent-"); - if trimmed.len() > 8 { - trimmed[..8].to_string() - } else { - trimmed.to_string() - } + trimmed.chars().take(8).collect() } fn format_signed_tokens(n: i64) -> String { - let sign = if n > 0 { "+" } else { "" }; + let sign = if n > 0 { + "+" + } else if n < 0 { + "-" + } else { + "" + }; format!("{sign}{}", format_tokens(n.unsigned_abs())) } diff --git a/crates/relayburn-sdk/src/analyze/context_delta.rs b/crates/relayburn-sdk/src/analyze/context_delta.rs index 1c330173..297c8fbf 100644 --- a/crates/relayburn-sdk/src/analyze/context_delta.rs +++ b/crates/relayburn-sdk/src/analyze/context_delta.rs @@ -59,6 +59,7 @@ use serde::{Deserialize, Serialize}; use crate::analyze::pricing::PricingTable; use crate::analyze::span_tree::{AttrValue, SpanKind, SpanNode, TurnSpanTree}; use crate::reader::CompactionEvent; +use crate::util::time::parse_iso_ms; /// Approximate bytes-per-token ratio used when no real tokenizer pass is /// available. Mirrors the rule of thumb used elsewhere in burn for @@ -277,7 +278,9 @@ pub fn deltas_for_session( } let timeline = build_timeline(trees); let mut compactions_sorted: Vec<&CompactionEvent> = compactions.iter().collect(); - compactions_sorted.sort_by_key(|c| parse_iso_ms(&c.ts).unwrap_or(0)); + // `sort_by_cached_key` so the relatively expensive `parse_iso_ms` runs once + // per element rather than once per comparison. 
+ compactions_sorted.sort_by_cached_key(|c| parse_iso_ms(&c.ts).unwrap_or(0)); let mut per_rail: HashMap> = HashMap::new(); for (idx, item) in timeline.iter().enumerate() { @@ -376,13 +379,16 @@ pub fn deltas_for_session( } } - // Sort by delta descending, ties broken by turn_id then inference_idx - // so the output is deterministic across HashMap iteration order. + // Sort by delta descending, with a full lex chain so the output is + // deterministic across HashMap iteration order even when multiple + // rails / sessions tie on (delta_tokens, turn_id, inference_idx). out.sort_by(|a, b| { b.delta_tokens .cmp(&a.delta_tokens) .then_with(|| a.turn_id.cmp(&b.turn_id)) .then_with(|| a.inference_idx.cmp(&b.inference_idx)) + .then_with(|| owner_rail_sort_key(&a.owner_rail).cmp(&owner_rail_sort_key(&b.owner_rail))) + .then_with(|| a.session_id.cmp(&b.session_id)) }); let top = opts.effective_top() as usize; @@ -392,6 +398,16 @@ pub fn deltas_for_session( out } +/// Stable lex key for sorting `OwnerRail` so tie-breakers are deterministic +/// regardless of HashMap iteration order. `Main` sorts before any subagent; +/// subagents sort by `agent_id`. +fn owner_rail_sort_key(rail: &OwnerRail) -> (&str, &str) { + match rail { + OwnerRail::Main => ("main", ""), + OwnerRail::Subagent { agent_id } => ("subagent", agent_id.as_str()), + } +} + fn rail_passes_filter(rail: &OwnerRail, filter: OwnerFilter) -> bool { match (rail, filter) { (_, OwnerFilter::All) => true, @@ -631,61 +647,6 @@ fn attr_int(node: &SpanNode, key: &str) -> Option { } } -/// ISO-8601 -> Unix-ms. Mirror of the parsers elsewhere in the SDK; kept -/// here so this module has no dependency on `reader::claude::span_tree`'s -/// internals (which is a peer module, not an upstream). 
-fn parse_iso_ms(s: &str) -> Option { - let bytes = s.as_bytes(); - if bytes.len() < 19 { - return None; - } - if !(bytes[4] == b'-' - && bytes[7] == b'-' - && (bytes[10] == b'T' || bytes[10] == b' ') - && bytes[13] == b':' - && bytes[16] == b':') - { - return None; - } - let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; - let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; - let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; - let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; - let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; - let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; - let mut millis: i64 = 0; - let mut idx = 19; - if idx < bytes.len() && bytes[idx] == b'.' { - idx += 1; - let frac_start = idx; - while idx < bytes.len() && bytes[idx].is_ascii_digit() { - idx += 1; - } - let mut frac = std::str::from_utf8(&bytes[frac_start..idx]) - .ok()? - .to_string(); - if frac.len() > 3 { - frac.truncate(3); - } - while frac.len() < 3 { - frac.push('0'); - } - millis = frac.parse().ok()?; - } - let m = month as i64; - let d = day as i64; - let y = if m <= 2 { year - 1 } else { year }; - let era = if y >= 0 { y } else { y - 399 } / 400; - let yoe = (y - era * 400) as u64; - let mp = if m > 2 { m - 3 } else { m + 9 } as u64; - let doy = (153 * mp + 2) / 5 + (d as u64) - 1; - let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; - let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; - let secs = - days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); - Some(secs * 1_000 + millis) -} - // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/relayburn-sdk/src/lib.rs b/crates/relayburn-sdk/src/lib.rs index a741ae70..50423ede 100644 --- a/crates/relayburn-sdk/src/lib.rs +++ 
b/crates/relayburn-sdk/src/lib.rs @@ -52,6 +52,7 @@ mod export_verbs; mod ingest_verb; mod query_verbs; mod stamp_verb; +mod util; pub use export_verbs::*; pub use ingest_verb::*; diff --git a/crates/relayburn-sdk/src/query_verbs.rs b/crates/relayburn-sdk/src/query_verbs.rs index 4976ec36..d7d93f91 100644 --- a/crates/relayburn-sdk/src/query_verbs.rs +++ b/crates/relayburn-sdk/src/query_verbs.rs @@ -4235,9 +4235,23 @@ impl LedgerHandle { let source = turns[0].source; // Bulk-load the per-session sidecar tables. - let inferences = self.inner.query_inferences(&session_q).unwrap_or_default(); - let tool_result_events = - self.inner.query_tool_result_events(&session_q).unwrap_or_default(); + // + // These tables landed in later schema versions (see #434 / #444); + // a pre-schema ledger reports "no such table" / "no such column". + // Tolerate that single class of failure so the span-tree builder + // still works on older snapshots, but propagate every other read + // error so corrupted ledgers don't silently produce truncated + // span trees (which would mis-attribute downstream context deltas). + let inferences = match self.inner.query_inferences(&session_q) { + Ok(v) => v, + Err(err) if is_schema_missing(&err) => Vec::new(), + Err(err) => return Err(err.into()), + }; + let tool_result_events = match self.inner.query_tool_result_events(&session_q) { + Ok(v) => v, + Err(err) if is_schema_missing(&err) => Vec::new(), + Err(err) => return Err(err.into()), + }; // Group sidecars by message_id for fast per-turn slicing. let mut infs_by_msg: HashMap> = HashMap::new(); @@ -4399,58 +4413,10 @@ fn first_record_ts_ms(records: &[serde_json::Value]) -> Option { earliest } -/// Local ISO-8601-with-fractional-seconds parser, returning Unix ms. -/// Mirrors the per-builder copies (see `reader::claude::span_tree::parse_iso_ms`) -/// — duplicated here so `query_verbs` doesn't reach into the builder -/// internals. +/// ISO-8601 parser thin wrapper. 
Reuses the shared `crate::util::time` +/// helper so all four ex-copies stay in sync. fn parse_iso_ms_compat(s: &str) -> Option { - let bytes = s.as_bytes(); - if bytes.len() < 19 { - return None; - } - if !(bytes[4] == b'-' - && bytes[7] == b'-' - && (bytes[10] == b'T' || bytes[10] == b' ') - && bytes[13] == b':' - && bytes[16] == b':') - { - return None; - } - let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; - let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; - let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; - let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; - let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; - let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; - let mut millis: i64 = 0; - let mut idx = 19; - if idx < bytes.len() && bytes[idx] == b'.' { - idx += 1; - let frac_start = idx; - while idx < bytes.len() && bytes[idx].is_ascii_digit() { - idx += 1; - } - let mut frac = std::str::from_utf8(&bytes[frac_start..idx]).ok()?.to_string(); - if frac.len() > 3 { - frac.truncate(3); - } - while frac.len() < 3 { - frac.push('0'); - } - millis = frac.parse().ok()?; - } - let m = month as i64; - let d = day as i64; - let y = if m <= 2 { year - 1 } else { year }; - let era = if y >= 0 { y } else { y - 399 } / 400; - let yoe = (y - era * 400) as u64; - let mp = if m > 2 { m - 3 } else { m + 9 } as u64; - let doy = (153 * mp + 2) / 5 + (d as u64) - 1; - let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; - let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; - let secs = - days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); - Some(secs * 1_000 + millis) + crate::util::time::parse_iso_ms(s) } /// Resolve the Claude projects root and discover + pair subagent @@ -4509,17 +4475,17 @@ fn discover_and_pair_subagents( /// sidecar as orphan in that case, which is the right fallback when /// the 
parent transcript is missing or corrupt. fn read_jsonl_values(path: &Path) -> Vec { - let bytes = match std::fs::read(path) { - Ok(b) => b, - Err(_) => return Vec::new(), - }; - let text = match std::str::from_utf8(&bytes) { - Ok(s) => s, + use std::io::BufRead; + let file = match std::fs::File::open(path) { + Ok(f) => f, Err(_) => return Vec::new(), }; - text.lines() + let reader = std::io::BufReader::new(file); + reader + .lines() .filter_map(|line| { - let t = line.trim(); + let l = line.ok()?; + let t = l.trim(); if t.is_empty() { None } else { @@ -4555,6 +4521,40 @@ pub fn session_span_trees( // Pure derivation over `session_span_trees`. The `ledger_home` plumbing is // the only I/O; the math lives in `analyze::context_delta::deltas_for_session`. +/// Return `true` when `err` looks like a pre-schema "table / column missing" +/// SQLite failure. Used to distinguish a tolerable "this ledger predates the +/// inferences / tool_result_events tables" miss from a real ledger-read +/// failure that should propagate to the caller. +fn is_schema_missing(err: &crate::ledger::LedgerError) -> bool { + let crate::ledger::LedgerError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg))) = err + else { + return false; + }; + msg.contains("no such table") || msg.contains("no such column") +} + +/// Convert a relative `Duration` window into a canonical +/// `now - duration` ISO-8601 timestamp suitable for a [`Query::since`] +/// filter. Centralized so the deltas seed-query mirrors the same +/// `format_iso_z_ms` shape the rest of the SDK emits. +fn duration_to_since_iso(d: std::time::Duration) -> String { + let now = system_now_secs(); + let when = now.saturating_sub(d.as_secs()) as i64; + format_iso_z_ms(when, 0) +} + +/// Lex key for sorting cross-session [`ContextDelta`] rows by owner_rail +/// when other tie-breakers are equal. Mirrors the per-session helper in +/// `analyze::context_delta`. 
+fn owner_rail_str(rail: &crate::analyze::context_delta::OwnerRail) -> (&str, &str) { + match rail { + crate::analyze::context_delta::OwnerRail::Main => ("main", ""), + crate::analyze::context_delta::OwnerRail::Subagent { agent_id } => { + ("subagent", agent_id.as_str()) + } + } +} + impl LedgerHandle { /// Per-inference context-window deltas. /// @@ -4567,25 +4567,38 @@ impl LedgerHandle { /// isolation). /// /// When [`ContextDeltaOpts::session`] is `Some`, only that session is - /// scanned. When `None`, every session in the ledger window - /// contributes; sessions are picked from - /// [`SessionsListOptions`]-style filters (no `since` is applied to - /// session enumeration today — the issue's window flag controls the - /// `Vec` cap, not the input set). + /// scanned. When `None`, every session in the ledger that has activity + /// inside the [`ContextDeltaOpts::since`] window contributes — sessions + /// whose latest activity falls outside the window are skipped before any + /// span trees get loaded. The same window is then applied to the + /// returned [`Vec`] cap. pub fn context_delta( &self, opts: crate::analyze::context_delta::ContextDeltaOpts, ) -> Result> { let pricing = load_pricing(None); + // Build the seed `since` filter from `opts.since`. We always have a + // sensible `effective_since()` default, but only apply it when the + // caller actually passed a value — when `None`, scan every session. + // (Honoring the default would change historic behavior for callers + // that relied on "no since = all time".) + let seed_since: Option = opts.since.map(duration_to_since_iso); + let session_query = Query { + since: seed_since.clone(), + ..Default::default() + }; + let session_ids: Vec = match opts.session.clone() { Some(id) => vec![id], None => { - // Enumerate sessions present on the ledger (independent - // of `since` — that flag caps output rows, not the - // input scan). + // Enumerate sessions that have activity inside the + // `since` window. 
Walking only the matching `turns` + // rows keeps this cheap on large ledgers — we never + // load span trees for sessions that already missed + // the filter. let mut ids: BTreeSet = BTreeSet::new(); - let all = self.inner.query_turns(&Query::default())?; + let all = self.inner.query_turns(&session_query)?; for enriched in all { ids.insert(enriched.turn.session_id); } @@ -4614,13 +4627,16 @@ impl LedgerHandle { // Cross-session sort + top cap. `deltas_for_session` already // sorted within a single session; re-sort here so multi-session - // calls return a single coherent top-N list. + // calls return a single coherent top-N list. Tie chain includes + // `owner_rail` so subagent-vs-main ties stay stable across + // HashMap iteration order from the per-session pass. out.sort_by(|a, b| { b.delta_tokens .cmp(&a.delta_tokens) .then_with(|| a.session_id.cmp(&b.session_id)) .then_with(|| a.turn_id.cmp(&b.turn_id)) .then_with(|| a.inference_idx.cmp(&b.inference_idx)) + .then_with(|| owner_rail_str(&a.owner_rail).cmp(&owner_rail_str(&b.owner_rail))) }); let top = opts.effective_top() as usize; if out.len() > top { @@ -6245,6 +6261,40 @@ mod tests { assert!(result.replacement_savings.is_none()); } + #[test] + fn duration_to_since_iso_emits_canonical_zulu_ms() { + let iso = super::duration_to_since_iso(std::time::Duration::from_secs(60)); + // Shape only — actual value depends on system clock. We assert + // the canonical lower-bound shape `YYYY-MM-DDTHH:MM:SS.mmmZ` + // that `Query::since` lex-compares against ledger rows. + assert_eq!(iso.len(), 24, "{iso}"); + assert!(iso.ends_with(".000Z")); + assert!(iso.contains('T')); + } + + /// Regression for the `since`-is-ignored bug: when `opts.since` is + /// `Some`, sessions whose latest turn is older than the window must + /// not appear in the deltas output. With a 1-second window and + /// fixtures whose turns are dated 2026-04 (weeks in the past), + /// every fixture session falls outside and the result is empty. 
+ /// Without the fix the SDK would walk every session, so this + /// asserts the seed `query_turns(&since_scoped)` actually narrows. + #[test] + fn context_delta_since_filter_excludes_old_sessions() { + use crate::analyze::context_delta::ContextDeltaOpts; + let (_dir, handle) = multi_session_handle(); + let opts = ContextDeltaOpts { + since: Some(std::time::Duration::from_secs(1)), + ..ContextDeltaOpts::default() + }; + let deltas = handle.context_delta(opts).expect("context_delta"); + assert!( + deltas.is_empty(), + "since=1s must drop fixture sessions whose latest turn is weeks old; got {} deltas", + deltas.len(), + ); + } + fn multi_session_handle() -> (TempDir, LedgerHandle) { let dir = tempfile::tempdir().unwrap(); let opts = LedgerOpenOptions::with_home(dir.path()); diff --git a/crates/relayburn-sdk/src/reader/claude/span_tree.rs b/crates/relayburn-sdk/src/reader/claude/span_tree.rs index 226a71a8..5dfc0574 100644 --- a/crates/relayburn-sdk/src/reader/claude/span_tree.rs +++ b/crates/relayburn-sdk/src/reader/claude/span_tree.rs @@ -52,6 +52,7 @@ use crate::reader::inference::Inference; use crate::reader::types::{ StopReason, ToolCall, ToolResultEventRecord, ToolResultStatus, TurnRecord, }; +use crate::util::time::parse_iso_ms; /// Inputs to the Claude span-tree builder. Grouped here (instead of as /// a long positional argument list) so future inputs — content sidecar @@ -175,7 +176,21 @@ pub fn build_claude_span_tree(inputs: ClaudeSpanTreeInputs<'_>) -> TurnSpanTree // Unpaired subagents — sibling nodes under the root with the // `unattached` flag. See module doc for the orphan-semantics choice. - for sa in unpaired_subagents { + // + // Anything still in `paired_subagents` after the inference walk + // had a `paired_tool_use_id` that didn't match any `ToolUse` we + // saw on this turn (out-of-sync inference view, ingest race, etc). 
+ // Surface those as `unattached` siblings too rather than dropping + // them — losing a subagent transcript silently is the worst + // failure mode here. + let mut dangling_paired: Vec<&SubagentTranscript> = + paired_subagents.into_values().flatten().collect(); + dangling_paired.sort_by(|a, b| a.source_path.cmp(&b.source_path)); + + for sa in unpaired_subagents + .into_iter() + .chain(dangling_paired.into_iter()) + { root.children.push(build_subagent_node(sa, true)); } @@ -359,6 +374,16 @@ fn build_inference_node( if tool_node.end_ms < result_node.end_ms { tool_node.end_ms = result_node.end_ms; } + // Propagate the result's error up to the tool_use parent. + // `ToolCall::is_error` is the assistant-row hint; the + // tool_result event carries the runtime outcome. The + // parent inference rollup below only consults + // `tool_node.status`, so without this an errored + // tool_result on a tool_use whose `is_error` flag was + // unset would silently report as success. + if result_node.status.is_error() { + tool_node.set_error("tool_error"); + } tool_node.children.push(result_node); } } @@ -423,6 +448,13 @@ fn build_tool_result_node(events: &[&ToolResultEventRecord]) -> Option if let Some(bytes) = final_event.output_bytes { node.set_attr("output_bytes", AttrValue::Int(bytes as i64)); } + // Propagate `output_truncated` so downstream consumers (context-delta + // attribution, hotspots-by-bytes presenters) can flag tool outputs + // that ingest decided to cap. Without this, large outputs appear as + // fully representative even when only the head was retained. + if let Some(truncated) = final_event.output_truncated { + node.set_attr("output_truncated", AttrValue::Bool(truncated)); + } if final_event.is_error.unwrap_or(false) { node.set_error("tool_error"); } @@ -479,60 +511,6 @@ fn apply_stop_reason_status(root: &mut SpanNode, reason: Option) { }; } -/// Local copy of the inference module's `parse_iso_ms`. 
Duplicated to -/// keep the `analyze::span_tree` types module free of a dependency on -/// `reader::inference` internals — the parser is a peer, not an -/// upstream. Same Howard Hinnant math as everywhere else in the SDK. -fn parse_iso_ms(s: &str) -> Option { - let bytes = s.as_bytes(); - if bytes.len() < 19 { - return None; - } - if !(bytes[4] == b'-' - && bytes[7] == b'-' - && (bytes[10] == b'T' || bytes[10] == b' ') - && bytes[13] == b':' - && bytes[16] == b':') - { - return None; - } - let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; - let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; - let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; - let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; - let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; - let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; - let mut millis: i64 = 0; - let mut idx = 19; - if idx < bytes.len() && bytes[idx] == b'.' 
{ - idx += 1; - let frac_start = idx; - while idx < bytes.len() && bytes[idx].is_ascii_digit() { - idx += 1; - } - let mut frac = std::str::from_utf8(&bytes[frac_start..idx]).ok()?.to_string(); - if frac.len() > 3 { - frac.truncate(3); - } - while frac.len() < 3 { - frac.push('0'); - } - millis = frac.parse().ok()?; - } - let m = month as i64; - let d = day as i64; - let y = if m <= 2 { year - 1 } else { year }; - let era = if y >= 0 { y } else { y - 399 } / 400; - let yoe = (y - era * 400) as u64; - let mp = if m > 2 { m - 3 } else { m + 9 } as u64; - let doy = (153 * mp + 2) / 5 + (d as u64) - 1; - let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; - let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; - let secs = - days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); - Some(secs * 1_000 + millis) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/relayburn-sdk/src/reader/codex/span_tree.rs b/crates/relayburn-sdk/src/reader/codex/span_tree.rs index 18b86ef9..5d3e9192 100644 --- a/crates/relayburn-sdk/src/reader/codex/span_tree.rs +++ b/crates/relayburn-sdk/src/reader/codex/span_tree.rs @@ -41,6 +41,7 @@ use crate::reader::inference::{Inference, InferenceKeySource, InferenceKind, Too use crate::reader::types::{ StopReason, ToolCall, ToolResultEventRecord, ToolResultStatus, TurnRecord, }; +use crate::util::time::parse_iso_ms; /// Inputs to the Codex span-tree builder. Mirrors the Claude builder's /// input struct minus the subagent transcripts. @@ -232,6 +233,16 @@ fn build_inference_node( if tool_node.end_ms < result_node.end_ms { tool_node.end_ms = result_node.end_ms; } + // Propagate the result's error up to the tool_use parent. + // The Codex `ToolCall::is_error` only reflects what the + // assistant row claimed; the runtime tool_result event + // is the ground truth. 
Without this, an errored result + // on a tool_use that the assistant didn't flag would + // never reach the parent inference's `child_error` + // rollup below. + if result_node.status.is_error() { + tool_node.set_error("tool_error"); + } tool_node.children.push(result_node); } } @@ -270,6 +281,11 @@ fn build_tool_result_node(events: &[&ToolResultEventRecord]) -> Option if let Some(bytes) = final_event.output_bytes { node.set_attr("output_bytes", AttrValue::Int(bytes as i64)); } + // Propagate `output_truncated` mirroring the Claude builder (see the + // matching block in `reader::claude::span_tree::build_tool_result_node`). + if let Some(truncated) = final_event.output_truncated { + node.set_attr("output_truncated", AttrValue::Bool(truncated)); + } if final_event.is_error.unwrap_or(false) { node.set_error("tool_error"); } @@ -288,58 +304,6 @@ fn apply_stop_reason_status(root: &mut SpanNode, reason: Option) { }; } -/// Same Howard-Hinnant ISO parser the Claude builder uses. Duplicated -/// to keep each builder self-contained. -fn parse_iso_ms(s: &str) -> Option { - let bytes = s.as_bytes(); - if bytes.len() < 19 { - return None; - } - if !(bytes[4] == b'-' - && bytes[7] == b'-' - && (bytes[10] == b'T' || bytes[10] == b' ') - && bytes[13] == b':' - && bytes[16] == b':') - { - return None; - } - let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; - let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; - let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; - let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; - let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; - let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; - let mut millis: i64 = 0; - let mut idx = 19; - if idx < bytes.len() && bytes[idx] == b'.' 
{ - idx += 1; - let frac_start = idx; - while idx < bytes.len() && bytes[idx].is_ascii_digit() { - idx += 1; - } - let mut frac = std::str::from_utf8(&bytes[frac_start..idx]).ok()?.to_string(); - if frac.len() > 3 { - frac.truncate(3); - } - while frac.len() < 3 { - frac.push('0'); - } - millis = frac.parse().ok()?; - } - let m = month as i64; - let d = day as i64; - let y = if m <= 2 { year - 1 } else { year }; - let era = if y >= 0 { y } else { y - 399 } / 400; - let yoe = (y - era * 400) as u64; - let mp = if m > 2 { m - 3 } else { m + 9 } as u64; - let doy = (153 * mp + 2) / 5 + (d as u64) - 1; - let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; - let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; - let secs = - days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); - Some(secs * 1_000 + millis) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/relayburn-sdk/src/util.rs b/crates/relayburn-sdk/src/util.rs new file mode 100644 index 00000000..8ea18f87 --- /dev/null +++ b/crates/relayburn-sdk/src/util.rs @@ -0,0 +1,8 @@ +//! Crate-internal helpers shared across reader / analyze / query modules. +//! +//! Modules here are deliberately `pub(crate)`; they do not appear on the +//! published SDK surface. New helpers should live here only if they're +//! genuinely cross-module — single-module utilities belong with their +//! caller. + +pub(crate) mod time; diff --git a/crates/relayburn-sdk/src/util/time.rs b/crates/relayburn-sdk/src/util/time.rs new file mode 100644 index 00000000..6f745d36 --- /dev/null +++ b/crates/relayburn-sdk/src/util/time.rs @@ -0,0 +1,87 @@ +//! Time parsing helpers shared across the SDK. +//! +//! Centralizes a single ISO-8601 → Unix milliseconds parser so the four +//! historical copies (analyze::context_delta, query_verbs, reader::claude, +//! reader::codex) stay in sync. Uses Howard Hinnant's days-from-civil +//! formulation; no external `chrono`/`time` dependency. 
+ +/// Parse an ISO-8601 / RFC-3339 timestamp (with optional fractional +/// seconds, truncated to milliseconds) into Unix milliseconds. +/// +/// Returns `None` for strings that don't match the expected +/// `YYYY-MM-DDTHH:MM:SS[.fff][Z]` shape. The trailing `Z` is optional — +/// callers passing offset-less ISO strings get the same answer as if `Z` +/// were present (the SDK's wire ledger format is always UTC). +pub(crate) fn parse_iso_ms(s: &str) -> Option { + let bytes = s.as_bytes(); + if bytes.len() < 19 { + return None; + } + if !(bytes[4] == b'-' + && bytes[7] == b'-' + && (bytes[10] == b'T' || bytes[10] == b' ') + && bytes[13] == b':' + && bytes[16] == b':') + { + return None; + } + let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; + let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; + let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; + let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; + let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; + let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; + let mut millis: i64 = 0; + let mut idx = 19; + if idx < bytes.len() && bytes[idx] == b'.' { + idx += 1; + let frac_start = idx; + while idx < bytes.len() && bytes[idx].is_ascii_digit() { + idx += 1; + } + let mut frac = std::str::from_utf8(&bytes[frac_start..idx]) + .ok()? 
+ .to_string(); + if frac.len() > 3 { + frac.truncate(3); + } + while frac.len() < 3 { + frac.push('0'); + } + millis = frac.parse().ok()?; + } + let m = month as i64; + let d = day as i64; + let y = if m <= 2 { year - 1 } else { year }; + let era = if y >= 0 { y } else { y - 399 } / 400; + let yoe = (y - era * 400) as u64; + let mp = if m > 2 { m - 3 } else { m + 9 } as u64; + let doy = (153 * mp + 2) / 5 + (d as u64) - 1; + let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; + let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; + let secs = + days_from_epoch * 86_400 + (hour as i64) * 3_600 + (minute as i64) * 60 + (second as i64); + Some(secs * 1_000 + millis) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_epoch() { + assert_eq!(parse_iso_ms("1970-01-01T00:00:00.000Z"), Some(0)); + } + + #[test] + fn parse_with_fractional() { + // 2026-01-01T00:00:00.500Z == 1767225600500 + assert_eq!(parse_iso_ms("2026-01-01T00:00:00.500Z"), Some(1_767_225_600_500)); + } + + #[test] + fn rejects_garbage() { + assert_eq!(parse_iso_ms("not a date"), None); + assert_eq!(parse_iso_ms("short"), None); + } +}