diff --git a/crates/lance-graph-callcenter/Cargo.toml b/crates/lance-graph-callcenter/Cargo.toml index 4cc912a7..45a1f2a8 100644 --- a/crates/lance-graph-callcenter/Cargo.toml +++ b/crates/lance-graph-callcenter/Cargo.toml @@ -34,6 +34,11 @@ futures = { version = "0.3", optional = true } axum = { version = "0.8", features = ["ws"], optional = true } tower-http = { version = "0.5", features = ["cors"], optional = true } +# `transcode` submodules: `ontology_table` needs `async_trait` for the +# DataFusion `TableProvider` impl. It is a proc-macro crate (build-time only: +# `syn`/`quote`/`proc-macro2`), so it is kept always-on instead of optional. +async-trait = "0.1" + [features] default = [] persist = ["dep:arrow", "dep:lance"] diff --git a/crates/lance-graph-callcenter/src/lib.rs b/crates/lance-graph-callcenter/src/lib.rs index 808a2da2..a7e54e26 100644 --- a/crates/lance-graph-callcenter/src/lib.rs +++ b/crates/lance-graph-callcenter/src/lib.rs @@ -122,3 +122,10 @@ pub mod audit; feature = "full" ))] pub mod policy; + +// Outer ↔ inner ontology transcode — reusable Foundry primitives. +// Domain-agnostic mapper between the wire-shape DTO surface (already in +// `ontology_dto`) and the inner SoA / SPO substrate. Also hosts the +// **single deliberate transition bandaid**: `parallelbetrieb` for the +// MySQL ↔ DataFusion ↔ SPO reconciler. See `transcode/mod.rs`. +pub mod transcode; diff --git a/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs b/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs new file mode 100644 index 00000000..b3f90d1a --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs @@ -0,0 +1,135 @@ +//! Decode-on-read shim for persistent-SoA columns. +//! +//! Each outer-ontology column maps to a [`CodecRoute`] from +//! [`lance_graph_contract::cam`]: +//! +//! - `Passthrough` — exact-identity f32, zero-copy on read. +//! - `CamPq` — 6-byte product-quantised codes, decoded on read. +//! 
- `Skip` — no codec; raw bytes (Utf8, scalar columns). +//! +//! This module is the **dispatch contract** — domain-agnostic, reusable +//! across every ontology that lands a Lance dataset. The actual PQ math +//! lives in `lance_graph_contract::cam` and `lance_graph_planner::physical::cam_pq_scan`; +//! this trait only carries the read-side shape downstream consumers +//! interact with. + +use lance_graph_contract::cam::{route_tensor, CodecRoute}; + +use super::zerocopy::{ArrowTypeCode, OuterColumn}; + +/// Pick the `CodecRoute` for an outer-ontology column. Same heuristic +/// `route_tensor` uses for tensors, applied to the column's Arrow shape. +pub fn route_for_column(col: &OuterColumn) -> CodecRoute { + match col.arrow_type_code { + ArrowTypeCode::FixedSizeListF32(n) => route_tensor(col.name, &[n as u64]), + ArrowTypeCode::FixedSizeBinary(n) if n >= 64 => route_tensor(col.name, &[n as u64]), + _ => CodecRoute::Skip, + } +} + +/// Read-side decoder. Implementations get one row's worth of bytes and +/// hand back the f32 representation the consumer expects. +pub trait CamPqDecoder: Send + Sync { + fn decode_row(&self, encoded: &[u8], out: &mut [f32]) -> Result; +} + +/// Errors from the decode path. +#[derive(Debug, Clone, PartialEq)] +pub enum DecodeError { + BadStride { + expected: usize, + got: usize, + }, + OutputTooSmall { + needed: usize, + got: usize, + }, + /// Codebook not registered for the column's route — caller should + /// either register one or fall back to the [`PassthroughDecoder`]. 
+ NoCodebook, +} + +impl core::fmt::Display for DecodeError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + DecodeError::BadStride { expected, got } => { + write!(f, "bad stride: expected {expected} bytes, got {got}") + } + DecodeError::OutputTooSmall { needed, got } => { + write!(f, "output buffer too small: need {needed}, got {got}") + } + DecodeError::NoCodebook => { + write!(f, "no codebook registered for column's CAM-PQ route") + } + } + } +} + +impl std::error::Error for DecodeError {} + +/// Trivial decoder that re-interprets encoded bytes as little-endian f32. +/// Used for `CodecRoute::Passthrough` / `Skip` columns where no codec +/// transform happened. +#[derive(Debug, Default, Clone, Copy)] +pub struct PassthroughDecoder; + +impl CamPqDecoder for PassthroughDecoder { + /// Fills every slot of `out` from the first `out.len() * 4` bytes of + /// `encoded` (4 little-endian bytes per f32) and returns `out.len()`. + /// Input shorter than that yields `DecodeError::BadStride`; bytes past + /// that point are ignored rather than rejected. + fn decode_row(&self, encoded: &[u8], out: &mut [f32]) -> Result { + let needed = out.len() * 4; + if encoded.len() < needed { + return Err(DecodeError::BadStride { + expected: needed, + got: encoded.len(), + }); + } + for (i, chunk) in encoded.chunks_exact(4).take(out.len()).enumerate() { + out[i] = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); + } + Ok(out.len()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::transcode::zerocopy::OuterSchema; + use lance_graph_contract::ontology::Ontology; + use lance_graph_contract::property::Schema; + + #[test] + fn route_for_scalar_columns_skips_codec() { + let ont = Ontology::builder("T") + .schema(Schema::builder("Patient").required("name").build()) + .build(); + let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap(); + assert!(matches!( + route_for_column(&soa.columns[0]), + CodecRoute::Skip + )); + } + + #[test] + fn passthrough_decoder_round_trips_le_f32() { + let xs: [f32; 4] = [1.0, -2.5, 3.25, 0.0]; + let bytes: Vec = xs.iter().flat_map(|x| x.to_le_bytes()).collect(); + let mut out = [0.0f32; 4]; + let n = PassthroughDecoder.decode_row(&bytes, &mut 
out).unwrap(); + assert_eq!(n, 4); + assert_eq!(out, xs); + } + + #[test] + fn passthrough_decoder_rejects_short_input() { + let mut out = [0.0f32; 4]; + let err = PassthroughDecoder + .decode_row(&[0u8; 8], &mut out) + .unwrap_err(); + assert!(matches!( + err, + DecodeError::BadStride { + expected: 16, + got: 8 + } + )); + } +} diff --git a/crates/lance-graph-callcenter/src/transcode/mod.rs b/crates/lance-graph-callcenter/src/transcode/mod.rs new file mode 100644 index 00000000..1294e786 --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/mod.rs @@ -0,0 +1,79 @@ +//! Outer ↔ inner ontology transcode — reusable Foundry primitives. +//! +//! This module group is the **mapper** between two ontology surfaces: +//! +//! - **Outer ontology** — the shape every external consumer sees. Object +//! types (`Schema`), link types (`LinkSpec`), action types (`ActionSpec`), +//! bilingual labels (`Locale` + `Label`), and the wire DTOs in +//! [`crate::ontology_dto`]. Foundry's "Object Type / Link Type" surface. +//! +//! - **Inner ontology** — the shape the internal SoA actually stores. +//! `BindSpace` columns (`FingerprintColumns`, `EdgeColumn`, +//! `QualiaColumn`, `MetaColumn`, `entity_type`), CAM-PQ-encoded +//! compressed columns when persisted to a Lance dataset, and SPO +//! triples produced by [`SchemaExpander`]. +//! +//! The pieces below are **domain-agnostic**. None of them reference +//! medcare, smb, callcenter, or any specific ontology — they operate on +//! whatever `Ontology` is handed in. Domain-specific schemas live where +//! they belong: `medcare_ontology()` in [`crate::ontology_dto`], +//! `smb_ontology()` in [`crate::ontology_dto`], future verticals in their +//! own factory. +//! +//! ## Modules +//! +//! - [`zerocopy`] — owned-column → Arrow `RecordBatch` mapping. The +//! canonical zerocopy path: `Vec` → `Buffer` is an `O(1)` +//! reinterpretation. Wraps an `Ontology`-derived schema, refuses +//! undeclared columns at the boundary. +//! 
- [`cam_pq_decode`] — codec dispatch for persistent SoA columns. The +//! `CamPqDecoder` trait + `PassthroughDecoder` cover the +//! `Skip`/`Passthrough` `CodecRoute` lanes; `CamPq` decode-on-read +//! plumbs once a codebook handle is wired (not yet implemented). +//! - [`ontology_table`] — DataFusion `TableProvider` over an +//! `(Ontology, entity_type)` pair. Schema reflection works today; +//! filter pushdown to the SPO store is the canonical Phase 2 lift. +//! - [`spo_filter`] — SQL filter → SPO lookup translator. Recognises +//! `entity_type`/`predicate`/`entity_id`/`nars_frequency`/ +//! `nars_confidence` against any `Ontology`. +//! - [`parallelbetrieb`] — **the one deliberate transition bandaid**. +//! MySQL ↔ DataFusion ↔ SPO reconciler. Necessary as ground truth +//! during F1/F2; documented as transitional, not as Foundry primitive. +//! +//! ## What this module is NOT +//! +//! - A new "transcode crate". After PR #73 framed `lance-graph-callcenter` +//! itself as the Foundry / supabase-realtime transcode crate, the right +//! home for these helpers is here, alongside `ontology_dto` and +//! `version_watcher`. A sibling crate would create a competing framing. +//! - A duplicate of `ontology_dto`. The DTO surface (`OntologyDto`, +//! `EntityTypeDto`, etc.) stays canonical in `crate::ontology_dto`. +//! This module group consumes that surface; it does not redefine it. +//! - A duplicate of `version_watcher`. Realtime fan-out belongs to the +//! existing `LanceVersionWatcher`. This module group does not introduce +//! a second channel primitive. +//! +//! ## Feature gating +//! +//! The subtree itself is declared unconditionally (`pub mod transcode;` in +//! `lib.rs` has no `cfg`), so it always compiles with the crate. Gating is +//! per submodule: `ontology_table` requires `query-lite`, and the Arrow +//! wiring in `zerocopy` requires `persist` or `query-lite`. The remaining +//! submodules carry no feature gate. 
+ +pub mod cam_pq_decode; +pub mod parallelbetrieb; +pub mod spo_filter; +pub mod zerocopy; + +#[cfg(feature = "query-lite")] +pub mod ontology_table; + +// Re-export the outer-ontology DTO types so consumers reach the whole +// transcode surface from one import path. +pub use crate::ontology_dto::{ + ActionTypeDto, EntityTypeDto, LinkTypeDto, OntologyDto, PropertyDto, +}; +pub use lance_graph_contract::ontology::{ + EntityTypeId, ExpandedTriple, Label, Locale, Ontology, SchemaExpander, +}; diff --git a/crates/lance-graph-callcenter/src/transcode/ontology_table.rs b/crates/lance-graph-callcenter/src/transcode/ontology_table.rs new file mode 100644 index 00000000..9306d42d --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/ontology_table.rs @@ -0,0 +1,182 @@ +//! Ontology-bound DataFusion `TableProvider`. +//! +//! [`OntologyTableProvider`] presents one entity_type as a queryable +//! table. The Arrow schema is derived from the [`Ontology`] via +//! [`OuterSchema`] + [`arrow_schema`]; today the read path is backed by +//! a [`MemTable`] (round-1 stub). Filter pushdown to the inner +//! ontology's SPO store is delegated to [`SpoFilterTranslator`] via +//! [`OntologyTableProvider::translate_filters`]. +//! +//! Domain-agnostic. Pass any `(Ontology, entity_type)` and you get a +//! provider — Foundry-style. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion::catalog::Session; +use datafusion::datasource::{MemTable, TableProvider, TableType}; +use datafusion::error::Result as DfResult; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::physical_plan::ExecutionPlan; + +use lance_graph_contract::ontology::Ontology; + +use super::spo_filter::{FilterTerm, SpoFilterTranslator, SpoLookup}; +use super::zerocopy::{arrow_schema, OuterSchema}; + +/// DataFusion table provider for one ontology-bound entity_type. 
+#[derive(Debug)] +pub struct OntologyTableProvider { + ontology: Arc, + soa: OuterSchema, + inner: MemTable, +} + +impl OntologyTableProvider { + /// Build a provider for `entity_type`. Returns `None` if the entity + /// type isn't declared in the ontology. + pub fn new(ontology: Arc, entity_type: &str) -> Option { + let soa = OuterSchema::from_ontology(&ontology, entity_type)?; + let arrow_schema = arrow_schema(&soa); + let inner = MemTable::try_new(arrow_schema, vec![vec![]]).ok()?; + Some(Self { + ontology, + soa, + inner, + }) + } + + /// Build a provider with pre-populated batches. + pub fn with_batches( + ontology: Arc, + entity_type: &str, + batches: Vec, + ) -> Option { + let soa = OuterSchema::from_ontology(&ontology, entity_type)?; + let arrow_schema = arrow_schema(&soa); + let inner = MemTable::try_new(arrow_schema, vec![batches]).ok()?; + Some(Self { + ontology, + soa, + inner, + }) + } + + pub fn entity_type(&self) -> &str { + self.soa.entity_type + } + + pub fn ontology(&self) -> &Ontology { + &self.ontology + } + + pub fn outer_schema(&self) -> &OuterSchema { + &self.soa + } + + /// Translate filter terms to an `SpoLookup`. Useful for direct + /// callers and tests; the DataFusion path eventually goes through + /// `scan`'s filter slice. + pub fn translate_filters(&self, terms: &[FilterTerm]) -> SpoLookup { + SpoFilterTranslator::new(&self.ontology).translate(terms) + } +} + +#[async_trait] +impl TableProvider for OntologyTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> DfResult> { + // Round-1: delegate to MemTable. SpoStore-backed scan is the next + // round (mirrors Phase 2 of the SQL↔SPO ontology bridge plan). 
+ self.inner.scan(state, projection, filters, limit).await + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DfResult> { + // Honest round-1 answer: nothing pushed down yet. + Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Unsupported) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lance_graph_contract::property::Schema; + + fn medcare_ontology() -> Arc { + Arc::new( + Ontology::builder("Medcare") + .schema( + Schema::builder("Patient") + .required("patient_id") + .required("name") + .required("geburtsdatum") + .optional("krankenkasse") + .build(), + ) + .build(), + ) + } + + #[test] + fn provider_returns_arrow_schema_with_id_first() { + let ont = medcare_ontology(); + let p = OntologyTableProvider::new(ont, "Patient").unwrap(); + let s = p.schema(); + assert_eq!(s.field(0).name(), "id"); + assert_eq!(s.field(1).name(), "entity_type"); + assert!(s.fields().iter().any(|f| f.name() == "patient_id")); + assert!(s.fields().iter().any(|f| f.name() == "geburtsdatum")); + } + + #[test] + fn provider_returns_none_for_unknown_entity_type() { + let ont = medcare_ontology(); + assert!(OntologyTableProvider::new(ont, "Spaceship").is_none()); + } + + #[test] + fn provider_table_type_is_base() { + let ont = medcare_ontology(); + let p = OntologyTableProvider::new(ont, "Patient").unwrap(); + assert_eq!(p.table_type(), TableType::Base); + } + + #[test] + fn provider_translate_filters_routes_known_columns() { + use super::super::spo_filter::{Literal, Op}; + let ont = medcare_ontology(); + let p = OntologyTableProvider::new(ont, "Patient").unwrap(); + let look = p.translate_filters(&[FilterTerm { + column: "entity_type".into(), + op: Op::Eq, + literal: Literal::Utf8("Patient".into()), + }]); + assert_eq!(look.entity_type_id, Some(1)); + } +} diff --git a/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs b/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs new file mode 100644 index 
00000000..0f2d05b6 --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs @@ -0,0 +1,293 @@ +//! MySQL ↔ DataFusion ↔ SPO parallelbetrieb — **the one deliberate transition bandaid**. +//! +//! Everything else under `transcode/` is reusable Foundry primitive: outer +//! ontology → inner ontology mapping that should still make sense in five +//! years. *This* module is different. It exists to run two systems in +//! parallel — a MySQL ground-truth and the new SPO substrate over a +//! DataFusion surface — long enough to reconcile every drift and prove the +//! new side is correct. +//! +//! ## Why parallelbetrieb is necessary +//! +//! No phase advances with nonzero drift, by explicit user directive. To +//! detect drift you need the two systems answering the same question and +//! a reconciler diffing the answers. That reconciler is structurally +//! transitional: +//! +//! - **F1** — both systems answer; reconciler is on; consumers still hit +//! MySQL. +//! - **F2** — both still answer; consumers now hit the new system; MySQL +//! becomes the witness. +//! - **F3** — both still answer; the long-tail drift is chased to zero. +//! - **F4 / F5** — Foundry features go live on top of the new system. +//! MySQL remains as the permanent reference (per directive), but the +//! reconciler's *primary* purpose is satisfied. +//! +//! Even at F5 the reconciler stays — MySQL is permanent — but its mode +//! shifts from "consensus required for any commit" to "background witness +//! that emits drift events when something diverges". The bandaid framing +//! is for the **parallel-evaluation overhead**, not for the witness +//! itself. +//! +//! ## What this module provides today +//! +//! - [`DriftEvent`] — the JSON-shape every reconciler emits. Matches the +//! schema MedCareV2's C# `LanceProbe` already POSTs to +//! `/api/__parity/csharp` (medcare-rs PR #71). +//! - [`DriftKind`] — `Match` / `ValueDrift` / `ShapeDrift` / `MissingMysql` +//! 
/ `MissingLance`. +//! - [`Reconciler`] trait — the contract a parallelbetrieb runner +//! implements. Two callers fill it: the C# `LanceProbe` (cross-language +//! diverse-redundancy witness) and the Rust-side reconciler that +//! compares MySQL `mysql_query` results to SPO `spo_lookup` results. +//! +//! ## What's deferred +//! +//! - The Rust-side reconciler implementation. It needs an MySQL query +//! issuer + an SPO query issuer + a canonicaliser; the canonicaliser +//! is the same field-rule table the C# side already implements +//! (date-only, "F4" doubles, soft-delete coercion, second-truncated +//! timestamps). Land both rules in one place when the Rust query +//! path is wired (Phase 3 of `.claude/plans/sql-spo-ontology-bridge-v1.md`). +//! - The drift-event sink. Today the C# side POSTs to a medcare-rs +//! route; the Rust side will publish to the same persistent ring +//! buffer ([`crate::audit::LanceAuditSink`]) once the wiring lands. +//! +//! ## Hard rules +//! +//! - **No Foundry primitive in this module.** If the type is reusable +//! beyond parallelbetrieb, it goes in a sibling module. Keep this +//! file focused on the bandaid surface so it can be deleted (or +//! reduced) when the F5 sunset clarifies what stays. +//! - **No silent reconciliation.** If MySQL says `pf_delete=NULL` and +//! SPO says `deleted=false`, the canonicaliser collapses both to +//! `false` and emits `Match`; if MySQL says `pf_delete=1` and SPO +//! says `deleted=false`, the reconciler emits `ValueDrift`. No +//! silent agree-to-disagree. + +use core::fmt; + +/// Kind of drift the reconciler observed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum DriftKind { + /// MySQL and SPO agree after canonicalisation. + Match, + /// Both sides have a row; one or more field values disagree. + ValueDrift, + /// Both sides have a row; the shape (columns / list lengths) disagrees. + ShapeDrift, + /// MySQL has a row, SPO doesn't. 
+ MissingLance, + /// SPO has a row, MySQL doesn't. + MissingMysql, +} + +impl fmt::Display for DriftKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + DriftKind::Match => "Match", + DriftKind::ValueDrift => "ValueDrift", + DriftKind::ShapeDrift => "ShapeDrift", + DriftKind::MissingLance => "MissingLance", + DriftKind::MissingMysql => "MissingMysql", + }) + } +} + +/// One field-level drift entry: identifies the field path and both +/// canonicalised representations. Mirrors the C# `DriftField`. +#[derive(Clone, Debug, PartialEq)] +pub struct DriftField { + pub path: String, + pub mysql: String, + pub lance: String, +} + +/// One reconciliation outcome. Identical schema to MedCareV2's C# +/// `DriftEvent.ToJson()` so the same JSON document parses on both sides. +#[derive(Clone, Debug)] +pub struct DriftEvent { + /// Origin of the event — e.g. `medcarev2-lance-probe` (C# side) or + /// `medcare-rs-reconciler` (Rust side). + pub source: &'static str, + pub route: String, + pub method: &'static str, + pub kind: DriftKind, + pub fields: Vec, + /// ISO 8601 UTC second-resolution timestamp when the comparison ran. + pub captured_at: String, +} + +impl DriftEvent { + /// Construct a `Match` event — no drift, just confirms the + /// reconciliation ran. Useful for sampling counts and "we saw nothing + /// wrong" telemetry. + pub fn matched(source: &'static str, route: impl Into) -> Self { + Self { + source, + route: route.into(), + method: "GET", + kind: DriftKind::Match, + fields: Vec::new(), + captured_at: now_iso8601_seconds(), + } + } + + /// Construct a `ValueDrift` event with one or more field + /// disagreements. The fields list must be non-empty — empty `fields` + /// + `ValueDrift` would be a contradiction. 
+ pub fn value_drift( + source: &'static str, + route: impl Into, + fields: Vec, + ) -> Self { + debug_assert!( + !fields.is_empty(), + "ValueDrift requires at least one DriftField; use matched() if no drift" + ); + Self { + source, + route: route.into(), + method: "GET", + kind: DriftKind::ValueDrift, + fields, + captured_at: now_iso8601_seconds(), + } + } +} + +/// Contract for any parallelbetrieb implementation. +/// +/// Two implementors are anticipated: +/// 1. **C#-side** `LanceProbe` (already shipped in MedCareV2 PR #1, #2, +/// #3) — runs in the legacy desktop app's process; POSTs DriftEvents +/// to a medcare-rs route. +/// 2. **Rust-side** reconciler (deferred) — runs as part of the +/// membrane; compares MySQL query results to SPO results and +/// publishes DriftEvents directly to the audit log. +/// +/// Both report drift in the same JSON shape so a single dashboard can +/// merge both feeds. +pub trait Reconciler { + /// Reconcile one query — implementations issue the MySQL form, the + /// SPO form, canonicalise both, diff, and return the event. + /// Implementations must NOT panic; bubble errors as + /// `DriftKind::ShapeDrift` with the error message in the field. + fn reconcile(&self, route: &str) -> DriftEvent; +} + +/// Current UTC wall-clock time formatted as `yyyy-mm-ddTHH:MM:SSZ`. +/// If the system clock reads earlier than the Unix epoch, the epoch +/// itself (0 seconds) is formatted instead of returning an error. +fn now_iso8601_seconds() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + // Cheap 20-byte ISO 8601: yyyy-mm-ddTHH:MM:SSZ. Avoids pulling chrono. + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(secs); + format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z") +} + +/// Convert a Unix timestamp (seconds) to (year, month, day, hour, minute, +/// second). Self-contained so the parallelbetrieb module has no +/// chrono / time dep. 
+fn unix_to_ymd_hms(secs: u64) -> (u32, u32, u32, u32, u32, u32) { + let day = secs / 86_400; + let secs_of_day = secs - day * 86_400; + let hh = (secs_of_day / 3600) as u32; + let mm = ((secs_of_day / 60) % 60) as u32; + let ss = (secs_of_day % 60) as u32; + + // Civil-from-days, Howard Hinnant. Public-domain algorithm; matches + // chrono's `NaiveDate::from_num_days_from_ce_opt` modulo offset. + let z = day as i64 + 719_468; + let era = if z >= 0 { z } else { z - 146_096 } / 146_097; + let doe = (z - era * 146_097) as u64; + let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; + let y_civil = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d_civil = doy - (153 * mp + 2) / 5 + 1; + let m_civil = if mp < 10 { mp + 3 } else { mp - 9 }; + let y = y_civil + i64::from(m_civil <= 2); + (y as u32, m_civil as u32, d_civil as u32, hh, mm, ss) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn drift_kind_displays_camelcase() { + assert_eq!(DriftKind::Match.to_string(), "Match"); + assert_eq!(DriftKind::ValueDrift.to_string(), "ValueDrift"); + assert_eq!(DriftKind::MissingMysql.to_string(), "MissingMysql"); + } + + #[test] + fn matched_event_has_no_fields() { + let ev = DriftEvent::matched("test", "/api/patient/1"); + assert_eq!(ev.kind, DriftKind::Match); + assert!(ev.fields.is_empty()); + assert_eq!(ev.method, "GET"); + assert_eq!(ev.source, "test"); + } + + #[test] + fn value_drift_carries_field_path() { + let ev = DriftEvent::value_drift( + "test", + "/api/patient/1", + vec![DriftField { + path: "$.name".to_string(), + mysql: "Anna".to_string(), + lance: "Anneliese".to_string(), + }], + ); + assert_eq!(ev.kind, DriftKind::ValueDrift); + assert_eq!(ev.fields.len(), 1); + assert_eq!(ev.fields[0].path, "$.name"); + } + + #[test] + fn iso8601_handles_unix_epoch() { + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(0); + assert_eq!((y, m, d, hh, mm, ss), (1970, 1, 1, 0, 0, 0)); + } + + 
#[test] + fn iso8601_handles_one_day_later() { + // Unix 86400 = 1970-01-02 00:00:00 UTC (one full day after epoch). + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(86_400); + assert_eq!((y, m, d, hh, mm, ss), (1970, 1, 2, 0, 0, 0)); + } + + #[test] + fn iso8601_handles_y2k_boundary() { + // Unix 946_684_800 = 2000-01-01 00:00:00 UTC (Y2K rollover). + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(946_684_800); + assert_eq!((y, m, d, hh, mm, ss), (2000, 1, 1, 0, 0, 0)); + } + + #[test] + fn iso8601_handles_leap_year_feb_29() { + // Unix 1_582_934_400 = 2020-02-29 00:00:00 UTC (most recent + // observed leap day before this code shipped). + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(1_582_934_400); + assert_eq!((y, m, d, hh, mm, ss), (2020, 2, 29, 0, 0, 0)); + } + + #[test] + fn iso8601_round_trips_seconds_and_minutes() { + // 1970-01-01 12:34:56 UTC = 12*3600 + 34*60 + 56 = 45_296. + let (y, m, d, hh, mm, ss) = unix_to_ymd_hms(45_296); + assert_eq!((y, m, d, hh, mm, ss), (1970, 1, 1, 12, 34, 56)); + } + + #[test] + fn captured_at_format_is_well_formed() { + let ev = DriftEvent::matched("test", "/api/patient/1"); + assert_eq!(ev.captured_at.len(), 20); // yyyy-mm-ddTHH:MM:SSZ + assert!(ev.captured_at.ends_with('Z')); + assert!(ev.captured_at.contains('T')); + } +} diff --git a/crates/lance-graph-callcenter/src/transcode/spo_filter.rs b/crates/lance-graph-callcenter/src/transcode/spo_filter.rs new file mode 100644 index 00000000..f8709ac9 --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/spo_filter.rs @@ -0,0 +1,167 @@ +//! SQL filter ↔ SPO triple-lookup translator. +//! +//! Given a flat list of `(column, op, literal)` terms — what DataFusion +//! pushdown hands us — produce an [`SpoLookup`] the SPO store can +//! evaluate. This is the **read-side bridge** between the outer +//! ontology's SQL surface and the inner ontology's triple store. +//! +//! Domain-agnostic: any [`Ontology`] resolves entity_type names through +//! 
[`entity_type_id`]; predicate names hash to fingerprints through the +//! contract's canonical [`fnv1a`] (so the encode and decode sides agree +//! without sharing a code-loaded codebook). + +use lance_graph_contract::hash::fnv1a; +use lance_graph_contract::ontology::{entity_type_id, EntityTypeId, Ontology}; + +/// Stable u64 fingerprint of a predicate string. +pub fn predicate_fingerprint(predicate: &str) -> u64 { + fnv1a(predicate.as_bytes()) +} + +/// Op codes the bridge translates today. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Op { + Eq, + NotEq, + Gt, + Gte, + Lt, + Lte, +} + +/// Single literal-typed comparison: `column op literal`. +#[derive(Debug, Clone)] +pub struct FilterTerm { + pub column: String, + pub op: Op, + pub literal: Literal, +} + +#[derive(Debug, Clone)] +pub enum Literal { + Utf8(String), + UInt64(u64), + Float32(f32), +} + +/// Resolved lookup against the inner-ontology SPO store. Carries every- +/// thing the store needs to evaluate a single-table filter; nothing it +/// doesn't. +#[derive(Debug, Clone, Default)] +pub struct SpoLookup { + pub entity_type_id: Option, + pub predicate_fp: Option, + pub predicate_fp_excluded: Option, + pub min_frequency: Option, + pub min_confidence: Option, + pub entity_id: Option, +} + +/// Translates `FilterTerm`s into an `SpoLookup`. Unknown columns are +/// **silently left as residual** — the table provider hands them back to +/// DataFusion. This keeps the bridge's surface tight and avoids silent +/// over-rejection. 
+#[derive(Debug)] +pub struct SpoFilterTranslator<'a> { + ontology: &'a Ontology, +} + +impl<'a> SpoFilterTranslator<'a> { + /// Bind a translator to the ontology used to resolve `entity_type` names. + pub fn new(ontology: &'a Ontology) -> Self { + Self { ontology } + } + + /// Fold `terms` into a single [`SpoLookup`]. + /// + /// Terms are applied in order and each recognised term overwrites its + /// slot, so the last matching term per field wins. `Gt` and `Gte` on + /// `nars_frequency` / `nars_confidence` both store the literal as the + /// minimum; the strict / non-strict distinction is not recorded. An + /// `entity_type` literal that resolves to id 0 leaves `entity_type_id` + /// unset. Every other `(column, op, literal)` combination is ignored. + /// + /// NOTE(review): because ignored and lossy terms are not reported back, + /// the caller presumably has to keep the original filters as residuals + /// for exact results; confirm against the table provider's scan path. + pub fn translate(&self, terms: &[FilterTerm]) -> SpoLookup { + let mut out = SpoLookup::default(); + for t in terms { + match (t.column.as_str(), t.op, &t.literal) { + ("entity_type", Op::Eq, Literal::Utf8(s)) => { + let id = entity_type_id(self.ontology, s); + if id != 0 { + out.entity_type_id = Some(id); + } + } + ("entity_id", Op::Eq, Literal::UInt64(n)) => { + out.entity_id = Some(*n); + } + ("predicate", Op::Eq, Literal::Utf8(s)) => { + out.predicate_fp = Some(predicate_fingerprint(s)); + } + ("predicate", Op::NotEq, Literal::Utf8(s)) => { + out.predicate_fp_excluded = Some(predicate_fingerprint(s)); + } + ("nars_frequency", Op::Gt | Op::Gte, Literal::Float32(x)) => { + out.min_frequency = Some(*x); + } + ("nars_confidence", Op::Gt | Op::Gte, Literal::Float32(x)) => { + out.min_confidence = Some(*x); + } + _ => {} + } + } + out + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lance_graph_contract::property::Schema; + + fn ontology_with_patient() -> Ontology { + Ontology::builder("Test") + .schema(Schema::builder("Patient").required("name").build()) + .build() + } + + #[test] + fn translates_entity_type_eq_to_id() { + let ont = ontology_with_patient(); + let look = SpoFilterTranslator::new(&ont).translate(&[FilterTerm { + column: "entity_type".into(), + op: Op::Eq, + literal: Literal::Utf8("Patient".into()), + }]); + assert_eq!(look.entity_type_id, Some(1)); + } + + #[test] + fn unknown_entity_type_eq_drops_to_none() { + let ont = ontology_with_patient(); + let look = SpoFilterTranslator::new(&ont).translate(&[FilterTerm { + column: "entity_type".into(), + op: Op::Eq, + literal: Literal::Utf8("Unknown".into()), + }]); + assert!(look.entity_type_id.is_none()); + } + + #[test] + fn predicate_fingerprint_uses_canonical_fnv1a() { + let h = 
predicate_fingerprint("name"); + assert_eq!(h, fnv1a(b"name")); + } + + #[test] + fn nars_frequency_gt_lifts_threshold() { + let ont = ontology_with_patient(); + let look = SpoFilterTranslator::new(&ont).translate(&[FilterTerm { + column: "nars_frequency".into(), + op: Op::Gt, + literal: Literal::Float32(0.7), + }]); + assert_eq!(look.min_frequency, Some(0.7)); + } + + #[test] + fn unrecognised_terms_silently_become_residual() { + let ont = ontology_with_patient(); + let look = SpoFilterTranslator::new(&ont).translate(&[FilterTerm { + column: "weird".into(), + op: Op::Eq, + literal: Literal::Utf8("x".into()), + }]); + assert!(look.entity_type_id.is_none()); + assert!(look.predicate_fp.is_none()); + } +} diff --git a/crates/lance-graph-callcenter/src/transcode/zerocopy.rs b/crates/lance-graph-callcenter/src/transcode/zerocopy.rs new file mode 100644 index 00000000..e56172c2 --- /dev/null +++ b/crates/lance-graph-callcenter/src/transcode/zerocopy.rs @@ -0,0 +1,413 @@ +//! Outer-ontology row data → Arrow `RecordBatch` (the wire shape). +//! +//! Inputs are owned scalar / vector columns; outputs are Arrow arrays +//! that DataFusion + Lance + downstream consumers all understand. +//! +//! ## Why `OwnedColumn`? +//! +//! The cheap-zerocopy lane in Arrow 57 is `Vec` → `Buffer`: for +//! `T: ArrowNativeType` it is an `O(1)` reinterpretation — Vec's +//! allocation becomes the Buffer's allocation, no per-element copy. +//! Borrowed `&[T]` references can't take that lane without either +//! moving ownership or wrapping in a custom-owner `Buffer::from_bytes`, +//! both of which require the producer to expose owned slices. +//! +//! Today's producer surface (`BindSpace` in `cognitive-shader-driver`) +//! does not yet hand out ownership of its column allocations. So this +//! round we accept owned columns and document the path explicitly. The +//! BindSpace zerocopy view ships when the producer side adds an +//! accessor (tracked in callcenter's wiring plan). +//! +//! 
+//! ## Domain-agnostic
+//!
+//! The mapper takes any `Ontology` + `entity_type` name and projects
+//! them to an Arrow schema. No medcare-specific or smb-specific code
+//! lives here — that's all in `ontology_dto::medcare_ontology()` /
+//! `ontology_dto::smb_ontology()`.
+
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+use std::sync::Arc;
+
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+use arrow::array::{
+    Array, ArrayRef, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, StringArray,
+    UInt32Array, UInt64Array,
+};
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+use arrow::buffer::Buffer;
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+use arrow::record_batch::RecordBatch;
+
+use lance_graph_contract::ontology::{entity_type_id, EntityTypeId, Ontology};
+use lance_graph_contract::property::{Marking, PropertyKind, Schema, SemanticType};
+
+/// Lightweight enum mirroring the Arrow types this module emits.
+/// Arrow-feature-gated translation lives in [`arrow_data_type`].
+///
+/// The enum itself carries no Arrow dependency, so it is available even
+/// when neither the `persist` nor the `query-lite` feature is enabled.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum ArrowTypeCode {
+    /// Variable-length UTF-8 string (Arrow `Utf8`).
+    Utf8,
+    /// Unsigned 32-bit integer (Arrow `UInt32`).
+    UInt32,
+    /// Unsigned 64-bit integer (Arrow `UInt64`).
+    UInt64,
+    /// 32-bit float (Arrow `Float32`).
+    Float32,
+    /// Fixed-size list of f32 with the given size (e.g. 16 384 for VSA carriers).
+    FixedSizeListF32(usize),
+    /// Fixed-size binary of the given byte width (e.g. 64 for `Fingerprint`).
+    FixedSizeBinary(usize),
+    /// Arrow `Null` — used for fields whose payload doesn't cross the boundary.
+    Null,
+}
+
+/// One column in the outer-ontology view of an entity type.
+#[derive(Clone, Debug)]
+pub struct OuterColumn {
+    /// Column name; taken from the property's predicate.
+    pub name: &'static str,
+    /// Property kind; any kind other than `Required` yields a nullable
+    /// Arrow field.
+    pub kind: PropertyKind,
+    /// Semantic type declared on the property.
+    pub semantic_type: SemanticType,
+    /// Marking declared on the property.
+    pub marking: Marking,
+    /// Arrow type this column is emitted as.
+    pub arrow_type_code: ArrowTypeCode,
+}
+
+/// Outer-ontology projection of one entity type's columns. Derived from
+/// an [`Ontology`] schema by [`OuterSchema::from_ontology`].
+#[derive(Clone, Debug)]
+pub struct OuterSchema {
+    /// Locale-stable schema key (`"Patient"`, `"Customer"`, …).
+    pub entity_type: &'static str,
+    /// Numeric id assigned by the parent ontology.
+    pub entity_type_id: EntityTypeId,
+    /// Body columns derived from the `Schema`.
+    pub columns: Vec<OuterColumn>,
+}
+
+impl OuterSchema {
+    /// Derive an outer schema for `entity_type` from `ontology`. Returns
+    /// `None` if the entity type is not declared.
+    pub fn from_ontology(ontology: &Ontology, entity_type: &str) -> Option<Self> {
+        let schema = ontology.schema(entity_type)?;
+        let id = entity_type_id(ontology, entity_type);
+        Some(Self {
+            entity_type: schema.name,
+            entity_type_id: id,
+            columns: schema_columns(schema),
+        })
+    }
+}
+
+/// Project every property of `schema` to an [`OuterColumn`], preserving
+/// declaration order.
+fn schema_columns(schema: &Schema) -> Vec<OuterColumn> {
+    schema
+        .properties
+        .iter()
+        .map(|p| OuterColumn {
+            name: p.predicate,
+            kind: p.kind,
+            semantic_type: p.semantic_type.clone(),
+            marking: p.marking,
+            arrow_type_code: arrow_type_for_semantic(&p.semantic_type),
+        })
+        .collect()
+}
+
+/// Round-1 collapses every semantic type to `Utf8`. Round 2 plumbs richer
+/// Arrow types (`Date32`, `Decimal128`, etc.) per consumer demand.
+fn arrow_type_for_semantic(_st: &SemanticType) -> ArrowTypeCode {
+    ArrowTypeCode::Utf8
+}
+
+// ── Arrow wiring (features `persist` / `query-lite`) ─────────────────────────
+
+/// Map an [`OuterSchema`] to an Arrow [`SchemaRef`].
+///
+/// The first two fields are always `id` (`UInt64`, non-null) and
+/// `entity_type` (`Utf8`, non-null); the body columns follow in declaration
+/// order. A body column is nullable unless its kind is
+/// `PropertyKind::Required`.
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+pub fn arrow_schema(soa: &OuterSchema) -> SchemaRef {
+    // +2 for the synthetic `id` and `entity_type` columns.
+    let mut fields: Vec<Field> = Vec::with_capacity(soa.columns.len() + 2);
+    fields.push(Field::new("id", DataType::UInt64, false));
+    fields.push(Field::new("entity_type", DataType::Utf8, false));
+    for col in &soa.columns {
+        fields.push(Field::new(
+            col.name,
+            arrow_data_type(col.arrow_type_code),
+            !matches!(col.kind, PropertyKind::Required),
+        ));
+    }
+    Arc::new(ArrowSchema::new(fields))
+}
+
+/// Translate an [`ArrowTypeCode`] to a concrete Arrow [`DataType`].
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+pub fn arrow_data_type(code: ArrowTypeCode) -> DataType {
+    // NOTE(review): Arrow stores fixed-size widths as `i32`; the `as i32`
+    // casts below wrap for widths above `i32::MAX`. This function has no
+    // error channel, so the cast is left as-is.
+    match code {
+        ArrowTypeCode::Utf8 => DataType::Utf8,
+        ArrowTypeCode::UInt32 => DataType::UInt32,
+        ArrowTypeCode::UInt64 => DataType::UInt64,
+        ArrowTypeCode::Float32 => DataType::Float32,
+        ArrowTypeCode::FixedSizeListF32(n) => DataType::FixedSizeList(
+            Arc::new(Field::new("item", DataType::Float32, false)),
+            n as i32,
+        ),
+        ArrowTypeCode::FixedSizeBinary(n) => DataType::FixedSizeBinary(n as i32),
+        ArrowTypeCode::Null => DataType::Null,
+    }
+}
+
+/// Owned column input — moves ownership into the Arrow Buffer.
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+#[derive(Debug)]
+pub enum OwnedColumn {
+    UInt64(Vec<u64>),
+    UInt32(Vec<u32>),
+    Float32(Vec<f32>),
+    Utf8(Vec<String>),
+    /// VSA carriers — flat row-major `f32`. `inner_size` is the per-row
+    /// dimensionality (e.g. 16384 for Vsa16kF32). Total length must be
+    /// `nrows * inner_size`.
+    FixedSizeListF32 {
+        flat: Vec<f32>,
+        inner_size: usize,
+    },
+    /// Fingerprints — flat row-major bytes, `inner_size` bytes per row.
+    FixedSizeBinary {
+        flat: Vec<u8>,
+        inner_size: usize,
+    },
+}
+
+/// Validate a flat fixed-size column and return its per-row width as the
+/// `i32` Arrow expects.
+///
+/// Fails with [`TranscodeError::ShapeMismatch`] when `inner_size` is zero,
+/// when `flat_len` is not a whole number of rows, or when `inner_size` does
+/// not fit in an `i32` (previously a silent truncating cast).
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+fn checked_row_width(flat_len: usize, inner_size: usize) -> Result<i32, TranscodeError> {
+    if inner_size == 0 || flat_len % inner_size != 0 {
+        return Err(TranscodeError::ShapeMismatch);
+    }
+    i32::try_from(inner_size).map_err(|_| TranscodeError::ShapeMismatch)
+}
+
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+impl OwnedColumn {
+    /// Length in rows. Returns the floor on misaligned shapes; the
+    /// downstream `into_array()` returns a typed `ShapeMismatch` error,
+    /// so the caller never panics. A zero `inner_size` reports zero rows.
+    pub fn rows(&self) -> usize {
+        match self {
+            OwnedColumn::UInt64(v) => v.len(),
+            OwnedColumn::UInt32(v) => v.len(),
+            OwnedColumn::Float32(v) => v.len(),
+            OwnedColumn::Utf8(v) => v.len(),
+            // `checked_div` yields `None` for a zero divisor, which maps to
+            // zero rows instead of a division panic.
+            OwnedColumn::FixedSizeListF32 { flat, inner_size } => {
+                flat.len().checked_div(*inner_size).unwrap_or(0)
+            }
+            OwnedColumn::FixedSizeBinary { flat, inner_size } => {
+                flat.len().checked_div(*inner_size).unwrap_or(0)
+            }
+        }
+    }
+
+    /// Consume the column and build the matching Arrow array.
+    ///
+    /// Fixed-size variants are validated first; a bad shape, or any error
+    /// from the Arrow constructor, is reported as
+    /// [`TranscodeError::ShapeMismatch`].
+    fn into_array(self) -> Result<ArrayRef, TranscodeError> {
+        match self {
+            OwnedColumn::UInt64(v) => Ok(Arc::new(UInt64Array::from(v)) as ArrayRef),
+            OwnedColumn::UInt32(v) => Ok(Arc::new(UInt32Array::from(v)) as ArrayRef),
+            OwnedColumn::Float32(v) => Ok(Arc::new(Float32Array::from(v)) as ArrayRef),
+            OwnedColumn::Utf8(v) => Ok(Arc::new(StringArray::from(v)) as ArrayRef),
+            OwnedColumn::FixedSizeListF32 { flat, inner_size } => {
+                let width = checked_row_width(flat.len(), inner_size)?;
+                let nrows = flat.len() / inner_size;
+                let values = Float32Array::from(flat);
+                let field = Arc::new(Field::new("item", DataType::Float32, false));
+                let arr = FixedSizeListArray::try_new(field, width, Arc::new(values), None)
+                    .map_err(|_| TranscodeError::ShapeMismatch)?;
+                debug_assert_eq!(arr.len(), nrows);
+                Ok(Arc::new(arr) as ArrayRef)
+            }
+            OwnedColumn::FixedSizeBinary { flat, inner_size } => {
+                let width = checked_row_width(flat.len(), inner_size)?;
+                let buf = Buffer::from_vec(flat);
+                let arr = FixedSizeBinaryArray::try_new(width, buf, None)
+                    .map_err(|_| TranscodeError::ShapeMismatch)?;
+                Ok(Arc::new(arr) as ArrayRef)
+            }
+        }
+    }
+}
+
+/// Build a `RecordBatch` from named owned columns. `id` is taken from
+/// `ids` and `entity_type` is filled from the schema; all other columns
+/// must match `soa.columns` by name. Undeclared columns are rejected at
+/// the boundary — silent widening would defeat the ontology's purpose.
+///
+/// # Errors
+///
+/// - [`TranscodeError::MissingColumn`] — a column declared in `soa` has no
+///   entry in `body_columns`.
+/// - [`TranscodeError::RowCountMismatch`] — a column's row count differs
+///   from `ids.len()`.
+/// - [`TranscodeError::ShapeMismatch`] — a fixed-size column was rejected
+///   while being converted to an Arrow array.
+/// - [`TranscodeError::UndeclaredColumn`] — `body_columns` holds a name
+///   that `soa` does not declare, or repeats a declared one.
+/// - [`TranscodeError::Arrow`] — `RecordBatch::try_new` rejected the arrays.
+#[cfg(any(feature = "persist", feature = "query-lite"))]
+pub fn from_columns(
+    soa: &OuterSchema,
+    ids: Vec<u64>,
+    body_columns: Vec<(&str, OwnedColumn)>,
+) -> Result<RecordBatch, TranscodeError> {
+    let nrows = ids.len();
+    let arrow_schema = arrow_schema(soa);
+
+    // Each supplied column sits in an `Option` slot so it can be moved out
+    // exactly once; whatever is still `Some` after the loop was never
+    // claimed by a declared column.
+    let mut by_name: Vec<(&str, Option<OwnedColumn>)> = body_columns
+        .into_iter()
+        .map(|(n, c)| (n, Some(c)))
+        .collect();
+
+    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
+    arrays.push(Arc::new(UInt64Array::from(ids)) as ArrayRef);
+    arrays.push(Arc::new(StringArray::from(vec![soa.entity_type; nrows])) as ArrayRef);
+
+    // Walk the declared columns so the arrays land in schema order,
+    // regardless of the order the caller supplied them in.
+    for soa_col in &soa.columns {
+        let owned = by_name
+            .iter_mut()
+            .find(|(n, c)| *n == soa_col.name && c.is_some())
+            .and_then(|(_, slot)| slot.take())
+            .ok_or_else(|| TranscodeError::MissingColumn(soa_col.name.to_string()))?;
+        if owned.rows() != nrows {
+            return Err(TranscodeError::RowCountMismatch);
+        }
+        arrays.push(owned.into_array()?);
+    }
+
+    if let Some((extra, _)) = by_name.iter().find(|(_, c)| c.is_some()) {
+        return Err(TranscodeError::UndeclaredColumn(extra.to_string()));
+    }
+
+    RecordBatch::try_new(arrow_schema, arrays).map_err(TranscodeError::Arrow)
+}
+
+/// Errors from the transcode layer.
+#[derive(Debug)]
+pub enum TranscodeError {
+    /// A column declared in the schema was not supplied; carries its name.
+    MissingColumn(String),
+    /// A supplied column is not declared in the schema; carries its name.
+    UndeclaredColumn(String),
+    /// A column's row count differs from the number of ids.
+    RowCountMismatch,
+    /// A fixed-size column's flat data does not match its per-row width.
+    ShapeMismatch,
+    /// Arrow rejected the assembled arrays.
+    #[cfg(any(feature = "persist", feature = "query-lite"))]
+    Arrow(arrow::error::ArrowError),
+}
+
+impl core::fmt::Display for TranscodeError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            TranscodeError::MissingColumn(c) => write!(f, "missing column: {c}"),
+            TranscodeError::UndeclaredColumn(c) => write!(
+                f,
+                "column {c} not declared in ontology schema (refusing to widen the boundary)"
+            ),
+            TranscodeError::RowCountMismatch => write!(f, "row count mismatch across columns"),
+            TranscodeError::ShapeMismatch => write!(f, "fixed-size column shape mismatch"),
+            #[cfg(any(feature = "persist", feature = "query-lite"))]
+            TranscodeError::Arrow(e) => write!(f, "arrow error: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for TranscodeError {
+    /// Expose the wrapped Arrow error so error-chain reporters can walk to
+    /// the root cause; every other variant has no underlying source.
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        // Variants are listed explicitly (no `_` arm) so adding one forces
+        // a decision here.
+        match self {
+            TranscodeError::MissingColumn(_)
+            | TranscodeError::UndeclaredColumn(_)
+            | TranscodeError::RowCountMismatch
+            | TranscodeError::ShapeMismatch => None,
+            #[cfg(any(feature = "persist", feature = "query-lite"))]
+            TranscodeError::Arrow(e) => Some(e),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use lance_graph_contract::property::Schema;
+
+    fn build_test_ontology() -> Ontology {
+        Ontology::builder("Test")
+            .schema(
+                Schema::builder("Patient")
+                    .required("patient_id")
+                    .required("name")
+                    .required("geburtsdatum")
+                    .optional("krankenkasse")
+                    .build(),
+            )
+            .build()
+    }
+
+    #[test]
+    fn outer_schema_derives_required_and_optional_from_ontology() {
+        let ontology = build_test_ontology();
+        let s = OuterSchema::from_ontology(&ontology, "Patient").expect("declared");
+        assert_eq!(s.entity_type, "Patient");
+        assert_eq!(s.entity_type_id, 1);
+        assert_eq!(s.columns.len(), 4);
+        assert!(s
+            .columns
+            .iter()
+            .any(|c| c.name == "patient_id" && c.kind == PropertyKind::Required));
+        assert!(s
+            .columns
+            .iter()
+            .any(|c| c.name == "krankenkasse" && c.kind == PropertyKind::Optional));
+    }
+
+    #[test]
+    fn outer_schema_returns_none_for_unknown_entity_type() {
+        let ontology = build_test_ontology();
+        assert!(OuterSchema::from_ontology(&ontology, "Unknown").is_none());
+    }
+
+    #[test]
+    fn source_is_none_for_variants_without_an_inner_error() {
+        use std::error::Error;
+        assert!(TranscodeError::ShapeMismatch.source().is_none());
+        assert!(TranscodeError::MissingColumn("x".into()).source().is_none());
+    }
+
+    #[cfg(any(feature = "persist", feature = "query-lite"))]
+    #[test]
+    fn source_exposes_wrapped_arrow_error() {
+        use std::error::Error;
+        let err = TranscodeError::Arrow(arrow::error::ArrowError::ComputeError("boom".into()));
+        assert!(err.source().is_some());
+    }
+
+    #[cfg(any(feature = "persist", feature = "query-lite"))]
+    #[test]
+    fn arrow_schema_includes_id_and_entity_type_first() {
+        let ont = Ontology::builder("T")
+            .schema(Schema::builder("Patient").required("name").build())
+            .build();
+        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
+        let s = arrow_schema(&soa);
+        assert_eq!(s.field(0).name(), "id");
+        assert_eq!(s.field(1).name(), "entity_type");
+        assert_eq!(s.field(2).name(), "name");
+    }
+
+    #[cfg(any(feature = "persist", feature = "query-lite"))]
+    #[test]
+    fn from_columns_builds_record_batch_in_declared_order() {
+        let ont = Ontology::builder("T")
+            .schema(
+                Schema::builder("Patient")
+                    .required("name")
+                    .required("age")
+                    .build(),
+            )
+            .build();
+        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
+        let batch = from_columns(
+            &soa,
+            vec![10u64, 11u64, 12u64],
+            vec![
+                (
+                    "name",
+                    OwnedColumn::Utf8(vec!["a".into(), "b".into(), "c".into()]),
+                ),
+                (
+                    "age",
+                    OwnedColumn::Utf8(vec!["1".into(), "2".into(), "3".into()]),
+                ),
+            ],
+        )
+        .unwrap();
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 4);
+    }
+
+    #[cfg(any(feature = "persist", feature = "query-lite"))]
+    #[test]
+    fn from_columns_rejects_undeclared_column() {
+        let ont = Ontology::builder("T")
+            .schema(Schema::builder("Patient").required("name").build())
+            .build();
+        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
+        let err = from_columns(
+            &soa,
+            vec![1u64],
+            vec![
+                ("name", OwnedColumn::Utf8(vec!["a".into()])),
+                ("uninvited", OwnedColumn::Utf8(vec!["x".into()])),
+            ],
+        )
+        .unwrap_err();
+        assert!(matches!(err, TranscodeError::UndeclaredColumn(_)));
+    }
+}