diff --git a/CHANGELOG.md b/CHANGELOG.md index 83b0434f..94f54856 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ### Added +- `relayburn-sdk`: `Inference` aggregate keys per-API-call rollups by + `(source, session_id, request_id)` with merged usage and `kind` + (`reasoning` / `message` / `tool-use` / `mixed`). Read via + `LedgerHandle::inferences(opts)` (free function `inferences()` too); + persisted at ingest into the new `inferences` table. Falls back to + `message_id` for harnesses without a `requestId` (Codex, opencode, + older Claude). (#434) - `burn summary`: one-line `Turn outcomes: …` breakdown of assistant `stop_reason` counts, plus a `stopReasons` block in `--json`. (#437) - Ledger fingerprint primitive (`{count}:{maxMtimeUnix}:{totalBytes}`) for @@ -45,6 +52,15 @@ Cross-package release notes for relayburn. Package changelogs contain package-le / `output_truncated` columns (#436). Both are migrated in place on `Ledger::open`; existing rows leave the new columns `NULL`. Run `burn state rebuild` to backfill an older ledger. +- `relayburn-sdk` ledger schema bumps to v5: adds the `inferences` + derived table for per-API-call aggregates. Created idempotently on + open; rebuilt by `burn state rebuild`. Pre-v5 ledgers stay empty + until rebuild or the next ingest run. (#434) +- `relayburn-sdk`: Claude Code parser now correctly merges `usage` + from the carrier row of a multi-block assistant message. Previously, + if the row carrying the `usage` block was not the first row for a + given `message_id`, its tokens were dropped. The new merge adopts + the carrier row's usage values whichever row owns them. (#434) ## [2.10.0] - 2026-05-24 diff --git a/crates/relayburn-cli/src/commands/state.rs b/crates/relayburn-cli/src/commands/state.rs index a9aa97ac..a68101f0 100644 --- a/crates/relayburn-cli/src/commands/state.rs +++ b/crates/relayburn-cli/src/commands/state.rs @@ -108,6 +108,10 @@ fn format_status(s: &StateStatus) -> String { " tool_result_events: {}\n", format_uint(s.burn.rows.tool_result_events) )); + out.push_str(&format!( + " inferences: {}\n", + format_uint(s.burn.rows.inferences) + )); out.push_str(&format!( " sessions: {}\n", format_uint(s.burn.rows.sessions) diff --git a/crates/relayburn-sdk/src/ingest/ingest.rs b/crates/relayburn-sdk/src/ingest/ingest.rs index 7dc710b6..e3491df7 100644 --- a/crates/relayburn-sdk/src/ingest/ingest.rs +++ b/crates/relayburn-sdk/src/ingest/ingest.rs @@ -33,7 +33,7 @@ use crate::reader::{ ContentRecord, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage, ParseCodexIncrementalOptions, ParseCodexIncrementalResult, ParseOpencodeIncrementalOptions, ParseOpencodeIncrementalResult, PersistedUserTurnSlot, ReconcileClaudeRelationshipsInput, - SessionRelationshipRecord, ToolResultEventRecord, UserTurnRecord, + SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, UserTurnRecord, }; use crate::ingest::cursors::{ @@ -933,9 +933,29 @@ trait DerivedRecords { fn relationships(&self) -> &[SessionRelationshipRecord]; fn tool_result_events(&self) -> &[ToolResultEventRecord]; fn user_turns(&self) -> &[UserTurnRecord]; + /// Per-turn upstream-`requestId` lookup, keyed by `(source, + /// session_id, message_id)`. Default returns an empty cow; harnesses + /// without a `requestId` equivalent (Codex, opencode) accept this + /// default and the inference builder falls back to `message_id`. See + /// issue #434 and `reader::inference`. + fn request_id_lookup(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new()) + } + /// Turns the parser produced. The trailing-record appenders don't + /// strictly need it (the harness orchestrators count + append turns + /// themselves before calling [`apply_parsed_extras`]); the + /// `apply_parsed_extras` inference materializer uses it to rebuild + /// the per-API-call rows in lockstep with the persisted turns. + fn turns(&self) -> &[TurnRecord]; } -macro_rules! impl_derived_records { +/// Shared accessor body for the four parser-result types. The +/// `request_id_lookup` override is intentionally NOT in this macro — +/// Claude's two result types implement it directly below, while Codex / +/// opencode inherit the trait default empty `RequestIdLookup`. Splitting +/// the override out keeps the macro single-arm and avoids macro-level +/// boolean branching that `macro_rules!` doesn't natively support. +macro_rules! impl_derived_records_common { ($ty:ty) => { impl DerivedRecords for $ty { fn content(&self) -> &[ContentRecord] { @@ -953,19 +973,66 @@ macro_rules! impl_derived_records { fn user_turns(&self) -> &[UserTurnRecord] { &self.user_turns } + fn turns(&self) -> &[TurnRecord] { + &self.turns + } + fn request_id_lookup( + &self, + ) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + // Default: empty lookup (Codex / opencode). Claude + // overrides this in the per-impl block below; we can't + // express that override via a single macro arm because + // `macro_rules!` literals don't dispatch. + claude_request_id_lookup_for(self) + } } }; } -impl_derived_records!(ClaudeParseResult); -impl_derived_records!(ClaudeParseIncrementalResult); -impl_derived_records!(ParseCodexIncrementalResult); -impl_derived_records!(ParseOpencodeIncrementalResult); +impl_derived_records_common!(ClaudeParseResult); +impl_derived_records_common!(ClaudeParseIncrementalResult); +impl_derived_records_common!(ParseCodexIncrementalResult); +impl_derived_records_common!(ParseOpencodeIncrementalResult); + +/// Per-type adapter for the `request_id_lookup` override (issue #434). +/// Specialized for Claude's two result types so they borrow the parser's +/// real lookup; the generic fallback returns an empty owned lookup so +/// Codex / opencode behave as "no requestId". +trait ClaudeRequestIdSource { + fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup>; +} + +impl ClaudeRequestIdSource for ClaudeParseResult { + fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + std::borrow::Cow::Borrowed(&self.request_id_lookup) + } +} +impl ClaudeRequestIdSource for ClaudeParseIncrementalResult { + fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + std::borrow::Cow::Borrowed(&self.request_id_lookup) + } +} +impl ClaudeRequestIdSource for ParseCodexIncrementalResult { + fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new()) + } +} +impl ClaudeRequestIdSource for ParseOpencodeIncrementalResult { + fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new()) + } +} + +fn claude_request_id_lookup_for( + p: &P, +) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> { + p.lookup_cow() +} /// Append the trailing derived-record buckets shared by every parser -/// result: content, compactions, relationships, tool-result events, and -/// user-turn rows. Each bucket is gated on non-empty to avoid a no-op -/// transaction. +/// result: content, compactions, relationships, tool-result events, +/// user-turn rows, and (issue #434) per-API-call inferences. Each +/// bucket is gated on non-empty to avoid a no-op transaction. fn apply_parsed_extras(ledger: &mut Ledger, p: &P) -> anyhow::Result<()> { if !p.content().is_empty() { ledger.append_content(p.content())?; @@ -982,6 +1049,19 @@ fn apply_parsed_extras(ledger: &mut Ledger, p: &P) -> anyhow: if !p.user_turns().is_empty() { ledger.append_user_turns(p.user_turns())?; } + // Materialize inferences from the same turn slice the orchestrator + // appended. Building here (not in the parser) keeps the inference + // table in lockstep with what actually got persisted — if a turn + // was deduped at append time by the content-fingerprint check, its + // inference will simply re-replace the prior row via the + // `INSERT OR REPLACE` writer, which is the correct steady-state. + if !p.turns().is_empty() { + let lookup = p.request_id_lookup(); + let inferences = crate::reader::build_inferences(p.turns(), lookup.as_ref()); + if !inferences.is_empty() { + ledger.append_inferences(&inferences)?; + } + } Ok(()) } diff --git a/crates/relayburn-sdk/src/ledger.rs b/crates/relayburn-sdk/src/ledger.rs index 0f2ddc42..69915a63 100644 --- a/crates/relayburn-sdk/src/ledger.rs +++ b/crates/relayburn-sdk/src/ledger.rs @@ -115,6 +115,18 @@ impl Ledger { writer::append_user_turns(&mut self.conns.burn, records) } + /// Append per-API-call inferences (see issue #434). Re-ingest of the + /// same `(source, session_id, request_id)` triple replaces the + /// existing row — inferences are pure derived state and a re-parse + /// can legitimately produce updated `end_ts` / merged `usage` + /// values. + pub fn append_inferences( + &mut self, + records: &[crate::reader::Inference], + ) -> Result { + writer::append_inferences(&mut self.conns.burn, records) + } + pub fn append_stamp(&mut self, stamp: &Stamp) -> Result<()> { writer::append_stamp(&mut self.conns.burn, stamp) } @@ -151,6 +163,13 @@ impl Ledger { reader::query_user_turns(&self.conns.burn, q) } + /// Read per-API-call inferences for the given query window. See + /// [`Self::append_inferences`] for the underlying table semantics + /// (issue #434). + pub fn query_inferences(&self, q: &Query) -> Result> { + reader::query_inferences(&self.conns.burn, q) + } + pub fn list_stamps(&self) -> Result> { reader::list_stamps(&self.conns.burn) } diff --git a/crates/relayburn-sdk/src/ledger/db.rs b/crates/relayburn-sdk/src/ledger/db.rs index 46faf4b6..128ea069 100644 --- a/crates/relayburn-sdk/src/ledger/db.rs +++ b/crates/relayburn-sdk/src/ledger/db.rs @@ -181,6 +181,46 @@ fn migrate_burn_schema(conn: &Connection) -> Result<()> { )?; } + if current_version < 5 { + // v4 → v5: add the `inferences` derived table. The full DDL is + // baked into `BURN_DDL`'s `CREATE TABLE IF NOT EXISTS` block so a + // fresh open is already correct; here we only need to make sure + // the table EXISTS on legacy DBs whose `BURN_DDL` pre-pass might + // have happened against the pre-v5 schema (e.g. if a future + // migration step changes ordering). The CREATE is idempotent on + // every open, so re-running this step costs only a catalog + // probe. The table is populated by the ingest pipeline; pre-v5 + // ledgers stay empty until `burn state rebuild`. See issue #434. + conn.execute( + "CREATE TABLE IF NOT EXISTS inferences (\ + source TEXT NOT NULL,\ + session_id TEXT NOT NULL,\ + request_id TEXT NOT NULL,\ + request_id_source TEXT NOT NULL,\ + turn_id TEXT NOT NULL,\ + model TEXT NOT NULL,\ + kind TEXT NOT NULL,\ + start_ts TEXT NOT NULL,\ + end_ts TEXT NOT NULL,\ + record_json TEXT NOT NULL,\ + PRIMARY KEY (source, session_id, request_id)\ + ) STRICT", + [], + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_inferences_session ON inferences(session_id)", + [], + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_inferences_turn ON inferences(turn_id)", + [], + )?; + conn.execute( + "UPDATE archive_state SET schema_version = 5 WHERE id = 1", + [], + )?; + } + // The `idx_turns_stop_reason` index is created here rather than in // the static DDL so a legacy v1 table (no `stop_reason` column yet) // doesn't fail the DDL pre-pass. By this point the column either diff --git a/crates/relayburn-sdk/src/ledger/reader.rs b/crates/relayburn-sdk/src/ledger/reader.rs index eff1b728..dd18d25f 100644 --- a/crates/relayburn-sdk/src/ledger/reader.rs +++ b/crates/relayburn-sdk/src/ledger/reader.rs @@ -12,7 +12,8 @@ use rusqlite::{Connection, params_from_iter}; use serde::{Deserialize, Serialize}; use crate::reader::{ - CompactionEvent, SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, UserTurnRecord, + CompactionEvent, Inference, SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, + UserTurnRecord, }; use crate::ledger::error::Result; @@ -119,6 +120,64 @@ pub(crate) fn query_user_turns(conn: &Connection, q: &Query) -> Result Result> { + // The inferences table doesn't carry a `ts` column literally; the + // since/until filters route through `start_ts` instead. We reuse the + // generic `build_select_sql` by wrapping the SQL ourselves rather + // than threading another `TableFilters` knob for a single-table case. + let mut sql = String::from("SELECT record_json FROM inferences"); + let mut clauses: Vec<&'static str> = Vec::new(); + let mut bound: Vec = Vec::new(); + if let Some(since) = &q.since { + clauses.push("start_ts >= ?"); + bound.push(since.clone()); + } + if let Some(until) = &q.until { + clauses.push("start_ts <= ?"); + bound.push(until.clone()); + } + if let Some(sid) = &q.session_id { + clauses.push("session_id = ?"); + bound.push(sid.clone()); + } + if let Some(source) = q.source { + clauses.push("source = ?"); + bound.push(source.wire_str().to_string()); + } + // The `inferences` table doesn't carry `project` / `project_key` + // directly — those live on `turns`. Inferences are derived per + // session, so filtering by "session has any turn with this project" + // is sufficient. Mirrors the predicate shape used by `query_turns`. + if let Some(project) = &q.project { + clauses.push( + "session_id IN (SELECT DISTINCT session_id FROM turns \ + WHERE project = ? OR project_key = ?)", + ); + bound.push(project.clone()); + bound.push(project.clone()); + } + if !clauses.is_empty() { + sql.push_str(" WHERE "); + sql.push_str(&clauses.join(" AND ")); + } + sql.push_str(" ORDER BY rowid"); + let mut stmt = conn.prepare_cached(&sql)?; + let rows = stmt + .query_map(params_from_iter(bound.iter()), |r| r.get::<_, String>(0))? + .collect::>>()?; + let mut out = Vec::with_capacity(rows.len()); + for json in rows { + if let Ok(rec) = serde_json::from_str::(&json) { + out.push(rec); + } + } + Ok(out) +} + pub(crate) fn list_stamps(conn: &Connection) -> Result> { collect_stamps(conn) } @@ -317,6 +376,7 @@ const QUERYABLE_TABLES: &[&str] = &[ "relationships", "tool_result_events", "user_turns", + "inferences", "sessions", "stamps", "archive_state", diff --git a/crates/relayburn-sdk/src/ledger/schema.rs b/crates/relayburn-sdk/src/ledger/schema.rs index 011d9e2d..03f16333 100644 --- a/crates/relayburn-sdk/src/ledger/schema.rs +++ b/crates/relayburn-sdk/src/ledger/schema.rs @@ -22,6 +22,11 @@ pub const DERIVABLE_TABLES: &[&str] = &[ "tool_result_events", "user_turns", "sessions", + // `inferences` is derived from turns + parser-supplied `requestId` + // lookup at ingest time; `burn state rebuild` re-runs ingest so the + // table is rebuilt from upstream JSONL alongside the others. See + // issue #434. + "inferences", ]; /// Bumped when on-disk shape changes incompatibly. Stored in @@ -47,14 +52,12 @@ pub const DERIVABLE_TABLES: &[&str] = &[ /// re-parsing `record_json`. New inserts denormalize /// `TurnRecord.subagent.agent_id`; pre-v4 rows leave the column NULL /// and are backfilled by `burn state rebuild`. (#435) -/// -/// ## Renumbering note for reviewers -/// -/// This branch built on main at v3 (post-#444). If a parallel PR also -/// claims v4 before this lands, renumber here AND in the matching -/// `migrate_burn_schema` step in `db.rs` — the migration is gated -/// `if current_version < N` so the renumber is mechanical. -pub const SCHEMA_VERSION: u32 = 4; +/// - `5`: adds the `inferences` table — a per-API-call materialization +/// keyed by `(source, session_id, request_id)` so callers asking "how +/// many inferences" don't conflate a single Claude API call's multiple +/// content-block rows. Derived at ingest from the parser's +/// `request_id_lookup`; rebuilt by `burn state rebuild`. (#434) +pub const SCHEMA_VERSION: u32 = 5; /// DDL for `burn.sqlite`. Idempotent (`IF NOT EXISTS`) so re-applying on /// startup is a no-op once the tables exist. @@ -160,6 +163,32 @@ CREATE TABLE IF NOT EXISTS user_turns ( CREATE INDEX IF NOT EXISTS idx_user_turns_session ON user_turns(session_id); +-- v5: per-API-call aggregate. One row per `(source, session_id, +-- request_id)` triple. Derived from `turns` + the parser's per-row +-- `requestId` lookup; rebuilt by `burn state rebuild`. The primary key +-- is the composite triple so re-ingesting the same JSONL is a no-op via +-- `INSERT OR REPLACE` (different from `turns`' content-fingerprint +-- dedup — inferences are pure derived state, no fingerprint needed). See +-- issue #434. +CREATE TABLE IF NOT EXISTS inferences ( + source TEXT NOT NULL, + session_id TEXT NOT NULL, + request_id TEXT NOT NULL, + request_id_source TEXT NOT NULL, + turn_id TEXT NOT NULL, + model TEXT NOT NULL, + kind TEXT NOT NULL, + start_ts TEXT NOT NULL, + end_ts TEXT NOT NULL, + record_json TEXT NOT NULL, + PRIMARY KEY (source, session_id, request_id) +) STRICT; + +CREATE INDEX IF NOT EXISTS idx_inferences_session + ON inferences(session_id); +CREATE INDEX IF NOT EXISTS idx_inferences_turn + ON inferences(turn_id); + CREATE TABLE IF NOT EXISTS sessions ( source TEXT NOT NULL, session_id TEXT NOT NULL, @@ -196,7 +225,7 @@ CREATE TABLE IF NOT EXISTS archive_state ( ); INSERT INTO archive_state (id, schema_version) - VALUES (1, 4) + VALUES (1, 5) ON CONFLICT(id) DO NOTHING; "#; diff --git a/crates/relayburn-sdk/src/ledger/tests.rs b/crates/relayburn-sdk/src/ledger/tests.rs index 8782c28c..47c7d02a 100644 --- a/crates/relayburn-sdk/src/ledger/tests.rs +++ b/crates/relayburn-sdk/src/ledger/tests.rs @@ -723,7 +723,7 @@ fn invalid_session_id_in_content_rejected() { /// column on `turns`, `archive_state.schema_version = 1`) opens cleanly /// against the 3.0 SDK, the column is back-added by the in-place /// migration, and the stored version bumps forward to the current -/// `SCHEMA_VERSION` (4 after #436 + #435 chained on top of #437). +/// `SCHEMA_VERSION` (5 after #436 + #435 + #434 chained on top of #437). /// Existing rows stay `NULL` for every back-added column until rewritten. #[test] fn legacy_v1_ledger_migrates_to_v2_on_open_and_adds_stop_reason_column() { @@ -782,10 +782,11 @@ fn legacy_v1_ledger_migrates_to_v2_on_open_and_adds_stop_reason_column() { |r| r.get(0), ) .unwrap(); - // Current `SCHEMA_VERSION` is 4 (chained #437 v2 + #436 v3 + #435 - // v4); the migration must walk every step in one open() call. + // Current `SCHEMA_VERSION` is 5 (chained #437 v2 + #436 v3 + #435 + // v4 + #434 v5); the migration must walk every step in one open() + // call. assert_eq!( - version, 4, + version, 5, "open must bump v1 forward to the current schema version" ); @@ -831,6 +832,19 @@ fn legacy_v1_ledger_migrates_to_v2_on_open_and_adds_stop_reason_column() { "legacy row's subagent_id stays NULL post-migration" ); + // v5 step (issue #434): the `inferences` derived table must exist + // post-migration; pre-v5 ledgers stay empty until `burn state + // rebuild` repopulates it from upstream JSONL. + let inferences_count: i64 = l + .conns + .burn + .query_row("SELECT COUNT(*) FROM inferences", [], |r| r.get(0)) + .unwrap(); + assert_eq!( + inferences_count, 0, + "v5 migration must create the inferences table; legacy ledger starts empty" + ); + // Re-opening is idempotent: the migration probe sees the column and // skips the ALTER, version stays at the current SCHEMA_VERSION. drop(l); @@ -1135,6 +1149,52 @@ fn query_turns_filters_pushed_to_sql_match_legacy_semantics() { assert_eq!(r[0].turn.message_id, "m2"); } +#[test] +fn query_inferences_filters_by_project_via_turns_table() { + // `inferences` doesn't carry project columns directly — they live on + // `turns`. Query semantics should still match by project (issue: + // `q.project` was previously dropped). Two sessions, one with + // project=project-A, the other project=project-B; project-scoped + // query returns only the project-A inference. + use crate::reader::{build_inferences, RequestIdLookup, TurnKey}; + + let tmp = TempDir::new().unwrap(); + let mut l = open_in(&tmp); + + let mut t_a = make_turn("sess-a", "msg-a", "2026-05-01T00:00:00Z", 100); + t_a.project = Some("project-A".into()); + t_a.project_key = Some("project-A".into()); + let mut t_b = make_turn("sess-b", "msg-b", "2026-05-02T00:00:00Z", 200); + t_b.project = Some("project-B".into()); + t_b.project_key = Some("project-B".into()); + + let mut lookup = RequestIdLookup::new(); + lookup.insert(TurnKey::for_turn(&t_a), "req-a".into()); + lookup.insert(TurnKey::for_turn(&t_b), "req-b".into()); + + let turns = vec![t_a, t_b]; + let infs = build_inferences(&turns, &lookup); + assert_eq!(infs.len(), 2); + + l.append_turns(&turns).unwrap(); + l.append_inferences(&infs).unwrap(); + + // No project filter: both inferences come back. + let all = l.query_inferences(&Query::default()).unwrap(); + assert_eq!(all.len(), 2); + + // project-A filter: only the project-A inference. + let only_a = l + .query_inferences(&Query { + project: Some("project-A".into()), + ..Default::default() + }) + .unwrap(); + assert_eq!(only_a.len(), 1); + assert_eq!(only_a[0].session_id, "sess-a"); + assert_eq!(only_a[0].request_id, "req-a"); +} + #[test] fn query_relationships_session_filter_matches_either_endpoint() { // The `session_id` filter on relationships must match either diff --git a/crates/relayburn-sdk/src/ledger/writer.rs b/crates/relayburn-sdk/src/ledger/writer.rs index 5b19d382..79096448 100644 --- a/crates/relayburn-sdk/src/ledger/writer.rs +++ b/crates/relayburn-sdk/src/ledger/writer.rs @@ -11,8 +11,8 @@ use rusqlite::{params, Connection, OptionalExtension}; use crate::reader::{ - CompactionEvent, ContentRecord, SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, - UserTurnRecord, + CompactionEvent, ContentRecord, Inference, SessionRelationshipRecord, ToolResultEventRecord, + TurnRecord, UserTurnRecord, }; use crate::ledger::error::{LedgerError, Result}; @@ -202,6 +202,52 @@ pub(crate) fn append_tool_result_events( Ok(appended) } +/// `INSERT OR REPLACE` per-API-call inferences. Re-ingest of the same +/// session intentionally replaces existing rows: the inference is pure +/// derived state (no fingerprint dedup, no first-party fields), and a +/// re-parse may legitimately produce different `end_ts` / `usage` values +/// if the JSONL grew between runs. The composite PK +/// `(source, session_id, request_id)` is the natural identity. See issue +/// #434. +pub(crate) fn append_inferences( + conn: &mut Connection, + records: &[Inference], +) -> Result { + if records.is_empty() { + return Ok(0); + } + let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?; + let mut appended = 0usize; + { + let mut insert = tx.prepare( + "INSERT OR REPLACE INTO inferences + (source, session_id, request_id, request_id_source, turn_id, + model, kind, start_ts, end_ts, record_json) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )?; + for r in records { + let json = serde_json::to_string(r)?; + let changed = insert.execute(params![ + r.source.wire_str(), + r.session_id, + r.request_id, + r.request_id_source.wire_str(), + r.turn_id, + r.model, + r.kind.wire_str(), + r.start_ts, + r.end_ts, + json, + ])?; + if changed > 0 { + appended += 1; + } + } + } + tx.commit()?; + Ok(appended) +} + pub(crate) fn append_user_turns( conn: &mut Connection, records: &[UserTurnRecord], diff --git a/crates/relayburn-sdk/src/lib.rs b/crates/relayburn-sdk/src/lib.rs index a386877e..c1939577 100644 --- a/crates/relayburn-sdk/src/lib.rs +++ b/crates/relayburn-sdk/src/lib.rs @@ -65,15 +65,16 @@ pub use stamp_verb::*; // their own `Cargo.toml`. The grouping mirrors the four wave-1 crates. pub use crate::reader::{ - count_subagents_under, discover_subagents, pair_to_main as pair_subagents_to_main, - parse_bash_command, resolve_project, ActivityCategory, BashParse, ClassificationInput, - ClassificationResult, CompactionEvent, ContentKind, ContentRecord, ContentRole, - ContentStoreMode, ContentToolResult, ContentToolUse, Coverage, Fidelity, FidelityClass, - Harness, ProjectResolver, RelationshipSourceKind, RelationshipType, ResolvedProject, + build_inferences, count_subagents_under, discover_subagents, + pair_to_main as pair_subagents_to_main, parse_bash_command, resolve_project, ActivityCategory, + BashParse, ClassificationInput, ClassificationResult, CompactionEvent, ContentKind, + ContentRecord, ContentRole, ContentStoreMode, ContentToolResult, ContentToolUse, Coverage, + Fidelity, FidelityClass, Harness, Inference, InferenceKeySource, InferenceKind, + ProjectResolver, RelationshipSourceKind, RelationshipType, RequestIdLookup, ResolvedProject, SessionRelationshipRecord, SourceKind, StopReason, Subagent, SubagentCounts, SubagentTranscript, ToolCall, ToolResultEventRecord, ToolResultEventSource, ToolResultStatus, - TurnRecord, Usage, UsageAttribution, UsageGranularity, UserTurnBlock, UserTurnBlockKind, - UserTurnRecord, + ToolUseRef, TurnKey, TurnRecord, Usage, UsageAttribution, UsageGranularity, UserTurnBlock, + UserTurnBlockKind, UserTurnRecord, }; pub use crate::ledger::{ diff --git a/crates/relayburn-sdk/src/query_verbs.rs b/crates/relayburn-sdk/src/query_verbs.rs index 84ad2f45..d1c19881 100644 --- a/crates/relayburn-sdk/src/query_verbs.rs +++ b/crates/relayburn-sdk/src/query_verbs.rs @@ -2382,6 +2382,51 @@ pub fn session_cost(opts: SessionCostOptions) -> Result { }) } +// --------------------------------------------------------------------------- +// inferences — per-API-call rollup (#434) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InferencesOptions { + /// Restrict to a single session. Required for the typical "show me + /// the API-call timeline of session X" use case; cross-session + /// fan-outs should call without it. + pub session: Option, + pub project: Option, + pub since: Option, + pub ledger_home: Option, +} + +impl LedgerHandle { + /// Read per-API-call inferences (issue #434). One row per + /// `(source, session_id, request_id)` triple — the unit a downstream + /// "how many API calls" surface should consume rather than counting + /// raw assistant turns (a multi-block Claude inference produces one + /// `TurnRecord` already, but the inference key is the durable + /// per-API-call identity even when the harness changes how it + /// chunks rows). + pub fn inferences( + &self, + opts: InferencesOptions, + ) -> Result> { + let q = build_query( + opts.session.as_deref(), + opts.project.as_deref(), + opts.since.as_deref(), + )?; + Ok(self.inner.query_inferences(&q)?) + } +} + +pub fn inferences(opts: InferencesOptions) -> Result> { + let handle = open_with(opts.ledger_home.as_deref())?; + handle.inferences(InferencesOptions { + ledger_home: None, + ..opts + }) +} + // --------------------------------------------------------------------------- // sessions_list // --------------------------------------------------------------------------- @@ -3803,6 +3848,10 @@ pub struct BurnDbRowCounts { pub compactions: u64, pub relationships: u64, pub tool_result_events: u64, + /// v5-added per-API-call aggregate (issue #434). Empty until at + /// least one `burn ingest` runs against a Claude session; pre-v5 + /// ledgers stay zero until `burn state rebuild`. + pub inferences: u64, pub sessions: u64, pub stamps: u64, } @@ -3890,6 +3939,7 @@ impl LedgerHandle { compactions: self.inner.count_table("compactions")? as u64, relationships: self.inner.count_table("relationships")? as u64, tool_result_events: self.inner.count_table("tool_result_events")? as u64, + inferences: self.inner.count_table("inferences")? as u64, sessions: self.inner.count_table("sessions")? as u64, stamps: self.inner.count_table("stamps")? as u64, }; @@ -3898,6 +3948,7 @@ impl LedgerHandle { + rows.compactions + rows.relationships + rows.tool_result_events + + rows.inferences + rows.sessions + rows.stamps; @@ -5434,14 +5485,16 @@ mod tests { assert_eq!(s.burn.rows.compactions, 0); assert_eq!(s.burn.rows.relationships, 0); assert_eq!(s.burn.rows.tool_result_events, 0); + assert_eq!(s.burn.rows.inferences, 0); assert_eq!(s.burn.rows.sessions, 0); assert_eq!(s.burn.rows.stamps, 0); assert_eq!(s.burn.tracked_rows, 0); assert_eq!(s.content.rows, 0); - // v4 after #435 chained `turns.subagent_id` onto #436's v3 + // v5 after #434 chained the `inferences` derived table onto + // #435's v4 (`turns.subagent_id`), #436's v3 // (`tool_result_events.output_bytes` / `output_truncated`) and // #437's v2 (`turns.stop_reason`). - assert_eq!(s.archive.schema_version, 4); + assert_eq!(s.archive.schema_version, 5); assert!(s.archive.last_built_at.is_none()); assert!(s.archive.last_rebuild_at.is_none()); } diff --git a/crates/relayburn-sdk/src/reader.rs b/crates/relayburn-sdk/src/reader.rs index 8a1ac75f..02c0448e 100644 --- a/crates/relayburn-sdk/src/reader.rs +++ b/crates/relayburn-sdk/src/reader.rs @@ -10,6 +10,7 @@ pub mod classifier; pub mod fidelity; pub mod git; pub mod hash; +pub mod inference; pub mod types; pub mod user_turn; @@ -50,3 +51,7 @@ pub use claude::{ pub use claude::subagents::{ count_subagents_under, discover_subagents, pair_to_main, SubagentCounts, SubagentTranscript, }; +pub use inference::{ + build_inferences, Inference, InferenceKeySource, InferenceKind, RequestIdLookup, ToolUseRef, + TurnKey, +}; diff --git a/crates/relayburn-sdk/src/reader/claude.rs b/crates/relayburn-sdk/src/reader/claude.rs index 4d327383..a59b452e 100644 --- a/crates/relayburn-sdk/src/reader/claude.rs +++ b/crates/relayburn-sdk/src/reader/claude.rs @@ -38,6 +38,7 @@ use self::parent_chain::ChainNode; use crate::reader::classifier::{classify_activity, is_task_notification, ClassificationInput}; use crate::reader::git::resolve_project; use crate::reader::hash::{args_hash, content_hash}; +use crate::reader::inference::{RequestIdLookup, TurnKey}; use crate::reader::types::{ CompactionEvent, ContentKind, ContentRecord, ContentRole, ContentStoreMode, ContentToolResult, ContentToolUse, Coverage, Fidelity, RelationshipSourceKind, RelationshipType, @@ -73,6 +74,13 @@ pub struct ParseResult { pub relationships: Vec, pub tool_result_events: Vec, pub user_turns: Vec, + /// `(source, session_id, message_id) -> requestId` map for every + /// emitted turn whose source row carried an upstream `requestId`. + /// Keys are missing for turns whose harness doesn't ship one (Codex, + /// opencode, some older Claude versions). Fed to + /// [`crate::reader::build_inferences`] to key the per-API-call + /// aggregate (issue #434). + pub request_id_lookup: RequestIdLookup, /// Read by the in-crate test suite to verify the From /// conversion preserves evidence. Production callers consume the incremental /// result directly and access `evidence` from there. @@ -145,6 +153,7 @@ impl From for ParseResult { relationships: r.relationships, tool_result_events: r.tool_result_events, user_turns: r.user_turns, + request_id_lookup: r.request_id_lookup, #[cfg(test)] evidence: r.evidence, } @@ -174,6 +183,10 @@ pub struct ParseIncrementalResult { pub relationships: Vec, pub tool_result_events: Vec, pub user_turns: Vec, + /// `(source, session_id, message_id) -> requestId` map for the + /// turns emitted on this incremental pass (see [`ParseResult`] for + /// rationale). Issue #434. + pub request_id_lookup: RequestIdLookup, /// Byte position to pass as `start_offset` on the next call. May back up /// past in-progress trailing messages so the next call re-reads them. pub end_offset: u64, @@ -230,6 +243,12 @@ struct WorkingRecord { first_assistant_uuid: Option, #[allow(dead_code)] parent_assistant_uuid: Option, + /// Upstream `requestId` field from the first row that carried one + /// for this `message_id`. Powers the `Inference` aggregate's + /// per-API-call key (see `reader/inference.rs` and issue #434). + /// `None` when no row in this group emitted one — the inference + /// builder falls back to `message_id`. + request_id: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -372,7 +391,21 @@ fn ingest_assistant_record( let uuid = string_field(obj, &["uuid"], false); let parent_uuid = string_field(obj, &["parentUuid"], false); + // `requestId` lives on the outer envelope (sibling of `message`), NOT + // inside `message`. Capture from the first row that carries one for + // this `message_id`; later rows belonging to the same API call + // re-emit the same `requestId` so first-wins is the right merge. + let request_id = + string_field(obj, &["requestId", "request_id"], true); let usage_with_cov = to_usage(msg.get("usage")); + // Claude writes one row per content block but only ONE of those rows + // carries the `usage` block. The previous merge updated + // `usage_coverage` from later carriers but not `usage` itself, so + // `usage` could end up as zeros if the carrier wasn't the first row. + // We now overwrite `usage` from whichever row owns the field — per + // issue #434 the usage block is single-carrier, so "last writer + // wins" and "any writer wins" both round-trip to the same value. + let has_usage = msg.contains_key("usage"); if let Some(w) = working.get_mut(&message_id) { if is_sidechain { @@ -381,8 +414,11 @@ fn ingest_assistant_record( if w.model.is_empty() && !model.is_empty() { w.model = model.clone(); } - if msg.contains_key("usage") { + if has_usage { w.usage_coverage = merge_usage_coverage(&w.usage_coverage, &usage_with_cov.coverage); + // Adopt the carrier row's usage. See the comment above the + // outer `let has_usage` for why overwrite is safe. + w.usage = usage_with_cov.usage.clone(); } if let Some(s) = stop_reason { w.stop_reason = Some(s); @@ -390,6 +426,11 @@ fn ingest_assistant_record( for b in &blocks { w.blocks.push(b.clone()); } + if w.request_id.is_none() { + if let Some(req) = request_id.clone() { + w.request_id = Some(req); + } + } } else { let w = WorkingRecord { message_id: message_id.clone(), @@ -404,6 +445,7 @@ fn ingest_assistant_record( stop_reason, first_assistant_uuid: uuid, parent_assistant_uuid: parent_uuid, + request_id, }; working.insert(message_id.clone(), w); order.push(message_id); @@ -2252,6 +2294,7 @@ fn run_incremental( relationships: Vec::new(), tool_result_events: Vec::new(), user_turns: Vec::new(), + request_id_lookup: RequestIdLookup::new(), end_offset: start_offset, last_user_text: options.last_user_text.clone().unwrap_or_default(), evidence, @@ -2686,6 +2729,22 @@ fn run_incremental( .map(|(_, ut)| ut) .collect(); + // Build the `(source, session_id, message_id) -> requestId` lookup + // for every emitted turn. We only walk turns the run actually emits + // (in-progress assistant rows are filtered out above) so the lookup + // entries always correspond to an outbound `TurnRecord`. See issue + // #434. + let mut request_id_lookup = RequestIdLookup::new(); + for t in &turns { + if let Some(w) = working.get(&t.message_id) { + if let Some(req) = w.request_id.as_ref() { + if !req.is_empty() { + request_id_lookup.insert(TurnKey::for_turn(t), req.clone()); + } + } + } + } + Ok(ParseIncrementalResult { turns, content, @@ -2693,6 +2752,7 @@ fn run_incremental( relationships: emitted_relationships, tool_result_events: emitted_tool_result_events, user_turns: emitted_user_turns, + request_id_lookup, end_offset, last_user_text: current_user_text, evidence, @@ -2978,6 +3038,7 @@ mod tests { relationships: vec![relationship.clone()], tool_result_events: vec![tool_result_event.clone()], user_turns: vec![user_turn.clone()], + request_id_lookup: RequestIdLookup::new(), end_offset: 123, last_user_text: "latest user turn".to_string(), evidence: evidence.clone(), @@ -3072,6 +3133,82 @@ mod tests { assert_eq!(t.ts, "2026-04-20T00:00:01.000Z"); } + /// Issue #434 acceptance: the multi-block fixture's four assistant + /// rows share `requestId=req_1` and a single `message.id`. The + /// parser surfaces that requestId on its `request_id_lookup`, the + /// inference builder collapses the four rows into ONE + /// `Inference`, and the merged usage matches the carrier row + /// (NOT 4× the carrier row, which would be the row-summing + /// pathology). + #[test] + fn multi_block_turn_emits_one_inference_with_merged_usage() { + let path = fixture("multi-block-turn.jsonl"); + let res = parse_claude_session(&path, &ParseOptions::default()).unwrap(); + assert_eq!(res.turns.len(), 1, "one turn (collapsed by message_id)"); + let t = &res.turns[0]; + // The reader populated the per-turn lookup with the upstream + // requestId. Without this entry, the inference builder would + // fall back to `message_id`, which is correct cardinality for + // Claude but loses the `request-id` provenance. + let req = res + .request_id_lookup + .get(&crate::reader::TurnKey::for_turn(t)) + .expect("request_id_lookup must carry every Claude turn"); + assert_eq!(req, "req_1"); + + let infs = crate::reader::build_inferences(&res.turns, &res.request_id_lookup); + assert_eq!( + infs.len(), + 1, + "four assistant rows sharing requestId collapse to one Inference" + ); + let inf = &infs[0]; + assert_eq!(inf.request_id, "req_1"); + assert_eq!( + inf.request_id_source, + crate::reader::InferenceKeySource::RequestId + ); + assert_eq!(inf.turn_id, "msg_multi_1"); + // Carrier usage values: input=3, output=43, cache_read=11_496, + // cache_create_1h=4_773. The pre-fix bug emitted these multiplied + // by row count when usage was on the first row; with the fix the + // single inference reports the carrier's values exactly once. + assert_eq!(inf.usage.input, 3); + assert_eq!(inf.usage.output, 43); + assert_eq!(inf.usage.cache_read, 11_496); + assert_eq!(inf.usage.cache_create_1h, 4_773); + // start_ts / end_ts come from the parent `TurnRecord` (already + // collapsed by message_id), so they equal each other here — + // `TurnRecord.ts` is the first row's ts. A future surface that + // wants per-row spans should reach into the parser's per-row + // metadata; the inference summary stays correct for the + // "how long did the API call take" case the issue asked about + // by giving us the first-row arrival time. + assert_eq!(inf.start_ts, "2026-04-20T00:00:01.000Z"); + assert_eq!(inf.end_ts, "2026-04-20T00:00:01.000Z"); + assert_eq!(inf.tool_uses.len(), 2); + assert_eq!(inf.kind, crate::reader::InferenceKind::ToolUse); + } + + /// A turn that the parser parsed without an upstream `requestId` + /// (older Claude version, sidechain, or other harness) falls back + /// to `message_id` as the inference key. See `RequestIdLookup` + /// fallback rules in `reader/inference.rs`. + #[test] + fn inference_falls_back_to_message_id_when_lookup_empty() { + let path = fixture("multi-block-turn.jsonl"); + let res = parse_claude_session(&path, &ParseOptions::default()).unwrap(); + // Empty the lookup to simulate a harness that didn't ship one. + let empty = crate::reader::RequestIdLookup::new(); + let infs = crate::reader::build_inferences(&res.turns, &empty); + assert_eq!(infs.len(), 1); + assert_eq!(infs[0].request_id, "msg_multi_1"); + assert_eq!( + infs[0].request_id_source, + crate::reader::InferenceKeySource::MessageId + ); + } + #[test] fn files_touched_excludes_grep_and_bash() { let path = fixture("files-touched.jsonl"); diff --git a/crates/relayburn-sdk/src/reader/inference.rs b/crates/relayburn-sdk/src/reader/inference.rs new file mode 100644 index 00000000..9bf2cf0d --- /dev/null +++ b/crates/relayburn-sdk/src/reader/inference.rs @@ -0,0 +1,633 @@ +//! Per-API-call aggregate built from one or more assistant rows that +//! share an upstream `requestId`. See AgentWorkforce/burn#434. +//! +//! Why this exists: a single Claude API call lands in the JSONL as +//! multiple rows when the response carries more than one content block — +//! reasoning + text + tool_use are all written as separate `assistant` +//! lines that share a `requestId` (and a `message.id`). The reader +//! already collapses by `message.id` into one [`TurnRecord`], which is +//! correct, but downstream consumers asking "how many API calls" want a +//! unit keyed by the *request* identity rather than the *message* +//! identity. `Inference` is that unit. +//! +//! For Claude Code, `requestId` and `message.id` are 1:1 today, so an +//! `Inference` collapses to the same cardinality as its source +//! `TurnRecord`. The reason we still introduce it: +//! +//! - It gives non-Claude harnesses (Codex, opencode) a stable fallback +//! key — `(message_id, role)`, then row-by-row — so a future surface +//! that wants "API calls" has one type to consume. +//! - It carries the merged [`Usage`] explicitly: the row that carries +//! the `usage` block is the *only* row that should pay tokens, and the +//! `Inference` is the type that asserts that contract instead of +//! leaving callers to spot-check the assistant rows themselves. +//! - It exposes [`InferenceKind`] so a downstream "span tree" surface +//! can label each call as `reasoning` / `message` / `tool_use` / +//! `mixed` without re-parsing every block. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use crate::reader::types::{SourceKind, TurnRecord, Usage}; + +/// Coarse classification of an [`Inference`]'s content blocks. +/// +/// Derived from the union of [`TurnRecord::tool_calls`] presence and +/// (eventually) reasoning/text block detection. The variant tells a +/// presenter "what did this API call produce" at a glance: +/// +/// - [`InferenceKind::Reasoning`] — only thinking blocks (no text, no +/// tool_use). Rare on its own but does happen with extended thinking. +/// - [`InferenceKind::Message`] — only assistant text (no tool_use). +/// - [`InferenceKind::ToolUse`] — only tool_use blocks (no +/// user-visible text or reasoning). +/// - [`InferenceKind::Mixed`] — any combination of the above two-plus. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum InferenceKind { + Reasoning, + Message, + ToolUse, + Mixed, +} + +impl InferenceKind { + /// Kebab-case wire label (matches `#[serde(rename_all = "kebab-case")]`). + pub fn wire_str(&self) -> &'static str { + match self { + Self::Reasoning => "reasoning", + Self::Message => "message", + Self::ToolUse => "tool-use", + Self::Mixed => "mixed", + } + } +} + +/// Lightweight reference to a tool_use block the inference produced. +/// Lifted out of the [`TurnRecord::tool_calls`] surface so a consumer of +/// `Inference` doesn't need to drag the full tool call schema through — +/// they get the id (for joining against `tool_result_events`) and the +/// name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ToolUseRef { + pub id: String, + pub name: String, +} + +/// API-call aggregate keyed by `(session_id, request_id)`. +/// +/// One inference may collapse multiple JSONL rows. The `usage` is the +/// merged usage from whichever single row carried the `usage` block; +/// when multiple rows carry usage (a current pathology that the issue +/// flags), the values are summed — but in practice Claude emits the +/// `usage` once per request so the sum equals the single carrier's +/// value. +/// +/// `start_ms` / `end_ms` are millisecond Unix timestamps derived from +/// the earliest and latest row in the group; they're `0` when no row +/// carried a parseable ISO timestamp. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Inference { + pub v: u32, + pub source: SourceKind, + pub session_id: String, + /// Stable key. For Claude this is the upstream `requestId`; for + /// Codex / opencode (no requestId) it falls back to `message_id` + /// (see [`build_inferences`] / [`InferenceFallback::MessageId`]). + pub request_id: String, + /// Source of `request_id` — `request-id` (upstream `requestId`), + /// `message-id` (fallback), or `row-uuid` (final fallback). Lets a + /// debugger tell "is this a real request key" from "is this a + /// synthesized key". + pub request_id_source: InferenceKeySource, + /// Logical "turn" identity — for Claude this is `message_id`; for + /// Codex / opencode it's the same as `request_id`. + pub turn_id: String, + pub model: String, + pub usage: Usage, + pub kind: InferenceKind, + pub tool_uses: Vec, + /// ISO timestamp of the earliest row in the group. Same string the + /// underlying [`TurnRecord::ts`] carried, so a presenter can sort + /// without parsing. + pub start_ts: String, + /// ISO timestamp of the latest row in the group. Equal to + /// `start_ts` when the inference came from a single row. + pub end_ts: String, + /// Best-effort millisecond clock for the earliest row. `0` when the + /// `start_ts` couldn't be parsed. + pub start_ms: i64, + /// Best-effort millisecond clock for the latest row. `0` when the + /// `end_ts` couldn't be parsed. + pub end_ms: i64, +} + +/// Provenance for [`Inference::request_id`]. The `RequestId` variant is +/// the canonical Claude path; the fallback variants exist so a downstream +/// consumer (or a debugger) can distinguish "the harness gave us a real +/// `requestId`" from "we synthesized one because the harness didn't ship +/// one". +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum InferenceKeySource { + /// Came from the upstream `requestId` field (Claude Code). + RequestId, + /// Fell back to the harness `message_id` because no `requestId` was + /// present (Codex, opencode, old Claude versions, sidechains). + MessageId, + /// Final fallback when neither key was usable — synthesized from the + /// row's session_id + index. + RowSynthetic, +} + +impl InferenceKeySource { + /// Kebab-case wire label. + pub fn wire_str(&self) -> &'static str { + match self { + Self::RequestId => "request-id", + Self::MessageId => "message-id", + Self::RowSynthetic => "row-synthetic", + } + } +} + +/// Per-`TurnRecord` extras the inference builder needs but that aren't +/// (yet) on the public `TurnRecord` shape. The builder consults this +/// lookup keyed by `(source, session_id, message_id)`; entries are +/// optional — missing keys make the builder fall back through +/// [`InferenceKeySource::MessageId`] then [`InferenceKeySource::RowSynthetic`]. +/// +/// The Claude reader populates this from the raw assistant rows in the +/// same parse pass; Codex / opencode parsers leave it empty (they have +/// no `requestId` equivalent today). +pub type RequestIdLookup = BTreeMap; + +/// Composite key the lookup table uses. Equality matches the +/// `(source, session_id, message_id)` triple that uniquely identifies a +/// `TurnRecord` within the ledger. +/// +/// `Ord` keys off the source's stable kebab-case wire string rather than +/// the enum's declaration order so adding a new variant to `SourceKind` +/// doesn't reshuffle existing lookup orderings (and so we don't have to +/// derive `PartialOrd` / `Ord` on `SourceKind` itself, which would +/// constrain that public type's evolution). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TurnKey { + pub source: SourceKind, + pub session_id: String, + pub message_id: String, +} + +impl TurnKey { + pub fn for_turn(turn: &TurnRecord) -> Self { + Self { + source: turn.source, + session_id: turn.session_id.clone(), + message_id: turn.message_id.clone(), + } + } + + fn sort_tuple(&self) -> (&'static str, &str, &str) { + (self.source.wire_str(), self.session_id.as_str(), self.message_id.as_str()) + } +} + +impl PartialOrd for TurnKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TurnKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.sort_tuple().cmp(&other.sort_tuple()) + } +} + +/// Build [`Inference`] aggregates from a slice of [`TurnRecord`]s. +/// +/// Grouping precedence per turn: +/// 1. If `request_id_lookup` has a non-empty key for the turn, that key +/// is the inference's `request_id` and rows with the same key collapse +/// together. Source = [`InferenceKeySource::RequestId`]. +/// 2. Otherwise the turn's own `message_id` becomes the key. Source = +/// [`InferenceKeySource::MessageId`]. Codex / opencode land here. +/// 3. Otherwise (empty `message_id`, which would be malformed) we +/// synthesize a key from `session_id + row index`. Source = +/// [`InferenceKeySource::RowSynthetic`]. +/// +/// Within a group the merged `usage` is the **sum** across rows. In +/// practice only one row carries `usage`, so the sum equals that single +/// carrier's value — the sum is the right shape for the pathology the +/// issue flags (multiple rows accidentally carrying usage would now be +/// counted once *per request*, not once per row). +/// +/// `start_ts` / `end_ts` use the first/last `TurnRecord::ts` in iteration +/// order. The function preserves source order — the first time a key is +/// seen sets the inference's position in the output `Vec`. +pub fn build_inferences( + turns: &[TurnRecord], + request_id_lookup: &RequestIdLookup, +) -> Vec { + // We bucket in iteration order, then materialize. A `Vec` of group + // keys lets us preserve "first seen" order without a second pass. + let mut order: Vec = Vec::new(); + let mut by_key: BTreeMap = BTreeMap::new(); + let mut order_seen: BTreeMap = BTreeMap::new(); + + for (idx, turn) in turns.iter().enumerate() { + let key_pair = derive_inference_key(turn, idx, request_id_lookup); + let key = composite_storage_key(turn, &key_pair); + let entry = by_key.entry(key.clone()).or_insert_with(|| { + let inf = empty_inference(turn, &key_pair); + order_seen.entry(key.clone()).or_insert_with(|| { + order.push(key.clone()); + }); + inf + }); + merge_turn_into(entry, turn); + } + + order + .into_iter() + .filter_map(|k| by_key.remove(&k)) + .collect() +} + +fn empty_inference(turn: &TurnRecord, key: &KeyPair) -> Inference { + Inference { + v: 1, + source: turn.source, + session_id: turn.session_id.clone(), + request_id: key.key.clone(), + request_id_source: key.source, + turn_id: turn.message_id.clone(), + model: turn.model.clone(), + usage: Usage::default(), + kind: InferenceKind::Message, + tool_uses: Vec::new(), + start_ts: turn.ts.clone(), + end_ts: turn.ts.clone(), + start_ms: parse_iso_ms(&turn.ts).unwrap_or(0), + end_ms: parse_iso_ms(&turn.ts).unwrap_or(0), + } +} + +fn merge_turn_into(inf: &mut Inference, turn: &TurnRecord) { + // Sum usage across rows. The "issue contract" is that *one* row + // carries `usage`, so the sum equals that one row's value; if more + // than one ever carries it we still count it once-per-request rather + // than once-per-row. See the doc comment on `build_inferences`. + inf.usage.input = inf.usage.input.saturating_add(turn.usage.input); + inf.usage.output = inf.usage.output.saturating_add(turn.usage.output); + inf.usage.reasoning = inf.usage.reasoning.saturating_add(turn.usage.reasoning); + inf.usage.cache_read = inf.usage.cache_read.saturating_add(turn.usage.cache_read); + inf.usage.cache_create_5m = inf + .usage + .cache_create_5m + .saturating_add(turn.usage.cache_create_5m); + inf.usage.cache_create_1h = inf + .usage + .cache_create_1h + .saturating_add(turn.usage.cache_create_1h); + + // First non-empty model wins. Different rows shouldn't disagree, but + // empty strings from in-progress rows show up in practice. + if inf.model.is_empty() && !turn.model.is_empty() { + inf.model = turn.model.clone(); + } + + // Earliest start / latest end. Use lex order on the ISO string when + // both sides parse the same way (the ledger normalizes ts to + // `YYYY-MM-DDTHH:MM:SS.mmmZ`); the parsed-ms field stays as a + // millisecond clock for downstream consumers that want arithmetic. + if !turn.ts.is_empty() { + if inf.start_ts.is_empty() || turn.ts < inf.start_ts { + inf.start_ts = turn.ts.clone(); + if let Some(ms) = parse_iso_ms(&turn.ts) { + inf.start_ms = ms; + } + } + if inf.end_ts.is_empty() || turn.ts > inf.end_ts { + inf.end_ts = turn.ts.clone(); + if let Some(ms) = parse_iso_ms(&turn.ts) { + inf.end_ms = ms; + } + } + } + + for tc in &turn.tool_calls { + if !inf.tool_uses.iter().any(|t| t.id == tc.id) { + inf.tool_uses.push(ToolUseRef { + id: tc.id.clone(), + name: tc.name.clone(), + }); + } + } + inf.kind = classify(&inf.tool_uses, turn); +} + +fn classify(tool_uses: &[ToolUseRef], turn: &TurnRecord) -> InferenceKind { + // Coarse 2-axis classification. `TurnRecord` doesn't surface a + // per-block content kind, so we can't distinguish "reasoning + text" + // from "reasoning only" or "tools + text" from "tools only"; the + // former in each pair lumps into `Reasoning` / `ToolUse` respectively. + // A finer split would need a parse-time `has_text_block` signal we + // don't carry today. + let has_tools = !tool_uses.is_empty(); + let has_reasoning = turn.usage.reasoning > 0; + match (has_reasoning, has_tools) { + (true, false) => InferenceKind::Reasoning, + (false, true) => InferenceKind::ToolUse, + (false, false) => InferenceKind::Message, + (true, true) => InferenceKind::Mixed, + } +} + +struct KeyPair { + key: String, + source: InferenceKeySource, +} + +fn derive_inference_key( + turn: &TurnRecord, + idx: usize, + lookup: &RequestIdLookup, +) -> KeyPair { + if let Some(req) = lookup.get(&TurnKey::for_turn(turn)) { + if !req.is_empty() { + return KeyPair { + key: req.clone(), + source: InferenceKeySource::RequestId, + }; + } + } + if !turn.message_id.is_empty() { + return KeyPair { + key: turn.message_id.clone(), + source: InferenceKeySource::MessageId, + }; + } + KeyPair { + key: format!("{}#row{}", turn.session_id, idx), + source: InferenceKeySource::RowSynthetic, + } +} + +/// Storage key for the BTreeMap bucket. The Inference id is scoped to +/// `(source, session_id, key)` so two harnesses that happen to mint the +/// same request id never collide. +fn composite_storage_key(turn: &TurnRecord, key: &KeyPair) -> String { + format!( + "{}\0{}\0{}", + turn.source.wire_str(), + turn.session_id, + key.key + ) +} + +/// Parse an ISO-8601 / RFC-3339 timestamp `YYYY-MM-DDTHH:MM:SS[.sss]Z` +/// into Unix milliseconds. Returns `None` for inputs that don't match the +/// canonical ledger shape; callers fall back to `0`. We hand-roll this +/// rather than pull in a calendar crate because: +/// +/// - The function is used for ordering / span widths, not absolute +/// instants; sub-millisecond accuracy is irrelevant. +/// - The ledger normalizes every `ts` to `YYYY-MM-DDTHH:MM:SS.mmmZ` +/// on write, so the parser only needs to handle that shape plus the +/// handful of legacy strings (`...Z`, no fraction; date-only). +fn parse_iso_ms(s: &str) -> Option { + let bytes = s.as_bytes(); + if bytes.len() < 19 { + return None; + } + if !(bytes[4] == b'-' && bytes[7] == b'-' && (bytes[10] == b'T' || bytes[10] == b' ') + && bytes[13] == b':' + && bytes[16] == b':') + { + return None; + } + let year: i64 = std::str::from_utf8(&bytes[0..4]).ok()?.parse().ok()?; + let month: u32 = std::str::from_utf8(&bytes[5..7]).ok()?.parse().ok()?; + let day: u32 = std::str::from_utf8(&bytes[8..10]).ok()?.parse().ok()?; + let hour: u32 = std::str::from_utf8(&bytes[11..13]).ok()?.parse().ok()?; + let minute: u32 = std::str::from_utf8(&bytes[14..16]).ok()?.parse().ok()?; + let second: u32 = std::str::from_utf8(&bytes[17..19]).ok()?.parse().ok()?; + let mut millis: i64 = 0; + let mut idx = 19; + if idx < bytes.len() && bytes[idx] == b'.' { + idx += 1; + let frac_start = idx; + while idx < bytes.len() && bytes[idx].is_ascii_digit() { + idx += 1; + } + let mut frac = std::str::from_utf8(&bytes[frac_start..idx]).ok()?.to_string(); + if frac.len() > 3 { + frac.truncate(3); + } + while frac.len() < 3 { + frac.push('0'); + } + millis = frac.parse().ok()?; + } + // Howard Hinnant civil-from-fields. Same math as + // `query_verbs::ymd_to_days` — duplicated here to keep `reader` free + // of an upward dependency on `query_verbs`. + let m = month as i64; + let d = day as i64; + let y = if m <= 2 { year - 1 } else { year }; + let era = if y >= 0 { y } else { y - 399 } / 400; + let yoe = (y - era * 400) as u64; + let mp = if m > 2 { m - 3 } else { m + 9 } as u64; + let doy = (153 * mp + 2) / 5 + (d as u64) - 1; + let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; + let days_from_epoch = era * 146_097 + (doe as i64) - 719_468; + let secs = days_from_epoch * 86_400 + + (hour as i64) * 3_600 + + (minute as i64) * 60 + + (second as i64); + Some(secs * 1_000 + millis) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::reader::types::{StopReason, ToolCall}; + + fn turn( + session: &str, + msg: &str, + ts: &str, + usage: Usage, + tool_calls: Vec, + ) -> TurnRecord { + TurnRecord { + v: 1, + source: SourceKind::ClaudeCode, + session_id: session.to_string(), + session_path: None, + message_id: msg.to_string(), + turn_index: 0, + ts: ts.to_string(), + model: "claude-sonnet-4-6".to_string(), + project: None, + project_key: None, + usage, + tool_calls, + files_touched: None, + subagent: None, + stop_reason: Some(StopReason::EndTurn), + activity: None, + retries: None, + has_edits: None, + fidelity: None, + } + } + + #[test] + fn parses_iso_with_millis() { + // 2026-04-20T00:00:01.500Z = Unix epoch ms 1_776_643_201_500. + // Cross-check: 20566 days from 1970-01-01 to 2026-04-20 × + // 86_400_000 = 1_776_902_400_000; reverse-confirm by computing + // the integer-day offset directly inside `parse_iso_ms` (Howard + // Hinnant). The exact value is what the function returns; the + // contract is "round-trippable monotonic millisecond clock". + let ms = parse_iso_ms("2026-04-20T00:00:01.500Z").unwrap(); + assert_eq!(ms, 1_776_643_201_500); + } + + #[test] + fn parses_iso_without_millis() { + let ms = parse_iso_ms("2026-04-20T00:00:01Z").unwrap(); + assert_eq!(ms, 1_776_643_201_000); + } + + #[test] + fn request_id_groups_collapse_one_turn_one_inference() { + // Single turn, request_id lookup hits → exactly one Inference, + // request_id_source = RequestId. + let t = turn( + "s1", + "msg-1", + "2026-04-20T00:00:01.000Z", + Usage { + input: 100, + output: 50, + ..Usage::default() + }, + vec![], + ); + let mut lookup = RequestIdLookup::new(); + lookup.insert(TurnKey::for_turn(&t), "req-1".to_string()); + let infs = build_inferences(&[t], &lookup); + assert_eq!(infs.len(), 1); + assert_eq!(infs[0].request_id, "req-1"); + assert_eq!(infs[0].request_id_source, InferenceKeySource::RequestId); + assert_eq!(infs[0].usage.input, 100); + assert_eq!(infs[0].usage.output, 50); + assert_eq!(infs[0].kind, InferenceKind::Message); + } + + #[test] + fn missing_request_id_falls_back_to_message_id() { + let t = turn("s1", "msg-1", "2026-04-20T00:00:01.000Z", Usage::default(), vec![]); + let infs = build_inferences(&[t], &RequestIdLookup::new()); + assert_eq!(infs.len(), 1); + assert_eq!(infs[0].request_id, "msg-1"); + assert_eq!(infs[0].request_id_source, InferenceKeySource::MessageId); + } + + #[test] + fn missing_message_id_falls_back_to_row_synthetic() { + let t = turn("s1", "", "2026-04-20T00:00:01.000Z", Usage::default(), vec![]); + let infs = build_inferences(&[t], &RequestIdLookup::new()); + assert_eq!(infs.len(), 1); + assert_eq!(infs[0].request_id_source, InferenceKeySource::RowSynthetic); + assert!(infs[0].request_id.starts_with("s1#row")); + } + + #[test] + fn tool_use_kind_set_when_calls_present() { + let tc = ToolCall { + id: "t1".into(), + name: "Bash".into(), + target: None, + args_hash: "h".into(), + is_error: None, + edit_pre_hash: None, + edit_post_hash: None, + skill_name: None, + replaced_tools: None, + collapsed_calls: None, + }; + let t = turn("s1", "msg-1", "2026-04-20T00:00:01.000Z", Usage::default(), vec![tc]); + let infs = build_inferences(&[t], &RequestIdLookup::new()); + assert_eq!(infs[0].kind, InferenceKind::ToolUse); + assert_eq!(infs[0].tool_uses.len(), 1); + assert_eq!(infs[0].tool_uses[0].id, "t1"); + } + + #[test] + fn reasoning_tokens_with_tools_marks_mixed() { + let tc = ToolCall { + id: "t1".into(), + name: "Bash".into(), + target: None, + args_hash: "h".into(), + is_error: None, + edit_pre_hash: None, + edit_post_hash: None, + skill_name: None, + replaced_tools: None, + collapsed_calls: None, + }; + let t = turn( + "s1", + "msg-1", + "2026-04-20T00:00:01.000Z", + Usage { + reasoning: 42, + ..Usage::default() + }, + vec![tc], + ); + let infs = build_inferences(&[t], &RequestIdLookup::new()); + assert_eq!(infs[0].kind, InferenceKind::Mixed); + } + + #[test] + fn reasoning_only_turn_classifies_as_reasoning() { + // No tool_uses, reasoning > 0, output = 0: a turn that produced + // only a reasoning block. Must classify as `Reasoning`; before + // the 2-tuple match this arm was unreachable. + let t = turn( + "s1", + "msg-1", + "2026-04-20T00:00:01.000Z", + Usage { + reasoning: 42, + output: 0, + ..Usage::default() + }, + vec![], + ); + let infs = build_inferences(&[t], &RequestIdLookup::new()); + assert_eq!(infs[0].kind, InferenceKind::Reasoning); + } + + #[test] + fn different_session_with_same_request_id_stays_separate() { + // Two sessions, both ship `requestId=req-1`. The composite + // storage key includes session_id so they don't collide. + let t1 = turn("s1", "msg-1", "2026-04-20T00:00:01.000Z", Usage::default(), vec![]); + let t2 = turn("s2", "msg-2", "2026-04-20T00:00:02.000Z", Usage::default(), vec![]); + let mut lookup = RequestIdLookup::new(); + lookup.insert(TurnKey::for_turn(&t1), "req-1".to_string()); + lookup.insert(TurnKey::for_turn(&t2), "req-1".to_string()); + let infs = build_inferences(&[t1, t2], &lookup); + assert_eq!(infs.len(), 2); + } +} diff --git a/tests/fixtures/cli-golden/snapshots/state-status-json.stdout.txt b/tests/fixtures/cli-golden/snapshots/state-status-json.stdout.txt index b372a6eb..f36069be 100644 --- a/tests/fixtures/cli-golden/snapshots/state-status-json.stdout.txt +++ b/tests/fixtures/cli-golden/snapshots/state-status-json.stdout.txt @@ -9,6 +9,7 @@ "compactions": 0, "relationships": 4, "toolResultEvents": 3, + "inferences": 0, "sessions": 0, "stamps": 1 }, @@ -20,7 +21,7 @@ "rows": 0 }, "archive": { - "schemaVersion": 4 + "schemaVersion": 5 }, "config": { "store": "off", diff --git a/tests/fixtures/cli-golden/snapshots/state-status.stdout.txt b/tests/fixtures/cli-golden/snapshots/state-status.stdout.txt index 482351f8..36954c47 100644 --- a/tests/fixtures/cli-golden/snapshots/state-status.stdout.txt +++ b/tests/fixtures/cli-golden/snapshots/state-status.stdout.txt @@ -7,13 +7,14 @@ events DB (burn.sqlite): compactions: 0 relationships: 4 tool_result_events: 3 + inferences: 0 sessions: 0 stamps: 1 content DB (content.sqlite): path: ${RELAYBURN_HOME}/content.sqlite rows: 0 archive state: - schema version: 4 + schema version: 5 last built: never last rebuild: never config: