Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions bench-orchestrator/bench_orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ def run_ref_auto_complete() -> list[str]:
return list(map(lambda x: x.run_id, ResultStore().list_runs(limit=None)))


def targets_from_axes(engine: str, format: str) -> tuple[list[BenchmarkTarget], list[str]]:
def targets_from_axes(
engine: str, format: str, benchmark: Benchmark | None = None
) -> tuple[list[BenchmarkTarget], list[str]]:
"""Resolve legacy engine/format axes into explicit benchmark targets."""
return resolve_axis_targets(parse_engines(engine), parse_formats(format))
return resolve_axis_targets(parse_engines(engine), parse_formats(format), benchmark)


def backends_for_engines(engines: list[Engine]) -> list[Engine]:
Expand Down Expand Up @@ -260,7 +262,7 @@ def run(
targets = parse_targets_json(targets_json)
warnings: list[str] = []
else:
targets, warnings = targets_from_axes(engine, format)
targets, warnings = targets_from_axes(engine, format, benchmark)
except ValueError as exc:
console.print(f"[red]{exc}[/red]")
raise typer.Exit(1) from exc
Expand Down
27 changes: 24 additions & 3 deletions bench-orchestrator/bench_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Benchmark(Enum):
POLARSIGNALS = "polarsignals"
PUBLIC_BI = "public-bi"
STATPOPGEN = "statpopgen"
SPATIALBENCH = "spatialbench"


# Engine to supported formats mapping.
Expand All @@ -72,6 +73,19 @@ class Benchmark(Enum):
Engine.LANCE: [Format.LANCE],
}

# Engines each benchmark can run on. Benchmarks default to *every* engine; list one here only to
# restrict it. Read this map through `engines_for_benchmark`, which supplies that default for any
# benchmark that has no entry.
#
# SpatialBench's queries use DuckDB-specific `ST_*` spatial SQL that DataFusion has no
# functions for yet, so it is limited to DuckDB.
BENCHMARK_ENGINES: dict[Benchmark, frozenset[Engine]] = {
Benchmark.SPATIALBENCH: frozenset({Engine.DUCKDB}),
}


def engines_for_benchmark(benchmark: Benchmark) -> frozenset[Engine]:
    """Look up which engines can run `benchmark`.

    A benchmark with an entry in `BENCHMARK_ENGINES` is limited to the engines listed there;
    a benchmark without an entry is unrestricted and gets the full set of `Engine` members.
    """
    restricted = BENCHMARK_ENGINES.get(benchmark)
    if restricted is None:
        # No explicit restriction recorded: every engine is allowed.
        return frozenset(Engine)
    return restricted


T = TypeVar("T")


Expand Down Expand Up @@ -175,13 +189,16 @@ def parse_formats_json(value: str) -> list[Format]:


def resolve_axis_targets(
engines: Iterable[Engine], formats: Iterable[Format]
engines: Iterable[Engine], formats: Iterable[Format], benchmark: Benchmark | None = None
) -> tuple[list[BenchmarkTarget], list[str]]:
"""Expand engine/format axes into supported explicit targets."""
warnings: list[str] = []
targets: list[BenchmarkTarget] = []

for engine in engines:
if benchmark is not None and engine not in engines_for_benchmark(benchmark):
warnings.append(f"Benchmark {benchmark.value} does not support engine {engine.value}")
continue
for fmt in formats:
target = BenchmarkTarget(engine=engine, format=fmt).normalized()
if not target.is_supported():
Expand All @@ -200,14 +217,18 @@ def group_targets_by_backend(targets: Iterable[BenchmarkTarget]) -> dict[Engine,
return groups


def validate_targets(targets: Iterable[BenchmarkTarget], options: dict[str, str]) -> list[str]:
def validate_targets(
targets: Iterable[BenchmarkTarget], options: dict[str, str], benchmark: Benchmark | None = None
) -> list[str]:
"""Validate explicit targets against benchmark runner constraints."""
errors: list[str] = []

normalized_targets = [target.normalized() for target in targets]
for target in normalized_targets:
if not target.is_supported():
errors.append(f"Format {target.format.value} is not supported by engine {target.engine.value}")
if benchmark is not None and target.engine not in engines_for_benchmark(benchmark):
errors.append(f"Benchmark {benchmark.value} does not support engine {target.engine.value}")

if options.get("remote-data-dir") and any(target.format == Format.LANCE for target in normalized_targets):
errors.append("Lance format is not supported for remote storage benchmarks.")
Expand Down Expand Up @@ -242,7 +263,7 @@ def backends(self) -> list[Engine]:

def validate(self) -> list[str]:
"""Validate the configuration and return any errors."""
return validate_targets(self.targets, self.options)
return validate_targets(self.targets, self.options, self.benchmark)


@dataclass
Expand Down
26 changes: 26 additions & 0 deletions bench-orchestrator/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright the Vortex contributors

from bench_orchestrator.config import (
Benchmark,
BenchmarkTarget,
Engine,
Format,
Expand Down Expand Up @@ -39,6 +40,31 @@ def test_resolve_axis_targets_filters_unsupported_combinations() -> None:
assert warnings == ["Format arrow is not supported by engine duckdb"]


def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None:
    # SpatialBench is restricted to DuckDB, so requesting DataFusion as well must drop that
    # engine from the result and report it as a warning instead of producing targets for it.
    requested_engines = [Engine.DATAFUSION, Engine.DUCKDB]
    requested_formats = [Format.PARQUET, Format.VORTEX]
    expected_targets = [
        BenchmarkTarget(engine=Engine.DUCKDB, format=Format.PARQUET),
        BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX),
    ]
    expected_warnings = ["Benchmark spatialbench does not support engine datafusion"]

    resolved, reported = resolve_axis_targets(
        requested_engines,
        requested_formats,
        Benchmark.SPATIALBENCH,
    )

    assert resolved == expected_targets
    assert reported == expected_warnings


def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None:
    # An explicit DataFusion target is a validation error for the DuckDB-only SpatialBench.
    datafusion_target = BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)
    no_options: dict[str, str] = {}

    reported = validate_targets([datafusion_target], no_options, Benchmark.SPATIALBENCH)

    assert reported == ["Benchmark spatialbench does not support engine datafusion"]


def test_validate_targets_rejects_remote_lance() -> None:
errors = validate_targets(
[BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.LANCE)],
Expand Down
60 changes: 58 additions & 2 deletions vortex-bench/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use futures::StreamExt;
use futures::TryStreamExt;
Expand All @@ -26,11 +27,19 @@ use vortex::array::arrow::FromArrowArray;
use vortex::array::builders::builder_with_capacity;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::array::stream::ArrayStreamExt;
use vortex::compressor::BtrBlocksCompressorBuilder;
use vortex::dtype::DType;
use vortex::dtype::FieldPath;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::file::VortexWriteOptions;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteStrategyBuilder;
use vortex::layout::LayoutStrategy;
use vortex::layout::layouts::chunked::writer::ChunkedLayoutStrategy;
use vortex::layout::layouts::compressed::CompressingStrategy;
use vortex::layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex::session::VortexSession;

use crate::CompactionStrategy;
Expand Down Expand Up @@ -126,8 +135,7 @@ pub async fn convert_parquet_file_to_vortex(
.open(output_path)
.await?;

compaction
.apply_options(SESSION.write_options())
write_options_for(compaction, &dtype, is_spatialbench(parquet_path))
.write(
&mut output_file,
ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
Expand All @@ -137,6 +145,54 @@ pub async fn convert_parquet_file_to_vortex(
Ok(())
}

/// Reports whether `path` refers to SpatialBench data.
///
/// The check is purely lexical: it is true when any single component of `path` is exactly
/// `spatialbench`. A component that merely contains that text (e.g. `spatialbench-old`) does
/// not count.
fn is_spatialbench(path: &Path) -> bool {
    for part in path.components() {
        if part.as_os_str() == "spatialbench" {
            return true;
        }
    }
    false
}

/// Build the Vortex write options used when converting `dtype`-shaped data.
///
/// When `skip_binary_dict` is set (SpatialBench) and `dtype` is a struct, every top-level
/// `Binary` field is given the dedicated [`no_dict_layout`] writer. The geometry blobs are large
/// and unique, so the dictionary builder balloons memory (tens of GB) for zero gain.
///
/// In every other case (flag unset, non-struct `dtype`, or no top-level `Binary` field) the
/// session's default write options are returned with `compaction` applied, unchanged from the
/// regular conversion path.
fn write_options_for(
    compaction: CompactionStrategy,
    dtype: &DType,
    skip_binary_dict: bool,
) -> VortexWriteOptions {
    // Names of the top-level binary columns that need the no-dictionary writer.
    let mut opaque_columns = Vec::new();
    if skip_binary_dict {
        if let DType::Struct(fields, _) = dtype {
            opaque_columns.extend(fields.names().iter().zip(fields.fields()).filter_map(
                |(name, field)| matches!(field, DType::Binary(_)).then(|| name.clone()),
            ));
        }
    }

    // Nothing to special-case: fall back to the regular options.
    if opaque_columns.is_empty() {
        return compaction.apply_options(SESSION.write_options());
    }

    // Start from the default strategy, switching to the compact compressor when requested.
    let base = WriteStrategyBuilder::default();
    let base = if matches!(compaction, CompactionStrategy::Compact) {
        base.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
    } else {
        base
    };

    // Register the override writer for each binary column, then finalize the strategy.
    let strategy = opaque_columns
        .into_iter()
        .fold(base, |builder, column| {
            builder.with_field_writer(FieldPath::from_name(column), no_dict_layout())
        })
        .build();

    SESSION.write_options().with_strategy(strategy)
}

/// Layout strategy for opaque `Binary` blobs: a chunked layout over flat leaves, wrapped in a
/// compressing strategy driven by the default BtrBlocks compressor.
///
/// Used as the per-field writer for binary columns so they skip dictionary encoding.
fn no_dict_layout() -> Arc<dyn LayoutStrategy> {
    let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
    let compressor = BtrBlocksCompressorBuilder::default().build();
    Arc::new(CompressingStrategy::new(chunked, compressor))
}

/// Convert all Parquet files in a directory to Vortex format.
///
/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
Expand Down
62 changes: 62 additions & 0 deletions vortex-bench/src/spatialbench/datagen/wkb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
//! Geometry is emitted as WKB, which DuckDB reads directly as `GEOMETRY` via `ST_GeomFromWKB`.

use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
use anyhow::Result;
// spatialbench emits arrow-56 batches, so they must be written with its matching arrow-56
// parquet crate, not the workspace's arrow-58 one. The parquet file itself is version-neutral.
Expand All @@ -22,7 +24,9 @@ use spatialbench_parquet::arrow::AsyncArrowWriter;
use spatialbench_parquet::basic::Compression;
use spatialbench_parquet::file::properties::WriterProperties;
use tokio::fs::File as TokioFile;
use tokio::process::Command;
use tracing::info;
use tracing::warn;

use super::table::TABLES;
use super::table::Table;
Expand Down Expand Up @@ -103,5 +107,63 @@ pub async fn generate_tables(scale_factor: &str, output_dir: PathBuf) -> Result<
}
}

// `zone` isn't one of the in-process `TABLES`; it comes from the upstream CLI. Best-effort:
// a missing/failed CLI shouldn't block the zone-free queries, so warn and carry on.
if let Err(e) = generate_zone(scale_factor, &parquet_dir).await {
warn!(
error = %e,
"zone table not generated — SpatialBench queries Q2/Q4/Q6/Q10/Q11 need it. Install the \
upstream generator (`cargo install --path <sedona-spatialbench>/spatialbench-cli`) or \
set SPATIALBENCH_CLI to its binary, then re-run."
);
}

Ok(())
}

/// Generate the externally-sourced `zone` table by shelling out to the upstream `spatialbench-cli`.
async fn generate_zone(scale_factor: f64, parquet_dir: &Path) -> Result<()> {
if parquet_dir.join("zone_0.parquet").exists() {
return Ok(());
}
let cli = std::env::var("SPATIALBENCH_CLI").unwrap_or_else(|_| "spatialbench-cli".to_string());

// Generate into a scratch dir so the CLI's `zone.parquet` name can't collide with the base
// tables, then move the produced parts into place as `zone_{part}.parquet`.
// Start from an empty scratch dir (clear any leftover from an interrupted run).
let scratch = parquet_dir.join(".zone-scratch");
fs::remove_dir_all(&scratch).ok();
fs::create_dir_all(&scratch)?;

info!(
scale_factor,
cli, "Generating SpatialBench zone table via spatialbench-cli"
);
let status = Command::new(&cli)
.arg("-s")
.arg(scale_factor.to_string())
.args(["-T", "zone", "-f", "parquet", "-o"])
.arg(&scratch)
.status()
.await
.with_context(|| format!("failed to spawn `{cli}` (is it installed / on PATH?)"))?;
anyhow::ensure!(
status.success(),
"`{cli}` exited with {status} while generating zone"
);

// The CLI writes `zone.parquet` (single part) or `zone/zone.N.parquet`.
let mut produced: Vec<PathBuf> = glob::glob(&scratch.join("**/*.parquet").to_string_lossy())?
.collect::<std::result::Result<_, _>>()?;
produced.sort();
anyhow::ensure!(
!produced.is_empty(),
"`{cli}` produced no zone parquet under {}",
scratch.display()
);
for (part_idx, src) in produced.iter().enumerate() {
fs::rename(src, parquet_dir.join(format!("zone_{part_idx}.parquet")))?;
}
fs::remove_dir_all(&scratch).ok();
Ok(())
}
Loading