From 7b630075102fe4236be286472063e10e346e5591 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 25 Jun 2026 16:48:39 +0100 Subject: [PATCH] Spawn arrow conversion in jni bindings Signed-off-by: Robert Kruszewski --- vortex-jni/src/scan.rs | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/vortex-jni/src/scan.rs b/vortex-jni/src/scan.rs index 606ec8cd040..7f923af1ac9 100644 --- a/vortex-jni/src/scan.rs +++ b/vortex-jni/src/scan.rs @@ -28,8 +28,6 @@ use jni::objects::JClass; use jni::objects::JLongArray; use jni::sys::jboolean; use jni::sys::jlong; -use vortex::array::ArrayRef; -use vortex::array::ExecutionCtx; use vortex::array::VortexSessionExecute; use vortex::array::arrow::ArrowSessionExt; use vortex::array::stream::SendableArrayStream; @@ -47,6 +45,7 @@ use vortex::scan::PartitionStream; use vortex::scan::ScanRequest; use vortex::scan::selection::Selection; +use crate::POOL; use crate::RUNTIME; use crate::data_source::NativeDataSource; use crate::dtype::strip_views; @@ -351,23 +350,32 @@ pub extern "system" fn Java_dev_vortex_jni_NativePartition_scanArrow( _ => unreachable!("Vortex DType always exports as a struct"), }; let schema = Arc::new(arrow_schema::Schema::new(fields)); - let target = Field::new_struct("", schema.fields().clone(), false); + let target = Arc::new(Field::new_struct("", schema.fields().clone(), false)); let session = unsafe { session_ref(session_ptr) }; let iter = RUNTIME - .block_on_stream_thread_safe(|_handle| array_stream) - .map( - move |chunk: VortexResult| -> VortexResult { - let chunk: ArrayRef = chunk?; - let mut ctx: ExecutionCtx = session.create_execution_ctx(); - let arrow = session - .arrow() - .execute_arrow(chunk, Some(&target), &mut ctx)?; - Ok(RecordBatch::from(arrow.as_struct().clone())) - }, - ) - .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e)))); + .block_on_stream_thread_safe(|handle| { + array_stream + .map(move |chunk| { + let session = session.clone(); + let target = Arc::clone(&target); + handle.spawn(async move { + let chunk = chunk?; + let mut ctx = session.create_execution_ctx(); + let arrow = session.arrow().execute_arrow( + chunk, + Some(target.as_ref()), + &mut ctx, + )?; + Ok(RecordBatch::from(arrow.as_struct().clone())) + }) + }) + .buffered(POOL.worker_count().max(1)) + }) + .map(|result: VortexResult| { + result.map_err(|e| ArrowError::ExternalError(Box::new(e))) + }); let reader = RecordBatchIteratorAdapter::new(iter, schema); let arrow_stream = FFI_ArrowArrayStream::new(Box::new(reader));