From 6a78fead71eb1678098fb59eb066a1ecefe60f09 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 18:46:54 +0000 Subject: [PATCH 01/25] Implement Arrow device array stream Signed-off-by: Alexander Droste --- vortex-cuda/build.rs | 1 + vortex-cuda/ffi/cinclude/vortex_cuda.h | 28 ++ vortex-cuda/ffi/src/lib.rs | 39 +++ vortex-cuda/src/arrow/mod.rs | 460 +++++++++++++++++++++++++ vortex-cuda/src/lib.rs | 2 + vortex-ffi/src/lib.rs | 4 + vortex-ffi/src/scan.rs | 36 ++ 7 files changed, 570 insertions(+) diff --git a/vortex-cuda/build.rs b/vortex-cuda/build.rs index e6024d3c275..eb4743bab21 100644 --- a/vortex-cuda/build.rs +++ b/vortex-cuda/build.rs @@ -206,6 +206,7 @@ fn generate_arrow_device_array_bindings(manifest_dir: &Path, out_dir: &Path) { .header(header.to_string_lossy()) .allowlist_type("ArrowArray") .allowlist_type("ArrowDeviceArray") + .allowlist_type("ArrowDeviceArrayStream") .allowlist_type("ArrowDeviceType") .allowlist_var("ARROW_DEVICE_.*") // ArrowArray/ArrowDeviceArray own producer state through release/private_data. diff --git a/vortex-cuda/ffi/cinclude/vortex_cuda.h b/vortex-cuda/ffi/cinclude/vortex_cuda.h index ee0f0fb3ca3..83aa9988b9e 100644 --- a/vortex-cuda/ffi/cinclude/vortex_cuda.h +++ b/vortex-cuda/ffi/cinclude/vortex_cuda.h @@ -43,6 +43,18 @@ struct ArrowDeviceArray { }; #endif +#if !defined(ARROW_C_DEVICE_STREAM_INTERFACE) && !defined(USE_OWN_ARROW_DEVICE) +#define ARROW_C_DEVICE_STREAM_INTERFACE +struct ArrowDeviceArrayStream { + ArrowDeviceType device_type; + int (*get_schema)(struct ArrowDeviceArrayStream *, struct ArrowSchema *out); + int (*get_next)(struct ArrowDeviceArrayStream *, struct ArrowDeviceArray *out); + const char *(*get_last_error)(struct ArrowDeviceArrayStream *); + void (*release)(struct ArrowDeviceArrayStream *); + void *private_data; +}; +#endif + /** * Create a CUDA Vortex session. * @@ -69,6 +81,22 @@ int vx_cuda_array_export_arrow_device(const vx_session *session, struct ArrowDeviceArray *out_array, vx_error **error_out); +/** + * Scan a Vortex partition as an Arrow C Device stream. + * + * On success returns 0 and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream owns + * the partition and must be released through its embedded Arrow `release` callback. Each produced + * `ArrowDeviceArray` is exported on one CUDA device and must be released independently by the + * consumer through its embedded `ArrowArray.release` callback. + * + * On error returns 1 and, when `error_out` is non-NULL, writes a `vx_error` (free with + * `vx_error_free`). If `partition` is consumed by this call, callers must not free or reuse it. + */ +int vx_cuda_partition_scan_arrow_device(const vx_session *session, + vx_partition *partition, + struct ArrowDeviceArrayStream *out_stream, + vx_error **error_out); + #ifdef __cplusplus } #endif diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index f3f97210d5b..a807aeb1f13 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -17,11 +17,14 @@ use vortex::error::vortex_ensure; use vortex::session::VortexSession; use vortex_cuda::CudaSession; use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; use vortex_ffi::try_or; use vortex_ffi::vx_array; use vortex_ffi::vx_array_ref; use vortex_ffi::vx_error; +use vortex_ffi::vx_partition; +use vortex_ffi::vx_partition_into_array_iter; use vortex_ffi::vx_session; use vortex_ffi::vx_session_new_with; use vortex_ffi::vx_session_ref; @@ -100,6 +103,42 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device( }) } +/// Scan a Vortex partition as an Arrow C Device stream. +/// +/// On success returns `0` and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream +/// owns the partition and must be released through its embedded Arrow `release` callback. Each +/// produced `ArrowDeviceArray` is exported on one CUDA device and is independently released by the +/// consumer through the embedded `ArrowArray.release` callback. +/// +/// On error returns `1` and, when `error_out` is non-null, writes a `vx_error` (free with +/// `vx_error_free`). If `partition` is consumed by this call, callers must not free or reuse it. +/// +/// # Safety +/// +/// `session` must be a valid borrowed handle created by `vortex-ffi`. `partition` must be an owned +/// partition handle created by `vortex-ffi`. `out_stream` must be a valid writable pointer. If +/// `error_out` is non-null, it must be valid for writing one error pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device( + session: *const vx_session, + partition: *mut vx_partition, + out_stream: *mut ArrowDeviceArrayStream, + error_out: *mut *mut vx_error, +) -> c_int { + try_or(error_out, VX_CUDA_ERR, || { + vortex_ensure!(!partition.is_null(), "null vx_partition"); + vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); + + let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; + let (dtype, array_iter) = unsafe { vx_partition_into_array_iter(partition) }?; + let device_stream = + vortex_cuda::export_device_array_stream_from_iter(array_iter, dtype, &session)?; + + unsafe { ptr::write(out_stream, device_stream) }; + Ok(VX_CUDA_OK) + }) +} + #[cfg(test)] mod tests { use std::ptr; diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 63582b22a24..455037db63f 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -11,8 +11,13 @@ mod canonical; mod list_view; +use std::ffi::CString; +use std::ffi::c_char; +use std::ffi::c_int; use std::ffi::c_void; use std::fmt::Debug; +use std::panic::AssertUnwindSafe; +use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; @@ -25,6 +30,7 @@ pub(crate) use canonical::CanonicalDeviceArrayExport; use cudarc::driver::CudaEvent; use cudarc::driver::CudaStream; use cudarc::runtime::sys::cudaEvent_t; +use futures::StreamExt; use vortex::array::ArrayRef; use vortex::array::arrays::Dict; use vortex::array::arrays::FixedSizeList; @@ -38,13 +44,16 @@ use vortex::array::arrays::listview::ListViewArrayExt; use vortex::array::arrays::struct_::StructArrayExt; use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; +use vortex::array::stream::SendableArrayStream; use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::dtype::DecimalType; use vortex::dtype::PType; use vortex::dtype::StructFields; use vortex::error::VortexResult; +use vortex::error::vortex_ensure; use vortex::error::vortex_err; +use vortex::session::VortexSession; use crate::CudaBufferExt; use crate::CudaExecutionCtx; @@ -61,7 +70,9 @@ mod arrow_c_abi { pub use arrow_c_abi::ArrowArray; pub use arrow_c_abi::ArrowDeviceArray; +pub use arrow_c_abi::ArrowDeviceArrayStream; pub use arrow_c_abi::ArrowDeviceType; +use arrow_c_abi::ArrowSchema; #[cfg(feature = "_test-harness")] #[doc(hidden)] @@ -242,6 +253,361 @@ impl DeviceArrayExt for ArrayRef { } } +const ARROW_STREAM_EIO: c_int = 5; +const ARROW_STREAM_EINVAL: c_int = 22; + +#[derive(Clone, Debug, PartialEq)] +enum DeviceExportSchema { + Schema(Schema), + Field(Field), +} + +impl DeviceExportSchema { + fn from_ffi(schema: &FFI_ArrowSchema, dtype: &DType) -> VortexResult { + if matches!(dtype, DType::Struct(..)) { + Ok(Self::Schema(Schema::try_from(schema)?)) + } else { + Ok(Self::Field(Field::try_from(schema)?)) + } + } + + fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { + let dtype = arrow_device_export_dtype(dtype); + if let DType::Struct(struct_dtype, _) = &dtype { + Ok(Self::Schema(Schema::new( + arrow_device_export_struct_fields(struct_dtype, ctx)?, + ))) + } else { + Ok(Self::Field(arrow_device_export_field("", &dtype, ctx)?)) + } + } + + fn to_ffi(&self) -> VortexResult { + match self { + Self::Schema(schema) => Ok(FFI_ArrowSchema::try_from(schema)?), + Self::Field(field) => Ok(FFI_ArrowSchema::try_from(field)?), + } + } +} + +type ArrayStreamIterator = Box>>; + +struct FuturesArrayStreamIterator { + stream: SendableArrayStream, +} + +impl Iterator for FuturesArrayStreamIterator { + type Item = VortexResult; + + fn next(&mut self) -> Option { + futures::executor::block_on(self.stream.next()) + } +} + +struct DeviceArrayStreamPrivateData { + array_iter: ArrayStreamIterator, + ctx: CudaExecutionCtx, + dtype: DType, + schema: Option, + pending_array: Option, + device_id: i64, + last_error: Option, +} + +impl DeviceArrayStreamPrivateData { + fn clear_error(&mut self) { + self.last_error = None; + } + + fn set_error(&mut self, error: impl ToString) -> c_int { + let message = error.to_string().replace('\0', "\\0"); + self.last_error = CString::new(message).ok(); + ARROW_STREAM_EIO + } + + fn ensure_schema(&mut self) -> VortexResult<&DeviceExportSchema> { + if self.schema.is_none() { + match self.array_iter.next() { + Some(array) => { + let array = self.export_batch(array?)?; + self.pending_array = Some(array); + } + None => { + self.schema = Some(DeviceExportSchema::from_dtype(&self.dtype, &mut self.ctx)?); + } + } + } + + self.schema + .as_ref() + .ok_or_else(|| vortex_err!("ArrowDeviceArrayStream schema was not initialized")) + } + + fn next_array(&mut self) -> VortexResult> { + if let Some(array) = self.pending_array.take() { + return Ok(Some(array)); + } + + let Some(array) = self.array_iter.next() else { + if self.schema.is_none() { + self.schema = Some(DeviceExportSchema::from_dtype(&self.dtype, &mut self.ctx)?); + } + return Ok(None); + }; + + self.export_batch(array?).map(Some) + } + + fn export_batch(&mut self, array: ArrayRef) -> VortexResult { + vortex_ensure!( + array.dtype() == &self.dtype, + "stream batch dtype changed from {} to {}", + self.dtype, + array.dtype() + ); + + let exported = + futures::executor::block_on(array.export_device_array_with_schema(&mut self.ctx))?; + let ArrowDeviceArrayWithSchema { + mut schema, + mut array, + } = exported; + let batch_schema = DeviceExportSchema::from_ffi(&schema, &self.dtype); + release_schema(&mut schema); + let batch_schema = match batch_schema { + Ok(batch_schema) => batch_schema, + Err(error) => { + release_device_array(&mut array); + return Err(error); + } + }; + + let validation = (|| -> VortexResult<()> { + if let Some(schema) = &self.schema { + vortex_ensure!( + schema == &batch_schema, + "stream batch Arrow schema changed from {:?} to {:?}", + schema, + batch_schema + ); + } else { + self.schema = Some(batch_schema); + } + + vortex_ensure!( + array.device_type == ARROW_DEVICE_CUDA, + "stream batch exported on non-CUDA device type {}", + array.device_type + ); + vortex_ensure!( + array.device_id == self.device_id, + "stream batch moved from CUDA device {} to {}", + self.device_id, + array.device_id + ); + Ok(()) + })(); + + if let Err(error) = validation { + release_device_array(&mut array); + return Err(error); + } + + Ok(array) + } +} + +impl Drop for DeviceArrayStreamPrivateData { + fn drop(&mut self) { + if let Some(mut array) = self.pending_array.take() { + release_device_array(&mut array); + } + } +} + +/// Extension trait for exporting a Vortex array stream as an Arrow C Device stream. +pub trait DeviceArrayStreamExt { + /// Export this stream as an [`ArrowDeviceArrayStream`]. + /// + /// Batches are exported through one persistent [`CudaExecutionCtx`]. The stream records that + /// context's CUDA device at construction time, and each `get_next` verifies that the produced + /// [`ArrowDeviceArray`] is CUDA-resident on that same device. The returned C stream owns the + /// Vortex stream and must be released through its embedded `release` callback. + fn export_device_array_stream( + self, + session: &VortexSession, + ) -> VortexResult; +} + +impl DeviceArrayStreamExt for SendableArrayStream { + fn export_device_array_stream( + self, + session: &VortexSession, + ) -> VortexResult { + let dtype = self.dtype().clone(); + export_device_array_stream_from_iter( + FuturesArrayStreamIterator { stream: self }, + dtype, + session, + ) + } +} + +/// Export a blocking Vortex array iterator as an [`ArrowDeviceArrayStream`]. +/// +/// The iterator is advanced by the Arrow stream callbacks. Use this helper when the stream must be +/// driven by a specific runtime or executor before crossing the Arrow C Device stream boundary. +/// Each yielded array must have `dtype`; every exported batch is validated to stay on the CUDA +/// device selected by the session's CUDA execution context. +pub fn export_device_array_stream_from_iter( + array_iter: impl Iterator> + 'static, + dtype: DType, + session: &VortexSession, +) -> VortexResult { + let ctx = crate::CudaSession::create_execution_ctx(session)?; + let device_id = ctx.stream().context().ordinal() as i64; + + let private_data = Box::new(DeviceArrayStreamPrivateData { + array_iter: Box::new(array_iter), + ctx, + dtype, + schema: None, + pending_array: None, + device_id, + last_error: None, + }); + + Ok(ArrowDeviceArrayStream { + device_type: ARROW_DEVICE_CUDA, + get_schema: Some(device_stream_get_schema), + get_next: Some(device_stream_get_next), + get_last_error: Some(device_stream_get_last_error), + release: Some(device_stream_release), + private_data: Box::into_raw(private_data).cast(), + }) +} + +unsafe fn device_stream_private_data<'a>( + stream: *mut ArrowDeviceArrayStream, +) -> Option<&'a mut DeviceArrayStreamPrivateData> { + let stream = unsafe { stream.as_mut()? }; + unsafe { + stream + .private_data + .cast::() + .as_mut() + } +} + +fn released_device_array(device_id: i64) -> ArrowDeviceArray { + ArrowDeviceArray { + array: ArrowArray::empty(), + device_id, + device_type: ARROW_DEVICE_CUDA, + sync_event: ptr::null_mut(), + reserved: Default::default(), + } +} + +fn release_schema(schema: &mut FFI_ArrowSchema) { + if let Some(release) = schema.release { + unsafe { release(schema) }; + } +} + +fn release_device_array(array: &mut ArrowDeviceArray) { + if let Some(release) = array.array.release { + unsafe { release(&raw mut array.array) }; + } +} + +unsafe extern "C" fn device_stream_get_schema( + stream: *mut ArrowDeviceArrayStream, + out: *mut ArrowSchema, +) -> c_int { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return ARROW_STREAM_EINVAL; + }; + state.clear_error(); + + if out.is_null() { + return state.set_error("null ArrowSchema output"); + } + + match catch_unwind(AssertUnwindSafe(|| state.ensure_schema()?.to_ffi())) { + Ok(Ok(schema)) => { + unsafe { ptr::write(out.cast::(), schema) }; + 0 + } + Ok(Err(err)) => state.set_error(err), + Err(_) => state.set_error("panic in ArrowDeviceArrayStream::get_schema"), + } +} + +unsafe extern "C" fn device_stream_get_next( + stream: *mut ArrowDeviceArrayStream, + out: *mut ArrowDeviceArray, +) -> c_int { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return ARROW_STREAM_EINVAL; + }; + state.clear_error(); + + if out.is_null() { + return state.set_error("null ArrowDeviceArray output"); + } + + match catch_unwind(AssertUnwindSafe(|| -> VortexResult<()> { + match state.next_array()? { + Some(array) => unsafe { ptr::write(out, array) }, + None => unsafe { ptr::write(out, released_device_array(state.device_id)) }, + } + Ok(()) + })) { + Ok(Ok(())) => 0, + Ok(Err(err)) => state.set_error(err), + Err(_) => state.set_error("panic in ArrowDeviceArrayStream::get_next"), + } +} + +unsafe extern "C" fn device_stream_get_last_error( + stream: *mut ArrowDeviceArrayStream, +) -> *const c_char { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return ptr::null(); + }; + + state + .last_error + .as_ref() + .map_or(ptr::null(), |error| error.as_ptr()) +} + +unsafe extern "C" fn device_stream_release(stream: *mut ArrowDeviceArrayStream) { + let Some(stream_ref) = (unsafe { stream.as_mut() }) else { + return; + }; + if stream_ref.release.is_none() { + return; + } + + stream_ref.get_schema = None; + stream_ref.get_next = None; + stream_ref.get_last_error = None; + stream_ref.release = None; + + if !stream_ref.private_data.is_null() { + unsafe { + drop(Box::from_raw( + stream_ref + .private_data + .cast::(), + )); + } + stream_ref.private_data = ptr::null_mut(); + } +} + /// Build the Arrow C schema that describes the exported device array. pub(crate) fn arrow_schema_for_array( array: &ArrayRef, @@ -472,3 +838,97 @@ pub trait ExportDeviceArray: Debug + Send + Sync + 'static { Ok(ArrowDeviceArrayWithSchema { schema, array }) } } + +#[cfg(test)] +mod tests { + use arrow_schema::DataType; + use arrow_schema::ffi::FFI_ArrowSchema; + use vortex::VortexSessionDefault; + use vortex::array::IntoArray; + use vortex::array::arrays::PrimitiveArray; + use vortex::array::stream::ArrayStreamExt; + use vortex::error::VortexResult; + use vortex::error::vortex_err; + use vortex::session::VortexSession; + use vortex_cuda_macros::test as cuda_test; + + use crate::CudaSession; + use crate::arrow::ARROW_DEVICE_CUDA; + use crate::arrow::ArrowArray; + use crate::arrow::ArrowDeviceArray; + use crate::arrow::ArrowSchema; + use crate::arrow::DeviceArrayStreamExt; + + unsafe fn release_schema(schema: &mut FFI_ArrowSchema) { + if let Some(release) = schema.release { + unsafe { release(schema) }; + } + } + + unsafe fn release_device_array(array: &mut ArrowDeviceArray) { + if let Some(release) = array.array.release { + unsafe { release(&raw mut array.array) }; + } + } + + fn empty_device_array() -> ArrowDeviceArray { + ArrowDeviceArray { + array: ArrowArray::empty(), + device_id: 0, + device_type: 0, + sync_event: std::ptr::null_mut(), + reserved: [0; 3], + } + } + + #[cuda_test] + fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let array = PrimitiveArray::from_iter(0u32..5).into_array(); + let stream = array.to_array_stream().boxed(); + let mut device_stream = stream.export_device_array_stream(&session)?; + assert_eq!(device_stream.device_type, ARROW_DEVICE_CUDA); + + let mut schema = FFI_ArrowSchema::empty(); + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let status = unsafe { + get_schema( + &raw mut device_stream, + (&raw mut schema).cast::(), + ) + }; + assert_eq!(status, 0); + let field = arrow_schema::Field::try_from(&schema)?; + assert_eq!(field.data_type(), &DataType::UInt32); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let mut first_batch = empty_device_array(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut first_batch) }; + assert_eq!(status, 0); + assert_eq!(first_batch.device_type, ARROW_DEVICE_CUDA); + assert_eq!(first_batch.array.length, 5); + assert!(first_batch.array.release.is_some()); + + let mut eos = empty_device_array(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; + assert_eq!(status, 0); + assert_eq!(eos.device_type, ARROW_DEVICE_CUDA); + assert!(eos.array.release.is_none()); + + unsafe { + release_device_array(&mut first_batch); + release_schema(&mut schema); + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + release(&raw mut device_stream); + } + assert!(device_stream.release.is_none()); + Ok(()) + } +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 706b9c3c2bd..427c1627e95 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -25,7 +25,9 @@ mod stream_pool; pub use arrow::ArrowDeviceArrayWithSchema; pub use arrow::DeviceArrayExt; +pub use arrow::DeviceArrayStreamExt; pub use arrow::ExportDeviceArray; +pub use arrow::export_device_array_stream_from_iter; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index f4ce314c073..309956dc5aa 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -35,6 +35,10 @@ pub use error::try_or; pub use error::vx_error; pub use error::vx_error_free; pub use log::vx_log_level; +pub use scan::VxPartitionArrayIter; +pub use scan::vx_partition; +pub use scan::vx_partition_into_array_iter; +pub use scan::vx_partition_into_array_stream; pub use session::vx_session; pub use session::vx_session_free; pub use session::vx_session_new_with; diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 921c4f50621..3a827cef526 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -22,6 +22,7 @@ use vortex::array::arrow::ArrowSessionExt; use vortex::array::expr::stats::Precision; use vortex::array::stream::SendableArrayStream; use vortex::buffer::Buffer; +use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_ensure; @@ -66,6 +67,41 @@ crate::box_wrapper!( VxPartitionScan, vx_partition); +/// A blocking iterator over arrays produced by a consumed partition. +pub type VxPartitionArrayIter = Box>>; + +/// Consume an owned partition pointer and return the partition's Vortex array stream. +/// +/// # Safety +/// +/// `partition` must be a non-null owned partition handle created by `vortex-ffi`. This function +/// consumes the handle; callers must not use or free it after calling this function. +pub unsafe fn vx_partition_into_array_stream( + partition: *mut vx_partition, +) -> VortexResult { + vortex_ensure!(!partition.is_null(), "null vx_partition"); + match *vx_partition::into_box(partition) { + VxPartitionScan::Pending(partition) => partition.execute(), + _ => vortex_bail!("partition already being consumed"), + } +} + +/// Consume an owned partition pointer and return a runtime-driven blocking array iterator. +/// +/// The returned iterator drives the same FFI runtime used by the other partition scan entry points. +/// +/// # Safety +/// +/// `partition` must be a non-null owned partition handle created by `vortex-ffi`. This function +/// consumes the handle; callers must not use or free it after calling this function. +pub unsafe fn vx_partition_into_array_iter( + partition: *mut vx_partition, +) -> VortexResult<(DType, VxPartitionArrayIter)> { + let array_stream = unsafe { vx_partition_into_array_stream(partition) }?; + let dtype = array_stream.dtype().clone(); + Ok((dtype, Box::new(RUNTIME.block_on_stream(array_stream)))) +} + // We parse Selection from vx_scan_selection[_include], so we don't need // to instantiate VX_SELECTION_* items directly. #[repr(C)] From e61249b25ffccd1d467d238822adba60629c6c05 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 19:09:59 +0000 Subject: [PATCH 02/25] Test CUDA Arrow device array streams with cuDF harness Signed-off-by: Alexander Droste --- .../e2e-cuda/src/bin/cudf_harness_runner.rs | 39 ++++---- vortex-test/e2e-cuda/src/lib.rs | 97 +++++++++++++------ 2 files changed, 91 insertions(+), 45 deletions(-) diff --git a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs index 17678d2800c..76e73190568 100644 --- a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs +++ b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs @@ -9,6 +9,7 @@ const PRIMITIVE_DTYPES: &[&str] = &[ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", ]; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; +const HARNESS_COMMANDS: &[&str] = &["check", "check-stream"]; fn main() -> ExitCode { let args = env::args().collect::>(); @@ -21,25 +22,29 @@ fn main() -> ExitCode { }; for primitive_dtype in PRIMITIVE_DTYPES { - eprintln!("running {program} with {PRIMITIVE_DTYPE_ENV}={primitive_dtype}"); + for harness_command in HARNESS_COMMANDS { + eprintln!( + "running {program} {harness_command} with {PRIMITIVE_DTYPE_ENV}={primitive_dtype}" + ); - let status = Command::new("compute-sanitizer") - .args(["--tool", "memcheck", "--error-exitcode", "1"]) - .arg(harness) - .arg("check") - .arg(library) - .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) - .status(); + let status = Command::new("compute-sanitizer") + .args(["--tool", "memcheck", "--error-exitcode", "1"]) + .arg(harness) + .arg(harness_command) + .arg(library) + .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) + .status(); - match status { - Ok(status) if status.success() => {} - Ok(status) => { - eprintln!("cudf-test-harness failed with {status}"); - return ExitCode::from(1); - } - Err(err) => { - eprintln!("failed to run cudf-test-harness: {err}"); - return ExitCode::from(1); + match status { + Ok(status) if status.success() => {} + Ok(status) => { + eprintln!("cudf-test-harness {harness_command} failed with {status}"); + return ExitCode::from(1); + } + Err(err) => { + eprintln!("failed to run cudf-test-harness {harness_command}: {err}"); + return ExitCode::from(1); + } } } } diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 7a2e062ac2d..48b74d218b1 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -46,6 +46,7 @@ use vortex::array::arrays::TemporalArray; use vortex::array::arrays::VarBinViewArray; use vortex::array::arrays::varbinview::BinaryView; use vortex::array::arrow::ArrowSessionExt; +use vortex::array::stream::ArrayStreamExt; use vortex::array::validity::Validity; use vortex::buffer::Buffer; use vortex::buffer::ByteBuffer; @@ -60,7 +61,9 @@ use vortex::layout::session::LayoutSession; use vortex::session::VortexSession; use vortex_cuda::CudaSession; use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda::arrow::DeviceArrayStreamExt; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; @@ -216,32 +219,8 @@ fn dictionary_array() -> VortexArrayRef { .into_array() } -/// # Safety -/// `schema_ptr` and `array_ptr` must be valid writable pointers. -#[unsafe(no_mangle)] -pub unsafe extern "C" fn export_array( - schema_ptr: &mut FFI_ArrowSchema, - array_ptr: &mut ArrowDeviceArray, -) -> i32 { - ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) -} - -fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { - let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { - Ok(ctx) => ctx, - Err(err) => { - eprintln!("error creating CUDA execution context: {err}"); - return 1; - } - }; - - let primitive = match primitive_array() { - Ok(array) => array, - Err(err) => { - eprintln!("error in export_array: {err}"); - return 1; - } - }; +fn cudf_test_array() -> Result { + let primitive = primitive_array()?; // cuDF supports Arrow decimal device imports through Decimal128. Decimal256 is intentionally // not included here because cuDF has no DECIMAL256 type_id or Arrow interop mapping. let decimal32 = DecimalArray::from_option_iter( @@ -269,7 +248,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev TimeUnit::Days, ); - let array = StructArray::new( + Ok(StructArray::new( FieldNames::from_iter([ "prims", "bools", @@ -310,7 +289,35 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev 5, Validity::NonNullable, ) - .into_array(); + .into_array()) +} + +/// # Safety +/// `schema_ptr` and `array_ptr` must be valid writable pointers. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn export_array( + schema_ptr: &mut FFI_ArrowSchema, + array_ptr: &mut ArrowDeviceArray, +) -> i32 { + ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) +} + +fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { + let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { + Ok(ctx) => ctx, + Err(err) => { + eprintln!("error creating CUDA execution context: {err}"); + return 1; + } + }; + + let array = match cudf_test_array() { + Ok(array) => array, + Err(err) => { + eprintln!("error in export_array: {err}"); + return 1; + } + }; match block_on(array.export_device_array_with_schema(&mut ctx)) { Ok(exported) => { @@ -325,6 +332,40 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev } } +/// # Safety +/// `stream_ptr` must be a valid writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn export_device_stream(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { + ffi_boundary("export_device_stream", || { + export_device_stream_inner(stream_ptr) + }) +} + +fn export_device_stream_inner(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { + let array = match cudf_test_array() { + Ok(array) => array, + Err(err) => { + eprintln!("error in export_device_stream: {err}"); + return 1; + } + }; + + match array + .to_array_stream() + .boxed() + .export_device_array_stream(&SESSION) + { + Ok(stream) => { + *stream_ptr = stream; + 0 + } + Err(err) => { + eprintln!("error in export_device_array_stream: {err}"); + 1 + } + } +} + /// # Safety /// `ffi_schema` and `ffi_array` must describe a valid Arrow C Data array. #[unsafe(no_mangle)] From 6d11e48f247f1d29995c7ed7a91e2cf5df96d526 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 19:23:26 +0000 Subject: [PATCH 03/25] Document cuDF device stream harness Signed-off-by: Alexander Droste --- vortex-cuda/README.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/vortex-cuda/README.md b/vortex-cuda/README.md index d71ad593ce7..5bd08edc945 100644 --- a/vortex-cuda/README.md +++ b/vortex-cuda/README.md @@ -39,3 +39,32 @@ cmake -S cpp -B cpp/build \ -DCMAKE_EXE_LINKER_FLAGS="-Wl,--stub-group-size=1048576" \ -GNinja && cmake --build cpp/build --target INTEROP_TEST --parallel ``` + +## Running the cuDF test harness + +```sh +cargo build -p vortex-test-e2e-cuda +cmake --build /path/to/cudf-test-harness/build --target cudf-test-harness --parallel + +/path/to/cudf-test-harness/build/cudf-test-harness \ + check-stream \ + target/debug/libvortex_test_e2e_cuda.so +``` + +To run both `check` and `check-stream` under `compute-sanitizer` for all primitive dtypes: + +```sh +target/debug/cudf_harness_runner \ + /path/to/cudf-test-harness/build/cudf-test-harness \ + target/debug/libvortex_test_e2e_cuda.so +``` + +If cuDF fails with `cudaErrorInsufficientDriver` when using CUDA 13, use the compatibility driver +libraries: + +```sh +LD_LIBRARY_PATH=/usr/local/cuda-13.1/compat \ + target/debug/cudf_harness_runner \ + /path/to/cudf-test-harness/build/cudf-test-harness \ + target/debug/libvortex_test_e2e_cuda.so +``` From e53feae6ffadc5e1eb03cf3d27deb2b2028aede8 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 19:24:23 +0000 Subject: [PATCH 04/25] Simplify cuDF harness README command Signed-off-by: Alexander Droste --- vortex-cuda/README.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/vortex-cuda/README.md b/vortex-cuda/README.md index 5bd08edc945..4024372adf3 100644 --- a/vortex-cuda/README.md +++ b/vortex-cuda/README.md @@ -46,23 +46,6 @@ cmake -S cpp -B cpp/build \ cargo build -p vortex-test-e2e-cuda cmake --build /path/to/cudf-test-harness/build --target cudf-test-harness --parallel -/path/to/cudf-test-harness/build/cudf-test-harness \ - check-stream \ - target/debug/libvortex_test_e2e_cuda.so -``` - -To run both `check` and `check-stream` under `compute-sanitizer` for all primitive dtypes: - -```sh -target/debug/cudf_harness_runner \ - /path/to/cudf-test-harness/build/cudf-test-harness \ - target/debug/libvortex_test_e2e_cuda.so -``` - -If cuDF fails with `cudaErrorInsufficientDriver` when using CUDA 13, use the compatibility driver -libraries: - -```sh LD_LIBRARY_PATH=/usr/local/cuda-13.1/compat \ target/debug/cudf_harness_runner \ /path/to/cudf-test-harness/build/cudf-test-harness \ From 7330751765609cb2b2fc2d8def25852e7c91cc1a Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 19:52:38 +0000 Subject: [PATCH 05/25] Avoid consuming CUDA partition before stream setup Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 3 ++- vortex-cuda/src/arrow/mod.rs | 20 ++++++++++++++++++-- vortex-cuda/src/lib.rs | 1 + 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index a807aeb1f13..bf39001b96f 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -130,9 +130,10 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device( vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; + let ctx = CudaSession::create_execution_ctx(&session)?; let (dtype, array_iter) = unsafe { vx_partition_into_array_iter(partition) }?; let device_stream = - vortex_cuda::export_device_array_stream_from_iter(array_iter, dtype, &session)?; + vortex_cuda::export_device_array_stream_from_iter_with_ctx(array_iter, dtype, ctx); unsafe { ptr::write(out_stream, device_stream) }; Ok(VX_CUDA_OK) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 455037db63f..5dfa2371cd5 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -465,6 +465,22 @@ pub fn export_device_array_stream_from_iter( session: &VortexSession, ) -> VortexResult { let ctx = crate::CudaSession::create_execution_ctx(session)?; + Ok(export_device_array_stream_from_iter_with_ctx( + array_iter, dtype, ctx, + )) +} + +/// Export a blocking Vortex array iterator as an [`ArrowDeviceArrayStream`] using an existing CUDA +/// execution context. +/// +/// This is useful for FFI entry points that must finish all fallible CUDA initialization before +/// consuming an owned input handle. Each yielded array must have `dtype`; every exported batch is +/// validated to stay on the CUDA device selected by `ctx`. +pub fn export_device_array_stream_from_iter_with_ctx( + array_iter: impl Iterator> + 'static, + dtype: DType, + ctx: CudaExecutionCtx, +) -> ArrowDeviceArrayStream { let device_id = ctx.stream().context().ordinal() as i64; let private_data = Box::new(DeviceArrayStreamPrivateData { @@ -477,14 +493,14 @@ pub fn export_device_array_stream_from_iter( last_error: None, }); - Ok(ArrowDeviceArrayStream { + ArrowDeviceArrayStream { device_type: ARROW_DEVICE_CUDA, get_schema: Some(device_stream_get_schema), get_next: Some(device_stream_get_next), get_last_error: Some(device_stream_get_last_error), release: Some(device_stream_release), private_data: Box::into_raw(private_data).cast(), - }) + } } unsafe fn device_stream_private_data<'a>( diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 427c1627e95..cafb8e63c52 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -28,6 +28,7 @@ pub use arrow::DeviceArrayExt; pub use arrow::DeviceArrayStreamExt; pub use arrow::ExportDeviceArray; pub use arrow::export_device_array_stream_from_iter; +pub use arrow::export_device_array_stream_from_iter_with_ctx; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; From d82937d10dc6db00eb9ef133c037e621012e0858 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 19:57:06 +0000 Subject: [PATCH 06/25] Improve cuDF harness runner output Signed-off-by: Alexander Droste --- vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs index 76e73190568..a7ed5d06a55 100644 --- a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs +++ b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs @@ -13,7 +13,7 @@ const HARNESS_COMMANDS: &[&str] = &["check", "check-stream"]; fn main() -> ExitCode { let args = env::args().collect::>(); - let [program, harness, library] = args.as_slice() else { + let [_program, harness, library] = args.as_slice() else { eprintln!( "Usage: {} ", args.first().map_or("cudf_harness_runner", String::as_str) @@ -23,9 +23,7 @@ fn main() -> ExitCode { for primitive_dtype in PRIMITIVE_DTYPES { for harness_command in HARNESS_COMMANDS { - eprintln!( - "running {program} {harness_command} with {PRIMITIVE_DTYPE_ENV}={primitive_dtype}" - ); + eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: {harness_command} =="); let status = Command::new("compute-sanitizer") .args(["--tool", "memcheck", "--error-exitcode", "1"]) From 52952d293389ee6152a08939537b3ae3abd0377a Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 20:17:20 +0000 Subject: [PATCH 07/25] Wait for cuDF harness release asset Signed-off-by: Alexander Droste --- .github/workflows/cuda.yaml | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cuda.yaml b/.github/workflows/cuda.yaml index a8b702188c3..842b3727fee 100644 --- a/.github/workflows/cuda.yaml +++ b/.github/workflows/cuda.yaml @@ -164,7 +164,7 @@ jobs: always() && github.repository == 'vortex-data/vortex' && needs.changes.outputs.run-cuda-san == 'true' name: "CUDA tests (cudf)" - timeout-minutes: 30 + timeout-minutes: 60 runs-on: runs-on=${{ github.run_id }}/runner=gpu/tag=cuda-test-cudf steps: - uses: runs-on/action@v2 @@ -182,8 +182,24 @@ jobs: - name: Build cudf test library run: cargo build --profile ci --locked -p vortex-test-e2e-cuda --target x86_64-unknown-linux-gnu - name: Download and run cudf-test-harness + env: + HARNESS_URL: >- + https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz + MAX_HARNESS_WAIT_SECONDS: 1800 + HARNESS_RETRY_SECONDS: 30 run: | - curl -fsSL https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz | tar -xz + deadline=$((SECONDS + MAX_HARNESS_WAIT_SECONDS)) + until curl -fsSL --output cudf-test-harness-x86_64.tar.gz "$HARNESS_URL"; do + if ((SECONDS >= deadline)); then + echo "cudf-test-harness release asset was not available" >&2 + exit 1 + fi + + echo "cudf-test-harness release asset not available yet; retrying" + sleep "$HARNESS_RETRY_SECONDS" + done + + tar -xzf cudf-test-harness-x86_64.tar.gz $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/ci/cudf_harness_runner \ ./cudf-test-harness-x86_64/cudf-test-harness \ $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/ci/libvortex_test_e2e_cuda.so From 0722ada226e6c3c1671439d0184227cd8684f538 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 20:17:46 +0000 Subject: [PATCH 08/25] Revert "Wait for cuDF harness release asset" This reverts commit 52952d293389ee6152a08939537b3ae3abd0377a. Signed-off-by: Alexander Droste --- .github/workflows/cuda.yaml | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/.github/workflows/cuda.yaml b/.github/workflows/cuda.yaml index 842b3727fee..a8b702188c3 100644 --- a/.github/workflows/cuda.yaml +++ b/.github/workflows/cuda.yaml @@ -164,7 +164,7 @@ jobs: always() && github.repository == 'vortex-data/vortex' && needs.changes.outputs.run-cuda-san == 'true' name: "CUDA tests (cudf)" - timeout-minutes: 60 + timeout-minutes: 30 runs-on: runs-on=${{ github.run_id }}/runner=gpu/tag=cuda-test-cudf steps: - uses: runs-on/action@v2 @@ -182,24 +182,8 @@ jobs: - name: Build cudf test library run: cargo build --profile ci --locked -p vortex-test-e2e-cuda --target x86_64-unknown-linux-gnu - name: Download and run cudf-test-harness - env: - HARNESS_URL: >- - https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz - MAX_HARNESS_WAIT_SECONDS: 1800 - HARNESS_RETRY_SECONDS: 30 run: | - deadline=$((SECONDS + MAX_HARNESS_WAIT_SECONDS)) - until curl -fsSL --output cudf-test-harness-x86_64.tar.gz "$HARNESS_URL"; do - if ((SECONDS >= deadline)); then - echo "cudf-test-harness release asset was not available" >&2 - exit 1 - fi - - echo "cudf-test-harness release asset not available yet; retrying" - sleep "$HARNESS_RETRY_SECONDS" - done - - tar -xzf cudf-test-harness-x86_64.tar.gz + curl -fsSL https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz | tar -xz $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/ci/cudf_harness_runner \ ./cudf-test-harness-x86_64/cudf-test-harness \ $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/ci/libvortex_test_e2e_cuda.so From 4ab3b2e652fd1774b4132f8bd6479c2d1862927b Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 21:11:03 +0000 Subject: [PATCH 09/25] Defer cuDF stream harness opt-in Signed-off-by: Alexander Droste --- .../e2e-cuda/src/bin/cudf_harness_runner.rs | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs index a7ed5d06a55..7babf15948a 100644 --- a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs +++ b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs @@ -9,7 +9,6 @@ const PRIMITIVE_DTYPES: &[&str] = &[ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", ]; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; -const HARNESS_COMMANDS: &[&str] = &["check", "check-stream"]; fn main() -> ExitCode { let args = env::args().collect::>(); @@ -22,27 +21,25 @@ fn main() -> ExitCode { }; for primitive_dtype in PRIMITIVE_DTYPES { - for harness_command in HARNESS_COMMANDS { - eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: {harness_command} =="); + eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: check =="); - let status = Command::new("compute-sanitizer") - .args(["--tool", "memcheck", "--error-exitcode", "1"]) - .arg(harness) - .arg(harness_command) - .arg(library) - .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) - .status(); + let status = Command::new("compute-sanitizer") + .args(["--tool", "memcheck", "--error-exitcode", "1"]) + .arg(harness) + .arg("check") + .arg(library) + .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) + .status(); - match status { - Ok(status) if status.success() => {} - Ok(status) => { - eprintln!("cudf-test-harness {harness_command} failed with {status}"); - return ExitCode::from(1); - } - Err(err) => { - eprintln!("failed to run cudf-test-harness {harness_command}: {err}"); - return ExitCode::from(1); - } + match status { + Ok(status) if status.success() => {} + Ok(status) => { + eprintln!("cudf-test-harness failed with {status}"); + return ExitCode::from(1); + } + Err(err) => { + eprintln!("failed to run cudf-test-harness: {err}"); + return ExitCode::from(1); } } } From b2b0cbd8594f0e53f849a2786ecf33f2016bc409 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 17 Jun 2026 21:28:21 +0000 Subject: [PATCH 10/25] Run cuDF device stream harness in CI Signed-off-by: Alexander Droste --- .../e2e-cuda/src/bin/cudf_harness_runner.rs | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs index 7babf15948a..a7ed5d06a55 100644 --- a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs +++ b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs @@ -9,6 +9,7 @@ const PRIMITIVE_DTYPES: &[&str] = &[ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", ]; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; +const HARNESS_COMMANDS: &[&str] = &["check", "check-stream"]; fn main() -> ExitCode { let args = env::args().collect::>(); @@ -21,25 +22,27 @@ fn main() -> ExitCode { }; for primitive_dtype in PRIMITIVE_DTYPES { - eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: check =="); + for harness_command in HARNESS_COMMANDS { + eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: {harness_command} =="); - let status = Command::new("compute-sanitizer") - .args(["--tool", "memcheck", "--error-exitcode", "1"]) - .arg(harness) - .arg("check") - .arg(library) - .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) - .status(); + let status = Command::new("compute-sanitizer") + .args(["--tool", "memcheck", "--error-exitcode", "1"]) + .arg(harness) + .arg(harness_command) + .arg(library) + .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) + .status(); - match status { - Ok(status) if status.success() => {} - Ok(status) => { - eprintln!("cudf-test-harness failed with {status}"); - return ExitCode::from(1); - } - Err(err) => { - eprintln!("failed to run cudf-test-harness: {err}"); - return ExitCode::from(1); + match status { + Ok(status) if status.success() => {} + Ok(status) => { + eprintln!("cudf-test-harness {harness_command} failed with {status}"); + return ExitCode::from(1); + } + Err(err) => { + eprintln!("failed to run cudf-test-harness {harness_command}: {err}"); + return ExitCode::from(1); + } } } } From a9970b324c352d30d10c5a81ec2215b17248c279 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 07:45:21 +0000 Subject: [PATCH 11/25] Clarify CUDA stream ownership Signed-off-by: Alexander Droste --- vortex-cuda/ffi/cinclude/vortex_cuda.h | 14 ++++++++------ vortex-cuda/ffi/src/lib.rs | 15 +++++++++------ vortex-cuda/src/arrow/mod.rs | 6 +++--- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/vortex-cuda/ffi/cinclude/vortex_cuda.h b/vortex-cuda/ffi/cinclude/vortex_cuda.h index 83aa9988b9e..2032cfa8aaf 100644 --- a/vortex-cuda/ffi/cinclude/vortex_cuda.h +++ b/vortex-cuda/ffi/cinclude/vortex_cuda.h @@ -82,15 +82,17 @@ int vx_cuda_array_export_arrow_device(const vx_session *session, vx_error **error_out); /** - * Scan a Vortex partition as an Arrow C Device stream. + * Consume a Vortex partition and scan it as an Arrow C Device stream. + * + * This function takes ownership of `partition`. Callers must not free or reuse + * it after calling this function, regardless of success or failure. * * On success returns 0 and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream owns - * the partition and must be released through its embedded Arrow `release` callback. Each produced - * `ArrowDeviceArray` is exported on one CUDA device and must be released independently by the - * consumer through its embedded `ArrowArray.release` callback. + * the resulting scan iterator and must be released through its embedded Arrow `release` callback. + * Each produced `ArrowDeviceArray` must be released through its embedded `ArrowArray.release` + * callback. * - * On error returns 1 and, when `error_out` is non-NULL, writes a `vx_error` (free with - * `vx_error_free`). If `partition` is consumed by this call, callers must not free or reuse it. + * On error returns 1 and writes a `vx_error` to `error_out` when non-NULL. */ int vx_cuda_partition_scan_arrow_device(const vx_session *session, vx_partition *partition, diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index bf39001b96f..65a4597c91c 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -103,15 +103,18 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device( }) } -/// Scan a Vortex partition as an Arrow C Device stream. +/// Consume a Vortex partition and scan it as an Arrow C Device stream. +/// +/// This function takes ownership of `partition`. Callers must not free or reuse it after calling +/// this function, regardless of success or failure. /// /// On success returns `0` and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream -/// owns the partition and must be released through its embedded Arrow `release` callback. Each -/// produced `ArrowDeviceArray` is exported on one CUDA device and is independently released by the -/// consumer through the embedded `ArrowArray.release` callback. +/// owns the resulting scan iterator. The caller must release the stream through its embedded Arrow +/// `release` callback, and must release each produced `ArrowDeviceArray` through its embedded +/// `ArrowArray.release` callback. /// /// On error returns `1` and, when `error_out` is non-null, writes a `vx_error` (free with -/// `vx_error_free`). If `partition` is consumed by this call, callers must not free or reuse it. +/// `vx_error_free`). /// /// # Safety /// @@ -129,9 +132,9 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device( vortex_ensure!(!partition.is_null(), "null vx_partition"); vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); + let (dtype, array_iter) = unsafe { vx_partition_into_array_iter(partition) }?; let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; let ctx = CudaSession::create_execution_ctx(&session)?; - let (dtype, array_iter) = unsafe { vx_partition_into_array_iter(partition) }?; let device_stream = vortex_cuda::export_device_array_stream_from_iter_with_ctx(array_iter, dtype, ctx); diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 5dfa2371cd5..1b941d9d044 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -473,9 +473,9 @@ pub fn export_device_array_stream_from_iter( /// Export a blocking Vortex array iterator as an [`ArrowDeviceArrayStream`] using an existing CUDA /// execution context. /// -/// This is useful for FFI entry points that must finish all fallible CUDA initialization before -/// consuming an owned input handle. Each yielded array must have `dtype`; every exported batch is -/// validated to stay on the CUDA device selected by `ctx`. +/// Use this helper when the caller has already selected the CUDA execution context that must drive +/// the exported stream. Each yielded array must have `dtype`; every exported batch is validated to +/// stay on the CUDA device selected by `ctx`. pub fn export_device_array_stream_from_iter_with_ctx( array_iter: impl Iterator> + 'static, dtype: DType, From 0b5fea33e0034d7bdf0044d037507a765be7693c Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 08:08:38 +0000 Subject: [PATCH 12/25] Rename CUDA partition device stream FFI Signed-off-by: Alexander Droste --- vortex-cuda/ffi/cinclude/vortex_cuda.h | 17 +++++++++-------- vortex-cuda/ffi/src/lib.rs | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/vortex-cuda/ffi/cinclude/vortex_cuda.h b/vortex-cuda/ffi/cinclude/vortex_cuda.h index 2032cfa8aaf..0b06a660afa 100644 --- a/vortex-cuda/ffi/cinclude/vortex_cuda.h +++ b/vortex-cuda/ffi/cinclude/vortex_cuda.h @@ -87,17 +87,18 @@ int vx_cuda_array_export_arrow_device(const vx_session *session, * This function takes ownership of `partition`. Callers must not free or reuse * it after calling this function, regardless of success or failure. * - * On success returns 0 and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream owns - * the resulting scan iterator and must be released through its embedded Arrow `release` callback. - * Each produced `ArrowDeviceArray` must be released through its embedded `ArrowArray.release` - * callback. + * On success returns 0 and writes an owned `ArrowDeviceArrayStream` to + * `out_stream`. The stream owns the resulting scan iterator. The caller must + * release the stream through its embedded Arrow `release` callback, and must + * release each produced `ArrowDeviceArray` through its embedded + * `ArrowArray.release` callback. * * On error returns 1 and writes a `vx_error` to `error_out` when non-NULL. */ -int vx_cuda_partition_scan_arrow_device(const vx_session *session, - vx_partition *partition, - struct ArrowDeviceArrayStream *out_stream, - vx_error **error_out); +int vx_cuda_partition_scan_arrow_device_stream(const vx_session *session, + vx_partition *partition, + struct ArrowDeviceArrayStream *out_stream, + vx_error **error_out); #ifdef __cplusplus } diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index 65a4597c91c..e91a8ab8318 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -122,7 +122,7 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device( /// partition handle created by `vortex-ffi`. `out_stream` must be a valid writable pointer. If /// `error_out` is non-null, it must be valid for writing one error pointer. #[unsafe(no_mangle)] -pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device( +pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( session: *const vx_session, partition: *mut vx_partition, out_stream: *mut ArrowDeviceArrayStream, From 581d260f11b71370282d10a40358e45232fe1014 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 09:12:05 +0000 Subject: [PATCH 13/25] Document CUDA stream helpers Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 10 +-- vortex-cuda/src/arrow/mod.rs | 148 ++++++++++++++++++++++---------- vortex-ffi/src/lib.rs | 2 - vortex-ffi/src/scan.rs | 20 ----- vortex-test/e2e-cuda/src/lib.rs | 7 ++ 5 files changed, 114 insertions(+), 73 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index e91a8ab8318..70da3fd13fb 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -19,12 +19,13 @@ use vortex_cuda::CudaSession; use vortex_cuda::arrow::ArrowDeviceArray; use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda::arrow::DeviceArrayStreamExt; use vortex_ffi::try_or; use vortex_ffi::vx_array; use vortex_ffi::vx_array_ref; use vortex_ffi::vx_error; use vortex_ffi::vx_partition; -use vortex_ffi::vx_partition_into_array_iter; +use vortex_ffi::vx_partition_into_array_stream; use vortex_ffi::vx_session; use vortex_ffi::vx_session_new_with; use vortex_ffi::vx_session_ref; @@ -32,6 +33,7 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; +/// Return a session with CUDA state, adding default CUDA support when needed. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); @@ -132,11 +134,9 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( vortex_ensure!(!partition.is_null(), "null vx_partition"); vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); - let (dtype, array_iter) = unsafe { vx_partition_into_array_iter(partition) }?; + let array_stream = unsafe { vx_partition_into_array_stream(partition) }?; let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; - let ctx = CudaSession::create_execution_ctx(&session)?; - let device_stream = - vortex_cuda::export_device_array_stream_from_iter_with_ctx(array_iter, dtype, ctx); + let device_stream = array_stream.export_device_array_stream(&session)?; unsafe { ptr::write(out_stream, device_stream) }; Ok(VX_CUDA_OK) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 1b941d9d044..c137d2e94cc 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -20,6 +20,7 @@ use std::panic::AssertUnwindSafe; use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; +use std::sync::LazyLock; use arrow_schema::DataType; use arrow_schema::Field; @@ -30,7 +31,6 @@ pub(crate) use canonical::CanonicalDeviceArrayExport; use cudarc::driver::CudaEvent; use cudarc::driver::CudaStream; use cudarc::runtime::sys::cudaEvent_t; -use futures::StreamExt; use vortex::array::ArrayRef; use vortex::array::arrays::Dict; use vortex::array::arrays::FixedSizeList; @@ -53,6 +53,9 @@ use vortex::dtype::StructFields; use vortex::error::VortexResult; use vortex::error::vortex_ensure; use vortex::error::vortex_err; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::runtime::current::CurrentThreadWorkerPool; use vortex::session::VortexSession; use crate::CudaBufferExt; @@ -253,16 +256,33 @@ impl DeviceArrayExt for ArrayRef { } } +// POSIX EIO for Arrow stream producer/export failures. const ARROW_STREAM_EIO: c_int = 5; +// POSIX EINVAL for invalid Arrow stream callback arguments or released streams. const ARROW_STREAM_EINVAL: c_int = 22; +static DEVICE_STREAM_RUNTIME: LazyLock = + LazyLock::new(CurrentThreadRuntime::new); +static DEVICE_STREAM_WORKER_POOL: LazyLock = LazyLock::new(|| { + let pool = DEVICE_STREAM_RUNTIME.new_pool(); + pool.set_workers_to_available_parallelism(); + pool +}); + +/// Return the shared runtime used to drive Vortex streams for Arrow Device export. +fn device_stream_runtime() -> &'static CurrentThreadRuntime { + LazyLock::force(&DEVICE_STREAM_WORKER_POOL); + &DEVICE_STREAM_RUNTIME +} + #[derive(Clone, Debug, PartialEq)] -enum DeviceExportSchema { +enum ArrowDeviceStreamSchema { Schema(Schema), Field(Field), } -impl DeviceExportSchema { +impl ArrowDeviceStreamSchema { + /// Interpret an Arrow C schema as the stream schema shape for `dtype`. fn from_ffi(schema: &FFI_ArrowSchema, dtype: &DType) -> VortexResult { if matches!(dtype, DType::Struct(..)) { Ok(Self::Schema(Schema::try_from(schema)?)) @@ -271,6 +291,7 @@ impl DeviceExportSchema { } } + /// Build the Arrow stream schema for an empty stream with the given dtype. fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { let dtype = arrow_device_export_dtype(dtype); if let DType::Struct(struct_dtype, _) = &dtype { @@ -282,6 +303,7 @@ impl DeviceExportSchema { } } + /// Export this stream schema as an owned Arrow C schema. fn to_ffi(&self) -> VortexResult { match self { Self::Schema(schema) => Ok(FFI_ArrowSchema::try_from(schema)?), @@ -292,40 +314,30 @@ impl DeviceExportSchema { type ArrayStreamIterator = Box>>; -struct FuturesArrayStreamIterator { - stream: SendableArrayStream, -} - -impl Iterator for FuturesArrayStreamIterator { - type Item = VortexResult; - - fn next(&mut self) -> Option { - futures::executor::block_on(self.stream.next()) - } -} - struct DeviceArrayStreamPrivateData { array_iter: ArrayStreamIterator, ctx: CudaExecutionCtx, dtype: DType, - schema: Option, + schema: Option, pending_array: Option, device_id: i64, last_error: Option, } impl DeviceArrayStreamPrivateData { + /// Clear the last stream error before a new callback invocation. fn clear_error(&mut self) { self.last_error = None; } + /// Store the last stream error and return the Arrow callback error code. fn set_error(&mut self, error: impl ToString) -> c_int { - let message = error.to_string().replace('\0', "\\0"); - self.last_error = CString::new(message).ok(); + self.last_error = CString::new(error.to_string()).ok(); ARROW_STREAM_EIO } - fn ensure_schema(&mut self) -> VortexResult<&DeviceExportSchema> { + /// Initialize and return the stream schema, exporting the first batch if needed. + fn ensure_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { if self.schema.is_none() { match self.array_iter.next() { Some(array) => { @@ -333,7 +345,10 @@ impl DeviceArrayStreamPrivateData { self.pending_array = Some(array); } None => { - self.schema = Some(DeviceExportSchema::from_dtype(&self.dtype, &mut self.ctx)?); + self.schema = Some(ArrowDeviceStreamSchema::from_dtype( + &self.dtype, + &mut self.ctx, + )?); } } } @@ -343,6 +358,7 @@ impl DeviceArrayStreamPrivateData { .ok_or_else(|| vortex_err!("ArrowDeviceArrayStream schema was not initialized")) } + /// Export and return the next device batch, or `None` at end of stream. fn next_array(&mut self) -> VortexResult> { if let Some(array) = self.pending_array.take() { return Ok(Some(array)); @@ -350,7 +366,10 @@ impl DeviceArrayStreamPrivateData { let Some(array) = self.array_iter.next() else { if self.schema.is_none() { - self.schema = Some(DeviceExportSchema::from_dtype(&self.dtype, &mut self.ctx)?); + self.schema = Some(ArrowDeviceStreamSchema::from_dtype( + &self.dtype, + &mut self.ctx, + )?); } return Ok(None); }; @@ -358,6 +377,7 @@ impl DeviceArrayStreamPrivateData { self.export_batch(array?).map(Some) } + /// Export one Vortex stream batch and validate it against the stream schema and device. fn export_batch(&mut self, array: ArrayRef) -> VortexResult { vortex_ensure!( array.dtype() == &self.dtype, @@ -366,14 +386,14 @@ impl DeviceArrayStreamPrivateData { array.dtype() ); - let exported = - futures::executor::block_on(array.export_device_array_with_schema(&mut self.ctx))?; + let exported = device_stream_runtime() + .block_on(array.export_device_array_with_schema(&mut self.ctx))?; let ArrowDeviceArrayWithSchema { - mut schema, + schema: mut ffi_schema, mut array, } = exported; - let batch_schema = DeviceExportSchema::from_ffi(&schema, &self.dtype); - release_schema(&mut schema); + let batch_schema = ArrowDeviceStreamSchema::from_ffi(&ffi_schema, &self.dtype); + release_schema(&mut ffi_schema); let batch_schema = match batch_schema { Ok(batch_schema) => batch_schema, Err(error) => { @@ -383,11 +403,11 @@ impl DeviceArrayStreamPrivateData { }; let validation = (|| -> VortexResult<()> { - if let Some(schema) = &self.schema { + if let Some(stream_schema) = &self.schema { vortex_ensure!( - schema == &batch_schema, + stream_schema == &batch_schema, "stream batch Arrow schema changed from {:?} to {:?}", - schema, + stream_schema, batch_schema ); } else { @@ -418,6 +438,7 @@ impl DeviceArrayStreamPrivateData { } impl Drop for DeviceArrayStreamPrivateData { + /// Release a first batch if `get_schema` exported it and `get_next` never returned it. fn drop(&mut self) { if let Some(mut array) = self.pending_array.take() { release_device_array(&mut array); @@ -440,13 +461,14 @@ pub trait DeviceArrayStreamExt { } impl DeviceArrayStreamExt for SendableArrayStream { + /// Export this stream by driving it on the shared Arrow Device stream runtime. fn export_device_array_stream( self, session: &VortexSession, ) -> VortexResult { let dtype = self.dtype().clone(); export_device_array_stream_from_iter( - FuturesArrayStreamIterator { stream: self }, + device_stream_runtime().block_on_stream(self), dtype, session, ) @@ -503,6 +525,7 @@ pub fn export_device_array_stream_from_iter_with_ctx( } } +/// Return the private stream state for a live Arrow device stream. unsafe fn device_stream_private_data<'a>( stream: *mut ArrowDeviceArrayStream, ) -> Option<&'a mut DeviceArrayStreamPrivateData> { @@ -515,6 +538,7 @@ unsafe fn device_stream_private_data<'a>( } } +/// Create the Arrow end-of-stream marker for the stream's CUDA device. fn released_device_array(device_id: i64) -> ArrowDeviceArray { ArrowDeviceArray { array: ArrowArray::empty(), @@ -525,18 +549,35 @@ fn released_device_array(device_id: i64) -> ArrowDeviceArray { } } +/// Release an Arrow C schema if it is live. fn release_schema(schema: &mut FFI_ArrowSchema) { if let Some(release) = schema.release { unsafe { release(schema) }; } } +/// Release an Arrow device array if it is live. fn release_device_array(array: &mut ArrowDeviceArray) { if let Some(release) = array.array.release { unsafe { release(&raw mut array.array) }; } } +/// Run a stream callback body and convert errors or panics into Arrow status codes. +fn device_stream_callback( + state: &mut DeviceArrayStreamPrivateData, + panic_message: &'static str, + callback: impl FnOnce(&mut DeviceArrayStreamPrivateData) -> VortexResult<()>, +) -> c_int { + let result = catch_unwind(AssertUnwindSafe(|| callback(state))); + match result { + Ok(Ok(())) => 0, + Ok(Err(err)) => state.set_error(err), + Err(_) => state.set_error(panic_message), + } +} + +/// Write the stream's Arrow schema, initializing it from the first batch if necessary. unsafe extern "C" fn device_stream_get_schema( stream: *mut ArrowDeviceArrayStream, out: *mut ArrowSchema, @@ -550,16 +591,20 @@ unsafe extern "C" fn device_stream_get_schema( return state.set_error("null ArrowSchema output"); } - match catch_unwind(AssertUnwindSafe(|| state.ensure_schema()?.to_ffi())) { - Ok(Ok(schema)) => { - unsafe { ptr::write(out.cast::(), schema) }; - 0 - } - Ok(Err(err)) => state.set_error(err), - Err(_) => state.set_error("panic in ArrowDeviceArrayStream::get_schema"), + fn body(state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowSchema) -> VortexResult<()> { + let schema = state.ensure_schema()?.to_ffi()?; + unsafe { ptr::write(out.cast::(), schema) }; + Ok(()) } + + device_stream_callback( + state, + "panic in ArrowDeviceArrayStream::get_schema", + |state| body(state, out), + ) } +/// Write the next exported Arrow device batch, or a released array at end of stream. unsafe extern "C" fn device_stream_get_next( stream: *mut ArrowDeviceArrayStream, out: *mut ArrowDeviceArray, @@ -573,19 +618,25 @@ unsafe extern "C" fn device_stream_get_next( return state.set_error("null ArrowDeviceArray output"); } - match catch_unwind(AssertUnwindSafe(|| -> VortexResult<()> { - match state.next_array()? { - Some(array) => unsafe { ptr::write(out, array) }, - None => unsafe { ptr::write(out, released_device_array(state.device_id)) }, - } + fn body( + state: &mut DeviceArrayStreamPrivateData, + out: *mut ArrowDeviceArray, + ) -> VortexResult<()> { + let array = state + .next_array()? + .unwrap_or_else(|| released_device_array(state.device_id)); + unsafe { ptr::write(out, array) }; Ok(()) - })) { - Ok(Ok(())) => 0, - Ok(Err(err)) => state.set_error(err), - Err(_) => state.set_error("panic in ArrowDeviceArrayStream::get_next"), } + + device_stream_callback( + state, + "panic in ArrowDeviceArrayStream::get_next", + |state| body(state, out), + ) } +/// Return the most recent callback error message, or null if no error is stored. unsafe extern "C" fn device_stream_get_last_error( stream: *mut ArrowDeviceArrayStream, ) -> *const c_char { @@ -599,6 +650,7 @@ unsafe extern "C" fn device_stream_get_last_error( .map_or(ptr::null(), |error| error.as_ptr()) } +/// Release the stream state and clear callbacks so release is idempotent. unsafe extern "C" fn device_stream_release(stream: *mut ArrowDeviceArrayStream) { let Some(stream_ref) = (unsafe { stream.as_mut() }) else { return; @@ -875,18 +927,21 @@ mod tests { use crate::arrow::ArrowSchema; use crate::arrow::DeviceArrayStreamExt; + /// Release an Arrow C schema in stream tests if it is live. unsafe fn release_schema(schema: &mut FFI_ArrowSchema) { if let Some(release) = schema.release { unsafe { release(schema) }; } } + /// Release an Arrow device array in stream tests if it is live. unsafe fn release_device_array(array: &mut ArrowDeviceArray) { if let Some(release) = array.array.release { unsafe { release(&raw mut array.array) }; } } + /// Create a zeroed placeholder Arrow device array for callback outputs. fn empty_device_array() -> ArrowDeviceArray { ArrowDeviceArray { array: ArrowArray::empty(), @@ -897,6 +952,7 @@ mod tests { } } + /// Verify schema, batch, EOS, and idempotent release stream behavior. #[cuda_test] fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { let session = VortexSession::default().with_some(CudaSession::try_default()?); diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index 309956dc5aa..c3af0659647 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -35,9 +35,7 @@ pub use error::try_or; pub use error::vx_error; pub use error::vx_error_free; pub use log::vx_log_level; -pub use scan::VxPartitionArrayIter; pub use scan::vx_partition; -pub use scan::vx_partition_into_array_iter; pub use scan::vx_partition_into_array_stream; pub use session::vx_session; pub use session::vx_session_free; diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 3a827cef526..83bf47f39eb 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -22,7 +22,6 @@ use vortex::array::arrow::ArrowSessionExt; use vortex::array::expr::stats::Precision; use vortex::array::stream::SendableArrayStream; use vortex::buffer::Buffer; -use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_ensure; @@ -67,9 +66,6 @@ crate::box_wrapper!( VxPartitionScan, vx_partition); -/// A blocking iterator over arrays produced by a consumed partition. -pub type VxPartitionArrayIter = Box>>; - /// Consume an owned partition pointer and return the partition's Vortex array stream. /// /// # Safety @@ -86,22 +82,6 @@ pub unsafe fn vx_partition_into_array_stream( } } -/// Consume an owned partition pointer and return a runtime-driven blocking array iterator. -/// -/// The returned iterator drives the same FFI runtime used by the other partition scan entry points. -/// -/// # Safety -/// -/// `partition` must be a non-null owned partition handle created by `vortex-ffi`. This function -/// consumes the handle; callers must not use or free it after calling this function. -pub unsafe fn vx_partition_into_array_iter( - partition: *mut vx_partition, -) -> VortexResult<(DType, VxPartitionArrayIter)> { - let array_stream = unsafe { vx_partition_into_array_stream(partition) }?; - let dtype = array_stream.dtype().clone(); - Ok((dtype, Box::new(RUNTIME.block_on_stream(array_stream)))) -} - // We parse Selection from vx_scan_selection[_include], so we don't need // to instantiate VX_SELECTION_* items directly. #[repr(C)] diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 48b74d218b1..9800c9ffd4f 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -219,6 +219,7 @@ fn dictionary_array() -> VortexArrayRef { .into_array() } +/// Build the shared cuDF interop test array used by array and stream exports. fn cudf_test_array() -> Result { let primitive = primitive_array()?; // cuDF supports Arrow decimal device imports through Decimal128. Decimal256 is intentionally @@ -292,6 +293,8 @@ fn cudf_test_array() -> Result { .into_array()) } +/// Export the shared cuDF test array as one Arrow device array. +/// /// # Safety /// `schema_ptr` and `array_ptr` must be valid writable pointers. #[unsafe(no_mangle)] @@ -302,6 +305,7 @@ pub unsafe extern "C" fn export_array( ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) } +/// Implement `export_array` inside the panic-catching FFI boundary. fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { Ok(ctx) => ctx, @@ -332,6 +336,8 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev } } +/// Export the shared cuDF test array as an Arrow device array stream. +/// /// # Safety /// `stream_ptr` must be a valid writable pointer. #[unsafe(no_mangle)] @@ -341,6 +347,7 @@ pub unsafe extern "C" fn export_device_stream(stream_ptr: &mut ArrowDeviceArrayS }) } +/// Implement `export_device_stream` inside the panic-catching FFI boundary. fn export_device_stream_inner(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { let array = match cudf_test_array() { Ok(array) => array, From 07926a03cf5128f33c2a29207da515ba5f7dfad9 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 13:54:19 +0000 Subject: [PATCH 14/25] simplify Signed-off-by: Alexander Droste --- vortex-cuda/src/arrow/mod.rs | 192 ++++++++++++++++------------------- vortex-cuda/src/lib.rs | 2 - 2 files changed, 90 insertions(+), 104 deletions(-) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index c137d2e94cc..06a0e9fefd2 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -275,14 +275,14 @@ fn device_stream_runtime() -> &'static CurrentThreadRuntime { &DEVICE_STREAM_RUNTIME } -#[derive(Clone, Debug, PartialEq)] +#[derive(Debug, PartialEq)] enum ArrowDeviceStreamSchema { Schema(Schema), Field(Field), } impl ArrowDeviceStreamSchema { - /// Interpret an Arrow C schema as the stream schema shape for `dtype`. + /// Convert an Arrow C schema into the stream schema shape for `dtype`. fn from_ffi(schema: &FFI_ArrowSchema, dtype: &DType) -> VortexResult { if matches!(dtype, DType::Struct(..)) { Ok(Self::Schema(Schema::try_from(schema)?)) @@ -291,7 +291,12 @@ impl ArrowDeviceStreamSchema { } } - /// Build the Arrow stream schema for an empty stream with the given dtype. + /// Convert a Vortex dtype into a stream schema when no batch is available. + /// + /// This uses only the logical dtype, so it can differ from a non-empty stream's first-batch + /// schema for encodings the dtype does not capture: a dictionary column reports a plain field + /// here but `DataType::Dictionary` once a concrete batch is seen. This is harmless because an + /// empty stream carries no data. fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { let dtype = arrow_device_export_dtype(dtype); if let DType::Struct(struct_dtype, _) = &dtype { @@ -331,25 +336,34 @@ impl DeviceArrayStreamPrivateData { } /// Store the last stream error and return the Arrow callback error code. + /// + /// Interior NUL bytes are replaced so `get_last_error` is never null while a non-zero status + /// is reported. fn set_error(&mut self, error: impl ToString) -> c_int { - self.last_error = CString::new(error.to_string()).ok(); + let message = error.to_string().replace('\0', " "); + self.last_error = Some(CString::new(message).unwrap_or_default()); ARROW_STREAM_EIO } - /// Initialize and return the stream schema, exporting the first batch if needed. + /// Set the schema from the dtype alone, so an empty stream still has a schema to report. + fn set_empty_schema(&mut self) -> VortexResult<()> { + if self.schema.is_none() { + self.schema = Some(ArrowDeviceStreamSchema::from_dtype( + &self.dtype, + &mut self.ctx, + )?); + } + Ok(()) + } + + /// Return the stream schema, exporting the first batch to derive it if needed. + /// + /// A first batch is held in `pending_array` so the following `get_next` returns it. fn ensure_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { if self.schema.is_none() { match self.array_iter.next() { - Some(array) => { - let array = self.export_batch(array?)?; - self.pending_array = Some(array); - } - None => { - self.schema = Some(ArrowDeviceStreamSchema::from_dtype( - &self.dtype, - &mut self.ctx, - )?); - } + Some(array) => self.pending_array = Some(self.export_batch(array?)?), + None => self.set_empty_schema()?, } } @@ -364,20 +378,16 @@ impl DeviceArrayStreamPrivateData { return Ok(Some(array)); } - let Some(array) = self.array_iter.next() else { - if self.schema.is_none() { - self.schema = Some(ArrowDeviceStreamSchema::from_dtype( - &self.dtype, - &mut self.ctx, - )?); + match self.array_iter.next() { + Some(array) => self.export_batch(array?).map(Some), + None => { + self.set_empty_schema()?; + Ok(None) } - return Ok(None); - }; - - self.export_batch(array?).map(Some) + } } - /// Export one Vortex stream batch and validate it against the stream schema and device. + /// Export one Vortex batch as a device array, validating it against the stream. fn export_batch(&mut self, array: ArrayRef) -> VortexResult { vortex_ensure!( array.dtype() == &self.dtype, @@ -386,54 +396,55 @@ impl DeviceArrayStreamPrivateData { array.dtype() ); - let exported = device_stream_runtime() - .block_on(array.export_device_array_with_schema(&mut self.ctx))?; let ArrowDeviceArrayWithSchema { - schema: mut ffi_schema, + mut schema, mut array, - } = exported; - let batch_schema = ArrowDeviceStreamSchema::from_ffi(&ffi_schema, &self.dtype); - release_schema(&mut ffi_schema); - let batch_schema = match batch_schema { - Ok(batch_schema) => batch_schema, - Err(error) => { - release_device_array(&mut array); - return Err(error); - } - }; + } = device_stream_runtime() + .block_on(array.export_device_array_with_schema(&mut self.ctx))?; - let validation = (|| -> VortexResult<()> { - if let Some(stream_schema) = &self.schema { + // Release the schema we no longer need, and on any failure the array we will not return. + let checked = self.check_batch(&schema, &array); + release_schema(&mut schema); + if let Err(error) = checked { + release_device_array(&mut array); + return Err(error); + } + Ok(array) + } + + /// Check that a freshly exported batch matches the stream schema and CUDA device. + fn check_batch( + &mut self, + schema: &FFI_ArrowSchema, + array: &ArrowDeviceArray, + ) -> VortexResult<()> { + vortex_ensure!( + array.device_type == ARROW_DEVICE_CUDA, + "stream batch exported on non-CUDA device type {}", + array.device_type + ); + vortex_ensure!( + array.device_id == self.device_id, + "stream batch moved from CUDA device {} to {}", + self.device_id, + array.device_id + ); + + // Commit the schema only after the batch is fully accepted, so a rejected first batch + // never becomes the schema later reported by `get_schema`. + let batch_schema = ArrowDeviceStreamSchema::from_ffi(schema, &self.dtype)?; + match &self.schema { + Some(stream_schema) => { vortex_ensure!( stream_schema == &batch_schema, "stream batch Arrow schema changed from {:?} to {:?}", stream_schema, batch_schema ); - } else { - self.schema = Some(batch_schema); } - - vortex_ensure!( - array.device_type == ARROW_DEVICE_CUDA, - "stream batch exported on non-CUDA device type {}", - array.device_type - ); - vortex_ensure!( - array.device_id == self.device_id, - "stream batch moved from CUDA device {} to {}", - self.device_id, - array.device_id - ); - Ok(()) - })(); - - if let Err(error) = validation { - release_device_array(&mut array); - return Err(error); + None => self.schema = Some(batch_schema), } - - Ok(array) + Ok(()) } } @@ -454,6 +465,9 @@ pub trait DeviceArrayStreamExt { /// context's CUDA device at construction time, and each `get_next` verifies that the produced /// [`ArrowDeviceArray`] is CUDA-resident on that same device. The returned C stream owns the /// Vortex stream and must be released through its embedded `release` callback. + /// + /// Per the Arrow C stream contract, drive the returned stream from a single thread; its + /// callbacks must not be invoked concurrently. fn export_device_array_stream( self, session: &VortexSession, @@ -461,57 +475,31 @@ pub trait DeviceArrayStreamExt { } impl DeviceArrayStreamExt for SendableArrayStream { - /// Export this stream by driving it on the shared Arrow Device stream runtime. + /// Drive this stream on the shared Arrow Device stream runtime and export it. fn export_device_array_stream( self, session: &VortexSession, ) -> VortexResult { let dtype = self.dtype().clone(); - export_device_array_stream_from_iter( - device_stream_runtime().block_on_stream(self), - dtype, - session, - ) - } -} - -/// Export a blocking Vortex array iterator as an [`ArrowDeviceArrayStream`]. -/// -/// The iterator is advanced by the Arrow stream callbacks. Use this helper when the stream must be -/// driven by a specific runtime or executor before crossing the Arrow C Device stream boundary. -/// Each yielded array must have `dtype`; every exported batch is validated to stay on the CUDA -/// device selected by the session's CUDA execution context. -pub fn export_device_array_stream_from_iter( - array_iter: impl Iterator> + 'static, - dtype: DType, - session: &VortexSession, -) -> VortexResult { - let ctx = crate::CudaSession::create_execution_ctx(session)?; - Ok(export_device_array_stream_from_iter_with_ctx( - array_iter, dtype, ctx, - )) -} - -/// Export a blocking Vortex array iterator as an [`ArrowDeviceArrayStream`] using an existing CUDA -/// execution context. -/// -/// Use this helper when the caller has already selected the CUDA execution context that must drive -/// the exported stream. Each yielded array must have `dtype`; every exported batch is validated to -/// stay on the CUDA device selected by `ctx`. -pub fn export_device_array_stream_from_iter_with_ctx( - array_iter: impl Iterator> + 'static, + let ctx = crate::CudaSession::create_execution_ctx(session)?; + let array_iter = Box::new(device_stream_runtime().block_on_stream(self)); + Ok(device_array_stream(array_iter, dtype, ctx)) + } +} + +/// Build the Arrow C Device stream that owns `array_iter` and exports its batches through `ctx`. +fn device_array_stream( + array_iter: ArrayStreamIterator, dtype: DType, ctx: CudaExecutionCtx, ) -> ArrowDeviceArrayStream { - let device_id = ctx.stream().context().ordinal() as i64; - let private_data = Box::new(DeviceArrayStreamPrivateData { - array_iter: Box::new(array_iter), + device_id: ctx.stream().context().ordinal() as i64, + array_iter, ctx, dtype, schema: None, pending_array: None, - device_id, last_error: None, }); @@ -650,7 +638,7 @@ unsafe extern "C" fn device_stream_get_last_error( .map_or(ptr::null(), |error| error.as_ptr()) } -/// Release the stream state and clear callbacks so release is idempotent. +/// Free the stream state and null its callbacks. The null `release` makes a second call a no-op. unsafe extern "C" fn device_stream_release(stream: *mut ArrowDeviceArrayStream) { let Some(stream_ref) = (unsafe { stream.as_mut() }) else { return; diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index cafb8e63c52..a09301299fa 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -27,8 +27,6 @@ pub use arrow::ArrowDeviceArrayWithSchema; pub use arrow::DeviceArrayExt; pub use arrow::DeviceArrayStreamExt; pub use arrow::ExportDeviceArray; -pub use arrow::export_device_array_stream_from_iter; -pub use arrow::export_device_array_stream_from_iter_with_ctx; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; From 17d85cbd4c401d4dd33124b46e53816de501e128 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 14:59:23 +0000 Subject: [PATCH 15/25] schema Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 2 +- vortex-cuda/src/arrow/mod.rs | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index 70da3fd13fb..a0375ff92a7 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -33,7 +33,7 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; -/// Return a session with CUDA state, adding default CUDA support when needed. +/// Return a session with a [`CudaSession`], adding the default CUDA session when needed. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 06a0e9fefd2..5dfe131547a 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -345,17 +345,6 @@ impl DeviceArrayStreamPrivateData { ARROW_STREAM_EIO } - /// Set the schema from the dtype alone, so an empty stream still has a schema to report. - fn set_empty_schema(&mut self) -> VortexResult<()> { - if self.schema.is_none() { - self.schema = Some(ArrowDeviceStreamSchema::from_dtype( - &self.dtype, - &mut self.ctx, - )?); - } - Ok(()) - } - /// Return the stream schema, exporting the first batch to derive it if needed. /// /// A first batch is held in `pending_array` so the following `get_next` returns it. @@ -363,7 +352,12 @@ impl DeviceArrayStreamPrivateData { if self.schema.is_none() { match self.array_iter.next() { Some(array) => self.pending_array = Some(self.export_batch(array?)?), - None => self.set_empty_schema()?, + None => { + self.schema = Some(ArrowDeviceStreamSchema::from_dtype( + &self.dtype, + &mut self.ctx, + )?); + } } } @@ -380,10 +374,7 @@ impl DeviceArrayStreamPrivateData { match self.array_iter.next() { Some(array) => self.export_batch(array?).map(Some), - None => { - self.set_empty_schema()?; - Ok(None) - } + None => Ok(None), } } From b5a6697135fef5dbbe9af4c05ea4bf7ed1fd7539 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 15:05:02 +0000 Subject: [PATCH 16/25] docs Signed-off-by: Alexander Droste --- vortex-cuda/src/arrow/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 5dfe131547a..ace44a390f3 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -404,6 +404,10 @@ impl DeviceArrayStreamPrivateData { } /// Check that a freshly exported batch matches the stream schema and CUDA device. + /// + /// The caller still owns `schema` and `array` and is responsible for releasing them on error. + /// This method only commits `self.schema` after the batch is accepted, so a rejected first batch + /// never becomes the schema later reported by `get_schema`. fn check_batch( &mut self, schema: &FFI_ArrowSchema, From f4f5d1b54bc5753c2bcbfdcdee8699a8b8e1b20f Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 15:08:41 +0000 Subject: [PATCH 17/25] Polish CUDA device stream naming Signed-off-by: Alexander Droste --- vortex-cuda/src/arrow/mod.rs | 100 +++++++++++++++++------------------ 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index ace44a390f3..43df884b4b6 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -291,11 +291,11 @@ impl ArrowDeviceStreamSchema { } } - /// Convert a Vortex dtype into a stream schema when no batch is available. + /// Convert a Vortex dtype into a stream schema when no stream array is available. /// - /// This uses only the logical dtype, so it can differ from a non-empty stream's first-batch + /// This uses only the logical dtype, so it can differ from a non-empty stream's first-array /// schema for encodings the dtype does not capture: a dictionary column reports a plain field - /// here but `DataType::Dictionary` once a concrete batch is seen. This is harmless because an + /// here but `DataType::Dictionary` once a concrete array is seen. This is harmless because an /// empty stream carries no data. fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { let dtype = arrow_device_export_dtype(dtype); @@ -345,13 +345,13 @@ impl DeviceArrayStreamPrivateData { ARROW_STREAM_EIO } - /// Return the stream schema, exporting the first batch to derive it if needed. + /// Return the stream schema, exporting the first stream array to derive it if needed. /// - /// A first batch is held in `pending_array` so the following `get_next` returns it. + /// A first array is held in `pending_array` so the following `get_next` returns it. fn ensure_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { if self.schema.is_none() { match self.array_iter.next() { - Some(array) => self.pending_array = Some(self.export_batch(array?)?), + Some(array) => self.pending_array = Some(self.export_stream_array(array?)?), None => { self.schema = Some(ArrowDeviceStreamSchema::from_dtype( &self.dtype, @@ -366,85 +366,85 @@ impl DeviceArrayStreamPrivateData { .ok_or_else(|| vortex_err!("ArrowDeviceArrayStream schema was not initialized")) } - /// Export and return the next device batch, or `None` at end of stream. + /// Export and return the next Arrow device array, or `None` at end of stream. fn next_array(&mut self) -> VortexResult> { if let Some(array) = self.pending_array.take() { return Ok(Some(array)); } match self.array_iter.next() { - Some(array) => self.export_batch(array?).map(Some), + Some(array) => self.export_stream_array(array?).map(Some), None => Ok(None), } } - /// Export one Vortex batch as a device array, validating it against the stream. - fn export_batch(&mut self, array: ArrayRef) -> VortexResult { + /// Export one array from the Vortex stream, validating it against the device stream. + fn export_stream_array(&mut self, array: ArrayRef) -> VortexResult { vortex_ensure!( array.dtype() == &self.dtype, - "stream batch dtype changed from {} to {}", + "stream array dtype changed from {} to {}", self.dtype, array.dtype() ); let ArrowDeviceArrayWithSchema { - mut schema, - mut array, + schema: mut ffi_schema, + array: mut device_array, } = device_stream_runtime() .block_on(array.export_device_array_with_schema(&mut self.ctx))?; - // Release the schema we no longer need, and on any failure the array we will not return. - let checked = self.check_batch(&schema, &array); - release_schema(&mut schema); + // Release the schema we no longer need, and on failure release the array we will not return. + let checked = self.check_stream_array(&ffi_schema, &device_array); + release_schema(&mut ffi_schema); if let Err(error) = checked { - release_device_array(&mut array); + release_device_array(&mut device_array); return Err(error); } - Ok(array) + Ok(device_array) } - /// Check that a freshly exported batch matches the stream schema and CUDA device. + /// Check that a freshly exported device array matches the stream schema and CUDA device. /// - /// The caller still owns `schema` and `array` and is responsible for releasing them on error. - /// This method only commits `self.schema` after the batch is accepted, so a rejected first batch - /// never becomes the schema later reported by `get_schema`. - fn check_batch( + /// The caller still owns `ffi_schema` and `device_array` and is responsible for releasing them on + /// error. This method only commits `self.schema` after the array is accepted, so a rejected first + /// stream array never becomes the schema later reported by `get_schema`. + fn check_stream_array( &mut self, - schema: &FFI_ArrowSchema, - array: &ArrowDeviceArray, + ffi_schema: &FFI_ArrowSchema, + device_array: &ArrowDeviceArray, ) -> VortexResult<()> { vortex_ensure!( - array.device_type == ARROW_DEVICE_CUDA, - "stream batch exported on non-CUDA device type {}", - array.device_type + device_array.device_type == ARROW_DEVICE_CUDA, + "stream array exported on non-CUDA device type {}", + device_array.device_type ); vortex_ensure!( - array.device_id == self.device_id, - "stream batch moved from CUDA device {} to {}", + device_array.device_id == self.device_id, + "stream array moved from CUDA device {} to {}", self.device_id, - array.device_id + device_array.device_id ); - // Commit the schema only after the batch is fully accepted, so a rejected first batch - // never becomes the schema later reported by `get_schema`. - let batch_schema = ArrowDeviceStreamSchema::from_ffi(schema, &self.dtype)?; + // Commit the schema only after the array is fully accepted, so a rejected first stream + // array never becomes the schema later reported by `get_schema`. + let exported_schema = ArrowDeviceStreamSchema::from_ffi(ffi_schema, &self.dtype)?; match &self.schema { Some(stream_schema) => { vortex_ensure!( - stream_schema == &batch_schema, - "stream batch Arrow schema changed from {:?} to {:?}", + stream_schema == &exported_schema, + "stream array Arrow schema changed from {:?} to {:?}", stream_schema, - batch_schema + exported_schema ); } - None => self.schema = Some(batch_schema), + None => self.schema = Some(exported_schema), } Ok(()) } } impl Drop for DeviceArrayStreamPrivateData { - /// Release a first batch if `get_schema` exported it and `get_next` never returned it. + /// Release a first stream array if `get_schema` exported it and `get_next` never returned it. fn drop(&mut self) { if let Some(mut array) = self.pending_array.take() { release_device_array(&mut array); @@ -456,7 +456,7 @@ impl Drop for DeviceArrayStreamPrivateData { pub trait DeviceArrayStreamExt { /// Export this stream as an [`ArrowDeviceArrayStream`]. /// - /// Batches are exported through one persistent [`CudaExecutionCtx`]. The stream records that + /// Arrays are exported through one persistent [`CudaExecutionCtx`]. The stream records that /// context's CUDA device at construction time, and each `get_next` verifies that the produced /// [`ArrowDeviceArray`] is CUDA-resident on that same device. The returned C stream owns the /// Vortex stream and must be released through its embedded `release` callback. @@ -482,7 +482,7 @@ impl DeviceArrayStreamExt for SendableArrayStream { } } -/// Build the Arrow C Device stream that owns `array_iter` and exports its batches through `ctx`. +/// Build the Arrow C Device stream that owns `array_iter` and exports its arrays through `ctx`. fn device_array_stream( array_iter: ArrayStreamIterator, dtype: DType, @@ -560,7 +560,7 @@ fn device_stream_callback( } } -/// Write the stream's Arrow schema, initializing it from the first batch if necessary. +/// Write the stream's Arrow schema, initializing it from the first stream array if necessary. unsafe extern "C" fn device_stream_get_schema( stream: *mut ArrowDeviceArrayStream, out: *mut ArrowSchema, @@ -587,7 +587,7 @@ unsafe extern "C" fn device_stream_get_schema( ) } -/// Write the next exported Arrow device batch, or a released array at end of stream. +/// Write the next exported Arrow device array, or a released array at end of stream. unsafe extern "C" fn device_stream_get_next( stream: *mut ArrowDeviceArrayStream, out: *mut ArrowDeviceArray, @@ -935,7 +935,7 @@ mod tests { } } - /// Verify schema, batch, EOS, and idempotent release stream behavior. + /// Verify schema, array, EOS, and idempotent release stream behavior. #[cuda_test] fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { let session = VortexSession::default().with_some(CudaSession::try_default()?); @@ -961,12 +961,12 @@ mod tests { let get_next = device_stream .get_next .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; - let mut first_batch = empty_device_array(); - let status = unsafe { get_next(&raw mut device_stream, &raw mut first_batch) }; + let mut first_array = empty_device_array(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut first_array) }; assert_eq!(status, 0); - assert_eq!(first_batch.device_type, ARROW_DEVICE_CUDA); - assert_eq!(first_batch.array.length, 5); - assert!(first_batch.array.release.is_some()); + assert_eq!(first_array.device_type, ARROW_DEVICE_CUDA); + assert_eq!(first_array.array.length, 5); + assert!(first_array.array.release.is_some()); let mut eos = empty_device_array(); let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; @@ -975,7 +975,7 @@ mod tests { assert!(eos.array.release.is_none()); unsafe { - release_device_array(&mut first_batch); + release_device_array(&mut first_array); release_schema(&mut schema); let release = device_stream .release From e70ffe71218ce1bde1a96916f683f8719f6c5fbe Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 15:30:54 +0000 Subject: [PATCH 18/25] Polish CUDA device stream FFI handling Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 5 +++-- vortex-cuda/src/arrow/mod.rs | 32 ++++++++++++++++++++------------ vortex-ffi/src/scan.rs | 2 +- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index a0375ff92a7..a4b77f358bb 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -33,7 +33,7 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; -/// Return a session with a [`CudaSession`], adding the default CUDA session when needed. +/// Return a session with a [`CudaSession`], creating one with [`CudaSession::try_default`] if missing. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); @@ -132,9 +132,10 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( ) -> c_int { try_or(error_out, VX_CUDA_ERR, || { vortex_ensure!(!partition.is_null(), "null vx_partition"); - vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); let array_stream = unsafe { vx_partition_into_array_stream(partition) }?; + vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); + let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; let device_stream = array_stream.export_device_array_stream(&session)?; diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 43df884b4b6..b9766d59267 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -335,14 +335,24 @@ impl DeviceArrayStreamPrivateData { self.last_error = None; } - /// Store the last stream error and return the Arrow callback error code. + /// Store the last stream error and return the requested Arrow callback error code. /// /// Interior NUL bytes are replaced so `get_last_error` is never null while a non-zero status /// is reported. - fn set_error(&mut self, error: impl ToString) -> c_int { + fn set_error_with_code(&mut self, error: impl ToString, code: c_int) -> c_int { let message = error.to_string().replace('\0', " "); self.last_error = Some(CString::new(message).unwrap_or_default()); - ARROW_STREAM_EIO + code + } + + /// Store the last stream error and return the Arrow producer/export failure code. + fn set_error(&mut self, error: impl ToString) -> c_int { + self.set_error_with_code(error, ARROW_STREAM_EIO) + } + + /// Store the last stream error and return the Arrow invalid-argument code. + fn set_invalid_error(&mut self, error: impl ToString) -> c_int { + self.set_error_with_code(error, ARROW_STREAM_EINVAL) } /// Return the stream schema, exporting the first stream array to derive it if needed. @@ -571,7 +581,7 @@ unsafe extern "C" fn device_stream_get_schema( state.clear_error(); if out.is_null() { - return state.set_error("null ArrowSchema output"); + return state.set_invalid_error("null ArrowSchema output"); } fn body(state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowSchema) -> VortexResult<()> { @@ -598,7 +608,7 @@ unsafe extern "C" fn device_stream_get_next( state.clear_error(); if out.is_null() { - return state.set_error("null ArrowDeviceArray output"); + return state.set_invalid_error("null ArrowDeviceArray output"); } fn body( @@ -647,15 +657,13 @@ unsafe extern "C" fn device_stream_release(stream: *mut ArrowDeviceArrayStream) stream_ref.get_last_error = None; stream_ref.release = None; - if !stream_ref.private_data.is_null() { - unsafe { + let private_data = std::mem::replace(&mut stream_ref.private_data, ptr::null_mut()); + if !private_data.is_null() { + drop(catch_unwind(AssertUnwindSafe(|| unsafe { drop(Box::from_raw( - stream_ref - .private_data - .cast::(), + private_data.cast::(), )); - } - stream_ref.private_data = ptr::null_mut(); + }))); } } diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 83bf47f39eb..4b8191bd4bb 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -66,7 +66,7 @@ crate::box_wrapper!( VxPartitionScan, vx_partition); -/// Consume an owned partition pointer and return the partition's Vortex array stream. +/// Consume an owned partition pointer for layered FFI crates and return its Vortex array stream. /// /// # Safety /// From 9332a30a9567eb77c24feb50d18edc71b7cff710 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 15:45:27 +0000 Subject: [PATCH 19/25] Catch panics in Vortex FFI error helpers Signed-off-by: Alexander Droste --- vortex-ffi/src/error.rs | 48 +++++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/vortex-ffi/src/error.rs b/vortex-ffi/src/error.rs index 9459d77c6a4..3ee02c99e21 100644 --- a/vortex-ffi/src/error.rs +++ b/vortex-ffi/src/error.rs @@ -1,6 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; +use std::panic::AssertUnwindSafe; +use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; @@ -43,20 +46,35 @@ fn clear_error(error_out: *mut *mut vx_error) { unsafe { error_out.write(ptr::null_mut()) }; } +/// Convert a panic payload into the message stored in an FFI error. +fn panic_message(payload: &(dyn Any + Send)) -> String { + if let Some(message) = payload.downcast_ref::<&str>() { + format!("panic in Vortex FFI function: {message}") + } else if let Some(message) = payload.downcast_ref::() { + format!("panic in Vortex FFI function: {message}") + } else { + "panic in Vortex FFI function".to_string() + } +} + #[inline] pub fn try_or_default( error_out: *mut *mut vx_error, function: impl FnOnce() -> VortexResult, ) -> T { - match function() { - Ok(value) => { + match catch_unwind(AssertUnwindSafe(function)) { + Ok(Ok(value)) => { clear_error(error_out); value } - Err(err) => { + Ok(Err(err)) => { write_error(error_out, &err.to_string()); T::default() } + Err(payload) => { + write_error(error_out, &panic_message(payload.as_ref())); + T::default() + } } } @@ -69,15 +87,19 @@ pub fn try_or( error_value: T, function: impl FnOnce() -> VortexResult, ) -> T { - match function() { - Ok(value) => { + match catch_unwind(AssertUnwindSafe(function)) { + Ok(Ok(value)) => { clear_error(error_out); value } - Err(err) => { + Ok(Err(err)) => { write_error(error_out, &err.to_string()); error_value } + Err(payload) => { + write_error(error_out, &panic_message(payload.as_ref())); + error_value + } } } @@ -126,4 +148,18 @@ mod tests { assert_eq!(try_or(&raw mut error, -1, || Ok(42)), 42); assert!(error.is_null()); } + + #[test] + fn test_try_or_catches_panic() { + let mut error: *mut vx_error = ptr::null_mut(); + + assert_eq!(try_or(&raw mut error, -1, || panic!("boom")), -1); + assert!(!error.is_null()); + let message = unsafe { vx_error_get_message(error) }; + assert_eq!( + vx_string::as_ref(message).as_ref(), + "panic in Vortex FFI function: boom" + ); + unsafe { vx_error_free(error) }; + } } From a4b7e5ff249db38b155dbd347b06d01da9ae5925 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 16:38:21 +0000 Subject: [PATCH 20/25] Drive Arrow device stream on the session runtime The Arrow C device array stream export drove the Vortex stream on a private CurrentThreadRuntime, but a partition scan spawns its decode work onto the session's runtime (vortex-ffi's RUNTIME). Nothing ever drove that runtime during streaming, so the first get_next on a real partition deadlocked waiting on tasks that never ran. The existing tests only exercise an inert in-memory stream, so they never hit it. Thread the session's runtime through export_device_array_stream and drive the stream and per-array exports on it, removing the private runtime and worker pool. Expose vortex_ffi::runtime() so layered FFI crates can pass the same runtime the partition's scan spawns onto. Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 4 +++- vortex-cuda/src/arrow/mod.rs | 42 ++++++++++++++++----------------- vortex-ffi/src/lib.rs | 10 ++++++++ vortex-test/e2e-cuda/src/lib.rs | 5 +++- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index a4b77f358bb..fff0624576c 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -20,6 +20,7 @@ use vortex_cuda::arrow::ArrowDeviceArray; use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; use vortex_cuda::arrow::DeviceArrayStreamExt; +use vortex_ffi::runtime; use vortex_ffi::try_or; use vortex_ffi::vx_array; use vortex_ffi::vx_array_ref; @@ -137,7 +138,8 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; - let device_stream = array_stream.export_device_array_stream(&session)?; + // Drive the stream on the same runtime the partition's scan spawned its work onto. + let device_stream = array_stream.export_device_array_stream(&session, runtime())?; unsafe { ptr::write(out_stream, device_stream) }; Ok(VX_CUDA_OK) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index b9766d59267..4d690759978 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -20,7 +20,6 @@ use std::panic::AssertUnwindSafe; use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; -use std::sync::LazyLock; use arrow_schema::DataType; use arrow_schema::Field; @@ -55,7 +54,6 @@ use vortex::error::vortex_ensure; use vortex::error::vortex_err; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; -use vortex::io::runtime::current::CurrentThreadWorkerPool; use vortex::session::VortexSession; use crate::CudaBufferExt; @@ -261,20 +259,6 @@ const ARROW_STREAM_EIO: c_int = 5; // POSIX EINVAL for invalid Arrow stream callback arguments or released streams. const ARROW_STREAM_EINVAL: c_int = 22; -static DEVICE_STREAM_RUNTIME: LazyLock = - LazyLock::new(CurrentThreadRuntime::new); -static DEVICE_STREAM_WORKER_POOL: LazyLock = LazyLock::new(|| { - let pool = DEVICE_STREAM_RUNTIME.new_pool(); - pool.set_workers_to_available_parallelism(); - pool -}); - -/// Return the shared runtime used to drive Vortex streams for Arrow Device export. -fn device_stream_runtime() -> &'static CurrentThreadRuntime { - LazyLock::force(&DEVICE_STREAM_WORKER_POOL); - &DEVICE_STREAM_RUNTIME -} - #[derive(Debug, PartialEq)] enum ArrowDeviceStreamSchema { Schema(Schema), @@ -322,6 +306,10 @@ type ArrayStreamIterator = Box>>; struct DeviceArrayStreamPrivateData { array_iter: ArrayStreamIterator, ctx: CudaExecutionCtx, + /// The runtime that drives `array_iter` and per-array exports. It must be the runtime whose + /// executor the underlying Vortex stream spawns its scan tasks onto, otherwise those tasks are + /// never driven and the callbacks deadlock. + runtime: CurrentThreadRuntime, dtype: DType, schema: Option, pending_array: Option, @@ -400,7 +388,8 @@ impl DeviceArrayStreamPrivateData { let ArrowDeviceArrayWithSchema { schema: mut ffi_schema, array: mut device_array, - } = device_stream_runtime() + } = self + .runtime .block_on(array.export_device_array_with_schema(&mut self.ctx))?; // Release the schema we no longer need, and on failure release the array we will not return. @@ -473,22 +462,29 @@ pub trait DeviceArrayStreamExt { /// /// Per the Arrow C stream contract, drive the returned stream from a single thread; its /// callbacks must not be invoked concurrently. + /// + /// `runtime` drives the underlying Vortex stream and each per-array export. It must be the + /// runtime whose executor the stream spawns its scan tasks onto: a Vortex partition scan spawns + /// work onto the session's runtime, so passing a different runtime leaves that work undriven and + /// the callbacks deadlock. Callers that hold the session's blocking runtime should pass it here. fn export_device_array_stream( self, session: &VortexSession, + runtime: &CurrentThreadRuntime, ) -> VortexResult; } impl DeviceArrayStreamExt for SendableArrayStream { - /// Drive this stream on the shared Arrow Device stream runtime and export it. + /// Drive this stream on `runtime` and export it. fn export_device_array_stream( self, session: &VortexSession, + runtime: &CurrentThreadRuntime, ) -> VortexResult { let dtype = self.dtype().clone(); let ctx = crate::CudaSession::create_execution_ctx(session)?; - let array_iter = Box::new(device_stream_runtime().block_on_stream(self)); - Ok(device_array_stream(array_iter, dtype, ctx)) + let array_iter = Box::new(runtime.block_on_stream(self)); + Ok(device_array_stream(array_iter, dtype, ctx, runtime.clone())) } } @@ -497,11 +493,13 @@ fn device_array_stream( array_iter: ArrayStreamIterator, dtype: DType, ctx: CudaExecutionCtx, + runtime: CurrentThreadRuntime, ) -> ArrowDeviceArrayStream { let private_data = Box::new(DeviceArrayStreamPrivateData { device_id: ctx.stream().context().ordinal() as i64, array_iter, ctx, + runtime, dtype, schema: None, pending_array: None, @@ -908,6 +906,7 @@ mod tests { use vortex::array::stream::ArrayStreamExt; use vortex::error::VortexResult; use vortex::error::vortex_err; + use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::session::VortexSession; use vortex_cuda_macros::test as cuda_test; @@ -946,10 +945,11 @@ mod tests { /// Verify schema, array, EOS, and idempotent release stream behavior. #[cuda_test] fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); let session = VortexSession::default().with_some(CudaSession::try_default()?); let array = PrimitiveArray::from_iter(0u32..5).into_array(); let stream = array.to_array_stream().boxed(); - let mut device_stream = stream.export_device_array_stream(&session)?; + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; assert_eq!(device_stream.device_type, ARROW_DEVICE_CUDA); let mut schema = FFI_ArrowSchema::empty(); diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index c3af0659647..a8ac3de5cb5 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -55,6 +55,16 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; // TODO(ngates): also create a CurrentThreadPool to manage background worker threads. static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +/// Return the shared FFI runtime for layered FFI crates that drive Vortex streams produced through +/// `vortex-ffi`. +/// +/// Streams from `vortex-ffi` partitions spawn their scan work onto this runtime's executor, so a +/// consumer crate (for example `vortex-cuda`'s Arrow device stream export) must drive them on this +/// same runtime rather than a private one, otherwise the spawned work is never driven. +pub fn runtime() -> &'static CurrentThreadRuntime { + &RUNTIME +} + /// SAFETY: name must be a non-NULL pointer pub(crate) unsafe fn to_field_name(name: *const c_char) -> VortexResult { let name = unsafe { CStr::from_ptr(name) } diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 9800c9ffd4f..493d7c7d23c 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -56,6 +56,7 @@ use vortex::dtype::FieldNames; use vortex::dtype::NativePType; use vortex::dtype::Nullability; use vortex::extension::datetime::TimeUnit; +use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::session::RuntimeSession; use vortex::layout::session::LayoutSession; use vortex::session::VortexSession; @@ -357,10 +358,12 @@ fn export_device_stream_inner(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { } }; + // The in-memory stream is inert, so any current-thread runtime drives it correctly. + let runtime = CurrentThreadRuntime::new(); match array .to_array_stream() .boxed() - .export_device_array_stream(&SESSION) + .export_device_array_stream(&SESSION, &runtime) { Ok(stream) => { *stream_ptr = stream; From ad8b44859c9f8c9a6d8ccc6b56842bdf4a085607 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 16:39:36 +0000 Subject: [PATCH 21/25] Document Arrow device stream schema stability requirement The device stream derives its schema from the first array and rejects any later array whose Arrow schema differs, which is required by the Arrow C stream contract but means a stream whose chunks vary their encoding (a dictionary-encoded chunk among plain chunks) fails mid-stream. Document this on the trait, note that an empty stream reports a dtype-derived schema that can differ from a non-empty run, and sharpen the mismatch error to name the cause. Signed-off-by: Alexander Droste --- vortex-cuda/src/arrow/mod.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 4d690759978..87d0cf39d3e 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -431,7 +431,9 @@ impl DeviceArrayStreamPrivateData { Some(stream_schema) => { vortex_ensure!( stream_schema == &exported_schema, - "stream array Arrow schema changed from {:?} to {:?}", + "stream array Arrow schema changed from {:?} to {:?}; an Arrow C device stream \ + requires every array to share one schema, so chunks must not vary their \ + encoding (for example a dictionary-encoded chunk among plain chunks)", stream_schema, exported_schema ); @@ -460,6 +462,14 @@ pub trait DeviceArrayStreamExt { /// [`ArrowDeviceArray`] is CUDA-resident on that same device. The returned C stream owns the /// Vortex stream and must be released through its embedded `release` callback. /// + /// The Arrow C stream contract requires every produced array to share the schema reported by + /// `get_schema`. That schema is derived from the first array, so all arrays must export to the + /// same Arrow type: a stream whose chunks vary their encoding (for example a dictionary-encoded + /// chunk among plain chunks) is rejected mid-stream rather than silently producing a + /// schema-mismatched array. An empty stream has no first array, so its schema is derived from + /// the logical dtype and may report a plain field where a non-empty run would report + /// `DataType::Dictionary`. + /// /// Per the Arrow C stream contract, drive the returned stream from a single thread; its /// callbacks must not be invoked concurrently. /// From 1cb2daad04902a1110aa810ea02b8f4d41ec7410 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 16:44:33 +0000 Subject: [PATCH 22/25] Deduplicate Arrow device array stream helpers Add a shared ArrowDeviceArray::empty() constructor and build the end-of-stream marker from it, replacing the hand-rolled struct literal. The stream tests now call the module-level release_schema/release_device_array helpers instead of redefining byte-for-byte copies, and drop the duplicate empty_device_array placeholder in favor of ArrowDeviceArray::empty(). Signed-off-by: Alexander Droste --- vortex-cuda/src/arrow/mod.rs | 50 ++++++++++++++---------------------- 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 87d0cf39d3e..b2cbe145fc8 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -105,6 +105,20 @@ impl ArrowArray { } } +impl ArrowDeviceArray { + /// A zeroed device array: an empty Arrow array with no device. Used as a callback output + /// placeholder and as the basis for the end-of-stream marker. + fn empty() -> Self { + Self { + array: ArrowArray::empty(), + device_id: 0, + device_type: 0, + sync_event: ptr::null_mut(), + reserved: Default::default(), + } + } +} + unsafe impl Send for ArrowArray {} unsafe impl Sync for ArrowArray {} @@ -542,11 +556,9 @@ unsafe fn device_stream_private_data<'a>( /// Create the Arrow end-of-stream marker for the stream's CUDA device. fn released_device_array(device_id: i64) -> ArrowDeviceArray { ArrowDeviceArray { - array: ArrowArray::empty(), device_id, device_type: ARROW_DEVICE_CUDA, - sync_event: ptr::null_mut(), - reserved: Default::default(), + ..ArrowDeviceArray::empty() } } @@ -922,35 +934,11 @@ mod tests { use crate::CudaSession; use crate::arrow::ARROW_DEVICE_CUDA; - use crate::arrow::ArrowArray; use crate::arrow::ArrowDeviceArray; use crate::arrow::ArrowSchema; use crate::arrow::DeviceArrayStreamExt; - - /// Release an Arrow C schema in stream tests if it is live. - unsafe fn release_schema(schema: &mut FFI_ArrowSchema) { - if let Some(release) = schema.release { - unsafe { release(schema) }; - } - } - - /// Release an Arrow device array in stream tests if it is live. - unsafe fn release_device_array(array: &mut ArrowDeviceArray) { - if let Some(release) = array.array.release { - unsafe { release(&raw mut array.array) }; - } - } - - /// Create a zeroed placeholder Arrow device array for callback outputs. - fn empty_device_array() -> ArrowDeviceArray { - ArrowDeviceArray { - array: ArrowArray::empty(), - device_id: 0, - device_type: 0, - sync_event: std::ptr::null_mut(), - reserved: [0; 3], - } - } + use crate::arrow::release_device_array; + use crate::arrow::release_schema; /// Verify schema, array, EOS, and idempotent release stream behavior. #[cuda_test] @@ -979,14 +967,14 @@ mod tests { let get_next = device_stream .get_next .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; - let mut first_array = empty_device_array(); + let mut first_array = ArrowDeviceArray::empty(); let status = unsafe { get_next(&raw mut device_stream, &raw mut first_array) }; assert_eq!(status, 0); assert_eq!(first_array.device_type, ARROW_DEVICE_CUDA); assert_eq!(first_array.array.length, 5); assert!(first_array.array.release.is_some()); - let mut eos = empty_device_array(); + let mut eos = ArrowDeviceArray::empty(); let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; assert_eq!(status, 0); assert_eq!(eos.device_type, ARROW_DEVICE_CUDA); From b89a5c928563e2036a96e0346279c426bec396b0 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 18 Jun 2026 16:46:29 +0000 Subject: [PATCH 23/25] Wrap Arrow device stream comments to 100 columns Several doc and line comments added for the Arrow device array stream exceeded the 100-column limit. Wrap them; no behavior change. Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 3 ++- vortex-cuda/src/arrow/mod.rs | 14 ++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index fff0624576c..8eb35d29678 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -34,7 +34,8 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; -/// Return a session with a [`CudaSession`], creating one with [`CudaSession::try_default`] if missing. +/// Return a session with a [`CudaSession`], creating one with [`CudaSession::try_default`] if +/// missing. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index b2cbe145fc8..7ffeb90b1de 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -406,7 +406,8 @@ impl DeviceArrayStreamPrivateData { .runtime .block_on(array.export_device_array_with_schema(&mut self.ctx))?; - // Release the schema we no longer need, and on failure release the array we will not return. + // Release the schema we no longer need, and on failure release the array we will not + // return. let checked = self.check_stream_array(&ffi_schema, &device_array); release_schema(&mut ffi_schema); if let Err(error) = checked { @@ -418,9 +419,9 @@ impl DeviceArrayStreamPrivateData { /// Check that a freshly exported device array matches the stream schema and CUDA device. /// - /// The caller still owns `ffi_schema` and `device_array` and is responsible for releasing them on - /// error. This method only commits `self.schema` after the array is accepted, so a rejected first - /// stream array never becomes the schema later reported by `get_schema`. + /// The caller still owns `ffi_schema` and `device_array` and is responsible for releasing + /// them on error. This method only commits `self.schema` after the array is accepted, so a + /// rejected first stream array never becomes the schema later reported by `get_schema`. fn check_stream_array( &mut self, ffi_schema: &FFI_ArrowSchema, @@ -489,8 +490,9 @@ pub trait DeviceArrayStreamExt { /// /// `runtime` drives the underlying Vortex stream and each per-array export. It must be the /// runtime whose executor the stream spawns its scan tasks onto: a Vortex partition scan spawns - /// work onto the session's runtime, so passing a different runtime leaves that work undriven and - /// the callbacks deadlock. Callers that hold the session's blocking runtime should pass it here. + /// work onto the session's runtime, so passing a different runtime leaves that work undriven + /// and the callbacks deadlock. Callers that hold the session's blocking runtime should pass + /// it here. fn export_device_array_stream( self, session: &VortexSession, From cfff6cd97c61cac8ff527184355dc018fca60ebf Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Fri, 19 Jun 2026 05:11:31 +0000 Subject: [PATCH 24/25] cleanup Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 6 +- vortex-cuda/src/arrow/mod.rs | 231 +++++++++++++++++++++++------------ vortex-ffi/src/lib.rs | 2 +- 3 files changed, 161 insertions(+), 78 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index 8eb35d29678..1a23e7e1d4a 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -34,8 +34,10 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; -/// Return a session with a [`CudaSession`], creating one with [`CudaSession::try_default`] if -/// missing. +/// Return a Vortex session with a [`CudaSession`] session variable. +/// +/// If `session` already has CUDA support, this returns a clone of it. Otherwise it +/// returns a new session cloned from `session` with a default [`CudaSession`] attached. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 7ffeb90b1de..036c5e36363 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -106,8 +106,9 @@ impl ArrowArray { } impl ArrowDeviceArray { - /// A zeroed device array: an empty Arrow array with no device. Used as a callback output - /// placeholder and as the basis for the end-of-stream marker. + /// A zeroed device array: an empty Arrow array with no device. Used as a + /// callback output placeholder and as the basis for the end-of-stream + /// marker. fn empty() -> Self { Self { array: ArrowArray::empty(), @@ -268,9 +269,10 @@ impl DeviceArrayExt for ArrayRef { } } -// POSIX EIO for Arrow stream producer/export failures. +/// POSIX EIO for Arrow stream producer/export failures. const ARROW_STREAM_EIO: c_int = 5; -// POSIX EINVAL for invalid Arrow stream callback arguments or released streams. + +/// POSIX EINVAL for invalid Arrow stream callback arguments or released streams. const ARROW_STREAM_EINVAL: c_int = 22; #[derive(Debug, PartialEq)] @@ -293,8 +295,7 @@ impl ArrowDeviceStreamSchema { /// /// This uses only the logical dtype, so it can differ from a non-empty stream's first-array /// schema for encodings the dtype does not capture: a dictionary column reports a plain field - /// here but `DataType::Dictionary` once a concrete array is seen. This is harmless because an - /// empty stream carries no data. + /// here but `DataType::Dictionary` once a concrete array is seen. fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { let dtype = arrow_device_export_dtype(dtype); if let DType::Struct(struct_dtype, _) = &dtype { @@ -320,9 +321,9 @@ type ArrayStreamIterator = Box>>; struct DeviceArrayStreamPrivateData { array_iter: ArrayStreamIterator, ctx: CudaExecutionCtx, - /// The runtime that drives `array_iter` and per-array exports. It must be the runtime whose - /// executor the underlying Vortex stream spawns its scan tasks onto, otherwise those tasks are - /// never driven and the callbacks deadlock. + /// Runtime used by Arrow stream callbacks to pull from `array_iter` and block on per-array + /// CUDA exports. It must match the runtime that owns the underlying Vortex scan tasks so those + /// tasks are polled while callbacks are producing arrays. runtime: CurrentThreadRuntime, dtype: DType, schema: Option, @@ -341,26 +342,16 @@ impl DeviceArrayStreamPrivateData { /// /// Interior NUL bytes are replaced so `get_last_error` is never null while a non-zero status /// is reported. - fn set_error_with_code(&mut self, error: impl ToString, code: c_int) -> c_int { + fn set_error(&mut self, error: impl ToString, code: c_int) -> c_int { let message = error.to_string().replace('\0', " "); self.last_error = Some(CString::new(message).unwrap_or_default()); code } - /// Store the last stream error and return the Arrow producer/export failure code. - fn set_error(&mut self, error: impl ToString) -> c_int { - self.set_error_with_code(error, ARROW_STREAM_EIO) - } - - /// Store the last stream error and return the Arrow invalid-argument code. - fn set_invalid_error(&mut self, error: impl ToString) -> c_int { - self.set_error_with_code(error, ARROW_STREAM_EINVAL) - } - /// Return the stream schema, exporting the first stream array to derive it if needed. /// /// A first array is held in `pending_array` so the following `get_next` returns it. - fn ensure_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { + fn get_or_init_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { if self.schema.is_none() { match self.array_iter.next() { Some(array) => self.pending_array = Some(self.export_stream_array(array?)?), @@ -410,23 +401,25 @@ impl DeviceArrayStreamPrivateData { // return. let checked = self.check_stream_array(&ffi_schema, &device_array); release_schema(&mut ffi_schema); - if let Err(error) = checked { - release_device_array(&mut device_array); - return Err(error); + let exported_schema = match checked { + Ok(exported_schema) => exported_schema, + Err(error) => { + release_device_array(&mut device_array); + return Err(error); + } + }; + if self.schema.is_none() { + self.schema = Some(exported_schema); } Ok(device_array) } /// Check that a freshly exported device array matches the stream schema and CUDA device. - /// - /// The caller still owns `ffi_schema` and `device_array` and is responsible for releasing - /// them on error. This method only commits `self.schema` after the array is accepted, so a - /// rejected first stream array never becomes the schema later reported by `get_schema`. fn check_stream_array( - &mut self, + &self, ffi_schema: &FFI_ArrowSchema, device_array: &ArrowDeviceArray, - ) -> VortexResult<()> { + ) -> VortexResult { vortex_ensure!( device_array.device_type == ARROW_DEVICE_CUDA, "stream array exported on non-CUDA device type {}", @@ -439,23 +432,18 @@ impl DeviceArrayStreamPrivateData { device_array.device_id ); - // Commit the schema only after the array is fully accepted, so a rejected first stream - // array never becomes the schema later reported by `get_schema`. let exported_schema = ArrowDeviceStreamSchema::from_ffi(ffi_schema, &self.dtype)?; - match &self.schema { - Some(stream_schema) => { - vortex_ensure!( - stream_schema == &exported_schema, - "stream array Arrow schema changed from {:?} to {:?}; an Arrow C device stream \ - requires every array to share one schema, so chunks must not vary their \ - encoding (for example a dictionary-encoded chunk among plain chunks)", - stream_schema, - exported_schema - ); - } - None => self.schema = Some(exported_schema), + if let Some(stream_schema) = &self.schema { + vortex_ensure!( + stream_schema == &exported_schema, + "stream array Arrow schema changed from {:?} to {:?}; an Arrow C device stream \ + requires every array to share one schema, so chunks must not vary their \ + encoding (for example a dictionary-encoded chunk among plain chunks)", + stream_schema, + exported_schema + ); } - Ok(()) + Ok(exported_schema) } } @@ -468,31 +456,21 @@ impl Drop for DeviceArrayStreamPrivateData { } } -/// Extension trait for exporting a Vortex array stream as an Arrow C Device stream. +/// Extension trait for exporting a Vortex array stream as an Arrow Device stream. pub trait DeviceArrayStreamExt { /// Export this stream as an [`ArrowDeviceArrayStream`]. /// - /// Arrays are exported through one persistent [`CudaExecutionCtx`]. The stream records that - /// context's CUDA device at construction time, and each `get_next` verifies that the produced - /// [`ArrowDeviceArray`] is CUDA-resident on that same device. The returned C stream owns the - /// Vortex stream and must be released through its embedded `release` callback. + /// Arrays are exported by reusing one [`CudaExecutionCtx`], and every produced + /// [`ArrowDeviceArray`] must remain on the CUDA device captured at stream construction. The + /// returned [`ArrowDeviceArrayStream`] owns the Vortex stream and must be released through its + /// embedded `release` callback. /// - /// The Arrow C stream contract requires every produced array to share the schema reported by - /// `get_schema`. That schema is derived from the first array, so all arrays must export to the - /// same Arrow type: a stream whose chunks vary their encoding (for example a dictionary-encoded - /// chunk among plain chunks) is rejected mid-stream rather than silently producing a - /// schema-mismatched array. An empty stream has no first array, so its schema is derived from - /// the logical dtype and may report a plain field where a non-empty run would report - /// `DataType::Dictionary`. + /// The Arrow Device stream contract requires all arrays to share the schema reported by + /// `get_schema`. The schema is derived from the first array, or from the logical dtype + /// for an empty stream. Chunks that export to different Arrow types are rejected mid-stream. /// - /// Per the Arrow C stream contract, drive the returned stream from a single thread; its - /// callbacks must not be invoked concurrently. - /// - /// `runtime` drives the underlying Vortex stream and each per-array export. It must be the - /// runtime whose executor the stream spawns its scan tasks onto: a Vortex partition scan spawns - /// work onto the session's runtime, so passing a different runtime leaves that work undriven - /// and the callbacks deadlock. Callers that hold the session's blocking runtime should pass - /// it here. + /// Drive the returned stream from one thread. `runtime` must be the runtime that owns the + /// underlying scan tasks and per-array exports. fn export_device_array_stream( self, session: &VortexSession, @@ -514,7 +492,7 @@ impl DeviceArrayStreamExt for SendableArrayStream { } } -/// Build the Arrow C Device stream that owns `array_iter` and exports its arrays through `ctx`. +/// Build the Arrow Device stream that owns `array_iter` and exports its arrays through `ctx`. fn device_array_stream( array_iter: ArrayStreamIterator, dtype: DType, @@ -542,7 +520,7 @@ fn device_array_stream( } } -/// Return the private stream state for a live Arrow device stream. +/// Returns the stream state stored in `private_data`. unsafe fn device_stream_private_data<'a>( stream: *mut ArrowDeviceArrayStream, ) -> Option<&'a mut DeviceArrayStreamPrivateData> { @@ -578,7 +556,9 @@ fn release_device_array(array: &mut ArrowDeviceArray) { } } -/// Run a stream callback body and convert errors or panics into Arrow status codes. +/// Runs an Arrow stream callback body. +/// +/// Returns an Arrow callback status code and stores failures in `last_error`. fn device_stream_callback( state: &mut DeviceArrayStreamPrivateData, panic_message: &'static str, @@ -587,12 +567,12 @@ fn device_stream_callback( let result = catch_unwind(AssertUnwindSafe(|| callback(state))); match result { Ok(Ok(())) => 0, - Ok(Err(err)) => state.set_error(err), - Err(_) => state.set_error(panic_message), + Ok(Err(err)) => state.set_error(err, ARROW_STREAM_EIO), + Err(_) => state.set_error(panic_message, ARROW_STREAM_EIO), } } -/// Write the stream's Arrow schema, initializing it from the first stream array if necessary. +/// Write the stream's Arrow schema, initializing it from the first stream array if unset. unsafe extern "C" fn device_stream_get_schema( stream: *mut ArrowDeviceArrayStream, out: *mut ArrowSchema, @@ -603,11 +583,11 @@ unsafe extern "C" fn device_stream_get_schema( state.clear_error(); if out.is_null() { - return state.set_invalid_error("null ArrowSchema output"); + return state.set_error("null ArrowSchema output", ARROW_STREAM_EINVAL); } fn body(state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowSchema) -> VortexResult<()> { - let schema = state.ensure_schema()?.to_ffi()?; + let schema = state.get_or_init_schema()?.to_ffi()?; unsafe { ptr::write(out.cast::(), schema) }; Ok(()) } @@ -630,9 +610,11 @@ unsafe extern "C" fn device_stream_get_next( state.clear_error(); if out.is_null() { - return state.set_invalid_error("null ArrowDeviceArray output"); + return state.set_error("null ArrowDeviceArray output", ARROW_STREAM_EINVAL); } + // Keep the fallible part in a local function so `device_stream_callback` handles callback + // status and error reporting consistently. fn body( state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowDeviceArray, @@ -922,12 +904,21 @@ pub trait ExportDeviceArray: Debug + Send + Sync + 'static { #[cfg(test)] mod tests { + use std::ffi::CStr; + use std::ptr; + use arrow_schema::DataType; use arrow_schema::ffi::FFI_ArrowSchema; + use futures::stream; use vortex::VortexSessionDefault; + use vortex::array::ArrayRef; use vortex::array::IntoArray; use vortex::array::arrays::PrimitiveArray; + use vortex::array::stream::ArrayStreamAdapter; use vortex::array::stream::ArrayStreamExt; + use vortex::dtype::DType; + use vortex::dtype::Nullability; + use vortex::dtype::PType; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::io::runtime::current::CurrentThreadRuntime; @@ -936,13 +927,28 @@ mod tests { use crate::CudaSession; use crate::arrow::ARROW_DEVICE_CUDA; + use crate::arrow::ARROW_STREAM_EINVAL; use crate::arrow::ArrowDeviceArray; + use crate::arrow::ArrowDeviceArrayStream; use crate::arrow::ArrowSchema; use crate::arrow::DeviceArrayStreamExt; use crate::arrow::release_device_array; use crate::arrow::release_schema; - /// Verify schema, array, EOS, and idempotent release stream behavior. + fn last_error(stream: &mut ArrowDeviceArrayStream) -> VortexResult { + let get_last_error = stream + .get_last_error + .ok_or_else(|| vortex_err!("stream missing get_last_error callback"))?; + let error = unsafe { get_last_error(stream as *mut ArrowDeviceArrayStream) }; + Ok(if error.is_null() { + String::new() + } else { + unsafe { CStr::from_ptr(error) } + .to_string_lossy() + .into_owned() + }) + } + #[cuda_test] fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { let runtime = CurrentThreadRuntime::new(); @@ -994,4 +1000,79 @@ mod tests { assert!(device_stream.release.is_none()); Ok(()) } + + #[cuda_test] + fn test_export_device_array_stream_empty_stream_schema_and_eos() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let dtype = DType::Primitive(PType::U32, Nullability::NonNullable); + let stream = + ArrayStreamAdapter::new(dtype, stream::empty::>()).boxed(); + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; + + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let mut schema = FFI_ArrowSchema::empty(); + let status = unsafe { + get_schema( + &raw mut device_stream, + (&raw mut schema).cast::(), + ) + }; + assert_eq!(status, 0); + let field = arrow_schema::Field::try_from(&schema)?; + assert_eq!(field.data_type(), &DataType::UInt32); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let mut eos = ArrowDeviceArray::empty(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; + assert_eq!(status, 0); + assert!(eos.array.release.is_none()); + + unsafe { + release_schema(&mut schema); + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + } + Ok(()) + } + + #[cuda_test] + fn test_export_device_array_stream_null_outputs_report_error() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let array = PrimitiveArray::from_iter(0u32..5).into_array(); + let stream = array.to_array_stream().boxed(); + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; + + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let status = unsafe { get_schema(&raw mut device_stream, ptr::null_mut()) }; + assert_eq!(status, ARROW_STREAM_EINVAL); + assert_eq!(last_error(&mut device_stream)?, "null ArrowSchema output"); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let status = unsafe { get_next(&raw mut device_stream, ptr::null_mut()) }; + assert_eq!(status, ARROW_STREAM_EINVAL); + assert_eq!( + last_error(&mut device_stream)?, + "null ArrowDeviceArray output" + ); + + unsafe { + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + } + Ok(()) + } } diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index a8ac3de5cb5..0fd46c2eb2d 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -60,7 +60,7 @@ static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRunt /// /// Streams from `vortex-ffi` partitions spawn their scan work onto this runtime's executor, so a /// consumer crate (for example `vortex-cuda`'s Arrow device stream export) must drive them on this -/// same runtime rather than a private one, otherwise the spawned work is never driven. +/// same runtime rather than a private one. pub fn runtime() -> &'static CurrentThreadRuntime { &RUNTIME } From f1a8999910bc12ac87af413220a2d6953a4600c2 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Fri, 19 Jun 2026 11:34:39 +0000 Subject: [PATCH 25/25] address comments Signed-off-by: Alexander Droste --- vortex-cuda/ffi/src/lib.rs | 4 ++-- vortex-cuda/src/arrow/mod.rs | 22 +++++++++++----------- vortex-ffi/src/lib.rs | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index 1a23e7e1d4a..b5718d3d5fa 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -20,7 +20,7 @@ use vortex_cuda::arrow::ArrowDeviceArray; use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; use vortex_cuda::arrow::DeviceArrayStreamExt; -use vortex_ffi::runtime; +use vortex_ffi::ffi_runtime; use vortex_ffi::try_or; use vortex_ffi::vx_array; use vortex_ffi::vx_array_ref; @@ -142,7 +142,7 @@ pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; // Drive the stream on the same runtime the partition's scan spawned its work onto. - let device_stream = array_stream.export_device_array_stream(&session, runtime())?; + let device_stream = array_stream.export_device_array_stream(&session, ffi_runtime())?; unsafe { ptr::write(out_stream, device_stream) }; Ok(VX_CUDA_OK) diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 036c5e36363..fb475ab2b24 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -270,10 +270,10 @@ impl DeviceArrayExt for ArrayRef { } /// POSIX EIO for Arrow stream producer/export failures. -const ARROW_STREAM_EIO: c_int = 5; +const LIBC_EIO: c_int = 5; /// POSIX EINVAL for invalid Arrow stream callback arguments or released streams. -const ARROW_STREAM_EINVAL: c_int = 22; +const LIBC_EINVAL: c_int = 22; #[derive(Debug, PartialEq)] enum ArrowDeviceStreamSchema { @@ -567,8 +567,8 @@ fn device_stream_callback( let result = catch_unwind(AssertUnwindSafe(|| callback(state))); match result { Ok(Ok(())) => 0, - Ok(Err(err)) => state.set_error(err, ARROW_STREAM_EIO), - Err(_) => state.set_error(panic_message, ARROW_STREAM_EIO), + Ok(Err(err)) => state.set_error(err, LIBC_EIO), + Err(_) => state.set_error(panic_message, LIBC_EIO), } } @@ -578,12 +578,12 @@ unsafe extern "C" fn device_stream_get_schema( out: *mut ArrowSchema, ) -> c_int { let Some(state) = (unsafe { device_stream_private_data(stream) }) else { - return ARROW_STREAM_EINVAL; + return LIBC_EINVAL; }; state.clear_error(); if out.is_null() { - return state.set_error("null ArrowSchema output", ARROW_STREAM_EINVAL); + return state.set_error("null ArrowSchema output", LIBC_EINVAL); } fn body(state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowSchema) -> VortexResult<()> { @@ -605,12 +605,12 @@ unsafe extern "C" fn device_stream_get_next( out: *mut ArrowDeviceArray, ) -> c_int { let Some(state) = (unsafe { device_stream_private_data(stream) }) else { - return ARROW_STREAM_EINVAL; + return LIBC_EINVAL; }; state.clear_error(); if out.is_null() { - return state.set_error("null ArrowDeviceArray output", ARROW_STREAM_EINVAL); + return state.set_error("null ArrowDeviceArray output", LIBC_EINVAL); } // Keep the fallible part in a local function so `device_stream_callback` handles callback @@ -927,11 +927,11 @@ mod tests { use crate::CudaSession; use crate::arrow::ARROW_DEVICE_CUDA; - use crate::arrow::ARROW_STREAM_EINVAL; use crate::arrow::ArrowDeviceArray; use crate::arrow::ArrowDeviceArrayStream; use crate::arrow::ArrowSchema; use crate::arrow::DeviceArrayStreamExt; + use crate::arrow::LIBC_EINVAL; use crate::arrow::release_device_array; use crate::arrow::release_schema; @@ -1054,14 +1054,14 @@ mod tests { .get_schema .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; let status = unsafe { get_schema(&raw mut device_stream, ptr::null_mut()) }; - assert_eq!(status, ARROW_STREAM_EINVAL); + assert_eq!(status, LIBC_EINVAL); assert_eq!(last_error(&mut device_stream)?, "null ArrowSchema output"); let get_next = device_stream .get_next .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; let status = unsafe { get_next(&raw mut device_stream, ptr::null_mut()) }; - assert_eq!(status, ARROW_STREAM_EINVAL); + assert_eq!(status, LIBC_EINVAL); assert_eq!( last_error(&mut device_stream)?, "null ArrowDeviceArray output" diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index 0fd46c2eb2d..43e108e6c28 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -61,7 +61,7 @@ static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRunt /// Streams from `vortex-ffi` partitions spawn their scan work onto this runtime's executor, so a /// consumer crate (for example `vortex-cuda`'s Arrow device stream export) must drive them on this /// same runtime rather than a private one. -pub fn runtime() -> &'static CurrentThreadRuntime { +pub fn ffi_runtime() -> &'static CurrentThreadRuntime { &RUNTIME }