diff --git a/crates/relayburn-reader/src/claude.rs b/crates/relayburn-reader/src/claude.rs index 0b3e1821..93bb847d 100644 --- a/crates/relayburn-reader/src/claude.rs +++ b/crates/relayburn-reader/src/claude.rs @@ -1,8 +1,7 @@ //! Claude Code session parser — Rust port of `packages/reader/src/claude.ts`. //! -//! Covers `parse_claude_session` and `reconcile_claude_session_relationships`. -//! The incremental entry point (`parseClaudeSessionIncremental`) is scaffolded -//! but not yet ported — see #255 follow-ups. +//! Covers `parse_claude_session`, `parse_claude_session_incremental`, and +//! `reconcile_claude_session_relationships`. //! //! The on-disk JSONL has a very loose shape (any extra fields permitted, any //! field can be absent), so we keep raw lines as `serde_json::Value` and use @@ -11,7 +10,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::path::Path; use serde_json::Value; @@ -109,6 +108,59 @@ pub fn parse_claude_session_with_counter, C: TokenCounter + ?Size Ok(state.finish(options, capture_content)) } +#[derive(Debug, Clone, Default)] +pub struct ParseIncrementalOptions { + pub session_path: Option, + pub content_mode: Option, + pub tokenizer: Option, + pub file_session_id: Option, + /// Byte offset to resume parsing from. The previous incremental call's + /// `end_offset` is the right value to pass. + pub start_offset: Option, + /// The most recent user prompt text seen before `start_offset`. Carried + /// forward from the prior call's result so an in-progress turn whose user + /// prompt was before the resume cursor still classifies against the + /// prompt for keyword refinement. 
+ pub last_user_text: Option, +} + +#[derive(Debug, Clone)] +pub struct ParseIncrementalResult { + pub turns: Vec, + pub content: Vec, + pub events: Vec, + pub relationships: Vec, + pub tool_result_events: Vec, + pub user_turns: Vec, + /// Byte position to pass as `start_offset` on the next call. May back up + /// past in-progress trailing messages so the next call re-reads them. + pub end_offset: u64, + /// Carry forward to the next call's `last_user_text` option. + pub last_user_text: String, + pub evidence: ClaudeRelationshipEvidence, +} + +/// Synchronous Rust counterpart of `parseClaudeSessionIncremental`. Reads the +/// file from `start_offset` and emits only records that lie strictly before +/// the returned `end_offset` so the next call (resumed at `end_offset`) does +/// not double-emit. Trailing in-progress messages back up `end_offset` to the +/// byte position of the earliest in-progress assistant line. +pub fn parse_claude_session_incremental>( + path: P, + options: &ParseIncrementalOptions, +) -> std::io::Result { + let counter = HeuristicCounter; + parse_claude_session_incremental_with_counter(path, options, &counter) +} + +pub fn parse_claude_session_incremental_with_counter, C: TokenCounter + ?Sized>( + path: P, + options: &ParseIncrementalOptions, + counter: &C, +) -> std::io::Result { + run_incremental(path.as_ref(), options, counter) +} + // --------------------------------------------------------------------------- // Internal parse state. // --------------------------------------------------------------------------- @@ -2229,22 +2281,589 @@ fn has_explicit_target(targets: &Option>, session_id: &str) -> bool .is_some_and(|t| t.iter().any(|s| s == session_id)) } +// --------------------------------------------------------------------------- +// Incremental parser. 
+// --------------------------------------------------------------------------- + +struct PrescanOutput { + last_assistant_message_id: Option, + next_event_index: u64, +} + +/// Pre-read the already-ingested prefix `[0, end_offset)` so a resumed call +/// has the same node graph, evidence, tool-result counters, event index, and +/// last-assistant-message-id it would have if it had started from byte 0. +/// Mirrors `prescanNodes` in `packages/reader/src/claude.ts`. +fn prescan_nodes( + path: &Path, + end_offset: u64, + nodes_by_uuid: &mut HashMap, + evidence: &mut ClaudeRelationshipEvidence, + tool_result_counters: &mut HashMap, +) -> std::io::Result { + if end_offset == 0 { + return Ok(PrescanOutput { + last_assistant_message_id: None, + next_event_index: 0, + }); + } + let mut file = File::open(path)?; + let size = file.metadata()?.len(); + let length = end_offset.min(size); + if length == 0 { + return Ok(PrescanOutput { + last_assistant_message_id: None, + next_event_index: 0, + }); + } + let mut buf = vec![0u8; length as usize]; + file.read_exact(&mut buf)?; + let mut p: usize = 0; + let mut last_assistant_message_id: Option = None; + let mut next_event_index: u64 = 0; + while p < buf.len() { + let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') { + Some(i) => p + i, + None => break, + }; + let raw = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim(); + p = nl_idx + 1; + if raw.is_empty() { + continue; + } + let parsed: Value = match serde_json::from_str(raw) { + Ok(v) => v, + Err(_) => continue, + }; + let obj = match parsed.as_object() { + Some(o) => o.clone(), + None => continue, + }; + let line_type = obj.get("type").and_then(Value::as_str).unwrap_or(""); + match line_type { + "assistant" => { + register_assistant_node(&parsed, nodes_by_uuid); + record_evidence_from_line(evidence, &parsed); + record_explicit_relationship_evidence(evidence, &obj); + if let Some(mid) = obj + .get("message") + .and_then(|m| m.get("id")) + 
.and_then(Value::as_str) + { + last_assistant_message_id = Some(mid.to_string()); + } + } + "user" => { + register_user_node(&parsed, nodes_by_uuid); + record_evidence_from_line(evidence, &parsed); + record_explicit_relationship_evidence(evidence, &obj); + record_resume_marker(evidence, &obj); + let mut harvested: Vec = Vec::new(); + next_event_index = collect_tool_result_events( + &obj, + &mut harvested, + tool_result_counters, + next_event_index, + ); + } + "system" => { + if build_claude_system_tool_result_event( + &obj, + tool_result_counters, + next_event_index, + ) + .is_some() + { + next_event_index += 1; + } + } + _ => {} + } + } + Ok(PrescanOutput { + last_assistant_message_id, + next_event_index, + }) +} + +fn record_root_incremental( + out: &mut Vec<(u64, SessionRelationshipRecord)>, + seen: &mut HashSet, + session_id: &str, + ts: Option<&str>, + line_offset: u64, + file_session_id: Option<&str>, +) { + let canonical = file_session_id.unwrap_or(session_id).to_string(); + if !seen.insert(canonical.clone()) { + return; + } + let mut row = SessionRelationshipRecord { + v: 1, + source: RelationshipSourceKind::ClaudeCode, + session_id: canonical, + related_session_id: None, + relationship_type: RelationshipType::Root, + ts: None, + source_session_id: None, + source_version: None, + parent_tool_use_id: None, + agent_id: None, + subagent_type: None, + description: None, + }; + if let Some(t) = ts { + if !t.is_empty() { + row.ts = Some(t.to_string()); + } + } + out.push((line_offset, row)); +} + +fn collect_explicit_claude_relationships_incremental( + line: &serde_json::Map, + evidence: &mut ClaudeRelationshipEvidence, + out: &mut Vec<(u64, SessionRelationshipRecord)>, + seen: &mut HashSet, + session_id: &str, + fallback_ts: Option<&str>, + line_offset: u64, +) { + record_explicit_relationship_evidence(evidence, line); + for row in build_explicit_claude_relationships(line, session_id, fallback_ts) { + let key = relationship_key(&row); + if !seen.insert(key) { + 
continue; + } + out.push((line_offset, row)); + } +} + +fn run_incremental( + path: &Path, + options: &ParseIncrementalOptions, + counter: &C, +) -> std::io::Result { + let start_offset = options.start_offset.unwrap_or(0); + let content_mode = options.content_mode.unwrap_or(ContentStoreMode::Off); + let capture_content = matches!(content_mode, ContentStoreMode::Full); + + let file_session_id = derive_file_session_id_from_parts( + options.file_session_id.as_deref(), + options.session_path.as_deref(), + ); + let mut evidence = new_evidence(file_session_id.clone()); + + let metadata = std::fs::metadata(path)?; + let size = metadata.len(); + if start_offset >= size { + return Ok(ParseIncrementalResult { + turns: Vec::new(), + content: Vec::new(), + events: Vec::new(), + relationships: Vec::new(), + tool_result_events: Vec::new(), + user_turns: Vec::new(), + end_offset: start_offset, + last_user_text: options.last_user_text.clone().unwrap_or_default(), + evidence, + }); + } + + let mut nodes_by_uuid: HashMap = HashMap::new(); + let mut invocation_cache: HashMap> = HashMap::new(); + let mut tool_result_counters: HashMap = HashMap::new(); + let mut next_event_index: u64 = 0; + let mut last_assistant_message_id: Option = None; + + if start_offset > 0 { + let pre = prescan_nodes( + path, + start_offset, + &mut nodes_by_uuid, + &mut evidence, + &mut tool_result_counters, + )?; + last_assistant_message_id = pre.last_assistant_message_id; + next_event_index = pre.next_event_index; + } + + // 0 sentinel: resume marker came from the prescan (definitely emit). + // u64::MAX sentinel: no resume marker yet seen. + // Otherwise: byte offset of the line that first set the marker on this pass. 
+ let mut resume_marker_offset: u64 = if evidence.has_resume_marker { + 0 + } else { + u64::MAX + }; + + let mut current_user_text = options.last_user_text.clone().unwrap_or_default(); + + let mut working: HashMap = HashMap::new(); + let mut order: Vec = Vec::new(); + let mut message_id_first_offset: HashMap = HashMap::new(); + let mut user_text_by_message_id: HashMap = HashMap::new(); + let mut errored_tool_use_ids: HashSet = HashSet::new(); + let mut replacement_meta_by_tool_use_id: HashMap = HashMap::new(); + let mut events: Vec<(u64, CompactionEvent)> = Vec::new(); + let mut pending_user_content: Vec<(u64, ContentRecord)> = Vec::new(); + let mut pending_tool_result_events: Vec<(u64, ToolResultEventRecord)> = Vec::new(); + let mut pending_relationships: Vec<(u64, SessionRelationshipRecord)> = Vec::new(); + let mut pending_user_turns: Vec<(u64, UserTurnRecord)> = Vec::new(); + let mut seen_root_session_ids: HashSet = HashSet::new(); + let mut seen_explicit_relationship_ids: HashSet = HashSet::new(); + let mut pending_user_turn_inc_idx: Option = None; + + let mut file = File::open(path)?; + file.seek(SeekFrom::Start(start_offset))?; + let mut buf: Vec = Vec::with_capacity((size - start_offset) as usize); + file.read_to_end(&mut buf)?; + + let mut p: usize = 0; + let mut cursor_offset: u64 = start_offset; // position past last complete \n + while p < buf.len() { + let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') { + Some(i) => p + i, + None => break, + }; + let line_start_offset = start_offset + p as u64; + let line_end_offset = start_offset + nl_idx as u64 + 1; + let trimmed = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim(); + p = nl_idx + 1; + cursor_offset = line_end_offset; + if trimmed.is_empty() { + continue; + } + let parsed: Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(_) => continue, + }; + let obj = match parsed.as_object() { + Some(o) => o.clone(), + None => continue, + }; + let line_type = 
obj.get("type").and_then(Value::as_str).unwrap_or(""); + match line_type { + "assistant" => { + let mid = obj + .get("message") + .and_then(|m| m.get("id")) + .and_then(Value::as_str) + .map(str::to_string); + if let Some(ref mid_str) = mid { + if let Some(idx) = pending_user_turn_inc_idx { + if !message_id_first_offset.contains_key(mid_str) { + pending_user_turns[idx].1.following_message_id = + Some(mid_str.clone()); + pending_user_turn_inc_idx = None; + } + } + message_id_first_offset + .entry(mid_str.clone()) + .or_insert(line_start_offset); + user_text_by_message_id + .entry(mid_str.clone()) + .or_insert_with(|| current_user_text.clone()); + last_assistant_message_id = Some(mid_str.clone()); + } + let session_id = string_field(&obj, "sessionId"); + let timestamp = string_field(&obj, "timestamp"); + if let Some(ref sid) = session_id { + if !sid.is_empty() { + record_root_incremental( + &mut pending_relationships, + &mut seen_root_session_ids, + sid, + timestamp.as_deref(), + line_start_offset, + file_session_id.as_deref(), + ); + collect_explicit_claude_relationships_incremental( + &obj, + &mut evidence, + &mut pending_relationships, + &mut seen_explicit_relationship_ids, + file_session_id.as_deref().unwrap_or(sid.as_str()), + timestamp.as_deref(), + line_start_offset, + ); + } + } + record_evidence_from_line(&mut evidence, &parsed); + ingest_assistant_record( + &parsed, + &obj, + &mut working, + &mut order, + &mut nodes_by_uuid, + ); + } + "user" => { + register_user_node(&parsed, &mut nodes_by_uuid); + if let Some(text) = extract_plain_user_text_from_obj(&obj) { + if !text.is_empty() { + current_user_text = text; + } + } + collect_errored_tool_use_ids(&obj, &mut errored_tool_use_ids); + collect_replacement_meta(&obj, &mut replacement_meta_by_tool_use_id); + let session_id = string_field(&obj, "sessionId"); + let timestamp = string_field(&obj, "timestamp"); + if let Some(ref sid) = session_id { + if !sid.is_empty() { + record_root_incremental( + &mut 
pending_relationships, + &mut seen_root_session_ids, + sid, + timestamp.as_deref(), + line_start_offset, + file_session_id.as_deref(), + ); + collect_explicit_claude_relationships_incremental( + &obj, + &mut evidence, + &mut pending_relationships, + &mut seen_explicit_relationship_ids, + file_session_id.as_deref().unwrap_or(sid.as_str()), + timestamp.as_deref(), + line_start_offset, + ); + } + } + record_evidence_from_line(&mut evidence, &parsed); + let had_resume_before = evidence.has_resume_marker; + record_resume_marker(&mut evidence, &obj); + if !had_resume_before && evidence.has_resume_marker { + resume_marker_offset = line_start_offset; + } + let mut harvested: Vec = Vec::new(); + next_event_index = collect_tool_result_events( + &obj, + &mut harvested, + &mut tool_result_counters, + next_event_index, + ); + for ev in harvested { + pending_tool_result_events.push((line_start_offset, ev)); + } + if let Some(record) = build_user_turn_record( + &obj, + last_assistant_message_id.as_deref(), + counter, + ) { + let idx = pending_user_turns.len(); + pending_user_turns.push((line_start_offset, record)); + pending_user_turn_inc_idx = Some(idx); + } + if capture_content { + for c in extract_user_content(&obj) { + pending_user_content.push((line_start_offset, c)); + } + } + } + "system" => { + if obj.get("subtype").and_then(Value::as_str) == Some("compact_boundary") { + let session_id = string_field(&obj, "sessionId").unwrap_or_default(); + let ts = string_field(&obj, "timestamp").unwrap_or_default(); + if !session_id.is_empty() { + let mut ev = CompactionEvent { + v: 1, + source: SourceKind::ClaudeCode, + session_id, + ts, + preceding_message_id: None, + tokens_before_compact: None, + }; + if let Some(ref last) = last_assistant_message_id { + ev.preceding_message_id = Some(last.clone()); + } + events.push((line_start_offset, ev)); + } + } + if let Some(ev) = build_claude_system_tool_result_event( + &obj, + &mut tool_result_counters, + next_event_index, + ) { + 
pending_tool_result_events.push((line_start_offset, ev)); + next_event_index += 1; + } + } + _ => {} + } + } + + // end_offset = byte position of the earliest in-progress messageId, or + // cursor_offset (= position past the last complete newline) when all + // messages are complete. + let mut earliest_incomplete: Option = None; + for id in &order { + let w = match working.get(id) { + Some(w) => w, + None => continue, + }; + if w.stop_reason.is_none() { + if let Some(off) = message_id_first_offset.get(id) { + if earliest_incomplete.is_none_or(|e| *off < e) { + earliest_incomplete = Some(*off); + } + } + } + } + let end_offset = earliest_incomplete.unwrap_or(cursor_offset); + + // Emit completed turns. In-progress messages (no stop_reason) are deferred + // — `end_offset` already backs up to before their first byte so the next + // call re-reads them. + let mut turns: Vec = Vec::new(); + let mut assistant_pending: Vec<(u64, usize, ContentRecord)> = Vec::new(); + for (i, id) in order.iter().enumerate() { + let w = match working.get(id) { + Some(w) => w, + None => continue, + }; + if w.stop_reason.is_none() { + continue; + } + let tool_calls = extract_tool_calls( + &w.blocks, + &errored_tool_use_ids, + Some(&replacement_meta_by_tool_use_id), + ); + let files_touched = extract_files_touched(&tool_calls); + let subagent = resolve_subagent(w, &nodes_by_uuid, &mut invocation_cache); + + let mut record = TurnRecord { + v: 1, + source: SourceKind::ClaudeCode, + session_id: w.session_id.clone(), + session_path: options.session_path.clone(), + message_id: w.message_id.clone(), + turn_index: i as u64, + ts: w.first_ts.clone(), + model: w.model.clone(), + project: None, + project_key: None, + usage: w.usage.clone(), + tool_calls: tool_calls.clone(), + files_touched: if files_touched.is_empty() { + None + } else { + Some(files_touched) + }, + subagent, + stop_reason: w.stop_reason.clone(), + activity: None, + retries: None, + has_edits: None, + fidelity: 
Some(build_claude_fidelity(&w.usage_coverage)), + }; + if let Some(ref cwd) = w.cwd { + let resolved = resolve_project(cwd); + record.project = Some(resolved.project); + record.project_key = resolved.project_key; + } + apply_classification(&mut record, w, &user_text_by_message_id, &errored_tool_use_ids); + turns.push(record); + + if capture_content { + let msg_offset = *message_id_first_offset.get(&w.message_id).unwrap_or(&0); + for (sub, r) in extract_assistant_content(w).into_iter().enumerate() { + assistant_pending.push((msg_offset, sub + 1, r)); + } + } + } + + // Filter content by end_offset and interleave by source-line offset. + // appendContent has no row-level dedup, so we MUST drop rows past + // end_offset — the next call will re-read those bytes and re-emit them. + let mut content: Vec = Vec::new(); + if capture_content { + let mut merged: Vec<(u64, usize, ContentRecord)> = Vec::new(); + for (off, rec) in pending_user_content.into_iter() { + if off < end_offset { + merged.push((off, 0, rec)); + } + } + for (off, sub, rec) in assistant_pending.into_iter() { + if off < end_offset { + merged.push((off, sub, rec)); + } + } + merged.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1))); + content = merged.into_iter().map(|(_, _, r)| r).collect(); + } + + let mut emitted_events: Vec = events + .into_iter() + .filter(|(off, _)| *off < end_offset) + .map(|(_, ev)| ev) + .collect(); + annotate_compaction_events(&mut emitted_events, &turns); + + let mut emitted_relationships: Vec = pending_relationships + .into_iter() + .filter(|(off, _)| *off < end_offset) + .map(|(_, r)| r) + .collect(); + collect_subagent_relationships(&turns, &mut emitted_relationships); + if resume_marker_offset < end_offset { + emit_local_continuation_from_resume(&mut emitted_relationships, &evidence); + } + annotate_relationships_with_evidence(&mut emitted_relationships, &evidence); + + let mut emitted_tool_result_events: Vec = pending_tool_result_events + .into_iter() + .filter(|(off, 
_)| *off < end_offset) + .map(|(_, ev)| ev) + .collect(); + annotate_spawn_events(&mut emitted_tool_result_events, &turns); + + let emitted_user_turns: Vec = pending_user_turns + .into_iter() + .filter(|(off, _)| *off < end_offset) + .map(|(_, ut)| ut) + .collect(); + + Ok(ParseIncrementalResult { + turns, + content, + events: emitted_events, + relationships: emitted_relationships, + tool_result_events: emitted_tool_result_events, + user_turns: emitted_user_turns, + end_offset, + last_user_text: current_user_text, + evidence, + }) +} + // --------------------------------------------------------------------------- // Misc helpers. // --------------------------------------------------------------------------- fn derive_file_session_id(options: &ParseOptions, _path: &Path) -> Option { + derive_file_session_id_from_parts( + options.file_session_id.as_deref(), + options.session_path.as_deref(), + ) +} + +fn derive_file_session_id_from_parts( + file_session_id: Option<&str>, + session_path: Option<&str>, +) -> Option { // Mirrors the TS `deriveFileSessionId`: only honor explicit caller signals // (`fileSessionId` then `sessionPath` basename). Do NOT fall back to the // on-disk path the parser opened — that would canonicalize relationship // rows to the input filename for default-options callers, breaking joins // against the real in-log `sessionId` UUIDs. - if let Some(ref s) = options.file_session_id { + if let Some(s) = file_session_id { if !s.is_empty() { - return Some(s.clone()); + return Some(s.to_string()); } } - if let Some(sp) = options.session_path.as_deref() { + if let Some(sp) = session_path { if !sp.is_empty() { return basename_without_ext(sp, "jsonl"); } @@ -2411,4 +3030,674 @@ mod tests { Some("55555555-5555-5555-5555-555555555555") ); } + + // ----- parseClaudeSessionIncremental conformance ----- + // + // Mirrors `describe('parseClaudeSessionIncremental', ...)` in + // packages/reader/src/claude.test.ts. 
Each Rust test corresponds to one + // `it()` case; fixture files are read from the shared + // `tests/fixtures/claude/` directory so the TS and Rust suites exercise + // the same input bytes. + + use crate::types::ActivityCategory; + use std::io::Write as _; + + fn read_bytes(p: &std::path::Path) -> Vec { + std::fs::read(p).unwrap() + } + + fn write_bytes(p: &std::path::Path, b: &[u8]) { + let mut f = std::fs::File::create(p).unwrap(); + f.write_all(b).unwrap(); + } + + fn append_str(p: &std::path::Path, s: &str) { + let mut prev = std::fs::read(p).unwrap(); + prev.extend_from_slice(s.as_bytes()); + write_bytes(p, &prev); + } + + /// Returns the byte offset of the line whose JSON contains `needle`. + fn line_start_offset(path: &std::path::Path, needle: &str) -> u64 { + let raw = std::fs::read_to_string(path).unwrap(); + let mut off: u64 = 0; + for line in raw.split_inclusive('\n') { + if line.contains(needle) { + return off; + } + off += line.len() as u64; + } + panic!("needle {:?} not found in {:?}", needle, path); + } + + #[test] + fn incremental_reads_whole_file_from_start() { + let src = fixture("simple-turn.jsonl"); + let raw_len = read_bytes(&src).len() as u64; + let r = + parse_claude_session_incremental(&src, &ParseIncrementalOptions::default()).unwrap(); + assert_eq!(r.turns.len(), 1); + assert_eq!(r.turns[0].message_id, "msg_simple_1"); + assert_eq!(r.end_offset, raw_len); + } + + #[test] + fn incremental_returns_zero_turns_when_start_at_eof() { + let src = fixture("simple-turn.jsonl"); + let raw_len = read_bytes(&src).len() as u64; + let r = parse_claude_session_incremental( + &src, + &ParseIncrementalOptions { + start_offset: Some(raw_len), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(r.turns.len(), 0); + assert_eq!(r.end_offset, raw_len); + } + + #[test] + fn incremental_appended_turn_emitted_on_resume() { + let src = fixture("simple-turn.jsonl"); + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); 
+ std::fs::copy(&src, &working).unwrap(); + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + assert_eq!(first.turns.len(), 1); + + let appended = serde_json::json!({ + "parentUuid": "u-asst-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_simple_2", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "and another"}], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 2, + "output_tokens": 1, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0} + } + }, + "type": "assistant", + "uuid": "u-asst-2", + "timestamp": "2026-04-20T00:00:05.000Z", + "cwd": "/tmp/project", + "sessionId": "11111111-1111-1111-1111-111111111111", + }); + append_str(&working, &(appended.to_string() + "\n")); + + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(second.turns.len(), 1); + assert_eq!(second.turns[0].message_id, "msg_simple_2"); + let full_len = read_bytes(&working).len() as u64; + assert_eq!(second.end_offset, full_len); + } + + #[test] + fn incremental_defers_in_progress_trailing_message() { + let src = fixture("incomplete-then-complete.jsonl"); + let inprog_offset = line_start_offset(&src, "\"id\":\"msg_inprog_1\""); + let r = + parse_claude_session_incremental(&src, &ParseIncrementalOptions::default()).unwrap(); + assert_eq!(r.turns.len(), 1, "only the complete message is emitted"); + assert_eq!(r.turns[0].message_id, "msg_done_1"); + assert_eq!( + r.end_offset, inprog_offset, + "endOffset backs up to start of in-progress line" + ); + } + + #[test] + fn incremental_defers_content_for_in_progress_then_emits_after_completion() { + let src = fixture("incomplete-then-complete.jsonl"); + let dir = 
tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + std::fs::copy(&src, &working).unwrap(); + + let first = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + content_mode: Some(ContentStoreMode::Full), + ..Default::default() + }, + ) + .unwrap(); + let asst_first: Vec<&ContentRecord> = first + .content + .iter() + .filter(|c| matches!(c.role, ContentRole::Assistant)) + .collect(); + assert!(asst_first + .iter() + .all(|c| c.message_id == "msg_done_1")); + + let tail = serde_json::json!({ + "parentUuid": "u-asst-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_inprog_1", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "done now"}], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 7, + "output_tokens": 3, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0} + } + }, + "type": "assistant", + "uuid": "u-asst-2", + "timestamp": "2026-04-20T00:00:02.000Z", + "cwd": "/tmp/project", + "sessionId": "33333333-3333-3333-3333-333333333333", + }); + append_str(&working, &(tail.to_string() + "\n")); + + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + content_mode: Some(ContentStoreMode::Full), + ..Default::default() + }, + ) + .unwrap(); + let asst_second: Vec<&ContentRecord> = second + .content + .iter() + .filter(|c| matches!(c.role, ContentRole::Assistant)) + .collect(); + assert!(!asst_second.is_empty()); + assert!(asst_second + .iter() + .all(|c| c.message_id == "msg_inprog_1")); + assert!(asst_second.iter().any(|c| matches!(c.kind, ContentKind::Text) + && c.text.as_deref() == Some("done now"))); + } + + #[test] + fn incremental_defers_assistant_content_after_in_progress_message() { + // msg_done_1 (complete) → msg_inprog_1 (incomplete) → msg_after_1 
(complete). + // endOffset must back up to msg_inprog_1, so msg_after_1 content is deferred + // — appendContent has no row dedup so the next pass would otherwise duplicate it. + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + let lines = [ + serde_json::json!({ + "parentUuid": null, + "isSidechain": false, + "type": "user", + "message": {"role": "user", "content": "hi"}, + "uuid": "u-user-1", + "timestamp": "2026-04-20T00:00:00.000Z", + "cwd": "/tmp/project", + "sessionId": "sess-dup", + }), + serde_json::json!({ + "parentUuid": "u-user-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_done_1", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "done"}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0}}, + }, + "type": "assistant", + "uuid": "u-asst-1", + "timestamp": "2026-04-20T00:00:01.000Z", + "cwd": "/tmp/project", + "sessionId": "sess-dup", + }), + serde_json::json!({ + "parentUuid": "u-asst-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_inprog_1", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "working..."}], + "stop_reason": null, + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0}}, + }, + "type": "assistant", + "uuid": "u-asst-2", + "timestamp": "2026-04-20T00:00:02.000Z", + "cwd": "/tmp/project", + "sessionId": "sess-dup", + }), + serde_json::json!({ + "parentUuid": "u-asst-2", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_after_1", + "type": "message", + "role": "assistant", + "content": [{"type": "text", 
"text": "after"}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0}}, + }, + "type": "assistant", + "uuid": "u-asst-3", + "timestamp": "2026-04-20T00:00:03.000Z", + "cwd": "/tmp/project", + "sessionId": "sess-dup", + }), + ]; + let body: String = lines + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + + "\n"; + write_bytes(&working, body.as_bytes()); + + let r = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + content_mode: Some(ContentStoreMode::Full), + ..Default::default() + }, + ) + .unwrap(); + let message_ids: Vec<&str> = r + .content + .iter() + .filter(|c| matches!(c.role, ContentRole::Assistant)) + .map(|c| c.message_id.as_str()) + .collect(); + assert_eq!(message_ids, vec!["msg_done_1"]); + let buf_len = read_bytes(&working).len() as u64; + assert!(r.end_offset < buf_len); + } + + #[test] + fn incremental_skips_incomplete_turn_then_emits_when_completion_arrives() { + let src = fixture("incomplete-then-complete.jsonl"); + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + std::fs::copy(&src, &working).unwrap(); + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + assert_eq!(first.turns.len(), 1); + + // Append a completion line for msg_inprog_1 (same id, but stop_reason set). 
+ let tail = serde_json::json!({ + "parentUuid": "u-asst-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_inprog_1", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "working..."}], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 7, + "output_tokens": 3, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0} + } + }, + "type": "assistant", + "uuid": "u-asst-2", + "timestamp": "2026-04-20T00:00:02.000Z", + "cwd": "/tmp/project", + "sessionId": "33333333-3333-3333-3333-333333333333", + }); + append_str(&working, &(tail.to_string() + "\n")); + + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(second.turns.len(), 1); + assert_eq!(second.turns[0].message_id, "msg_inprog_1"); + assert_eq!(second.turns[0].stop_reason.as_deref(), Some("end_turn")); + } + + #[test] + fn incremental_preserves_user_prompt_across_resume() { + // Regression: when an incomplete assistant message forces endOffset to + // back up past the user prompt, the resumed call re-reads the + // assistant line without seeing the prompt. We carry lastUserText + // forward so the classifier still has keyword context. 
+ let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + let session_id = "44444444-4444-4444-4444-444444444444"; + let lines = [ + serde_json::json!({ + "parentUuid": null, + "isSidechain": false, + "type": "user", + "message": {"role": "user", "content": "fix the bug in auth.ts"}, + "uuid": "u-user-1", + "timestamp": "2026-04-20T00:00:00.000Z", + "cwd": "/tmp/project", + "sessionId": session_id, + }), + serde_json::json!({ + "parentUuid": "u-user-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_resume_1", + "type": "message", + "role": "assistant", + "content": [{"type": "tool_use", "id": "tu_edit_1", "name": "Edit", "input": {"file_path": "/auth.ts"}}], + "stop_reason": null, + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0}}, + }, + "type": "assistant", + "uuid": "u-asst-1", + "timestamp": "2026-04-20T00:00:01.000Z", + "cwd": "/tmp/project", + "sessionId": session_id, + }), + ]; + let body: String = lines + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + + "\n"; + write_bytes(&working, body.as_bytes()); + + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + assert_eq!(first.turns.len(), 0, "incomplete turn is deferred"); + assert_eq!(first.last_user_text, "fix the bug in auth.ts"); + + // Append completion of msg_resume_1. 
+ let tail = serde_json::json!({ + "parentUuid": "u-asst-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_resume_1", + "type": "message", + "role": "assistant", + "content": [{"type": "tool_use", "id": "tu_edit_1", "name": "Edit", "input": {"file_path": "/auth.ts"}}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "cache_creation": {"ephemeral_5m_input_tokens": 0, "ephemeral_1h_input_tokens": 0}}, + }, + "type": "assistant", + "uuid": "u-asst-1", + "timestamp": "2026-04-20T00:00:01.000Z", + "cwd": "/tmp/project", + "sessionId": session_id, + }); + append_str(&working, &(tail.to_string() + "\n")); + + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + last_user_text: Some(first.last_user_text.clone()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(second.turns.len(), 1); + let t = &second.turns[0]; + assert_eq!(t.message_id, "msg_resume_1"); + assert_eq!( + t.activity, + Some(ActivityCategory::Debugging), + "user prompt mentions 'bug' so edit turn is debugging" + ); + + // Without the seed, the prompt is lost on resume and the classifier + // falls back to coding. + let without_seed = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + without_seed.turns[0].activity, + Some(ActivityCategory::Coding) + ); + } + + #[test] + fn incremental_user_turns_emitted_once_across_resumed_passes() { + let src = fixture("user-turn-blocks.jsonl"); + let full = std::fs::read_to_string(&src).unwrap(); + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + + // Pass 1: write only through msg_utb_2 (4 lines: user, asst, user, asst). 
+ let lines: Vec<&str> = full.split('\n').filter(|l| !l.is_empty()).collect(); + let prefix = lines[..4].join("\n") + "\n"; + write_bytes(&working, prefix.as_bytes()); + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + let first_ids: Vec<&str> = first + .user_turns + .iter() + .map(|u| u.user_uuid.as_str()) + .collect(); + assert_eq!(first_ids, vec!["u-user-1", "u-user-2"]); + + // Pass 2: full file. Must emit only u-user-3 (no re-emit of 1/2). + write_bytes(&working, full.as_bytes()); + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + last_user_text: Some(first.last_user_text.clone()), + ..Default::default() + }, + ) + .unwrap(); + let second_ids: Vec<&str> = second + .user_turns + .iter() + .map(|u| u.user_uuid.as_str()) + .collect(); + assert_eq!(second_ids, vec!["u-user-3"]); + let u3 = &second.user_turns[0]; + assert_eq!(u3.preceding_message_id.as_deref(), Some("msg_utb_2")); + assert_eq!(u3.following_message_id.as_deref(), Some("msg_utb_3")); + assert_eq!(u3.blocks[0].is_error, Some(true)); + } + + #[test] + fn incremental_seeds_tool_result_event_counters_from_prescan() { + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + let session_id = "66666666-6666-6666-6666-666666666666"; + let user_result = serde_json::json!({ + "parentUuid": null, + "isSidechain": false, + "type": "user", + "message": { + "role": "user", + "content": [{"type": "tool_result", "tool_use_id": "toolu_system", "content": "done"}] + }, + "uuid": "u-result-1", + "timestamp": "2026-04-24T01:00:00.000Z", + "cwd": "/tmp/project", + "sessionId": session_id, + }); + let incomplete_assistant = serde_json::json!({ + "parentUuid": "u-result-1", + "isSidechain": false, + "message": { + "model": "claude-sonnet-4-6", + "id": "msg_waiting", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": 
"waiting"}], + "stop_reason": null, + "usage": {"input_tokens": 1, "output_tokens": 1, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}, + }, + "type": "assistant", + "uuid": "u-asst-waiting", + "timestamp": "2026-04-24T01:00:01.000Z", + "cwd": "/tmp/project", + "sessionId": session_id, + }); + let system_notification = serde_json::json!({ + "type": "system", + "subtype": "subagent_completed", + "sessionId": session_id, + "timestamp": "2026-04-24T01:00:02.000Z", + "parent_tool_use_id": "toolu_system", + "agent_id": "agent-system-2", + "subagent_session_id": "session-system-child-2", + "status": "completed", + }); + let body = [&user_result, &incomplete_assistant, &system_notification] + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + + "\n"; + write_bytes(&working, body.as_bytes()); + + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + assert_eq!(first.tool_result_events.len(), 1); + assert_eq!( + first.tool_result_events[0].event_source, + ToolResultEventSource::ToolResult + ); + assert_eq!(first.tool_result_events[0].tool_use_id, "toolu_system"); + assert_eq!(first.tool_result_events[0].call_index, Some(0)); + assert_eq!(first.tool_result_events[0].event_index, 0); + + // Append a completion line for msg_waiting so the deferred system + // notification line gets re-read on the next pass. 
+ let mut complete_assistant = incomplete_assistant.clone(); + complete_assistant["message"]["stop_reason"] = serde_json::Value::from("end_turn"); + let body2 = [ + &user_result, + &incomplete_assistant, + &system_notification, + &complete_assistant, + ] + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + + "\n"; + write_bytes(&working, body2.as_bytes()); + + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + last_user_text: Some(first.last_user_text.clone()), + ..Default::default() + }, + ) + .unwrap(); + let ev = second + .tool_result_events + .iter() + .find(|e| matches!(e.event_source, ToolResultEventSource::SubagentNotification)) + .expect("resumed pass should emit the deferred system notification"); + assert_eq!(ev.tool_use_id, "toolu_system"); + assert_eq!(ev.call_index, Some(1)); + assert_eq!(ev.event_index, 1); + assert_eq!(ev.agent_id.as_deref(), Some("agent-system-2")); + assert_eq!( + ev.subagent_session_id.as_deref(), + Some("session-system-child-2") + ); + } + + #[test] + fn incremental_resolves_subagent_tree_via_prescan() { + // Pass 1 ingests the main thread + Agent spawn line. Pass 2 starts + // beyond them and must still populate agentId / parentAgentId / + // parentToolUseId on the sidechain turns via the prescan registering + // the prior parentUuid nodes. + let src = fixture("nested-subagent.jsonl"); + let full = std::fs::read_to_string(&src).unwrap(); + let dir = tempfile::tempdir().unwrap(); + let working = dir.path().join("session.jsonl"); + + let lines: Vec<&str> = full.split('\n').filter(|l| !l.is_empty()).collect(); + // Write only through the outer Agent spawn line on pass 1. 
+ let prefix = lines[..2].join("\n") + "\n"; + write_bytes(&working, prefix.as_bytes()); + let first = + parse_claude_session_incremental(&working, &ParseIncrementalOptions::default()) + .unwrap(); + assert!(!first.turns.is_empty()); + + write_bytes(&working, full.as_bytes()); + let second = parse_claude_session_incremental( + &working, + &ParseIncrementalOptions { + start_offset: Some(first.end_offset), + ..Default::default() + }, + ) + .unwrap(); + + let by_id: HashMap<&str, &TurnRecord> = second + .turns + .iter() + .map(|t| (t.message_id.as_str(), t)) + .collect(); + let sub1_1 = by_id + .get("msg_sub1_1") + .expect("outer sidechain turn should be emitted on pass 2"); + let sub2_1 = by_id + .get("msg_sub2_1") + .expect("inner sidechain turn should be emitted on pass 2"); + + let s1 = sub1_1.subagent.as_ref().unwrap(); + assert_eq!(s1.agent_id.as_deref(), Some("u-sub1-user")); + assert_eq!(s1.parent_tool_use_id.as_deref(), Some("toolu_outer")); + assert_eq!(s1.subagent_type.as_deref(), Some("Explore")); + assert_eq!( + s1.parent_agent_id.as_deref(), + Some("55555555-5555-5555-5555-555555555555") + ); + + let s2 = sub2_1.subagent.as_ref().unwrap(); + assert_eq!(s2.agent_id.as_deref(), Some("u-sub2-user")); + assert_eq!(s2.parent_agent_id.as_deref(), Some("u-sub1-user")); + assert_eq!(s2.parent_tool_use_id.as_deref(), Some("toolu_inner")); + } } diff --git a/crates/relayburn-reader/src/lib.rs b/crates/relayburn-reader/src/lib.rs index a88d2129..9636db11 100644 --- a/crates/relayburn-reader/src/lib.rs +++ b/crates/relayburn-reader/src/lib.rs @@ -3,11 +3,10 @@ //! This crate is a work-in-progress port of the TS reader package. Foundational //! modules (`types`, `hash`, `fidelity`, `git`, `classifier`, `user_turn`) are //! ported with native conformance tests; the `codex` parser is ported (#256); -//! the Claude Code parser (`claude`) is in progress under #255 — synchronous -//! `parse_claude_session` and the cross-file reconciler are landed, the -//! 
incremental entry point is not yet ported. The remaining per-harness -//! parsers (`opencode`, `opencode_stream`) are scaffolded but not yet -//! implemented — see #257 / #258. +//! the Claude Code parser (`claude`) covers the synchronous, incremental, and +//! cross-file reconciliation surface (#255). The remaining per-harness parsers +//! (`opencode`, `opencode_stream`) are scaffolded but not yet implemented — +//! see #257 / #258. pub mod classifier; pub mod fidelity; @@ -49,7 +48,10 @@ pub use user_turn::{ TokenCounter, UserTurnTokenizer, }; pub use claude::{ - parse_claude_session, parse_claude_session_with_counter, reconcile_claude_session_relationships, - ClaudeRelationshipEvidence, ParseOptions as ClaudeParseOptions, + parse_claude_session, parse_claude_session_incremental, + parse_claude_session_incremental_with_counter, parse_claude_session_with_counter, + reconcile_claude_session_relationships, ClaudeRelationshipEvidence, + ParseIncrementalOptions as ClaudeParseIncrementalOptions, + ParseIncrementalResult as ClaudeParseIncrementalResult, ParseOptions as ClaudeParseOptions, ParseResult as ClaudeParseResult, ReconcileClaudeRelationshipsInput, };