From 1208dc78b0e669c7019cf6d5ace34017bfb0a0f7 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Sat, 4 Apr 2026 17:39:28 -0700 Subject: [PATCH 1/8] docs: add coding standards from upstream Lance AGENTS.md Add cross-language binding guidelines, naming conventions, error handling, testing, and dependency management standards adapted from the main Lance project. Co-Authored-By: Claude Opus 4.6 --- AGENTS.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/AGENTS.md b/AGENTS.md index 14232c3..0bbfdea 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -21,6 +21,32 @@ test C/C++ compilation: `cargo test --test compile_and_run_test -- --ignored` - Arrow C Data Interface for zero-copy data exchange. - `panic = "abort"` in release to prevent unwinding across FFI. +## Coding Standards + +### Cross-Language Bindings +- Keep C/C++ bindings as thin wrappers — centralize validation and logic in the Rust core. +- Keep parameter names consistent across all bindings (Rust, C, C++) — rename everywhere or nowhere. +- Never break public API signatures — deprecate with `#[deprecated]` and add a new method. +- Replace mutually exclusive boolean flags with a single enum/mode parameter. + +### Naming +- Name variables after what the value *is* (e.g., `partition_id` not `mask`). +- Drop redundant prefixes when the struct/module already implies the domain. +- Use `indices` (not `indexes`) consistently in all APIs and docs. + +### Error Handling +- Validate inputs and reject invalid values with descriptive errors at API boundaries — never silently clamp or adjust. +- Include full context in error messages: variable names, values, sizes, types. + +### Testing +- All bugfixes and features must have corresponding tests. +- Cover NULL/empty edge cases. +- Include multi-fragment scenarios for dataset operations. + +### Dependencies +- Prefer the standard library or existing workspace dependencies before adding new external crates. 
+- Keep `Cargo.lock` changes intentional; revert unrelated dependency bumps. + ## Adding New APIs 1. Add `extern "C"` function in `src/`. 2. Add declaration to `include/lance.h`. From 1edb13ab635da8b6a0db10ffbcce8b546273923d Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Tue, 24 Mar 2026 13:12:24 -0700 Subject: [PATCH 2/8] docs: add AGENTS.md and CLAUDE.md for coding agent guidance Co-Authored-By: Claude Opus 4.6 (1M context) --- AGENTS.md | 1 - 1 file changed, 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index 0bbfdea..a1c0041 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -52,4 +52,3 @@ test C/C++ compilation: `cargo test --test compile_and_run_test -- --ignored` 2. Add declaration to `include/lance.h`. 3. Add C++ wrapper to `include/lance.hpp`. 4. Add test in `tests/c_api_test.rs`. - From 8867c4a69784ca51b86e3fd677f01bab2cf28bcb Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Fri, 3 Apr 2026 16:38:47 -0700 Subject: [PATCH 3/8] feat: add lance_write_fragments for local fragment creation Exposes a C/C++ API for writing Arrow data to Lance fragment files without committing them to a dataset manifest. Enables efficient ingestion from embedded/robotics C++ codebases where a separate Rust finalizer later commits the fragments to a remote data lake. 
- lance_write_fragments(uri, stream, storage_opts) writes an ArrowArrayStream to fragment files and returns a JSON array of fragment metadata (freed with lance_free_string) - Rust finalizer deserializes the JSON and commits via CommitBuilder - C++ wrapper lance::write_fragments() in lance.hpp - Two new integration tests: round-trip and null-URI error path --- Cargo.lock | 2 + Cargo.toml | 3 + include/lance.h | 23 +++++++ include/lance.hpp | 36 +++++++++++ src/fragment_writer.rs | 138 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + tests/c_api_test.rs | 75 ++++++++++++++++++++++ 7 files changed, 279 insertions(+) create mode 100644 src/fragment_writer.rs diff --git a/Cargo.lock b/Cargo.lock index f22c167..4496c90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3215,8 +3215,10 @@ dependencies = [ "lance-core", "lance-datagen", "lance-io", + "lance-table", "log", "pin-project", + "serde_json", "snafu", "tempfile", "tokio", diff --git a/Cargo.toml b/Cargo.toml index fdb55c3..2f7bd0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,13 +29,16 @@ futures = "0.3" log = "0.4" pin-project = "1.0" snafu = "0.9" +serde_json = "1" [dev-dependencies] lance = "3.0.1" lance-datagen = "3.0.1" +lance-table = "3.0.1" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } arrow-array = "57.0.0" arrow-schema = "57.0.0" +serde_json = "1" tempfile = "3" [profile.release] diff --git a/include/lance.h b/include/lance.h index 082cba8..69dba95 100644 --- a/include/lance.h +++ b/include/lance.h @@ -281,6 +281,29 @@ int32_t lance_batch_to_arrow( /** Free a batch handle. */ void lance_batch_free(LanceBatch* batch); +/* ─── Fragment writer ─── */ + +/** + * Write an Arrow record batch stream to fragment files at `uri`. + * + * The data is written but NOT committed — no dataset manifest is created or + * updated. The returned JSON array can be forwarded to a Rust finalizer that + * calls CommitBuilder to publish the fragments into a dataset. 
+ * + * @param uri Directory URI for fragment files (file://, s3://, etc.) + * @param stream Arrow C Data Interface stream; consumed by this call — + * do not use the stream after returning. + * @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL. + * @return JSON array string "[{...}, ...]", one object per fragment. + * Caller must free with lance_free_string(). + * Returns NULL on error. + */ +const char* lance_write_fragments( + const char* uri, + struct ArrowArrayStream* stream, + const char* const* storage_opts +); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/include/lance.hpp b/include/lance.hpp index 87ed26c..aeb8fbf 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -274,4 +274,40 @@ class Batch { } // namespace lance +// ─── Fragment writer (free functions) ──────────────────────────────────────── + +namespace lance { + +/** + * Write an Arrow record batch stream to fragment files at `uri`. + * + * Returns a JSON string describing the written fragments. + * The Rust finalizer deserializes this to commit the fragments into a dataset. + * + * @param uri Directory URI (file://, s3://, etc.) + * @param stream ArrowArrayStream to consume. Must not be used after this call. + * @param storage_opts Key-value storage options, or empty for defaults. + * @return JSON array string "[{...}]". Must be freed with lance_free_string(). + * @throws lance::Error on failure. + */ +inline const char* write_fragments( + const std::string& uri, + ArrowArrayStream* stream, + const std::vector>& storage_opts = {}) +{ + std::vector kv; + for (auto& [k, v] : storage_opts) { + kv.push_back(k.c_str()); + kv.push_back(v.c_str()); + } + kv.push_back(nullptr); + + const char* const* opts_ptr = storage_opts.empty() ? 
nullptr : kv.data(); + const char* json = lance_write_fragments(uri.c_str(), stream, opts_ptr); + if (!json) check_error(); + return json; +} + +} // namespace lance + #endif /* LANCE_HPP */ diff --git a/src/fragment_writer.rs b/src/fragment_writer.rs new file mode 100644 index 0000000..7d26bbc --- /dev/null +++ b/src/fragment_writer.rs @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Fragment writer C API: write Arrow data to local fragment files without committing. +//! +//! # Workflow +//! +//! **Writer process (C/C++):** +//! ```c +//! const char* json = lance_write_fragments("file:///staging/robot.lance", &stream, NULL); +//! // store json to disk / send over socket +//! lance_free_string(json); +//! ``` +//! +//! **Finalizer process (Rust):** +//! ```ignore +//! let frags: Vec = serde_json::from_str(json)?; +//! let txn = Transaction::new(0, Operation::Append { fragments: frags }, None); +//! CommitBuilder::new(uri).execute(txn).await?; +//! ``` + +use std::ffi::{CString, c_char}; +use std::sync::Arc; + +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use lance::dataset::transaction::Operation; +use lance::dataset::{InsertBuilder, WriteParams}; +use lance_core::Result; +use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor}; + +use crate::error::ffi_try; +use crate::helpers; +use crate::runtime::block_on; + +/// Write an Arrow record batch stream to fragment files at `uri`. +/// +/// The data is written but **not committed** — no dataset manifest is created +/// or updated. 
The returned JSON array describes the written fragments and can +/// be passed to the Lance Rust API to commit them: +/// +/// ```ignore +/// let frags: Vec = serde_json::from_str(json)?; +/// let txn = lance::dataset::transaction::Transaction::new( +/// 0, +/// lance::dataset::transaction::Operation::Append { fragments: frags }, +/// None, +/// ); +/// lance::dataset::CommitBuilder::new(uri).execute(txn).await?; +/// ``` +/// +/// - `uri`: Directory URI where fragment files are written (`file://`, `s3://`, etc.) +/// - `stream`: Arrow C Data Interface stream consumed by this call. The caller must +/// not use the stream after this function returns. +/// - `storage_opts`: NULL-terminated key-value pairs `["key","val",NULL]`, or NULL. +/// +/// Returns a JSON array string `[{...}, ...]`, one object per fragment written. +/// **Caller must free with `lance_free_string()`.** +/// Returns NULL on error — check `lance_last_error_code()` / `lance_last_error_message()`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_write_fragments( + uri: *const c_char, + stream: *mut FFI_ArrowArrayStream, + storage_opts: *const *const c_char, +) -> *const c_char { + ffi_try!( + unsafe { write_fragments_inner(uri, stream, storage_opts) }, + null + ) +} + +unsafe fn write_fragments_inner( + uri: *const c_char, + stream: *mut FFI_ArrowArrayStream, + storage_opts: *const *const c_char, +) -> Result<*const c_char> { + if uri.is_null() || stream.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "uri and stream must not be NULL".into(), + location: snafu::location!(), + }); + } + + let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| { + lance_core::Error::InvalidInput { + source: "uri must not be empty".into(), + location: snafu::location!(), + } + })?; + + let opts = unsafe { helpers::parse_storage_options(storage_opts)? }; + + // Consume the C stream into an Arrow RecordBatch reader. 
+ let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| { + lance_core::Error::InvalidInput { + source: e.to_string().into(), + location: snafu::location!(), + } + })?; + + let mut params = WriteParams::default(); + if !opts.is_empty() { + params.store_params = Some(ObjectStoreParams { + storage_options_accessor: Some(Arc::new( + StorageOptionsAccessor::with_static_options(opts), + )), + ..ObjectStoreParams::default() + }); + } + + let transaction = block_on( + InsertBuilder::new(uri_str) + .with_params(¶ms) + .execute_uncommitted_stream(reader), + )?; + + let fragments = match transaction.operation { + Operation::Append { fragments } => fragments, + Operation::Overwrite { fragments, .. } => fragments, + other => { + return Err(lance_core::Error::Internal { + message: format!("unexpected operation from write_fragments: {other}"), + location: snafu::location!(), + }); + } + }; + + let json = serde_json::to_string(&fragments).map_err(|e| lance_core::Error::Internal { + message: format!("failed to serialize fragments to JSON: {e}"), + location: snafu::location!(), + })?; + + let c_str = CString::new(json).map_err(|e| lance_core::Error::Internal { + message: format!("fragment JSON contained interior null byte: {e}"), + location: snafu::location!(), + })?; + + Ok(c_str.into_raw()) +} diff --git a/src/lib.rs b/src/lib.rs index e8edec2..f4966e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ mod async_dispatcher; mod batch; mod dataset; mod error; +mod fragment_writer; mod helpers; pub mod runtime; mod scanner; @@ -26,6 +27,7 @@ mod scanner; // Re-export all extern "C" symbols so they appear in the cdylib. 
pub use batch::*; pub use dataset::*; +pub use fragment_writer::*; pub use error::{ LanceErrorCode, lance_free_string, lance_last_error_code, lance_last_error_message, }; diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index 5a62210..97eda0e 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -19,6 +19,7 @@ use arrow_array::{Float32Array, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use lance_c::*; +use lance_table::format::Fragment; /// Helper: create a test dataset in a temp directory and return its path. fn create_test_dataset() -> (tempfile::TempDir, String) { @@ -1385,3 +1386,77 @@ fn test_historical_dataset_open_specific_version() { assert_eq!(unsafe { lance_dataset_version(ds2) }, 2); unsafe { lance_dataset_close(ds2) }; } + +// --------------------------------------------------------------------------- +// Fragment writer +// --------------------------------------------------------------------------- + +/// Helper: build an FFI_ArrowArrayStream from a single RecordBatch. 
+fn batch_to_ffi_stream(batch: RecordBatch) -> FFI_ArrowArrayStream { + let schema = batch.schema(); + let reader = arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema); + FFI_ArrowArrayStream::new(Box::new(reader)) +} + +#[test] +fn test_write_fragments_returns_json() { + let tmp = tempfile::tempdir().unwrap(); + let uri = format!("file://{}", tmp.path().to_str().unwrap()); + let c_uri = CString::new(uri.clone()).unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("val", DataType::Float32, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0])), + ], + ) + .unwrap(); + + let mut stream = batch_to_ffi_stream(batch); + let json_ptr = + unsafe { lance_write_fragments(c_uri.as_ptr(), &mut stream, ptr::null()) }; + assert!(!json_ptr.is_null(), "lance_write_fragments returned NULL"); + + let json_str = unsafe { std::ffi::CStr::from_ptr(json_ptr) } + .to_str() + .expect("JSON must be valid UTF-8"); + + // Must parse as a non-empty JSON array of Fragment objects. + let fragments: Vec = + serde_json::from_str(json_str).expect("must parse as Vec"); + assert!(!fragments.is_empty(), "expected at least one fragment"); + + // Each fragment must reference at least one data file. + for frag in &fragments { + assert!(!frag.files.is_empty(), "fragment has no data files"); + } + + // Total row count across fragments must match input. 
+ let total_rows: usize = fragments + .iter() + .filter_map(|f| f.physical_rows) + .sum(); + assert_eq!(total_rows, 3); + + unsafe { lance_free_string(json_ptr) }; +} + +#[test] +fn test_write_fragments_null_uri_returns_null() { + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1]))], + ) + .unwrap(); + let mut stream = batch_to_ffi_stream(batch); + + let result = unsafe { lance_write_fragments(ptr::null(), &mut stream, ptr::null()) }; + assert!(result.is_null()); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); +} From 85a0a39aa0144cdf7de663d92077873d30b3cfe6 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Fri, 3 Apr 2026 16:48:40 -0700 Subject: [PATCH 4/8] style: apply cargo fmt --- src/fragment_writer.rs | 6 +++--- src/lib.rs | 2 +- tests/c_api_test.rs | 15 ++++----------- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/src/fragment_writer.rs b/src/fragment_writer.rs index 7d26bbc..9519e04 100644 --- a/src/fragment_writer.rs +++ b/src/fragment_writer.rs @@ -100,9 +100,9 @@ unsafe fn write_fragments_inner( let mut params = WriteParams::default(); if !opts.is_empty() { params.store_params = Some(ObjectStoreParams { - storage_options_accessor: Some(Arc::new( - StorageOptionsAccessor::with_static_options(opts), - )), + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + opts, + ))), ..ObjectStoreParams::default() }); } diff --git a/src/lib.rs b/src/lib.rs index f4966e2..b04319e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,8 +27,8 @@ mod scanner; // Re-export all extern "C" symbols so they appear in the cdylib. 
pub use batch::*; pub use dataset::*; -pub use fragment_writer::*; pub use error::{ LanceErrorCode, lance_free_string, lance_last_error_code, lance_last_error_message, }; +pub use fragment_writer::*; pub use scanner::*; diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index 97eda0e..ad3b59d 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1418,8 +1418,7 @@ fn test_write_fragments_returns_json() { .unwrap(); let mut stream = batch_to_ffi_stream(batch); - let json_ptr = - unsafe { lance_write_fragments(c_uri.as_ptr(), &mut stream, ptr::null()) }; + let json_ptr = unsafe { lance_write_fragments(c_uri.as_ptr(), &mut stream, ptr::null()) }; assert!(!json_ptr.is_null(), "lance_write_fragments returned NULL"); let json_str = unsafe { std::ffi::CStr::from_ptr(json_ptr) } @@ -1437,10 +1436,7 @@ fn test_write_fragments_returns_json() { } // Total row count across fragments must match input. - let total_rows: usize = fragments - .iter() - .filter_map(|f| f.physical_rows) - .sum(); + let total_rows: usize = fragments.iter().filter_map(|f| f.physical_rows).sum(); assert_eq!(total_rows, 3); unsafe { lance_free_string(json_ptr) }; @@ -1449,11 +1445,8 @@ fn test_write_fragments_returns_json() { #[test] fn test_write_fragments_null_uri_returns_null() { let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from(vec![1]))], - ) - .unwrap(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); let mut stream = batch_to_ffi_stream(batch); let result = unsafe { lance_write_fragments(ptr::null(), &mut stream, ptr::null()) }; From fdc316b86cff3ce5827e65b5d5ff70e5f5ba5f89 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Sat, 4 Apr 2026 20:34:33 -0700 Subject: [PATCH 5/8] refactor: lance_write_fragments returns int32_t, writes sidecar metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit - No dynamic allocation returned to C++ caller — returns 0/-1 only - Fragment metadata written as JSON sidecar to <uri>/_fragments/<uuid>.json - Requires explicit ArrowSchema* parameter for fail-fast schema validation - Rust finalizer reads sidecar files to commit via CommitBuilder - Three tests: round-trip with sidecar, null-args error, schema mismatch --- Cargo.lock | 2 + Cargo.toml | 2 + include/lance.h | 14 ++--- include/lance.hpp | 16 +++--- src/fragment_writer.rs | 119 ++++++++++++++++++++++++++--------------- tests/c_api_test.rs | 76 +++++++++++++++++++------- 6 files changed, 155 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4496c90..859ea03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3217,11 +3217,13 @@ dependencies = [ "lance-io", "lance-table", "log", + "object_store", "pin-project", "serde_json", "snafu", "tempfile", "tokio", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2f7bd0f..1a29ee5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ log = "0.4" pin-project = "1.0" snafu = "0.9" serde_json = "1" +object_store = "0.12" +uuid = { version = "1", features = ["v4"] } [dev-dependencies] lance = "3.0.1" lance-datagen = "3.0.1" diff --git a/include/lance.h b/include/lance.h index 69dba95..ad12f20 100644 --- a/include/lance.h +++ b/include/lance.h @@ -287,19 +287,21 @@ void lance_batch_free(LanceBatch* batch); * Write an Arrow record batch stream to fragment files at `uri`. * * The data is written but NOT committed — no dataset manifest is created or - * updated. The returned JSON array can be forwarded to a Rust finalizer that - * calls CommitBuilder to publish the fragments into a dataset. + * updated. Fragment metadata is written as a JSON sidecar file under + * `/_fragments/.json`. A Rust finalizer can read these files, + * deserialize the fragments, and commit them via CommitBuilder. * * @param uri Directory URI for fragment files (file://, s3://, etc.) + * @param schema Required Arrow schema. 
The stream schema must match + * or the call fails with LANCE_ERR_INVALID_ARGUMENT. * @param stream Arrow C Data Interface stream; consumed by this call — * do not use the stream after returning. * @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL. - * @return JSON array string "[{...}, ...]", one object per fragment. - * Caller must free with lance_free_string(). - * Returns NULL on error. + * @return 0 on success, -1 on error */ -const char* lance_write_fragments( +int32_t lance_write_fragments( const char* uri, + const struct ArrowSchema* schema, struct ArrowArrayStream* stream, const char* const* storage_opts ); diff --git a/include/lance.hpp b/include/lance.hpp index aeb8fbf..ea25129 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -281,17 +281,19 @@ namespace lance { /** * Write an Arrow record batch stream to fragment files at `uri`. * - * Returns a JSON string describing the written fragments. - * The Rust finalizer deserializes this to commit the fragments into a dataset. + * Fragment metadata is written as a JSON sidecar under `/_fragments/`. + * A Rust finalizer reads these files and commits via CommitBuilder. + * No dynamic memory is returned to the caller. * * @param uri Directory URI (file://, s3://, etc.) + * @param schema Required Arrow schema — stream schema must match. * @param stream ArrowArrayStream to consume. Must not be used after this call. * @param storage_opts Key-value storage options, or empty for defaults. - * @return JSON array string "[{...}]". Must be freed with lance_free_string(). * @throws lance::Error on failure. */ -inline const char* write_fragments( +inline void write_fragments( const std::string& uri, + const ArrowSchema* schema, ArrowArrayStream* stream, const std::vector>& storage_opts = {}) { @@ -303,9 +305,9 @@ inline const char* write_fragments( kv.push_back(nullptr); const char* const* opts_ptr = storage_opts.empty() ? 
nullptr : kv.data(); - const char* json = lance_write_fragments(uri.c_str(), stream, opts_ptr); - if (!json) check_error(); - return json; + if (lance_write_fragments(uri.c_str(), schema, stream, opts_ptr) != 0) { + check_error(); + } } } // namespace lance diff --git a/src/fragment_writer.rs b/src/fragment_writer.rs index 9519e04..199b709 100644 --- a/src/fragment_writer.rs +++ b/src/fragment_writer.rs @@ -7,75 +7,73 @@ //! //! **Writer process (C/C++):** //! ```c -//! const char* json = lance_write_fragments("file:///staging/robot.lance", &stream, NULL); -//! // store json to disk / send over socket -//! lance_free_string(json); +//! int32_t rc = lance_write_fragments("file:///staging/robot.lance", &schema, &stream, NULL); //! ``` //! //! **Finalizer process (Rust):** //! ```ignore -//! let frags: Vec = serde_json::from_str(json)?; +//! // Read sidecar metadata written by lance_write_fragments +//! let json = std::fs::read_to_string("staging/robot.lance/_fragments/xxx.json")?; +//! let frags: Vec = serde_json::from_str(&json)?; //! let txn = Transaction::new(0, Operation::Append { fragments: frags }, None); //! CommitBuilder::new(uri).execute(txn).await?; //! ``` -use std::ffi::{CString, c_char}; +use std::ffi::c_char; use std::sync::Arc; +use arrow::ffi::FFI_ArrowSchema; use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use arrow::record_batch::RecordBatchReader; +use arrow_schema::Schema as ArrowSchema; use lance::dataset::transaction::Operation; use lance::dataset::{InsertBuilder, WriteParams}; use lance_core::Result; -use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams, StorageOptionsAccessor}; use crate::error::ffi_try; use crate::helpers; use crate::runtime::block_on; +/// Directory name for fragment metadata sidecar files. +const FRAGMENTS_META_DIR: &str = "_fragments"; + /// Write an Arrow record batch stream to fragment files at `uri`. 
/// /// The data is written but **not committed** — no dataset manifest is created -/// or updated. The returned JSON array describes the written fragments and can -/// be passed to the Lance Rust API to commit them: -/// -/// ```ignore -/// let frags: Vec = serde_json::from_str(json)?; -/// let txn = lance::dataset::transaction::Transaction::new( -/// 0, -/// lance::dataset::transaction::Operation::Append { fragments: frags }, -/// None, -/// ); -/// lance::dataset::CommitBuilder::new(uri).execute(txn).await?; -/// ``` +/// or updated. Fragment metadata is written as a JSON sidecar file under +/// `/_fragments/.json`, which a Rust finalizer can read and commit. /// /// - `uri`: Directory URI where fragment files are written (`file://`, `s3://`, etc.) -/// - `stream`: Arrow C Data Interface stream consumed by this call. The caller must -/// not use the stream after this function returns. +/// - `schema`: Required Arrow schema. The stream's schema must match; the call +/// fails fast with `LANCE_ERR_INVALID_ARGUMENT` on mismatch. +/// - `stream`: Arrow C Data Interface stream consumed by this call. The caller +/// must not use the stream after this function returns. /// - `storage_opts`: NULL-terminated key-value pairs `["key","val",NULL]`, or NULL. /// -/// Returns a JSON array string `[{...}, ...]`, one object per fragment written. -/// **Caller must free with `lance_free_string()`.** -/// Returns NULL on error — check `lance_last_error_code()` / `lance_last_error_message()`. +/// Returns 0 on success, -1 on error. 
#[unsafe(no_mangle)] pub unsafe extern "C" fn lance_write_fragments( uri: *const c_char, + schema: *const FFI_ArrowSchema, stream: *mut FFI_ArrowArrayStream, storage_opts: *const *const c_char, -) -> *const c_char { +) -> i32 { ffi_try!( - unsafe { write_fragments_inner(uri, stream, storage_opts) }, - null + unsafe { write_fragments_inner(uri, schema, stream, storage_opts) }, + neg ) } unsafe fn write_fragments_inner( uri: *const c_char, + schema: *const FFI_ArrowSchema, stream: *mut FFI_ArrowArrayStream, storage_opts: *const *const c_char, -) -> Result<*const c_char> { - if uri.is_null() || stream.is_null() { +) -> Result { + if uri.is_null() || schema.is_null() || stream.is_null() { return Err(lance_core::Error::InvalidInput { - source: "uri and stream must not be NULL".into(), + source: "uri, schema, and stream must not be NULL".into(), location: snafu::location!(), }); } @@ -87,6 +85,14 @@ unsafe fn write_fragments_inner( } })?; + // Import the caller-provided schema from the Arrow C Data Interface. + let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| { + lance_core::Error::InvalidInput { + source: format!("invalid schema: {e}").into(), + location: snafu::location!(), + } + })?; + let opts = unsafe { helpers::parse_storage_options(storage_opts)? }; // Consume the C stream into an Arrow RecordBatch reader. @@ -97,14 +103,32 @@ unsafe fn write_fragments_inner( } })?; - let mut params = WriteParams::default(); - if !opts.is_empty() { - params.store_params = Some(ObjectStoreParams { + // Fail fast: compare the stream schema against the caller-provided schema. 
+ let stream_schema = reader.schema(); + if stream_schema.fields() != expected_schema.fields() { + return Err(lance_core::Error::InvalidInput { + source: format!( + "stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}" + ) + .into(), + location: snafu::location!(), + }); + } + + let store_params = if opts.is_empty() { + None + } else { + Some(ObjectStoreParams { storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( - opts, + opts.clone(), ))), ..ObjectStoreParams::default() - }); + }) + }; + + let mut params = WriteParams::default(); + if let Some(ref sp) = store_params { + params.store_params = Some(sp.clone()); } let transaction = block_on( @@ -124,15 +148,24 @@ unsafe fn write_fragments_inner( } }; - let json = serde_json::to_string(&fragments).map_err(|e| lance_core::Error::Internal { - message: format!("failed to serialize fragments to JSON: {e}"), - location: snafu::location!(), - })?; + // Serialize fragment metadata and write as a sidecar JSON file. 
+ let json = + serde_json::to_string_pretty(&fragments).map_err(|e| lance_core::Error::Internal { + message: format!("failed to serialize fragments to JSON: {e}"), + location: snafu::location!(), + })?; - let c_str = CString::new(json).map_err(|e| lance_core::Error::Internal { - message: format!("fragment JSON contained interior null byte: {e}"), - location: snafu::location!(), - })?; + let (object_store, base_path) = block_on(ObjectStore::from_uri_and_params( + Arc::new(Default::default()), + uri_str, + &store_params.unwrap_or_default(), + ))?; + + let sidecar_filename = format!("{}.json", uuid::Uuid::new_v4()); + let sidecar_path = base_path + .child(FRAGMENTS_META_DIR) + .child(sidecar_filename.as_str()); + block_on(object_store.put(&sidecar_path, json.as_bytes()))?; - Ok(c_str.into_raw()) + Ok(0) } diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index ad3b59d..a0e271c 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1398,8 +1398,13 @@ fn batch_to_ffi_stream(batch: RecordBatch) -> FFI_ArrowArrayStream { FFI_ArrowArrayStream::new(Box::new(reader)) } +/// Helper: export an Arrow Schema to FFI_ArrowSchema. 
+fn schema_to_ffi(schema: &Schema) -> FFI_ArrowSchema { + FFI_ArrowSchema::try_from(schema).expect("schema export must succeed") +} + #[test] -fn test_write_fragments_returns_json() { +fn test_write_fragments_writes_sidecar() { let tmp = tempfile::tempdir().unwrap(); let uri = format!("file://{}", tmp.path().to_str().unwrap()); let c_uri = CString::new(uri.clone()).unwrap(); @@ -1417,39 +1422,74 @@ fn test_write_fragments_returns_json() { ) .unwrap(); + let ffi_schema = schema_to_ffi(&schema); let mut stream = batch_to_ffi_stream(batch); - let json_ptr = unsafe { lance_write_fragments(c_uri.as_ptr(), &mut stream, ptr::null()) }; - assert!(!json_ptr.is_null(), "lance_write_fragments returned NULL"); + let rc = + unsafe { lance_write_fragments(c_uri.as_ptr(), &ffi_schema, &mut stream, ptr::null()) }; + assert_eq!(rc, 0, "lance_write_fragments failed"); - let json_str = unsafe { std::ffi::CStr::from_ptr(json_ptr) } - .to_str() - .expect("JSON must be valid UTF-8"); + // A sidecar JSON file should exist under _fragments/. + let fragments_dir = tmp.path().join("_fragments"); + assert!(fragments_dir.exists(), "_fragments dir must exist"); - // Must parse as a non-empty JSON array of Fragment objects. + let entries: Vec<_> = std::fs::read_dir(&fragments_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!(entries.len(), 1, "expected exactly one sidecar file"); + + let json = std::fs::read_to_string(entries[0].path()).unwrap(); let fragments: Vec = - serde_json::from_str(json_str).expect("must parse as Vec"); + serde_json::from_str(&json).expect("must parse as Vec"); assert!(!fragments.is_empty(), "expected at least one fragment"); - // Each fragment must reference at least one data file. - for frag in &fragments { - assert!(!frag.files.is_empty(), "fragment has no data files"); - } - // Total row count across fragments must match input. 
let total_rows: usize = fragments.iter().filter_map(|f| f.physical_rows).sum(); assert_eq!(total_rows, 3); - - unsafe { lance_free_string(json_ptr) }; } #[test] -fn test_write_fragments_null_uri_returns_null() { +fn test_write_fragments_null_args_returns_error() { let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); let mut stream = batch_to_ffi_stream(batch); - let result = unsafe { lance_write_fragments(ptr::null(), &mut stream, ptr::null()) }; - assert!(result.is_null()); + // NULL uri + let ffi_schema = schema_to_ffi(&schema); + let result = + unsafe { lance_write_fragments(ptr::null(), &ffi_schema, &mut stream, ptr::null()) }; + assert_eq!(result, -1); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); +} + +#[test] +fn test_write_fragments_schema_mismatch() { + let tmp = tempfile::tempdir().unwrap(); + let uri = format!("file://{}", tmp.path().to_str().unwrap()); + let c_uri = CString::new(uri).unwrap(); + + // Stream has columns (id: Int32, val: Float32) + let stream_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("val", DataType::Float32, true), + ])); + let batch = RecordBatch::try_new( + stream_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(Float32Array::from(vec![1.0])), + ], + ) + .unwrap(); + let mut stream = batch_to_ffi_stream(batch); + + // But the declared schema only has (id: Int32) — mismatch. 
+ let declared_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + let ffi_schema = schema_to_ffi(&declared_schema); + + let rc = + unsafe { lance_write_fragments(c_uri.as_ptr(), &ffi_schema, &mut stream, ptr::null()) }; + assert_eq!(rc, -1, "should fail on schema mismatch"); assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); } From 4af7ef79536ed434a816da3639fc02aee4ab1139 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Sat, 4 Apr 2026 20:44:26 -0700 Subject: [PATCH 6/8] simplify: drop sidecar metadata, add usage context in comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove sidecar JSON write — Rust finalizer reconstructs Fragment metadata from .lance file footers instead - Remove serde_json, object_store, uuid dependencies (no longer needed) - Add robotics/embedded pipeline context to doc comments in both Rust source and C/C++ headers - Tests verify data files are written under data/ --- Cargo.lock | 4 -- Cargo.toml | 5 --- include/lance.h | 10 +++-- include/lance.hpp | 4 +- src/fragment_writer.rs | 85 ++++++++++++++---------------------------- tests/c_api_test.rs | 26 +++++-------- 6 files changed, 46 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 859ea03..f22c167 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3215,15 +3215,11 @@ dependencies = [ "lance-core", "lance-datagen", "lance-io", - "lance-table", "log", - "object_store", "pin-project", - "serde_json", "snafu", "tempfile", "tokio", - "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1a29ee5..fdb55c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,18 +29,13 @@ futures = "0.3" log = "0.4" pin-project = "1.0" snafu = "0.9" -serde_json = "1" -object_store = "0.12" -uuid = { version = "1", features = ["v4"] } [dev-dependencies] lance = "3.0.1" lance-datagen = "3.0.1" -lance-table = "3.0.1" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } arrow-array = "57.0.0" 
arrow-schema = "57.0.0" -serde_json = "1" tempfile = "3" [profile.release] diff --git a/include/lance.h b/include/lance.h index ad12f20..c5da5cc 100644 --- a/include/lance.h +++ b/include/lance.h @@ -286,10 +286,14 @@ void lance_batch_free(LanceBatch* batch); /** * Write an Arrow record batch stream to fragment files at `uri`. * + * Designed for embedded / robotics C++ pipelines: write Lance fragment files + * locally with minimal overhead. A separate Rust finalizer process later + * reconstructs Fragment metadata from the file footers and commits them + * into a dataset on a remote data lake via CommitBuilder. + * * The data is written but NOT committed — no dataset manifest is created or - * updated. Fragment metadata is written as a JSON sidecar file under - * `<uri>/_fragments/<uuid>.json`. A Rust finalizer can read these files, - * deserialize the fragments, and commit them via CommitBuilder. + * updated. The written .lance files under <uri>/data/ contain full metadata + * in their footers (schema with field IDs, row counts, format version). * * @param uri Directory URI for fragment files (file://, s3://, etc.) * @param schema Required Arrow schema. The stream schema must match diff --git a/include/lance.hpp b/include/lance.hpp index ea25129..b6bc47e 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -281,8 +281,8 @@ namespace lance { /** * Write an Arrow record batch stream to fragment files at `uri`. * - * Fragment metadata is written as a JSON sidecar under `<uri>/_fragments/`. - * A Rust finalizer reads these files and commits via CommitBuilder. + * Data files are written under `<uri>/data/`. A Rust finalizer reconstructs + * Fragment metadata from the file footers and commits via CommitBuilder. * No dynamic memory is returned to the caller. * * @param uri Directory URI (file://, s3://, etc.) diff --git a/src/fragment_writer.rs b/src/fragment_writer.rs index 199b709..c76c2f3 100644 --- a/src/fragment_writer.rs +++ b/src/fragment_writer.rs @@ -3,20 +3,25 @@ //! 
Fragment writer C API: write Arrow data to local fragment files without committing. //! -//! # Workflow +//! Designed for embedded / robotics C++ pipelines where sensor data is ingested +//! at high frequency on edge devices. The C++ process writes Lance fragment files +//! locally with minimal overhead (no manifest, no coordination). A separate Rust +//! finalizer process later reads the file footers, reconstructs fragment metadata, +//! and commits them into a dataset on a remote data lake (S3, GCS, etc.). //! -//! **Writer process (C/C++):** +//! # Two-process workflow +//! +//! **1. Writer process (C/C++ on edge device):** //! ```c -//! int32_t rc = lance_write_fragments("file:///staging/robot.lance", &schema, &stream, NULL); +//! // Stream sensor batches into local fragment files. +//! int32_t rc = lance_write_fragments( +//! "file:///data/staging/robot.lance", &schema, &stream, NULL); //! ``` //! -//! **Finalizer process (Rust):** +//! **2. Finalizer process (Rust, runs periodically or on sync):** //! ```ignore -//! // Read sidecar metadata written by lance_write_fragments -//! let json = std::fs::read_to_string("staging/robot.lance/_fragments/xxx.json")?; -//! let frags: Vec<Fragment> = serde_json::from_str(&json)?; -//! let txn = Transaction::new(0, Operation::Append { fragments: frags }, None); -//! CommitBuilder::new(uri).execute(txn).await?; +//! // Scan data/*.lance files, reconstruct Fragment metadata from file footers, +//! // then commit via CommitBuilder to publish to the data lake. //! 
``` use std::ffi::c_char; @@ -26,23 +31,21 @@ use arrow::ffi::FFI_ArrowSchema; use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; use arrow::record_batch::RecordBatchReader; use arrow_schema::Schema as ArrowSchema; -use lance::dataset::transaction::Operation; use lance::dataset::{InsertBuilder, WriteParams}; use lance_core::Result; -use lance_io::object_store::{ObjectStore, ObjectStoreParams, StorageOptionsAccessor}; +use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor}; use crate::error::ffi_try; use crate::helpers; use crate::runtime::block_on; -/// Directory name for fragment metadata sidecar files. -const FRAGMENTS_META_DIR: &str = "_fragments"; - /// Write an Arrow record batch stream to fragment files at `uri`. /// /// The data is written but **not committed** — no dataset manifest is created -/// or updated. Fragment metadata is written as a JSON sidecar file under -/// `<uri>/_fragments/<uuid>.json`, which a Rust finalizer can read and commit. +/// or updated. The written `.lance` files under `<uri>/data/` contain full +/// metadata in their footers (schema with field IDs, row counts, format version). +/// A Rust finalizer can reconstruct `Fragment` metadata by reading these footers +/// and commit via `CommitBuilder`. /// /// - `uri`: Directory URI where fragment files are written (`file://`, `s3://`, etc.) /// - `schema`: Required Arrow schema. 
The stream's schema must match; the call @@ -115,57 +118,23 @@ unsafe fn write_fragments_inner( }); } - let store_params = if opts.is_empty() { - None - } else { - Some(ObjectStoreParams { + let mut params = WriteParams::default(); + if !opts.is_empty() { + params.store_params = Some(ObjectStoreParams { storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( - opts.clone(), + opts, ))), ..ObjectStoreParams::default() - }) - }; - - let mut params = WriteParams::default(); - if let Some(ref sp) = store_params { - params.store_params = Some(sp.clone()); + }); } - let transaction = block_on( + // Write fragment data files. The Transaction result is discarded — + // the finalizer reconstructs Fragment metadata from the file footers. + let _transaction = block_on( InsertBuilder::new(uri_str) .with_params(&params) .execute_uncommitted_stream(reader), )?; - let fragments = match transaction.operation { - Operation::Append { fragments } => fragments, - Operation::Overwrite { fragments, .. } => fragments, - other => { - return Err(lance_core::Error::Internal { - message: format!("unexpected operation from write_fragments: {other}"), - location: snafu::location!(), - }); - } - }; - - // Serialize fragment metadata and write as a sidecar JSON file. 
- let json = - serde_json::to_string_pretty(&fragments).map_err(|e| lance_core::Error::Internal { - message: format!("failed to serialize fragments to JSON: {e}"), - location: snafu::location!(), - })?; - - let (object_store, base_path) = block_on(ObjectStore::from_uri_and_params( - Arc::new(Default::default()), - uri_str, - &store_params.unwrap_or_default(), - ))?; - - let sidecar_filename = format!("{}.json", uuid::Uuid::new_v4()); - let sidecar_path = base_path - .child(FRAGMENTS_META_DIR) - .child(sidecar_filename.as_str()); - block_on(object_store.put(&sidecar_path, json.as_bytes()))?; - Ok(0) } diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index a0e271c..a359e3b 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -19,7 +19,6 @@ use arrow_array::{Float32Array, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use lance_c::*; -use lance_table::format::Fragment; /// Helper: create a test dataset in a temp directory and return its path. fn create_test_dataset() -> (tempfile::TempDir, String) { @@ -1404,7 +1403,7 @@ fn schema_to_ffi(schema: &Schema) -> FFI_ArrowSchema { } #[test] -fn test_write_fragments_writes_sidecar() { +fn test_write_fragments_creates_data_files() { let tmp = tempfile::tempdir().unwrap(); let uri = format!("file://{}", tmp.path().to_str().unwrap()); let c_uri = CString::new(uri.clone()).unwrap(); @@ -1428,24 +1427,19 @@ fn test_write_fragments_writes_sidecar() { unsafe { lance_write_fragments(c_uri.as_ptr(), &ffi_schema, &mut stream, ptr::null()) }; assert_eq!(rc, 0, "lance_write_fragments failed"); - // A sidecar JSON file should exist under _fragments/. - let fragments_dir = tmp.path().join("_fragments"); - assert!(fragments_dir.exists(), "_fragments dir must exist"); + // Data files should exist under data/. 
+ let data_dir = tmp.path().join("data"); + assert!(data_dir.exists(), "data/ dir must exist"); - let entries: Vec<_> = std::fs::read_dir(&fragments_dir) + let lance_files: Vec<_> = std::fs::read_dir(&data_dir) .unwrap() .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "lance")) .collect(); - assert_eq!(entries.len(), 1, "expected exactly one sidecar file"); - - let json = std::fs::read_to_string(entries[0].path()).unwrap(); - let fragments: Vec<Fragment> = - serde_json::from_str(&json).expect("must parse as Vec<Fragment>"); - assert!(!fragments.is_empty(), "expected at least one fragment"); - - // Total row count across fragments must match input. - let total_rows: usize = fragments.iter().filter_map(|f| f.physical_rows).sum(); - assert_eq!(total_rows, 3); + assert!( + !lance_files.is_empty(), + "expected at least one .lance data file" + ); } #[test] From cdcdf2357b3cc14bd3143776fcacd60d867a1321 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Sat, 4 Apr 2026 20:53:17 -0700 Subject: [PATCH 7/8] test: add end-to-end robotics scenario test Simulates the full ingestion pipeline: 1. C++ edge device writes sensor data via lance_write_fragments 2. Rust finalizer scans .lance files, reconstructs Fragment metadata from file footers (schema with field IDs, row counts, format version) 3. Commits fragments into a dataset via CommitBuilder 4. 
Verifies the committed dataset is readable with correct row count --- Cargo.lock | 2 + Cargo.toml | 2 + tests/c_api_test.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f22c167..48cf1f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3214,7 +3214,9 @@ dependencies = [ "lance", "lance-core", "lance-datagen", + "lance-file", "lance-io", + "lance-table", "log", "pin-project", "snafu", diff --git a/Cargo.toml b/Cargo.toml index fdb55c3..defe092 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,8 @@ snafu = "0.9" [dev-dependencies] lance = "3.0.1" lance-datagen = "3.0.1" +lance-file = "3.0.1" +lance-table = "3.0.1" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } arrow-array = "57.0.0" arrow-schema = "57.0.0" diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index a359e3b..4665b17 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1487,3 +1487,171 @@ fn test_write_fragments_schema_mismatch() { assert_eq!(rc, -1, "should fail on schema mismatch"); assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); } + +// --------------------------------------------------------------------------- +// End-to-end robotics scenario: C++ writes fragments, Rust finalizer commits +// --------------------------------------------------------------------------- + +/// Simulate the full robotics ingestion pipeline: +/// 1. C++ edge device writes sensor data via lance_write_fragments +/// 2. Separate Rust finalizer scans .lance files, reconstructs Fragment +/// metadata from file footers, and commits into a dataset +/// 3. 
The committed dataset is readable and contains the original data +#[test] +fn test_robotics_e2e_write_then_finalize() { + use lance::dataset::transaction::{Operation, Transaction}; + use lance::dataset::{CommitBuilder, WriteDestination}; + use lance_file::reader::{CachedFileMetadata, FileReader as LanceFileReader}; + use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; + use lance_io::utils::CachedFileSize; + use lance_table::format::{DataFile, Fragment}; + + // ── Step 1: "C++ edge device" writes fragment data files ── + + let staging_dir = tempfile::tempdir().unwrap(); + let staging_uri = format!("file://{}", staging_dir.path().to_str().unwrap()); + let c_uri = CString::new(staging_uri.clone()).unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("sensor_id", DataType::Int32, false), + Field::new("temperature", DataType::Float32, true), + Field::new("label", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Float32Array::from(vec![20.1, 21.5, 19.8, 22.0, 20.5])), + Arc::new(StringArray::from(vec![ + "front", "rear", "left", "right", "top", + ])), + ], + ) + .unwrap(); + + let ffi_schema = schema_to_ffi(&schema); + let mut stream = batch_to_ffi_stream(batch); + let rc = + unsafe { lance_write_fragments(c_uri.as_ptr(), &ffi_schema, &mut stream, ptr::null()) }; + assert_eq!(rc, 0, "lance_write_fragments failed"); + + // ── Step 2: "Rust finalizer" scans files and reconstructs fragments ── + + let dataset_dir = tempfile::tempdir().unwrap(); + let dataset_uri = dataset_dir + .path() + .join("robot.lance") + .to_str() + .unwrap() + .to_string(); + + let fragments = lance_c::runtime::block_on(async { + let (object_store, _base_path) = + lance_io::object_store::ObjectStore::from_uri(&staging_uri) + .await + .unwrap(); + let scan_scheduler = ScanScheduler::new( + object_store.clone(), + SchedulerConfig::max_bandwidth(&object_store), + ); + + // 
Discover .lance files in data/ directory + let data_dir = staging_dir.path().join("data"); + let lance_files: Vec<_> = std::fs::read_dir(&data_dir) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "lance")) + .collect(); + assert!(!lance_files.is_empty()); + + let mut fragments = Vec::new(); + for (frag_idx, entry) in lance_files.iter().enumerate() { + let filename = entry.file_name().to_string_lossy().to_string(); + let file_path = lance_io::object_store::ObjectStore::extract_path_from_uri( + Arc::new(Default::default()), + &format!("{}/data/{}", staging_uri, filename), + ) + .unwrap(); + + let file_size: CachedFileSize = Default::default(); + let file_scheduler = scan_scheduler + .open_file(&file_path, &file_size) + .await + .unwrap(); + let meta: CachedFileMetadata = LanceFileReader::read_all_metadata(&file_scheduler) + .await + .unwrap(); + + // Reconstruct DataFile from footer metadata + let field_ids: Vec<i32> = meta.file_schema.field_ids(); + let column_indices: Vec<i32> = (0..field_ids.len() as i32).collect(); + + let data_file = DataFile::new( + format!("data/{}", filename), + field_ids, + column_indices, + meta.major_version as u32, + meta.minor_version as u32, + None, // file_size_bytes + None, // base_id + ); + + let mut fragment = Fragment::new(frag_idx as u64); + fragment.files.push(data_file); + fragment.physical_rows = Some(meta.num_rows as usize); + fragments.push(fragment); + } + fragments + }); + + assert!(!fragments.is_empty()); + let total_rows: usize = fragments.iter().filter_map(|f| f.physical_rows).sum(); + assert_eq!(total_rows, 5); + + // ── Step 3: Commit fragments into a new dataset ── + + // Copy data files to the dataset directory first + let src_data = staging_dir.path().join("data"); + let dst_data = dataset_dir.path().join("robot.lance").join("data"); + std::fs::create_dir_all(&dst_data).unwrap(); + for entry in std::fs::read_dir(&src_data).unwrap() { + let entry = entry.unwrap(); + 
std::fs::copy(entry.path(), dst_data.join(entry.file_name())).unwrap(); + } + + // Build a lance schema from the arrow schema for the Overwrite operation + let lance_schema = lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap(); + + let transaction = Transaction::new( + 0, + Operation::Overwrite { + fragments, + schema: lance_schema, + config_upsert_values: None, + initial_bases: None, + }, + None, + ); + + lance_c::runtime::block_on(async { + CommitBuilder::new(WriteDestination::Uri(&dataset_uri)) + .execute(transaction) + .await + .unwrap(); + }); + + // ── Step 4: Verify the committed dataset is readable ── + + let c_ds_uri = CString::new(dataset_uri.clone()).unwrap(); + let ds = unsafe { lance_dataset_open(c_ds_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null(), "failed to open committed dataset"); + + let count = unsafe { lance_dataset_count_rows(ds) }; + assert_eq!(count, 5, "committed dataset should have 5 rows"); + + let frag_count = unsafe { lance_dataset_fragment_count(ds) }; + assert_eq!(frag_count, 1, "committed dataset should have 1 fragment"); + + unsafe { lance_dataset_close(ds) }; +} From 4be919b4be03da4a4ce9bf630a7dc3b29efa3728 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Sun, 5 Apr 2026 22:16:53 -0700 Subject: [PATCH 8/8] fix: rustdoc invalid_rust_codeblocks warning in fragment_writer --- src/fragment_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fragment_writer.rs b/src/fragment_writer.rs index c76c2f3..a13693e 100644 --- a/src/fragment_writer.rs +++ b/src/fragment_writer.rs @@ -19,7 +19,7 @@ //! ``` //! //! **2. Finalizer process (Rust, runs periodically or on sync):** -//! ```ignore +//! ```text //! // Scan data/*.lance files, reconstruct Fragment metadata from file footers, //! // then commit via CommitBuilder to publish to the data lake. //! ```