Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 323 additions & 0 deletions crates/lance-graph-callcenter/src/transcode/zerocopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,183 @@ pub fn from_columns_partial(
RecordBatch::try_new(arrow_schema, arrays).map_err(TranscodeError::Arrow)
}

/// Build a `RecordBatch` from a stream of [`ExpandedTriple`]s — the
/// **reverse direction** that the original transcode round flagged as
/// deferred (Phase 5 in #309's ROADMAP, reframed here as Phase-2-B).
///
/// `ExpandedTriple` is what `Ontology::expand_entity()` returns — one
/// triple per (entity_id, predicate). A row in the outer-DTO view is
/// the gather of all triples sharing one `subject_label`. This helper
/// performs that gather: groups by subject, projects each group's
/// predicate→value pairs into the schema's column slots, and emits a
/// single `RecordBatch` covering all subjects in the input slice.
///
/// ## Row order
///
/// Rows are emitted in **lexical** order of `subject_label` (the
/// grouping map is a `BTreeMap` keyed by the label), not in input
/// order and not in numeric id order: `entity:Patient:10` sorts before
/// `entity:Patient:9`.
///
/// ## Column layout
///
/// Column 0 is `id` (`UInt64`), column 1 is `entity_type` (`Utf8`),
/// followed by one `Utf8` column per entry of `soa.columns`, in
/// declaration order. The batch schema is the one returned by
/// [`round1_lenient_schema`]. If a subject carries the same predicate
/// more than once, the first triple in input order wins.
///
/// ## Domain-agnostic
///
/// Works for any `(Ontology, entity_type)` pair. Subject extraction
/// uses the canonical `entity:{type}:{id}` label format that
/// `expand_entity()` produces. Callers that mint subject labels by
/// other means must canonicalise first.
///
/// ## What this honestly does NOT do today
///
/// - It does NOT consult an `SpoStore`. The Phase-2 plan doc described
///   that as `walk SpoStore::scan(lookup)`, but `SpoStore` is
///   fingerprint-Hamming-indexed and one-way (the FNV-1a fingerprint
///   doesn't round-trip back to `entity_id`). A real SpoStore reader
///   needs a side-table mapping subject fingerprint → entity_id, which
///   is a separate primitive.
/// - It does NOT push values through the per-column codec. Every value
///   crosses as the `object_label` string from the triple. Round 3
///   adds typed-value reconstruction.
///
/// ## Errors
///
/// - `EntityTypeMismatch` if any triple's `entity_type_id` doesn't
///   match the schema's id.
/// - `BadSubjectLabel` if a triple's `subject_label` cannot be parsed
///   as `entity:{type}:{id}` for the schema's entity type.
/// - `MissingColumn` if a column that the typed Arrow schema declares
///   non-nullable has no value for at least one subject. The first
///   such column in declaration order is reported.
/// - `Arrow` for `RecordBatch::try_new` failures (shape misalignment).
///
/// The first two are detected while grouping, in input order, so they
/// take precedence over `MissingColumn`.
///
/// Triples whose `predicate` isn't declared in the schema are silently
/// dropped. This matches the BBB "outer view shows only declared
/// fields" rule — undeclared properties stay inside the substrate.
#[cfg(any(feature = "persist", feature = "query-lite"))]
pub fn triples_to_batch(
    soa: &OuterSchema,
    triples: &[lance_graph_contract::ontology::ExpandedTriple],
) -> Result<RecordBatch, TranscodeError> {
    use std::collections::btree_map::Entry;
    use std::collections::BTreeMap;

    // Group triples by subject_label. Keys borrow from the input slice
    // (no per-triple String clone); the BTreeMap yields groups in
    // lexical label order. The entity id is parsed only the first time
    // a label is seen — later triples with the same label would parse
    // to the same result, so nothing is lost by skipping them.
    let mut grouped: BTreeMap<&str, (u64, Vec<&lance_graph_contract::ontology::ExpandedTriple>)> =
        BTreeMap::new();

    for t in triples {
        if t.entity_type_id != soa.entity_type_id {
            return Err(TranscodeError::EntityTypeMismatch {
                expected: soa.entity_type_id,
                got: t.entity_type_id,
            });
        }
        match grouped.entry(t.subject_label.as_str()) {
            Entry::Occupied(mut slot) => slot.get_mut().1.push(t),
            Entry::Vacant(slot) => {
                let entity_id = parse_entity_id_from_label(&t.subject_label, soa.entity_type)
                    .ok_or_else(|| TranscodeError::BadSubjectLabel(t.subject_label.clone()))?;
                slot.insert((entity_id, vec![t]));
            }
        }
    }

    let nrows = grouped.len();
    // The typed schema is consulted only for field count and for the
    // required/optional flag of each body column.
    let typed_schema = arrow_schema(soa);

    // Build columns. id + entity_type are always present; body columns
    // are projected from the gathered triples.
    let mut ids: Vec<u64> = Vec::with_capacity(nrows);
    let mut entity_type_strs: Vec<&'static str> = Vec::with_capacity(nrows);
    // For each declared body column, accumulate one Option<String> per
    // row in group order; missing predicates become null.
    let ncols = soa.columns.len();
    let mut body: Vec<Vec<Option<String>>> =
        (0..ncols).map(|_| Vec::with_capacity(nrows)).collect();

    for (entity_id, group_triples) in grouped.values() {
        ids.push(*entity_id);
        entity_type_strs.push(soa.entity_type);

        // For each declared column, take the first matching triple (if any).
        for (col_values, soa_col) in body.iter_mut().zip(soa.columns.iter()) {
            let value = group_triples
                .iter()
                .find(|t| t.predicate == soa_col.name)
                .map(|t| t.object_label.clone());
            col_values.push(value);
        }
    }

    // Materialise into Arrow arrays. id + entity_type first; body
    // columns as string arrays (round-1 — every value crosses as Utf8;
    // round-3 plumbs typed reconstruction).
    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(typed_schema.fields().len());
    arrays.push(Arc::new(UInt64Array::from(ids)) as ArrayRef);
    arrays.push(Arc::new(StringArray::from(entity_type_strs)) as ArrayRef);

    for (idx, col_values) in body.into_iter().enumerate() {
        // Body column `idx` sits at typed-schema position `idx + 2`,
        // after `id` and `entity_type`. Required columns disallow null
        // per the schema; a None there means the row is incomplete, so
        // surface an error rather than emitting a silent null.
        let required = !typed_schema.field(idx + 2).is_nullable();
        if required && col_values.iter().any(Option::is_none) {
            return Err(TranscodeError::MissingColumn(soa.columns[idx].name.into()));
        }
        // Round-1: every column emits as Utf8 regardless of the
        // schema's declared Arrow type. The typed semantic_type →
        // ArrowTypeCode mapping (Float32, Date32, etc.) applies on the
        // from_columns / from_columns_partial path, which has typed
        // input. Here the input is string-shaped (object_label); the
        // cast to the declared type happens at the consumer until
        // round 3 adds a per-column casting layer.
        arrays.push(Arc::new(StringArray::from(col_values)) as ArrayRef);
    }

    // `arrow_schema()` describes typed columns, but the arrays above
    // are all Utf8, so the batch is built against the round-1 lenient
    // schema instead. This avoids a `RecordBatch::try_new` type
    // mismatch; round 3 swaps to typed values and the lenient schema
    // goes away.
    let lenient_schema = round1_lenient_schema(soa);
    RecordBatch::try_new(lenient_schema, arrays).map_err(TranscodeError::Arrow)
}

/// Round-1 helper: produce a `SchemaRef` whose body columns are all
/// `Utf8`, matching the arrays that [`triples_to_batch`] emits.
///
/// Only the *data type* is widened. Each body field keeps the
/// nullability that `arrow_schema(soa)` assigns to it, so required
/// properties stay non-nullable and batches built here remain
/// nullability-compatible with batches built through the typed
/// `from_columns` path for the same entity type.
///
/// Layout: `id` (`UInt64`, non-null), `entity_type` (`Utf8`, non-null),
/// then one field per entry of `soa.columns` in declaration order.
#[cfg(any(feature = "persist", feature = "query-lite"))]
pub fn round1_lenient_schema(soa: &OuterSchema) -> SchemaRef {
    // The typed schema is the source of truth for required/optional.
    // Body column `idx` lives at typed position `idx + 2` (after `id`
    // and `entity_type`) — the same alignment `triples_to_batch` uses.
    let typed_schema = arrow_schema(soa);

    let mut fields: Vec<Field> = Vec::with_capacity(soa.columns.len() + 2);
    fields.push(Field::new("id", DataType::UInt64, false));
    fields.push(Field::new("entity_type", DataType::Utf8, false));
    for (idx, col) in soa.columns.iter().enumerate() {
        let nullable = typed_schema.field(idx + 2).is_nullable();
        fields.push(Field::new(col.name, DataType::Utf8, nullable));
    }
    Arc::new(ArrowSchema::new(fields))
}

/// Parse "entity:{type}:{id}" into the trailing `id`. Used by
/// [`triples_to_batch`] to recover entity_id from the canonical
/// subject label that `Ontology::expand_entity()` mints.
#[cfg(any(feature = "persist", feature = "query-lite"))]
fn parse_entity_id_from_label(subject_label: &str, expected_type: &str) -> Option<u64> {
    // Peel "entity:", the type name, and the separating ':' one piece at
    // a time. This matches exactly the labels that start with
    // "entity:{expected_type}:" without allocating a prefix string per call.
    let digits = subject_label
        .strip_prefix("entity:")?
        .strip_prefix(expected_type)?
        .strip_prefix(':')?;

    // Accept only the canonical decimal spelling of the id: ASCII digits,
    // no sign, no leading zeros (a lone "0" is fine). `str::parse::<u64>`
    // alone would also accept "+42" and "042", which would let several
    // distinct subject labels collapse onto the same entity id.
    let canonical = !digits.is_empty()
        && digits.bytes().all(|b| b.is_ascii_digit())
        && (digits.len() == 1 || !digits.starts_with('0'));
    if !canonical {
        return None;
    }

    // Still fallible: a digit string can exceed u64::MAX.
    digits.parse::<u64>().ok()
}

/// Errors from the transcode layer.
///
/// Returned by the batch builders in this module, including
/// `triples_to_batch`. The `Arrow` variant only exists when the
/// `persist` or `query-lite` feature is enabled.
#[derive(Debug)]
pub enum TranscodeError {
/// A column has no value where one is required; the payload is the
/// column name. `triples_to_batch` returns this when a column the
/// Arrow schema marks non-nullable is missing for at least one subject.
MissingColumn(String),
/// Payload is a column name. NOTE(review): presumably raised when a
/// caller supplies a column the schema does not declare — the
/// producing code is outside this view; confirm.
UndeclaredColumn(String),
/// Displayed as "row count mismatch across columns".
RowCountMismatch,
/// Displayed as "fixed-size column shape mismatch".
ShapeMismatch,
/// A triple's `entity_type_id` did not match the schema's id —
/// `triples_to_batch` rejects mixed-type input rather than silently
/// projecting the wrong rows.
EntityTypeMismatch {
expected: lance_graph_contract::ontology::EntityTypeId,
got: lance_graph_contract::ontology::EntityTypeId,
},
/// `subject_label` didn't follow the canonical `entity:{type}:{id}`
/// shape that `Ontology::expand_entity()` produces.
BadSubjectLabel(String),
/// An error reported by Arrow, e.g. from `RecordBatch::try_new`.
#[cfg(any(feature = "persist", feature = "query-lite"))]
Arrow(arrow::error::ArrowError),
}
Expand All @@ -410,6 +580,14 @@ impl core::fmt::Display for TranscodeError {
),
TranscodeError::RowCountMismatch => write!(f, "row count mismatch across columns"),
TranscodeError::ShapeMismatch => write!(f, "fixed-size column shape mismatch"),
TranscodeError::EntityTypeMismatch { expected, got } => write!(
f,
"triple's entity_type_id ({got}) does not match the schema's ({expected})"
),
TranscodeError::BadSubjectLabel(s) => write!(
f,
"subject label `{s}` is not in the canonical `entity:{{type}}:{{id}}` form"
),
#[cfg(any(feature = "persist", feature = "query-lite"))]
TranscodeError::Arrow(e) => write!(f, "arrow error: {e}"),
}
Expand Down Expand Up @@ -521,4 +699,149 @@ mod tests {
.unwrap_err();
assert!(matches!(err, TranscodeError::UndeclaredColumn(_)));
}

// ── triples_to_batch (Phase-2-B reverse path) ────────────────────────────
#[cfg(any(feature = "persist", feature = "query-lite"))]
mod triples_round_trip {
    //! Tests for `triples_to_batch`: grouping triples minted by
    //! `Ontology::expand_entity` into one row per subject.

    use super::*;
    use lance_graph_contract::ontology::SchemaExpander;

    /// Ontology with a single `Patient` schema: two required properties
    /// (`patient_id`, `name`) and one optional (`krankenkasse`).
    fn build_ontology() -> Ontology {
        Ontology::builder("Test")
            .schema(
                Schema::builder("Patient")
                    .required("patient_id")
                    .required("name")
                    .optional("krankenkasse")
                    .build(),
            )
            .build()
    }

    #[test]
    fn triples_to_batch_produces_one_row_per_subject() {
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        // Build triples for two patients via expand_entity (the
        // canonical mint).
        let mut triples = ont.expand_entity(
            "Patient",
            42,
            &[
                ("patient_id", b"42-AA"),
                ("name", b"Anna Mueller"),
                ("krankenkasse", b"AOK"),
            ],
        );
        triples.extend(ont.expand_entity(
            "Patient",
            17,
            &[("patient_id", b"17-BB"), ("name", b"Boris Stolz")],
        ));
        let batch = triples_to_batch(&soa, &triples).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5); // id + entity_type + 3 body
        assert_eq!(batch.schema().field(0).name(), "id");
        assert_eq!(batch.schema().field(1).name(), "entity_type");
    }

    #[test]
    fn triples_to_batch_rejects_mixed_entity_types() {
        let ont = Ontology::builder("Test")
            .schema(Schema::builder("Patient").required("name").build())
            .schema(Schema::builder("Diagnosis").required("code").build())
            .build();
        let patient_soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        // Triples for Diagnosis fed to a Patient soa — must error.
        let triples = ont.expand_entity("Diagnosis", 1, &[("code", b"M51")]);
        let err = triples_to_batch(&patient_soa, &triples).unwrap_err();
        assert!(matches!(err, TranscodeError::EntityTypeMismatch { .. }));
    }

    #[test]
    fn triples_to_batch_returns_empty_batch_for_empty_input() {
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        let batch = triples_to_batch(&soa, &[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 5); // id + entity_type + 3 body
    }

    #[test]
    fn triples_to_batch_drops_undeclared_predicates_silently() {
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        // expand_entity with an undeclared "weird" predicate. The
        // ontology's expand_entity emits triples even for
        // undeclared predicates (with PropertyKind::Free defaults);
        // triples_to_batch should drop them since they're not in
        // the schema's column list.
        let triples = ont.expand_entity(
            "Patient",
            1,
            &[
                ("patient_id", b"1-XX"),
                ("name", b"X"),
                ("weird", b"shouldn't surface"),
            ],
        );
        let batch = triples_to_batch(&soa, &triples).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 5);
        assert!(batch.schema().fields().iter().all(|f| f.name() != "weird"));
    }

    #[test]
    fn triples_to_batch_rejects_missing_required_column() {
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        // Only one of two required columns supplied for entity 1.
        let triples = ont.expand_entity("Patient", 1, &[("patient_id", b"1-XX")]);
        let err = triples_to_batch(&soa, &triples).unwrap_err();
        assert!(matches!(err, TranscodeError::MissingColumn(_)));
    }

    #[test]
    fn triples_to_batch_subject_label_round_trip() {
        // Verifies the subject_label format that expand_entity
        // mints and parse_entity_id_from_label expects agree —
        // i.e. "entity:Patient:42" → 42.
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        let triples =
            ont.expand_entity("Patient", 999_999, &[("patient_id", b"X"), ("name", b"Y")]);
        let batch = triples_to_batch(&soa, &triples).unwrap();
        let id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(id_col.value(0), 999_999);
    }

    #[test]
    fn triples_to_batch_preserves_lex_subject_order() {
        // Rows come out in lexical subject_label order. Ids 9 and 10
        // are chosen because lexical and numeric order disagree for
        // them: "entity:Patient:10" < "entity:Patient:9" as strings,
        // although 9 < 10 as numbers. A pair such as 17/42 sorts the
        // same either way and would not pin the lexical behaviour.
        let ont = build_ontology();
        let soa = OuterSchema::from_ontology(&ont, "Patient").unwrap();
        let mut triples =
            ont.expand_entity("Patient", 9, &[("patient_id", b"9"), ("name", b"A")]);
        triples.extend(ont.expand_entity(
            "Patient",
            10,
            &[("patient_id", b"10"), ("name", b"B")],
        ));
        let batch = triples_to_batch(&soa, &triples).unwrap();
        assert_eq!(batch.num_rows(), 2);
        let id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        // Lexical order: "10" < "9" → row 0 is 10, row 1 is 9.
        assert_eq!(id_col.value(0), 10);
        assert_eq!(id_col.value(1), 9);
    }
}
}
Loading