From f827d8a775f5cbf8e76160d7818e8eddce41a4ee Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 25 Jun 2026 17:15:07 +0000 Subject: [PATCH] feat: add metadata bridge for Python CUDA export Add a private metadata-only bridge on PyVortex arrays so the optional CUDA extension can reconstruct arrays through its own local Vortex session instead of passing Rust ArrayRef values across Python extension modules. The CUDA extension now parses the metadata tree, calls local array plugins for deserialization, and exports bufferless arrays through the Arrow C Device capsule path. The capsule ownership code handles live and consumed capsule names and releases Arrow resources on error paths. Physical buffer handoff remains a follow-up. Signed-off-by: "Alexander Droste" Signed-off-by: Alexander Droste --- Cargo.lock | 2 + vortex-cuda/src/arrow/mod.rs | 4 +- vortex-python-cuda/Cargo.toml | 2 + vortex-python-cuda/pyproject.toml | 5 +- .../python/vortex_cuda/__init__.py | 10 +- .../python/vortex_cuda/_lib.pyi | 4 + vortex-python-cuda/src/lib.rs | 438 ++++++++++++++++++ vortex-python-cuda/test/test_native_bridge.py | 33 ++ vortex-python/python/vortex/_lib/arrays.pyi | 1 + vortex-python/src/arrays/mod.rs | 53 ++- 10 files changed, 543 insertions(+), 9 deletions(-) create mode 100644 vortex-python-cuda/test/test_native_bridge.py diff --git a/Cargo.lock b/Cargo.lock index 66457601e06..5fea7c1986a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10341,7 +10341,9 @@ dependencies = [ name = "vortex-python-cuda" version = "0.1.0" dependencies = [ + "arrow-schema", "pyo3", + "vortex", "vortex-cuda", ] diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index fb475ab2b24..456d00c48d4 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -543,14 +543,14 @@ fn released_device_array(device_id: i64) -> ArrowDeviceArray { } /// Release an Arrow C schema if it is live. -fn release_schema(schema: &mut FFI_ArrowSchema) { +pub fn release_schema(schema: &mut FFI_ArrowSchema) { if let Some(release) = schema.release { unsafe { release(schema) }; } } /// Release an Arrow device array if it is live. -fn release_device_array(array: &mut ArrowDeviceArray) { +pub fn release_device_array(array: &mut ArrowDeviceArray) { if let Some(release) = array.array.release { unsafe { release(&raw mut array.array) }; } diff --git a/vortex-python-cuda/Cargo.toml b/vortex-python-cuda/Cargo.toml index 8735175177d..d4e0b7a4b6e 100644 --- a/vortex-python-cuda/Cargo.toml +++ b/vortex-python-cuda/Cargo.toml @@ -27,5 +27,7 @@ default = ["extension-module"] extension-module = [] [dependencies] +arrow-schema = { workspace = true } pyo3 = { workspace = true, features = ["abi3", "abi3-py311"] } +vortex = { workspace = true } vortex-cuda = { workspace = true } diff --git a/vortex-python-cuda/pyproject.toml b/vortex-python-cuda/pyproject.toml index 5ee577e4875..b0b23da4d5b 100644 --- a/vortex-python-cuda/pyproject.toml +++ b/vortex-python-cuda/pyproject.toml @@ -4,8 +4,9 @@ name = "vortex-data-cuda" dynamic = ["version", "description", "authors"] requires-python = ">= 3.11" # The CUDA extension package must exactly match the base package version because the two extensions -# exchange Vortex IPC buffers. Keep this in sync with the workspace package version; release -# automation may rewrite it when publishing a non-workspace version. +# exchange private Vortex vtable metadata. Future buffer-handoff support will extend this private +# bridge. Keep this in sync with the workspace package version; release automation may rewrite it +# when publishing a non-workspace version. dependencies = ["vortex-data==0.1.0"] classifiers = [ "Development Status :: 4 - Beta", diff --git a/vortex-python-cuda/python/vortex_cuda/__init__.py b/vortex-python-cuda/python/vortex_cuda/__init__.py index d259d24d698..c9040bb66a5 100644 --- a/vortex-python-cuda/python/vortex_cuda/__init__.py +++ b/vortex-python-cuda/python/vortex_cuda/__init__.py @@ -1,6 +1,12 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -from ._lib import cuda_available # pyright: ignore[reportMissingModuleSource] +from ._lib import ( # pyright: ignore[reportMissingModuleSource] + _debug_array_metadata_dtype as _debug_array_metadata_dtype, +) +from ._lib import ( + cuda_available, + export_device_array, +) -__all__ = ["cuda_available"] +__all__ = ["cuda_available", "export_device_array"] diff --git a/vortex-python-cuda/python/vortex_cuda/_lib.pyi b/vortex-python-cuda/python/vortex_cuda/_lib.pyi index 431190faa0a..6b93fec0986 100644 --- a/vortex-python-cuda/python/vortex_cuda/_lib.pyi +++ b/vortex-python-cuda/python/vortex_cuda/_lib.pyi @@ -1,4 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors +def _debug_array_metadata_dtype(array: object) -> str: ... def cuda_available() -> bool: ... +def export_device_array( + array: object, requested_schema: object | None = None, **kwargs: object +) -> tuple[object, object]: ... diff --git a/vortex-python-cuda/src/lib.rs b/vortex-python-cuda/src/lib.rs index 26da87f2530..c4a8e585003 100644 --- a/vortex-python-cuda/src/lib.rs +++ b/vortex-python-cuda/src/lib.rs @@ -7,7 +7,84 @@ //! the CPU-only `vortex-data` wheel. Keeping CUDA in its own extension keeps the base wheel free of //! CUDA build/runtime dependencies; `vortex.cuda_extension_installed()` reports whether it is present. +use std::ffi::CStr; +use std::ffi::c_void; +use std::ptr::NonNull; +use std::sync::LazyLock; + +use arrow_schema::Field; +use arrow_schema::Schema; +use arrow_schema::ffi::FFI_ArrowSchema; +use pyo3::exceptions::PyNotImplementedError; +use pyo3::exceptions::PyRuntimeError; +use pyo3::exceptions::PyValueError; +use pyo3::ffi; +use pyo3::ffi::c_str; use pyo3::prelude::*; +use pyo3::types::PyCapsule; +use pyo3::types::PyDict; +use pyo3::types::PyList; +use pyo3::types::PyTuple; +use vortex::VortexSessionDefault; +use vortex::array::ArrayId; +use vortex::array::ArrayRef; +use vortex::array::buffer::BufferHandle; +use vortex::array::serde::ArrayChildren; +use vortex::array::session::ArraySessionExt; +use vortex::buffer::ByteBuffer; +use vortex::dtype::DType; +use vortex::error::VortexError; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::error::vortex_ensure; +use vortex::error::vortex_err; +use vortex::flatbuffers::FlatBuffer; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::arrow::ARROW_DEVICE_CUDA; +use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::ArrowDeviceArrayWithSchema; +use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda::arrow::release_device_array; +use vortex_cuda::arrow::release_schema; + +const ARROW_SCHEMA_CAPSULE_NAME: &CStr = c_str!("arrow_schema"); +const USED_ARROW_SCHEMA_CAPSULE_NAME: &CStr = c_str!("used_arrow_schema"); +const ARROW_DEVICE_ARRAY_CAPSULE_NAME: &CStr = c_str!("arrow_device_array"); +const USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME: &CStr = c_str!("used_arrow_device_array"); + +struct ExportedDeviceArray(ArrowDeviceArrayWithSchema); + +// The exported Arrow C Device structs own CPU-side metadata plus CUDA device pointers through their +// Arrow release callbacks. `Python::detach` requires a `Send` return value even though it executes +// the closure synchronously with the GIL released; this wrapper lets us move the owned export result +// back across that boundary without changing the ABI structs themselves. +unsafe impl Send for ExportedDeviceArray {} + +static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +static METADATA_SESSION: LazyLock = + LazyLock::new(::default); +static CUDA_SESSION: LazyLock> = LazyLock::new(|| { + if !vortex_cuda::cuda_available() { + return Err("CUDA is not available: no usable CUDA driver/device was found".to_string()); + } + + let cuda_session = CudaSession::try_default().map_err(|err| err.to_string())?; + Ok(::default().with_some(cuda_session)) +}); + +fn cuda_session() -> PyResult<&'static VortexSession> { + match &*CUDA_SESSION { + Ok(session) => Ok(session), + Err(err) => Err(PyRuntimeError::new_err(err.clone())), + } +} + +fn to_py_err(err: VortexError) -> PyErr { + PyRuntimeError::new_err(err.to_string()) +} /// Return whether a usable CUDA device is available in the current process. /// @@ -19,10 +96,371 @@ fn cuda_available() -> bool { vortex_cuda::cuda_available() } +struct ArrayMetadata { + encoding_id: String, + dtype: Vec, + len: usize, + metadata: Vec, + buffer_count: usize, + children: Vec, +} + +struct MetadataChildren(Vec); + +impl ArrayChildren for MetadataChildren { + fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult { + let child = self + .0 + .as_slice() + .get(index) + .ok_or_else(|| vortex_err!("array metadata child index {index} out of bounds"))? + .clone(); + vortex_ensure!( + child.dtype() == dtype, + "array metadata child {index} has dtype {}, expected {dtype}", + child.dtype() + ); + vortex_ensure!( + child.len() == len, + "array metadata child {index} has length {}, expected {len}", + child.len() + ); + Ok(child) + } + + fn len(&self) -> usize { + self.0.len() + } +} + +fn extract_array_metadata(array: &Bound<'_, PyAny>) -> PyResult { + let metadata = array.call_method0("__vortex_array_metadata__")?; + parse_array_metadata(&metadata) +} + +fn parse_array_metadata(value: &Bound<'_, PyAny>) -> PyResult { + let tuple = value.cast::()?; + if tuple.len() != 6 { + return Err(PyValueError::new_err(format!( + "expected Vortex array metadata tuple of length 6, got {}", + tuple.len() + ))); + } + + let children = tuple + .get_item(5)? + .cast::()? + .iter() + .map(|child| parse_array_metadata(&child)) + .collect::>>()?; + + Ok(ArrayMetadata { + encoding_id: tuple.get_item(0)?.extract()?, + dtype: tuple.get_item(1)?.extract()?, + len: tuple.get_item(2)?.extract()?, + metadata: tuple.get_item(3)?.extract()?, + buffer_count: tuple.get_item(4)?.extract()?, + children, + }) +} + +fn dtype_from_metadata(metadata: &ArrayMetadata, session: &VortexSession) -> VortexResult { + let flatbuffer = FlatBuffer::align_from(ByteBuffer::from(metadata.dtype.clone())); + DType::from_flatbuffer(flatbuffer, session) +} + +fn deserialize_metadata_tree( + metadata: &ArrayMetadata, + session: &VortexSession, +) -> VortexResult { + if metadata.buffer_count != 0 { + vortex_bail!( + "metadata-only bridge cannot deserialize array {} with {} buffers yet", + metadata.encoding_id, + metadata.buffer_count + ); + } + + let dtype = dtype_from_metadata(metadata, session)?; + let children = metadata + .children + .iter() + .map(|child| deserialize_metadata_tree(child, session)) + .collect::>>()?; + let children = MetadataChildren(children); + let encoding_id = ArrayId::new(&metadata.encoding_id); + let plugin = session + .arrays() + .registry() + .find(&encoding_id) + .ok_or_else(|| vortex_err!("Unknown array encoding: {}", metadata.encoding_id))?; + let buffers: &[BufferHandle] = &[]; + let decoded = plugin.deserialize( + &dtype, + metadata.len, + &metadata.metadata, + buffers, + &children, + session, + )?; + vortex_ensure!( + decoded.len() == metadata.len, + "Array decoded from {} has incorrect length {}, expected {}", + metadata.encoding_id, + decoded.len(), + metadata.len + ); + vortex_ensure!( + decoded.dtype() == &dtype, + "Array decoded from {} has incorrect dtype {}, expected {}", + metadata.encoding_id, + decoded.dtype(), + dtype + ); + vortex_ensure!( + plugin.is_supported_encoding(&decoded.encoding_id()), + "Array decoded from {} has incorrect encoding {}", + metadata.encoding_id, + decoded.encoding_id() + ); + Ok(decoded) +} + +// PyO3 exposes a synchronous Python API, while the CUDA Arrow Device export is async. +// Keep this adapter private to the Python extension so this PR does not add a public +// blocking convenience API to `vortex-cuda`. +fn export_device_array_with_schema_blocking( + array: ArrayRef, + session: &VortexSession, + runtime: &CurrentThreadRuntime, +) -> VortexResult { + let mut ctx = CudaSession::create_execution_ctx(session)?; + runtime.block_on(array.export_device_array_with_schema(&mut ctx)) +} + +/// Return the dtype string after crossing the private vtable-metadata bridge. +#[pyfunction] +fn _debug_array_metadata_dtype(array: Bound<'_, PyAny>) -> PyResult { + let metadata = extract_array_metadata(&array)?; + let array = deserialize_metadata_tree(&metadata, &METADATA_SESSION).map_err(to_py_err)?; + Ok(array.dtype().to_string()) +} + +/// Export a PyVortex array as Arrow C Device schema and array PyCapsules. +#[pyfunction] +#[pyo3(signature = (array, requested_schema = None, **kwargs))] +fn export_device_array<'py>( + py: Python<'py>, + array: Bound<'py, PyAny>, + requested_schema: Option>, + kwargs: Option<&Bound<'py, PyDict>>, +) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> { + reject_unsupported_kwargs(kwargs)?; + + let metadata = extract_array_metadata(&array)?; + let session = cuda_session()?; + let array = deserialize_metadata_tree(&metadata, session).map_err(to_py_err)?; + let dtype = array.dtype().clone(); + + let exported = py + .detach(move || { + export_device_array_with_schema_blocking(array, session, &RUNTIME) + .map(ExportedDeviceArray) + }) + .map_err(to_py_err)?; + let mut exported = exported.0; + + if let Err(err) = check_requested_schema(requested_schema.as_ref(), &exported.schema, &dtype) { + release_exported(&mut exported); + return Err(err); + } + + let ArrowDeviceArrayWithSchema { schema, mut array } = exported; + let schema = match schema_capsule(py, schema) { + Ok(schema) => schema, + Err(err) => { + release_device_array(&mut array); + return Err(err); + } + }; + let array = device_array_capsule(py, array)?; + Ok((schema, array)) +} + +fn reject_unsupported_kwargs(kwargs: Option<&Bound<'_, PyDict>>) -> PyResult<()> { + let Some(kwargs) = kwargs else { + return Ok(()); + }; + + for (name, value) in kwargs.iter() { + if !value.is_none() { + return Err(PyNotImplementedError::new_err(format!( + "unsupported __arrow_c_device_array__ keyword argument {name}={value:?}" + ))); + } + } + Ok(()) +} + +fn check_requested_schema( + requested_schema: Option<&Bound<'_, PyAny>>, + exported_schema: &FFI_ArrowSchema, + dtype: &DType, +) -> PyResult<()> { + let Some(requested_schema) = requested_schema else { + return Ok(()); + }; + if requested_schema.is_none() { + return Ok(()); + } + + let requested_schema = requested_schema.cast::()?; + let requested_schema = unsafe { + requested_schema + .pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))? + .cast::() + .as_ref() + }; + + if matches!(dtype, DType::Struct(..)) { + let requested = Schema::try_from(requested_schema) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let exported = Schema::try_from(exported_schema) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + if requested == exported { + return Ok(()); + } + } else { + let requested = Field::try_from(requested_schema) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let exported = Field::try_from(exported_schema) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + if requested == exported { + return Ok(()); + } + } + + Err(PyNotImplementedError::new_err( + "requested_schema coercion is not supported by vortex_cuda.export_device_array", + )) +} + +fn release_exported(exported: &mut ArrowDeviceArrayWithSchema) { + release_schema(&mut exported.schema); + release_device_array(&mut exported.array); +} + +fn schema_capsule<'py>( + py: Python<'py>, + schema: FFI_ArrowSchema, +) -> PyResult> { + let ptr = Box::into_raw(Box::new(schema)).cast::(); + let ptr = NonNull::new(ptr) + .ok_or_else(|| PyRuntimeError::new_err("failed to allocate ArrowSchema capsule"))?; + let capsule = unsafe { + PyCapsule::new_with_pointer_and_destructor( + py, + ptr, + ARROW_SCHEMA_CAPSULE_NAME, + Some(release_schema_capsule), + ) + }; + match capsule { + Ok(capsule) => Ok(capsule), + Err(err) => { + let mut schema = unsafe { Box::from_raw(ptr.as_ptr().cast::()) }; + release_schema(&mut schema); + Err(err) + } + } +} + +fn device_array_capsule<'py>( + py: Python<'py>, + array: ArrowDeviceArray, +) -> PyResult> { + debug_assert_eq!(array.device_type, ARROW_DEVICE_CUDA); + let ptr = Box::into_raw(Box::new(array)).cast::(); + let ptr = NonNull::new(ptr) + .ok_or_else(|| PyRuntimeError::new_err("failed to allocate ArrowDeviceArray capsule"))?; + let capsule = unsafe { + PyCapsule::new_with_pointer_and_destructor( + py, + ptr, + ARROW_DEVICE_ARRAY_CAPSULE_NAME, + Some(release_device_array_capsule), + ) + }; + match capsule { + Ok(capsule) => Ok(capsule), + Err(err) => { + let mut array = unsafe { Box::from_raw(ptr.as_ptr().cast::()) }; + release_device_array(&mut array); + Err(err) + } + } +} + +// The `used_*` names are only seen after a consumer imports and renames the capsule. CI cannot +// exercise that path without a CUDA Arrow Device consumer, but the destructor must still reclaim +// the outer boxed C struct after the consumer move-nulls the embedded release callback. +unsafe fn capsule_pointer_with_name_or_used( + capsule: *mut ffi::PyObject, + name: &CStr, + used_name: &CStr, +) -> *mut c_void { + let ptr = unsafe { ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) }; + if !ptr.is_null() { + return ptr; + } + unsafe { ffi::PyErr_Clear() }; + + let ptr = unsafe { ffi::PyCapsule_GetPointer(capsule, used_name.as_ptr()) }; + if !ptr.is_null() { + return ptr; + } + unsafe { ffi::PyErr_Clear() }; + std::ptr::null_mut() +} + +unsafe extern "C" fn release_schema_capsule(capsule: *mut ffi::PyObject) { + let ptr = unsafe { + capsule_pointer_with_name_or_used( + capsule, + ARROW_SCHEMA_CAPSULE_NAME, + USED_ARROW_SCHEMA_CAPSULE_NAME, + ) + }; + if ptr.is_null() { + return; + } + + let mut schema = unsafe { Box::from_raw(ptr.cast::()) }; + release_schema(&mut schema); +} + +unsafe extern "C" fn release_device_array_capsule(capsule: *mut ffi::PyObject) { + let ptr = unsafe { + capsule_pointer_with_name_or_used( + capsule, + ARROW_DEVICE_ARRAY_CAPSULE_NAME, + USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME, + ) + }; + if ptr.is_null() { + return; + } + + let mut array = unsafe { Box::from_raw(ptr.cast::()) }; + release_device_array(&mut array); +} + /// The `vortex_cuda._lib` extension module. #[cfg(feature = "extension-module")] #[pymodule] fn _lib(m: &Bound) -> PyResult<()> { m.add_function(wrap_pyfunction!(cuda_available, m)?)?; + m.add_function(wrap_pyfunction!(_debug_array_metadata_dtype, m)?)?; + m.add_function(wrap_pyfunction!(export_device_array, m)?)?; Ok(()) } diff --git a/vortex-python-cuda/test/test_native_bridge.py b/vortex-python-cuda/test/test_native_bridge.py new file mode 100644 index 00000000000..ca0a0072291 --- /dev/null +++ b/vortex-python-cuda/test/test_native_bridge.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +import pytest +import vortex_cuda + +import vortex + + +def test_debug_array_metadata_dtype_reads_base_vortex_array(): + array = vortex.Array.from_range(range(0, 3)) + + assert vortex_cuda._debug_array_metadata_dtype(array) == str(array.dtype) + + +def test_metadata_bridge_reports_arrays_that_need_buffer_handoff(): + array = vortex.array([1, 2, 3]) + + with pytest.raises(RuntimeError, match="metadata-only bridge.*buffers"): + _ = vortex_cuda._debug_array_metadata_dtype(array) + + +def test_export_device_array_returns_capsules_or_clean_cuda_error(): + array = vortex.Array.from_range(range(0, 3)) + + if not vortex_cuda.cuda_available(): + with pytest.raises(RuntimeError, match="CUDA"): + _ = vortex_cuda.export_device_array(array) + return + + schema, device_array = vortex_cuda.export_device_array(array) + assert type(schema).__name__ == "PyCapsule" + assert type(device_array).__name__ == "PyCapsule" diff --git a/vortex-python/python/vortex/_lib/arrays.pyi b/vortex-python/python/vortex/_lib/arrays.pyi index f7649d3487f..3e4cd0931a4 100644 --- a/vortex-python/python/vortex/_lib/arrays.pyi +++ b/vortex-python/python/vortex/_lib/arrays.pyi @@ -26,6 +26,7 @@ class Array: @staticmethod def from_range(obj: range, *, dtype: DType | None = None) -> Array: ... def to_arrow_array(self) -> pa.Array[pa.Scalar[pa.DataType]]: ... + def __vortex_array_metadata__(self) -> tuple[str, bytes, int, bytes, int, list[object]]: ... @property def id(self) -> str: ... @property diff --git a/vortex-python/src/arrays/mod.rs b/vortex-python/src/arrays/mod.rs index 295836d6e55..e79a0c0a620 100644 --- a/vortex-python/src/arrays/mod.rs +++ b/vortex-python/src/arrays/mod.rs @@ -17,11 +17,13 @@ use pyo3::exceptions::PyTypeError; use pyo3::exceptions::PyValueError; use pyo3::intern; use pyo3::prelude::*; +use pyo3::types::PyBytes; use pyo3::types::PyDict; use pyo3::types::PyList; use pyo3::types::PyRange; use pyo3::types::PyRangeMethods; -use pyo3_bytes::PyBytes; +use pyo3::types::PyTuple; +use pyo3_bytes::PyBytes as PyBufferBytes; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::IntoArray; @@ -33,9 +35,11 @@ use vortex::array::arrays::chunked::ChunkedArrayExt; use vortex::array::arrow::ArrowSessionExt; use vortex::array::builtins::ArrayBuiltins; use vortex::array::match_each_integer_ptype; +use vortex::array::session::ArraySessionExt; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::dtype::PType; +use vortex::flatbuffers::WriteFlatBufferExt; use vortex::ipc::messages::EncoderMessage; use vortex::ipc::messages::MessageEncoder; use vortex::scalar_fn::fns::operators::Operator; @@ -56,6 +60,38 @@ use crate::scalar::PyScalar; use crate::serde::context::PyArrayContext; use crate::session::session; +fn array_metadata_tuple<'py>( + py: Python<'py>, + array: &ArrayRef, +) -> PyVortexResult> { + let metadata = session().array_serialize(array)?.ok_or_else(|| { + PyValueError::new_err(format!( + "Array {} does not support metadata serialization", + array.encoding_id() + )) + })?; + let dtype = array.dtype().write_flatbuffer_bytes()?; + let children = array + .children() + .iter() + .map(|child| array_metadata_tuple(py, child).map(|tuple| tuple.into_any())) + .collect::>>()?; + let children = PyList::new(py, children)?; + + PyTuple::new( + py, + [ + array.encoding_id().to_string().into_py_any(py)?, + PyBytes::new(py, dtype.as_slice()).into_any().into(), + array.len().into_py_any(py)?, + PyBytes::new(py, metadata.as_slice()).into_any().into(), + array.nbuffers().into_py_any(py)?, + children.into_any().into(), + ], + ) + .map_err(Into::into) +} + pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { let m = PyModule::new(py, "arrays")?; parent.add_submodule(&m)?; @@ -374,6 +410,17 @@ impl PyArray { } } + /// Export this array's vtable metadata tree for optional native extensions. + /// + /// The returned private tuple intentionally excludes buffers; consumers must provide buffer + /// handles through a separate bridge before deserializing arrays that own physical buffers. + fn __vortex_array_metadata__<'py>( + self_: &'py Bound<'py, Self>, + ) -> PyVortexResult> { + let array = PyArrayRef::extract(self_.as_any().as_borrowed())?.into_inner(); + array_metadata_tuple(self_.py(), &array) + } + fn __len__(&self) -> PyResult { Err(PyTypeError::new_err("__len__ is not implemented for Array")) } @@ -801,14 +848,14 @@ impl PyArray { for buf in array_buffers.into_iter() { // PyBytes wraps bytes::Bytes and implements the buffer protocol // This allows PickleBuffer to reference the data without copying - let py_bytes = PyBytes::new(buf).into_py_any(py)?; + let py_bytes = PyBufferBytes::new(buf).into_py_any(py)?; let pickle_buffer = pickle_buffer_class.call1((py_bytes,))?; pickle_buffers.push(pickle_buffer); } let mut dtype_pickle_buffers = Vec::new(); for buf in dtype_buffers.into_iter() { - let py_bytes = PyBytes::new(buf).into_py_any(py)?; + let py_bytes = PyBufferBytes::new(buf).into_py_any(py)?; let pickle_buffer = pickle_buffer_class.call1((py_bytes,))?; dtype_pickle_buffers.push(pickle_buffer); }