Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
//! this trait only carries the read-side shape downstream consumers
//! interact with.

use lance_graph_contract::cam::{route_tensor, CodecRoute};
use lance_graph_contract::cam::CodecRoute;

use super::zerocopy::{ArrowTypeCode, OuterColumn};
use super::zerocopy::OuterColumn;

/// Pick the `CodecRoute` for an outer-ontology column. Same heuristic
/// `route_tensor` uses for tensors, applied to the column's Arrow shape.
/// Pick the `CodecRoute` for an outer-ontology column.
///
/// Returns the column's declarative route copied straight from the
/// upstream `PropertySpec.codec_route`. This is round-2 of the dispatch
/// path: round-1 inferred the route by feeding the column name through
/// `lance_graph_contract::cam::route_tensor`, which is calibrated for
/// model-weight tensor names (`q_proj`, `lm_head`, …) and would
/// silently mis-classify document predicates (`patient_id`, `iban`, …).
///
/// The schema layer already declares each property's route; honouring
/// that field is correct by construction.
pub fn route_for_column(col: &OuterColumn) -> CodecRoute {
match col.arrow_type_code {
ArrowTypeCode::FixedSizeListF32(n) => route_tensor(col.name, &[n as u64]),
ArrowTypeCode::FixedSizeBinary(n) if n >= 64 => route_tensor(col.name, &[n as u64]),
_ => CodecRoute::Skip,
}
col.codec_route
}

/// Read-side decoder. Implementations get one row's worth of bytes and
Expand Down Expand Up @@ -97,15 +102,23 @@ mod tests {
use lance_graph_contract::property::Schema;

#[test]
fn route_for_scalar_columns_skips_codec() {
fn route_for_required_scalar_column_uses_property_spec_default() {
// PropertySpec::required() defaults `codec_route` to
// `CodecRoute::Passthrough` (Index regime). The transcode-layer
// dispatcher must honour that — round-1's heuristic returned
// Skip for scalars by guessing from the Arrow shape, which
// diverged from the contract's own field. Round-2 just reads
// `OuterColumn.codec_route`, so the result is whatever the
// schema declared.
let ont = Ontology::builder("T")
.schema(Schema::builder("Patient").required("name").build())
.build();
let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
assert!(matches!(
assert_eq!(
route_for_column(&soa.columns[0]),
CodecRoute::Skip
));
CodecRoute::Passthrough,
"required scalar default route should be Passthrough"
);
}

#[test]
Expand Down
108 changes: 108 additions & 0 deletions crates/lance-graph-callcenter/src/transcode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,111 @@ pub use crate::ontology_dto::{
pub use lance_graph_contract::ontology::{
EntityTypeId, ExpandedTriple, Label, Locale, Ontology, SchemaExpander,
};

// ── Cached ontology bundle ───────────────────────────────────────────────────

use std::collections::HashMap;
use std::sync::Arc;

/// Bundle that caches the bilingual DTO projections of one `Ontology`.
///
/// `OntologyDto::from_ontology(_, locale)` walks every schema +
/// link + action and is `O(properties + links + actions)`. Per-call
/// rebuilds in a hot path waste cycles. Round-2 of the transcode crate
/// extracts the cached pattern that medcare-rs's `MedcareOntology` and
/// smb-office-rs's session ontology both grew independently — one place,
/// one bug to fix, identical semantics for both consumers.
///
/// Construction is `O(work)`; every subsequent `dto(locale)` call is a
/// `HashMap` hit. The DTOs are cloned-cheap (`Arc<OntologyDto>`).
///
/// # Example
///
/// ```rust,ignore
/// use lance_graph_callcenter::transcode::CachedOntology;
/// use lance_graph_callcenter::ontology_dto::medcare_ontology;
/// use lance_graph_contract::ontology::Locale;
///
/// let cached = CachedOntology::new(medcare_ontology());
/// let de = cached.dto(Locale::De); // O(1) cache hit
/// let en = cached.dto(Locale::En); // O(1) cache hit
/// assert_eq!(cached.inner().name, "medcare");
/// ```
#[derive(Debug)]
pub struct CachedOntology {
inner: Arc<Ontology>,
dtos: HashMap<Locale, Arc<OntologyDto>>,
}

impl CachedOntology {
/// Build a `CachedOntology` for the given inner ontology, eagerly
/// projecting it to every supported locale (`De`, `En`).
pub fn new(ontology: Ontology) -> Self {
let inner = Arc::new(ontology);
let mut dtos = HashMap::with_capacity(2);
for locale in [Locale::De, Locale::En] {
dtos.insert(locale, Arc::new(OntologyDto::from_ontology(&inner, locale)));
}
Self { inner, dtos }
}

/// Return a shared reference to the underlying inner ontology.
/// Use this when you need to consume the canonical Schema /
/// LinkSpec / ActionSpec entries directly (e.g. inside
/// `OntologyTableProvider::new`).
pub fn inner(&self) -> &Arc<Ontology> {
&self.inner
}

/// Look up the DTO projection for one locale. Returns the cached
/// `Arc<OntologyDto>` — clone is cheap.
///
/// Panics only if the locale wasn't projected at construction
/// time, which can't happen with the present implementation
/// (constructor projects all variants of `Locale`). The panic
/// would surface as a fast-failure indicator if the enum grows.
pub fn dto(&self, locale: Locale) -> Arc<OntologyDto> {
self.dtos
.get(&locale)
.cloned()
.expect("Locale must be projected at CachedOntology construction time")
}
}

#[cfg(test)]
mod tests {
use super::*;
use lance_graph_contract::property::Schema;

fn small_ontology() -> Ontology {
Ontology::builder("Small")
.label(Label::new("small", "Small Test", "Kleiner Test"))
.schema(Schema::builder("Patient").required("name").build())
.build()
}

#[test]
fn cached_ontology_projects_every_locale_at_construction() {
let cached = CachedOntology::new(small_ontology());
let de = cached.dto(Locale::De);
let en = cached.dto(Locale::En);
// Just verifying the cache is populated and clones are cheap.
assert_eq!(de.locale, Locale::De);
assert_eq!(en.locale, Locale::En);
}

#[test]
fn cached_ontology_clones_are_arc_cheap() {
let cached = CachedOntology::new(small_ontology());
let a = cached.dto(Locale::De);
let b = cached.dto(Locale::De);
// Both Arcs point at the same cached projection.
assert!(Arc::ptr_eq(&a, &b));
}

#[test]
fn cached_ontology_inner_round_trips() {
let cached = CachedOntology::new(small_ontology());
assert_eq!(cached.inner().name, "Small");
}
}
83 changes: 83 additions & 0 deletions crates/lance-graph-callcenter/src/transcode/parallelbetrieb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,47 @@ impl DriftEvent {
}
}

/// Validate that a drift-event route resolves to an entity_type in the
/// given ontology.
///
/// Routes follow the convention `/api/{entity_type_lowercase}/{...}` —
/// e.g. `/api/patient/42`, `/api/labresult/17`. The validator parses the
/// second path segment, lowercases it, and checks that it matches any
/// `Schema.name` in the ontology under a case-insensitive comparison.
///
/// Returns `Ok(())` on a valid route, `Err(reason)` otherwise. The error
/// is intended for fast-fail in tests and for warnings in dashboard
/// rendering — the parallelbetrieb runner itself should *not* gate on
/// route validation, because a typo route is still genuine telemetry
/// (it tells you a buggy callsite exists). The validator is for
/// pre-flight checks of static route lists.
pub fn validate_route(
route: &str,
ontology: &lance_graph_contract::ontology::Ontology,
) -> Result<(), String> {
let stripped = route
.strip_prefix("/api/")
.ok_or_else(|| format!("route does not start with `/api/` (got `{route}`)"))?;
let entity_segment = stripped
.split('/')
.next()
.filter(|s| !s.is_empty())
.ok_or_else(|| format!("route has no entity segment after `/api/` (got `{route}`)"))?;
let normalized = entity_segment.to_ascii_lowercase();
let matches = ontology
.schemas
.iter()
.any(|s| s.name.eq_ignore_ascii_case(&normalized));
if matches {
Ok(())
} else {
let known: Vec<&str> = ontology.schemas.iter().map(|s| s.name).collect();
Err(format!(
"route entity_type `{entity_segment}` not found in ontology — known: {known:?}"
))
}
}

/// Contract for any parallelbetrieb implementation.
///
/// Two implementors are anticipated:
Expand Down Expand Up @@ -290,4 +331,46 @@ mod tests {
assert!(ev.captured_at.ends_with('Z'));
assert!(ev.captured_at.contains('T'));
}

// ── route validation ────────────────────────────────────────────────────

fn ontology_with_patient_and_labresult() -> lance_graph_contract::ontology::Ontology {
use lance_graph_contract::property::Schema;
lance_graph_contract::ontology::Ontology::builder("Test")
.schema(Schema::builder("Patient").required("name").build())
.schema(Schema::builder("LabResult").required("value").build())
.build()
}

#[test]
fn validate_route_accepts_known_entity_type() {
let ont = ontology_with_patient_and_labresult();
assert!(validate_route("/api/patient/42", &ont).is_ok());
assert!(validate_route("/api/Patient/42", &ont).is_ok());
assert!(validate_route("/api/labresult/17", &ont).is_ok());
assert!(validate_route("/api/LabResult/17", &ont).is_ok());
}

#[test]
fn validate_route_rejects_typo_entity_type() {
let ont = ontology_with_patient_and_labresult();
let err = validate_route("/api/patient_typo/42", &ont).unwrap_err();
assert!(err.contains("patient_typo"));
assert!(err.contains("Patient"));
assert!(err.contains("LabResult"));
}

#[test]
fn validate_route_rejects_missing_api_prefix() {
let ont = ontology_with_patient_and_labresult();
assert!(validate_route("/v1/patient/42", &ont).is_err());
assert!(validate_route("patient/42", &ont).is_err());
}

#[test]
fn validate_route_rejects_empty_entity_segment() {
let ont = ontology_with_patient_and_labresult();
assert!(validate_route("/api/", &ont).is_err());
assert!(validate_route("/api//42", &ont).is_err());
}
}
Loading
Loading