diff --git a/CHANGELOG.md b/CHANGELOG.md index d344adba..c6b12e0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ Cross-package release notes for relayburn. Package changelogs contain package-le - `relayburn-cli`: `burn sessions list` human output now keeps full session ids, shows a single human-readable last-seen date column, and truncates long project paths from the beginning. +- `relayburn-sdk` / `relayburn-cli`: `burn ingest --watch` now wakes on + filesystem events (with burst coalescing and a 30s polling backstop), + reducing steady-state polling; pass `--no-fsevents` to force polling. ## [2.7.0] - 2026-05-09 diff --git a/Cargo.lock b/Cargo.lock index 194652b6..6f585b47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,6 +404,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures-core" version = "0.3.32" @@ -527,6 +536,26 @@ dependencies = [ "web-time", ] +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -551,6 +580,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7b65860415f949f23fa882e669f2dbd4a0f0eeb1acdd56790b30494afd7da2f" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -639,6 +688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -709,6 +759,33 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -991,6 +1068,7 @@ dependencies = [ "indexmap", "libc", "memchr", + "notify", "phf", "regex", "rusqlite", @@ -1050,6 +1128,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1432,6 +1519,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1561,6 +1658,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -1579,7 +1685,16 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", ] [[package]] @@ -1597,14 +1712,31 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", ] [[package]] @@ -1613,48 +1745,96 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/crates/relayburn-cli/src/cli.rs b/crates/relayburn-cli/src/cli.rs index 37c8b981..15571939 100644 --- a/crates/relayburn-cli/src/cli.rs +++ b/crates/relayburn-cli/src/cli.rs @@ -156,6 +156,13 @@ pub struct IngestArgs { /// on its own (or with `--watch`) so a typo can't silently no-op. #[arg(long, requires = "hook")] pub quiet: bool, + + /// Force the polling driver in `--watch` mode instead of the + /// default `notify`-backed FS-event driver. Use this on + /// filesystems where FS events are unreliable (network mounts, + /// some Docker setups). Ignored without `--watch`. + #[arg(long, requires = "watch")] + pub no_fsevents: bool, } /// Per-command flags for `burn mcp-server`. The stdio MCP server speaks diff --git a/crates/relayburn-cli/src/commands/ingest.rs b/crates/relayburn-cli/src/commands/ingest.rs index f6966750..f4074e55 100644 --- a/crates/relayburn-cli/src/commands/ingest.rs +++ b/crates/relayburn-cli/src/commands/ingest.rs @@ -37,8 +37,8 @@ use std::sync::Arc; use std::time::Duration; use relayburn_sdk::{ - ingest_all, start_watch_loop, IngestReport, Ledger, LedgerHandle, LedgerOpenOptions, - StartWatchLoopOptions, + default_session_roots, ingest_all, start_watch_loop, IngestReport, IngestRoots, Ledger, + LedgerHandle, LedgerOpenOptions, StartWatchLoopOptions, }; use crate::cli::{GlobalArgs, IngestArgs}; @@ -158,12 +158,27 @@ fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 { }; let quiet = args.quiet; - let watch_message = format!("watching every {interval_ms}ms; Ctrl-C to stop"); + let no_fsevents = args.no_fsevents; + // The FS-event driver may silently demote to polling at startup + // (no watchable path yet) or mid-run (notify channel closes), so + // the banner doesn't assert which driver is live — only that the + // user opted out of the FS-event attempt. + let watch_message = if no_fsevents { + format!("watching (polling every {interval_ms}ms); Ctrl-C to stop") + } else { + "watching (FS events with polling fallback); Ctrl-C to stop".to_string() + }; if !quiet { if progress.is_visible() { progress.set_task(watch_message.clone()); + } else if no_fsevents { + eprintln!( + "[burn] ingest: foreground ingest polling every {interval_ms}ms; Ctrl-C to stop", + ); } else { - eprintln!("[burn] ingest: foreground ingest every {interval_ms}ms; Ctrl-C to stop",); + eprintln!( + "[burn] ingest: foreground ingest on FS events (polling fallback); Ctrl-C to stop", + ); } } @@ -213,9 +228,18 @@ fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 { }); }); + // Default to the `notify`-backed FS-event driver against the + // three session-store roots ingest scans. Falls back to polling + // automatically when no path exists yet (fresh install) or + // when the user passes `--no-fsevents`. The slow polling + // backstop in the SDK keeps progress on filesystems where FS + // events are unreliable. Closes #250. + let watch_paths = default_session_roots(&IngestRoots::default()); let opts = StartWatchLoopOptions::new(ingest_fn) .with_interval(Duration::from_millis(interval_ms)) .with_immediate(true) + .with_watch_paths(watch_paths) + .with_disable_fsevents(no_fsevents) .with_on_report(on_report) .with_on_error(on_error); let controller = start_watch_loop(opts); diff --git a/crates/relayburn-cli/src/harnesses/pending_stamp.rs b/crates/relayburn-cli/src/harnesses/pending_stamp.rs index ea04e0b7..506c98fe 100644 --- a/crates/relayburn-cli/src/harnesses/pending_stamp.rs +++ b/crates/relayburn-cli/src/harnesses/pending_stamp.rs @@ -232,9 +232,16 @@ impl HarnessAdapter for PendingStampAdapterImpl { // child has barely started; let the periodic interval drive the // first scan so we don't spawn an ingest pass that races the // freshly-written pending stamp. + // + // Watch the per-harness session root with `notify` so the loop + // wakes on session writes instead of polling every second + // (#250). `watch_interval` becomes the polling fallback cadence + // when `notify` cannot attach (fresh install with no session + // dir yet, network mount, etc.). let opts = StartWatchLoopOptions::new(self.ingest_fn(ctx.ledger_home.clone())) .with_immediate(false) .with_interval(self.watch_interval) + .with_watch_paths(vec![(self.session_root)()]) .with_on_report(on_report); Some(WatcherController::new(start_watch_loop(opts))) } diff --git a/crates/relayburn-sdk/Cargo.toml b/crates/relayburn-sdk/Cargo.toml index ce87932b..aa2cfb97 100644 --- a/crates/relayburn-sdk/Cargo.toml +++ b/crates/relayburn-sdk/Cargo.toml @@ -52,6 +52,13 @@ time = { version = "0.3", default-features = false, features = ["formatting", "p # matching the TS `Object.values` / `Object.entries` iteration semantics. indexmap = { version = "2", features = ["serde"] } +# ingest: filesystem event watcher for the watch loop. Drives the +# `notify`-backed driver in `ingest/watch_loop.rs` (FSEvents on macOS, +# inotify on Linux, ReadDirectoryChangesW on Windows) with a slow +# polling fallback when FS events are unavailable (network filesystems, +# unsupported targets, or `--no-fsevents`). Closes #250. +notify = { version = "8", default-features = false, features = ["macos_fsevent"] } + [target.'cfg(unix)'.dependencies] # ingest: pid-liveness probe in the pending-stamp resolver libc = "0.2" diff --git a/crates/relayburn-sdk/src/ingest.rs b/crates/relayburn-sdk/src/ingest.rs index e45dace9..d5b012cd 100644 --- a/crates/relayburn-sdk/src/ingest.rs +++ b/crates/relayburn-sdk/src/ingest.rs @@ -25,6 +25,7 @@ #![allow(dead_code, unused_imports)] pub mod cursors; +pub(crate) mod fs_events; pub mod gap; pub mod ingest; pub mod pending_stamps; @@ -72,22 +73,22 @@ pub use gap::{ // tests and to the `test-utils` feature for downstream integration // tests; deliberately NOT part of the default SDK surface so embedders // can't hijack the global gap-warning writer for the whole process. +pub use crate::reader::ContentStoreMode; #[cfg(any(test, feature = "test-utils"))] pub use gap::{restore_ingest_gap_writer, set_ingest_gap_writer}; pub use ingest::{ - ingest_all, ingest_claude_projects, ingest_claude_session, ingest_codex_sessions, - ingest_opencode_sessions, IngestOptions, IngestReport, IngestRoots, + default_session_roots, ingest_all, ingest_claude_projects, ingest_claude_session, + ingest_codex_sessions, ingest_opencode_sessions, IngestOptions, IngestReport, IngestRoots, }; -pub use crate::reader::ContentStoreMode; -pub use reingest::{derive_codex_session_id, reingest_missing_content, ReingestContentReport}; pub use pending_stamps::{ cleanup_stale_pending_stamps, cleanup_stale_pending_stamps_at, pending_stamps_dir, resolve_pending_stamps_for_session, write_pending_stamp, PendingStamp, PendingStampCleanupResult, PendingStampHarness, PendingStampResolveResult, PendingStampSessionCandidate, PendingStampWriteResult, WriteOptions, PENDING_STAMP_TTL_MS, }; +pub use reingest::{derive_codex_session_id, reingest_missing_content, ReingestContentReport}; pub use walk::{walk_jsonl, walk_opencode_sessions}; pub use watch_loop::{ run_ingest_tick, start_watch_loop, ErrorSink, IngestFn, ReportSink, StartWatchLoopOptions, - WatchController, + WatchController, DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK, }; diff --git a/crates/relayburn-sdk/src/ingest/fs_events.rs b/crates/relayburn-sdk/src/ingest/fs_events.rs new file mode 100644 index 00000000..f5186689 --- /dev/null +++ b/crates/relayburn-sdk/src/ingest/fs_events.rs @@ -0,0 +1,213 @@ +//! Filesystem-event driver for the ingest watch loop. +//! +//! Wraps `notify::recommended_watcher` so the watch loop can wake on +//! actual session-store writes (FSEvents / inotify / RDCW) instead of a +//! 1s polling tick. The async wakeup surface is a single-permit +//! [`tokio::sync::Notify`]: the notify callback runs on its own OS +//! thread and calls `notify_one` per relevant event, which collapses +//! into a single pending permit regardless of event rate. That keeps +//! memory bounded under noisy roots — N events allocate O(1), not +//! O(N). +//! +//! [`FsBurst::wait_for_burst`] consumes that permit, then sleeps for +//! `debounce` to coalesce further events landing inside the window. +//! Crucially the burst future *always* returns after at most one +//! debounce window: under sustained writes the loop emits a steady +//! ~`debounce` cadence rather than waiting for a quiet period that +//! never arrives. The slow polling backstop in the watch loop only +//! kicks in when the FS-event channel goes silent. +//! +//! The watcher is best-effort: paths that don't exist are skipped, and +//! a complete failure to attach to any path returns `Err`. Callers fall +//! back to the polling driver in that case (network filesystems, Docker +//! mounts without inotify, `--no-fsevents`). + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use notify::event::EventKind; +use notify::{RecommendedWatcher, RecursiveMode, Watcher}; +use tokio::sync::Notify; + +/// Active filesystem watcher backed by `notify`. The held +/// [`RecommendedWatcher`] keeps the OS-level watch alive; dropping the +/// struct stops the watcher. The notify callback retains an +/// `Arc`, so events posted while no consumer is awaiting still +/// store a single pending permit that the next `wait_for_burst` call +/// observes. +pub(crate) struct FsBurst { + _watcher: RecommendedWatcher, + pending: Arc, +} + +impl FsBurst { + /// Attach to every existing path in `paths` and return a burst + /// receiver. Returns `Err` when no path could be watched — callers + /// should fall back to the polling driver in that case. + /// + /// `Recursive` mode is required: ingest cares about new files + /// landing inside `~/.claude/projects//` etc., not about + /// the project root itself. + pub fn new(paths: &[PathBuf]) -> anyhow::Result { + let pending = Arc::new(Notify::new()); + let mut watcher = build_watcher(pending.clone())?; + let mut watched_any = false; + for p in paths { + if !p.exists() { + continue; + } + if watcher.watch(p, RecursiveMode::Recursive).is_ok() { + watched_any = true; + } + } + if !watched_any { + anyhow::bail!("no watchable session-store paths"); + } + Ok(Self { + _watcher: watcher, + pending, + }) + } + + /// Wait for the next FS event, then sleep `debounce` to coalesce + /// further events that land inside the window. Always returns + /// `Some(())` once the window elapses. + /// + /// Cadence: under bursty writes, N events fired during the debounce + /// window collapse into a single tick (the goal). Under *sustained* + /// writes, this returns once per `debounce` because we deliberately + /// don't extend the window past its first interval — extending + /// would let a continuous write stream starve the ingest loop and + /// demote it to the 30s slow polling backstop. + /// + /// The notify slot is single-bit, so memory is O(1) regardless of + /// event rate. Lost cancellation is also fine: if the future is + /// dropped between the wake and the sleep, any subsequent event + /// re-stores the permit and the next call observes it. + pub async fn wait_for_burst(&mut self, debounce: Duration) -> Option<()> { + // `Notified::enable` (tokio 1.13+) latches the waker on + // creation so a permit posted between this future being + // constructed and being polled isn't lost. Without enable, a + // notify_one between the previous return and this re-park + // could fall on the floor. + let notified = self.pending.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + notified.await; + // Coalescing window. Events landing inside this sleep set the + // pending permit (single-bit) and the next call to + // `wait_for_burst` observes it immediately — that's how + // sustained writes get a steady ~debounce-cadence tick stream + // instead of waiting for a quiet period. + tokio::time::sleep(debounce).await; + Some(()) + } +} + +fn build_watcher(pending: Arc) -> anyhow::Result { + let watcher = notify::recommended_watcher(move |res: notify::Result| { + let Ok(event) = res else { + return; + }; + // Pure metadata churn (atime updates from a `cat`, attribute + // changes) doesn't change the JSONL content the ingest reads. + // Filtering at the source keeps wakeups honest under backups + // / antivirus scans. + if matches!( + event.kind, + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) + ) { + // `notify_one` collapses multiple calls into a single + // pending permit — bounded memory, bounded wakeups even + // under noisy roots that fire thousands of events per + // second. + pending.notify_one(); + } + })?; + Ok(watcher) +} + +#[cfg(test)] +mod tests { + use std::fs; + + use super::*; + + /// Best-effort smoke test: write a file inside a watched temp dir + /// and confirm the burst receiver wakes. + /// + /// Marked `#[ignore]` because FS-event delivery latency varies by + /// platform and CI sandbox; the test is informational under + /// `cargo test -- --ignored`. The protocol-level guarantees the + /// watch loop relies on are exercised by the polling-fallback path + /// in `watch_loop_tests.rs`. + #[tokio::test] + #[ignore] + async fn fs_burst_wakes_on_write() { + let dir = tempfile::tempdir().unwrap(); + let mut burst = FsBurst::new(&[dir.path().to_path_buf()]).unwrap(); + let path = dir.path().join("session.jsonl"); + let writer = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(50)); + fs::write(&path, b"{}\n").unwrap(); + }); + let woke = tokio::time::timeout( + Duration::from_secs(2), + burst.wait_for_burst(Duration::from_millis(50)), + ) + .await; + writer.join().unwrap(); + assert!(matches!(woke, Ok(Some(())))); + } + + /// `FsBurst::new` returns `Err` when none of the supplied paths + /// exist. Watch loop relies on this to fall back to polling. + #[tokio::test] + async fn fs_burst_errors_when_no_paths_exist() { + let result = FsBurst::new(&[PathBuf::from("/nonexistent/relayburn/test/path")]); + assert!(result.is_err()); + } + + /// Sustained writes must produce a steady tick cadence rather + /// than starve the loop. Verifies the fix for the Codex review + /// comment on #410: an earlier "wait for quiet" implementation + /// would never return under continuous events, demoting watch + /// mode to the 30s slow fallback. + /// + /// Also marked `#[ignore]` because it depends on real FS-event + /// delivery — same caveat as `fs_burst_wakes_on_write`. + #[tokio::test] + #[ignore] + async fn fs_burst_emits_under_sustained_writes() { + let dir = tempfile::tempdir().unwrap(); + let mut burst = FsBurst::new(&[dir.path().to_path_buf()]).unwrap(); + let dir_path = dir.path().to_path_buf(); + let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stop_for_writer = stop.clone(); + let writer = std::thread::spawn(move || { + let mut i = 0u64; + while !stop_for_writer.load(std::sync::atomic::Ordering::SeqCst) { + let _ = fs::write(dir_path.join(format!("s-{i}.jsonl")), b"{}\n"); + i += 1; + std::thread::sleep(Duration::from_millis(5)); + } + }); + + // Three back-to-back wake cycles must each return inside + // ~debounce + slack. If the burst future waited for a quiet + // period, the second call would hang indefinitely. + let debounce = Duration::from_millis(50); + for _ in 0..3 { + let result = tokio::time::timeout( + Duration::from_millis(500), + burst.wait_for_burst(debounce), + ) + .await; + assert!(matches!(result, Ok(Some(())))); + } + + stop.store(true, std::sync::atomic::Ordering::SeqCst); + writer.join().unwrap(); + } +} diff --git a/crates/relayburn-sdk/src/ingest/ingest.rs b/crates/relayburn-sdk/src/ingest/ingest.rs index c1c4ddc7..27a0dca2 100644 --- a/crates/relayburn-sdk/src/ingest/ingest.rs +++ b/crates/relayburn-sdk/src/ingest/ingest.rs @@ -24,26 +24,26 @@ use std::time::SystemTime; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::ledger::{Ledger, load_config}; +use crate::ledger::{load_config, Ledger}; use crate::reader::{ + parse_claude_session, parse_claude_session_incremental, parse_codex_session_incremental, + parse_opencode_session_incremental, reconcile_claude_session_relationships, ClaudeParseIncrementalOptions, ClaudeParseOptions, CodexLastCompletedTurn, CodexResumeState, CodexTurnContext, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage, ParseCodexIncrementalOptions, ParseOpencodeIncrementalOptions, PersistedUserTurnSlot, - ReconcileClaudeRelationshipsInput, parse_claude_session, parse_claude_session_incremental, - parse_codex_session_incremental, parse_opencode_session_incremental, - reconcile_claude_session_relationships, + ReconcileClaudeRelationshipsInput, }; use crate::ingest::cursors::{ - ClaudeCursor, CodexCumulative, CodexCursor, Cursors, FileCursor, OpencodeCursor, load_cursors, - save_cursors_if_changed, + load_cursors, save_cursors_if_changed, ClaudeCursor, CodexCumulative, CodexCursor, Cursors, + FileCursor, OpencodeCursor, }; use crate::ingest::gap::{ - AdapterName, count_new_tool_calls, count_new_tool_results, emit_gap_warning, record_session_gap, + count_new_tool_calls, count_new_tool_results, emit_gap_warning, record_session_gap, AdapterName, }; use crate::ingest::pending_stamps::{ - PendingStampHarness, PendingStampSessionCandidate, cleanup_stale_pending_stamps_in, - resolve_pending_stamps_for_session_in, + cleanup_stale_pending_stamps_in, resolve_pending_stamps_for_session_in, PendingStampHarness, + PendingStampSessionCandidate, }; use crate::ingest::reingest::derive_codex_session_id; use crate::ingest::walk::{walk_jsonl, walk_opencode_sessions}; @@ -139,6 +139,24 @@ pub(crate) fn opencode_session_root(roots: &IngestRoots) -> PathBuf { opencode_storage_dir(roots).join("session") } +/// Resolve the default session-store roots ingest scans, in the same +/// order `ingest_all` walks them. Used by the watch loop to drive its +/// `notify`-backed FS-event driver against the harness home dirs the +/// SDK already owns. Test injection: pass an explicit +/// [`IngestRoots`] to override individual paths; defaults still come +/// from `$HOME` for fields left `None`. +/// +/// Returns the Claude / Codex / OpenCode roots in that order — the +/// caller doesn't have to filter for existence; the FS-event driver +/// silently skips any path that doesn't yet exist. +pub fn default_session_roots(roots: &IngestRoots) -> Vec { + vec![ + claude_projects_dir(roots), + codex_sessions_dir(roots), + opencode_storage_dir(roots), + ] +} + pub(crate) fn opencode_message_root(roots: &IngestRoots) -> PathBuf { opencode_storage_dir(roots).join("message") } diff --git a/crates/relayburn-sdk/src/ingest/watch_loop.rs b/crates/relayburn-sdk/src/ingest/watch_loop.rs index f4e8b5ba..805ab370 100644 --- a/crates/relayburn-sdk/src/ingest/watch_loop.rs +++ b/crates/relayburn-sdk/src/ingest/watch_loop.rs @@ -1,10 +1,23 @@ -//! Watch loop — Rust port of `packages/ingest/src/watch-loop.ts`. +//! Watch loop — Rust port of `packages/ingest/src/watch-loop.ts`, +//! upgraded with a `notify`-backed FS-event driver per #250. //! //! Drives a periodic `ingest` callable, drains the report through an -//! optional `on_report` sink, and routes errors through `on_error`. The TS -//! adapter uses `setInterval` + a `running` guard to prevent overlapping -//! ticks; the Rust port uses `tokio::time::interval` and a `Mutex` over an -//! in-flight future, with the same skip-if-running invariant. +//! optional `on_report` sink, and routes errors through `on_error`. Two +//! drivers wake the loop: +//! +//! * **FS events** (preferred): `notify::recommended_watcher` watches the +//! session-store roots passed in [`StartWatchLoopOptions::watch_paths`] +//! and wakes the loop on writes. Bursts are coalesced via the debounce +//! window so 100 inotify events from a single tool dump produce one +//! ingest tick, not 100. A slow polling backstop +//! ([`StartWatchLoopOptions::slow_fallback_interval`], default 30s) +//! covers the platforms where `notify` reports unsupported events +//! silently (network filesystems, some Docker setups). +//! * **Pure polling** (fallback): when no `watch_paths` are supplied, +//! when [`StartWatchLoopOptions::disable_fsevents`] is set, or when +//! `notify` cannot attach to any path, the loop falls back to the +//! original `tokio::time::interval` cadence at +//! [`StartWatchLoopOptions::interval`]. //! //! Concurrency model: //! @@ -18,6 +31,7 @@ //! landed before they tear down state. use std::future::Future; +use std::path::PathBuf; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -26,6 +40,7 @@ use std::time::Duration; use tokio::sync::{Mutex, Notify}; use tokio::task::JoinHandle; +use crate::ingest::fs_events::FsBurst; use crate::ingest::ingest::IngestReport; pub type IngestFn = Arc< @@ -35,6 +50,19 @@ pub type IngestFn = Arc< pub type ReportSink = Arc; pub type ErrorSink = Arc; +/// Default debounce window for the FS-event driver. 200ms is short +/// enough that an interactive Claude / Codex pause feels live and long +/// enough to coalesce the inotify burst from a single tool result +/// dumping a multi-line transcript update. Tuned alongside the burst +/// test in `watch_loop_tests::burst_writes_coalesce_into_one_tick`. +pub const DEFAULT_FS_DEBOUNCE: Duration = Duration::from_millis(200); + +/// Default slow polling backstop when the FS-event driver is active. +/// Covers `notify` silently reporting "no events" on filesystems where +/// FSEvents / inotify are unreliable (network mounts, some Docker +/// setups). 30s matches the issue #250 acceptance shape. +pub const DEFAULT_SLOW_FALLBACK: Duration = Duration::from_secs(30); + #[derive(Clone)] pub struct StartWatchLoopOptions { pub interval: Duration, @@ -42,12 +70,33 @@ pub struct StartWatchLoopOptions { pub ingest: IngestFn, pub on_report: Option, pub on_error: Option, + /// Session-store roots to monitor with `notify`. Empty disables + /// the FS-event driver (the loop polls at `interval`). + pub watch_paths: Vec, + /// Coalescing window for bursty FS events. After the first event + /// wakes the loop, further events landing within this window roll + /// into the same tick. + pub debounce: Duration, + /// Slow polling cadence used as a backstop *while the FS-event + /// driver is active*. When the driver is inactive (no watch paths, + /// `disable_fsevents`, or notify couldn't attach), the loop uses + /// `interval` instead. + pub slow_fallback_interval: Duration, + /// Force the polling driver even when `watch_paths` is non-empty. + /// Surfaced to the CLI as `burn ingest --watch --no-fsevents` so a + /// user on a filesystem where `notify` misbehaves can opt out. + pub disable_fsevents: bool, } impl StartWatchLoopOptions { /// Build options around `ingest`. Defaults: 1000ms interval, immediate /// first tick, stderr error sink, no report sink. Mirrors the TS /// defaults so existing CLI wrappers keep their behavior on port. + /// Defaults: 1000ms polling fallback interval, immediate first tick, + /// FS-event driver enabled (but inert until `with_watch_paths` is + /// called), 30s slow polling backstop, 200ms burst debounce. + /// Mirrors the TS defaults so existing CLI wrappers keep their + /// behavior on port. pub fn new(ingest: IngestFn) -> Self { Self { interval: Duration::from_millis(1000), @@ -55,6 +104,10 @@ impl StartWatchLoopOptions { ingest, on_report: None, on_error: None, + watch_paths: Vec::new(), + debounce: DEFAULT_FS_DEBOUNCE, + slow_fallback_interval: DEFAULT_SLOW_FALLBACK, + disable_fsevents: false, } } @@ -77,6 +130,30 @@ impl StartWatchLoopOptions { self.on_error = Some(sink); self } + + /// Enable the FS-event driver against the given session-store + /// roots. Pass the harness-default paths from + /// [`crate::default_session_roots`] for the CLI wide-scan, or a + /// single-harness root for adapter watchers. + pub fn with_watch_paths(mut self, paths: Vec) -> Self { + self.watch_paths = paths; + self + } + + pub fn with_debounce(mut self, debounce: Duration) -> Self { + self.debounce = debounce; + self + } + + pub fn with_slow_fallback_interval(mut self, interval: Duration) -> Self { + self.slow_fallback_interval = interval; + self + } + + pub fn with_disable_fsevents(mut self, disable: bool) -> Self { + self.disable_fsevents = disable; + self + } } /// Controller returned by [`start_watch_loop`]. Drop alone won't cancel the @@ -202,9 +279,18 @@ where ingest().await } -/// Spawn a background ticker that calls `opts.ingest` every `opts.interval`, -/// skipping ticks while one is in flight. Returns a [`WatchController`] the -/// caller uses to invoke an extra tick on demand or stop the loop. +/// Spawn a background ticker that calls `opts.ingest` whenever the +/// active driver fires, skipping ticks while one is in flight. Returns +/// a [`WatchController`] the caller uses to invoke an extra tick on +/// demand or stop the loop. +/// +/// Driver selection (see module docs for the full rationale): +/// +/// * If `watch_paths` is non-empty, `disable_fsevents` is false, and +/// `notify` can attach to at least one path → FS-event driver with a +/// slow polling backstop at `slow_fallback_interval`. +/// * Otherwise → polling driver at `interval`, matching the legacy +/// 1.x `setInterval` cadence. pub fn start_watch_loop(opts: StartWatchLoopOptions) -> WatchController { let inner = Arc::new(WatchInner { stopped: AtomicBool::new(false), @@ -217,38 +303,33 @@ pub fn start_watch_loop(opts: StartWatchLoopOptions) -> WatchController { }); let interval = opts.interval; let immediate = opts.immediate; + let watch_paths = opts.watch_paths; + let debounce = opts.debounce; + let slow_fallback = opts.slow_fallback_interval; + let disable_fsevents = opts.disable_fsevents; let ticker = inner.clone(); let handle = tokio::spawn(async move { + // Try to bring up the FS-event driver. Failure (no path exists, + // notify backend errors) silently demotes us to the polling + // driver — that's the slow-fallback acceptance criterion from + // #250. + let burst = if !disable_fsevents && !watch_paths.is_empty() { + FsBurst::new(&watch_paths).ok() + } else { + None + }; + if immediate { ticker.run_tick_skip_if_busy().await; } - // Schedule the periodic ticker to first fire `interval` from now. - // `tokio::time::interval` fires immediately on the first `tick()`, - // so for the immediate path we'd want to skip that first tick; - // for the non-immediate path we'd want to wait `interval` before - // the first periodic run. `interval_at(now + interval, …)` covers - // both: the next tick lands `interval` after start in either case. - let start_at = tokio::time::Instant::now() + interval; - let mut iv = tokio::time::interval_at(start_at, interval); - // Default `MissedTickBehavior::Burst` would fire catch-up ticks - // back-to-back after a slow ingest pass, which can spike CPU/IO - // exactly when the system is already under load. `Delay` schedules - // the next tick `interval` after the previous fires, preserving - // stable polling cadence — closer to TS `setInterval` pacing under - // a single-threaded runner. - iv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - if ticker.stopped.load(Ordering::SeqCst) { - break; - } - tokio::select! { - _ = iv.tick() => {} - _ = ticker.stop_signal.notified() => break, + + match burst { + Some(mut burst) => { + run_fs_event_driver(&ticker, &mut burst, debounce, slow_fallback).await; } - if ticker.stopped.load(Ordering::SeqCst) { - break; + None => { + run_polling_driver(&ticker, interval).await; } - ticker.run_tick_skip_if_busy().await; } }); WatchController { @@ -257,6 +338,73 @@ pub fn start_watch_loop(opts: StartWatchLoopOptions) -> WatchController { } } +/// Pure-polling driver — matches the pre-#250 behaviour exactly. Used +/// when no `watch_paths` are configured, when `disable_fsevents` is +/// set, or when `FsBurst` couldn't attach (network mount, etc.). +async fn run_polling_driver(ticker: &Arc, interval: Duration) { + // Schedule the periodic ticker to first fire `interval` from now. + // `tokio::time::interval` fires immediately on the first `tick()`, + // so for the immediate path we'd want to skip that first tick; + // for the non-immediate path we'd want to wait `interval` before + // the first periodic run. `interval_at(now + interval, …)` covers + // both: the next tick lands `interval` after start in either case. + let start_at = tokio::time::Instant::now() + interval; + let mut iv = tokio::time::interval_at(start_at, interval); + // Default `MissedTickBehavior::Burst` would fire catch-up ticks + // back-to-back after a slow ingest pass, which can spike CPU/IO + // exactly when the system is already under load. `Delay` schedules + // the next tick `interval` after the previous fires, preserving + // stable polling cadence — closer to TS `setInterval` pacing under + // a single-threaded runner. + iv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + if ticker.stopped.load(Ordering::SeqCst) { + break; + } + tokio::select! { + _ = iv.tick() => {} + _ = ticker.stop_signal.notified() => break, + } + if ticker.stopped.load(Ordering::SeqCst) { + break; + } + ticker.run_tick_skip_if_busy().await; + } +} + +/// FS-event driver — sleep until either the burst receiver fires (and +/// the burst window settles) or the slow polling backstop ticks. +async fn run_fs_event_driver( + ticker: &Arc, + burst: &mut FsBurst, + debounce: Duration, + slow_fallback: Duration, +) { + let start_at = tokio::time::Instant::now() + slow_fallback; + let mut slow = tokio::time::interval_at(start_at, slow_fallback); + slow.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + if ticker.stopped.load(Ordering::SeqCst) { + break; + } + tokio::select! { + // `wait_for_burst` always resolves to `Some(())` once its + // debounce window elapses; under sustained writes that's + // every ~debounce, under bursts it coalesces. We don't + // pattern-match the Option because the Notify-backed + // channel can't close without dropping `_watcher`, which + // would have already torn down this task. + _ = burst.wait_for_burst(debounce) => {} + _ = slow.tick() => {} + _ = ticker.stop_signal.notified() => break, + } + if ticker.stopped.load(Ordering::SeqCst) { + break; + } + ticker.run_tick_skip_if_busy().await; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs b/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs index 05df0a03..a0e4a3a6 100644 --- a/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs +++ b/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs @@ -160,3 +160,140 @@ async fn stop_awaits_in_flight_tick() { "stop() returned before the in-flight tick completed" ); } + +/// `with_watch_paths` pointing at a non-existent path must demote +/// silently to the polling driver — that's the slow-fallback acceptance +/// criterion from #250 ("when `notify` reports unsupported, the loop +/// falls back to polling cleanly"). Reproduces the network-mount +/// scenario without needing a real network mount: a path that doesn't +/// exist exercises the same `FsBurst::new -> Err -> polling driver` +/// branch. +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn fs_events_fall_back_to_polling_when_no_path_exists() { + let runs = Arc::new(AtomicUsize::new(0)); + let runs_for_ingest = runs.clone(); + let ingest: IngestFn = Arc::new(move || { + let runs = runs_for_ingest.clone(); + Box::pin(async move { + runs.fetch_add(1, Ordering::SeqCst); + Ok(IngestReport::default()) + }) + }); + + // Build a guaranteed-missing child of a fresh tempdir so the + // `FsBurst::new -> Err -> polling` demotion is deterministic + // across environments (a hardcoded absolute path could collide + // with an unusual filesystem layout). + let tmp = tempfile::tempdir().unwrap(); + let missing = tmp.path().join("definitely-missing-child"); + let opts = StartWatchLoopOptions::new(ingest) + .with_immediate(false) + .with_interval(Duration::from_millis(100)) + .with_watch_paths(vec![missing]); + let ctrl = start_watch_loop(opts); + + // If the FS-event driver were active, the loop would idle until a + // notify event landed (and none ever will here). The polling + // fallback must drive at least one tick within the polling + // interval. We yield + advance virtual time across two intervals + // to give the spawned task room to run. + for _ in 0..6 { + tokio::task::yield_now().await; + tokio::time::advance(Duration::from_millis(100)).await; + } + ctrl.stop().await; + + let n = runs.load(Ordering::SeqCst); + assert!( + n >= 1, + "polling fallback did not fire after FS-event driver demotion (runs={n})" + ); +} + +/// `disable_fsevents = true` must take the polling path even when +/// `watch_paths` references a real, watchable directory. Mirrors the +/// `--no-fsevents` opt-out in the CLI. +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn disable_fsevents_forces_polling_driver() { + let dir = tempfile::tempdir().unwrap(); + let runs = Arc::new(AtomicUsize::new(0)); + let runs_for_ingest = runs.clone(); + let ingest: IngestFn = Arc::new(move || { + let runs = runs_for_ingest.clone(); + Box::pin(async move { + runs.fetch_add(1, Ordering::SeqCst); + Ok(IngestReport::default()) + }) + }); + + let opts = StartWatchLoopOptions::new(ingest) + .with_immediate(false) + .with_interval(Duration::from_millis(50)) + .with_watch_paths(vec![dir.path().to_path_buf()]) + .with_disable_fsevents(true); + let ctrl = start_watch_loop(opts); + + // Polling cadence is 50ms; advance 4 intervals. + for _ in 0..6 { + tokio::task::yield_now().await; + tokio::time::advance(Duration::from_millis(50)).await; + } + ctrl.stop().await; + + assert!( + runs.load(Ordering::SeqCst) >= 1, + "polling driver did not fire under disable_fsevents=true" + ); +} + +/// Burst test: a flood of FS writes inside the debounce window must +/// produce a single ingest tick, not one per write. Acceptance +/// criterion from #250 ("rapid-fire 100 session writes within 100ms +/// produces ≤ 5 ingest cycles, not 100"). Uses real time because the +/// `notify` driver runs on its own OS thread and doesn't observe +/// `tokio::time::pause`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore = "FS-event delivery is platform-dependent; run via cargo test -- --ignored"] +async fn burst_writes_coalesce_into_one_tick() { + let dir = tempfile::tempdir().unwrap(); + let runs = Arc::new(AtomicUsize::new(0)); + let runs_for_ingest = runs.clone(); + let ingest: IngestFn = Arc::new(move || { + let runs = runs_for_ingest.clone(); + Box::pin(async move { + runs.fetch_add(1, Ordering::SeqCst); + Ok(IngestReport::default()) + }) + }); + + let opts = StartWatchLoopOptions::new(ingest) + .with_immediate(false) + .with_interval(Duration::from_secs(60)) + .with_slow_fallback_interval(Duration::from_secs(60)) + .with_debounce(Duration::from_millis(150)) + .with_watch_paths(vec![dir.path().to_path_buf()]); + let ctrl = start_watch_loop(opts); + + // Give the watcher a moment to attach. + tokio::time::sleep(Duration::from_millis(100)).await; + + // 100 writes in rapid succession. + for i in 0..100 { + std::fs::write(dir.path().join(format!("burst-{i}.jsonl")), b"{}\n").unwrap(); + } + // Wait for the debounce window plus generous slack for the OS + // event-delivery latency. + tokio::time::sleep(Duration::from_millis(500)).await; + + ctrl.stop().await; + + let n = runs.load(Ordering::SeqCst); + assert!( + n <= 5, + "100 burst writes within 150ms debounce should coalesce to ≤ 5 ticks, got {n}" + ); + assert!( + n >= 1, + "100 burst writes should still wake the loop at least once, got {n}" + ); +} diff --git a/crates/relayburn-sdk/src/lib.rs b/crates/relayburn-sdk/src/lib.rs index 7eeb8bdb..fe91cfbd 100644 --- a/crates/relayburn-sdk/src/lib.rs +++ b/crates/relayburn-sdk/src/lib.rs @@ -107,11 +107,12 @@ pub use crate::analyze::{ }; pub use crate::ingest::{ - cleanup_stale_pending_stamps, ingest_all, ingest_claude_session, ingest_codex_sessions, - ingest_opencode_sessions, run_ingest_tick, start_watch_loop, write_pending_stamp, ErrorSink, - IngestFn, IngestOptions as RawIngestOptions, IngestReport, IngestRoots, PendingStamp, - PendingStampHarness, PendingStampWriteResult, ReportSink, StartWatchLoopOptions, - WatchController, WriteOptions as PendingStampWriteOptions, + cleanup_stale_pending_stamps, default_session_roots, ingest_all, ingest_claude_session, + ingest_codex_sessions, ingest_opencode_sessions, run_ingest_tick, start_watch_loop, + write_pending_stamp, ErrorSink, IngestFn, IngestOptions as RawIngestOptions, IngestReport, + IngestRoots, PendingStamp, PendingStampHarness, PendingStampWriteResult, ReportSink, + StartWatchLoopOptions, WatchController, WriteOptions as PendingStampWriteOptions, + DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK, }; // --- LedgerOpenOptions -----------------------------------------------------