Commit d71d3d3

a10y and palaska authored
pluggable registry for input/export arrow kernels (#7824)
## Summary

Adds a pluggable `ArrowSession` registry on `VortexSession` for round-tripping Vortex extension types in and out of Arrow extension types. Unblocks Arrow round-trip for `arrow.uuid` today, with `arrow.parquet.variant`, GeoArrow, and tensor types as the next consumers.

Part of #7686.

## API changes

The session exposes two trait-driven plugin slots:

- `ArrowExportVTable`: dispatched by **target Arrow extension name** (`ARROW:extension:name`). Implementations turn a Vortex `ArrayRef` into an Arrow `ArrayRef` shaped to the requested `Field`. Also provides `to_arrow_field` for schema inference when only a Vortex `DType` is in hand.
- `ArrowImportVTable`: dispatched by **source Arrow extension name** carried on the incoming `Field`. Implementations turn an Arrow `ArrayRef` back into a Vortex `ArrayRef`, including any storage re-encoding (e.g. `FixedSizeBinary[16]` → `FixedSizeList<u8; 16>` for UUID).

Both traits return `Unsupported(input)` to defer to the next plugin or to the canonical fallback, so multiple plugins can register against the same key and probe in order.

New session entry points (`vortex-array/src/arrow/session.rs`):

- `ArrowSession::to_arrow_field` / `to_arrow_schema`: Vortex `DType` → Arrow `Field`/`Schema`, recursing into containers so nested extension fields go through the registered plugin.
- `ArrowSession::from_arrow_field` / `from_arrow_schema`: inverse direction, plugin-aware.
- `ArrowSession::from_arrow_record_batch` / `execute_record_batch`: `RecordBatch` round-trip.
- `ArrowSessionExt` extension trait so any `SessionExt` can call `session.arrow().…`.

The default session pre-registers the builtin UUID plugin (`vortex-array/src/extension/uuid/arrow.rs`).

## What's *not* in the plugin layer

`Date`, `Time`, and `Timestamp` are Vortex builtin extensions that map directly to native Arrow temporal types, so they continue to go through the canonical executor (`vortex-array/src/arrow/executor/temporal.rs`) rather than the plugin registry. The plugin layer is reserved for **Arrow extension types** that the canonical path can't express.

## DataFusion wiring

`vortex-datafusion` now goes through the session for schema/array conversion:

- `convert/schema.rs::calculate_physical_schema` uses `ArrowSession::to_arrow_field` so extension metadata survives projection.
- `persistent/format.rs` and `persistent/opener.rs` route schema inference through the session.
- `persistent/sink.rs` uses `from_arrow_record_batch`, passing the original schema separately from `RecordBatch::schema()` to preserve `ARROW:extension:name` metadata that DataFusion strips at runtime.

## Tests

Two new end-to-end tests in `vortex-datafusion/src/persistent/tests.rs`:

- `arrow_uuid_extension_roundtrip`: write Arrow UUID column to a Vortex file via the session, `SELECT *` it back, assert the field still carries the `Uuid` extension type and the values match.
- `arrow_uuid_extension_roundtrip_nested_struct`: same flow with the UUID nested in a top-level `Struct`, exercising recursive session-aware schema inference.

---------

Signed-off-by: Andrew Duffy <andrew@a10y.dev>
Signed-off-by: Baris Palaska <barispalaska@gmail.com>
Co-authored-by: Baris Palaska <barispalaska@gmail.com>
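
The probe-in-order behaviour described under "API changes" (several plugins registered against one extension name, each returning `Unsupported(input)` to pass the value along) can be illustrated with a small standalone sketch. Everything here is hypothetical and uses only the standard library: `Attempt`, `ExportPlugin`, `Registry`, the two UUID plugins, and `demo_registry` are illustrative names, and strings stand in for arrays. It is not the actual `ArrowExportVTable` API.

```rust
use std::collections::HashMap;

/// Result of asking one plugin to convert a value. `Unsupported` hands the
/// untouched input back so the caller can offer it to the next plugin.
enum Attempt<I, O> {
    Converted(O),
    Unsupported(I),
}

/// Hypothetical stand-in for an export plugin. The real trait works on
/// Vortex and Arrow arrays; strings are used here only for illustration.
trait ExportPlugin {
    fn export(&self, input: String) -> Attempt<String, String>;
}

/// Illustrative plugin: accepts only 36-character (hyphenated) UUID text.
struct HyphenatedUuid;

impl ExportPlugin for HyphenatedUuid {
    fn export(&self, input: String) -> Attempt<String, String> {
        if input.len() == 36 {
            Attempt::Converted(format!("hyphenated:{}", input))
        } else {
            Attempt::Unsupported(input)
        }
    }
}

/// Illustrative plugin: accepts only 32-character (compact hex) UUID text.
struct CompactUuid;

impl ExportPlugin for CompactUuid {
    fn export(&self, input: String) -> Attempt<String, String> {
        if input.len() == 32 {
            Attempt::Converted(format!("compact:{}", input))
        } else {
            Attempt::Unsupported(input)
        }
    }
}

/// Plugins keyed by extension name, kept in registration order.
#[derive(Default)]
struct Registry {
    plugins: HashMap<String, Vec<Box<dyn ExportPlugin>>>,
}

impl Registry {
    fn register(&mut self, extension_name: &str, plugin: Box<dyn ExportPlugin>) {
        self.plugins
            .entry(extension_name.to_string())
            .or_default()
            .push(plugin);
    }

    /// Probe the plugins for `extension_name` in order and use a stand-in
    /// canonical fallback when none of them accepts the input.
    fn export(&self, extension_name: &str, input: String) -> String {
        let mut current = input;
        if let Some(chain) = self.plugins.get(extension_name) {
            for plugin in chain {
                match plugin.export(current) {
                    Attempt::Converted(output) => return output,
                    // Ownership of the input comes back, so nothing is cloned.
                    Attempt::Unsupported(returned) => current = returned,
                }
            }
        }
        format!("canonical:{}", current)
    }
}

/// Build a registry with two plugins registered against the same key.
fn demo_registry() -> Registry {
    let mut registry = Registry::default();
    registry.register("arrow.uuid", Box::new(HyphenatedUuid));
    registry.register("arrow.uuid", Box::new(CompactUuid));
    registry
}
```

Calling `demo_registry().export("arrow.uuid", value)` tries `HyphenatedUuid` first, then `CompactUuid`, and only then the fallback. Returning the input inside `Unsupported` lets a declining plugin hand ownership back without cloning. Whether the real traits do this for the same reason is an assumption here.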
1 parent 96dda71 commit d71d3d3

50 files changed

Lines changed: 2753 additions & 710 deletions

Cargo.lock

Lines changed: 2 additions & 0 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -382,7 +382,7 @@ debug = "full"
 inherits = "release"

 [profile.bench]
-codegen-units = 16
+codegen-units = 1
 debug = "full"
 lto = false

encodings/parquet-variant/src/array.rs

Lines changed: 8 additions & 0 deletions
@@ -22,6 +22,10 @@ use vortex_array::arrays::StructArray;
 use vortex_array::arrays::VariantArray;
 use vortex_array::arrays::list::ListArrayExt;
 use vortex_array::arrays::struct_::StructArrayExt;
+#[expect(
+    deprecated,
+    reason = "TODO(aduffy): figure out what to do with Parquet Variant"
+)]
 use vortex_array::arrow::ArrowArrayExecutor;
 use vortex_array::arrow::FromArrowArray;
 use vortex_array::arrow::to_arrow_null_buffer;
@@ -331,6 +335,10 @@ pub trait ParquetVariantArrayExt: TypedArrayRef<ParquetVariant> {
     }

     /// Converts this storage array to Arrow's canonical Parquet Variant extension storage.
+    #[expect(
+        deprecated,
+        reason = "TODO(aduffy): figure out what to do with Parquet Variant"
+    )]
     fn to_arrow(&self, ctx: &mut ExecutionCtx) -> VortexResult<ArrowVariantArray> {
         let metadata = self.metadata_array();
         let len = metadata.len();

encodings/parquet-variant/src/kernel.rs

Lines changed: 4 additions & 0 deletions
@@ -104,6 +104,10 @@ fn to_parquet_variant_path(path: &VariantPath) -> VortexResult<PqVariantPath<'st
         .map(PqVariantPath::new)
 }

+#[expect(
+    deprecated,
+    reason = "TODO(aduffy): figure out what to do with Parquet Variant"
+)]
 fn to_arrow_as_type(dtype: Option<&DType>) -> VortexResult<Option<FieldRef>> {
     match dtype {
         Some(dtype) if !dtype.is_variant() => Ok(Some(Arc::new(Field::new(

encodings/pco/src/tests.rs

Lines changed: 9 additions & 4 deletions
@@ -10,7 +10,7 @@ use vortex_array::LEGACY_SESSION;
 use vortex_array::VortexSessionExecute;
 use vortex_array::arrays::BoolArray;
 use vortex_array::arrays::PrimitiveArray;
-use vortex_array::arrow::ArrowArrayExecutor;
+use vortex_array::arrow::ArrowSessionExt;
 use vortex_array::assert_arrays_eq;
 use vortex_array::assert_nth_scalar;
 use vortex_array::dtype::DType;
@@ -213,9 +213,14 @@ fn test_serde() -> VortexResult<()> {
         &ReadContext::new(context.to_ids()),
         &SESSION,
     )?;
-    let data_type = data.dtype().to_arrow_dtype()?;
-    let pco_arrow = pco.execute_arrow(Some(&data_type), &mut ctx)?;
-    let decoded_arrow = decoded.execute_arrow(Some(&data_type), &mut ctx)?;
+    let data_type = LEGACY_SESSION.arrow().to_arrow_field("", data.dtype())?;
+    let pco_arrow = LEGACY_SESSION
+        .arrow()
+        .execute_arrow(pco, Some(&data_type), &mut ctx)?;
+    let decoded_arrow =
+        LEGACY_SESSION
+            .arrow()
+            .execute_arrow(decoded, Some(&data_type), &mut ctx)?;
     assert!(pco_arrow == decoded_arrow);
     Ok(())
 }

encodings/runend/src/arrow.rs

Lines changed: 5 additions & 2 deletions
@@ -85,7 +85,7 @@ mod tests {
     use vortex_array::VortexSessionExecute as _;
     use vortex_array::arrays::PrimitiveArray;
     use vortex_array::arrays::primitive::PrimitiveArrayExt;
-    use vortex_array::arrow::ArrowArrayExecutor;
+    use vortex_array::arrow::ArrowSessionExt;
     use vortex_array::arrow::FromArrowArray;
     use vortex_array::assert_arrays_eq;
     use vortex_array::dtype::DType;
@@ -301,7 +301,10 @@ mod tests {
     }

     fn execute(array: ArrayRef, dt: &DataType) -> VortexResult<arrow_array::ArrayRef> {
-        array.execute_arrow(Some(dt), &mut SESSION.create_execution_ctx())
+        let field = Field::new("", dt.clone(), true);
+        SESSION
+            .arrow()
+            .execute_arrow(array, Some(&field), &mut SESSION.create_execution_ctx())
     }

     #[test]

encodings/sparse/src/canonical.rs

Lines changed: 37 additions & 20 deletions
@@ -599,7 +599,7 @@ mod test {
     use vortex_array::arrays::VarBinArray;
     use vortex_array::arrays::VarBinViewArray;
     use vortex_array::arrays::listview::ListViewArrayExt;
-    use vortex_array::arrow::ArrowArrayExecutor;
+    use vortex_array::arrow::ArrowSessionExt;
     use vortex_array::assert_arrays_eq;
     use vortex_array::dtype::DType;
     use vortex_array::dtype::DecimalDType;
@@ -845,23 +845,33 @@ mod test {
         let fill_scalar = Scalar::decimal(DecimalValue::I32(123), decimal_dtype, Nullable);
         let sparse_struct = Sparse::try_new(indices, patch_values, len, fill_scalar).unwrap();

-        let expected = DecimalArray::new(
-            buffer![100i128, 200, 123, 123, 123, 123, 123, 300, 4000, 123],
-            decimal_dtype,
-            // NB: patch indices: [0, 1, 7, 8]; patch validity: [Valid, Valid, Valid, Invalid]; ergo 0, 1, 7 are valid.
-            Validity::from_mask(Mask::from_excluded_indices(10, vec![8]), Nullable),
-        )
-        .into_array()
-        .execute_arrow(None, &mut ctx)
-        .unwrap();
+        let expected = LEGACY_SESSION
+            .arrow()
+            .execute_arrow(
+                DecimalArray::new(
+                    buffer![100i128, 200, 123, 123, 123, 123, 123, 300, 4000, 123],
+                    decimal_dtype,
+                    // NB: patch indices: [0, 1, 7, 8]; patch validity: [Valid, Valid, Valid, Invalid]; ergo 0, 1, 7 are valid.
+                    Validity::from_mask(Mask::from_excluded_indices(10, vec![8]), Nullable),
+                )
+                .into_array(),
+                None,
+                &mut ctx,
+            )
+            .unwrap();

-        let actual = sparse_struct
-            .as_array()
-            .clone()
-            .execute::<DecimalArray>(&mut ctx)
-            .unwrap()
-            .into_array()
-            .execute_arrow(None, &mut ctx)
+        let actual = LEGACY_SESSION
+            .arrow()
+            .execute_arrow(
+                sparse_struct
+                    .as_array()
+                    .clone()
+                    .execute::<DecimalArray>(&mut ctx)
+                    .unwrap()
+                    .into_array(),
+                None,
+                &mut ctx,
+            )
             .unwrap();

         assert_eq!(expected.data_type(), actual.data_type());
@@ -1544,9 +1554,16 @@ mod test {
         assert_arrays_eq!(&actual, &expected);

         // Note that the preferred arrow list representation is `List` (not `ListView`).
-        let arrow_dtype = expected.dtype().to_arrow_dtype()?;
-        let actual = actual.execute_arrow(Some(&arrow_dtype), &mut ctx)?;
-        let expected = expected.execute_arrow(Some(&arrow_dtype), &mut ctx)?;
+        let arrow_dtype = LEGACY_SESSION
+            .arrow()
+            .to_arrow_field("", expected.dtype())?;
+        let actual = LEGACY_SESSION
+            .arrow()
+            .execute_arrow(actual, Some(&arrow_dtype), &mut ctx)?;
+        let expected =
+            LEGACY_SESSION
+                .arrow()
+                .execute_arrow(expected, Some(&arrow_dtype), &mut ctx)?;

         assert_eq!(actual.data_type(), expected.data_type());
         Ok(())

vortex-array/Cargo.toml

Lines changed: 5 additions & 1 deletion
@@ -29,7 +29,7 @@ arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-data = { workspace = true }
 arrow-ord = { workspace = true }
-arrow-schema = { workspace = true }
+arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-lock = { workspace = true }
@@ -206,3 +206,7 @@ harness = false
 [[bench]]
 name = "slice_dict_primitive"
 harness = false
+
+[[bench]]
+name = "to_arrow"
+harness = false

vortex-array/benches/to_arrow.rs

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+#![expect(clippy::unwrap_used)]
+
+use std::sync::Arc;
+
+use divan::Bencher;
+use vortex_array::ArrayRef;
+use vortex_array::IntoArray;
+use vortex_array::LEGACY_SESSION;
+use vortex_array::VortexSessionExecute;
+use vortex_array::arrays::DecimalArray;
+use vortex_array::arrays::ListArray;
+use vortex_array::arrays::PrimitiveArray;
+use vortex_array::arrays::StructArray;
+#[expect(
+    deprecated,
+    reason = "benchmark comparing deprecated method with new one"
+)]
+use vortex_array::arrow::ArrowArrayExecutor;
+use vortex_array::arrow::ArrowSessionExt;
+use vortex_array::dtype::DType;
+use vortex_array::dtype::DecimalDType;
+use vortex_array::dtype::Nullability;
+use vortex_array::dtype::PType;
+use vortex_array::dtype::StructFields;
+
+fn main() {
+    divan::main();
+}
+
+fn schema() -> DType {
+    let fields = StructFields::from_iter([
+        (
+            "primitive",
+            DType::Primitive(PType::F32, Nullability::Nullable),
+        ),
+        (
+            "list",
+            DType::List(
+                Arc::new(DType::Binary(Nullability::NonNullable)),
+                Nullability::Nullable,
+            ),
+        ),
+        (
+            "decimal",
+            DType::Decimal(DecimalDType::new(19, 10), Nullability::Nullable),
+        ),
+    ]);
+    DType::Struct(fields, Nullability::NonNullable)
+}
+
+fn array() -> ArrayRef {
+    StructArray::from_fields(&[
+        (
+            "primitive",
+            PrimitiveArray::from_iter(0i16..1024).into_array(),
+        ),
+        (
+            "list",
+            ListArray::from_iter_slow::<u32, _>(
+                (0..1024).map(|_| vec!["a", "b", "c"]).collect::<Vec<_>>(),
+                Arc::new(DType::Utf8(Nullability::NonNullable)),
+            )
+            .unwrap()
+            .into_array(),
+        ),
+        (
+            "decimal",
+            DecimalArray::from_iter(0i64..1024, DecimalDType::new(19, 2)).into_array(),
+        ),
+    ])
+    .unwrap()
+    .into_array()
+}
+
+#[divan::bench]
+fn to_arrow_dtype(bencher: Bencher) {
+    bencher.with_inputs(schema).bench_values(|dtype| {
+        #[expect(deprecated, reason = "benchmarking deprecated code path")]
+        dtype.to_arrow_dtype().unwrap()
+    });
+}
+
+#[allow(non_snake_case)]
+#[divan::bench]
+fn ArrowExportVTable_to_arrow_field(bencher: Bencher) {
+    // Warm the ArrowSession
+    drop(
+        LEGACY_SESSION
+            .arrow()
+            .to_arrow_field("", &schema())
+            .unwrap(),
+    );
+
+    bencher
+        .with_inputs(schema)
+        .bench_values(|dtype| LEGACY_SESSION.arrow().to_arrow_field("", &dtype).unwrap())
+}
+
+#[divan::bench]
+fn to_arrow_array(bencher: Bencher) {
+    bencher
+        .with_inputs(|| (array(), LEGACY_SESSION.create_execution_ctx()))
+        .bench_values(|(array, mut ctx)| {
+            #[expect(deprecated, reason = "benchmarking deprecated code path")]
+            array.execute_arrow(None, &mut ctx).unwrap()
+        });
+}
+
+#[allow(non_snake_case)]
+#[divan::bench]
+fn ArrowExportVTable_execute_arrow(bencher: Bencher) {
+    // Warm the ArrowSession
+    drop(LEGACY_SESSION.arrow().execute_arrow(
+        array(),
+        None,
+        &mut LEGACY_SESSION.create_execution_ctx(),
+    ));
+
+    bencher
+        .with_inputs(|| (array(), LEGACY_SESSION.create_execution_ctx()))
+        .bench_values(|(array, mut ctx)| {
+            LEGACY_SESSION
+                .arrow()
+                .execute_arrow(array, None, &mut ctx)
+                .unwrap()
+        })
+}
