Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/lance-graph-callcenter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ futures = { version = "0.3", optional = true }
axum = { version = "0.8", features = ["ws"], optional = true }
tower-http = { version = "0.5", features = ["cors"], optional = true }

# `transcode` submodules: `ontology_table` needs `async_trait` for the
# DataFusion `TableProvider` impl. The trait crate is small and pulls in
# nothing else, so it's worth always-on (no dep behind a feature).
async-trait = "0.1"

[features]
default = []
persist = ["dep:arrow", "dep:lance"]
Expand Down
7 changes: 7 additions & 0 deletions crates/lance-graph-callcenter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,10 @@ pub mod audit;
feature = "full"
))]
pub mod policy;

// Outer ↔ inner ontology transcode — reusable Foundry primitives.
// Domain-agnostic mapper between the wire-shape DTO surface (already in
// `ontology_dto`) and the inner SoA / SPO substrate. Also hosts the
// **single deliberate transition bandaid**: `parallelbetrieb` for the
// MySQL ↔ DataFusion ↔ SPO reconciler. See `transcode/mod.rs`.
pub mod transcode;
135 changes: 135 additions & 0 deletions crates/lance-graph-callcenter/src/transcode/cam_pq_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! Decode-on-read shim for persistent-SoA columns.
//!
//! Each outer-ontology column maps to a [`CodecRoute`] from
//! [`lance_graph_contract::cam`]:
//!
//! - `Passthrough` — exact-identity f32, zero-copy on read.
//! - `CamPq` — 6-byte product-quantised codes, decoded on read.
//! - `Skip` — no codec; raw bytes (Utf8, scalar columns).
//!
//! This module is the **dispatch contract** — domain-agnostic, reusable
//! across every ontology that lands a Lance dataset. The actual PQ math
//! lives in `lance_graph_contract::cam` and `lance_graph_planner::physical::cam_pq_scan`;
//! this trait only carries the read-side shape downstream consumers
//! interact with.

use lance_graph_contract::cam::{route_tensor, CodecRoute};

use super::zerocopy::{ArrowTypeCode, OuterColumn};

/// Select the [`CodecRoute`] for one outer-ontology column.
///
/// Columns whose Arrow type code is `FixedSizeListF32(n)`, or
/// `FixedSizeBinary(n)` with `n >= 64`, are handed to [`route_tensor`]
/// together with the column name and the one-dimensional shape `[n]`, so
/// they follow the same routing heuristic tensors do. Every other column
/// (including `FixedSizeBinary` narrower than 64) gets `CodecRoute::Skip`.
pub fn route_for_column(col: &OuterColumn) -> CodecRoute {
    // Resolve the single dimension first; anything that is not a routable
    // fixed-size shape bails out with `Skip` before touching `route_tensor`.
    let dim = match col.arrow_type_code {
        ArrowTypeCode::FixedSizeListF32(n) => n as u64,
        ArrowTypeCode::FixedSizeBinary(n) if n >= 64 => n as u64,
        _ => return CodecRoute::Skip,
    };
    route_tensor(col.name, &[dim])
}

/// Read-side decoder. Implementations get one row's worth of bytes and
/// hand back the f32 representation the consumer expects.
///
/// The `Send + Sync` supertraits are part of the contract, so every
/// implementation must be shareable across threads.
pub trait CamPqDecoder: Send + Sync {
/// Decode a single encoded row into `out`.
///
/// - `encoded`: the stored bytes for one row.
/// - `out`: caller-provided buffer that receives the decoded `f32` values.
///
/// Returns a `usize` on success — presumably the number of `f32` values
/// written (`PassthroughDecoder` returns `out.len()`); confirm for other
/// implementations.
///
/// # Errors
///
/// Returns a [`DecodeError`] when the row cannot be decoded.
fn decode_row(&self, encoded: &[u8], out: &mut [f32]) -> Result<usize, DecodeError>;
}

/// Errors from the decode path.
///
/// Every payload is a plain `usize`, so the type is `Eq` as well as
/// `PartialEq` and can be compared exactly in tests and callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecodeError {
    /// The encoded row does not have the byte length the decoder requires.
    BadStride {
        /// Number of bytes the decoder required.
        expected: usize,
        /// Number of bytes actually supplied.
        got: usize,
    },
    /// The caller-provided output buffer cannot hold the decoded row.
    OutputTooSmall {
        /// Size the decoder needed.
        needed: usize,
        /// Size the caller supplied.
        got: usize,
    },
    /// Codebook not registered for the column's route — caller should
    /// either register one or fall back to the [`PassthroughDecoder`].
    NoCodebook,
}

impl core::fmt::Display for DecodeError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DecodeError::BadStride { expected, got } => {
write!(f, "bad stride: expected {expected} bytes, got {got}")
}
DecodeError::OutputTooSmall { needed, got } => {
write!(f, "output buffer too small: need {needed}, got {got}")
}
DecodeError::NoCodebook => {
write!(f, "no codebook registered for column's CAM-PQ route")
}
}
}
}

// Marker impl: lets `DecodeError` be used wherever a `std::error::Error`
// is expected (e.g. boxed as `dyn Error`). No methods are overridden; the
// required `Debug` and `Display` come from the derive and the impl above.
impl std::error::Error for DecodeError {}

/// Trivial decoder that re-interprets encoded bytes as little-endian f32.
/// Used for `CodecRoute::Passthrough` / `Skip` columns where no codec
/// transform happened.
///
/// The type is a field-less unit struct: it carries no codebook or other
/// state, so it is `Copy` and can be constructed via `Default` or by name.
#[derive(Debug, Default, Clone, Copy)]
pub struct PassthroughDecoder;

impl CamPqDecoder for PassthroughDecoder {
    /// Fill `out` by reading consecutive 4-byte groups of `encoded` as
    /// little-endian `f32` values.
    ///
    /// Exactly `out.len()` values are decoded; any bytes in `encoded` beyond
    /// `out.len() * 4` are ignored. Returns `out.len()` on success.
    ///
    /// # Errors
    ///
    /// Returns [`DecodeError::BadStride`] when `encoded` holds fewer than
    /// `out.len() * 4` bytes; `out` is left untouched in that case.
    fn decode_row(&self, encoded: &[u8], out: &mut [f32]) -> Result<usize, DecodeError> {
        let lanes = out.len();
        let required_bytes = lanes * 4;
        let supplied_bytes = encoded.len();
        if supplied_bytes < required_bytes {
            return Err(DecodeError::BadStride {
                expected: required_bytes,
                got: supplied_bytes,
            });
        }

        // The length check above guarantees at least `lanes` full 4-byte
        // words, so the zip is bounded by `out` and fills every slot.
        for (slot, word) in out.iter_mut().zip(encoded.chunks_exact(4)) {
            *slot = f32::from_le_bytes([word[0], word[1], word[2], word[3]]);
        }
        Ok(lanes)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::transcode::zerocopy::OuterSchema;
    use lance_graph_contract::ontology::Ontology;
    use lance_graph_contract::property::Schema;

    /// The first column of a schema with one plain required property must
    /// route to `CodecRoute::Skip`.
    #[test]
    fn route_for_scalar_columns_skips_codec() {
        let patient = Schema::builder("Patient").required("name").build();
        let ontology = Ontology::builder("T").schema(patient).build();
        let outer = OuterSchema::from_ontology(&ontology, "Patient").unwrap();

        let route = route_for_column(&outer.columns[0]);
        assert!(matches!(route, CodecRoute::Skip));
    }

    /// Encoding four f32 values as little-endian bytes and decoding them
    /// again yields the same values and reports four elements.
    #[test]
    fn passthrough_decoder_round_trips_le_f32() {
        let expected: [f32; 4] = [1.0, -2.5, 3.25, 0.0];
        let mut encoded: Vec<u8> = Vec::new();
        for value in &expected {
            encoded.extend_from_slice(&value.to_le_bytes());
        }

        let mut decoded = [0.0f32; 4];
        let written = PassthroughDecoder
            .decode_row(&encoded, &mut decoded)
            .unwrap();

        assert_eq!(written, 4);
        assert_eq!(decoded, expected);
    }

    /// Eight input bytes cannot fill a four-element output (16 bytes
    /// required), so decoding must fail with `BadStride`.
    #[test]
    fn passthrough_decoder_rejects_short_input() {
        let short = [0u8; 8];
        let mut decoded = [0.0f32; 4];

        let outcome = PassthroughDecoder.decode_row(&short, &mut decoded);

        assert_eq!(
            outcome,
            Err(DecodeError::BadStride {
                expected: 16,
                got: 8
            })
        );
    }
}
79 changes: 79 additions & 0 deletions crates/lance-graph-callcenter/src/transcode/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Outer ↔ inner ontology transcode — reusable Foundry primitives.
//!
//! This module group is the **mapper** between two ontology surfaces:
//!
//! - **Outer ontology** — the shape every external consumer sees. Object
//! types (`Schema`), link types (`LinkSpec`), action types (`ActionSpec`),
//! bilingual labels (`Locale` + `Label`), and the wire DTOs in
//! [`crate::ontology_dto`]. Foundry's "Object Type / Link Type" surface.
//!
//! - **Inner ontology** — the shape the internal SoA actually stores.
//! `BindSpace` columns (`FingerprintColumns`, `EdgeColumn`,
//! `QualiaColumn`, `MetaColumn`, `entity_type`), CAM-PQ-encoded
//! compressed columns when persisted to a Lance dataset, and SPO
//! triples produced by [`SchemaExpander`].
//!
//! The pieces below are **domain-agnostic**. None of them reference
//! medcare, smb, callcenter, or any specific ontology — they operate on
//! whatever `Ontology` is handed in. Domain-specific schemas live where
//! they belong: `medcare_ontology()` in [`crate::ontology_dto`],
//! `smb_ontology()` in [`crate::ontology_dto`], future verticals in their
//! own factory.
//!
//! ## Modules
//!
//! - [`zerocopy`] — owned-column → Arrow `RecordBatch` mapping. The
//! canonical zerocopy path: `Vec<T>` → `Buffer` is an `O(1)`
//! reinterpretation. Wraps an `Ontology`-derived schema, refuses
//! undeclared columns at the boundary.
//! - [`cam_pq_decode`] — codec dispatch for persistent SoA columns. The
//! `CamPqDecoder` trait + `PassthroughDecoder` cover the
//! `Skip`/`Passthrough` `CodecRoute` lanes; `CamPq` decode-on-read
//! will be plumbed in once a codebook handle is wired.
//! - [`ontology_table`] — DataFusion `TableProvider` over an
//! `(Ontology, entity_type)` pair. Schema reflection works today;
//! filter pushdown to the SPO store is the canonical Phase 2 lift.
//! - [`spo_filter`] — SQL filter → SPO lookup translator. Recognises
//! `entity_type`/`predicate`/`entity_id`/`nars_frequency`/
//! `nars_confidence` against any `Ontology`.
//! - [`parallelbetrieb`] — **the one deliberate transition bandaid**.
//! MySQL ↔ DataFusion ↔ SPO reconciler. Necessary as ground truth
//! during F1/F2; documented as transitional, not as Foundry primitive.
//!
//! ## What this module is NOT
//!
//! - A new "transcode crate". After PR #73 framed `lance-graph-callcenter`
//! itself as the Foundry / supabase-realtime transcode crate, the right
//! home for these helpers is here, alongside `ontology_dto` and
//! `version_watcher`. A sibling crate would create a competing framing.
//! - A duplicate of `ontology_dto`. The DTO surface (`OntologyDto`,
//! `EntityTypeDto`, etc.) stays canonical in `crate::ontology_dto`.
//! This module group consumes that surface; it does not redefine it.
//! - A duplicate of `version_watcher`. Realtime fan-out belongs to the
//! existing `LanceVersionWatcher`. This module group does not introduce
//! a second channel primitive.
//!
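//! ## Feature gating
//!
//! The intent is for the whole subtree to sit behind a `transcode` feature
//! so consumers that don't need DataFusion/Arrow can still depend on the
//! rest of `lance-graph-callcenter`, with each submodule layering on the
//! feature it strictly needs (`query-lite` for DataFusion, `arrow` for the
//! zerocopy mapping, etc.). As declared in this file, only
//! [`ontology_table`] carries a gate (`query-lite`); the other submodules
//! are declared without a `#[cfg]` attribute.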

pub mod cam_pq_decode;
pub mod parallelbetrieb;
pub mod spo_filter;
pub mod zerocopy;

#[cfg(feature = "query-lite")]
pub mod ontology_table;

// Re-export the outer-ontology DTO types so consumers reach the whole
// transcode surface from one import path.
pub use crate::ontology_dto::{
ActionTypeDto, EntityTypeDto, LinkTypeDto, OntologyDto, PropertyDto,
};
pub use lance_graph_contract::ontology::{
EntityTypeId, ExpandedTriple, Label, Locale, Ontology, SchemaExpander,
};
Loading
Loading