Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6a78fea
Implement Arrow device array stream
0ax1 Jun 17, 2026
e61249b
Test CUDA Arrow device array streams with cuDF harness
0ax1 Jun 17, 2026
6d11e48
Document cuDF device stream harness
0ax1 Jun 17, 2026
e53feae
Simplify cuDF harness README command
0ax1 Jun 17, 2026
7330751
Avoid consuming CUDA partition before stream setup
0ax1 Jun 17, 2026
d82937d
Improve cuDF harness runner output
0ax1 Jun 17, 2026
52952d2
Wait for cuDF harness release asset
0ax1 Jun 17, 2026
0722ada
Revert "Wait for cuDF harness release asset"
0ax1 Jun 17, 2026
4ab3b2e
Defer cuDF stream harness opt-in
0ax1 Jun 17, 2026
b2b0cbd
Run cuDF device stream harness in CI
0ax1 Jun 17, 2026
a9970b3
Clarify CUDA stream ownership
0ax1 Jun 18, 2026
0b5fea3
Rename CUDA partition device stream FFI
0ax1 Jun 18, 2026
581d260
Document CUDA stream helpers
0ax1 Jun 18, 2026
07926a0
simplify
0ax1 Jun 18, 2026
17d85cb
schema
0ax1 Jun 18, 2026
b5a6697
docs
0ax1 Jun 18, 2026
f4f5d1b
Polish CUDA device stream naming
0ax1 Jun 18, 2026
e70ffe7
Polish CUDA device stream FFI handling
0ax1 Jun 18, 2026
9332a30
Catch panics in Vortex FFI error helpers
0ax1 Jun 18, 2026
a4b7e5f
Drive Arrow device stream on the session runtime
0ax1 Jun 18, 2026
ad8b448
Document Arrow device stream schema stability requirement
0ax1 Jun 18, 2026
1cb2daa
Deduplicate Arrow device array stream helpers
0ax1 Jun 18, 2026
b89a5c9
Wrap Arrow device stream comments to 100 columns
0ax1 Jun 18, 2026
cfff6cd
cleanup
0ax1 Jun 19, 2026
f1a8999
address comments
0ax1 Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions vortex-cuda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,15 @@ cmake -S cpp -B cpp/build \
-DCMAKE_EXE_LINKER_FLAGS="-Wl,--stub-group-size=1048576" \
-GNinja && cmake --build cpp/build --target INTEROP_TEST --parallel
```

## Running the cuDF test harness

```sh
cargo build -p vortex-test-e2e-cuda
cmake --build /path/to/cudf-test-harness/build --target cudf-test-harness --parallel

LD_LIBRARY_PATH=/usr/local/cuda-13.1/compat \
target/debug/cudf_harness_runner \
/path/to/cudf-test-harness/build/cudf-test-harness \
target/debug/libvortex_test_e2e_cuda.so
```
1 change: 1 addition & 0 deletions vortex-cuda/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn generate_arrow_device_array_bindings(manifest_dir: &Path, out_dir: &Path) {
.header(header.to_string_lossy())
.allowlist_type("ArrowArray")
.allowlist_type("ArrowDeviceArray")
.allowlist_type("ArrowDeviceArrayStream")
.allowlist_type("ArrowDeviceType")
.allowlist_var("ARROW_DEVICE_.*")
// ArrowArray/ArrowDeviceArray own producer state through release/private_data.
Expand Down
31 changes: 31 additions & 0 deletions vortex-cuda/ffi/cinclude/vortex_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ struct ArrowDeviceArray {
};
#endif

/*
 * Fallback declaration of the Arrow C Device stream struct.
 *
 * Compiled only when ARROW_C_DEVICE_STREAM_INTERFACE is not already defined
 * (i.e. no other header has declared the struct) and the embedder has not
 * defined USE_OWN_ARROW_DEVICE to supply its own definitions. Defining
 * ARROW_C_DEVICE_STREAM_INTERFACE here prevents a second declaration later in
 * the same translation unit.
 *
 * NOTE(review): the member order and types are expected to match the upstream
 * Arrow C Device data interface exactly, since this is an ABI struct - confirm
 * against the Arrow specification before changing anything.
 */
#if !defined(ARROW_C_DEVICE_STREAM_INTERFACE) && !defined(USE_OWN_ARROW_DEVICE)
#define ARROW_C_DEVICE_STREAM_INTERFACE
struct ArrowDeviceArrayStream {
/* Device type associated with this stream (presumably the device on which
 * every produced array lives - see the Arrow spec). */
ArrowDeviceType device_type;
/* Callback that writes the stream's schema into `out`; returns an int status
 * (presumably 0 on success, an errno-style code otherwise - confirm). */
int (*get_schema)(struct ArrowDeviceArrayStream *, struct ArrowSchema *out);
/* Callback that writes the next device array into `out`; returns an int
 * status with the same convention as `get_schema`. */
int (*get_next)(struct ArrowDeviceArrayStream *, struct ArrowDeviceArray *out);
/* Callback returning a C string describing the last error, as a
 * `const char *` the caller must not modify. */
const char *(*get_last_error)(struct ArrowDeviceArrayStream *);
/* Callback the consumer invokes to release the stream and its resources. */
void (*release)(struct ArrowDeviceArrayStream *);
/* Opaque producer-owned state passed back through the callbacks. */
void *private_data;
};
#endif

/**
* Create a CUDA Vortex session.
*
Expand All @@ -69,6 +81,25 @@ int vx_cuda_array_export_arrow_device(const vx_session *session,
struct ArrowDeviceArray *out_array,
vx_error **error_out);

/**
* Consume a Vortex partition and scan it as an Arrow C Device stream.
*
* This function takes ownership of `partition`. Callers must not free or reuse
* it after calling this function, regardless of success or failure.
*
* `partition` and `out_stream` must both be non-NULL; passing NULL for either
* is reported as an error. `out_stream` must point to writable storage for one
* `ArrowDeviceArrayStream`.
*
* On success returns 0 and writes an owned `ArrowDeviceArrayStream` to
* `out_stream`. The stream owns the resulting scan iterator. The caller must
* release the stream through its embedded Arrow `release` callback, and must
* release each produced `ArrowDeviceArray` through its embedded
* `ArrowArray.release` callback.
*
* On error returns 1 and writes a `vx_error` to `error_out` when non-NULL.
* The caller owns that error and must free it with `vx_error_free`.
*/
int vx_cuda_partition_scan_arrow_device_stream(const vx_session *session,
vx_partition *partition,
struct ArrowDeviceArrayStream *out_stream,
vx_error **error_out);

#ifdef __cplusplus
}
#endif
49 changes: 49 additions & 0 deletions vortex-cuda/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@ use vortex::error::vortex_ensure;
use vortex::session::VortexSession;
use vortex_cuda::CudaSession;
use vortex_cuda::arrow::ArrowDeviceArray;
use vortex_cuda::arrow::ArrowDeviceArrayStream;
use vortex_cuda::arrow::DeviceArrayExt;
use vortex_cuda::arrow::DeviceArrayStreamExt;
use vortex_ffi::ffi_runtime;
use vortex_ffi::try_or;
use vortex_ffi::vx_array;
use vortex_ffi::vx_array_ref;
use vortex_ffi::vx_error;
use vortex_ffi::vx_partition;
use vortex_ffi::vx_partition_into_array_stream;
use vortex_ffi::vx_session;
use vortex_ffi::vx_session_new_with;
use vortex_ffi::vx_session_ref;

const VX_CUDA_OK: c_int = 0;
const VX_CUDA_ERR: c_int = 1;

/// Return a Vortex session with a [`CudaSession`] session variable.
///
/// If `session` already has CUDA support, this returns a clone of it. Otherwise it
/// returns a new session cloned from `session` with a default [`CudaSession`] attached.
fn session_with_cuda(session: &VortexSession) -> VortexResult<VortexSession> {
if session.get_opt::<CudaSession>().is_some() {
return Ok(session.clone());
Expand Down Expand Up @@ -100,6 +109,46 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device(
})
}

/// Consume a Vortex partition and scan it as an Arrow C Device stream.
///
/// This function takes ownership of `partition`. Callers must not free or reuse it after calling
/// this function, regardless of success or failure.
///
/// On success returns `0` and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream
/// owns the resulting scan iterator. The caller must release the stream through its embedded Arrow
/// `release` callback, and must release each produced `ArrowDeviceArray` through its embedded
/// `ArrowArray.release` callback.
///
/// On error returns `1` and, when `error_out` is non-null, writes a `vx_error` (free with
/// `vx_error_free`).
///
/// # Safety
///
/// `session` must be a valid borrowed handle created by `vortex-ffi`. `partition` must be an owned
/// partition handle created by `vortex-ffi`. `out_stream` must be a valid writable pointer. If
/// `error_out` is non-null, it must be valid for writing one error pointer.
#[unsafe(no_mangle)]
pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream(
session: *const vx_session,
partition: *mut vx_partition,
out_stream: *mut ArrowDeviceArrayStream,
error_out: *mut *mut vx_error,
) -> c_int {
try_or(error_out, VX_CUDA_ERR, || {
vortex_ensure!(!partition.is_null(), "null vx_partition");

let array_stream = unsafe { vx_partition_into_array_stream(partition) }?;
Comment thread
0ax1 marked this conversation as resolved.
vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output");

let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?;
// Drive the stream on the same runtime the partition's scan spawned its work onto.
let device_stream = array_stream.export_device_array_stream(&session, ffi_runtime())?;

unsafe { ptr::write(out_stream, device_stream) };
Ok(VX_CUDA_OK)
})
}

#[cfg(test)]
mod tests {
use std::ptr;
Expand Down
Loading
Loading