diff --git a/crates/lance-graph-callcenter/src/lib.rs b/crates/lance-graph-callcenter/src/lib.rs index 9f37af47..392f22b1 100644 --- a/crates/lance-graph-callcenter/src/lib.rs +++ b/crates/lance-graph-callcenter/src/lib.rs @@ -100,3 +100,10 @@ pub mod postgrest; // A3: in-memory ring buffer skeleton; Lance-backed writer arrives later. #[cfg(feature = "audit-log")] pub mod audit; + +// PR #278 outlook E1 — generalized PolicyRewriter trait (column masking, +// row encryption, differential privacy stubs) sharing the OptimizerRule slot +// with the existing RLS rewriter. Gated on auth-rls-lite (where the +// DataFusion types live). +#[cfg(any(feature = "auth-rls-lite", feature = "auth-rls", feature = "auth", feature = "full"))] +pub mod policy; diff --git a/crates/lance-graph-callcenter/src/policy.rs b/crates/lance-graph-callcenter/src/policy.rs new file mode 100644 index 00000000..7f5e819f --- /dev/null +++ b/crates/lance-graph-callcenter/src/policy.rs @@ -0,0 +1,309 @@ +//! Policy-layer rewriting framework. +//! +//! Generalization of `crate::rls::RlsRewriter`. The same DataFusion +//! `OptimizerRule` machinery used for tenant-predicate injection serves +//! column masking, row-level encryption, and differential-privacy noise +//! injection. Each policy type is a `PolicyRewriter` impl; they compose +//! via the optimizer rule chain. +//! +//! See PR #278 outlook epiphany E1 for the full motivation. +//! +//! META-AGENT: lib.rs declares `pub mod policy;` behind `auth-rls-lite` / `auth-rls` / +//! `auth` / `full`; that gate references no dedicated `policy` feature. If one is +//! wanted, a possible Cargo.toml entry (it would enable `auth-rls-lite`): +//! +//! ```toml +//! policy = ["auth-rls-lite"] +//! 
``` + +use std::sync::Arc; + +#[cfg(feature = "auth-rls-lite")] +use datafusion::common::tree_node::Transformed; +#[cfg(feature = "auth-rls-lite")] +use datafusion::common::Result as DFResult; +#[cfg(feature = "auth-rls-lite")] +use datafusion::logical_expr::LogicalPlan; +#[cfg(feature = "auth-rls-lite")] +use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; + +// ── Policy taxonomy ────────────────────────────────────────────────────────── + +/// Policy classification — what kind of transform a rewriter implements. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum PolicyKind { + /// Inject row-filter predicates (e.g. `tenant_id = 't1'`). + RowFilter, + /// Mask / redact / hash columns based on actor role. + ColumnMask, + /// Encrypt selected columns at rest using a key handle. + RowEncryption, + /// Inject differential-privacy noise into aggregate outputs. + DifferentialPrivacy, + /// Emit audit events (read-only side channel). + Audit, +} + +/// Generalized policy rewriter. Implementors transform a LogicalPlan and +/// declare their kind for ordering / introspection. +#[cfg(feature = "auth-rls-lite")] +pub trait PolicyRewriter: Send + Sync + std::fmt::Debug { + fn kind(&self) -> PolicyKind; + /// Stable name (e.g. "rls_rewriter", "column_mask"). Used by audit log. + fn name(&self) -> &'static str; + /// Rewrite the plan. Default = identity, reported as `Transformed::no` (implementors override as needed). + fn rewrite_plan(&self, plan: LogicalPlan) -> DFResult> { + Ok(Transformed::no(plan)) + } +} + +// ── Column masking policy ──────────────────────────────────────────────────── + +/// Per-column redaction mode. Drives how `ColumnMaskRewriter` rewrites +/// referenced expressions in `Projection` nodes. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RedactionMode { + /// Replace with NULL. + Null, + /// Replace with a constant ("[REDACTED]"). + Constant, + /// Hash via FNV-64 (stable across builds). 
+ Hash, + /// First-N-chars only; NOTE(review): prefix vs. suffix needs confirming (e.g. 
credit card last-4). + Truncate(usize), +} + +/// Column masking policy: redact / hash / mask values from selected columns +/// based on actor role. Stub UDF reference; concrete UDFs land in a follow-up. +#[derive(Debug, Clone)] +pub struct ColumnMaskPolicy { + /// Table whose columns this policy applies to. + pub table_name: String, + /// Per-column redaction mode. Missing column = unmasked. + pub columns: std::collections::HashMap, +} + +#[derive(Debug, Default, Clone)] +pub struct ColumnMaskRegistry { + policies: std::collections::HashMap, +} + +impl ColumnMaskRegistry { + pub fn new() -> Self { + Self::default() + } + pub fn register(&mut self, policy: ColumnMaskPolicy) { + self.policies.insert(policy.table_name.clone(), policy); + } + pub fn lookup(&self, table_name: &str) -> Option<&ColumnMaskPolicy> { + self.policies.get(table_name) + } +} + +#[cfg(feature = "auth-rls-lite")] +#[derive(Debug)] +pub struct ColumnMaskRewriter { + pub registry: Arc, + pub actor_role: String, // not read by the skeleton rewrite yet +} + +#[cfg(feature = "auth-rls-lite")] +impl PolicyRewriter for ColumnMaskRewriter { + fn kind(&self) -> PolicyKind { + PolicyKind::ColumnMask + } + fn name(&self) -> &'static str { + "column_mask" + } + fn rewrite_plan(&self, plan: LogicalPlan) -> DFResult> { + // Walk plan; on Projection, rewrite expressions for redacted columns. + // For this PR ship the structural skeleton; the actual UDF wrap lands + // in a follow-up once redaction UDFs are registered. + // TODO: wrap Expr::Column(c) in mask_udf(...) 
for c in policy.columns + Ok(Transformed::no(plan)) + } +} + +#[cfg(feature = "auth-rls-lite")] +impl OptimizerRule for ColumnMaskRewriter { + fn name(&self) -> &str { + "column_mask_rewriter" // differs from PolicyRewriter::name, which returns "column_mask" + } + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + fn supports_rewrite(&self) -> bool { + true + } + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> DFResult> { + self.rewrite_plan(plan) + } +} + +// ── Row encryption policy (stub, no executor yet) ──────────────────────────── + +/// Row encryption policy: encrypt selected columns at rest using a key +/// handle. The actual cipher binding lands in a follow-up; this carries +/// the per-column key-handle association so the registry surface is stable. +#[derive(Debug, Clone)] +pub struct RowEncryptionPolicy { + /// Table whose columns this policy applies to. + pub table_name: String, + /// Per-column key handle. Missing column = unencrypted. + pub columns: std::collections::HashMap, +} + +/// Opaque key handle. Resolves through a downstream KMS / keyring service. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct KeyHandle(pub String); + +#[derive(Debug, Default, Clone)] +pub struct RowEncryptionRegistry { + policies: std::collections::HashMap, +} + +impl RowEncryptionRegistry { + pub fn new() -> Self { + Self::default() + } + pub fn register(&mut self, policy: RowEncryptionPolicy) { + self.policies.insert(policy.table_name.clone(), policy); + } + pub fn lookup(&self, table_name: &str) -> Option<&RowEncryptionPolicy> { + self.policies.get(table_name) + } +} + +// ── Differential-privacy policy (stub, no executor yet) ────────────────────── + +/// DP noise mechanism. Drives the noise distribution used when the +/// rewriter wraps aggregate outputs. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DpMechanism { + /// Laplace mechanism — calibrated to ε. 
+ Laplace, + /// Gaussian mechanism — calibrated to (ε, δ). + Gaussian, +} + +/// Differential-privacy policy. 
Carries the privacy budget and noise +/// mechanism; the rewriter wraps SUM/COUNT/AVG aggregates with calibrated +/// noise injection (follow-up PR). +#[derive(Debug, Clone)] +pub struct DifferentialPrivacyPolicy { + /// Table this policy applies to. + pub table_name: String, + /// Privacy budget. Smaller ε = more noise = stronger privacy. NOTE(review): no δ field exists yet although `Gaussian` is documented as (ε, δ). + pub epsilon: f64, + /// Noise mechanism. + pub mechanism: DpMechanism, +} + +#[derive(Debug, Default, Clone)] +pub struct DifferentialPrivacyRegistry { + policies: std::collections::HashMap, +} + +impl DifferentialPrivacyRegistry { + pub fn new() -> Self { + Self::default() + } + pub fn register(&mut self, policy: DifferentialPrivacyPolicy) { + self.policies.insert(policy.table_name.clone(), policy); + } + pub fn lookup(&self, table_name: &str) -> Option<&DifferentialPrivacyPolicy> { + self.policies.get(table_name) + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::collections::HashSet; + + #[test] + fn column_mask_registry_register_lookup() { + let mut registry = ColumnMaskRegistry::new(); + let mut columns = HashMap::new(); + columns.insert("ssn".to_string(), RedactionMode::Hash); + columns.insert("card".to_string(), RedactionMode::Truncate(4)); + registry.register(ColumnMaskPolicy { + table_name: "customers".to_string(), + columns, + }); + + let policy = registry.lookup("customers").expect("policy registered"); + assert_eq!(policy.table_name, "customers"); + assert_eq!(policy.columns.get("ssn"), Some(&RedactionMode::Hash)); + assert_eq!( + policy.columns.get("card"), + Some(&RedactionMode::Truncate(4)) + ); + assert!(registry.lookup("missing").is_none()); + } + + #[test] + fn redaction_mode_variants_distinct() { + // Each variant is a distinct value; equality is structural. 
+ assert_ne!(RedactionMode::Null, RedactionMode::Constant); + assert_ne!(RedactionMode::Hash, RedactionMode::Null); + assert_ne!(RedactionMode::Truncate(4), RedactionMode::Truncate(8)); + assert_eq!(RedactionMode::Truncate(4), RedactionMode::Truncate(4)); + } + + #[cfg(feature = "auth-rls-lite")] + #[test] + fn column_mask_rewriter_kind_is_column_mask() { + let rewriter = ColumnMaskRewriter { + registry: Arc::new(ColumnMaskRegistry::new()), + actor_role: "analyst".to_string(), + }; + assert_eq!(rewriter.kind(), PolicyKind::ColumnMask); + assert_eq!(::name(&rewriter), "column_mask"); + } + + #[test] + fn policy_kind_is_hashable_for_dispatch() { + // PolicyKind being Hash + Eq lets a registry dispatch + // rewriters by kind in a HashSet/HashMap. Smoke-test the trait + // bounds by inserting into a HashSet. + let mut set: HashSet = HashSet::new(); + set.insert(PolicyKind::RowFilter); + set.insert(PolicyKind::ColumnMask); + set.insert(PolicyKind::RowEncryption); + set.insert(PolicyKind::DifferentialPrivacy); + set.insert(PolicyKind::Audit); + // Inserting a duplicate should be a no-op. + assert!(!set.insert(PolicyKind::ColumnMask)); + assert_eq!(set.len(), 5); + } + + #[cfg(feature = "auth-rls-lite")] + #[test] + fn column_mask_rewriter_passes_through_for_now() { + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; + use std::sync::Arc as StdArc; + + let rewriter = ColumnMaskRewriter { + registry: Arc::new(ColumnMaskRegistry::new()), + actor_role: "analyst".to_string(), + }; + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: StdArc::new(datafusion::common::DFSchema::empty()), + }); + let transformed = rewriter + .rewrite_plan(plan) + .expect("rewrite should succeed"); + // Skeleton implementation — should be a no-op until the UDF wrap + // lands. 
+ assert!(!transformed.transformed); + } +} diff --git a/crates/lance-graph-contract/Cargo.toml b/crates/lance-graph-contract/Cargo.toml index 52ff9c54..6215dc31 100644 --- a/crates/lance-graph-contract/Cargo.toml +++ b/crates/lance-graph-contract/Cargo.toml @@ -9,3 +9,9 @@ keywords = ["lance", "graph", "contract", "orchestration", "thinking"] [dependencies] # Zero dependencies by design — this is a trait-only crate. # Consumers depend on this; implementations depend on lance-graph-planner. + +[features] +# A-unlock-stepdomain — `step_trajectory_hash` forward stub for the E4 +# cross-PR bridge between PR #278 audit log + PR #279 grammar trajectory. +# No-op alias today; concrete impl lands once the bridge PR ships. +trajectory-audit = [] diff --git a/crates/lance-graph-contract/src/orchestration.rs b/crates/lance-graph-contract/src/orchestration.rs index 177df50b..af0b14e8 100644 --- a/crates/lance-graph-contract/src/orchestration.rs +++ b/crates/lance-graph-contract/src/orchestration.rs @@ -75,6 +75,131 @@ impl StepDomain { _ => None, } } + + /// Per-domain orchestration profile (E5 from PR #278 outlook). + /// + /// `StepDomain` is the seam for vertical-specific orchestration: + /// verb taxonomy, calibration thresholds, retention windows, and + /// escalation defaults are picked HERE so downstream code does not + /// hard-code Medcare-vs-SMB conditionals at every call site. + /// + /// Profiles are STATIC defaults — the runtime can override via the + /// membrane registry without changing the enum. Tune empirically + /// per deployment; the values below are conservative starters. + pub fn profile(&self) -> DomainProfile { + match self { + Self::Smb => DomainProfile { + audit_retention_days: 90, + auto_action_confidence: 0.75, + escalation: Escalation::Llm, + requires_fail_closed: false, + verb_taxonomy: VerbTaxonomyId::Smb, + }, + Self::Medcare => DomainProfile { + // 6 years (HIPAA §164.316(b)(2)(i)) counted as 6 × 365 days, leap days ignored — starter, tune empirically. 
+ audit_retention_days: 2190, + auto_action_confidence: 0.92, + escalation: Escalation::Human, + requires_fail_closed: true, + verb_taxonomy: VerbTaxonomyId::Medcare, + }, + // Generic defaults for infrastructure / orchestration domains. + // These are NOT vertical-facing; they execute the cycle, not + // the policy. Starter values — tune empirically. + Self::Crew + | Self::Ladybug + | Self::N8n + | Self::LanceGraph + | Self::Ndarray => DomainProfile { + audit_retention_days: 30, + auto_action_confidence: 0.70, + escalation: Escalation::Llm, + requires_fail_closed: false, + verb_taxonomy: VerbTaxonomyId::Generic, + }, + } + } +} + +impl core::fmt::Display for StepDomain { + /// Lowercase form mirroring `from_step_type` keys exactly. + /// `from_step_type(&domain.to_string()) == Some(domain)` for every variant. + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let s = match self { + Self::Crew => "crew", + Self::Ladybug => "lb", + Self::N8n => "n8n", + Self::LanceGraph => "lg", + Self::Ndarray => "nd", + Self::Smb => "smb", + Self::Medcare => "medcare", + }; + f.write_str(s) + } +} + +/// Per-domain orchestration profile. Carries calibration thresholds, +/// retention windows, escalation defaults, and verb-taxonomy markers. +/// +/// Profiles are STATIC defaults — runtime can override via the membrane +/// registry without changing the enum. +#[derive(Debug, Clone, Copy)] +pub struct DomainProfile { + /// Audit retention in days. Medcare (HIPAA) = 6 years (2190); SMB = 90. + pub audit_retention_days: u32, + /// Confidence threshold above which automated actions are allowed + /// without human review. Medcare requires higher threshold. + pub auto_action_confidence: f32, + /// Escalation target on uncertainty: Llm = degrade to LLM tail; + /// Human = require human-in-the-loop; Reject = fail closed. + pub escalation: Escalation, + /// Whether this domain demands fail-closed access control. + /// Medcare = true (HIPAA); SMB = false (commerce). 
+ pub requires_fail_closed: bool, + /// Verb taxonomy id — picks which 144-cell verb table to consult. + pub verb_taxonomy: VerbTaxonomyId, +} + +/// Escalation target on uncertainty. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Escalation { + /// Degrade to LLM tail (best-effort, no human in loop). + Llm, + /// Require human-in-the-loop (HIPAA-grade verticals default here). + Human, + /// Fail closed — reject the step rather than guess. + Reject, +} + +/// Verb taxonomy identifier — selects the per-domain 144-cell verb table. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum VerbTaxonomyId { + /// 12 generic semantic families (BECOMES, CAUSES, SUPPORTS, ...). + Generic, + /// SMB-specific: invoice, quote, dispatch, fulfill, return, refund, ... + Smb, + /// Medcare-specific: prescribe, refer, discharge, admit, treat, diagnose, ... + Medcare, +} + +/// Compute a Trajectory-aware audit hash for a step within this domain. +/// +/// This is the cross-PR bridge between PR #279's grammar substrate and +/// PR #278's audit log: the trajectory becomes the audit key, replacing +/// the syntactic statement_hash. +/// +/// PR #279 epiphany E4. Implementation lands in the bridge PR. Until then the body is `unimplemented!` and panics when called. +/// +/// META-AGENT: feature-gated stub. Do NOT call until the bridge PR +/// implements it; signature is locked here so callers can compile-test +/// against the trajectory-audit feature flag. +#[cfg(feature = "trajectory-audit")] +pub fn step_trajectory_hash( + _domain: StepDomain, + _step: &UnifiedStep, + _trajectory: &[u64; 256], +) -> u64 { + unimplemented!("see PR #279 outlook E4") } #[cfg(test)] @@ -100,6 +225,55 @@ mod tests { ); assert_eq!(StepDomain::from_step_type("unknown.foo"), None); } + + #[test] + fn display_round_trips_through_from_step_type() { + // Every variant must serialize to a string that `from_step_type` + // accepts and round-trips back. Keeps the Display impl honest. 
+ let all = [ + StepDomain::Crew, + StepDomain::Ladybug, + StepDomain::N8n, + StepDomain::LanceGraph, + StepDomain::Ndarray, + StepDomain::Smb, + StepDomain::Medcare, + ]; + for domain in all { + let s = domain.to_string(); + assert_eq!( + StepDomain::from_step_type(&s), + Some(domain), + "Display→from_step_type round-trip failed for {domain:?} (got {s:?})", + ); + } + } + + #[test] + fn medcare_requires_fail_closed() { + assert!(StepDomain::Medcare.profile().requires_fail_closed); + } + + #[test] + fn medcare_auto_action_threshold_higher_than_smb() { + let medcare = StepDomain::Medcare.profile(); + let smb = StepDomain::Smb.profile(); + assert!( + medcare.auto_action_confidence > smb.auto_action_confidence, + "medcare ({}) must demand a higher auto-action confidence than smb ({})", + medcare.auto_action_confidence, + smb.auto_action_confidence, + ); + } + + #[test] + fn medcare_audit_retention_is_hipaa_grade() { + // HIPAA §164.316(b)(2)(i) = 6 years, taken here as 6 × 365 = 2190 days (leap days ignored). + assert!( + StepDomain::Medcare.profile().audit_retention_days >= 2190, + "medcare audit retention must be >= 2190 days (HIPAA 6 years)", + ); + } } /// Unified step — the unit of work crossing system boundaries.