From 98893ca74cad3d0c31d92e33667ae7f82a0d521c Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 29 Apr 2026 20:10:41 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(callcenter):=20add=20LanceAuditSink=20?= =?UTF-8?q?=E2=80=94=20Lance-backed=20persistent=20audit=20writer=20(PR-F3?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements LanceAuditSink behind the `audit-log` feature flag (now pulls lance + arrow + futures + tokio). Entries are buffered via append(), flushed to a Lance dataset in append mode, and read back via scan_back(n). Schema: tenant_id (Utf8), actor_id (Utf8), statement_hash (UInt64), timestamp (Int64), action (Utf8). Tests: flush-10 roundtrip, flush-1000 count, multi-flush accumulation, empty-flush noop. All 75 tests pass (4 new Lance tests + 71 existing). https://claude.ai/code/session_01NYGrxVopyszZYgLBxe4hgj --- Cargo.lock | 1 + crates/lance-graph-callcenter/Cargo.toml | 6 +- crates/lance-graph-callcenter/src/audit.rs | 416 ++++++++++++++++++++- 3 files changed, 421 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c0bd3a5d..019fa6a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4338,6 +4338,7 @@ dependencies = [ "arrow", "axum", "datafusion", + "futures", "lance", "lance-graph-contract", "serde", diff --git a/crates/lance-graph-callcenter/Cargo.toml b/crates/lance-graph-callcenter/Cargo.toml index 960db21f..4cc912a7 100644 --- a/crates/lance-graph-callcenter/Cargo.toml +++ b/crates/lance-graph-callcenter/Cargo.toml @@ -27,6 +27,9 @@ tokio-tungstenite = { version = "0.24", optional = true } serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true } +# [audit-log] — Lance-backed audit writer needs futures for stream collect +futures = { version = "0.3", optional = true } + # [serve] — axum WS server (implies realtime + query) axum = { version = "0.8", features = ["ws"], optional = true } tower-http = { version = "0.5", features = ["cors"], optional = true } @@ -43,7 +46,8 @@ auth-rls-lite = ["auth-jwt", "query-lite"] auth-rls = ["auth-jwt", "query"] auth = ["auth-rls"] # LF-90 — append-only audit log skeleton (A3). -audit-log = [] +# PR-F3: LanceAuditSink requires lance + arrow + futures + tokio for async I/O. +audit-log = ["dep:arrow", "dep:lance", "dep:futures", "dep:tokio"] # A-fix-audit forward-stub: enables `audit_from_plan(...)` helper that captures # rewritten DataFusion LogicalPlans into AuditEntry. No-op alias today; will # pull in `dep:datafusion` once the helper has a concrete consumer. diff --git a/crates/lance-graph-callcenter/src/audit.rs b/crates/lance-graph-callcenter/src/audit.rs index 5331daa3..8593e047 100644 --- a/crates/lance-graph-callcenter/src/audit.rs +++ b/crates/lance-graph-callcenter/src/audit.rs @@ -7,7 +7,7 @@ //! //! Append-only audit log for every RLS-rewritten query. The default //! [`InMemoryAuditSink`] is a bounded ring buffer; production deployments -//! will swap in a Lance-backed writer in a follow-up PR. +//! use [`LanceAuditSink`] which persists entries to a Lance dataset. use std::collections::VecDeque; use std::sync::Mutex; @@ -118,6 +118,246 @@ impl AuditSink for InMemoryAuditSink { } } +// --------------------------------------------------------------------------- +// LanceAuditSink — Lance-backed persistent audit writer (PR-F3) +// --------------------------------------------------------------------------- + +/// Lance-backed audit sink that persists [`AuditEntry`] rows to a Lance dataset. +/// +/// Entries are buffered in memory via `append()`. Calling [`flush()`](LanceAuditSink::flush) +/// converts the buffer into an Arrow `RecordBatch` and appends it to the Lance +/// dataset in append mode (no overwrites). [`scan_back(n)`](LanceAuditSink::scan_back) +/// reads the last `n` entries from the persisted dataset. +/// +/// # Schema +/// +/// | Column | Arrow Type | Source | +/// |------------------|------------|-------------------------------| +/// | `tenant_id` | Utf8 | `AuditEntry::tenant_id` | +/// | `actor_id` | Utf8 | `AuditEntry::actor_id` | +/// | `statement_hash` | UInt64 | `AuditEntry::statement_hash` | +/// | `timestamp` | Int64 | `AuditEntry::ts_unix_ms` as i64 | +/// | `action` | Utf8 | `StatementKind` display name | +/// +/// # Example +/// +/// ```ignore +/// let sink = LanceAuditSink::new("/tmp/audit.lance"); +/// sink.append(entry); +/// sink.flush().await.unwrap(); +/// let recent = sink.scan_back(10).await.unwrap(); +/// ``` +#[cfg(feature = "audit-log")] +#[derive(Debug)] +pub struct LanceAuditSink { + /// Path to the Lance dataset directory. + dataset_path: String, + /// In-memory buffer of entries not yet flushed. + buffer: Mutex>, +} + +#[cfg(feature = "audit-log")] +impl LanceAuditSink { + /// Create a new Lance audit sink that will write to the given dataset path. + /// The dataset is created on first flush if it does not exist. + pub fn new(dataset_path: impl Into) -> Self { + Self { + dataset_path: dataset_path.into(), + buffer: Mutex::new(Vec::new()), + } + } + + /// Flush buffered entries to the Lance dataset (append mode). + /// + /// Drains the in-memory buffer, converts entries to an Arrow RecordBatch, + /// and appends them to the Lance dataset. If the dataset does not exist, + /// it is created. Returns the number of entries flushed. + pub async fn flush(&self) -> Result { + use arrow::array::{Int64Array, StringArray, UInt64Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use lance::dataset::{Dataset, WriteMode, WriteParams}; + use std::sync::Arc; + + let entries: Vec = { + let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner()); + std::mem::take(&mut *guard) + }; + + if entries.is_empty() { + return Ok(0); + } + + let n = entries.len(); + + // Build columnar arrays from the buffered entries. + let tenant_ids: Vec<&str> = entries.iter().map(|e| e.tenant_id.as_str()).collect(); + let actor_ids: Vec<&str> = entries.iter().map(|e| e.actor_id.as_str()).collect(); + let hashes: Vec = entries.iter().map(|e| e.statement_hash).collect(); + let timestamps: Vec = entries.iter().map(|e| e.ts_unix_ms as i64).collect(); + let actions: Vec<&str> = entries + .iter() + .map(|e| statement_kind_str(e.statement_kind)) + .collect(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("tenant_id", DataType::Utf8, false), + Field::new("actor_id", DataType::Utf8, false), + Field::new("statement_hash", DataType::UInt64, false), + Field::new("timestamp", DataType::Int64, false), + Field::new("action", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(tenant_ids)), + Arc::new(StringArray::from(actor_ids)), + Arc::new(UInt64Array::from(hashes)), + Arc::new(Int64Array::from(timestamps)), + Arc::new(StringArray::from(actions)), + ], + ) + .map_err(|e| format!("Arrow batch error: {e}"))?; + + let reader = arrow::record_batch::RecordBatchIterator::new( + vec![Ok(batch)], + schema, + ); + + // Determine write mode: Create if new, Append if existing. + let mode = match Dataset::open(&self.dataset_path).await { + Ok(_) => WriteMode::Append, + Err(_) => WriteMode::Create, + }; + + let params = WriteParams { + mode, + ..Default::default() + }; + + Dataset::write(reader, &self.dataset_path, Some(params)) + .await + .map_err(|e| format!("Lance write error: {e}"))?; + + Ok(n) + } + + /// Read the last `n` entries from the Lance dataset. + /// + /// Returns entries in dataset order (oldest first among the returned set). + /// If fewer than `n` entries exist, all entries are returned. + pub async fn scan_back(&self, n: usize) -> Result, String> { + use arrow::array::{Int64Array, StringArray, UInt64Array}; + use futures::TryStreamExt; + use lance::dataset::Dataset; + + let ds = Dataset::open(&self.dataset_path) + .await + .map_err(|e| format!("Lance open error: {e}"))?; + + let total_rows = ds.count_rows(None) + .await + .map_err(|e| format!("Lance count error: {e}"))?; + + let skip = if total_rows > n { total_rows - n } else { 0 }; + + let batches: Vec = ds + .scan() + .project(&["tenant_id", "actor_id", "statement_hash", "timestamp", "action"]) + .map_err(|e| format!("Lance project error: {e}"))? + .try_into_stream() + .await + .map_err(|e| format!("Lance stream error: {e}"))? + .try_collect() + .await + .map_err(|e| format!("Lance collect error: {e}"))?; + + let mut entries = Vec::new(); + let mut row_offset: usize = 0; + + for batch in &batches { + let tenant_col = batch + .column(0) + .as_any() + .downcast_ref::() + .ok_or("tenant_id column type mismatch")?; + let actor_col = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or("actor_id column type mismatch")?; + let hash_col = batch + .column(2) + .as_any() + .downcast_ref::() + .ok_or("statement_hash column type mismatch")?; + let ts_col = batch + .column(3) + .as_any() + .downcast_ref::() + .ok_or("timestamp column type mismatch")?; + let action_col = batch + .column(4) + .as_any() + .downcast_ref::() + .ok_or("action column type mismatch")?; + + for i in 0..batch.num_rows() { + if row_offset + i >= skip { + entries.push(AuditEntry { + ts_unix_ms: ts_col.value(i) as u64, + tenant_id: tenant_col.value(i).to_string(), + actor_id: actor_col.value(i).to_string(), + statement_hash: hash_col.value(i), + statement_kind: parse_statement_kind(action_col.value(i)), + rls_predicates_added: 0, // not stored in Lance schema + rewritten_plan: None, // not stored in Lance schema + }); + } + } + row_offset += batch.num_rows(); + } + + Ok(entries) + } +} + +#[cfg(feature = "audit-log")] +impl AuditSink for LanceAuditSink { + fn append(&self, entry: AuditEntry) { + let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner()); + guard.push(entry); + } + + fn snapshot(&self) -> Vec { + let guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner()); + guard.clone() + } +} + +/// Convert [`StatementKind`] to its string representation for the Lance schema. +fn statement_kind_str(kind: StatementKind) -> &'static str { + match kind { + StatementKind::Select => "Select", + StatementKind::Insert => "Insert", + StatementKind::Update => "Update", + StatementKind::Delete => "Delete", + StatementKind::Other => "Other", + } +} + +/// Parse a string back into [`StatementKind`]. +fn parse_statement_kind(s: &str) -> StatementKind { + match s { + "Select" => StatementKind::Select, + "Insert" => StatementKind::Insert, + "Update" => StatementKind::Update, + "Delete" => StatementKind::Delete, + _ => StatementKind::Other, + } +} + /// Stable FNV-64a hash of a statement's text (or display form of a LogicalPlan). /// /// **Stability guarantee:** this is the FNV-1a 64-bit algorithm with the @@ -307,4 +547,178 @@ mod tests { } assert_eq!(sink.snapshot().len(), 800); } + + // ── LanceAuditSink tests ────────────────────────────────────────────── + + #[cfg(feature = "audit-log")] + mod lance_tests { + use super::*; + + fn lance_sample_entry(tag: &str) -> AuditEntry { + AuditEntry { + ts_unix_ms: 1000 + tag.len() as u64, + tenant_id: format!("tenant-{tag}"), + actor_id: format!("actor-{tag}"), + statement_hash: hash_statement(tag), + statement_kind: StatementKind::Select, + rls_predicates_added: 2, + rewritten_plan: None, + } + } + + /// Flush 10 entries → scan_back(10) → verify round-trip. + #[tokio::test] + async fn flush_10_then_scan_back_roundtrip() { + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_10_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + + // Append 10 entries with distinct tags. + for i in 0..10 { + sink.append(lance_sample_entry(&format!("e{i}"))); + } + + // snapshot() should see the 10 buffered entries. + assert_eq!(sink.snapshot().len(), 10); + + // Flush to Lance. + let flushed = sink.flush().await.unwrap(); + assert_eq!(flushed, 10); + + // After flush, in-memory buffer is empty. + assert_eq!(sink.snapshot().len(), 0); + + // Read back from Lance. + let entries = sink.scan_back(10).await.unwrap(); + assert_eq!(entries.len(), 10); + + // Verify round-trip field integrity. + for (i, entry) in entries.iter().enumerate() { + let tag = format!("e{i}"); + assert_eq!(entry.tenant_id, format!("tenant-{tag}")); + assert_eq!(entry.actor_id, format!("actor-{tag}")); + assert_eq!(entry.statement_hash, hash_statement(&tag)); + assert_eq!(entry.statement_kind, StatementKind::Select); + assert_eq!(entry.ts_unix_ms, 1000 + tag.len() as u64); + } + + // Cleanup. + let _ = std::fs::remove_dir_all(&dir); + } + + /// Flush 1000 entries → verify count via scan_back. + #[tokio::test] + async fn flush_1000_verify_count() { + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_1000_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + + // Append 1000 entries. + for i in 0..1000 { + sink.append(AuditEntry { + ts_unix_ms: i as u64, + tenant_id: format!("t-{i}"), + actor_id: format!("a-{i}"), + statement_hash: hash_statement(&format!("stmt-{i}")), + statement_kind: if i % 2 == 0 { + StatementKind::Select + } else { + StatementKind::Insert + }, + rls_predicates_added: 1, + rewritten_plan: None, + }); + } + + let flushed = sink.flush().await.unwrap(); + assert_eq!(flushed, 1000); + + // scan_back with very large n returns all 1000. + let all = sink.scan_back(2000).await.unwrap(); + assert_eq!(all.len(), 1000); + + // scan_back(10) returns last 10. + let last_10 = sink.scan_back(10).await.unwrap(); + assert_eq!(last_10.len(), 10); + // The last entry should be t-999. + assert_eq!(last_10[9].tenant_id, "t-999"); + // The first of the last 10 should be t-990. + assert_eq!(last_10[0].tenant_id, "t-990"); + + // Verify alternating action round-trip. + assert_eq!(last_10[0].statement_kind, StatementKind::Select); // 990 is even + assert_eq!(last_10[1].statement_kind, StatementKind::Insert); // 991 is odd + + // Cleanup. + let _ = std::fs::remove_dir_all(&dir); + } + + /// Multiple flushes append (don't overwrite). + #[tokio::test] + async fn multiple_flushes_accumulate() { + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_accum_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + + // First batch. + for i in 0..5 { + sink.append(lance_sample_entry(&format!("batch1-{i}"))); + } + sink.flush().await.unwrap(); + + // Second batch. + for i in 0..5 { + sink.append(lance_sample_entry(&format!("batch2-{i}"))); + } + sink.flush().await.unwrap(); + + // Should have 10 total entries. + let all = sink.scan_back(100).await.unwrap(); + assert_eq!(all.len(), 10); + + // Cleanup. + let _ = std::fs::remove_dir_all(&dir); + } + + /// Flush of empty buffer is a no-op. + #[tokio::test] + async fn flush_empty_is_noop() { + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_empty_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + let flushed = sink.flush().await.unwrap(); + assert_eq!(flushed, 0); + + // Cleanup (dir may not exist). + let _ = std::fs::remove_dir_all(&dir); + } + } } From 928c5d7f8e839f30ffb634dfa42096e0a236141a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 29 Apr 2026 21:15:53 +0000 Subject: [PATCH 2/2] refactor(F3): timestamp temporal type + missing columns round-trip + lance limit-pushdown --- crates/lance-graph-callcenter/src/audit.rs | 238 ++++++++++++++++++--- 1 file changed, 203 insertions(+), 35 deletions(-) diff --git a/crates/lance-graph-callcenter/src/audit.rs b/crates/lance-graph-callcenter/src/audit.rs index 8593e047..53e0559f 100644 --- a/crates/lance-graph-callcenter/src/audit.rs +++ b/crates/lance-graph-callcenter/src/audit.rs @@ -131,13 +131,20 @@ impl AuditSink for InMemoryAuditSink { /// /// # Schema /// -/// | Column | Arrow Type | Source | -/// |------------------|------------|-------------------------------| -/// | `tenant_id` | Utf8 | `AuditEntry::tenant_id` | -/// | `actor_id` | Utf8 | `AuditEntry::actor_id` | -/// | `statement_hash` | UInt64 | `AuditEntry::statement_hash` | -/// | `timestamp` | Int64 | `AuditEntry::ts_unix_ms` as i64 | -/// | `action` | Utf8 | `StatementKind` display name | +/// | Column | Arrow Type | Source | +/// |------------------------|-----------------------------------------|---------------------------------| +/// | `tenant_id` | Utf8 | `AuditEntry::tenant_id` | +/// | `actor_id` | Utf8 | `AuditEntry::actor_id` | +/// | `statement_hash` | UInt64 | `AuditEntry::statement_hash` | +/// | `timestamp` | Timestamp(Millisecond, "UTC") | `AuditEntry::ts_unix_ms` as i64 | +/// | `action` | Utf8 | `StatementKind` display name | +/// | `rls_predicates_added` | UInt16 | `AuditEntry::rls_predicates_added` | +/// | `rewritten_plan` | Utf8 (nullable) | `AuditEntry::rewritten_plan` | +/// +/// `timestamp` is declared as a temporal type (millisecond precision, UTC) so +/// DataFusion temporal predicates (`>=`, `BETWEEN`, etc.) work on the column; +/// Lance still stores it as int64 underneath. The `as i64` cast at flush-time +/// is therefore safe. /// /// # Example /// @@ -173,8 +180,8 @@ impl LanceAuditSink { /// and appends them to the Lance dataset. If the dataset does not exist, /// it is created. Returns the number of entries flushed. pub async fn flush(&self) -> Result { - use arrow::array::{Int64Array, StringArray, UInt64Array}; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::array::{StringArray, TimestampMillisecondArray, UInt16Array, UInt64Array}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use lance::dataset::{Dataset, WriteMode, WriteParams}; use std::sync::Arc; @@ -199,23 +206,38 @@ impl LanceAuditSink { .iter() .map(|e| statement_kind_str(e.statement_kind)) .collect(); + let rls_preds: Vec = entries.iter().map(|e| e.rls_predicates_added).collect(); + let plans: Vec> = + entries.iter().map(|e| e.rewritten_plan.as_deref()).collect(); + let tz: Arc = Arc::from("UTC"); let schema = Arc::new(Schema::new(vec![ Field::new("tenant_id", DataType::Utf8, false), Field::new("actor_id", DataType::Utf8, false), Field::new("statement_hash", DataType::UInt64, false), - Field::new("timestamp", DataType::Int64, false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, Some(tz.clone())), + false, + ), Field::new("action", DataType::Utf8, false), + Field::new("rls_predicates_added", DataType::UInt16, false), + Field::new("rewritten_plan", DataType::Utf8, true), ])); + let ts_array = + TimestampMillisecondArray::from(timestamps).with_timezone(tz.clone()); + let batch = RecordBatch::try_new( schema.clone(), vec![ Arc::new(StringArray::from(tenant_ids)), Arc::new(StringArray::from(actor_ids)), Arc::new(UInt64Array::from(hashes)), - Arc::new(Int64Array::from(timestamps)), + Arc::new(ts_array), Arc::new(StringArray::from(actions)), + Arc::new(UInt16Array::from(rls_preds)), + Arc::new(StringArray::from(plans)), ], ) .map_err(|e| format!("Arrow batch error: {e}"))?; @@ -248,7 +270,9 @@ impl LanceAuditSink { /// Returns entries in dataset order (oldest first among the returned set). /// If fewer than `n` entries exist, all entries are returned. pub async fn scan_back(&self, n: usize) -> Result, String> { - use arrow::array::{Int64Array, StringArray, UInt64Array}; + use arrow::array::{ + Array, StringArray, TimestampMillisecondArray, UInt16Array, UInt64Array, + }; use futures::TryStreamExt; use lance::dataset::Dataset; @@ -256,16 +280,35 @@ impl LanceAuditSink { .await .map_err(|e| format!("Lance open error: {e}"))?; - let total_rows = ds.count_rows(None) + let total_rows = ds + .count_rows(None) .await .map_err(|e| format!("Lance count error: {e}"))?; - let skip = if total_rows > n { total_rows - n } else { 0 }; - - let batches: Vec = ds - .scan() - .project(&["tenant_id", "actor_id", "statement_hash", "timestamp", "action"]) - .map_err(|e| format!("Lance project error: {e}"))? + let skip = total_rows.saturating_sub(n); + + // Push the limit + offset into the Lance scanner so we don't pull all + // fragments only to slice off the tail in process. `Scanner::limit` + // takes (limit, offset) on Lance v4. If the underlying fragment layout + // doesn't support efficient offsets, Lance will still degrade + // gracefully — but we get the win for typical append-only audit logs. + let mut scanner = ds.scan(); + scanner + .project(&[ + "tenant_id", + "actor_id", + "statement_hash", + "timestamp", + "action", + "rls_predicates_added", + "rewritten_plan", + ]) + .map_err(|e| format!("Lance project error: {e}"))?; + scanner + .limit(Some(n as i64), Some(skip as i64)) + .map_err(|e| format!("Lance limit error: {e}"))?; + + let batches: Vec = scanner .try_into_stream() .await .map_err(|e| format!("Lance stream error: {e}"))? @@ -274,7 +317,6 @@ impl LanceAuditSink { .map_err(|e| format!("Lance collect error: {e}"))?; let mut entries = Vec::new(); - let mut row_offset: usize = 0; for batch in &batches { let tenant_col = batch @@ -295,28 +337,40 @@ impl LanceAuditSink { let ts_col = batch .column(3) .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or("timestamp column type mismatch")?; let action_col = batch .column(4) .as_any() .downcast_ref::() .ok_or("action column type mismatch")?; + let rls_col = batch + .column(5) + .as_any() + .downcast_ref::() + .ok_or("rls_predicates_added column type mismatch")?; + let plan_col = batch + .column(6) + .as_any() + .downcast_ref::() + .ok_or("rewritten_plan column type mismatch")?; for i in 0..batch.num_rows() { - if row_offset + i >= skip { - entries.push(AuditEntry { - ts_unix_ms: ts_col.value(i) as u64, - tenant_id: tenant_col.value(i).to_string(), - actor_id: actor_col.value(i).to_string(), - statement_hash: hash_col.value(i), - statement_kind: parse_statement_kind(action_col.value(i)), - rls_predicates_added: 0, // not stored in Lance schema - rewritten_plan: None, // not stored in Lance schema - }); - } + let plan = if plan_col.is_null(i) { + None + } else { + Some(plan_col.value(i).to_string()) + }; + entries.push(AuditEntry { + ts_unix_ms: ts_col.value(i) as u64, + tenant_id: tenant_col.value(i).to_string(), + actor_id: actor_col.value(i).to_string(), + statement_hash: hash_col.value(i), + statement_kind: parse_statement_kind(action_col.value(i)), + rls_predicates_added: rls_col.value(i), + rewritten_plan: plan, + }); } - row_offset += batch.num_rows(); } Ok(entries) @@ -580,9 +634,16 @@ mod tests { let sink = LanceAuditSink::new(path); - // Append 10 entries with distinct tags. + // Append 10 entries with distinct tags. The first 5 receive + // non-default `rls_predicates_added` values so we verify the + // column survives the Lance round-trip (and isn't silently + // rebuilt as 0 like the original schema-dropping behaviour). for i in 0..10 { - sink.append(lance_sample_entry(&format!("e{i}"))); + let mut entry = lance_sample_entry(&format!("e{i}")); + if i < 5 { + entry.rls_predicates_added = (i as u16) + 7; // 7..=11 + } + sink.append(entry); } // snapshot() should see the 10 buffered entries. @@ -607,6 +668,11 @@ mod tests { assert_eq!(entry.statement_hash, hash_statement(&tag)); assert_eq!(entry.statement_kind, StatementKind::Select); assert_eq!(entry.ts_unix_ms, 1000 + tag.len() as u64); + let expected_preds = if i < 5 { (i as u16) + 7 } else { 2 }; + assert_eq!( + entry.rls_predicates_added, expected_preds, + "rls_predicates_added must round-trip (idx {i})" + ); } // Cleanup. @@ -701,6 +767,108 @@ mod tests { let _ = std::fs::remove_dir_all(&dir); } + /// Failing-test-first (loose end #1): the persisted Lance schema must declare + /// `timestamp` as a temporal type so DataFusion temporal predicates work. + /// On the original code this fails because the column is `Int64`. + #[tokio::test] + async fn test_timestamp_column_is_temporal_type() { + use arrow::datatypes::DataType; + use lance::dataset::Dataset; + + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_ts_temporal_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + sink.append(lance_sample_entry("ts-probe")); + sink.flush().await.unwrap(); + + let ds = Dataset::open(path).await.unwrap(); + let schema = ds.schema(); + // Lance's internal Schema → Arrow Schema for the field type assertion. + let arrow_schema: arrow::datatypes::Schema = schema.into(); + let ts_field = arrow_schema + .field_with_name("timestamp") + .expect("timestamp column exists"); + match ts_field.data_type() { + DataType::Timestamp(_, _) => { /* pass */ } + other => panic!( + "timestamp must be a temporal type for DataFusion predicates, got {:?}", + other + ), + } + + let _ = std::fs::remove_dir_all(&dir); + } + + /// Failing-test-first (loose end #2): `rls_predicates_added` and `rewritten_plan` + /// must round-trip through Lance. On the original code these were silently + /// dropped from the schema and rebuilt as `0` / `None` on read-back. + #[tokio::test] + async fn test_round_trip_preserves_rls_predicates_added_and_rewritten_plan() { + let dir = std::env::temp_dir().join(format!( + "lance_audit_test_lossless_{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let path = dir.to_str().unwrap(); + + let sink = LanceAuditSink::new(path); + + // 10 entries; 5 of them with non-default rls_predicates_added + rewritten_plan. + for i in 0..10 { + let plan = if i < 5 { + Some(format!("Filter: tenant_id = 't{i}'\n TableScan: calls")) + } else { + None + }; + let preds = if i < 5 { (i as u16) + 3 } else { 0 }; + sink.append(AuditEntry { + ts_unix_ms: 2000 + i as u64, + tenant_id: format!("t-{i}"), + actor_id: format!("a-{i}"), + statement_hash: hash_statement(&format!("stmt-{i}")), + statement_kind: StatementKind::Select, + rls_predicates_added: preds, + rewritten_plan: plan, + }); + } + sink.flush().await.unwrap(); + + let entries = sink.scan_back(10).await.unwrap(); + assert_eq!(entries.len(), 10); + + for (i, e) in entries.iter().enumerate() { + if i < 5 { + assert_eq!( + e.rls_predicates_added, + (i as u16) + 3, + "rls_predicates_added must round-trip (idx {i})" + ); + let plan = e + .rewritten_plan + .as_ref() + .unwrap_or_else(|| panic!("rewritten_plan must round-trip (idx {i})")); + assert!( + plan.contains(&format!("tenant_id = 't{i}'")), + "rewritten_plan content must round-trip (idx {i}): got {plan:?}" + ); + } else { + assert_eq!(e.rls_predicates_added, 0); + assert!(e.rewritten_plan.is_none()); + } + } + + let _ = std::fs::remove_dir_all(&dir); + } + /// Flush of empty buffer is a no-op. #[tokio::test] async fn flush_empty_is_noop() {