diff --git a/vortex-cuda/README.md b/vortex-cuda/README.md index d71ad593ce7..4024372adf3 100644 --- a/vortex-cuda/README.md +++ b/vortex-cuda/README.md @@ -39,3 +39,15 @@ cmake -S cpp -B cpp/build \ -DCMAKE_EXE_LINKER_FLAGS="-Wl,--stub-group-size=1048576" \ -GNinja && cmake --build cpp/build --target INTEROP_TEST --parallel ``` + +## Running the cuDF test harness + +```sh +cargo build -p vortex-test-e2e-cuda +cmake --build /path/to/cudf-test-harness/build --target cudf-test-harness --parallel + +LD_LIBRARY_PATH=/usr/local/cuda-13.1/compat \ + target/debug/cudf_harness_runner \ + /path/to/cudf-test-harness/build/cudf-test-harness \ + target/debug/libvortex_test_e2e_cuda.so +``` diff --git a/vortex-cuda/build.rs b/vortex-cuda/build.rs index e6024d3c275..eb4743bab21 100644 --- a/vortex-cuda/build.rs +++ b/vortex-cuda/build.rs @@ -206,6 +206,7 @@ fn generate_arrow_device_array_bindings(manifest_dir: &Path, out_dir: &Path) { .header(header.to_string_lossy()) .allowlist_type("ArrowArray") .allowlist_type("ArrowDeviceArray") + .allowlist_type("ArrowDeviceArrayStream") .allowlist_type("ArrowDeviceType") .allowlist_var("ARROW_DEVICE_.*") // ArrowArray/ArrowDeviceArray own producer state through release/private_data. diff --git a/vortex-cuda/ffi/cinclude/vortex_cuda.h b/vortex-cuda/ffi/cinclude/vortex_cuda.h index ee0f0fb3ca3..0b06a660afa 100644 --- a/vortex-cuda/ffi/cinclude/vortex_cuda.h +++ b/vortex-cuda/ffi/cinclude/vortex_cuda.h @@ -43,6 +43,18 @@ struct ArrowDeviceArray { }; #endif +#if !defined(ARROW_C_DEVICE_STREAM_INTERFACE) && !defined(USE_OWN_ARROW_DEVICE) +#define ARROW_C_DEVICE_STREAM_INTERFACE +struct ArrowDeviceArrayStream { + ArrowDeviceType device_type; + int (*get_schema)(struct ArrowDeviceArrayStream *, struct ArrowSchema *out); + int (*get_next)(struct ArrowDeviceArrayStream *, struct ArrowDeviceArray *out); + const char *(*get_last_error)(struct ArrowDeviceArrayStream *); + void (*release)(struct ArrowDeviceArrayStream *); + void *private_data; +}; +#endif + /** * Create a CUDA Vortex session. * @@ -69,6 +81,25 @@ int vx_cuda_array_export_arrow_device(const vx_session *session, struct ArrowDeviceArray *out_array, vx_error **error_out); +/** + * Consume a Vortex partition and scan it as an Arrow C Device stream. + * + * This function takes ownership of `partition`. Callers must not free or reuse + * it after calling this function, regardless of success or failure. + * + * On success returns 0 and writes an owned `ArrowDeviceArrayStream` to + * `out_stream`. The stream owns the resulting scan iterator. The caller must + * release the stream through its embedded Arrow `release` callback, and must + * release each produced `ArrowDeviceArray` through its embedded + * `ArrowArray.release` callback. + * + * On error returns 1 and writes a `vx_error` to `error_out` when non-NULL. + */ +int vx_cuda_partition_scan_arrow_device_stream(const vx_session *session, + vx_partition *partition, + struct ArrowDeviceArrayStream *out_stream, + vx_error **error_out); + #ifdef __cplusplus } #endif diff --git a/vortex-cuda/ffi/src/lib.rs b/vortex-cuda/ffi/src/lib.rs index f3f97210d5b..b5718d3d5fa 100644 --- a/vortex-cuda/ffi/src/lib.rs +++ b/vortex-cuda/ffi/src/lib.rs @@ -17,11 +17,16 @@ use vortex::error::vortex_ensure; use vortex::session::VortexSession; use vortex_cuda::CudaSession; use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda::arrow::DeviceArrayStreamExt; +use vortex_ffi::ffi_runtime; use vortex_ffi::try_or; use vortex_ffi::vx_array; use vortex_ffi::vx_array_ref; use vortex_ffi::vx_error; +use vortex_ffi::vx_partition; +use vortex_ffi::vx_partition_into_array_stream; use vortex_ffi::vx_session; use vortex_ffi::vx_session_new_with; use vortex_ffi::vx_session_ref; @@ -29,6 +34,10 @@ use vortex_ffi::vx_session_ref; const VX_CUDA_OK: c_int = 0; const VX_CUDA_ERR: c_int = 1; +/// Return a Vortex session with a [`CudaSession`] session variable. +/// +/// If `session` already has CUDA support, this returns a clone of it. Otherwise it +/// returns a new session cloned from `session` with a default [`CudaSession`] attached. fn session_with_cuda(session: &VortexSession) -> VortexResult { if session.get_opt::().is_some() { return Ok(session.clone()); @@ -100,6 +109,46 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device( }) } +/// Consume a Vortex partition and scan it as an Arrow C Device stream. +/// +/// This function takes ownership of `partition`. Callers must not free or reuse it after calling +/// this function, regardless of success or failure. +/// +/// On success returns `0` and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream +/// owns the resulting scan iterator. The caller must release the stream through its embedded Arrow +/// `release` callback, and must release each produced `ArrowDeviceArray` through its embedded +/// `ArrowArray.release` callback. +/// +/// On error returns `1` and, when `error_out` is non-null, writes a `vx_error` (free with +/// `vx_error_free`). +/// +/// # Safety +/// +/// `session` must be a valid borrowed handle created by `vortex-ffi`. `partition` must be an owned +/// partition handle created by `vortex-ffi`. `out_stream` must be a valid writable pointer. If +/// `error_out` is non-null, it must be valid for writing one error pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream( + session: *const vx_session, + partition: *mut vx_partition, + out_stream: *mut ArrowDeviceArrayStream, + error_out: *mut *mut vx_error, +) -> c_int { + try_or(error_out, VX_CUDA_ERR, || { + vortex_ensure!(!partition.is_null(), "null vx_partition"); + + let array_stream = unsafe { vx_partition_into_array_stream(partition) }?; + vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output"); + + let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?; + // Drive the stream on the same runtime the partition's scan spawned its work onto. + let device_stream = array_stream.export_device_array_stream(&session, ffi_runtime())?; + + unsafe { ptr::write(out_stream, device_stream) }; + Ok(VX_CUDA_OK) + }) +} + #[cfg(test)] mod tests { use std::ptr; diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 63582b22a24..fb475ab2b24 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -11,8 +11,13 @@ mod canonical; mod list_view; +use std::ffi::CString; +use std::ffi::c_char; +use std::ffi::c_int; use std::ffi::c_void; use std::fmt::Debug; +use std::panic::AssertUnwindSafe; +use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; @@ -38,13 +43,18 @@ use vortex::array::arrays::listview::ListViewArrayExt; use vortex::array::arrays::struct_::StructArrayExt; use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; +use vortex::array::stream::SendableArrayStream; use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::dtype::DecimalType; use vortex::dtype::PType; use vortex::dtype::StructFields; use vortex::error::VortexResult; +use vortex::error::vortex_ensure; use vortex::error::vortex_err; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::session::VortexSession; use crate::CudaBufferExt; use crate::CudaExecutionCtx; @@ -61,7 +71,9 @@ mod arrow_c_abi { pub use arrow_c_abi::ArrowArray; pub use arrow_c_abi::ArrowDeviceArray; +pub use arrow_c_abi::ArrowDeviceArrayStream; pub use arrow_c_abi::ArrowDeviceType; +use arrow_c_abi::ArrowSchema; #[cfg(feature = "_test-harness")] #[doc(hidden)] @@ -93,6 +105,21 @@ impl ArrowArray { } } +impl ArrowDeviceArray { + /// A zeroed device array: an empty Arrow array with no device. Used as a + /// callback output placeholder and as the basis for the end-of-stream + /// marker. + fn empty() -> Self { + Self { + array: ArrowArray::empty(), + device_id: 0, + device_type: 0, + sync_event: ptr::null_mut(), + reserved: Default::default(), + } + } +} + unsafe impl Send for ArrowArray {} unsafe impl Sync for ArrowArray {} @@ -242,6 +269,408 @@ impl DeviceArrayExt for ArrayRef { } } +/// POSIX EIO for Arrow stream producer/export failures. +const LIBC_EIO: c_int = 5; + +/// POSIX EINVAL for invalid Arrow stream callback arguments or released streams. +const LIBC_EINVAL: c_int = 22; + +#[derive(Debug, PartialEq)] +enum ArrowDeviceStreamSchema { + Schema(Schema), + Field(Field), +} + +impl ArrowDeviceStreamSchema { + /// Convert an Arrow C schema into the stream schema shape for `dtype`. + fn from_ffi(schema: &FFI_ArrowSchema, dtype: &DType) -> VortexResult { + if matches!(dtype, DType::Struct(..)) { + Ok(Self::Schema(Schema::try_from(schema)?)) + } else { + Ok(Self::Field(Field::try_from(schema)?)) + } + } + + /// Convert a Vortex dtype into a stream schema when no stream array is available. + /// + /// This uses only the logical dtype, so it can differ from a non-empty stream's first-array + /// schema for encodings the dtype does not capture: a dictionary column reports a plain field + /// here but `DataType::Dictionary` once a concrete array is seen. + fn from_dtype(dtype: &DType, ctx: &mut CudaExecutionCtx) -> VortexResult { + let dtype = arrow_device_export_dtype(dtype); + if let DType::Struct(struct_dtype, _) = &dtype { + Ok(Self::Schema(Schema::new( + arrow_device_export_struct_fields(struct_dtype, ctx)?, + ))) + } else { + Ok(Self::Field(arrow_device_export_field("", &dtype, ctx)?)) + } + } + + /// Export this stream schema as an owned Arrow C schema. + fn to_ffi(&self) -> VortexResult { + match self { + Self::Schema(schema) => Ok(FFI_ArrowSchema::try_from(schema)?), + Self::Field(field) => Ok(FFI_ArrowSchema::try_from(field)?), + } + } +} + +type ArrayStreamIterator = Box>>; + +struct DeviceArrayStreamPrivateData { + array_iter: ArrayStreamIterator, + ctx: CudaExecutionCtx, + /// Runtime used by Arrow stream callbacks to pull from `array_iter` and block on per-array + /// CUDA exports. It must match the runtime that owns the underlying Vortex scan tasks so those + /// tasks are polled while callbacks are producing arrays. + runtime: CurrentThreadRuntime, + dtype: DType, + schema: Option, + pending_array: Option, + device_id: i64, + last_error: Option, +} + +impl DeviceArrayStreamPrivateData { + /// Clear the last stream error before a new callback invocation. + fn clear_error(&mut self) { + self.last_error = None; + } + + /// Store the last stream error and return the requested Arrow callback error code. + /// + /// Interior NUL bytes are replaced so `get_last_error` is never null while a non-zero status + /// is reported. + fn set_error(&mut self, error: impl ToString, code: c_int) -> c_int { + let message = error.to_string().replace('\0', " "); + self.last_error = Some(CString::new(message).unwrap_or_default()); + code + } + + /// Return the stream schema, exporting the first stream array to derive it if needed. + /// + /// A first array is held in `pending_array` so the following `get_next` returns it. + fn get_or_init_schema(&mut self) -> VortexResult<&ArrowDeviceStreamSchema> { + if self.schema.is_none() { + match self.array_iter.next() { + Some(array) => self.pending_array = Some(self.export_stream_array(array?)?), + None => { + self.schema = Some(ArrowDeviceStreamSchema::from_dtype( + &self.dtype, + &mut self.ctx, + )?); + } + } + } + + self.schema + .as_ref() + .ok_or_else(|| vortex_err!("ArrowDeviceArrayStream schema was not initialized")) + } + + /// Export and return the next Arrow device array, or `None` at end of stream. + fn next_array(&mut self) -> VortexResult> { + if let Some(array) = self.pending_array.take() { + return Ok(Some(array)); + } + + match self.array_iter.next() { + Some(array) => self.export_stream_array(array?).map(Some), + None => Ok(None), + } + } + + /// Export one array from the Vortex stream, validating it against the device stream. + fn export_stream_array(&mut self, array: ArrayRef) -> VortexResult { + vortex_ensure!( + array.dtype() == &self.dtype, + "stream array dtype changed from {} to {}", + self.dtype, + array.dtype() + ); + + let ArrowDeviceArrayWithSchema { + schema: mut ffi_schema, + array: mut device_array, + } = self + .runtime + .block_on(array.export_device_array_with_schema(&mut self.ctx))?; + + // Release the schema we no longer need, and on failure release the array we will not + // return. + let checked = self.check_stream_array(&ffi_schema, &device_array); + release_schema(&mut ffi_schema); + let exported_schema = match checked { + Ok(exported_schema) => exported_schema, + Err(error) => { + release_device_array(&mut device_array); + return Err(error); + } + }; + if self.schema.is_none() { + self.schema = Some(exported_schema); + } + Ok(device_array) + } + + /// Check that a freshly exported device array matches the stream schema and CUDA device. + fn check_stream_array( + &self, + ffi_schema: &FFI_ArrowSchema, + device_array: &ArrowDeviceArray, + ) -> VortexResult { + vortex_ensure!( + device_array.device_type == ARROW_DEVICE_CUDA, + "stream array exported on non-CUDA device type {}", + device_array.device_type + ); + vortex_ensure!( + device_array.device_id == self.device_id, + "stream array moved from CUDA device {} to {}", + self.device_id, + device_array.device_id + ); + + let exported_schema = ArrowDeviceStreamSchema::from_ffi(ffi_schema, &self.dtype)?; + if let Some(stream_schema) = &self.schema { + vortex_ensure!( + stream_schema == &exported_schema, + "stream array Arrow schema changed from {:?} to {:?}; an Arrow C device stream \ + requires every array to share one schema, so chunks must not vary their \ + encoding (for example a dictionary-encoded chunk among plain chunks)", + stream_schema, + exported_schema + ); + } + Ok(exported_schema) + } +} + +impl Drop for DeviceArrayStreamPrivateData { + /// Release a first stream array if `get_schema` exported it and `get_next` never returned it. + fn drop(&mut self) { + if let Some(mut array) = self.pending_array.take() { + release_device_array(&mut array); + } + } +} + +/// Extension trait for exporting a Vortex array stream as an Arrow Device stream. +pub trait DeviceArrayStreamExt { + /// Export this stream as an [`ArrowDeviceArrayStream`]. + /// + /// Arrays are exported by reusing one [`CudaExecutionCtx`], and every produced + /// [`ArrowDeviceArray`] must remain on the CUDA device captured at stream construction. The + /// returned [`ArrowDeviceArrayStream`] owns the Vortex stream and must be released through its + /// embedded `release` callback. + /// + /// The Arrow Device stream contract requires all arrays to share the schema reported by + /// `get_schema`. The schema is derived from the first array, or from the logical dtype + /// for an empty stream. Chunks that export to different Arrow types are rejected mid-stream. + /// + /// Drive the returned stream from one thread. `runtime` must be the runtime that owns the + /// underlying scan tasks and per-array exports. + fn export_device_array_stream( + self, + session: &VortexSession, + runtime: &CurrentThreadRuntime, + ) -> VortexResult; +} + +impl DeviceArrayStreamExt for SendableArrayStream { + /// Drive this stream on `runtime` and export it. + fn export_device_array_stream( + self, + session: &VortexSession, + runtime: &CurrentThreadRuntime, + ) -> VortexResult { + let dtype = self.dtype().clone(); + let ctx = crate::CudaSession::create_execution_ctx(session)?; + let array_iter = Box::new(runtime.block_on_stream(self)); + Ok(device_array_stream(array_iter, dtype, ctx, runtime.clone())) + } +} + +/// Build the Arrow Device stream that owns `array_iter` and exports its arrays through `ctx`. +fn device_array_stream( + array_iter: ArrayStreamIterator, + dtype: DType, + ctx: CudaExecutionCtx, + runtime: CurrentThreadRuntime, +) -> ArrowDeviceArrayStream { + let private_data = Box::new(DeviceArrayStreamPrivateData { + device_id: ctx.stream().context().ordinal() as i64, + array_iter, + ctx, + runtime, + dtype, + schema: None, + pending_array: None, + last_error: None, + }); + + ArrowDeviceArrayStream { + device_type: ARROW_DEVICE_CUDA, + get_schema: Some(device_stream_get_schema), + get_next: Some(device_stream_get_next), + get_last_error: Some(device_stream_get_last_error), + release: Some(device_stream_release), + private_data: Box::into_raw(private_data).cast(), + } +} + +/// Returns the stream state stored in `private_data`. +unsafe fn device_stream_private_data<'a>( + stream: *mut ArrowDeviceArrayStream, +) -> Option<&'a mut DeviceArrayStreamPrivateData> { + let stream = unsafe { stream.as_mut()? }; + unsafe { + stream + .private_data + .cast::() + .as_mut() + } +} + +/// Create the Arrow end-of-stream marker for the stream's CUDA device. +fn released_device_array(device_id: i64) -> ArrowDeviceArray { + ArrowDeviceArray { + device_id, + device_type: ARROW_DEVICE_CUDA, + ..ArrowDeviceArray::empty() + } +} + +/// Release an Arrow C schema if it is live. +fn release_schema(schema: &mut FFI_ArrowSchema) { + if let Some(release) = schema.release { + unsafe { release(schema) }; + } +} + +/// Release an Arrow device array if it is live. +fn release_device_array(array: &mut ArrowDeviceArray) { + if let Some(release) = array.array.release { + unsafe { release(&raw mut array.array) }; + } +} + +/// Runs an Arrow stream callback body. +/// +/// Returns an Arrow callback status code and stores failures in `last_error`. +fn device_stream_callback( + state: &mut DeviceArrayStreamPrivateData, + panic_message: &'static str, + callback: impl FnOnce(&mut DeviceArrayStreamPrivateData) -> VortexResult<()>, +) -> c_int { + let result = catch_unwind(AssertUnwindSafe(|| callback(state))); + match result { + Ok(Ok(())) => 0, + Ok(Err(err)) => state.set_error(err, LIBC_EIO), + Err(_) => state.set_error(panic_message, LIBC_EIO), + } +} + +/// Write the stream's Arrow schema, initializing it from the first stream array if unset. +unsafe extern "C" fn device_stream_get_schema( + stream: *mut ArrowDeviceArrayStream, + out: *mut ArrowSchema, +) -> c_int { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return LIBC_EINVAL; + }; + state.clear_error(); + + if out.is_null() { + return state.set_error("null ArrowSchema output", LIBC_EINVAL); + } + + fn body(state: &mut DeviceArrayStreamPrivateData, out: *mut ArrowSchema) -> VortexResult<()> { + let schema = state.get_or_init_schema()?.to_ffi()?; + unsafe { ptr::write(out.cast::(), schema) }; + Ok(()) + } + + device_stream_callback( + state, + "panic in ArrowDeviceArrayStream::get_schema", + |state| body(state, out), + ) +} + +/// Write the next exported Arrow device array, or a released array at end of stream. +unsafe extern "C" fn device_stream_get_next( + stream: *mut ArrowDeviceArrayStream, + out: *mut ArrowDeviceArray, +) -> c_int { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return LIBC_EINVAL; + }; + state.clear_error(); + + if out.is_null() { + return state.set_error("null ArrowDeviceArray output", LIBC_EINVAL); + } + + // Keep the fallible part in a local function so `device_stream_callback` handles callback + // status and error reporting consistently. + fn body( + state: &mut DeviceArrayStreamPrivateData, + out: *mut ArrowDeviceArray, + ) -> VortexResult<()> { + let array = state + .next_array()? + .unwrap_or_else(|| released_device_array(state.device_id)); + unsafe { ptr::write(out, array) }; + Ok(()) + } + + device_stream_callback( + state, + "panic in ArrowDeviceArrayStream::get_next", + |state| body(state, out), + ) +} + +/// Return the most recent callback error message, or null if no error is stored. +unsafe extern "C" fn device_stream_get_last_error( + stream: *mut ArrowDeviceArrayStream, +) -> *const c_char { + let Some(state) = (unsafe { device_stream_private_data(stream) }) else { + return ptr::null(); + }; + + state + .last_error + .as_ref() + .map_or(ptr::null(), |error| error.as_ptr()) +} + +/// Free the stream state and null its callbacks. The null `release` makes a second call a no-op. +unsafe extern "C" fn device_stream_release(stream: *mut ArrowDeviceArrayStream) { + let Some(stream_ref) = (unsafe { stream.as_mut() }) else { + return; + }; + if stream_ref.release.is_none() { + return; + } + + stream_ref.get_schema = None; + stream_ref.get_next = None; + stream_ref.get_last_error = None; + stream_ref.release = None; + + let private_data = std::mem::replace(&mut stream_ref.private_data, ptr::null_mut()); + if !private_data.is_null() { + drop(catch_unwind(AssertUnwindSafe(|| unsafe { + drop(Box::from_raw( + private_data.cast::(), + )); + }))); + } +} + /// Build the Arrow C schema that describes the exported device array. pub(crate) fn arrow_schema_for_array( array: &ArrayRef, @@ -472,3 +901,178 @@ pub trait ExportDeviceArray: Debug + Send + Sync + 'static { Ok(ArrowDeviceArrayWithSchema { schema, array }) } } + +#[cfg(test)] +mod tests { + use std::ffi::CStr; + use std::ptr; + + use arrow_schema::DataType; + use arrow_schema::ffi::FFI_ArrowSchema; + use futures::stream; + use vortex::VortexSessionDefault; + use vortex::array::ArrayRef; + use vortex::array::IntoArray; + use vortex::array::arrays::PrimitiveArray; + use vortex::array::stream::ArrayStreamAdapter; + use vortex::array::stream::ArrayStreamExt; + use vortex::dtype::DType; + use vortex::dtype::Nullability; + use vortex::dtype::PType; + use vortex::error::VortexResult; + use vortex::error::vortex_err; + use vortex::io::runtime::current::CurrentThreadRuntime; + use vortex::session::VortexSession; + use vortex_cuda_macros::test as cuda_test; + + use crate::CudaSession; + use crate::arrow::ARROW_DEVICE_CUDA; + use crate::arrow::ArrowDeviceArray; + use crate::arrow::ArrowDeviceArrayStream; + use crate::arrow::ArrowSchema; + use crate::arrow::DeviceArrayStreamExt; + use crate::arrow::LIBC_EINVAL; + use crate::arrow::release_device_array; + use crate::arrow::release_schema; + + fn last_error(stream: &mut ArrowDeviceArrayStream) -> VortexResult { + let get_last_error = stream + .get_last_error + .ok_or_else(|| vortex_err!("stream missing get_last_error callback"))?; + let error = unsafe { get_last_error(stream as *mut ArrowDeviceArrayStream) }; + Ok(if error.is_null() { + String::new() + } else { + unsafe { CStr::from_ptr(error) } + .to_string_lossy() + .into_owned() + }) + } + + #[cuda_test] + fn test_export_device_array_stream_schema_next_eos_release() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let array = PrimitiveArray::from_iter(0u32..5).into_array(); + let stream = array.to_array_stream().boxed(); + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; + assert_eq!(device_stream.device_type, ARROW_DEVICE_CUDA); + + let mut schema = FFI_ArrowSchema::empty(); + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let status = unsafe { + get_schema( + &raw mut device_stream, + (&raw mut schema).cast::(), + ) + }; + assert_eq!(status, 0); + let field = arrow_schema::Field::try_from(&schema)?; + assert_eq!(field.data_type(), &DataType::UInt32); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let mut first_array = ArrowDeviceArray::empty(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut first_array) }; + assert_eq!(status, 0); + assert_eq!(first_array.device_type, ARROW_DEVICE_CUDA); + assert_eq!(first_array.array.length, 5); + assert!(first_array.array.release.is_some()); + + let mut eos = ArrowDeviceArray::empty(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; + assert_eq!(status, 0); + assert_eq!(eos.device_type, ARROW_DEVICE_CUDA); + assert!(eos.array.release.is_none()); + + unsafe { + release_device_array(&mut first_array); + release_schema(&mut schema); + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + release(&raw mut device_stream); + } + assert!(device_stream.release.is_none()); + Ok(()) + } + + #[cuda_test] + fn test_export_device_array_stream_empty_stream_schema_and_eos() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let dtype = DType::Primitive(PType::U32, Nullability::NonNullable); + let stream = + ArrayStreamAdapter::new(dtype, stream::empty::>()).boxed(); + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; + + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let mut schema = FFI_ArrowSchema::empty(); + let status = unsafe { + get_schema( + &raw mut device_stream, + (&raw mut schema).cast::(), + ) + }; + assert_eq!(status, 0); + let field = arrow_schema::Field::try_from(&schema)?; + assert_eq!(field.data_type(), &DataType::UInt32); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let mut eos = ArrowDeviceArray::empty(); + let status = unsafe { get_next(&raw mut device_stream, &raw mut eos) }; + assert_eq!(status, 0); + assert!(eos.array.release.is_none()); + + unsafe { + release_schema(&mut schema); + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + } + Ok(()) + } + + #[cuda_test] + fn test_export_device_array_stream_null_outputs_report_error() -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_some(CudaSession::try_default()?); + let array = PrimitiveArray::from_iter(0u32..5).into_array(); + let stream = array.to_array_stream().boxed(); + let mut device_stream = stream.export_device_array_stream(&session, &runtime)?; + + let get_schema = device_stream + .get_schema + .ok_or_else(|| vortex_err!("stream missing get_schema callback"))?; + let status = unsafe { get_schema(&raw mut device_stream, ptr::null_mut()) }; + assert_eq!(status, LIBC_EINVAL); + assert_eq!(last_error(&mut device_stream)?, "null ArrowSchema output"); + + let get_next = device_stream + .get_next + .ok_or_else(|| vortex_err!("stream missing get_next callback"))?; + let status = unsafe { get_next(&raw mut device_stream, ptr::null_mut()) }; + assert_eq!(status, LIBC_EINVAL); + assert_eq!( + last_error(&mut device_stream)?, + "null ArrowDeviceArray output" + ); + + unsafe { + let release = device_stream + .release + .ok_or_else(|| vortex_err!("stream missing release callback"))?; + release(&raw mut device_stream); + } + Ok(()) + } +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 706b9c3c2bd..a09301299fa 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -25,6 +25,7 @@ mod stream_pool; pub use arrow::ArrowDeviceArrayWithSchema; pub use arrow::DeviceArrayExt; +pub use arrow::DeviceArrayStreamExt; pub use arrow::ExportDeviceArray; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; diff --git a/vortex-ffi/src/error.rs b/vortex-ffi/src/error.rs index 9459d77c6a4..3ee02c99e21 100644 --- a/vortex-ffi/src/error.rs +++ b/vortex-ffi/src/error.rs @@ -1,6 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; +use std::panic::AssertUnwindSafe; +use std::panic::catch_unwind; use std::ptr; use std::sync::Arc; @@ -43,20 +46,35 @@ fn clear_error(error_out: *mut *mut vx_error) { unsafe { error_out.write(ptr::null_mut()) }; } +/// Convert a panic payload into the message stored in an FFI error. +fn panic_message(payload: &(dyn Any + Send)) -> String { + if let Some(message) = payload.downcast_ref::<&str>() { + format!("panic in Vortex FFI function: {message}") + } else if let Some(message) = payload.downcast_ref::() { + format!("panic in Vortex FFI function: {message}") + } else { + "panic in Vortex FFI function".to_string() + } +} + #[inline] pub fn try_or_default( error_out: *mut *mut vx_error, function: impl FnOnce() -> VortexResult, ) -> T { - match function() { - Ok(value) => { + match catch_unwind(AssertUnwindSafe(function)) { + Ok(Ok(value)) => { clear_error(error_out); value } - Err(err) => { + Ok(Err(err)) => { write_error(error_out, &err.to_string()); T::default() } + Err(payload) => { + write_error(error_out, &panic_message(payload.as_ref())); + T::default() + } } } @@ -69,15 +87,19 @@ pub fn try_or( error_value: T, function: impl FnOnce() -> VortexResult, ) -> T { - match function() { - Ok(value) => { + match catch_unwind(AssertUnwindSafe(function)) { + Ok(Ok(value)) => { clear_error(error_out); value } - Err(err) => { + Ok(Err(err)) => { write_error(error_out, &err.to_string()); error_value } + Err(payload) => { + write_error(error_out, &panic_message(payload.as_ref())); + error_value + } } } @@ -126,4 +148,18 @@ mod tests { assert_eq!(try_or(&raw mut error, -1, || Ok(42)), 42); assert!(error.is_null()); } + + #[test] + fn test_try_or_catches_panic() { + let mut error: *mut vx_error = ptr::null_mut(); + + assert_eq!(try_or(&raw mut error, -1, || panic!("boom")), -1); + assert!(!error.is_null()); + let message = unsafe { vx_error_get_message(error) }; + assert_eq!( + vx_string::as_ref(message).as_ref(), + "panic in Vortex FFI function: boom" + ); + unsafe { vx_error_free(error) }; + } } diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index f4ce314c073..43e108e6c28 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -35,6 +35,8 @@ pub use error::try_or; pub use error::vx_error; pub use error::vx_error_free; pub use log::vx_log_level; +pub use scan::vx_partition; +pub use scan::vx_partition_into_array_stream; pub use session::vx_session; pub use session::vx_session_free; pub use session::vx_session_new_with; @@ -53,6 +55,16 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; // TODO(ngates): also create a CurrentThreadPool to manage background worker threads. static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +/// Return the shared FFI runtime for layered FFI crates that drive Vortex streams produced through +/// `vortex-ffi`. +/// +/// Streams from `vortex-ffi` partitions spawn their scan work onto this runtime's executor, so a +/// consumer crate (for example `vortex-cuda`'s Arrow device stream export) must drive them on this +/// same runtime rather than a private one. +pub fn ffi_runtime() -> &'static CurrentThreadRuntime { + &RUNTIME +} + /// SAFETY: name must be a non-NULL pointer pub(crate) unsafe fn to_field_name(name: *const c_char) -> VortexResult { let name = unsafe { CStr::from_ptr(name) } diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 921c4f50621..4b8191bd4bb 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -66,6 +66,22 @@ crate::box_wrapper!( VxPartitionScan, vx_partition); +/// Consume an owned partition pointer for layered FFI crates and return its Vortex array stream. +/// +/// # Safety +/// +/// `partition` must be a non-null owned partition handle created by `vortex-ffi`. This function +/// consumes the handle; callers must not use or free it after calling this function. +pub unsafe fn vx_partition_into_array_stream( + partition: *mut vx_partition, +) -> VortexResult { + vortex_ensure!(!partition.is_null(), "null vx_partition"); + match *vx_partition::into_box(partition) { + VxPartitionScan::Pending(partition) => partition.execute(), + _ => vortex_bail!("partition already being consumed"), + } +} + // We parse Selection from vx_scan_selection[_include], so we don't need // to instantiate VX_SELECTION_* items directly. #[repr(C)] diff --git a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs index 17678d2800c..a7ed5d06a55 100644 --- a/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs +++ b/vortex-test/e2e-cuda/src/bin/cudf_harness_runner.rs @@ -9,10 +9,11 @@ const PRIMITIVE_DTYPES: &[&str] = &[ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", ]; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; +const HARNESS_COMMANDS: &[&str] = &["check", "check-stream"]; fn main() -> ExitCode { let args = env::args().collect::>(); - let [program, harness, library] = args.as_slice() else { + let [_program, harness, library] = args.as_slice() else { eprintln!( "Usage: {} ", args.first().map_or("cudf_harness_runner", String::as_str) @@ -21,25 +22,27 @@ fn main() -> ExitCode { }; for primitive_dtype in PRIMITIVE_DTYPES { - eprintln!("running {program} with {PRIMITIVE_DTYPE_ENV}={primitive_dtype}"); + for harness_command in HARNESS_COMMANDS { + eprintln!("\n== {PRIMITIVE_DTYPE_ENV}={primitive_dtype} :: {harness_command} =="); - let status = Command::new("compute-sanitizer") - .args(["--tool", "memcheck", "--error-exitcode", "1"]) - .arg(harness) - .arg("check") - .arg(library) - .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) - .status(); + let status = Command::new("compute-sanitizer") + .args(["--tool", "memcheck", "--error-exitcode", "1"]) + .arg(harness) + .arg(harness_command) + .arg(library) + .env(PRIMITIVE_DTYPE_ENV, primitive_dtype) + .status(); - match status { - Ok(status) if status.success() => {} - Ok(status) => { - eprintln!("cudf-test-harness failed with {status}"); - return ExitCode::from(1); - } - Err(err) => { - eprintln!("failed to run cudf-test-harness: {err}"); - return ExitCode::from(1); + match status { + Ok(status) if status.success() => {} + Ok(status) => { + eprintln!("cudf-test-harness {harness_command} failed with {status}"); + return ExitCode::from(1); + } + Err(err) => { + eprintln!("failed to run cudf-test-harness {harness_command}: {err}"); + return ExitCode::from(1); + } } } } diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 7a2e062ac2d..493d7c7d23c 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -46,6 +46,7 @@ use vortex::array::arrays::TemporalArray; use vortex::array::arrays::VarBinViewArray; use vortex::array::arrays::varbinview::BinaryView; use vortex::array::arrow::ArrowSessionExt; +use vortex::array::stream::ArrayStreamExt; use vortex::array::validity::Validity; use vortex::buffer::Buffer; use vortex::buffer::ByteBuffer; @@ -55,12 +56,15 @@ use vortex::dtype::FieldNames; use vortex::dtype::NativePType; use vortex::dtype::Nullability; use vortex::extension::datetime::TimeUnit; +use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::session::RuntimeSession; use vortex::layout::session::LayoutSession; use vortex::session::VortexSession; use vortex_cuda::CudaSession; use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::ArrowDeviceArrayStream; use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda::arrow::DeviceArrayStreamExt; const PRIMITIVE_DTYPE_ENV: &str = "VORTEX_CUDF_PRIMITIVE_DTYPE"; @@ -216,32 +220,9 @@ fn dictionary_array() -> VortexArrayRef { .into_array() } -/// # Safety -/// `schema_ptr` and `array_ptr` must be valid writable pointers. -#[unsafe(no_mangle)] -pub unsafe extern "C" fn export_array( - schema_ptr: &mut FFI_ArrowSchema, - array_ptr: &mut ArrowDeviceArray, -) -> i32 { - ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) -} - -fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { - let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { - Ok(ctx) => ctx, - Err(err) => { - eprintln!("error creating CUDA execution context: {err}"); - return 1; - } - }; - - let primitive = match primitive_array() { - Ok(array) => array, - Err(err) => { - eprintln!("error in export_array: {err}"); - return 1; - } - }; +/// Build the shared cuDF interop test array used by array and stream exports. +fn cudf_test_array() -> Result { + let primitive = primitive_array()?; // cuDF supports Arrow decimal device imports through Decimal128. Decimal256 is intentionally // not included here because cuDF has no DECIMAL256 type_id or Arrow interop mapping. let decimal32 = DecimalArray::from_option_iter( @@ -269,7 +250,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev TimeUnit::Days, ); - let array = StructArray::new( + Ok(StructArray::new( FieldNames::from_iter([ "prims", "bools", @@ -310,7 +291,38 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev 5, Validity::NonNullable, ) - .into_array(); + .into_array()) +} + +/// Export the shared cuDF test array as one Arrow device array. +/// +/// # Safety +/// `schema_ptr` and `array_ptr` must be valid writable pointers. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn export_array( + schema_ptr: &mut FFI_ArrowSchema, + array_ptr: &mut ArrowDeviceArray, +) -> i32 { + ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) +} + +/// Implement `export_array` inside the panic-catching FFI boundary. +fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { + let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { + Ok(ctx) => ctx, + Err(err) => { + eprintln!("error creating CUDA execution context: {err}"); + return 1; + } + }; + + let array = match cudf_test_array() { + Ok(array) => array, + Err(err) => { + eprintln!("error in export_array: {err}"); + return 1; + } + }; match block_on(array.export_device_array_with_schema(&mut ctx)) { Ok(exported) => { @@ -325,6 +337,45 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev } } +/// Export the shared cuDF test array as an Arrow device array stream. +/// +/// # Safety +/// `stream_ptr` must be a valid writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn export_device_stream(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { + ffi_boundary("export_device_stream", || { + export_device_stream_inner(stream_ptr) + }) +} + +/// Implement `export_device_stream` inside the panic-catching FFI boundary. +fn export_device_stream_inner(stream_ptr: &mut ArrowDeviceArrayStream) -> i32 { + let array = match cudf_test_array() { + Ok(array) => array, + Err(err) => { + eprintln!("error in export_device_stream: {err}"); + return 1; + } + }; + + // The in-memory stream is inert, so any current-thread runtime drives it correctly. + let runtime = CurrentThreadRuntime::new(); + match array + .to_array_stream() + .boxed() + .export_device_array_stream(&SESSION, &runtime) + { + Ok(stream) => { + *stream_ptr = stream; + 0 + } + Err(err) => { + eprintln!("error in export_device_array_stream: {err}"); + 1 + } + } +} + /// # Safety /// `ffi_schema` and `ffi_array` must describe a valid Arrow C Data array. #[unsafe(no_mangle)]