Merged
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,13 @@ Cross-package release notes for relayburn. Package changelogs contain package-le

### Added

- `relayburn-sdk`: `Inference` aggregate keys per-API-call rollups by
`(source, session_id, request_id)` with merged usage and `kind`
(`reasoning` / `message` / `tool-use` / `mixed`). Read via
`LedgerHandle::inferences(opts)` (free function `inferences()` too);
persisted at ingest into the new `inferences` table. Falls back to
`message_id` for harnesses without a `requestId` (Codex, opencode,
older Claude). (#434)
- `burn summary`: one-line `Turn outcomes: …` breakdown of assistant
`stop_reason` counts, plus a `stopReasons` block in `--json`. (#437)
- Ledger fingerprint primitive (`{count}:{maxMtimeUnix}:{totalBytes}`) for
@@ -45,6 +52,15 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
/ `output_truncated` columns (#436). Both are migrated in place on
`Ledger::open`; existing rows leave the new columns `NULL`. Run `burn
state rebuild` to backfill an older ledger.
- `relayburn-sdk` ledger schema bumps to v5: adds the `inferences`
derived table for per-API-call aggregates. Created idempotently on
open; rebuilt by `burn state rebuild`. Pre-v5 ledgers stay empty
until rebuild or the next ingest run. (#434)
- `relayburn-sdk`: Claude Code parser now correctly merges `usage`
from the carrier row of a multi-block assistant message. Previously,
if the row carrying the `usage` block was not the first row for a
given `message_id`, its tokens were dropped. The merge now adopts
the carrier row's usage values regardless of which row carries them. (#434)
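
The carrier-row fix described in the last bullet can be illustrated with a minimal sketch. It assumes a pared-down row shape in which at most one row per `message_id` carries `usage`; the `Usage`, `Row`, and `merged_usage` names are hypothetical and are not taken from the relayburn parser.

```rust
/// Hypothetical, simplified token counts; the real usage type is not
/// shown in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Usage {
    pub input_tokens: u64,
    pub output_tokens: u64,
}

/// One transcript row of a multi-block assistant message (hypothetical).
pub struct Row<'a> {
    pub message_id: &'a str,
    /// Only the "carrier" row has usage; the other rows hold `None`.
    pub usage: Option<Usage>,
}

/// Returns the usage for `message_id`, taken from whichever row carries it.
/// Reading only the first row for the id would reproduce the described bug
/// whenever the carrier row is not first.
pub fn merged_usage(rows: &[Row<'_>], message_id: &str) -> Option<Usage> {
    rows.iter()
        .filter(|r| r.message_id == message_id)
        .find_map(|r| r.usage)
}
```

Scanning all rows for the id, rather than stopping at the first, is what makes the result independent of row order.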

## [2.10.0] - 2026-05-24

4 changes: 4 additions & 0 deletions crates/relayburn-cli/src/commands/state.rs
@@ -108,6 +108,10 @@ fn format_status(s: &StateStatus) -> String {
" tool_result_events: {}\n",
format_uint(s.burn.rows.tool_result_events)
));
out.push_str(&format!(
" inferences: {}\n",
format_uint(s.burn.rows.inferences)
));
out.push_str(&format!(
" sessions: {}\n",
format_uint(s.burn.rows.sessions)
98 changes: 89 additions & 9 deletions crates/relayburn-sdk/src/ingest/ingest.rs
@@ -33,7 +33,7 @@ use crate::reader::{
ContentRecord, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage,
ParseCodexIncrementalOptions, ParseCodexIncrementalResult, ParseOpencodeIncrementalOptions,
ParseOpencodeIncrementalResult, PersistedUserTurnSlot, ReconcileClaudeRelationshipsInput,
-SessionRelationshipRecord, ToolResultEventRecord, UserTurnRecord,
+SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, UserTurnRecord,
};

use crate::ingest::cursors::{
@@ -933,9 +933,29 @@ trait DerivedRecords {
fn relationships(&self) -> &[SessionRelationshipRecord];
fn tool_result_events(&self) -> &[ToolResultEventRecord];
fn user_turns(&self) -> &[UserTurnRecord];
/// Per-turn upstream-`requestId` lookup, keyed by `(source,
/// session_id, message_id)`. The default returns an empty owned
/// lookup. Harnesses without a `requestId` equivalent (Codex,
/// opencode) also resolve to an empty lookup, so the inference
/// builder falls back to `message_id`. See issue #434 and
/// `reader::inference`.
fn request_id_lookup(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new())
}
/// Turns the parser produced. The trailing-record appenders don't
/// strictly need it (the harness orchestrators count + append turns
/// themselves before calling [`apply_parsed_extras`]); the
/// `apply_parsed_extras` inference materializer uses it to rebuild
/// the per-API-call rows in lockstep with the persisted turns.
fn turns(&self) -> &[TurnRecord];
}

-macro_rules! impl_derived_records {
+/// Shared accessor body for the four parser-result types. The
+/// per-type `request_id_lookup` behavior is intentionally NOT encoded
+/// in this macro: the macro body delegates to
+/// `claude_request_id_lookup_for`, and the `ClaudeRequestIdSource`
+/// impls below decide whether a type borrows the parser's lookup
+/// (Claude) or returns an empty one (Codex / opencode). This keeps the
+/// macro single-arm and avoids macro-level boolean branching that
+/// `macro_rules!` doesn't natively support.
+macro_rules! impl_derived_records_common {
($ty:ty) => {
impl DerivedRecords for $ty {
fn content(&self) -> &[ContentRecord] {
@@ -953,19 +973,66 @@ macro_rules! impl_derived_records {
fn user_turns(&self) -> &[UserTurnRecord] {
&self.user_turns
}
fn turns(&self) -> &[TurnRecord] {
&self.turns
}
fn request_id_lookup(
&self,
) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
// Dispatch through `ClaudeRequestIdSource` (implemented per
// type below): Claude's result types borrow the parser's
// lookup, while Codex / opencode return an empty owned one.
// A single `macro_rules!` arm cannot branch on the concrete
// type, so the per-type behavior lives in that trait.
claude_request_id_lookup_for(self)
}
}
};
}

-impl_derived_records!(ClaudeParseResult);
-impl_derived_records!(ClaudeParseIncrementalResult);
-impl_derived_records!(ParseCodexIncrementalResult);
-impl_derived_records!(ParseOpencodeIncrementalResult);
+impl_derived_records_common!(ClaudeParseResult);
+impl_derived_records_common!(ClaudeParseIncrementalResult);
+impl_derived_records_common!(ParseCodexIncrementalResult);
+impl_derived_records_common!(ParseOpencodeIncrementalResult);

/// Per-type adapter for the `request_id_lookup` override (issue #434).
/// Claude's two result types borrow the parser's real lookup; the
/// Codex / opencode impls return an empty owned lookup so those
/// harnesses behave as "no requestId".
trait ClaudeRequestIdSource {
fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup>;
}

impl ClaudeRequestIdSource for ClaudeParseResult {
fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
std::borrow::Cow::Borrowed(&self.request_id_lookup)
}
}
impl ClaudeRequestIdSource for ClaudeParseIncrementalResult {
fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
std::borrow::Cow::Borrowed(&self.request_id_lookup)
}
}
impl ClaudeRequestIdSource for ParseCodexIncrementalResult {
fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new())
}
}
impl ClaudeRequestIdSource for ParseOpencodeIncrementalResult {
fn lookup_cow(&self) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
std::borrow::Cow::Owned(crate::reader::RequestIdLookup::new())
}
}

fn claude_request_id_lookup_for<P: ClaudeRequestIdSource + ?Sized>(
p: &P,
) -> std::borrow::Cow<'_, crate::reader::RequestIdLookup> {
p.lookup_cow()
}
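
The borrowed-versus-owned dispatch above can be reduced to a small standalone sketch. It assumes a plain `HashMap<String, String>` as the lookup; `Lookup`, `LookupSource`, `WithIds`, `WithoutIds`, and `resolve` are hypothetical stand-ins, not the SDK's types.

```rust
use std::borrow::Cow;
use std::collections::HashMap;

/// Hypothetical stand-in for the request-id lookup: message_id -> request_id.
pub type Lookup = HashMap<String, String>;

pub trait LookupSource {
    fn lookup_cow(&self) -> Cow<'_, Lookup>;
}

/// A parser result that recorded request ids (plays the Claude role).
pub struct WithIds {
    pub ids: Lookup,
}

/// A parser result with no request-id concept (plays the Codex / opencode role).
pub struct WithoutIds;

impl LookupSource for WithIds {
    fn lookup_cow(&self) -> Cow<'_, Lookup> {
        // Borrow the existing map: no clone on the hot path.
        Cow::Borrowed(&self.ids)
    }
}

impl LookupSource for WithoutIds {
    fn lookup_cow(&self) -> Cow<'_, Lookup> {
        // Nothing to borrow, so hand back an empty owned map.
        Cow::Owned(Lookup::new())
    }
}

/// Resolves the grouping id for a message, falling back to the message id
/// itself when the lookup has no entry.
pub fn resolve<P: LookupSource + ?Sized>(p: &P, message_id: &str) -> String {
    let lookup = p.lookup_cow();
    let id = lookup
        .get(message_id)
        .cloned()
        .unwrap_or_else(|| message_id.to_string());
    id
}
```

Returning `Cow` lets both kinds of source share one signature: callers read through `Deref` and never need to know whether the map was borrowed or freshly built.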

/// Append the trailing derived-record buckets shared by every parser
-/// result: content, compactions, relationships, tool-result events, and
-/// user-turn rows. Each bucket is gated on non-empty to avoid a no-op
-/// transaction.
+/// result: content, compactions, relationships, tool-result events,
+/// user-turn rows, and (issue #434) per-API-call inferences. Each
+/// bucket is gated on non-empty to avoid a no-op transaction.
fn apply_parsed_extras<P: DerivedRecords>(ledger: &mut Ledger, p: &P) -> anyhow::Result<()> {
if !p.content().is_empty() {
ledger.append_content(p.content())?;
@@ -982,6 +1049,19 @@ fn apply_parsed_extras<P: DerivedRecords>(ledger: &mut Ledger, p: &P) -> anyhow:
if !p.user_turns().is_empty() {
ledger.append_user_turns(p.user_turns())?;
}
// Materialize inferences from the same turn slice the orchestrator
// appended. Building here (not in the parser) keeps the inference
// table in lockstep with what actually got persisted — if a turn
// was deduped at append time by the content-fingerprint check, its
// inference will simply re-replace the prior row via the
// `INSERT OR REPLACE` writer, which is the correct steady-state.
if !p.turns().is_empty() {
let lookup = p.request_id_lookup();
let inferences = crate::reader::build_inferences(p.turns(), lookup.as_ref());
if !inferences.is_empty() {
ledger.append_inferences(&inferences)?;
}
}
Ok(())
}
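
`build_inferences` itself is not part of this diff. As a rough illustration of the grouping it is described as doing, the sketch below keys turns by `(source, session_id, request_id)`, falls back to `message_id` when the lookup has no entry, and rolls the per-turn kinds up into one value. The `Turn` and `RequestIds` shapes, the `rollup_kinds` name, and the rule that differing kinds collapse to `Mixed` are all assumptions; usage merging is left out.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    Reasoning,
    Message,
    ToolUse,
    Mixed,
}

/// Hypothetical, pared-down turn; the real `TurnRecord` has more fields.
pub struct Turn {
    pub source: String,
    pub session_id: String,
    pub message_id: String,
    pub kind: Kind,
}

/// Hypothetical lookup: (source, session_id, message_id) -> request id.
pub type RequestIds = BTreeMap<(String, String, String), String>;

/// Groups turns by (source, session_id, request_id). A turn with no known
/// request id is keyed by its own message id instead. Turns of differing
/// kinds under one key collapse to `Kind::Mixed` (assumed rule).
pub fn rollup_kinds(
    turns: &[Turn],
    lookup: &RequestIds,
) -> BTreeMap<(String, String, String), Kind> {
    let mut out = BTreeMap::new();
    for t in turns {
        let probe = (t.source.clone(), t.session_id.clone(), t.message_id.clone());
        let request_id = lookup
            .get(&probe)
            .cloned()
            .unwrap_or_else(|| t.message_id.clone());
        let key = (t.source.clone(), t.session_id.clone(), request_id);
        out.entry(key)
            .and_modify(|k: &mut Kind| {
                if *k != t.kind {
                    *k = Kind::Mixed;
                }
            })
            .or_insert(t.kind);
    }
    out
}
```

With an empty lookup every turn becomes its own group keyed by `message_id`, which matches the fallback described for harnesses without a request id.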

19 changes: 19 additions & 0 deletions crates/relayburn-sdk/src/ledger.rs
@@ -115,6 +115,18 @@ impl Ledger {
writer::append_user_turns(&mut self.conns.burn, records)
}

/// Append per-API-call inferences (see issue #434). Re-ingest of the
/// same `(source, session_id, request_id)` triple replaces the
/// existing row — inferences are pure derived state and a re-parse
/// can legitimately produce updated `end_ts` / merged `usage`
/// values.
pub fn append_inferences(
&mut self,
records: &[crate::reader::Inference],
) -> Result<usize> {
writer::append_inferences(&mut self.conns.burn, records)
}

pub fn append_stamp(&mut self, stamp: &Stamp) -> Result<()> {
writer::append_stamp(&mut self.conns.burn, stamp)
}
@@ -151,6 +163,13 @@ impl Ledger {
reader::query_user_turns(&self.conns.burn, q)
}

/// Read per-API-call inferences for the given query window. See
/// [`Self::append_inferences`] for the underlying table semantics
/// (issue #434).
pub fn query_inferences(&self, q: &Query) -> Result<Vec<crate::reader::Inference>> {
reader::query_inferences(&self.conns.burn, q)
}

pub fn list_stamps(&self) -> Result<Vec<Stamp>> {
reader::list_stamps(&self.conns.burn)
}
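
The replace-on-re-ingest behavior documented for `append_inferences` can be modeled in memory. This sketch uses a `BTreeMap` keyed by the `(source, session_id, request_id)` triple; `InferenceRow`, `InferenceTable`, and the choice to return the number of records written are hypothetical, since the real writer is not shown in this diff.

```rust
use std::collections::BTreeMap;

/// (source, session_id, request_id)
pub type Key = (String, String, String);

/// Hypothetical subset of a stored inference row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InferenceRow {
    pub end_ts: String,
    pub output_tokens: u64,
}

#[derive(Default)]
pub struct InferenceTable {
    rows: BTreeMap<Key, InferenceRow>,
}

impl InferenceTable {
    /// Writes every record, replacing any existing row with the same key.
    /// Returns the number of records written (an assumption about what the
    /// real `usize` means).
    pub fn append(&mut self, records: &[(Key, InferenceRow)]) -> usize {
        for (key, row) in records {
            // `insert` overwrites on key collision, like a primary-key upsert.
            self.rows.insert(key.clone(), row.clone());
        }
        records.len()
    }

    pub fn len(&self) -> usize {
        self.rows.len()
    }

    pub fn get(&self, key: &Key) -> Option<&InferenceRow> {
        self.rows.get(key)
    }
}
```

Because the rows are derived state, overwriting is safe: a later parse of the same call simply supersedes the earlier one.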
40 changes: 40 additions & 0 deletions crates/relayburn-sdk/src/ledger/db.rs
@@ -181,6 +181,46 @@ fn migrate_burn_schema(conn: &Connection) -> Result<()> {
)?;
}

if current_version < 5 {
// v4 → v5: add the `inferences` derived table. The full DDL is
// baked into `BURN_DDL`'s `CREATE TABLE IF NOT EXISTS` block so a
// fresh open is already correct; here we only need to make sure
// the table EXISTS on legacy DBs whose `BURN_DDL` pre-pass might
// have happened against the pre-v5 schema (e.g. if a future
// migration step changes ordering). The CREATE is idempotent on
// every open, so re-running this step costs only a catalog
// probe. The table is populated by the ingest pipeline; pre-v5
// ledgers stay empty until `burn state rebuild`. See issue #434.
conn.execute(
"CREATE TABLE IF NOT EXISTS inferences (\
source TEXT NOT NULL,\
session_id TEXT NOT NULL,\
request_id TEXT NOT NULL,\
request_id_source TEXT NOT NULL,\
turn_id TEXT NOT NULL,\
model TEXT NOT NULL,\
kind TEXT NOT NULL,\
start_ts TEXT NOT NULL,\
end_ts TEXT NOT NULL,\
record_json TEXT NOT NULL,\
PRIMARY KEY (source, session_id, request_id)\
) STRICT",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_inferences_session ON inferences(session_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_inferences_turn ON inferences(turn_id)",
[],
)?;
conn.execute(
"UPDATE archive_state SET schema_version = 5 WHERE id = 1",
[],
)?;
}
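
The version-gated, idempotent shape of this step can be sketched without a database. The model below stands in for the catalog with two sets, so "create if not exists" becomes a set insertion; `Db` and `migrate` are hypothetical names, and the v5 target is the only detail taken from the diff.

```rust
use std::collections::BTreeSet;

/// Hypothetical in-memory stand-in for the schema catalog.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Db {
    pub schema_version: u32,
    pub tables: BTreeSet<String>,
    pub indexes: BTreeSet<String>,
}

/// Applies the v4 -> v5 step. Set insertion is a no-op when the name is
/// already present, mirroring `CREATE ... IF NOT EXISTS`.
pub fn migrate(db: &mut Db) {
    if db.schema_version < 5 {
        db.tables.insert("inferences".to_string());
        db.indexes.insert("idx_inferences_session".to_string());
        db.indexes.insert("idx_inferences_turn".to_string());
        db.schema_version = 5;
    }
}
```

Running the step twice, or against a catalog where an earlier pass already created the table, leaves the same final state.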

// The `idx_turns_stop_reason` index is created here rather than in
// the static DDL so a legacy v1 table (no `stop_reason` column yet)
// doesn't fail the DDL pre-pass. By this point the column either
62 changes: 61 additions & 1 deletion crates/relayburn-sdk/src/ledger/reader.rs
@@ -12,7 +12,8 @@ use rusqlite::{Connection, params_from_iter};
use serde::{Deserialize, Serialize};

use crate::reader::{
-CompactionEvent, SessionRelationshipRecord, ToolResultEventRecord, TurnRecord, UserTurnRecord,
+CompactionEvent, Inference, SessionRelationshipRecord, ToolResultEventRecord, TurnRecord,
+UserTurnRecord,
};

use crate::ledger::error::Result;
@@ -119,6 +120,64 @@ pub(crate) fn query_user_turns(conn: &Connection, q: &Query) -> Result<Vec<UserT
)
}

/// Read per-API-call inferences, applying the standard `Query` filters
/// (since / until / session_id / source / project). The `inferences`
/// table has no `ts` column, so the time-window filters compare against
/// `start_ts`: the earliest row in the call decides "did anything
/// happen in this window".
pub(crate) fn query_inferences(conn: &Connection, q: &Query) -> Result<Vec<Inference>> {
// The since/until filters route through `start_ts` because the table
// has no literal `ts` column. Rather than reusing the generic
// `build_select_sql` and threading another `TableFilters` knob
// through it for a single-table case, we assemble the SQL here.
let mut sql = String::from("SELECT record_json FROM inferences");
let mut clauses: Vec<&'static str> = Vec::new();
let mut bound: Vec<String> = Vec::new();
if let Some(since) = &q.since {
clauses.push("start_ts >= ?");
bound.push(since.clone());
}
if let Some(until) = &q.until {
clauses.push("start_ts <= ?");
bound.push(until.clone());
}
if let Some(sid) = &q.session_id {
clauses.push("session_id = ?");
bound.push(sid.clone());
}
if let Some(source) = q.source {
clauses.push("source = ?");
bound.push(source.wire_str().to_string());
}
Comment on lines +147 to +150

P2: Apply project filter when querying inferences

`InferencesOptions` accepts `project`, and `LedgerHandle::inferences` builds a `Query` with that field, but `query_inferences` never applies `q.project` in SQL. This means callers requesting project-scoped results will receive cross-project inferences, which breaks expected filtering behavior for multi-project ledgers.

Member Author

Fixed in ff7bfaa. The `inferences` table doesn't carry `project` / `project_key` columns directly (those live on `turns`), so the filter is applied via a subquery: `session_id IN (SELECT DISTINCT session_id FROM turns WHERE project = ? OR project_key = ?)`. This mirrors the predicate shape `query_turns` already uses (it matches against either column). Added a regression test that ingests two sessions with distinct projects and asserts the project-scoped query returns only the matching one.


Generated by Claude Code

// The `inferences` table doesn't carry `project` / `project_key`
// directly — those live on `turns`. Inferences are derived per
// session, so filtering by "session has any turn with this project"
// is sufficient. Mirrors the predicate shape used by `query_turns`.
if let Some(project) = &q.project {
clauses.push(
"session_id IN (SELECT DISTINCT session_id FROM turns \
WHERE project = ? OR project_key = ?)",
);
bound.push(project.clone());
bound.push(project.clone());
}
if !clauses.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(&clauses.join(" AND "));
}
sql.push_str(" ORDER BY rowid");
let mut stmt = conn.prepare_cached(&sql)?;
let rows = stmt
.query_map(params_from_iter(bound.iter()), |r| r.get::<_, String>(0))?
.collect::<rusqlite::Result<Vec<_>>>()?;
let mut out = Vec::with_capacity(rows.len());
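for json in rows {
// Rows whose `record_json` fails to deserialize are skipped
// rather than failing the whole query.
if let Ok(rec) = serde_json::from_str::<Inference>(&json) {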
out.push(rec);
}
}
Ok(out)
}

pub(crate) fn list_stamps(conn: &Connection) -> Result<Vec<Stamp>> {
collect_stamps(conn)
}
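
The clause assembly in `query_inferences` above can be isolated as a pure function that is easy to test without a database. This is a sketch under assumptions: `Filters` is a hypothetical stand-in for the `Query` fields involved (the `source` filter is omitted), and `build_inferences_sql` is not an SDK function.

```rust
/// Hypothetical stand-in for the `Query` fields used by the filter logic.
#[derive(Debug, Default)]
pub struct Filters {
    pub since: Option<String>,
    pub until: Option<String>,
    pub session_id: Option<String>,
    pub project: Option<String>,
}

/// Builds the SELECT text and its positional parameters. Clause text is
/// static; caller-supplied values only ever travel as bound parameters.
pub fn build_inferences_sql(f: &Filters) -> (String, Vec<String>) {
    let mut clauses: Vec<&'static str> = Vec::new();
    let mut bound: Vec<String> = Vec::new();
    if let Some(since) = &f.since {
        clauses.push("start_ts >= ?");
        bound.push(since.clone());
    }
    if let Some(until) = &f.until {
        clauses.push("start_ts <= ?");
        bound.push(until.clone());
    }
    if let Some(sid) = &f.session_id {
        clauses.push("session_id = ?");
        bound.push(sid.clone());
    }
    if let Some(project) = &f.project {
        // The subquery has two placeholders, so the value is bound twice.
        clauses.push(
            "session_id IN (SELECT DISTINCT session_id FROM turns \
             WHERE project = ? OR project_key = ?)",
        );
        bound.push(project.clone());
        bound.push(project.clone());
    }
    let mut sql = String::from("SELECT record_json FROM inferences");
    if !clauses.is_empty() {
        sql.push_str(" WHERE ");
        sql.push_str(&clauses.join(" AND "));
    }
    sql.push_str(" ORDER BY rowid");
    (sql, bound)
}
```

Keeping the placeholder count and the `bound` vector in step is the part most likely to drift; the project clause is the one case where a single filter contributes two parameters.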
@@ -317,6 +376,7 @@ const QUERYABLE_TABLES: &[&str] = &[
"relationships",
"tool_result_events",
"user_turns",
"inferences",
"sessions",
"stamps",
"archive_state",