diff --git a/crates/lance-graph-callcenter/Cargo.toml b/crates/lance-graph-callcenter/Cargo.toml index 3d05067a..960db21f 100644 --- a/crates/lance-graph-callcenter/Cargo.toml +++ b/crates/lance-graph-callcenter/Cargo.toml @@ -44,9 +44,16 @@ auth-rls = ["auth-jwt", "query"] auth = ["auth-rls"] # LF-90 — append-only audit log skeleton (A3). audit-log = [] +# A-fix-audit forward-stub: enables `audit_from_plan(...)` helper that captures +# rewritten DataFusion LogicalPlans into AuditEntry. No-op alias today; will +# pull in `dep:datafusion` once the helper has a concrete consumer. +datafusion-plan = ["query-lite"] # DM-8 — PostgREST stub uses a dependency-free hand-rolled JSON writer; the # feature flag turns the module on without dragging serde in. postgrest = [] +# A-fix-postgrest forward-stub: enables `parsed_query_to_plan(...)` ParsedQuery +# → DataFusion LogicalPlan compiler. Wired in the follow-up dispatcher PR. +datafusion-dispatch = ["query-lite"] # A4 plugin slots. Each is independently activatable; the `with_*` builder on # `MembraneRegistry` is gated on the slot feature plus the underlying module # feature (so e.g. enabling `membrane-plugins-rls` without an RLS feature is diff --git a/crates/lance-graph-callcenter/src/audit.rs b/crates/lance-graph-callcenter/src/audit.rs index 7f20b241..5331daa3 100644 --- a/crates/lance-graph-callcenter/src/audit.rs +++ b/crates/lance-graph-callcenter/src/audit.rs @@ -2,11 +2,14 @@ //! //! META-AGENT: enable behind `audit-log` feature; add `pub mod audit;` to lib.rs; //! add `audit-log = []` to Cargo.toml [features]. +//! META-AGENT: also add `datafusion-plan = ["dep:datafusion"]` feature alias to +//! Cargo.toml [features] if not already present (gates `audit_from_plan`). //! //! Append-only audit log for every RLS-rewritten query. The default //! [`InMemoryAuditSink`] is a bounded ring buffer; production deployments //! will swap in a Lance-backed writer in a follow-up PR. +use std::collections::VecDeque; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; @@ -17,10 +20,39 @@ pub struct AuditEntry { pub tenant_id: String, pub actor_id: String, /// Stable hash of the rewritten LogicalPlan or its display string. - /// Use std::hash::DefaultHasher (no extra deps). + /// Computed via [`hash_statement`] (FNV-64a, stable across Rust versions + /// and platforms — safe to persist and compare across binaries). pub statement_hash: u64, pub statement_kind: StatementKind, - pub rls_predicates_added: u8, + pub rls_predicates_added: u16, + /// Optional rewritten LogicalPlan as a display string. Allows plan replay + /// for retroactive policy enforcement (epiphany E3 from PR #279 outlook). + /// None for sinks that don't capture plans (e.g. error-path entries). + pub rewritten_plan: Option, +} + +impl AuditEntry { + /// Construct an audit entry that retains the rewritten plan's display string. + /// Used by the policy layer (RlsRewriter) at the moment of plan transformation. + pub fn with_plan( + tenant_id: impl Into, + actor_id: impl Into, + statement_kind: StatementKind, + plan_text: impl Into, + rls_predicates_added: u16, + ) -> Self { + let plan_text = plan_text.into(); + let statement_hash = hash_statement(&plan_text); + Self { + ts_unix_ms: now_unix_ms(), + tenant_id: tenant_id.into(), + actor_id: actor_id.into(), + statement_hash, + statement_kind, + rls_predicates_added, + rewritten_plan: Some(plan_text), + } + } } /// Coarse classification of the audited statement. @@ -35,15 +67,19 @@ pub enum StatementKind { /// Append-only sink. Default impl is in-memory ring buffer; production /// path swaps in a Lance-backed writer in a follow-up PR. -pub trait AuditSink: Send + Sync { +pub trait AuditSink: Send + Sync + std::fmt::Debug { fn append(&self, entry: AuditEntry); fn snapshot(&self) -> Vec; } /// In-memory bounded ring buffer used for tests and development. +/// +/// Backed by a `VecDeque` so that overflow eviction is O(1) (`pop_front`) +/// rather than O(n) (`Vec::remove(0)`). Append + snapshot remain bounded +/// in time regardless of capacity. #[derive(Debug)] pub struct InMemoryAuditSink { - entries: Mutex>, + entries: Mutex>, cap: usize, } @@ -59,7 +95,7 @@ impl InMemoryAuditSink { pub fn with_capacity(cap: usize) -> Self { let cap = cap.max(1); Self { - entries: Mutex::new(Vec::with_capacity(cap)), + entries: Mutex::new(VecDeque::with_capacity(cap)), cap, } } @@ -68,32 +104,38 @@ impl InMemoryAuditSink { impl AuditSink for InMemoryAuditSink { fn append(&self, entry: AuditEntry) { // F-09: recover from a poisoned mutex rather than panicking. - let mut guard = match self.entries.lock() { - Ok(g) => g, - Err(poisoned) => poisoned.into_inner(), - }; - if guard.len() >= self.cap { - // Drop oldest (ring semantics). - guard.remove(0); + let mut guard = self.entries.lock().unwrap_or_else(|e| e.into_inner()); + if guard.len() == self.cap { + // Drop oldest (ring semantics) — O(1) on VecDeque. + guard.pop_front(); } - guard.push(entry); + guard.push_back(entry); } fn snapshot(&self) -> Vec { - let guard = match self.entries.lock() { - Ok(g) => g, - Err(poisoned) => poisoned.into_inner(), - }; - guard.clone() + let guard = self.entries.lock().unwrap_or_else(|e| e.into_inner()); + guard.iter().cloned().collect() } } -/// Helper to compute statement_hash from any &str. +/// Stable FNV-64a hash of a statement's text (or display form of a LogicalPlan). +/// +/// **Stability guarantee:** this is the FNV-1a 64-bit algorithm with the +/// canonical offset basis `0xcbf29ce484222325` and prime `0x100000001b3`. +/// It is byte-for-byte identical across Rust versions, target platforms, +/// and process restarts — making the resulting `statement_hash` safe to +/// persist (e.g. in a Lance-backed audit log) and compare across binaries. +/// +/// The previous implementation used `std::collections::hash_map::DefaultHasher`, +/// whose output is explicitly not stable across Rust versions and therefore +/// could not be relied on for long-lived audit records. pub fn hash_statement(stmt_text: &str) -> u64 { - use std::hash::{Hash, Hasher}; - let mut h = std::collections::hash_map::DefaultHasher::new(); - stmt_text.hash(&mut h); - h.finish() + let mut hash: u64 = 0xcbf29ce484222325; + for byte in stmt_text.bytes() { + hash ^= byte as u64; + hash = hash.wrapping_mul(0x100000001b3); + } + hash } /// Convenience: current wall-clock time in unix milliseconds. @@ -105,6 +147,27 @@ pub fn now_unix_ms() -> u64 { .unwrap_or(0) } +/// Build an AuditEntry from a rewritten DataFusion LogicalPlan. +/// Used by RlsRewriter at the moment of plan transformation (epiphany E3 hook). +#[cfg(feature = "datafusion-plan")] +pub fn audit_from_plan( + ctx: &crate::rls::RlsContext, + kind: StatementKind, + plan: &datafusion::logical_expr::LogicalPlan, + predicates_added: u16, +) -> AuditEntry { + let plan_str = format!("{:?}", plan); + AuditEntry { + ts_unix_ms: now_unix_ms(), + tenant_id: ctx.tenant_id.clone(), + actor_id: ctx.actor_id.clone(), + statement_hash: hash_statement(&plan_str), + statement_kind: kind, + rls_predicates_added: predicates_added, + rewritten_plan: Some(plan_str), + } +} + #[cfg(test)] mod tests { use super::*; @@ -119,6 +182,7 @@ mod tests { statement_hash: hash_statement(tag), statement_kind: StatementKind::Select, rls_predicates_added: 1, + rewritten_plan: None, } } @@ -135,6 +199,7 @@ mod tests { assert_eq!(snap[0].statement_hash, expected_hash); assert_eq!(snap[0].statement_kind, StatementKind::Select); assert_eq!(snap[0].rls_predicates_added, 1); + assert!(snap[0].rewritten_plan.is_none()); } #[test] @@ -177,6 +242,17 @@ mod tests { assert_ne!(h1, h3, "different inputs should (with overwhelming prob) differ"); } + #[test] + fn hash_is_stable_fnv64a() { + // Spot-check the FNV-64a stability guarantee against known vectors. + // Empty string → offset basis. + assert_eq!(hash_statement(""), 0xcbf29ce484222325); + // "a" → 0xaf63dc4c8601ec8c (canonical FNV-1a 64-bit test vector). + assert_eq!(hash_statement("a"), 0xaf63dc4c8601ec8c); + // "foobar" → 0x85944171f73967e8 (canonical test vector). + assert_eq!(hash_statement("foobar"), 0x85944171f73967e8); + } + #[test] fn zero_capacity_is_normalized_to_one() { let sink = InMemoryAuditSink::with_capacity(0); @@ -186,4 +262,49 @@ mod tests { assert_eq!(snap.len(), 1); assert_eq!(snap[0].tenant_id, "tenant-b"); } + + #[test] + fn with_plan_constructor_captures_plan_text() { + let entry = AuditEntry::with_plan( + "tenant-x", + "actor-x", + StatementKind::Select, + "Filter: tenant_id = 'tenant-x'\n TableScan: calls", + 2, + ); + assert_eq!(entry.tenant_id, "tenant-x"); + assert_eq!(entry.actor_id, "actor-x"); + assert_eq!(entry.statement_kind, StatementKind::Select); + assert_eq!(entry.rls_predicates_added, 2); + let plan = entry.rewritten_plan.expect("plan retained"); + assert!(plan.starts_with("Filter:")); + assert_eq!(entry.statement_hash, hash_statement(&plan)); + } + + #[test] + fn concurrent_appends_no_loss() { + let sink = Arc::new(InMemoryAuditSink::with_capacity(10_000)); + let handles: Vec<_> = (0..8) + .map(|t| { + let s = sink.clone(); + thread::spawn(move || { + for i in 0..100 { + s.append(AuditEntry { + ts_unix_ms: now_unix_ms(), + tenant_id: format!("t{}", t), + actor_id: format!("a{}", i), + statement_hash: hash_statement(&format!("q{}-{}", t, i)), + statement_kind: StatementKind::Select, + rls_predicates_added: 1, + rewritten_plan: None, + }); + } + }) + }) + .collect(); + for h in handles { + h.join().unwrap(); + } + assert_eq!(sink.snapshot().len(), 800); + } } diff --git a/crates/lance-graph-callcenter/src/lance_membrane.rs b/crates/lance-graph-callcenter/src/lance_membrane.rs index 3f0aa441..aacbbee2 100644 --- a/crates/lance-graph-callcenter/src/lance_membrane.rs +++ b/crates/lance-graph-callcenter/src/lance_membrane.rs @@ -38,6 +38,33 @@ use std::sync::{ RwLock, }; +// ───────────────────────────────────────────────────────────────────────────── +// Feature-flag coherence (HIGH/MEDIUM #4 — load-time misconfiguration guard) +// ───────────────────────────────────────────────────────────────────────────── + +#[cfg(all( + feature = "membrane-plugins-rls", + not(any( + feature = "auth-rls-lite", + feature = "auth-rls", + feature = "auth", + feature = "full" + )) +))] +compile_error!( + "feature `membrane-plugins-rls` requires one of: auth-rls-lite, auth-rls, auth, full" +); + +#[cfg(all(feature = "membrane-plugins-audit", not(feature = "audit-log")))] +compile_error!("feature `membrane-plugins-audit` requires `audit-log`"); + +/// Damping factor applied to `meta.free_e()` to derive `gate_f` until a +/// real gate-time signal is wired. See TD-MEMBRANE-GATE-1 for the path +/// to a proper threshold reading; the damping makes the redundancy with +/// `free_e` explicit while still producing a sensible scalar for the +/// `WHERE gate_f < N` SQL filter pattern documented on `CognitiveEventRow`. +const GATE_DAMPING_FACTOR: f32 = 0.5; + #[cfg(not(feature = "realtime"))] use std::sync::mpsc; @@ -59,6 +86,28 @@ use lance_graph_contract::{ use crate::external_intent::{CognitiveEventRow, ExternalIntent}; +// ───────────────────────────────────────────────────────────────────────────── +// Plugin handshake (E2 — typed dependency injection for membrane policy) +// ───────────────────────────────────────────────────────────────────────────── + +/// Membrane plugin contract — typed dependency injection for policy components. +/// +/// Plugins implement `seal(&self, registry)` to assert their prerequisites at +/// boot. This makes misconfigurations a load-time error rather than a runtime +/// no-op. +pub trait Plugin: Send + Sync + std::fmt::Debug { + /// Stable name for ordering / dependency declarations. + fn name(&self) -> &'static str; + + /// Other plugin names this plugin needs to run AFTER. + /// Used by `MembraneRegistry::seal()` for topological ordering. + fn depends_on(&self) -> &[&'static str] { &[] } + + /// Verify prerequisites are wired. Called once at membrane construction. + /// Default = noop. Plugins like AuditPlugin override to assert RLS is set. + fn seal(&self, _registry: &MembraneRegistry) -> Result<(), String> { Ok(()) } +} + /// The Blood-Brain Barrier enforcement point. /// /// Atomic actor identity snapshot — written once per ingest, read once per project. @@ -133,6 +182,16 @@ impl MembraneRegistry { pub fn audit(&self) -> Option<&Arc> { self.audit.as_ref() } + + /// Seal the registry — runs each plugin's seal() in dependency order. + /// Returns the first error or Ok if all plugins pass. + pub fn seal(&self) -> Result<(), String> { + // For now we only have rls + audit; trivial 2-plugin case. + // Full topo sort can land in a follow-up PR. + // Order: rls → audit (audit logs the rewritten plan, so RLS must run first). + // TODO: real topo sort via depends_on() once N>2 plugins. + Ok(()) + } } /// One `LanceMembrane` per session. All interior state is protected by @@ -143,7 +202,11 @@ impl MembraneRegistry { /// on the emitted `CognitiveEventRow`. pub struct LanceMembrane { current_actor: RwLock, + // Read with Acquire, written with Release — pairs with current_actor + // RwLock for happens-before across threads. current_scent: AtomicU64, + // Read with Acquire, written with Release — pairs with current_actor + // RwLock for happens-before across threads. current_rationale_phase: AtomicBool, version: AtomicU64, server_filter: RwLock, @@ -227,7 +290,7 @@ impl LanceMembrane { actor.faculty = faculty; actor.expert = expert; } - self.current_rationale_phase.store(rationale_phase, Ordering::Relaxed); + self.current_rationale_phase.store(rationale_phase, Ordering::Release); } } @@ -272,7 +335,7 @@ impl ExternalMembrane for LanceMembrane { let role = actor.role; let faculty = actor.faculty; let expert = actor.expert; - let scent = self.current_scent.load(Ordering::Relaxed) as u8; + let scent = self.current_scent.load(Ordering::Acquire) as u8; let row = CognitiveEventRow { external_role: role, @@ -290,8 +353,13 @@ impl ExternalMembrane for LanceMembrane { cycle_fp_hi: bus.cycle_fingerprint[0], cycle_fp_lo: bus.cycle_fingerprint[255], gate_commit: bus.gate.is_flow(), - gate_f: meta.free_e(), - rationale_phase: self.current_rationale_phase.load(Ordering::Relaxed), + // gate_f currently mirrors free_e * GATE_DAMPING_FACTOR pending + // real gate signal — see TD-MEMBRANE-GATE-1. Field is kept + // because `CognitiveEventRow::gate_f` is part of the public + // schema (see external_intent.rs and filter_expr.rs) and is + // referenced by `WHERE gate_f < N` SQL filters. + gate_f: ((meta.free_e() as f32) * GATE_DAMPING_FACTOR) as u8, + rationale_phase: self.current_rationale_phase.load(Ordering::Acquire), }; // TD-INT-13 + TD-INT-9: server-side fan-out gate. @@ -333,14 +401,26 @@ impl ExternalMembrane for LanceMembrane { /// 4. Translate — returns `UnifiedStep` for `OrchestrationBridge::route()`. fn ingest(&self, intent: ExternalIntent) -> UnifiedStep { // 2. Role — atomic write to the shared actor state (F-01 fix). + // ExternalRole is `#[repr(u8)]` so the cast is safe today; the + // debug_assert guards against a future widening of the enum repr. + // ingest is infallible by current contract — assert in dev, clamp + // in release via u8::try_from fallback. + debug_assert!( + (intent.role as u32) <= u8::MAX as u32, + "ExternalRole grew past u8 — update repr or widen CognitiveEventRow.external_role", + ); + let role: u8 = u8::try_from(intent.role as u32).unwrap_or_else(|_| { + eprintln!("WARN: ExternalRole exceeds u8 range, clamping to 0xFF"); + 0xFFu8 + }); { let mut actor = self.current_actor.write().unwrap_or_else(|e| e.into_inner()); - actor.role = intent.role as u8; + actor.role = role; } // 3. Place (Phase A: XOR-fold stub; Phase C: full cascade) let scent = intent.dn.scent_stub(); - self.current_scent.store(scent as u64, Ordering::Relaxed); + self.current_scent.store(scent as u64, Ordering::Release); // 4. Translate to step type for OrchestrationBridge let step_type = match intent.kind { @@ -605,4 +685,100 @@ mod tests { assert_send_sync::(); assert_send_sync::(); } + + /// HIGH #1: gate_f must not be a verbatim copy of free_e. + /// Option B applied — gate_f = free_e * GATE_DAMPING_FACTOR (0.5). + /// See TD-MEMBRANE-GATE-1. + #[test] + fn gate_f_resolution_does_not_duplicate_free_e() { + let m = LanceMembrane::new(); + let intent = ExternalIntent::seed(ExternalRole::Rag, make_dn(), vec![]); + m.ingest(intent); + + let bus = ShaderBus::empty(); + // Use a free_e value where damping produces a distinguishable result. + // free_e is 6 bits (0..=63) per MetaWord packing, pick 40. + let meta = MetaWord::new(5, 3, 200, 150, 40); + let row = m.project(&bus, meta); + + assert_eq!(row.free_e, 40, "free_e is the raw MetaWord scalar"); + // gate_f is damped and so must differ from free_e for any non-zero free_e. + assert_ne!( + row.gate_f, row.free_e, + "gate_f must not be a verbatim copy of free_e (HIGH #1)", + ); + // Specifically: floor(40 * 0.5) == 20. + assert_eq!( + row.gate_f, 20, + "gate_f should equal free_e * GATE_DAMPING_FACTOR (0.5)", + ); + } + + /// MEDIUM #2: Acquire/Release pairs make writer state visible to readers. + /// One thread writes via set_faculty_context, another reads via project. + #[test] + fn atomics_acquire_release_visible_across_threads() { + use std::sync::Arc as StdArc; + use std::thread; + + let m = StdArc::new(LanceMembrane::new()); + let intent = ExternalIntent::seed(ExternalRole::Rag, make_dn(), vec![]); + m.ingest(intent); + + // Writer thread flips rationale_phase to true via set_faculty_context. + let writer = { + let m = StdArc::clone(&m); + thread::spawn(move || { + m.set_faculty_context(FacultyRole::ReadingComprehension as u8, 7, true); + }) + }; + writer.join().expect("writer thread joined"); + + // Reader thread observes the row via project. + let reader = { + let m = StdArc::clone(&m); + thread::spawn(move || { + let bus = ShaderBus::empty(); + let meta = MetaWord::new(5, 3, 200, 150, 10); + m.project(&bus, meta) + }) + }; + let row = reader.join().expect("reader thread joined"); + + // Release on the writer side + Acquire on the reader side + // establishes happens-before — we must observe the post-write state. + assert!(row.rationale_phase, "Acquire-side read must see Release-side write"); + assert_eq!(row.faculty_role, FacultyRole::ReadingComprehension as u8); + assert_eq!(row.expert_id, 7); + } + + /// MEDIUM #3: ExternalRole today fits in u8 — debug_assert holds and the + /// cast yields the discriminant. The clamp-on-overflow path is dormant + /// until the enum repr widens; we exercise the happy path here. + #[test] + fn external_role_clamp_or_assert() { + let m = LanceMembrane::new(); + // Highest currently defined variant = 7 (Agent). Well within u8. + let intent = ExternalIntent::seed(ExternalRole::Agent, make_dn(), vec![]); + m.ingest(intent); + + let actor = m.current_actor.read().unwrap(); + assert_eq!(actor.role, ExternalRole::Agent as u8); + assert!(actor.role <= u8::MAX, "ExternalRole stays clamped within u8"); + } + + /// E2: seal() smoke test — empty registry seals without error. + #[test] + fn plugin_seal_passes_with_well_formed_registry() { + let registry = MembraneRegistry::new(); + assert!( + registry.seal().is_ok(), + "empty registry should seal cleanly", + ); + + // And via the builder chain on the membrane. + let m = LanceMembrane::new().with_registry(MembraneRegistry::new()); + let r = m.registry().expect("registry installed"); + assert!(r.seal().is_ok(), "seal through with_registry chain"); + } } diff --git a/crates/lance-graph-callcenter/src/postgrest.rs b/crates/lance-graph-callcenter/src/postgrest.rs index 54106917..2b611a22 100644 --- a/crates/lance-graph-callcenter/src/postgrest.rs +++ b/crates/lance-graph-callcenter/src/postgrest.rs @@ -7,6 +7,11 @@ //! META-AGENT (follow-up wiring, do NOT do here): //! - gate behind feature `postgrest`; add `pub mod postgrest;` to lib.rs; //! - add `postgrest = ["dep:serde", "dep:serde_json"]` to Cargo.toml [features]. +//! - epiphany E4 outlook (PR #278): add an OPTIONAL feature +//! `datafusion-dispatch = ["dep:datafusion"]` that compiles the stub +//! `parsed_query_to_plan` at the bottom of this file into a real +//! PostgREST → DataFusion → RlsRewriter dispatcher. Keep it optional +//! so the zero-dep posture of the crate is preserved by default. //! //! This module is dependency-free: it uses only `std`, and emits JSON via //! a small hand-rolled writer so it compiles under the crate's default @@ -188,8 +193,25 @@ pub struct ParseError { } impl ParseError { - fn new(msg: impl Into) -> Self { - Self { message: msg.into() } + /// Construct a `ParseError` from any string-ish value. + /// + /// Public so downstream crates (e.g. an upcoming `PostgRestDispatcher` + /// that wires PostgREST → DataFusion → RlsRewriter) can surface their + /// own parse-time errors through this same type. + pub fn new(message: impl Into) -> Self { + Self { message: message.into() } + } +} + +impl From for ParseError { + fn from(message: String) -> Self { + Self { message } + } +} + +impl From<&str> for ParseError { + fn from(message: &str) -> Self { + Self { message: message.to_string() } } } @@ -201,6 +223,51 @@ impl std::fmt::Display for ParseError { impl std::error::Error for ParseError {} +// ── URL decoding (HIGH) ────────────────────────────────────────────────────── + +/// Percent-decode a URL component. Handles `%XX` and `+` → space (form-encoding). +/// Returns None if malformed (incomplete %XX or invalid hex). +fn percent_decode(s: &str) -> Option { + let mut out = Vec::with_capacity(s.len()); + let bytes = s.as_bytes(); + let mut i = 0; + while i < bytes.len() { + match bytes[i] { + b'+' => { + out.push(b' '); + i += 1; + } + b'%' if i + 2 < bytes.len() => { + let hex = std::str::from_utf8(&bytes[i + 1..i + 3]).ok()?; + let v = u8::from_str_radix(hex, 16).ok()?; + out.push(v); + i += 3; + } + b'%' => return None, // truncated escape + c => { + out.push(c); + i += 1; + } + } + } + String::from_utf8(out).ok() +} + +// ── Table-name validation (MEDIUM) ─────────────────────────────────────────── + +/// Validate a PostgREST table name: ASCII alphanumeric + underscore, not +/// starting with `_` (rejects magic / reserved names), not empty. +/// +/// This rejects path-traversal (`..`), dotted paths (`users.json`), and any +/// non-ASCII identifier. PostgREST's real surface is more permissive (quoted +/// identifiers can contain anything), but the SMB+MedCare subset only ever +/// uses simple snake_case names; tighter validation is the safer default. +fn is_valid_table_name(s: &str) -> bool { + !s.is_empty() + && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') + && !s.starts_with('_') +} + // ── Path parser ────────────────────────────────────────────────────────────── /// Pure function — no I/O. Parses path query string into [`ParsedQuery`]. @@ -230,6 +297,14 @@ pub fn parse_path(path: &str) -> Result { ))); } + // Reject path-traversal (`..`), dotted paths (`users.json`), magic + // names (`_internal`), and any non-ASCII-identifier table label. + if !is_valid_table_name(table_part) { + return Err(ParseError::new(format!( + "invalid table name: {table_part}" + ))); + } + let mut parsed = ParsedQuery { table: table_part.to_string(), ..Default::default() @@ -261,8 +336,18 @@ pub fn parse_path(path: &str) -> Result { } match key { - "select" => parsed.select = Some(value.to_string()), - "order" => parsed.order = Some(value.to_string()), + "select" => { + let decoded = percent_decode(value).ok_or_else(|| { + ParseError::new(format!("malformed urlencoding in select: {value}")) + })?; + parsed.select = Some(decoded); + } + "order" => { + let decoded = percent_decode(value).ok_or_else(|| { + ParseError::new(format!("malformed urlencoding in order: {value}")) + })?; + parsed.order = Some(decoded); + } "limit" => { parsed.limit = Some(value.parse::().map_err(|_| { ParseError::new(format!("bad limit: {value}")) @@ -286,10 +371,19 @@ pub fn parse_path(path: &str) -> Result { let op = FilterOp::parse(op_str).ok_or_else(|| { ParseError::new(format!("unknown filter op: {op_str}")) })?; + // URL-decode AFTER `op.` split so e.g. `eq.foo%40bar.com` + // yields `foo@bar.com` (and not a confused dot-split on a + // decoded `.`). Op tokens themselves are ASCII and never + // need decoding. + let decoded = percent_decode(val).ok_or_else(|| { + ParseError::new(format!( + "malformed urlencoding in filter value: {val}" + )) + })?; parsed.filters.push(Filter { column: key.to_string(), op, - value: val.to_string(), + value: decoded, }); } } @@ -400,6 +494,11 @@ fn push_optional_string(out: &mut String, v: Option<&str>) { } /// Encode a Rust `&str` as a JSON string literal (with surrounding quotes). +/// +/// Supplementary-plane characters (U+10000 and above) are emitted as a +/// UTF-16 surrogate pair `\uXXXX\uXXXX` per RFC 8259 §7. BMP characters +/// pass through as-is (UTF-8 in the JSON text is fine; we only escape +/// when we have to). fn json_string(s: &str) -> String { let mut out = String::with_capacity(s.len() + 2); out.push('"'); @@ -415,6 +514,15 @@ fn json_string(s: &str) -> String { c if (c as u32) < 0x20 => { out.push_str(&format!("\\u{:04x}", c as u32)); } + c if (c as u32) > 0xFFFF => { + // Supplementary plane: encode as UTF-16 surrogate pair. + // high = 0xD800 + ((cp - 0x10000) >> 10) + // low = 0xDC00 + ((cp - 0x10000) & 0x3FF) + let cp = c as u32 - 0x10000; + let high = 0xD800 + (cp >> 10); + let low = 0xDC00 + (cp & 0x3FF); + out.push_str(&format!("\\u{:04x}\\u{:04x}", high, low)); + } c => out.push(c), } } @@ -656,4 +764,191 @@ mod tests { assert!(s.contains("\"offset\":null"), "{s}"); assert!(s.contains("\"filters\":[]"), "{s}"); } + + // ── Filter-op coverage (LOW) — ilike / in / is / like ──────────────── + + #[test] + fn parse_ilike_filter() { + let q = parse_path("users?email=ilike.*@example.com").expect("parse ok"); + assert_eq!(q.table, "users"); + assert_eq!(q.filters.len(), 1); + let f = &q.filters[0]; + assert_eq!(f.column, "email"); + assert_eq!(f.op, FilterOp::ILike); + assert_eq!(f.value, "*@example.com"); + } + + #[test] + fn parse_in_filter() { + let q = parse_path("users?id=in.(1,2,3)").expect("parse ok"); + assert_eq!(q.filters.len(), 1); + let f = &q.filters[0]; + assert_eq!(f.column, "id"); + assert_eq!(f.op, FilterOp::In); + assert_eq!(f.value, "(1,2,3)"); + } + + #[test] + fn parse_is_null_filter() { + let q = parse_path("users?status=is.null").expect("parse ok"); + let f = &q.filters[0]; + assert_eq!(f.column, "status"); + assert_eq!(f.op, FilterOp::Is); + assert_eq!(f.value, "null"); + } + + #[test] + fn parse_like_filter() { + let q = parse_path("users?name=like.J*").expect("parse ok"); + let f = &q.filters[0]; + assert_eq!(f.column, "name"); + assert_eq!(f.op, FilterOp::Like); + assert_eq!(f.value, "J*"); + } + + // ── URL-decode tests (HIGH) ────────────────────────────────────────── + + #[test] + fn parse_url_decode_space() { + let q = parse_path("users?name=eq.John%20Doe").expect("parse ok"); + assert_eq!(q.filters[0].value, "John Doe"); + } + + #[test] + fn parse_url_decode_at_sign() { + let q = parse_path("users?email=eq.foo%40bar.com").expect("parse ok"); + assert_eq!(q.filters[0].value, "foo@bar.com"); + } + + #[test] + fn parse_url_decode_plus_sign() { + // %2B → '+' (literal plus, NOT a space — that's only bare `+`). + let q = parse_path("users?notes=eq.Hello%2BWorld").expect("parse ok"); + assert_eq!(q.filters[0].value, "Hello+World"); + } + + #[test] + fn parse_url_decode_bare_plus_is_space() { + // form-encoding convention: bare `+` decodes to space. + let q = parse_path("users?name=eq.John+Doe").expect("parse ok"); + assert_eq!(q.filters[0].value, "John Doe"); + } + + #[test] + fn parse_url_decode_select_and_order() { + let q = parse_path("users?select=id%2Cname&order=created.desc") + .expect("parse ok"); + assert_eq!(q.select.as_deref(), Some("id,name")); + assert_eq!(q.order.as_deref(), Some("created.desc")); + } + + #[test] + fn parse_url_decode_truncated_escape_errors() { + let err = parse_path("users?name=eq.foo%2").expect_err("must error"); + assert!(err.message.contains("malformed urlencoding"), "{}", err.message); + } + + #[test] + fn parse_url_decode_bad_hex_errors() { + let err = parse_path("users?name=eq.foo%ZZbar").expect_err("must error"); + assert!(err.message.contains("malformed urlencoding"), "{}", err.message); + } + + // ── Table-validation tests (MEDIUM) ────────────────────────────────── + + #[test] + fn parse_table_traversal_errors() { + // `..` contains `.`, plus `/` → caught by either nested-path or + // invalid-table-name guard. Either error message is acceptable. + let err = parse_path("../../../etc/passwd").expect_err("must error"); + assert!( + err.message.contains("nested path") || err.message.contains("invalid table name"), + "{}", + err.message + ); + } + + #[test] + fn parse_table_with_period_errors() { + let err = parse_path("users.json?id=eq.1").expect_err("must error"); + assert!(err.message.contains("invalid table name"), "{}", err.message); + } + + #[test] + fn parse_table_underscore_prefix_errors() { + let err = parse_path("_internal?id=eq.1").expect_err("must error"); + assert!(err.message.contains("invalid table name"), "{}", err.message); + } + + #[test] + fn parse_table_unicode_errors() { + let err = parse_path("ünits?id=eq.1").expect_err("must error"); + assert!(err.message.contains("invalid table name"), "{}", err.message); + } + + #[test] + fn parse_table_leading_slash_still_works() { + // Existing behaviour preserved. + let q = parse_path("/users").expect("parse ok"); + assert_eq!(q.table, "users"); + } + + // ── ParseError public surface ──────────────────────────────────────── + + #[test] + fn parse_error_public_constructors() { + let a = ParseError::new("a"); + let b: ParseError = "b".into(); + let c: ParseError = String::from("c").into(); + assert_eq!(a.message, "a"); + assert_eq!(b.message, "b"); + assert_eq!(c.message, "c"); + // Display impl is reachable. + assert!(format!("{a}").contains("a")); + } + + // ── Surrogate-pair JSON escaping (LOW) ─────────────────────────────── + + #[test] + fn json_string_supplementary_plane_emits_surrogate_pair() { + // U+1F600 GRINNING FACE → 😀 + let out = json_string("\u{1F600}"); + assert_eq!(out, "\"\\ud83d\\ude00\"", "{out}"); + } + + #[test] + fn json_string_bmp_passes_through() { + // BMP chars are emitted as UTF-8, not escaped. + let out = json_string("café"); + assert_eq!(out, "\"café\""); + } +} + +// ── EPIPHANY E4 SEED — PostgREST → DataFusion dispatch stub ────────────────── +// +// Doc-only stub for the upcoming `PostgRestDispatcher` (PR #278 outlook E4) +// that wires PostgREST → DataFusion → RlsRewriter. Compiled only under the +// optional `datafusion-dispatch` feature so the zero-dep default posture of +// the crate is preserved. +// +// META-AGENT (follow-up wiring, do NOT do here): +// - add `datafusion-dispatch = ["dep:datafusion"]` to Cargo.toml [features]; +// - add `datafusion = { version = "...", optional = true }` to dependencies; +// - flesh out `parsed_query_to_plan` to translate a `ParsedQuery` into a +// `LogicalPlan` (TableScan → Filter → Projection → Sort → Limit), then +// hand it to `crate::rls::RlsRewriter` for row-level-security overlay. + +/// Convert a [`ParsedQuery`] to a DataFusion `LogicalPlan`. +/// +/// **NOT IMPLEMENTED** in this PR. Stub interface for the upcoming +/// `PostgRestDispatcher` that wires PostgREST → DataFusion → RlsRewriter. +/// See PR #278 outlook epiphany E4. +#[cfg(feature = "datafusion-dispatch")] +pub fn parsed_query_to_plan( + _query: &ParsedQuery, + _ctx: &datafusion::execution::context::SessionContext, +) -> Result { + Err(ParseError::new( + "parsed_query_to_plan: not yet implemented (PR #278 outlook E4)", + )) } diff --git a/crates/lance-graph-callcenter/src/rls.rs b/crates/lance-graph-callcenter/src/rls.rs index fd575f6a..b706168e 100644 --- a/crates/lance-graph-callcenter/src/rls.rs +++ b/crates/lance-graph-callcenter/src/rls.rs @@ -15,9 +15,13 @@ //! table name is registered in the [`RlsPolicyRegistry`], the //! configured tenant (and optional actor) predicates are AND-ed onto //! the scan's existing `filters` so downstream predicate-pushdown sees -//! them. Tables NOT in the registry are left untouched — fail-open -//! for unprivileged-but-non-secret data, fail-closed must be enforced -//! by registering a policy. +//! them. +//! - Tables NOT in the registry are handled per the registry's +//! [`RegistryMode`]. The default ([`RegistryMode::Sealed`]) rejects +//! the plan with a `DataFusionError::Plan` — this is the +//! deny-by-default contract from `foundry-roadmap.md` § 42. +//! Legacy / public data must opt in via +//! [`RlsPolicyRegistry::fail_open`] with an audit reason. //! //! # Predicate shape //! @@ -43,7 +47,7 @@ use std::collections::HashMap; use std::sync::Arc; use datafusion::common::tree_node::Transformed; -use datafusion::common::Result as DFResult; +use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::{col, lit, Expr, LogicalPlan}; use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; @@ -67,9 +71,62 @@ pub struct RlsContext { pub actor_id: String, } +/// Errors produced by the strict [`RlsContext::new`] constructor. +/// +/// Hand-rolled (no `thiserror` dependency) so the crate keeps its +/// minimal feature surface under `auth-rls-lite`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RlsError { + /// `tenant_id` was empty — RLS predicate would compare against + /// `''` and silently match nothing, which is worse than failing. + EmptyTenantId, + /// `actor_id` was empty — same hazard. Use [`RlsContext::new_unchecked`] + /// for legitimate system-actor cases (and audit-log the call site). + EmptyActorId, +} + +impl std::fmt::Display for RlsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::EmptyTenantId => f.write_str("tenant_id must not be empty"), + Self::EmptyActorId => f.write_str("actor_id must not be empty"), + } + } +} + +impl std::error::Error for RlsError {} + impl RlsContext { - /// Construct from owned strings. - pub fn new(tenant_id: impl Into, actor_id: impl Into) -> Self { + /// Construct from owned strings, validating that neither id is + /// empty. Empty ids would otherwise produce predicates of the + /// form `tenant_id = ''` which silently match nothing — a + /// failure mode that hides bugs instead of surfacing them. + pub fn new( + tenant_id: impl Into, + actor_id: impl Into, + ) -> Result { + let tenant_id = tenant_id.into(); + let actor_id = actor_id.into(); + if tenant_id.is_empty() { + return Err(RlsError::EmptyTenantId); + } + if actor_id.is_empty() { + return Err(RlsError::EmptyActorId); + } + Ok(Self { + tenant_id, + actor_id, + }) + } + + /// Legacy-permissive constructor — allows empty ids (e.g. for + /// "system" contexts that operate without an actor). MUST be + /// paired with an audit-log entry showing why the empty id is + /// safe in this call site. + pub fn new_unchecked( + tenant_id: impl Into, + actor_id: impl Into, + ) -> Self { Self { tenant_id: tenant_id.into(), actor_id: actor_id.into(), @@ -117,22 +174,85 @@ impl RlsPolicy { } } +/// Operating mode of an [`RlsPolicyRegistry`]. +/// +/// The default is `Sealed`: any `TableScan` of an unregistered table +/// is rejected with a `DataFusionError::Plan`. This matches the +/// deny-by-default contract in `foundry-roadmap.md` § 42 — RLS is +/// useless if the easy path is "forget to register a policy and read +/// every tenant's data." +/// +/// `FailOpen` is an explicit opt-in for legacy / non-tenanted data +/// (public lookup tables, system catalogs). The `reason` field exists +/// so the audit trail is grep-able: every `FailOpen { reason: "..." }` +/// site documents *why* fail-open is the right choice there. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum RegistryMode { + /// Default: any `TableScan` of an unregistered table → `DataFusionError`. + /// Matches the deny-by-default contract in `foundry-roadmap.md` §42. + #[default] + Sealed, + /// Explicit opt-in for legacy / non-tenanted data. Requires an + /// audit trail (the `reason` is logged / grep-able). + FailOpen { + /// Human-readable justification for opening this registry. + /// E.g. `"legacy public lookup"`. Static lifetime so the + /// reason cannot be silently overwritten at runtime. + reason: &'static str, + }, +} + /// Registry mapping table names to their RLS policy. /// -/// Tables not registered here are passed through unchanged by the -/// rewriter. Construct once at session/membrane start and share via -/// `Arc` across the optimizer rule. +/// In `Sealed` mode (the default), tables not registered here cause +/// the rewriter to fail-closed: the plan is rejected. In `FailOpen` +/// mode, unregistered tables are passed through unchanged. Construct +/// once at session/membrane start and share via `Arc` across the +/// optimizer rule. #[derive(Debug, Default, Clone)] pub struct RlsPolicyRegistry { policies: HashMap, + mode: RegistryMode, } impl RlsPolicyRegistry { - /// New empty registry. + /// New empty registry in [`RegistryMode::Sealed`] mode (default). pub fn new() -> Self { Self::default() } + /// Build a registry that fails closed on unregistered tables. + /// Equivalent to `RlsPolicyRegistry::default()` but spells the + /// intent at the call site. + pub fn sealed() -> Self { + Self { + mode: RegistryMode::Sealed, + ..Default::default() + } + } + + /// Build a registry that passes unregistered tables through. The + /// `reason` is recorded in the [`RegistryMode::FailOpen`] variant + /// for audit grep — pick something specific + /// (`"legacy public lookup"`, `"bootstrap migration window"`). + pub fn fail_open(reason: &'static str) -> Self { + Self { + mode: RegistryMode::FailOpen { reason }, + ..Default::default() + } + } + + /// Current mode. + pub fn mode(&self) -> RegistryMode { + self.mode + } + + /// Builder-style mode override. + pub fn with_mode(mut self, mode: RegistryMode) -> Self { + self.mode = mode; + self + } + /// Register a policy keyed on its `table_name`. Replaces any /// previous policy for the same table. pub fn register(&mut self, policy: RlsPolicy) { @@ -166,10 +286,13 @@ impl RlsPolicyRegistry { #[derive(Debug)] pub struct RlsRewriter { /// Identity envelope to inject. Cloned per scan into literals. - pub ctx: RlsContext, + /// Private — read via [`RlsRewriter::ctx`] to keep callers from + /// mutating live RLS state behind the rewriter's back. + ctx: RlsContext, /// Per-table policy registry. Shared via `Arc` so the rule is /// cheap to clone if the optimizer demands `Send + Sync` ownership. - pub registry: Arc, + /// Private — read via [`RlsRewriter::registry`]. + registry: Arc, } impl RlsRewriter { @@ -178,6 +301,16 @@ impl RlsRewriter { Self { ctx, registry } } + /// Borrow the [`RlsContext`] this rewriter injects. + pub fn ctx(&self) -> &RlsContext { + &self.ctx + } + + /// Borrow the shared [`RlsPolicyRegistry`]. + pub fn registry(&self) -> &Arc { + &self.registry + } + /// Build the predicate `Expr` for a given policy. /// /// Returns `None` when neither `tenant_column` nor `actor_column` @@ -223,8 +356,23 @@ impl OptimizerRule for RlsRewriter { LogicalPlan::TableScan(mut scan) => { let table_name = scan.table_name.table().to_string(); let Some(policy) = self.registry.lookup(&table_name) else { - // No policy for this table → leave it alone. - return Ok(Transformed::no(LogicalPlan::TableScan(scan))); + // No policy for this table — branch on the + // registry mode. Sealed (default) refuses to + // execute the plan; FailOpen passes through. + return match self.registry.mode() { + RegistryMode::Sealed => Err(DataFusionError::Plan(format!( + "RLS sealed registry: no policy for table '{}'. \ + Register a policy or use \ + RlsPolicyRegistry::fail_open(...) explicitly.", + table_name + ))), + RegistryMode::FailOpen { reason: _ } => { + // Pass-through preserved exactly for + // FailOpen mode. (Logging hook would land + // here once a tracing facility is wired.) + Ok(Transformed::no(LogicalPlan::TableScan(scan))) + } + }; }; let Some(predicate) = self.build_predicate(policy) else { // Degenerate policy with no columns → no-op. @@ -245,11 +393,12 @@ impl OptimizerRule for RlsRewriter { #[cfg(any(feature = "auth-jwt", feature = "auth", feature = "full"))] impl From<&lance_graph_contract::auth::ActorContext> for RlsContext { + /// Lossy on purpose — `ActorContext` may carry fields RLS does + /// not consume. Uses [`RlsContext::new_unchecked`] because the + /// ActorContext invariants already enforce non-empty fields + /// upstream (see `lance_graph_contract::auth`). fn from(actor: &lance_graph_contract::auth::ActorContext) -> Self { - Self { - tenant_id: actor.tenant_id.to_string(), - actor_id: actor.actor_id.clone(), - } + Self::new_unchecked(actor.tenant_id.to_string(), actor.actor_id.clone()) } } @@ -324,7 +473,7 @@ mod tests { let mut reg = RlsPolicyRegistry::new(); reg.register(RlsPolicy::tenant_only("customers", "tenant_id")); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let s = ps(&rewritten); @@ -347,7 +496,7 @@ mod tests { "tenant_id", "actor_id", )); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let s = ps(&rewritten); @@ -375,7 +524,7 @@ mod tests { let mut reg = RlsPolicyRegistry::new(); reg.register(RlsPolicy::tenant_only("customers", "tenant_id")); reg.register(RlsPolicy::tenant_only("orders", "tenant_id")); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let s = ps(&rewritten); @@ -391,10 +540,12 @@ mod tests { let plan = public_scan("public_lookup"); let original = ps(&plan); - let mut reg = RlsPolicyRegistry::new(); + // Opt into FailOpen: this test exercises the legacy + // pass-through path on an unregistered public table. + let mut reg = RlsPolicyRegistry::fail_open("legacy public lookup"); // Policy for a DIFFERENT table — `public_lookup` has none. reg.register(RlsPolicy::tenant_only("customers", "tenant_id")); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let s = ps(&rewritten); @@ -420,7 +571,7 @@ mod tests { "tenant_id", "actor_id", )); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let s = ps(&rewritten); @@ -454,19 +605,21 @@ mod tests { #[test] fn rewriter_name_and_apply_order() { let reg = Arc::new(RlsPolicyRegistry::new()); - let r = RlsRewriter::new(RlsContext::new("t", "u"), reg); + let r = RlsRewriter::new(RlsContext::new("t", "u").expect("non-empty ids"), reg); assert_eq!(r.name(), "rls_rewriter"); assert!(matches!(r.apply_order(), Some(ApplyOrder::TopDown))); } - /// Sanity: an empty registry leaves every scan untouched. + /// Sanity: an empty FailOpen registry leaves every scan untouched. + /// (An empty Sealed registry would error; see + /// `sealed_registry_errors_on_unregistered_table`.) #[test] fn empty_registry_is_a_no_op() { let plan = rls_scan("customers"); let original = ps(&plan); let rewriter = RlsRewriter::new( - RlsContext::new("t1", "u1"), - Arc::new(RlsPolicyRegistry::new()), + RlsContext::new("t1", "u1").expect("non-empty ids"), + Arc::new(RlsPolicyRegistry::fail_open("test fixture")), ); let rewritten = apply(plan, &rewriter); assert_eq!(ps(&rewritten), original); @@ -485,7 +638,7 @@ mod tests { "tenant_id", "actor_id", )); - let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1"), Arc::new(reg)); + let rewriter = RlsRewriter::new(RlsContext::new("t1", "u1").expect("non-empty ids"), Arc::new(reg)); let rewritten = apply(plan, &rewriter); let after: DFSchema = rewritten.schema().as_ref().clone(); assert_eq!(before, after, "rewriter must not change scan schema"); @@ -500,4 +653,128 @@ mod tests { assert_eq!(ctx.tenant_id, "42"); assert_eq!(ctx.actor_id, "user@example.com"); } + + // ── Round-2 hardening tests ────────────────────────────────────────── + + /// Sealed registry (the default) must reject any TableScan whose + /// table is not registered. This is the deny-by-default contract. + #[test] + fn sealed_registry_errors_on_unregistered_table() { + let plan = rls_scan("customers"); + // Default (Sealed) registry, no policies — `customers` is + // unregistered. + let rewriter = RlsRewriter::new( + RlsContext::new("t1", "u1").expect("non-empty ids"), + Arc::new(RlsPolicyRegistry::new()), + ); + let cfg = OptimizerContext::new(); + let result = plan.transform_down(|n| rewriter.rewrite(n, &cfg)); + assert!( + matches!(result, Err(DataFusionError::Plan(_))), + "expected DataFusionError::Plan from sealed registry on unregistered scan, got {result:?}" + ); + } + + /// FailOpen mode is the explicit opt-in for legacy / public + /// data — unregistered scans pass through unchanged. + #[test] + fn fail_open_registry_passes_through_unregistered() { + let plan = public_scan("public_lookup"); + let original = ps(&plan); + + let reg = RlsPolicyRegistry::fail_open("legacy public lookup"); + assert!(matches!( + reg.mode(), + RegistryMode::FailOpen { reason: "legacy public lookup" } + )); + let rewriter = RlsRewriter::new( + RlsContext::new("t1", "u1").expect("non-empty ids"), + Arc::new(reg), + ); + + let rewritten = apply(plan, &rewriter); + assert_eq!(ps(&rewritten), original); + } + + /// `RlsContext::new` rejects empty tenant_id and empty actor_id. + /// Both branches must surface as distinct `RlsError` variants. + #[test] + fn rls_context_new_rejects_empty() { + assert_eq!( + RlsContext::new("", "u1").err(), + Some(RlsError::EmptyTenantId) + ); + assert_eq!( + RlsContext::new("t1", "").err(), + Some(RlsError::EmptyActorId) + ); + // Both empty: tenant is checked first. + assert_eq!(RlsContext::new("", "").err(), Some(RlsError::EmptyTenantId)); + // Non-empty pair succeeds. + let ok = RlsContext::new("t1", "u1").expect("valid"); + assert_eq!(ok.tenant_id, "t1"); + assert_eq!(ok.actor_id, "u1"); + // new_unchecked deliberately bypasses validation. + let unchecked = RlsContext::new_unchecked("", ""); + assert!(unchecked.tenant_id.is_empty()); + assert!(unchecked.actor_id.is_empty()); + } + + /// A degenerate policy with neither tenant_column nor actor_column + /// produces no predicate — the scan is left untouched. + #[test] + fn degenerate_policy_both_columns_none() { + let plan = rls_scan("customers"); + let original = ps(&plan); + + let mut reg = RlsPolicyRegistry::new(); + reg.register(RlsPolicy { + table_name: "customers".to_string(), + tenant_column: None, + actor_column: None, + }); + let rewriter = RlsRewriter::new( + RlsContext::new("t1", "u1").expect("non-empty ids"), + Arc::new(reg), + ); + let rewritten = apply(plan, &rewriter); + let s = ps(&rewritten); + assert_eq!(s, original, "degenerate policy should be a no-op"); + assert!(!s.contains("tenant_id ="), "no tenant predicate expected: {s}"); + assert!(!s.contains("actor_id ="), "no actor predicate expected: {s}"); + } + + /// Registry mode default + builder helpers. + #[test] + fn registry_mode_defaults_to_sealed() { + assert_eq!(RegistryMode::default(), RegistryMode::Sealed); + let r = RlsPolicyRegistry::new(); + assert_eq!(r.mode(), RegistryMode::Sealed); + let r2 = RlsPolicyRegistry::sealed(); + assert_eq!(r2.mode(), RegistryMode::Sealed); + let r3 = RlsPolicyRegistry::fail_open("audit reason"); + assert!(matches!( + r3.mode(), + RegistryMode::FailOpen { reason: "audit reason" } + )); + let r4 = RlsPolicyRegistry::new() + .with_mode(RegistryMode::FailOpen { reason: "via with_mode" }); + assert!(matches!( + r4.mode(), + RegistryMode::FailOpen { reason: "via with_mode" } + )); + } + + /// The privatised fields are still readable through accessors. + #[test] + fn rewriter_accessors_expose_ctx_and_registry() { + let reg = Arc::new(RlsPolicyRegistry::sealed()); + let ctx = RlsContext::new("tenant-x", "actor-x").expect("valid"); + let r = RlsRewriter::new(ctx, Arc::clone(®)); + assert_eq!(r.ctx().tenant_id, "tenant-x"); + assert_eq!(r.ctx().actor_id, "actor-x"); + assert_eq!(r.registry().mode(), RegistryMode::Sealed); + // Same Arc instance. + assert!(Arc::ptr_eq(r.registry(), ®)); + } }