Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions vortex-duckdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,4 @@ loaded.
## Testing a custom DuckDB tag

Change `DUCKDB_VERSION` environment variable value to a preferred hash or commit
(local build), or change build.rs (for testing in CI). If you use a commit,
DuckDB needs to link httpfs statically so you also need to install CURL
development headers (e.g. `libcurl4-openssl-dev`).
(local build), or change build.rs (for testing in CI).
5 changes: 2 additions & 3 deletions vortex-duckdb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,12 @@ fn build_duckdb(version: &DuckDBVersion, duckdb_repo_dir: &Path) {
("1", "0")
};

// If we're building from a commit we need to build httpfs and benchmark
// If we're building from a commit we need to build benchmark
// extensions statically, otherwise DuckDB tries to load them from an http
// endpoint with version 0.0.1 (all non-tagged builds) which doesn't exist.
// httpfs static build also requires CURL dev headers
let static_extensions = match version {
DuckDBVersion::Release(_) => "parquet;jemalloc",
DuckDBVersion::Commit(_) => "parquet;jemalloc;httpfs;tpch;tpcds",
DuckDBVersion::Commit(_) => "parquet;jemalloc;tpch;tpcds",
};

let envs = [
Expand Down
5 changes: 2 additions & 3 deletions vortex-duckdb/cpp/copy_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ unique_ptr<FunctionData> copy_to_bind(ClientContext &,
}

unique_ptr<GlobalFunctionData>
copy_to_initialize_global(ClientContext &context, FunctionData &bind_data, const string &file_path) {
copy_to_initialize_global(ClientContext &, FunctionData &bind_data, const string &file_path) {
void *const ffi_bind = bind_data.Cast<CopyBindData>().ffi_data->DataPtr();
const auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);

duckdb_vx_error error_out = nullptr;
const duckdb_vx_data ffi_global =
duckdb_copy_function_copy_to_initialize_global(ffi_ctx, ffi_bind, file_path.c_str(), &error_out);
duckdb_copy_function_copy_to_initialize_global(ffi_bind, file_path.c_str(), &error_out);
if (error_out) {
throw ExecutorException(IntoErrString(error_out));
}
Expand Down
6 changes: 2 additions & 4 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,14 @@ bool projection_expression_pushdown(ClientContext &, const TableFunctionProjecti
* and after a query another file is added matching the glob, for second query
* bind() will be called again.
*/
unique_ptr<FunctionData> duckdb_vx_table_function_bind(ClientContext &context,
unique_ptr<FunctionData> duckdb_vx_table_function_bind(ClientContext &,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
vector<string> &names) {
CTableBindResult result = {return_types, names};

duckdb_vx_error error_out = nullptr;
auto ctx = reinterpret_cast<duckdb_client_context>(&context);
auto ffi_bind_data = duckdb_table_function_bind(ctx,
reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
auto ffi_bind_data = duckdb_table_function_bind(reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
reinterpret_cast<duckdb_vx_tfunc_bind_result>(&result),
&error_out);
if (error_out) {
Expand Down
6 changes: 2 additions & 4 deletions vortex-duckdb/include/vortex.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ duckdb_vx_data duckdb_table_function_init_global(const duckdb_vx_tfunc_init_inpu
extern duckdb_vx_data duckdb_table_function_init_local(void *global_init_data);

extern
duckdb_vx_data duckdb_table_function_bind(duckdb_client_context ctx,
duckdb_vx_tfunc_bind_input bind_input,
duckdb_vx_data duckdb_table_function_bind(duckdb_vx_tfunc_bind_input bind_input,
duckdb_vx_tfunc_bind_result bind_result,
duckdb_vx_error *error_out);

Expand All @@ -74,8 +73,7 @@ duckdb_vx_data duckdb_copy_function_copy_to_bind(const char *const *column_names
duckdb_vx_error *error_out);

extern
duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(duckdb_client_context client_context,
const void *bind_data,
duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(const void *bind_data,
const char *file_path,
duckdb_vx_error *error_out);

Expand Down
22 changes: 10 additions & 12 deletions vortex-duckdb/src/copy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use async_fs::OpenOptions;
use futures::SinkExt;
use futures::TryStreamExt;
use futures::channel::mpsc;
Expand Down Expand Up @@ -28,9 +29,7 @@ use crate::RUNTIME;
use crate::SESSION;
use crate::convert::FromLogicalType;
use crate::convert::data_chunk_to_vortex;
use crate::duckdb::ClientContextRef;
use crate::duckdb::DataChunkRef;
use crate::duckdb::DuckDbFsWriter;
use crate::duckdb::LogicalTypeRef;

#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +110,6 @@ pub fn copy_to_finalize(init_global: &mut CopyFunctionGlobal) -> VortexResult<()
}

pub fn copy_to_initialize_global(
client_context: &ClientContextRef,
bind_data: &CopyFunctionBind,
file_path: String,
) -> VortexResult<CopyFunctionGlobal> {
Expand All @@ -120,16 +118,16 @@ pub fn copy_to_initialize_global(
let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream());

let handle = SESSION.handle();
// SAFETY: The ClientContext is owned by the Connection and lives for the duration of
// query execution. DuckDB keeps the connection alive while this copy function runs.
let ctx = unsafe { client_context.erase_lifetime() };

// Use DuckDB FS exclusively to match the DuckDB client context configuration.
let writer = DuckDbFsWriter::new(ctx, &file_path)
.map_err(|e| vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}"))?;

let write_task =
handle.spawn(async move { SESSION.write_options().write(writer, array_stream).await });
let write_task = handle.spawn(async move {
let writer = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(file_path)
.await?;
SESSION.write_options().write(writer, array_stream).await
});

let worker_pool = RUNTIME.new_pool();
worker_pool.set_workers_to_available_parallelism();
Expand Down
198 changes: 0 additions & 198 deletions vortex-duckdb/src/duckdb/file_system.rs

This file was deleted.

2 changes: 0 additions & 2 deletions vortex-duckdb/src/duckdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod data_chunk;
mod database;
mod ddb_string;
mod expr;
mod file_system;
mod logical_type;
mod macro_;
mod query_result;
Expand All @@ -36,7 +35,6 @@ pub use data_chunk::*;
pub use database::*;
pub use ddb_string::*;
pub use expr::*;
pub use file_system::*;
pub use logical_type::*;
pub use query_result::*;
pub use reusable_dict::*;
Expand Down
Loading
Loading