diff --git a/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs b/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs index b3f90d1a..9d2c52a4 100644 --- a/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs +++ b/crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs @@ -13,18 +13,23 @@ //! this trait only carries the read-side shape downstream consumers //! interact with. -use lance_graph_contract::cam::{route_tensor, CodecRoute}; +use lance_graph_contract::cam::CodecRoute; -use super::zerocopy::{ArrowTypeCode, OuterColumn}; +use super::zerocopy::OuterColumn; -/// Pick the `CodecRoute` for an outer-ontology column. Same heuristic -/// `route_tensor` uses for tensors, applied to the column's Arrow shape. +/// Pick the `CodecRoute` for an outer-ontology column. +/// +/// Returns the column's declarative route copied straight from the +/// upstream `PropertySpec.codec_route`. This is round-2 of the dispatch +/// path: round-1 inferred the route by feeding the column name through +/// `lance_graph_contract::cam::route_tensor`, which is calibrated for +/// model-weight tensor names (`q_proj`, `lm_head`, …) and would +/// silently mis-classify document predicates (`patient_id`, `iban`, …). +/// +/// The schema layer already declares each property's route; honouring +/// that field is correct by construction. pub fn route_for_column(col: &OuterColumn) -> CodecRoute { - match col.arrow_type_code { - ArrowTypeCode::FixedSizeListF32(n) => route_tensor(col.name, &[n as u64]), - ArrowTypeCode::FixedSizeBinary(n) if n >= 64 => route_tensor(col.name, &[n as u64]), - _ => CodecRoute::Skip, - } + col.codec_route } /// Read-side decoder. 
#[test]
fn route_for_required_scalar_column_uses_property_spec_default() {
    // A property added through `.required(..)` is expected to carry
    // `CodecRoute::Passthrough` as its declared route. The dispatcher
    // reads `OuterColumn.codec_route` verbatim, so the route reported
    // for the projected column must be that same value (the older
    // Arrow-shape heuristic answered `Skip` for scalar columns).
    let patient_schema = Schema::builder("Patient").required("name").build();
    let ontology = Ontology::builder("T").schema(patient_schema).build();
    let outer = OuterSchema::from_ontology(&ontology, "Patient").unwrap();

    let first_column = &outer.columns[0];
    let route = route_for_column(first_column);

    assert_eq!(
        route,
        CodecRoute::Passthrough,
        "required scalar default route should be Passthrough"
    );
}
Round-2 of the transcode crate +/// extracts the cached pattern that medcare-rs's `MedcareOntology` and +/// smb-office-rs's session ontology both grew independently — one place, +/// one bug to fix, identical semantics for both consumers. +/// +/// Construction is `O(work)`; every subsequent `dto(locale)` call is a +/// `HashMap` hit. The DTOs are cloned-cheap (`Arc`). +/// +/// # Example +/// +/// ```rust,ignore +/// use lance_graph_callcenter::transcode::CachedOntology; +/// use lance_graph_callcenter::ontology_dto::medcare_ontology; +/// use lance_graph_contract::ontology::Locale; +/// +/// let cached = CachedOntology::new(medcare_ontology()); +/// let de = cached.dto(Locale::De); // O(1) cache hit +/// let en = cached.dto(Locale::En); // O(1) cache hit +/// assert_eq!(cached.inner().name, "medcare"); +/// ``` +#[derive(Debug)] +pub struct CachedOntology { + inner: Arc, + dtos: HashMap>, +} + +impl CachedOntology { + /// Build a `CachedOntology` for the given inner ontology, eagerly + /// projecting it to every supported locale (`De`, `En`). + pub fn new(ontology: Ontology) -> Self { + let inner = Arc::new(ontology); + let mut dtos = HashMap::with_capacity(2); + for locale in [Locale::De, Locale::En] { + dtos.insert(locale, Arc::new(OntologyDto::from_ontology(&inner, locale))); + } + Self { inner, dtos } + } + + /// Return a shared reference to the underlying inner ontology. + /// Use this when you need to consume the canonical Schema / + /// LinkSpec / ActionSpec entries directly (e.g. inside + /// `OntologyTableProvider::new`). + pub fn inner(&self) -> &Arc { + &self.inner + } + + /// Look up the DTO projection for one locale. Returns the cached + /// `Arc` — clone is cheap. + /// + /// Panics only if the locale wasn't projected at construction + /// time, which can't happen with the present implementation + /// (constructor projects all variants of `Locale`). The panic + /// would surface as a fast-failure indicator if the enum grows. 
+ pub fn dto(&self, locale: Locale) -> Arc { + self.dtos + .get(&locale) + .cloned() + .expect("Locale must be projected at CachedOntology construction time") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lance_graph_contract::property::Schema; + + fn small_ontology() -> Ontology { + Ontology::builder("Small") + .label(Label::new("small", "Small Test", "Kleiner Test")) + .schema(Schema::builder("Patient").required("name").build()) + .build() + } + + #[test] + fn cached_ontology_projects_every_locale_at_construction() { + let cached = CachedOntology::new(small_ontology()); + let de = cached.dto(Locale::De); + let en = cached.dto(Locale::En); + // Just verifying the cache is populated and clones are cheap. + assert_eq!(de.locale, Locale::De); + assert_eq!(en.locale, Locale::En); + } + + #[test] + fn cached_ontology_clones_are_arc_cheap() { + let cached = CachedOntology::new(small_ontology()); + let a = cached.dto(Locale::De); + let b = cached.dto(Locale::De); + // Both Arcs point at the same cached projection. + assert!(Arc::ptr_eq(&a, &b)); + } + + #[test] + fn cached_ontology_inner_round_trips() { + let cached = CachedOntology::new(small_ontology()); + assert_eq!(cached.inner().name, "Small"); + } +} diff --git a/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs b/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs index 0f2d05b6..fb5c21a4 100644 --- a/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs +++ b/crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs @@ -156,6 +156,47 @@ impl DriftEvent { } } +/// Validate that a drift-event route resolves to an entity_type in the +/// given ontology. +/// +/// Routes follow the convention `/api/{entity_type_lowercase}/{...}` — +/// e.g. `/api/patient/42`, `/api/labresult/17`. The validator parses the +/// second path segment, lowercases it, and checks that it matches any +/// `Schema.name` in the ontology under a case-insensitive comparison. 
+/// +/// Returns `Ok(())` on a valid route, `Err(reason)` otherwise. The error +/// is intended for fast-fail in tests and for warnings in dashboard +/// rendering — the parallelbetrieb runner itself should *not* gate on +/// route validation, because a typo route is still genuine telemetry +/// (it tells you a buggy callsite exists). The validator is for +/// pre-flight checks of static route lists. +pub fn validate_route( + route: &str, + ontology: &lance_graph_contract::ontology::Ontology, +) -> Result<(), String> { + let stripped = route + .strip_prefix("/api/") + .ok_or_else(|| format!("route does not start with `/api/` (got `{route}`)"))?; + let entity_segment = stripped + .split('/') + .next() + .filter(|s| !s.is_empty()) + .ok_or_else(|| format!("route has no entity segment after `/api/` (got `{route}`)"))?; + let normalized = entity_segment.to_ascii_lowercase(); + let matches = ontology + .schemas + .iter() + .any(|s| s.name.eq_ignore_ascii_case(&normalized)); + if matches { + Ok(()) + } else { + let known: Vec<&str> = ontology.schemas.iter().map(|s| s.name).collect(); + Err(format!( + "route entity_type `{entity_segment}` not found in ontology — known: {known:?}" + )) + } +} + /// Contract for any parallelbetrieb implementation. 
/// /// Two implementors are anticipated: @@ -290,4 +331,46 @@ mod tests { assert!(ev.captured_at.ends_with('Z')); assert!(ev.captured_at.contains('T')); } + + // ── route validation ──────────────────────────────────────────────────── + + fn ontology_with_patient_and_labresult() -> lance_graph_contract::ontology::Ontology { + use lance_graph_contract::property::Schema; + lance_graph_contract::ontology::Ontology::builder("Test") + .schema(Schema::builder("Patient").required("name").build()) + .schema(Schema::builder("LabResult").required("value").build()) + .build() + } + + #[test] + fn validate_route_accepts_known_entity_type() { + let ont = ontology_with_patient_and_labresult(); + assert!(validate_route("/api/patient/42", &ont).is_ok()); + assert!(validate_route("/api/Patient/42", &ont).is_ok()); + assert!(validate_route("/api/labresult/17", &ont).is_ok()); + assert!(validate_route("/api/LabResult/17", &ont).is_ok()); + } + + #[test] + fn validate_route_rejects_typo_entity_type() { + let ont = ontology_with_patient_and_labresult(); + let err = validate_route("/api/patient_typo/42", &ont).unwrap_err(); + assert!(err.contains("patient_typo")); + assert!(err.contains("Patient")); + assert!(err.contains("LabResult")); + } + + #[test] + fn validate_route_rejects_missing_api_prefix() { + let ont = ontology_with_patient_and_labresult(); + assert!(validate_route("/v1/patient/42", &ont).is_err()); + assert!(validate_route("patient/42", &ont).is_err()); + } + + #[test] + fn validate_route_rejects_empty_entity_segment() { + let ont = ontology_with_patient_and_labresult(); + assert!(validate_route("/api/", &ont).is_err()); + assert!(validate_route("/api//42", &ont).is_err()); + } } diff --git a/crates/lance-graph-callcenter/src/transcode/zerocopy.rs b/crates/lance-graph-callcenter/src/transcode/zerocopy.rs index e56172c2..31da71b7 100644 --- a/crates/lance-graph-callcenter/src/transcode/zerocopy.rs +++ b/crates/lance-graph-callcenter/src/transcode/zerocopy.rs @@ -51,6 +51,8 
@@ pub enum ArrowTypeCode { UInt32, UInt64, Float32, + /// 32-bit days since Unix epoch — Arrow `Date32`. + Date32, /// Fixed-size list of f32 with the given size (e.g. 16 384 for VSA carriers). FixedSizeListF32(usize), /// Fixed-size binary of the given byte width (e.g. 64 for `Fingerprint`). @@ -60,6 +62,11 @@ pub enum ArrowTypeCode { } /// One column in the outer-ontology view of an entity type. +/// +/// Carries the [`CodecRoute`] copied from the upstream `PropertySpec` so +/// that read-side dispatch (in [`super::cam_pq_decode`]) can consult the +/// declarative route directly, without re-running the model-weight +/// classifier in `lance_graph_contract::cam::route_tensor`. #[derive(Clone, Debug)] pub struct OuterColumn { pub name: &'static str, @@ -67,6 +74,12 @@ pub struct OuterColumn { pub semantic_type: SemanticType, pub marking: Marking, pub arrow_type_code: ArrowTypeCode, + /// Codec route declared by the upstream `PropertySpec`. Round-1 of + /// the transcode crate inferred this from the column name; round-2 + /// honours the contract's own field — every PropertySpec already + /// carries a `codec_route` and hand-rolled inference would only + /// ever drift from it. + pub codec_route: lance_graph_contract::cam::CodecRoute, } /// Outer-ontology projection of one entity type's columns. Derived from @@ -105,14 +118,37 @@ fn schema_columns(schema: &Schema) -> Vec { semantic_type: p.semantic_type.clone(), marking: p.marking, arrow_type_code: arrow_type_for_semantic(&p.semantic_type), + codec_route: p.codec_route, }) .collect() } -/// Round-1 collapses every semantic type to `Utf8`. Round 2 plumbs richer -/// Arrow types (`Date32`, `Decimal128`, etc.) per consumer demand. -fn arrow_type_for_semantic(_st: &SemanticType) -> ArrowTypeCode { - ArrowTypeCode::Utf8 +/// Map a `SemanticType` to the Arrow type the external surface should +/// emit. 
Picked so that DataFusion predicate pushdown is meaningful: +/// +/// - **`Currency` → `Float32`**: numeric comparison (`amount > 1000.0`) +/// becomes a fast scan filter instead of a string compare. +/// - **`Date(_)` → `Date32`**: temporal comparison (`birth >= 1980-01-01`) +/// pushes down through Arrow's date kernels. +/// - **`CustomerId` / `InvoiceNumber` → `UInt64`**: per-tenant numeric +/// identifiers; faster equality checks than string equality. +/// - **Everything else → `Utf8`**: opaque text, lexical compare only. +/// +/// Currency carries an ISO 4217 code (`Currency("EUR")`); the code is +/// metadata at the schema layer, not per-row data, so we don't widen the +/// Arrow type to a struct. Consumers that need the code read it from the +/// `OuterColumn.semantic_type` field. +fn arrow_type_for_semantic(st: &SemanticType) -> ArrowTypeCode { + match st { + SemanticType::Currency(_) => ArrowTypeCode::Float32, + SemanticType::Date(_) => ArrowTypeCode::Date32, + SemanticType::CustomerId | SemanticType::InvoiceNumber => ArrowTypeCode::UInt64, + // Geo / File / Image / Address / Iban / Email / Phone / Url / + // TaxId / PlainText all collapse to opaque text. Round 3 may + // pivot specific ones (Geo → struct{lat: f32, lon: f32}) when + // a consumer asks. + _ => ArrowTypeCode::Utf8, + } } // ── Arrow wiring (feature `persist`) ───────────────────────────────────────── @@ -141,6 +177,7 @@ pub fn arrow_data_type(code: ArrowTypeCode) -> DataType { ArrowTypeCode::UInt32 => DataType::UInt32, ArrowTypeCode::UInt64 => DataType::UInt64, ArrowTypeCode::Float32 => DataType::Float32, + ArrowTypeCode::Date32 => DataType::Date32, ArrowTypeCode::FixedSizeListF32(n) => DataType::FixedSizeList( Arc::new(Field::new("item", DataType::Float32, false)), n as i32, @@ -158,6 +195,9 @@ pub enum OwnedColumn { UInt32(Vec), Float32(Vec), Utf8(Vec), + /// Date32 — days since Unix epoch (1970-01-01). Negative values are + /// pre-epoch. Backed by Arrow's native `Date32Array`. 
+ Date32(Vec), /// VSA carriers — flat row-major `f32`. `inner_size` is the per-row /// dimensionality (e.g. 16384 for Vsa16kF32). Total length must be /// `nrows * inner_size`. @@ -183,6 +223,7 @@ impl OwnedColumn { OwnedColumn::UInt32(v) => v.len(), OwnedColumn::Float32(v) => v.len(), OwnedColumn::Utf8(v) => v.len(), + OwnedColumn::Date32(v) => v.len(), OwnedColumn::FixedSizeListF32 { flat, inner_size } => { if *inner_size == 0 { 0 @@ -206,6 +247,7 @@ impl OwnedColumn { OwnedColumn::UInt32(v) => Ok(Arc::new(UInt32Array::from(v)) as ArrayRef), OwnedColumn::Float32(v) => Ok(Arc::new(Float32Array::from(v)) as ArrayRef), OwnedColumn::Utf8(v) => Ok(Arc::new(StringArray::from(v)) as ArrayRef), + OwnedColumn::Date32(v) => Ok(Arc::new(arrow::array::Date32Array::from(v)) as ArrayRef), OwnedColumn::FixedSizeListF32 { flat, inner_size } => { if inner_size == 0 || flat.len() % inner_size != 0 { return Err(TranscodeError::ShapeMismatch); @@ -278,6 +320,75 @@ pub fn from_columns( RecordBatch::try_new(arrow_schema, arrays).map_err(TranscodeError::Arrow) } +/// Build a `RecordBatch` for a **partial** row write — for PATCH-style +/// upserts where only a subset of fields is being changed. +/// +/// Rules (stricter than [`from_columns`] in one place, looser in another): +/// +/// 1. **Required columns** declared in the ontology must still be +/// present. Required-by-construction means the entity row can't exist +/// without them; allowing partial writes that omit required fields +/// would invite silent rows missing key data. +/// 2. **Optional / Free columns** may be omitted. Omitted columns appear +/// as Arrow null arrays in the output batch — DataFusion's standard +/// `IS NULL` filter sees them. +/// 3. Undeclared columns are still rejected (same as the strict path). +/// 4. The mode is honest about itself — round-1 emits all-null arrays +/// for missing optionals, which costs `O(nrows)` bytes per skipped +/// column. 
/// Build a `RecordBatch` for a **partial** row write (PATCH-style upsert)
/// in which only some of the entity's columns are supplied.
///
/// The output always has the full schema of `soa`: an id column, an
/// entity-type column, then one column per entry of `soa.columns`.
///
/// * Columns whose `kind` is `PropertyKind::Required` must be supplied.
/// * Any other declared column may be omitted; it is emitted as an
///   all-null array of length `ids.len()`.
/// * Supplied columns that match no declared column are rejected.
/// * If the same name is supplied twice, only the first occurrence is
///   consumed; the second is left over and reported as undeclared.
///
/// NOTE(review): `RecordBatch::try_new` rejects null values in a field
/// declared non-nullable — confirm `arrow_schema` marks the optional
/// fields nullable, otherwise omitted columns surface as
/// `TranscodeError::Arrow`.
///
/// # Errors
///
/// * `TranscodeError::RowCountMismatch` — a supplied column's row count
///   differs from `ids.len()`.
/// * `TranscodeError::MissingColumn` — a required column was not supplied.
/// * `TranscodeError::UndeclaredColumn` — a supplied column was not
///   consumed by any declared column.
/// * `TranscodeError::Arrow` — Arrow refused the assembled batch.
/// * Any error returned by `OwnedColumn::into_array`.
#[cfg(any(feature = "persist", feature = "query-lite"))]
pub fn from_columns_partial(
    soa: &OuterSchema,
    ids: Vec<u64>,
    body_columns: Vec<(&str, OwnedColumn)>,
) -> Result<RecordBatch, TranscodeError> {
    use arrow::array::new_null_array;

    let nrows = ids.len();
    let arrow_schema = arrow_schema(soa);

    // Wrap each supplied column in `Option` so it can be moved out when
    // matched; whatever is still `Some` at the end was never declared.
    // Names stay borrowed — an owned `String` is only built for errors.
    let mut provided: Vec<(&str, Option<OwnedColumn>)> = body_columns
        .into_iter()
        .map(|(name, column)| (name, Some(column)))
        .collect();

    let entity_type_strs: Vec<&'static str> = vec![soa.entity_type; nrows];
    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
    arrays.push(Arc::new(UInt64Array::from(ids)) as ArrayRef);
    arrays.push(Arc::new(StringArray::from(entity_type_strs)) as ArrayRef);

    for (idx, soa_col) in soa.columns.iter().enumerate() {
        // First not-yet-consumed supplied column with this name, if any.
        let supplied = provided
            .iter_mut()
            .find(|(name, column)| *name == soa_col.name && column.is_some())
            .and_then(|(_, column)| column.take());

        match supplied {
            Some(owned) => {
                if owned.rows() != nrows {
                    return Err(TranscodeError::RowCountMismatch);
                }
                arrays.push(owned.into_array()?);
            }
            None => {
                if matches!(soa_col.kind, PropertyKind::Required) {
                    return Err(TranscodeError::MissingColumn(soa_col.name.to_string()));
                }
                // The two leading fields (id, entity type) precede the
                // declared columns, hence the `+ 2` offset.
                let field = arrow_schema.field(idx + 2);
                arrays.push(new_null_array(field.data_type(), nrows));
            }
        }
    }

    if let Some((extra, _)) = provided.iter().find(|(_, column)| column.is_some()) {
        return Err(TranscodeError::UndeclaredColumn((*extra).to_string()));
    }

    RecordBatch::try_new(arrow_schema, arrays).map_err(TranscodeError::Arrow)
}