diff --git a/Cargo.lock b/Cargo.lock index 66457601e06..b3030452e6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10148,6 +10148,7 @@ dependencies = [ "arrow-array", "arrow-schema", "async-fs", + "codspeed-divan-compat", "futures", "jni", "object_store", diff --git a/java/settings.gradle.kts b/java/settings.gradle.kts index a601cfa9488..a71a1b9fd77 100644 --- a/java/settings.gradle.kts +++ b/java/settings.gradle.kts @@ -17,8 +17,10 @@ toolchainManagement { rootProject.name = "vortex-root" -// API bindings +// API bindings (JMH benchmarks live in vortex-jni's `jmh` source set; see vortex-jni/BENCHMARKS.md) include("vortex-jni") + +// Spark integration include("vortex-spark_2.12") project(":vortex-spark_2.12").projectDir = file("vortex-spark") diff --git a/java/vortex-jni/build.gradle.kts b/java/vortex-jni/build.gradle.kts index bb652d0d86e..7ce45755fa1 100644 --- a/java/vortex-jni/build.gradle.kts +++ b/java/vortex-jni/build.gradle.kts @@ -2,12 +2,14 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import net.ltgt.gradle.errorprone.errorprone import org.gradle.kotlin.dsl.support.serviceOf plugins { `java-library` `jvm-test-suite` id("com.gradleup.shadow") version "9.4.2" + id("me.champeau.jmh") version "0.7.3" } dependencies { @@ -92,8 +94,9 @@ tasks.withType().all { ) } -// shade guava and arrow dependencies -tasks.withType { +// shade guava and arrow dependencies in the published jar only. The JMH benchmark links the real +// (unrelocated) Arrow classes, so its jar must not be relocated — scope this to the `shadowJar` task. +tasks.named("shadowJar") { relocate("com.google.common", "dev.vortex.relocated.com.google.common") relocate("org.apache.arrow", "dev.vortex.relocated.org.apache.arrow") { // exclude C Data Interface since JNI cannot be relocated @@ -214,4 +217,140 @@ tasks.register("generateJniHeaders") { dependsOn("compileJava") } +// --------------------------------------------------------------------------- +// JMH benchmarks (src/jmh). See BENCHMARKS.md. +// +// The read-boundary benchmark is meaningless against a debug native lib, so the `jmh` task builds +// and stages the release_debug cdylib itself (buildJmhNativeLib) rather than reusing the dev +// `makeTestFiles` debug build. The benchmark links the real Arrow classes off the runtime classpath +// (it is not run from the relocated shadowJar), so no relocation applies to it. +// --------------------------------------------------------------------------- +// Shared canonical benchmark file, generated by the Rust side and read by BOTH the JMH benchmark and +// the Rust `read_boundary` Divan bench so the two measure reads of the exact same bytes. +val workspaceRoot = rootProject.projectDir.absoluteFile.parentFile +val benchFile = workspaceRoot.resolve("target/vortex-jni-bench/data.vortex") + +jmh { + jmhVersion.set("1.37") + // These reach the forked benchmark JVM. The Arrow C Data Interface needs the --add-opens; the + // system property points the benchmark at the shared canonical file. + jvmArgsAppend.addAll( + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "-Dvortex.jni.bench.file=${benchFile.absolutePath}", + ) +} + +// Generate the shared canonical .vortex file via the Rust generator example. Idempotent: skipped +// while the file exists (delete it to regenerate). Both `jmh` and the Rust bench read this file. +val generateBenchFile = + tasks.register("generateBenchFile") { + description = "Generate the shared canonical .vortex file read by the JMH and Rust read benchmarks" + group = "verification" + + outputs.file(benchFile) + + doLast { + benchFile.parentFile.mkdirs() + serviceOf().exec { + workingDir = workspaceRoot + executable = "cargo" + args( + "run", + "--profile", + "release_debug", + "--quiet", + "--package", + "vortex-jni", + "--example", + "gen_bench_data", + "--", + benchFile.absolutePath, + ) + } + } + } + +// JMH benchmark classes/methods must be public and non-final, which the nopen checker forbids, and +// the generated JMH glue trips error-prone under -Werror. Relax both for the jmh source set only; +// main and test keep full strictness. +tasks.withType().configureEach { + if (name.lowercase().contains("jmh")) { + options.errorprone.enabled.set(false) + options.compilerArgs.remove("-Werror") + } +} + +// Skip the redundant debug `makeTestFiles` build when this invocation runs the benchmark; the +// benchmark consumes the release_debug lib staged by buildJmhNativeLib instead. +val benchmarkRequested = objects.property(Boolean::class.java).convention(false) +gradle.taskGraph.whenReady { + benchmarkRequested.set(allTasks.any { it.project == project && (it.name == "jmh" || it.name == "jmhJar") }) +} +tasks.named("makeTestFiles").configure { + onlyIf { !benchmarkRequested.get() } +} + +val buildJmhNativeLib = + tasks.register("buildJmhNativeLib") { + description = "Build the release_debug vortex-jni cdylib and stage it for the JMH benchmark" + group = "verification" + + // Stage on top of the processed resources so the benchmark loads it from the runtime classpath. + dependsOn("processResources") + + doLast { + val workspaceRoot = rootProject.projectDir.absoluteFile.parentFile + + serviceOf().exec { + workingDir = workspaceRoot + executable = "cargo" + args("build", "--profile", "release_debug", "--package", "vortex-jni") + } + + val osName = System.getProperty("os.name").lowercase() + val osArch = System.getProperty("os.arch").lowercase() + val osShortName = + when { + osName.contains("mac") -> "darwin" + osName.contains("nix") || osName.contains("nux") -> "linux" + osName.contains("win") -> "win" + else -> throw GradleException("Unsupported OS for buildJmhNativeLib: $osName") + } + val libExt = + when (osShortName) { + "darwin" -> ".dylib" + "linux" -> ".so" + "win" -> ".dll" + else -> throw GradleException("Unsupported OS short name: $osShortName") + } + + copy { + from("$workspaceRoot/target/release_debug/libvortex_jni$libExt") + into(layout.buildDirectory.dir("resources/main/native/$osShortName-$osArch")) + } + } + } + +tasks.named("jmh").configure { + dependsOn(buildJmhNativeLib) + dependsOn(generateBenchFile) +} +tasks.named("jmhJar").configure { dependsOn(buildJmhNativeLib) } + +// Standalone read-batch-granularity diagnostic (VortexJniBatchDiagnostic, not a JMH benchmark). Run +// it off the jmh runtime classpath, which carries the real Arrow classes and the staged +// release_debug lib (me.champeau.jmh's fat `jmhJar` does not bundle deps under com.gradleup.shadow). +tasks.register("batchDiagnostic") { + description = "Run the standalone read-batch-granularity diagnostic (VortexJniBatchDiagnostic)" + group = "verification" + dependsOn("buildJmhNativeLib") + classpath = sourceSets["jmh"].runtimeClasspath + mainClass.set("dev.vortex.bench.VortexJniBatchDiagnostic") + jvmArgs( + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + ) +} + description = "JNI bindings for the Vortex format" diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java new file mode 100644 index 00000000000..0b0fec24e82 --- /dev/null +++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import dev.vortex.api.Session; +import dev.vortex.api.VortexWriter; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ViewVarCharVector; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Shared synthetic table used by both {@link VortexJniReadBenchmark} and {@link VortexJniBatchDiagnostic} so they + * measure and inspect the exact same data shape: six columns (2× int64, 2× float64, 2× Utf8View) over {@link #ROWS} + * rows, with a deterministic fixed seed. + * + *

{@code id} is sequential, {@code cat} is a periodic low-cardinality column kept non-null so a {@code cat='alpha'} + * filter has selectivity exactly {@code 1/|CATS|}, and {@code tag} is high-cardinality with a 10% null rate to exercise + * a validity buffer. + */ +final class BenchData { + + static final int ROWS = 2_000_000; + static final String[] CATS = { + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa" + }; + + private BenchData() {} + + static Schema schema() { + return new Schema(List.of( + Field.notNullable("id", new ArrowType.Int(64, true)), + Field.notNullable("x", new ArrowType.Int(64, true)), + Field.notNullable("y", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.notNullable("z", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.nullable("cat", ArrowType.Utf8View.INSTANCE), + Field.nullable("tag", ArrowType.Utf8View.INSTANCE))); + } + + static void writeTable(Session session, BufferAllocator allocator, String uri, int chunk) throws Exception { + Schema schema = schema(); + Random rnd = new Random(42); + try (VortexWriter writer = VortexWriter.create(session, uri, schema, new HashMap<>(), allocator); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + BigIntVector id = (BigIntVector) root.getVector("id"); + BigIntVector x = (BigIntVector) root.getVector("x"); + Float8Vector y = (Float8Vector) root.getVector("y"); + Float8Vector z = (Float8Vector) root.getVector("z"); + ViewVarCharVector cat = (ViewVarCharVector) root.getVector("cat"); + ViewVarCharVector tag = (ViewVarCharVector) root.getVector("tag"); + + long written = 0; + while (written < ROWS) { + int batch = (int) Math.min(chunk, ROWS - written); + for (FieldVector v : root.getFieldVectors()) { + v.reset(); + } + for (int i = 0; i < batch; i++) { + long r = written + i; + id.setSafe(i, r); + x.setSafe(i, rnd.nextInt(1_000_000)); + y.setSafe(i, rnd.nextDouble()); + z.setSafe(i, rnd.nextDouble()); + // cat stays non-null and deterministic so filter selectivity is exactly 1/|CATS|. + cat.setSafe(i, CATS[(int) (r % CATS.length)].getBytes(UTF_8)); + // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. + if (r % 10 == 0) { + tag.setNull(i); + } else { + tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); + } + } + root.setRowCount(batch); + try (ArrowArray arr = ArrowArray.allocateNew(allocator); + ArrowSchema sch = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot(allocator, root, null, arr, sch); + writer.writeBatch(arr.memoryAddress(), sch.memoryAddress()); + } + written += batch; + } + } + } +} diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java new file mode 100644 index 00000000000..1b1658de678 --- /dev/null +++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import dev.vortex.api.DataSource; +import dev.vortex.api.Partition; +import dev.vortex.api.Scan; +import dev.vortex.api.ScanOptions; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Standalone diagnostic (not a JMH benchmark): writes the shared {@link BenchData} table at several writer chunk sizes + * and prints the resulting read-batch row-count distribution, showing that Vortex coalesces to a stable read-batch + * granularity (~64K rows) independent of how the file was written. + * + *

Run it with {@code ./gradlew :vortex-jni:batchDiagnostic}. + */ +public final class VortexJniBatchDiagnostic { + + private VortexJniBatchDiagnostic() {} + + public static void main(String[] args) throws Exception { + NativeLoader.loadJni(); + for (int chunk : new int[] {8192, 65536, 131072}) { + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Session sess = Session.create(); + Path f = Files.createTempFile("vortex-jni-diag-" + chunk + "-", ".vortex"); + Files.deleteIfExists(f); + String uri = f.toAbsolutePath().toUri().toString(); + BenchData.writeTable(sess, alloc, uri, chunk); + DataSource ds = DataSource.open(sess, uri); + long batches = 0; + long rowsSeen = 0; + long minRows = Long.MAX_VALUE; + long maxRows = 0; + Scan scan = ds.scan(ScanOptions.of()); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(alloc)) { + while (reader.loadNextBatch()) { + int rows = reader.getVectorSchemaRoot().getRowCount(); + batches++; + rowsSeen += rows; + minRows = Math.min(minRows, rows); + maxRows = Math.max(maxRows, rows); + } + } + } + System.out.printf( + "writeChunkRows=%d -> %d read batches over %d rows (min=%d, max=%d, avg=%d)%n", + chunk, batches, rowsSeen, minRows, maxRows, batches == 0 ? 0 : rowsSeen / batches); + Files.deleteIfExists(f); + } + } +} diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java new file mode 100644 index 00000000000..577cc257104 --- /dev/null +++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import dev.vortex.api.DataSource; +import dev.vortex.api.Expression; +import dev.vortex.api.Scan; +import dev.vortex.api.ScanOptions; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeLoader; +import dev.vortex.jni.NativeRuntime; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Field; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures read throughput through the vortex-jni boundary (JNI + the Arrow C Data Interface). + * + *

Every invocation scans the full {@link BenchData#ROWS}-row table, so {@code @OperationsPerInvocation(ROWS)} makes + * JMH report input rows scanned per second directly. Each lane materializes the result batches and sums their + * {@code getRowCount()} — no per-value work — so the numbers reflect scan + boundary cost rather than JVM-side + * arithmetic. Vortex coalesces to ~64K-row read batches regardless of the writer's chunk size (see + * {@link VortexJniBatchDiagnostic}), so boundary cost is amortized over large batches by construction. + * + *

    + *
  • {@code fullScan} — read all six columns. + *
  • {@code projection} — native projection of {@code id, y} (two of six columns). + *
  • {@code selectiveFilter} — native filter {@code cat = 'alpha'} (~1/16 selectivity). + *
+ * + *

This reads a shared canonical file generated by the Rust side (see {@code BENCHMARKS.md} and the + * {@code :vortex-jni:generateBenchFile} Gradle task), so these JMH {@code ops/s} are directly comparable to the Rust + * {@code read_boundary} Divan bench reading the same bytes — the difference is the JNI + Arrow C Data boundary cost. + * The Gradle {@code jmh} task passes the file path via the {@code -Dvortex.jni.bench.file} system property; running the + * benchmark by hand requires setting it (or {@code VORTEX_JNI_BENCH_FILE}). + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@OperationsPerInvocation(BenchData.ROWS) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 2) +@Fork(1) +@Threads(1) +@State(Scope.Benchmark) +public class VortexJniReadBenchmark { + + /** System property (set by the Gradle {@code jmh} task) pointing at the shared canonical {@code .vortex} file. */ + static final String BENCH_FILE_PROPERTY = "vortex.jni.bench.file"; + + static final long EXPECTED_ALPHA_MATCHES = BenchData.ROWS / BenchData.CATS.length; + + /** + * Background worker threads driving the JVM-wide pool: {@code 0} keeps the reading thread the only driver + * (single-threaded), {@code -1} sizes the pool to the available parallelism so the scan's split tasks decode in + * parallel in the background. + */ + @Param({"0", "-1"}) + public int workerThreads; + + BufferAllocator allocator; + Session session; + DataSource dataSource; + + @Setup(Level.Trial) + public void setup() throws Exception { + NativeLoader.loadJni(); + if (workerThreads < 0) { + NativeRuntime.setWorkerThreadsToAvailableParallelism(); + } else { + NativeRuntime.setWorkerThreads(workerThreads); + } + allocator = new RootAllocator(Long.MAX_VALUE); + session = Session.create(); + String uri = benchFile().toUri().toString(); + dataSource = DataSource.open(session, uri); + validate(); + } + + /** Locate the shared canonical file (generated by {@code :vortex-jni:generateBenchFile}); fail loudly if absent. */ + private static Path benchFile() { + String configured = System.getProperty(BENCH_FILE_PROPERTY); + if (configured == null || configured.isBlank()) { + configured = System.getenv("VORTEX_JNI_BENCH_FILE"); + } + if (configured == null || configured.isBlank()) { + throw new IllegalStateException("canonical bench file not configured: set -D" + BENCH_FILE_PROPERTY + + " (the Gradle `jmh` task does this). Run `./gradlew :vortex-jni:jmh`."); + } + Path path = Path.of(configured).toAbsolutePath(); + if (!Files.isRegularFile(path)) { + throw new IllegalStateException("canonical bench file does not exist: " + path + + " — run `./gradlew :vortex-jni:generateBenchFile`."); + } + return path; + } + + @TearDown(Level.Trial) + public void teardown() { + // The canonical file is shared with the Rust bench and managed by Gradle, so it is not deleted here. + // Intentionally does not close the allocator either: DataSource/Scan native resources are released by + // VortexCleaner at GC time, which races an explicit allocator.close() and trips leak detection. The JMH fork + // exits after the trial and reclaims everything. + dataSource = null; + } + + /** Fail the trial loudly if the file or pushdown semantics are wrong — fast garbage must not be cited. */ + private void validate() throws Exception { + if (!(dataSource.rowCount() instanceof DataSource.RowCount.Exact exact) || exact.value() != BenchData.ROWS) { + throw new IllegalStateException( + "expected exactly " + BenchData.ROWS + " rows, got " + dataSource.rowCount()); + } + // Native filter must return exactly ROWS/|CATS| rows for cat='alpha'. + Expression filter = Expression.binary( + Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(BenchData.CATS[0])); + long matched = 0; + Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + matched += reader.getVectorSchemaRoot().getRowCount(); + } + } + } + if (matched != EXPECTED_ALPHA_MATCHES) { + throw new IllegalStateException( + "filter cat='alpha' returned " + matched + ", expected " + EXPECTED_ALPHA_MATCHES); + } + // Native projection must yield exactly [id, y]. + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + Scan pscan = + dataSource.scan(ScanOptions.builder().projection(projection).build()); + if (pscan.hasNext()) { + try (ArrowReader reader = pscan.next().scanArrow(allocator)) { + if (reader.loadNextBatch()) { + List names = reader.getVectorSchemaRoot().getSchema().getFields().stream() + .map(Field::getName) + .toList(); + if (!names.equals(List.of("id", "y"))) { + throw new IllegalStateException("projection schema expected [id, y], got " + names); + } + } + } + } + } + + /** Full scan of all six columns. */ + @Benchmark + public void fullScan(Blackhole bh) throws Exception { + bh.consume(countRows(dataSource.scan(ScanOptions.of()))); + } + + /** Native projection pushdown: only id,y cross the boundary. */ + @Benchmark + public void projection(Blackhole bh) throws Exception { + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + bh.consume(countRows( + dataSource.scan(ScanOptions.builder().projection(projection).build()))); + } + + /** Native filter pushdown: only matching rows cross the boundary. */ + @Benchmark + public void selectiveFilter(Blackhole bh) throws Exception { + Expression filter = Expression.binary( + Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(BenchData.CATS[0])); + bh.consume( + countRows(dataSource.scan(ScanOptions.builder().filter(filter).build()))); + } + + /** Materialize every result batch and return the total row count — no per-value consumption. */ + private long countRows(Scan scan) throws Exception { + long rows = 0; + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + rows += reader.getVectorSchemaRoot().getRowCount(); + } + } + } + return rows; + } +} diff --git a/vortex-jni/Cargo.toml b/vortex-jni/Cargo.toml index d5f5cea61aa..046d5c017fa 100644 --- a/vortex-jni/Cargo.toml +++ b/vortex-jni/Cargo.toml @@ -33,10 +33,17 @@ vortex = { workspace = true, features = ["object_store", "files"] } vortex-parquet-variant = { workspace = true } [dev-dependencies] +divan = { workspace = true } jni = { workspace = true, features = ["invocation"] } [lib] -crate-type = ["staticlib", "cdylib"] +# rlib is added so the read-boundary bench and the canonical-file generator example can link the +# crate; staticlib/cdylib remain for the JNI native library. +crate-type = ["staticlib", "cdylib", "rlib"] + +[[bench]] +name = "read_boundary" +harness = false [lints] workspace = true diff --git a/vortex-jni/benches/canonical/mod.rs b/vortex-jni/benches/canonical/mod.rs new file mode 100644 index 00000000000..9e48cb58ee8 --- /dev/null +++ b/vortex-jni/benches/canonical/mod.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Deterministic generator for the canonical read-boundary benchmark file. +//! +//! Both the `read_boundary` Divan bench and the `gen_bench_data` example write/read the SAME file +//! so the Rust "native floor" and the Java JMH benchmark (`VortexJniReadBenchmark`) measure reads of +//! the exact same bytes. The Gradle `generateBenchFile` task runs the example to produce it, then +//! points the JMH fork at it via `-Dvortex.jni.bench.file`. +//! +//! The table is six columns over [`ROWS`] rows — `id`, `x` (int64), `y`, `z` (float64), and `cat`, +//! `tag` (Utf8View) — generated by a fixed formula (no RNG) so the file is reproducible: `id` is +//! sequential, `cat` is a periodic 16-value low-cardinality column kept non-null so `cat = 'alpha'` +//! has selectivity exactly `1/|CATS|`, and `tag` is high-cardinality with a 10% null rate. + +use std::path::Path; +use std::path::PathBuf; + +use vortex::VortexSessionDefault; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::VarBinViewArray; +use vortex::array::validity::Validity; +use vortex::dtype::FieldNames; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; + +/// Rows in the canonical table. Must match the JMH side's `BenchData.ROWS`. +pub const ROWS: usize = 2_000_000; +/// Rows per Arrow batch handed to the writer. +pub const WRITE_CHUNK: usize = 65_536; +/// 16 low-cardinality category values; `cat = 'alpha'` matches exactly `ROWS / 16` rows. +pub const CATS: [&str; 16] = [ + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india", "juliet", + "kilo", "lima", "mike", "november", "oscar", "papa", +]; + +/// Absolute path of the canonical file, shared with the Gradle/JMH side. +/// +/// Honors `VORTEX_JNI_BENCH_FILE`; otherwise defaults to `/target/vortex-jni-bench/ +/// data.vortex` (the same location the Gradle task passes), so a standalone `cargo bench` and a +/// `./gradlew :vortex-jni:jmh` run resolve to the same file. +pub fn default_path() -> PathBuf { + if let Ok(path) = std::env::var("VORTEX_JNI_BENCH_FILE") { + return PathBuf::from(path); + } + Path::new(env!("CARGO_MANIFEST_DIR")).join("../target/vortex-jni-bench/data.vortex") +} + +/// Build one `[start, end)` slice of the canonical table as a Vortex struct array. +fn build_chunk(start: usize, end: usize) -> ArrayRef { + let id = PrimitiveArray::from_iter((start..end).map(|r| r as i64)).into_array(); + // Deterministic spread across [0, 1_000_000); a fixed multiplicative hash, no RNG. + let xs = PrimitiveArray::from_iter( + (start..end).map(|r| (r as i64).wrapping_mul(2_654_435_761).rem_euclid(1_000_000)), + ) + .into_array(); + let ys = + PrimitiveArray::from_iter((start..end).map(|r| (r % 1000) as f64 / 1000.0)).into_array(); + let zs = PrimitiveArray::from_iter((start..end).map(|r| ((r / 1000) % 1000) as f64 / 1000.0)) + .into_array(); + // cat stays non-null and periodic so filter selectivity is exactly 1/|CATS|. + let cat = + VarBinViewArray::from_iter_str((start..end).map(|r| CATS[r % CATS.len()])).into_array(); + // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. + let tag = VarBinViewArray::from_iter_nullable_str((start..end).map(|r| { + if r % 10 == 0 { + None + } else { + Some(r.to_string()) + } + })) + .into_array(); + + StructArray::new( + FieldNames::from(["id", "x", "y", "z", "cat", "tag"]), + vec![id, xs, ys, zs, cat, tag], + end - start, + Validity::NonNullable, + ) + .into_array() +} + +/// Build the canonical six-column table as a chunked array of [`WRITE_CHUNK`]-row chunks. +fn build_table() -> VortexResult { + let mut chunks = Vec::new(); + let mut start = 0; + while start < ROWS { + let end = (start + WRITE_CHUNK).min(ROWS); + chunks.push(build_chunk(start, end)); + start = end; + } + let dtype = chunks[0].dtype().clone(); + Ok(ChunkedArray::try_new(chunks, dtype)?.into_array()) +} + +/// Write the canonical file to `path`, creating parent directories as needed. +pub fn write_canonical(path: &Path) -> VortexResult<()> { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_handle(runtime.handle()); + let table = build_table()?; + + let mut bytes: Vec = Vec::new(); + runtime.block_on(async { + session + .write_options() + .write(&mut bytes, table.to_array_stream()) + .await + })?; + + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| vortex_err!("failed to create {}: {e}", parent.display()))?; + } + std::fs::write(path, &bytes) + .map_err(|e| vortex_err!("failed to write {}: {e}", path.display()))?; + Ok(()) +} + +/// Write the canonical file only if it does not already exist (idempotent), so the bench and the +/// Gradle generator share one file rather than racing to overwrite it. +pub fn ensure_canonical(path: &Path) -> VortexResult<()> { + let present = path.metadata().map(|m| m.len() > 0).unwrap_or(false); + if present { + return Ok(()); + } + write_canonical(path) +} diff --git a/vortex-jni/benches/read_boundary.rs b/vortex-jni/benches/read_boundary.rs new file mode 100644 index 00000000000..ea18a63b689 --- /dev/null +++ b/vortex-jni/benches/read_boundary.rs @@ -0,0 +1,227 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Native "floor" for the `VortexJniReadBenchmark` JMH lanes. +//! +//! This reads the SAME canonical `.vortex` file the JMH benchmark reads (see [`canonical`]) and runs +//! the same lanes — full scan, native projection, and native filter — but entirely in Rust: +//! `scan -> Arrow RecordBatch`, with no JNI crossing and no Arrow C Data export. Comparing these +//! numbers against the JMH `ops/s` isolates the cost of the JNI + Arrow C Data boundary from the +//! underlying format read. +//! +//! Like the JMH side, every lane scans the full table and reports the produced row count, so +//! `ItemsCount::new(ROWS)` makes Divan print **input rows scanned per second** — directly comparable +//! to JMH's `@OperationsPerInvocation(ROWS)` `ops/s`. Each chunk is materialized to Arrow (forcing +//! the decode) and the batch row count is taken; no per-value work is done, so the numbers reflect +//! scan + materialization, not consume-side arithmetic. +//! +//! Each lane runs in two modes: single-threaded (no pool workers — the consuming thread drives the +//! scan, mirroring the JNI default) and `*_pooled` (a background `CurrentThreadWorkerPool` sized to +//! `available_parallelism() - 1`). The Vortex -> Arrow conversion runs inside the scan's `map`, which +//! executes on the handle-spawned split tasks, so the pool parallelizes both the decode and the Arrow +//! conversion. Utf8View columns are downgraded to flat Arrow `Utf8` via a stripped target field, +//! exactly as the JNI path does, so both benches materialize the same types. + +#![expect(clippy::unwrap_used)] + +mod canonical; + +use std::sync::Arc; +use std::sync::LazyLock; + +use arrow_array::Array; +use arrow_schema::DataType; +use arrow_schema::Field; +use divan::Bencher; +use divan::counter::ItemsCount; +use vortex::VortexSessionDefault; +use vortex::array::VortexSessionExecute; +use vortex::array::arrow::ArrowSessionExt; +use vortex::dtype::DType; +use vortex::dtype::FieldName; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::get_item; +use vortex::expr::lit; +use vortex::expr::root; +use vortex::expr::select; +use vortex::file::OpenOptionsSessionExt; +use vortex::file::VortexFile; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::runtime::current::CurrentThreadWorkerPool; +use vortex::io::session::RuntimeSessionExt; +use vortex::scalar_fn::ScalarFnVTableExt; +use vortex::scalar_fn::fns::binary::Binary; +use vortex::scalar_fn::fns::operators::Operator; +use vortex::session::VortexSession; +use vortex::utils::parallelism::get_available_parallelism; + +use crate::canonical::ROWS; + +/// Shared current-thread runtime and its background worker pool, mirroring the JNI's static +/// `RUNTIME`/`POOL`. One long-lived pool is used (it has no `Drop`, so a per-sample pool would leak +/// worker threads); `set_workers` adjusts the count per lane — `0` for the single-threaded lanes, +/// `available_parallelism() - 1` for the pooled ones. +static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +static POOL: LazyLock = LazyLock::new(|| RUNTIME.new_pool()); + +fn main() { + divan::main(); +} + +/// The lanes, mirroring `VortexJniReadBenchmark`. +#[derive(Clone, Copy)] +enum Lane { + /// Read all six columns. + FullScan, + /// Native projection of `id, y`. + Projection, + /// Native filter `cat = 'alpha'` (~1/16 selectivity). + SelectiveFilter, +} + +/// Read state opened once per Divan sample (outside the timed region) and reused across iterations. +struct Env { + session: VortexSession, + file: VortexFile, +} + +impl Env { + fn open() -> VortexResult { + let path = canonical::default_path(); + canonical::ensure_canonical(&path)?; + + let session = VortexSession::default().with_handle(RUNTIME.handle()); + let file = RUNTIME.block_on( + session + .open_options() + .with_layout_reader_cache() + .open_path(&path), + )?; + Ok(Self { session, file }) + } + + /// Scan the table for `lane`, materialize each chunk to Arrow, and return the total row count so + /// the read is observable without per-value work. + /// + /// The Vortex -> Arrow conversion runs inside the scan's `map`, which executes within each split + /// task spawned on the session's runtime handle — so with pool workers configured the decode AND + /// the Arrow conversion run in parallel in the background, rather than inline on this thread. + fn run(&self, lane: Lane) -> VortexResult { + let mut builder = self.file.scan()?; + match lane { + Lane::Projection => builder = builder.with_projection(project_id_y()), + Lane::SelectiveFilter => builder = builder.with_filter(filter_cat_alpha()), + Lane::FullScan => {} + } + + // Downgrade Utf8View -> Utf8 (and BinaryView -> Binary), matching the JNI Arrow boundary. + let target = stripped_target(&builder.dtype()?)?; + let session = self.session.clone(); + + let mut rows = 0u64; + for batch in builder + .map(move |array| { + let mut ctx = session.create_execution_ctx(); + let arrow = session + .arrow() + .execute_arrow(array, Some(&target), &mut ctx)?; + Ok(arrow.len() as u64) + }) + .into_iter(&*RUNTIME)? + { + rows += batch?; + } + Ok(rows) + } +} + +/// Build the Arrow target field for `execute_arrow`, replacing view string/binary types with their +/// flat equivalents so the materialized types match what the JNI path hands to Java. +fn stripped_target(dtype: &DType) -> VortexResult { + let schema = dtype.to_arrow_schema()?; + let stripped = strip_views(DataType::Struct(schema.fields().clone())); + let DataType::Struct(fields) = stripped else { + return Err(vortex_err!("scan dtype did not export as an Arrow struct")); + }; + Ok(Field::new_struct("", fields, false)) +} + +fn strip_views(data_type: DataType) -> DataType { + match data_type { + DataType::Utf8View => DataType::Utf8, + DataType::BinaryView => DataType::Binary, + DataType::Struct(fields) => DataType::Struct( + fields + .iter() + .map(|f| { + Arc::new(Field::new( + f.name(), + strip_views(f.data_type().clone()), + f.is_nullable(), + )) + }) + .collect(), + ), + other => other, + } +} + +fn project_id_y() -> Expression { + select(vec![FieldName::from("id"), FieldName::from("y")], root()) +} + +fn filter_cat_alpha() -> Expression { + Binary.new_expr( + Operator::Eq, + [get_item(FieldName::from("cat"), root()), lit("alpha")], + ) +} + +/// Background worker count for the pooled lanes: one fewer than the available parallelism (the +/// consuming thread also drives the executor), at least one. +fn worker_threads() -> usize { + get_available_parallelism() + .map(|n| n.saturating_sub(1).max(1)) + .unwrap_or(1) +} + +fn run_lane(bencher: Bencher<'_, '_>, lane: Lane, workers: usize) { + POOL.set_workers(workers); + bencher + .with_inputs(|| Env::open().unwrap()) + .input_counter(|_| ItemsCount::new(ROWS)) + .bench_refs(move |env| env.run(lane).unwrap()); +} + +#[divan::bench] +fn full_scan(bencher: Bencher) { + run_lane(bencher, Lane::FullScan, 0); +} + +#[divan::bench] +fn projection(bencher: Bencher) { + run_lane(bencher, Lane::Projection, 0); +} + +#[divan::bench] +fn selective_filter(bencher: Bencher) { + run_lane(bencher, Lane::SelectiveFilter, 0); +} + +#[divan::bench] +fn full_scan_pooled(bencher: Bencher) { + run_lane(bencher, Lane::FullScan, worker_threads()); +} + +#[divan::bench] +fn projection_pooled(bencher: Bencher) { + run_lane(bencher, Lane::Projection, worker_threads()); +} + +#[divan::bench] +fn selective_filter_pooled(bencher: Bencher) { + run_lane(bencher, Lane::SelectiveFilter, worker_threads()); +} diff --git a/vortex-jni/examples/gen_bench_data.rs b/vortex-jni/examples/gen_bench_data.rs new file mode 100644 index 00000000000..615aaf11e2f --- /dev/null +++ b/vortex-jni/examples/gen_bench_data.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Generates the canonical read-boundary benchmark file shared by the Rust `read_boundary` bench +//! and the Java `VortexJniReadBenchmark` JMH benchmark. +//! +//! Run directly (`cargo run -p vortex-jni --example gen_bench_data -- [PATH]`) or via the Gradle +//! `generateBenchFile` task. With no argument it writes to [`canonical::default_path`]. Generation +//! is idempotent: an existing non-empty file is left untouched so both benches read the same bytes. + +#[path = "../benches/canonical/mod.rs"] +mod canonical; + +use std::path::PathBuf; +use std::process::ExitCode; + +fn main() -> ExitCode { + let path: PathBuf = std::env::args() + .nth(1) + .map(PathBuf::from) + .unwrap_or_else(canonical::default_path); + + let existed = path.metadata().map(|m| m.len() > 0).unwrap_or(false); + match canonical::ensure_canonical(&path) { + Ok(()) => { + if existed { + println!("canonical bench file already present: {}", path.display()); + } else { + println!( + "wrote canonical bench file ({} rows): {}", + canonical::ROWS, + path.display() + ); + } + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("failed to generate canonical bench file: {e}"); + ExitCode::FAILURE + } + } +}